diff --git a/attributes/maestro-schema.c b/attributes/maestro-schema.c index d08fe67b7095cf03bfb267215d1e911b40864504..ac87c0d3e79c38504f2614a78575c289afd29493 100644 --- a/attributes/maestro-schema.c +++ b/attributes/maestro-schema.c @@ -730,10 +730,12 @@ mstro_attribute_val__compute_size(enum mstro_stp_val_kind kind, *val_size = sizeof(double); break; case MSTRO_STP_STR: case MSTRO_STP_REGEX: - if(string) + if(string!=NULL) *val_size = strlen(string)+1; - else + else { + assert(val!=NULL); *val_size = strlen((char*)val)+1; + } break; case MSTRO_STP_TIMESTAMP: *val_size = sizeof(mstro_timestamp); diff --git a/configure.ac b/configure.ac index d0243cd30d83fc04733da09bc838fab245f6490c..5e9274d497216637c1f50b96a494a4ff08fef909 100644 --- a/configure.ac +++ b/configure.ac @@ -175,7 +175,8 @@ AC_ARG_ENABLE([tsan], if test "x$enable_tsan" = "xyes"; then BUILD_TSAN=yes AX_CHECK_COMPILE_FLAG([-fsanitize=thread -fno-omit-frame-pointer], - [CFLAGS="${CFLAGS} -O1 -g -fsanitize=thread -fno-omit-frame-pointer"], + [CFLAGS="${CFLAGS} -O1 -g -fsanitize=thread -fno-omit-frame-pointer" + LDFLAGS="${LDFLAGS} -fsanitize=thread"], AC_MSG_ERROR([compiler does not support -fsanitize=thread flag])) else BUILD_TSAN=no @@ -423,7 +424,9 @@ AS_IF([test "x$enable_ofi_pool_manager" = "xyes"], [ AS_IF([test ! 
-f $srcdir/deps/libfabric/configure], [(cd $srcdir/deps/libfabric; ./autogen.sh)]) #AX_SUBDIRS_CONFIGURE([deps/libfabric],[[--enable-embedded],[--disable-rxm],[--disable-rxd],[--disable-psm]],[],[],[]) - AX_SUBDIRS_CONFIGURE([deps/libfabric],[[--enable-embedded],[--disable-rxd],[--disable-shm],[--disable-tcp],[--enable-debug]],[],[],[]) + dnl Configuring without kdreg to fix issue https://gitlab.com/cerl/maestro/maestro-core/-/issues/117 which relates to https://github.com/ofiwg/libfabric/issues/5313 + dnl Whether doing so creates a performance problem or not is still to be determined + AX_SUBDIRS_CONFIGURE([deps/libfabric],[[--enable-embedded],[--disable-rxd],[--disable-shm],[--disable-tcp],[--with-kdreg=no],[--enable-debug]],[],[],[]) AC_MSG_NOTICE([================== done preconfiguring private libfabric library build]) ]) diff --git a/include/maestro/attributes.h b/include/maestro/attributes.h index 0788c5d2b6f124f4d313a6694c2971dda7dcc31f..5a66bafe6027b168b4c24c97da95f6e3327df601 100644 --- a/include/maestro/attributes.h +++ b/include/maestro/attributes.h @@ -314,18 +314,20 @@ typedef struct mstro_cdo_ *mstro_cdo; /** ** @brief Add (*key*, *val*) pair to attribute set of *cdo* ** - ** @param[in] cdo A CDO handle - ** @param[in] key Attribute key string - ** @param[in] val Pointer to the value to be set + ** @param[in] cdo A CDO handle + ** @param[in] key Attribute key string + ** @param[in] val Pointer to the value to be set + ** @param[in] copy_value Create an internal allocation for the value and + ** copy @arg val into it ** - ** BEWARE: The memory pointed to by val must remain valid for the - ** entire lifetime of the CDO. Stack-allocated variables passed in by - ** address are a source of ugly to trace bugs. + ** BEWARE: If copy_value is set to false, the memory pointed to by val must + ** remain valid for the entire lifetime of the CDO. Stack-allocated variables + ** passed in by address are a source of ugly to trace bugs. 
** ** @returns A status code, ::MSTRO_OK on success. **/ mstro_status -mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val); +mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val, bool copy_value); /** ** @brief Retrieve value into *val_p* associated with *key* of *cdo*. diff --git a/include/maestro/core.h b/include/maestro/core.h index 4954c18343e471514f1471f8cf1de7bbc5ad4453..5a64354e19c7edc39c868fedbf19579596589bc8 100644 --- a/include/maestro/core.h +++ b/include/maestro/core.h @@ -44,12 +44,12 @@ extern "C" { #include "maestro/status.h" #include <stdint.h> #include <inttypes.h> - + /**@defgroup MSTRO_Core Maestro Core API **@{ ** This is the core API, as developed for D3.2 **/ - + /** ** @brief Initialize the maestro core ** @@ -74,7 +74,7 @@ extern "C" { ** @param[in] workflow_name The workflow ID. ** ** @param[in] component_name The component ID. - ** + ** ** @param[in] component_index The unique index among all processes ** using the same *component_name*. ** @@ -84,7 +84,7 @@ extern "C" { mstro_core_init(const char *workflow_name, const char *component_name, uint64_t component_index); - + /** ** @brief De-initialize the maestro core ** @@ -103,8 +103,9 @@ extern "C" { **/ #define PRIappid PRIu64 + /**@} (end of group MSTRO_Core) */ - + /* include the remaining public API */ #include "maestro/cdo.h" #include "maestro/pool.h" @@ -112,10 +113,9 @@ extern "C" { #include "maestro/attributes.h" #include "maestro/groups.h" #include "maestro/env.h" - + #ifdef __cplusplus } /* end of extern "C" */ #endif #endif /* MAESTRO_CORE_H_ */ - diff --git a/include/maestro/env.h b/include/maestro/env.h index f14e36c450c50656b813d70ed3c5ccca5fb55953..8129cba1f79033118804603a93a4708ec8065e14 100644 --- a/include/maestro/env.h +++ b/include/maestro/env.h @@ -69,7 +69,7 @@ * all modules are permitted to log. This variable can be set to * selectively disable or enable certain modules. 
* - * The syntax is + * The syntax is * component1,component2,^component3,... * * where 'component1' indicates that it should be included, and @@ -96,6 +96,20 @@ */ #define MSTRO_ENV_LOG_COLOR "MSTRO_LOG_COLOR" +/**@brief lists the set of Schemata (in order from left to right) that should be loaded after the maestro-core schema is loaded. + * If set, mstro_init will merge these onto the built-ins. Default: core + ecmwf + * example: export MSTRO_SCHEMA_LIST="ecmwf.yaml;benchmark.yaml" (quote the value, ';' is the list separator) +*/ +#define MSTRO_ENV_SCHEMA_LIST "MSTRO_SCHEMA_LIST" + + +/**@brief path to search for schemata (in order from left to right). + * Default: current directory "." + * example: export MSTRO_SCHEMA_PATH=".:/usr/share/maestro/schemata" +*/ +#define MSTRO_ENV_SCHEMA_PATH "MSTRO_SCHEMA_PATH" + + /**@brief enable coloring of error log messages * * If set, enable coloring of error and warning messages. May not be a good idea if logging to syslog (see @ref MSTRO_LOG_DST). @@ -205,9 +219,9 @@ /** ** @brief Path to the MIO transport config file ** - ** This is needed to initialize MIO + ** This is needed to initialize MIO ** - **/ + **/ #define MSTRO_ENV_MIO_CONFIG "MSTRO_MIO_CONFIG" /**@} (end of group MSTRO_ENV) */ diff --git a/include/maestro/i_globals.h b/include/maestro/i_globals.h index 62dfea09f968a521ffee23ea8c182b05a1cc48bc..75fab2a90c9821cb72f43e5a8601fc4584a1845d 100644 --- a/include/maestro/i_globals.h +++ b/include/maestro/i_globals.h @@ -95,7 +95,7 @@ extern mmbMemSpace *g_default_cdo_memspace; /** the default (DRAM) memory interface */ extern mmbMemInterface *g_default_cdo_interface; - + /** MIO can be compiled in, but run time config may be missing, or MIO may fail * to initialize. This flag is TRUE if MIO is properly initialized and usable * */ @@ -105,7 +105,7 @@ extern bool g_mio_available; /** the app token used in communicating to the pool manager. 
Set after successfully JOINing, cleared after LEAVEing */ extern Mstro__Pool__Apptoken g_pool_apptoken; -/** the app ID (packed version of @ref g_pool_app_id) used in communicating with the pool manager. */ +/** the app ID (packed version of @ref g_pool_app_id) used in communicating with the pool manager. */ extern Mstro__Pool__Appid g_pool_appid; /** the fundamental built-in attribute schema. Filled early in mstro_core_init(), then constant */ @@ -145,6 +145,8 @@ union mstro_component_descriptor { char workflow_name[MSTRO_WORKFLOW_NAME_MAX]; /** the component name */ char component_name[MSTRO_WORKFLOW_NAME_MAX]; + /** the list of user schemas */ + char schema_list[MSTRO_WORKFLOW_NAME_MAX]; /** the maestro core version */ char version[128]; }; /*<** descriptor data */ @@ -161,7 +163,9 @@ extern union mstro_component_descriptor g_component_descriptor; | (MSTRO_POOL_PROTOCOL_VERSION_MINOR << 8) \ | (MSTRO_POOL_PROTOCOL_VERSION_PATCH << 0))) - +/** separators for the user-defined schema list and schema path environment variables */ +#define SCHEMA_LIST_SEP ";" +#define SCHEMA_PATH_SEP ":" /** the precomputed default path for GFS transport (incl. 
trailing '/') */ extern char *g_mstro_transport_gfs_dir; diff --git a/maestro/attributes.c b/maestro/attributes.c index 8a3250ad128fbd37cab2f289edaf82a01de5ae32..501170a41d27247172d53cbff399ab8567562a55 100644 --- a/maestro/attributes.c +++ b/maestro/attributes.c @@ -127,7 +127,7 @@ mstro_cdo_attr_table__destroy(mstro_cdo_attr_table tab) } mstro_status -mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val) +mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val, bool copy_value) { if(cdo==NULL || key==NULL) return MSTRO_INVARG; @@ -163,7 +163,7 @@ mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val) mstro_status status = mstro_attribute_dict_set(cdo->attributes, fqkey, MSTRO_CDO_ATTR_VALUE_INVALID, /* we dont help in checking */ - val, false); + val, copy_value); if(tmpfqkey) free(tmpfqkey); return status; diff --git a/maestro/cdo.c b/maestro/cdo.c index 15c2d4990328c3a8ac3b9f27b54c24ba746d8971..4a340c8a7349e6658470ccf3194d4479ca9435a8 100644 --- a/maestro/cdo.c +++ b/maestro/cdo.c @@ -452,10 +452,10 @@ mstro_cdo__create_mamba_array(mstro_cdo cdo) int64_t* ndims, *patt, *localsz, *elsz; size_t* dimsz; - DEBUG("Sync'ing mamba_array and attributes (CDO `%s`)\n", cdo->name); - s = mstro_cdo_attribute_get(cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, NULL, (const void**)&localsz); + DEBUG("Sync'ing mamba_array (size: %zu) and attributes (CDO `%s`)\n", (size_t)*localsz, cdo->name); + /* fetching minimal set of attributes for custom layout */ s |= mstro_cdo_attribute_get(cdo, MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, NULL, (const void**)&elsz); @@ -528,6 +528,7 @@ mstro_cdo__create_mamba_array(mstro_cdo cdo) } if(me != MMB_OK) { + status = MSTRO_FAIL; ERR("Failed to create mamba array\n"); goto BAILOUT; } @@ -746,8 +747,8 @@ mstro_cdo_declaration_seal(mstro_cdo cdo) idstr, &cdo->id, DEBUG("CDO %s (id %s) now sealed\n", cdo->name, idstr);); - - status = MSTRO_OK; + + status = MSTRO_OK; BAILOUT: return status; @@ -1850,7 +1851,7 @@ 
mstro_cdo_allocate_data(mstro_cdo cdo) MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, MSTRO_CDO_ATTR_VALUE_pointer, dimsz, true); if (s != MSTRO_OK) { - ERR("Can't sync mamba array because attribute_set failed\n"); + ERR("Can't sync mamba array because dict_set failed\n"); return MSTRO_FAIL; } } else { @@ -1874,6 +1875,7 @@ mstro_cdo_allocate_data(mstro_cdo cdo) } /* now we know a size */ int64_t size = *(const int64_t*)valp; + DEBUG("size (%" PRIi64 ")\n", size); if(size<0 && cdo->raw_ptr!=NULL) { ERR("CDO |%s| has negative local-size but non-NULL raw-ptr, unsupported\n", @@ -1886,6 +1888,8 @@ mstro_cdo_allocate_data(mstro_cdo cdo) ERR("Failed to create mmbArray wrapper for raw-ptr\n"); return s; } + DEBUG("size (%" PRIi64 ")\n", size); + } return MSTRO_OK; diff --git a/maestro/core.c b/maestro/core.c index a6b31da392020989ab2441a1e54240d3bed0c027..5f8a951b4e618c1c74f808456cf544ba70510031 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -4,21 +4,21 @@ **/ /* * Copyright (C) 2019-2020 Cray Computer GmbH - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * + * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * + * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. 
- * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -154,11 +154,11 @@ mstro_core__mamba_init(void) ERR("Failed to retrieve suitable dram memory interface\n"); goto BAILOUT; } - + BAILOUT: - if(init_opts) + if(init_opts) mmb_options_destroy(init_opts); - + if(stat!=MMB_OK) return MSTRO_FAIL; else @@ -169,6 +169,132 @@ BAILOUT: /** minimum mlock() limit */ #define MSTRO_MIN_MEMLOCK (4*sizeof(g_component_descriptor))
+
+/** @brief Parse the built-in schemata and merge all user-supplied ones.
+ *
+ * The built-in core schema is parsed into g_mstro_core_schema_instance and the
+ * built-in ECMWF schema merged into it. Each file named in the environment
+ * variable MSTRO_ENV_SCHEMA_LIST (entries split at SCHEMA_LIST_SEP) is then
+ * searched in "." and in each MSTRO_ENV_SCHEMA_PATH entry (split at
+ * SCHEMA_PATH_SEP); the first location that parses is merged as well.
+ *
+ * @returns MSTRO_OK on success, MSTRO_NOMEM if a scratch buffer cannot be
+ *          allocated, otherwise the status of the failing parse/merge step.
+ */
+mstro_status mstro_core_init__setup_schemata(void)
+{
+  mstro_status status = MSTRO_OK;
+  /* heap copies: strtok_r() writes into the string it splits, which must
+   * be neither the environment block nor a string literal */
+  char *schema_list = NULL;
+  char *search_path = NULL;
+  char *schema_list_token, *end_list_token;
+  char *schema_path_token, *end_path_token;
+
+  const char *env_schema_list = getenv(MSTRO_ENV_SCHEMA_LIST);
+  const char *env_schema_path = getenv(MSTRO_ENV_SCHEMA_PATH);
+  // unset variables are treated like empty ones
+  if(env_schema_list == NULL) {
+    env_schema_list = "";
+  } else {
+    INFO("List of user attributes schemata %s \n", env_schema_list);
+    strncpy(g_component_descriptor.schema_list, env_schema_list, MSTRO_WORKFLOW_NAME_MAX-1);
+    g_component_descriptor.schema_list[MSTRO_WORKFLOW_NAME_MAX-1] = '\0';
+  }
+  if(env_schema_path == NULL) {
+    env_schema_path = "";
+  }
+
+  // parse and merge the builtin schemas first (nothing to clean up yet)
+  DEBUG("Parsing Maestro core schema\n");
+  status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE,
+                            MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN,
+                            &g_mstro_core_schema_instance);
+  if(status!=MSTRO_OK) {
+    ERR("Failed to parse built-in core schema\n");
+    return status;
+  }
+
+  DEBUG("Parsing ECMWF schema\nFIXME for 0.3.0 ... ECMWF schema shoud not be loaded by default\n");
+  mstro_schema ecmwf;
+  status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_ECMWF,
+                            MSTRO_SCHEMA_BUILTIN_YAML_ECMWF_LEN,
+                            &ecmwf);
+  if(status!=MSTRO_OK) {
+    ERR("Failed to parse built-in ecmwf schema\n");
+    return status;
+  }
+  status=mstro_schema_merge(g_mstro_core_schema_instance, ecmwf);
+  if(status!=MSTRO_OK) {
+    ERR("Failed to merge core and ECMWF schema\n");
+    return status;
+  }
+
+  // start reading user-defined schemas and merge them.
+  // search path = default "." + separator + user paths + '\0'
+  size_t search_path_len = strlen(".") + strlen(SCHEMA_PATH_SEP) + strlen(env_schema_path) + 1;
+  search_path = malloc(search_path_len);
+  schema_list = strdup(env_schema_list);
+  if(search_path == NULL || schema_list == NULL) {
+    ERR("Failed to allocate buffers for schema list and search path\n");
+    status = MSTRO_NOMEM;
+    goto BAILOUT;
+  }
+
+  /* walk through the schema names */
+  for(schema_list_token = strtok_r(schema_list, SCHEMA_LIST_SEP, &end_list_token);
+      schema_list_token != NULL;
+      schema_list_token = strtok_r(NULL, SCHEMA_LIST_SEP, &end_list_token)) {
+    mstro_schema user_schema;
+    DEBUG("looking for schema_list_token: %s \n", schema_list_token);
+
+    // strtok_r() consumes the path buffer, so rebuild it for every schema
+    snprintf(search_path, search_path_len, "%s%s%s", ".", SCHEMA_PATH_SEP, env_schema_path);
+    DEBUG("list of paths for schemas: %s\n", search_path);
+
+    /* try each directory until one yields a parsable schema file */
+    status = MSTRO_FAIL;
+    for(schema_path_token = strtok_r(search_path, SCHEMA_PATH_SEP, &end_path_token);
+        schema_path_token != NULL;
+        schema_path_token = strtok_r(NULL, SCHEMA_PATH_SEP, &end_path_token)) {
+      // full path: directory + '/' + schema file name + '\0'
+      size_t full_len = strlen(schema_path_token) + 1 + strlen(schema_list_token) + 1;
+      char *schema_full_path = malloc(full_len);
+      if(schema_full_path == NULL) {
+        ERR("Failed to allocate path for schema %s\n", schema_list_token);
+        status = MSTRO_NOMEM;
+        goto BAILOUT;
+      }
+      snprintf(schema_full_path, full_len, "%s%s%s", schema_path_token, "/", schema_list_token);
+      DEBUG("Parsing user-defined schema from %s\n", schema_full_path);
+      status=mstro_schema_parse_from_file(schema_full_path, &user_schema);
+      if(status==MSTRO_OK) {
+        DEBUG("user-defined schema is read from %s\n", schema_full_path);
+        free(schema_full_path);
+        break; // no need to try other paths
+      }
+      free(schema_full_path);
+    }
+
+    if(status!=MSTRO_OK) {
+      ERR("Failed to parse user_schema from file: %s \n", schema_list_token);
+      goto BAILOUT;
+    }
+    // merge the schema
+    DEBUG("Merging user schema: %s\n", schema_list_token);
+    status=mstro_schema_merge(g_mstro_core_schema_instance, user_schema);
+    if(status!=MSTRO_OK) {
+      ERR("Failed to merge core and user schema from file %s\n", schema_list_token);
+      goto BAILOUT;
+    }
+  }
+
+BAILOUT:
+  free(search_path);
+  free(schema_list);
+  return status;
+}
+
 mstro_status mstro_core_init(const char *workflow_name, const char *component_name, @@ -180,7 +306,7 @@ mstro_core_init(const char *workflow_name, if(status!=MSTRO_OK) { return status; } -
+ struct mstro_core_initdata *data = malloc(sizeof(struct mstro_core_initdata)); if(data) { @@ -221,7 +347,6 @@ mstro_core_init(const char *workflow_name, g_component_descriptor.component_name[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; strcpy(g_component_descriptor.version, mstro_version()); - /* check GFS path setting */ { char *gfs_dir = getenv(MSTRO_ENV_TRANSPORT_GFS_DIR); @@ -234,7 +359,7 @@ mstro_core_init(const char *workflow_name, } size_t l2 = strlen(data->workflow_name); size_t l3 = strlen(data->component_name); - + g_mstro_transport_gfs_dir = malloc(l1 + (need_sep? 1 : 0) + l2 + 1 + l3 + 1 +1); @@ -251,10 +376,10 @@ mstro_core_init(const char *workflow_name, } strcpy(g_mstro_transport_gfs_dir+pos, data->workflow_name); pos += l2; - + g_mstro_transport_gfs_dir[pos] = '/'; pos++; - + strcpy(g_mstro_transport_gfs_dir+pos, data->component_name); pos += l3; g_mstro_transport_gfs_dir[pos] = '/'; @@ -267,34 +392,17 @@ mstro_core_init(const char *workflow_name, g_mstro_transport_gfs_dir); } } - - status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE, - MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN, - &g_mstro_core_schema_instance); - if(status!=MSTRO_OK) { - ERR("Failed to parse built-in core schema\n"); - goto BAILOUT; - } - /* FIXME: this should not be here */ - DEBUG("Including ECMWF schema\n"); - mstro_schema ecmwf; - status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_ECMWF, - MSTRO_SCHEMA_BUILTIN_YAML_ECMWF_LEN, - &ecmwf); - if(status!=MSTRO_OK) { - ERR("Failed to parse built-in ecmwf schema\n"); - goto BAILOUT; - } - status=mstro_schema_merge(g_mstro_core_schema_instance, ecmwf); + + /*Reading and integrating attribute schemas*/ + status = mstro_core_init__setup_schemata(); if(status!=MSTRO_OK) { - ERR("Failed to merge core and ECMWF schema\n"); goto BAILOUT; } DEBUG("init %s/%s/% "PRIi64 " in thread %" PRIxPTR" complete\n", data->workflow_name, data->component_name, data->component_index, (intptr_t)pthread_self()); - + pthread_mutex_lock(&g_initdata_mtx); 
if(g_initdata==NULL) { g_initdata=data; @@ -323,7 +431,7 @@ mstro_core_init(const char *workflow_name, ERR("Failed to initialize subscription subsystem\n"); goto BAILOUT; } - + /* transport init */ status = mstro_transport_init(); if (! (MSTRO_OK == status)) { @@ -356,7 +464,7 @@ mstro_core_init(const char *workflow_name, } } } - + BAILOUT: return status; } @@ -440,7 +548,7 @@ mstro_core_finalize(void) } g_mstro_core_schema_instance=NULL; } - + x = mmb_finalize(); if(x!=MMB_OK) { ERR("Failed to shut down Mamba: %d (%s)\n", x, mmb_error_description(x)); @@ -450,13 +558,13 @@ mstro_core_finalize(void) status = mstro_memlock_finalize(); -BAILOUT: +BAILOUT: return status; } mstro_status -mstro_core_state_get(const struct mstro_core_initdata **res) +mstro_core_state_get(const struct mstro_core_initdata **res) { mstro_status status=MSTRO_UNIMPL; const struct mstro_core_initdata *ptr; @@ -475,4 +583,4 @@ mstro_core_state_get(const struct mstro_core_initdata **res) } *res = ptr; return status; -} +} diff --git a/maestro/groups.c b/maestro/groups.c index 16d4862aa5903f7652d2d8d555690fa7142267fa..6a7f7cdd65437d85217eec66d871351138520aa1 100644 --- a/maestro/groups.c +++ b/maestro/groups.c @@ -204,7 +204,7 @@ mstro_group_declare(const char *name, mstro_group* group) if(s==MSTRO_OK) { bool trueval = true; s = mstro_cdo_attribute_set((*group)->group_cdo, - MSTRO_ATTR_CORE_CDO_ISGROUP, &trueval); + MSTRO_ATTR_CORE_CDO_ISGROUP, &trueval, true); if(s==MSTRO_OK) { DEBUG("Declared CDO group |%s|\n", name); (*group)->state = MSTRO_GROUP_STATE_DECLARED; @@ -312,20 +312,21 @@ mstro_group_seal(mstro_group g) HASH_ITER(hh, g->members, el, tmp) { m->declared_members[i] = malloc(sizeof(Mstro__Pool__UUID)); /* need to ensure the GID is ready */ - mstro_status s = mstro_cdo_seal(el->entry); - if(s!=MSTRO_OK) { - ERR("Failed to seal declared group member: %d\n", s); - } - if(s!=MSTRO_OK || m->declared_members[i]==NULL) { - for(size_t j=i; j>0; j--) { - free(m->declared_members[j-1]); - }; - 
free(m->declared_members); - m->declared_members=NULL; - goto BAILOUT_FREE; - + if(!mstro_cdo_state_check(el->entry, MSTRO_CDO_STATE_SEALED)) { + mstro_status s = mstro_cdo_seal(el->entry); + if(s!=MSTRO_OK) { + ERR("Failed to seal declared group member: %d\n", s); + } + if(s!=MSTRO_OK || m->declared_members[i]==NULL) { + for(size_t j=i; j>0; j--) { + free(m->declared_members[j-1]); + }; + free(m->declared_members); + m->declared_members=NULL; + goto BAILOUT_FREE; + + } } - mstro__pool__uuid__init(m->declared_members[i]); m->declared_members[i]->qw0 = el->entry->gid.qw[0]; m->declared_members[i]->qw1 = el->entry->gid.qw[1]; @@ -374,7 +375,7 @@ mstro_group_seal(mstro_group g) s = mstro_cdo_attribute_set(g->group_cdo, MSTRO_ATTR_CORE_CDO_GROUP_MEMBERS, - blob); + blob, false); if(s!=MSTRO_OK) goto BAILOUT_FREE; diff --git a/maestro/ofi.c b/maestro/ofi.c index 72a1572df8ded128f7f66375d169c9eb9b56f951..06f942cdfd4b5021ee4b5e27611272ecb0bf5992 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -5,21 +5,21 @@ /* * Copyright (C) 2019 Cray Computer GmbH - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * + * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * + * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. 
- * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -147,9 +147,9 @@ mstro_ep_desc_create_ofi(mstro_endpoint_descriptor *result_p, return MSTRO_INVARG; enum mstro_endpoint_type ept = MSTRO_EP_INVALID; - + switch(fi->addr_format) { - /** This is the list documented for libfabric 1.5 */ + /** This is the list documented for libfabric 1.5 */ case FI_SOCKADDR_IN: ept = MSTRO_EP_OFI_IN4; break; case FI_SOCKADDR_IN6: @@ -268,13 +268,13 @@ mstro_ep_desc_serialize__in4(char *dst, const struct sockaddr_in *sin) { mstro_status stat = MSTRO_UNIMPL; - + /* struct sockaddr_in is defined platform-independent and * already using network byte order as needed, so we can handle * it as a struct */ tpl_node *tns; tpl_bin tb; - + tb.sz = sizeof(struct sockaddr_in); tb.addr = sin; tns = tpl_map("B", &tb); @@ -318,7 +318,7 @@ mstro_ep_desc_deserialize__in4(struct sockaddr_in *sin, char *b64_strval) { mstro_status stat=MSTRO_UNIMPL; - + tpl_node *tn; tpl_bin tb; tn = tpl_map( "B", &tb ); @@ -333,7 +333,7 @@ mstro_ep_desc_deserialize__in4(struct sockaddr_in *sin, goto BAILOUT; } /* DEBUG("b64decode: buf of length %zu\n", buflen); */ - + tpl_load( tn, TPL_MEM, buf, buflen); tpl_unpack( tn, 0 ); tpl_free(tn); @@ -361,7 +361,7 @@ mstro_ep_desc_serialize__in6(char *dst, tpl_bin tb; INFO("sockaddr_in6 with port %d\n", ntohs(sin->sin6_port)); - + tb.sz = sizeof(struct sockaddr_in6); tb.addr = sin; tns = tpl_map("B", &tb); @@ -388,7 +388,7 @@ mstro_ep_desc_serialize__in6(char *dst, stat=MSTRO_FAIL; goto BAILOUT; } - + stat=MSTRO_OK; BAILOUT: /* clean up */ @@ -422,7 +422,7 @@ mstro_ep_desc_deserialize__in6(struct sockaddr_in6 *sin, tpl_load( tn, TPL_MEM, buf, buflen); tpl_unpack( tn, 0 ); tpl_free(tn); - + assert(tb.sz==sizeof(struct sockaddr_in6)); memcpy(sin, tb.addr, tb.sz); @@ -455,13 +455,13 @@ 
mstro_ep_desc_serialize(char **result_p, struct serialized_endpoint_element serialized_element; assert(MSTRO_EP__MAX<=INT_MAX); - + tn = tpl_map("A(S(IssUUc#))", &serialized_element, MSTRO_OFI_KEY_LEN_MAX); char *strval = alloca(MSTRO_EP_STRING_MAX*sizeof(char)); if(strval==NULL) return MSTRO_NOMEM; - + LL_FOREACH(epd,elt) { /* each one should serialize into STRVAL buffer, base64-encoded, * NULL-terminated */ @@ -495,7 +495,7 @@ mstro_ep_desc_serialize(char **result_p, return MSTRO_FAIL; } /* b64 encode */ - + size_t needed; encoded = base64_encode(buf, buflen, &needed); if(needed>MSTRO_EP_STRING_MAX) { @@ -513,7 +513,7 @@ mstro_ep_desc_serialize(char **result_p, stat=MSTRO_OK; BAILOUT_1: /* clean up */ - NFREE(encoded); + NFREE(encoded); tpl_free(tns); free(buf); } else if(elt->type == MSTRO_EP_OFI_STR) { @@ -549,7 +549,7 @@ mstro_ep_desc_serialize(char **result_p, stat=MSTRO_NOMEM; goto BAILOUT_2; } - + stat=MSTRO_OK; BAILOUT_2: NFREE(encoded); @@ -558,7 +558,7 @@ mstro_ep_desc_serialize(char **result_p, elt->type, mstro_ep_descriptor_names[elt->type]); return MSTRO_FAIL; } - + serialized_element.type = elt->type; serialized_element.strval=strdup(strval); /* shrink wrap size */ if(serialized_element.strval==NULL) @@ -566,7 +566,7 @@ mstro_ep_desc_serialize(char **result_p, mstro_drc_get_oob_string(&serialized_element.oob_cookie, g_drc_info); DEBUG("Added OOB info %s\n", serialized_element.oob_cookie); - + /* keys */ serialized_element.info_addr = ep->component_info_addr; serialized_element.info_keysize = ep->component_info_keysize; @@ -582,7 +582,7 @@ mstro_ep_desc_serialize(char **result_p, free(serialized_element.strval); free(serialized_element.oob_cookie); } - + /* now write all to a string */ size_t len; void *buf=NULL; @@ -619,7 +619,7 @@ mstro_ep_desc_serialize(char **result_p, BAILOUT: if(buf) free(buf); - + return stat; } @@ -629,7 +629,7 @@ mstro_ofi_pm_info(char **result_p) if(result_p==NULL) return MSTRO_INVOUT; mstro_status status = MSTRO_OK; - + 
unsigned char *encoded=NULL; if(g_endpoints==NULL) { ERR("No endpoints configured -- did you call mstro_pm_start()?\n"); @@ -645,14 +645,14 @@ mstro_ofi_pm_info(char **result_p) struct serialized_endpoint_element serialized_element; assert(MSTRO_EP__MAX<=INT_MAX); - + tn = tpl_map("A(S(IssUUc#))", &serialized_element, MSTRO_OFI_KEY_LEN_MAX); char *strval = alloca(MSTRO_EP_STRING_MAX*sizeof(char)); if(strval==NULL) return MSTRO_NOMEM; mstro_status stat; - + LL_FOREACH(&(g_endpoints->eps[0]),elt) { mstro_endpoint_descriptor d = elt->descr; /* each one should serialize into STRVAL buffer, base64-encoded, @@ -686,7 +686,7 @@ mstro_ofi_pm_info(char **result_p) return MSTRO_FAIL; } /* b64 encode */ - + size_t needed; encoded = base64_encode(buf, buflen, &needed); if(needed>MSTRO_EP_STRING_MAX) { @@ -704,7 +704,7 @@ mstro_ofi_pm_info(char **result_p) stat=MSTRO_OK; BAILOUT_1: /* clean up */ - NFREE(encoded); + NFREE(encoded); tpl_free(tns); free(buf); } else if(d->type == MSTRO_EP_OFI_STR) { @@ -740,7 +740,7 @@ mstro_ofi_pm_info(char **result_p) stat=MSTRO_NOMEM; goto BAILOUT_2; } - + stat=MSTRO_OK; BAILOUT_2: NFREE(encoded); @@ -749,7 +749,7 @@ mstro_ofi_pm_info(char **result_p) d->type, mstro_ep_descriptor_names[d->type]); return MSTRO_FAIL; } - + serialized_element.type = d->type; serialized_element.strval=strdup(strval); /* shrink wrap size */ if(serialized_element.strval==NULL) @@ -804,7 +804,7 @@ mstro_ofi_pm_info(char **result_p) BAILOUT: free(buf); return status; - + } @@ -821,7 +821,7 @@ mstro_ep_desc_deserialize__str(char **dst, mstro_status stat=MSTRO_UNIMPL; - + size_t buflen; unsigned char *buf = base64_decode((unsigned char*)b64_strval, strlen(b64_strval), @@ -858,7 +858,7 @@ mstro_ep_desc_deserialize__uint64(uint64_t (*dst)[6], } mstro_status stat=MSTRO_UNIMPL; - + size_t buflen; unsigned char *buf = base64_decode((unsigned char*)b64_strval, strlen(b64_strval), @@ -880,7 +880,7 @@ mstro_ep_desc_deserialize__uint64(uint64_t (*dst)[6], BAILOUT: NFREE(buf); - 
return stat; + return stat; } @@ -893,7 +893,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, tpl_node *tn=NULL; mstro_endpoint_descriptor *next = NULL; - if(result_p==NULL) + if(result_p==NULL) return MSTRO_INVOUT; if(serialized_eps==NULL) return MSTRO_INVARG; @@ -903,7 +903,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, unsigned char *buf = base64_decode((unsigned char*)serialized_eps, strlen(serialized_eps), &buflen); - + if(buf==NULL) { stat=MSTRO_NOMEM; goto BAILOUT; @@ -937,9 +937,9 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, target->oob_cookie=NULL; DEBUG("no cookie\n"); } - + target->name=NULL; - + if(eptype==MSTRO_EP_OFI_IN4) { stat = mstro_ep_desc_deserialize__in4(&(target->in4), serialized_element.strval); @@ -956,11 +956,11 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, || eptype==MSTRO_EP_OFI_MLX) { /** transparently handles one .. six entries */ stat = mstro_ep_desc_deserialize__uint64(&target->gni, - serialized_element.strval); + serialized_element.strval); } else if(eptype==MSTRO_EP_OFI_STR) { target->str=NULL; stat = mstro_ep_desc_deserialize__str(&(target->str), - serialized_element.strval); + serialized_element.strval); } else { ERR("Unsupported EP type: %d\n", eptype); stat = MSTRO_UNIMPL; @@ -994,7 +994,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, } BAILOUT: free(buf); - + return stat; } @@ -1090,7 +1090,7 @@ mstro_ep_desc_describe(mstro_endpoint_descriptor desc) s, errno, strerror(errno)); abort(); } - + desc->name = buf; } } @@ -1107,7 +1107,7 @@ mstro_ep_desc_to_ofi_addr(uint32_t *addr_format, return MSTRO_INVOUT; if(epd==NULL) return MSTRO_INVARG; - + switch(epd->type) { case MSTRO_EP_OFI_IN4: *addr_format = FI_SOCKADDR_IN; @@ -1167,7 +1167,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, return MSTRO_INVOUT; if(fi==NULL) return MSTRO_INVARG; - + struct fid_fabric *fabric = NULL; struct fi_eq_attr eq_attr; struct fid_eq *eq = 
NULL; @@ -1178,7 +1178,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, struct fi_cq_attr cq_attr; struct fid_cq *cq = NULL; uint8_t *mr_key = NULL; - + /* create fabric object */ stat = fi_fabric(fi->fabric_attr, &fabric, NULL); if(stat!=0) { @@ -1195,15 +1195,15 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, * allocated early on in ofi_init (on pool manager) or obtained * from PM_INFO (on clients) */ assert(g_drc_info!=NULL); - + stat = mstro_drc_insert_ofi(fi, g_drc_info); if(stat!=MSTRO_OK) { ERR("Failed to insert DRC credential into fabric info\n"); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; - } + } } - + /* create domain */ stat = fi_domain(fabric, fi, &domain, NULL); if(stat!=0) { @@ -1232,7 +1232,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("fi_cq_open failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* event queue */ /* create event queue */ memset(&eq_attr, 0, sizeof(eq_attr)); @@ -1253,7 +1253,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, } /* INFO("This is a %s\n", fi_tostr(fi,FI_TYPE_INFO)); */ - + /* connectionless endpoint needs address vector */ memset(&av_attr, 0, sizeof(av_attr)); av_attr.type = FI_AV_MAP; @@ -1262,14 +1262,14 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("fi_av_open failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* bind address vector to endpoint */ stat = fi_ep_bind(ep, &av->fid, 0); if(stat!=0) { ERR("fi_ep_bind for av failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* bind event queue to endpoint */ stat = fi_ep_bind(ep, &eq->fid, 0); if(stat!=0) { @@ -1280,7 +1280,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, eq=NULL; } else { retstat=MSTRO_FAIL; goto BAILOUT_FAIL; - } + } } /* bind cq to endpoint */ stat = fi_ep_bind(ep, &cq->fid, FI_TRANSMIT|FI_RECV); @@ -1288,7 +1288,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, 
ERR("fi_ep_bind for cq failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* enable! */ stat = fi_enable(ep); if(stat!=0) { @@ -1380,7 +1380,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("Failed to retrieve key for component info memory registration: %d (%s)\n"); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - assert(sizeof(key)<=MSTRO_OFI_KEY_LEN_MAX); + assert(sizeof(key)<=MSTRO_OFI_KEY_LEN_MAX); /* keysize is 4 on verbs ... we try to let the compiler handle byte order issues here by casting */ switch(keysize) { case 1: { @@ -1402,7 +1402,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, memcpy(mr_key, &key, sizeof(key)); break; } - default: + default: ERR("Unsupported MR key size %d\n", keysize); assert(0); } @@ -1417,13 +1417,13 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, addr=(uint64_t)&g_component_descriptor; } } - - + + { uint64_t k = 0; memcpy(&k, mr_key, keysize); /* just for printing */ - + INFO("Component info RDMA block registered%s: local %p, MR %p, addr %" PRIx64 ", desc %p, keysize %zu (key uint64 start: 0x%" PRIx64 ")\n", fi->domain_attr->mr_mode & FI_MR_RAW ? 
" (raw keys)" : "", &g_component_descriptor, mr, addr, fi_mr_desc(mr), keysize, k); @@ -1440,7 +1440,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, dst->component_info_addr = addr; dst->component_info_keysize = keysize; dst->component_info_raw_key = mr_key; - + dst->next = NULL; @@ -1450,7 +1450,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("Failed to construct worker mgmt ep description: %d\n", retstat); goto BAILOUT_FAIL; } - + retstat = mstro_ep_desc_serialize(&dst->addr_serialized, dst); if(retstat!=MSTRO_OK) { @@ -1483,7 +1483,7 @@ BAILOUT_FAIL: /* wait sets (none) */ CLEANUP(fabric,"fabric"); #undef CLEANUP - + BAILOUT: return retstat; } @@ -1502,7 +1502,7 @@ mstro_endpoint_describe(struct mstro_endpoint *ep) char addrbuf[EP_DESC_BUF_MAX]; ep_name_buf[0]='\0'; - + size=0; ret = fi_getname(&ep->ep->fid,NULL,&size); if(ret!=-FI_ETOOSMALL) { @@ -1511,16 +1511,16 @@ mstro_endpoint_describe(struct mstro_endpoint *ep) } assert(size<EP_DESC_BUF_MAX); fi_getname(&ep->ep->fid,addrbuf,&size); - + size=0; fi_av_straddr(ep->av, addrbuf, NULL, &size); assert(size<EP_DESC_BUF_MAX); fi_av_straddr(ep->av, addrbuf, strbuf, &size); - + size=snprintf(ep_name_buf, EP_DESC_BUF_MAX, "OFI EP prov %s name %s straddr %s", - ep->fi->fabric_attr->prov_name, + ep->fi->fabric_attr->prov_name, ep->fi->fabric_attr->name, strbuf); assert(size<1024); DONE: @@ -1655,7 +1655,7 @@ mstro_ofi__order_fi_list(struct fi_info **fi) { if(fi==NULL) return MSTRO_INVARG; - + struct fi_info *head = *fi; LL_SORT(head, mstro_ofi__fi_info_cmp); *fi = head; @@ -1702,11 +1702,11 @@ mstro_ofi_init(void) hints->mode = MSTRO_OFI_MODE; hints->ep_attr->type = MSTRO_OFI_EP_TYPE; hints->domain_attr->mr_mode = MSTRO_OFI_MRMODE; - + /* we really want 1.8 or above */ stat = fi_getinfo(MSTRO_OFI_VERSION, NULL, NULL, 0, hints, &fi); fi_freeinfo(hints); - + if(stat!=0) { ERR("fi_getinfo failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; @@ -1748,7 +1748,7 @@ 
mstro_ofi_init(void) goto BAILOUT_FAIL; } - /* also do it for the PM descriptor block we'll RDMA read into */ + /* also do it for the PM descriptor block we'll RDMA read into */ assert(sizeof(g_pm_component_descriptor)%sysconf(_SC_PAGESIZE)==0); retstat = mstro_memlock(&g_pm_component_descriptor, sizeof(g_pm_component_descriptor)); if(retstat!=MSTRO_OK) { @@ -1762,11 +1762,11 @@ mstro_ofi_init(void) LL_FOREACH(fi,tmp) { NOISE("index %zu\n", i); i++; - DEBUG("potential endpoint: %s %s\n", tmp->fabric_attr->prov_name, + DEBUG("potential endpoint: %s %s\n", tmp->fabric_attr->prov_name, tmp->fabric_attr->name); retstat = mstro_ep_build_from_ofi(&(g_endpoints->eps[g_endpoints->size]), tmp); - + if(retstat!=MSTRO_OK) { WARN("Failed to build EP %zu, trying others\n", i); } else { @@ -1799,18 +1799,18 @@ mstro_ofi_init(void) } /* fix pointer into invalid last entry */ g_endpoints->eps[g_endpoints->size-1].next = NULL; - + if(g_endpoints!=NULL) { INFO("%zu usable endpoints\n", g_endpoints->size); retstat = MSTRO_OK; } else { ERR("No usable endpoints found\n"); } - + goto BAILOUT; BAILOUT_FAIL: ; /* FIXME: free resources */ - + BAILOUT: return retstat; } @@ -1842,7 +1842,7 @@ mstro_ofi_finalize(void) } /* every EP has one lock on the component descriptor */ mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); - + #define CLOSE_FID(member, descr) do { \ if(e->member) { \ DEBUG("Closing down %s for ep %zu\n", descr, i); \ @@ -1851,7 +1851,7 @@ mstro_ofi_finalize(void) descr, i, s, fi_strerror(-s)); } \ } \ } while(0) - + CLOSE_FID(ep,"EP"); CLOSE_FID(cq,"CQ"); CLOSE_FID(av,"AV"); @@ -1859,16 +1859,16 @@ mstro_ofi_finalize(void) CLOSE_FID(domain,"DOMAIN"); // CLOSE_FID(fi,"FABRIC"); #undef CLOSE_FID - + } } DEBUG("OFI endpoints closed\n"); - + /* It's safe to call this even if we've not yet locked the page; we * ignore the error that we'd get in this case */ mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); 
mstro_memunlock(&g_pm_component_descriptor, sizeof(g_pm_component_descriptor)); - + return MSTRO_OK; } @@ -1890,7 +1890,7 @@ struct mstro_ofi_msg_context_ { }; bool has_completion; /**< indicates if pthread waiting * infrastructure is initialized in this - * context and should be used*/ + * context and should be used*/ pthread_cond_t completion; /**< Condition variable for originator of * message to */ pthread_mutex_t lock; /**< a lock, mostly because cond_wait @@ -1901,7 +1901,7 @@ typedef struct mstro_ofi_msg_context_ * mstro_ofi_msg_context; /** Create a message completion context for @arg msg. - + If @arg want_completion is true a mutex and a condition variable will be initialized and the completion handler can signal completion on that condition variable to the originator. @@ -1923,7 +1923,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, /* RDMA ops don't have non-NULL msg */ /* if(msg==NULL) */ /* return MSTRO_INVARG; */ - + mstro_ofi_msg_context ctx = malloc(sizeof(struct mstro_ofi_msg_context_)); if(ctx==NULL) { @@ -1939,7 +1939,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, free(ctx); ctx=NULL; goto BAILOUT; - } + } res = pthread_cond_init(&ctx->completion, NULL); if(res!=0) { ERR("Failed to init ofi msg cond var: %d\n", res); @@ -1957,7 +1957,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, stat = MSTRO_OK; BAILOUT: *result_p = ctx; - return stat; + return stat; } /** destroy an OFI message context */ @@ -1975,7 +1975,45 @@ mstro_ofi__msg_context_destroy(mstro_ofi_msg_context ctx) return MSTRO_OK; } - +/** Check the schema list provided from pm is compatible with the component schema list */ +/* compatible means that pm supports all of the schemas in the component schema list or more */ +/* FIXME check that the schema versions are compatible between the pm and the component */ +bool schema_list_compatible_p(const char * component_schema_list, const char * pm_schema_list) +{ + char *schema_name; + 
char *end_list_token; + + if (component_schema_list == NULL) + { + /*nothing to look for here*/ + //DEBUG("[schema_list_compatible_p] component schema list is NULL"); + return true; + } + else if (pm_schema_list ==NULL) { + /*component_schema_list is not empty but pm_schema_list is empty ... we should look no further*/ + //DEBUG("[schema_list_compatible_p] pm schema list is NULL"); + return false; + } + // split the schema names in the component_schema_list + schema_name = strtok_r(component_schema_list, SCHEMA_LIST_SEP, &end_list_token); + // for each schema name in the component_schema_list ...try to find it in the pm_schema_list + while( schema_name != NULL ) + { + DEBUG("Looking for schema %s \n", schema_name); + /*if we did not find the component schema name in the pm_schema_list*/ + if(strstr(pm_schema_list, schema_name) == NULL) { + DEBUG("Schema %s not found in pm schema list\n", schema_name); + return false; + } + DEBUG("Found schema %s in %s\n", schema_name, pm_schema_list); + // read the next schema name + schema_name = strtok_r(NULL, SCHEMA_LIST_SEP, &end_list_token); + } + + /*We reach here only if all component schemas were found in the pm_schema_list*/ + return true; + +} /** select the best match between remote and local endpoints */ mstro_status @@ -2053,7 +2091,7 @@ mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, stat = MSTRO_FAIL; goto BAILOUT; } - + uint64_t mr_addr=0; /* since the EP accepted the address we interpolate that * MR_RAW will be set identically (while we really should be @@ -2075,7 +2113,7 @@ mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, } DEBUG("Checking for PM config block MR at (remote addr) 0x%" PRIx64 ", key %" PRIx64 "\n", mr_addr, mr_key); - + assert(ctx->msg==NULL); assert(my_ep->peer_info_mr!=NULL); /* incoming buffer has been registered at local endpoint set creation */ void * local_buf_mr_desc = fi_mr_desc(my_ep->peer_info_mr); @@ -2123,8 +2161,15 @@ 
mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, INFO("Component descriptor mismatch, illegal type or other workflow, skipping\n"); continue; } + DEBUG("PM schema list %s \n", g_pm_component_descriptor.schema_list); + DEBUG("Component schema list %s \n", g_component_descriptor.schema_list); + if(!schema_list_compatible_p(g_component_descriptor.schema_list, g_pm_component_descriptor.schema_list)) + { + INFO("Component descriptor mismatch, PM does not support the schema list of component, skipping\n"); + continue; + } } /* end of possible config block verification */ - + *remote_addr_p = translated_addr; *local_p = my_ep; stat=MSTRO_OK; @@ -2161,7 +2206,7 @@ mstro_ofi__submit_message(struct mstro_endpoint *ep, assert(env!=NULL); DEBUG("Sending message %s, size %zu, %p using ep %p\n", env->descriptor->name, env->payload_size, env, ep); - + int res; do { assert(env!=context); /* catch common mistake: context needs to be a @@ -2174,7 +2219,7 @@ mstro_ofi__submit_message(struct mstro_endpoint *ep, /* DEBUG("tsend with tag %"PRIu64"\n", (uint64_t)msg->type); */ } else { - res = fi_send(ep->ep, env->data, env->payload_size, + res = fi_send(ep->ep, env->data, env->payload_size, descriptor, dst, context); } if(res!=0) { @@ -2205,9 +2250,9 @@ mstro_ofi__submit_message_wait(struct mstro_endpoint *ep, fi_addr_t dst, ERR("Failed to acquire msg context lock: %d\n", ret); return MSTRO_FAIL; } - + stat = mstro_ofi__submit_message(ep, dst, msg, NULL, ctx); - if(stat!=MSTRO_OK) + if(stat!=MSTRO_OK) return stat; ret = pthread_cond_wait(&ctx->completion, &ctx->lock); @@ -2240,8 +2285,8 @@ mstro_ofi__submit_message_nowait(struct mstro_endpoint *ep, fi_addr_t dst, } else { ; } - - + + /* for non-completion contexts the message handler frees the * context */ return mstro_ofi__submit_message(ep, dst, msg, NULL, ctx); @@ -2257,7 +2302,7 @@ mstro_ofi__expect_message_wait(struct fid_ep *ep, fi_addr_t src, = mstro_ofi__msg_context_create(&ctx, msg, true); 
if(stat!=MSTRO_OK) return stat; - + int ret = pthread_mutex_lock(&ctx->lock); if(ret!=0) { ERR("Failed to acquire msg context lock: %d\n", ret); @@ -2266,7 +2311,7 @@ mstro_ofi__expect_message_wait(struct fid_ep *ep, fi_addr_t src, /* FIXME: split out do loop like for submit so that we don't need * fi_recv in arbitrary places anymore */ - + int res; do { if(MSTRO_MSG_HAS_TAG(msg)) { @@ -2354,7 +2399,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, } INFO("EPD serial %s\n", join_msg->serialized_endpoint); INFO("EPD parsed as %s\n", mstro_ep_desc_describe(epd)); - + uint32_t addr_format; void *remote_addr; size_t addrlen; @@ -2384,7 +2429,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, DEBUG("App %s advertises %zu transport methods\n", join_msg->component_name, join_msg->transport_methods->n_supported); - + /* insert into registry table */ s = mstro_pm_app_register(ep, translated_addr, strdup(join_msg->serialized_endpoint), @@ -2398,7 +2443,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, join_msg->transport_methods = NULL; /* we kept a reference to it, caller will free the message */ DEBUG("Assigned app ID %"PRIu64"\n", (*entry_p)->appid); } - + BAILOUT: return s; BAILOUT_FREE: @@ -2546,24 +2591,24 @@ mstro_ofi__check_and_handle_cq(struct mstro_endpoint *ep, INFO("Completion of op with ctx %p message %p%s of size %zu on endpoint %p\n", ctx, ctx->msg, re_post ? 
" (slot i)" : "", entry.len, ep); - + if(re_post) { DEBUG("Posting fresh recv on slot %p", expected_slot_p); status = mstro_ofi__loop_post_recv(ep->ep, expected_slot_p, expected_ctx_p); } - - + + if(entry.flags&FI_RECV) { assert(ctx->msg!=NULL); ctx->msg->payload_size = entry.len; /* this permits us to not pass ofi 'entry' to pc/pm handler */ const struct mstro_msg_envelope *msg = ctx->msg; status = incoming_msg_handler(msg, ep); - + if(status!=MSTRO_OK) { ERR("Error handling incoming message, dropping it: %d (%s)\n", status, mstro_status_description(status)); } - + if(re_post) { ; /* fresh receive will be posted later, and slot messages have no completion watchers */ } else { @@ -2571,42 +2616,42 @@ mstro_ofi__check_and_handle_cq(struct mstro_endpoint *ep, DEBUG("non-slot message, cleaning up\n"); if(ctx->has_completion) { DEBUG("ctx asks for completion notification\n"); - status = mstro_ofi__maybe_notify_completion(ctx); + status = mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("ctx wants no completion notification, cleaning up ctx\n"); mstro_msg_envelope_free(ctx->msg); status = mstro_ofi__msg_context_destroy(ctx); } } - + } else if(entry.flags&FI_SEND) { assert(ctx->msg!=NULL); struct mstro_msg_envelope *msg = ctx->msg; - + /* completion of our sent messages */ DEBUG("Send of message %p type %s completed\n", msg, msg->descriptor->name); /* otherwise clean it up */ if(ctx->has_completion) { DEBUG("ctx asks for completion notification\n"); - status = mstro_ofi__maybe_notify_completion(ctx); + status = mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("ctx wants no completion notification, cleaning up ctx\n"); mstro_msg_envelope_free(msg); status = mstro_ofi__msg_context_destroy(ctx); } - + } else if(entry.flags&FI_RMA) { assert(ctx->msg==NULL); DEBUG("RDMA op completed, ctx %p\n", ctx); assert(ctx->has_completion); /* it does not make sense to * not have this set */ - status = mstro_ofi__maybe_notify_completion(ctx); + status = 
mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("Unexpected completion, flags %ull\n", entry.flags); } - - return status; + + return status; } static inline @@ -2651,15 +2696,15 @@ mstro_ofi__loop(_Atomic bool *terminate, } while(false==atomic_load(terminate)); DEBUG("PM comm thread quits: loop status %d, global status %d\n", s, status); - + BAILOUT: if(slots) { - for(i=0; i<g_endpoints->size; i++) + for(i=0; i<g_endpoints->size; i++) mstro_msg_envelope_free(slots[i]); free(slots); } if(ctxts) { - for(i=0; i<g_endpoints->size; i++) + for(i=0; i<g_endpoints->size; i++) mstro_ofi__msg_context_destroy(ctxts[i]); free(ctxts); } @@ -2672,7 +2717,7 @@ mstro_ofi_pm_loop(_Atomic bool *terminate) g_pool_app_id = MSTRO_APP_ID_MANAGER; return mstro_ofi__loop(terminate, mstro_pm_handle_msg); -} +} mstro_status mstro_ofi_pc_loop(_Atomic bool *terminate) @@ -2756,7 +2801,7 @@ mstro_pc_thread(void *closure) s, mstro_status_description(s)); } DEBUG("pool client thread stopping\n"); - + return NULL; } */ @@ -2800,9 +2845,9 @@ mstro_pc_terminate(void) leave.base.descriptor->name); goto BAILOUT; } - + status = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); - + /* The BYE message will be handled by the normal incoming message handler * We can tell it's done by checking g_mstro_pm_attached */ switch(status) { @@ -2864,7 +2909,7 @@ mstro_pm_attach(const char *remote_pm_info) mstro_endpoint_descriptor pm_epd=NULL; mstro_status s=mstro_ep_desc_deserialize(&pm_epd, remote_pm_info); - + if(s!=MSTRO_OK) { ERR("Failed to parse pool manager info: %d (%s%s)\n", s, mstro_status_description(s), @@ -2881,7 +2926,7 @@ mstro_pm_attach(const char *remote_pm_info) { mstro_endpoint_descriptor tmp; size_t num_cookies = 0; - + LL_FOREACH(pm_epd, tmp) { INFO("Pool manager endpoint: %s\n", mstro_ep_desc_describe(tmp)); if(tmp->type==MSTRO_EP_OFI_GNI) { @@ -2900,13 +2945,13 @@ mstro_pm_attach(const char *remote_pm_info) } } } - + s=mstro_ofi_init(); if(s!=MSTRO_OK) { ERR("Failed to initialize 
OFI layer\n"); goto BAILOUT; } - + /* start handler thread: run pool client loop. It will stop when mstro_pc_terminate is called */ /* we need to start it before endpoint selection since we do RDMA * read for the config block, and that needs to report completion @@ -2919,30 +2964,30 @@ mstro_pm_attach(const char *remote_pm_info) goto BAILOUT; } g_component_descriptor.type = MSTRO_COMPONENT_TYPE_APP; - - + + s=mstro_ofi__select_endpoint(pm_epd, &g_pm_endpoint, &g_pm_addr); if(s!=MSTRO_OK) { ERR("Failed to select a suitable endpoint to talk to pool manager\n"); goto BAILOUT; } assert(g_pm_endpoint!=NULL); - + /* register pool manager as a pmp app entry */ s = mstro_pm_app_register_manager(g_pm_endpoint, g_pm_addr); if(s!=MSTRO_OK) { ERR("Failed to register Pool Manager in registry\n"); goto BAILOUT; } - - + + /* send JOIN */ - - + + /* FIXME make function set_transport_default() beware protobuf stack init */ /* if we have OFI we'll add another entry */ - + Mstro__Pool__TransportKind transport_kinds[] = { MSTRO__POOL__TRANSPORT_KIND__GFS #ifdef HAVE_MIO @@ -2950,18 +2995,18 @@ mstro_pm_attach(const char *remote_pm_info) #endif }; size_t num_available_transports = sizeof(transport_kinds)/sizeof(transport_kinds[0]); - + Mstro__Pool__TransportMethods transport_methods = MSTRO__POOL__TRANSPORT_METHODS__INIT; transport_methods.n_supported = num_available_transports; transport_methods.supported = (Mstro__Pool__TransportKind*)&transport_kinds; - + const char* env_transport_default = getenv(MSTRO_ENV_TRANSPORT_DEFAULT); if (env_transport_default != NULL) { int i; int found = 0; - for ( i = 0; i < MSTRO__POOL__TRANSPORT_KIND__NUMBER_OF_KINDS; i++ ) { + for ( i = 0; i < MSTRO__POOL__TRANSPORT_KIND__NUMBER_OF_KINDS; i++ ) { if (!strcmp(env_transport_default, g_transport_registry[i].name)) { transport_methods.supported[0] = (Mstro__Pool__TransportKind)i; found = 1; @@ -2978,13 +3023,13 @@ mstro_pm_attach(const char *remote_pm_info) for (i = 0; i < transport_methods.n_supported; 
i++ ) INFO("#%d %s \n", i, g_transport_registry[transport_methods.supported[i]].name); /**/ - + Mstro__Pool__Join join = MSTRO__POOL__JOIN__INIT; join.protocol_version = MSTRO_POOL_PROTOCOL_VERSION; join.serialized_endpoint = g_pm_endpoint->addr_serialized; join.transport_methods = &transport_methods; join.component_name = g_initdata->component_name; - + Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT; s = mstro_pmp_package(&msg, (ProtobufCMessage*)&join); if(s!=MSTRO_OK) { @@ -2992,13 +3037,13 @@ mstro_pm_attach(const char *remote_pm_info) join.base.descriptor->name); goto BAILOUT; } - + s = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); /* The WELCOME message will be handled by the normal incoming * message handler. We can tell it's done by checking * g_pool_app_id */ - + switch(s) { case MSTRO_OK: { /* spin-wait */ @@ -3010,7 +3055,7 @@ mstro_pm_attach(const char *remote_pm_info) } break; } - case MSTRO_NO_PM: + case MSTRO_NO_PM: ERR("Confusion: JOIN message failed with 'no PM info' error\n"); /* fall-thru */ default: @@ -3018,15 +3063,13 @@ mstro_pm_attach(const char *remote_pm_info) s, mstro_status_description(s)); return s; } - - + + /* Welcome received */ - INFO("Welcome message received, our app id is %"PRIu64"\n", + INFO("Welcome message received, our app id is %"PRIu64"\n", g_pool_app_id); - + BAILOUT: return s; } - - diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 87a15d014032296c3db66ad2f5bfbe5eb0c8681b..a2e169e20fac4c28f7447de9c442290a02e02338 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -386,22 +386,28 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; ticket.cdoid = &id; - const int64_t *size; - enum mstro_cdo_attr_value_type type; - if (! 
(MSTRO_OK == mstro_cdo_attribute_get(src_cdo, - MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &type, (const void**)&size))) { - ERR("Couldn't retrieve CDO %s local_size needed for transport\n", src_cdo->name); + mstro_status s=MSTRO_UNIMPL; + const void *size=NULL; + enum mstro_cdo_attr_value_type vt; + s = mstro_attribute_dict_get(src_cdo->attributes, + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &vt, &size, NULL, false); + if(s==MSTRO_NOENT && vt==MSTRO_CDO_ATTR_VALUE_INVALID) { + ERR("CDO has mamba-array but no local-size\n"); return MSTRO_FAIL; } - int64_t realsize = *size; + if(s!=MSTRO_OK) { + ERR("Failed to retrieve local-size on CDO\n"); + return MSTRO_FAIL; + } + + int64_t realsize = *(int64_t*)size; if(realsize==-1) { DEBUG("Source CDO empty, doing NULL transfer\n"); realsize = 0; } - if (!g_mio_available - || (( realsize % getpagesize()) != 0 - && init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO) + if ( init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO + && (!g_mio_available || (realsize % getpagesize()) != 0 ) ){ WARN("Not issuing a ticket with MIO. Either not available or CDO size (%zu)" " is not a multiple of the page size (%d)." 
@@ -433,7 +439,7 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) status, mstro_status_description(status)); return status; } - gfs.keep_file = 0; // Arbitrarily rm the transport file on dst + gfs.keep_file = 1; // don't arbitrarily rm the transport file on dst break; case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: INFO("TICKET CASE MIO\n"); @@ -507,7 +513,7 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) INFO("Issued ticket for CDO %s, and starting execute process\n", src_cdo->name); //INFO("TransferTicket using path %s\n", ticket.gfs->path); - INFO("TransferTicket cdo size %zu\n", ticket.data_size); + INFO("TransferTicket cdo size %" PRIi64 "\n", ticket.data_size); /* Execute transport (non-blocking) */ status = mstro_transport_execute(src_cdo, &ticket); diff --git a/tests/Makefile.am b/tests/Makefile.am index 3fe6c6a9133a611295b0d8054700a010629ca64b..0195198654d6ce7aa2158a3570c4787e687e8fd3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -76,16 +76,16 @@ TESTS = check_version check_init check_uuid \ check_pm_declare.sh \ check_pm_interlock.sh \ check_subscribe.sh \ - check_pm_declare_group.sh + check_pm_declare_group.sh XFAIL_TESTS = \ - check_subscribe_local + check_subscribe_local # too expensive, actually more of a benchmark: -# check_pool_local_multi +# check_pool_local_multi # broken for now: -# check_mempool +# check_mempool check_PROGRAMS = check_version check_init check_uuid \ coverage check_memlock \ @@ -117,13 +117,13 @@ check_PROGRAMS = check_version check_init check_uuid \ simple_injector \ simple_archiver \ simple_telemetry_listener \ - check_events + check_events if WITH_MIO AM_CPPFLAGS += $(TEST_INCLUDES) -I$(top_srcdir)/deps/mio/src -D_REENTRANT -D_GNU_SOURCE -DM0_INTERNAL= -DM0_EXTERN=extern -Wno-attributes check_PROGRAMS += check_transport_mio -TESTS += check_transport_mio +TESTS += check_transport_mio endif CLIENT1_OPS="DECLARE cdo1 1025\ @@ -158,9 +158,9 @@ 
INJECTOR_OPS="\ -simple_interlock_client_1_SOURCES = simple_interlock_client.c +simple_interlock_client_1_SOURCES = simple_interlock_client.c simple_interlock_client_1_CPPFLAGS = $(AM_CPPFLAGS) -DCLIENT_ARGS=$(CLIENT1_OPS) -simple_interlock_client_2_SOURCES = simple_interlock_client.c +simple_interlock_client_2_SOURCES = simple_interlock_client.c simple_interlock_client_2_CPPFLAGS = $(AM_CPPFLAGS) -DCLIENT_ARGS=$(CLIENT2_OPS) simple_injector_SOURCES = simple_interlock_client.c simple_injector_CPPFLAGS = $(AM_CPPFLAGS) -DCLIENT_ARGS=$(INJECTOR_OPS) @@ -170,14 +170,16 @@ simple_injector_CPPFLAGS = $(AM_CPPFLAGS) -DCLIENT_ARGS=$(INJECTOR_OPS) simple_group_injector_SOURCES = simple_group_client.c simple_group_injector_CPPFLAGS = $(AM_CPPFLAGS) -DINJECT_GROUP_MEMBERS=1 -#check_mempool +#check_mempool check_SCRIPTS = run_demo.sh \ check_pm_declare.sh \ check_pm_declare_group.sh \ check_pm_interlock.sh \ check_subscribe.sh -EXTRA_DIST = $(check_SCRIPTS) generate-mio-config.sh demo_mvp_d3_2_config.yaml run_demo.sh +user_defined_test_schemas = benchmark_attributes.yaml test_attributes.yaml + +EXTRA_DIST = $(check_SCRIPTS) $(user_defined_test_schemas) generate-mio-config.sh demo_mvp_d3_2_config.yaml run_demo.sh clean-local: clean-local-check .PHONY: clean-local-check @@ -186,6 +188,11 @@ clean-local-check: -rm -f $(top_builddir)/tests/consumer_CDO\ [0-9]* -rm -f $(top_builddir)/tests/mio-config*.yaml -rm -f $(top_builddir)/tests/m0trace.* + -rm -f $(top_builddir)/tests/C-handle\ [0-9]* + -rm -f $(top_builddir)/tests/C-name\ [0-9]* + -rm -f $(top_builddir)/tests/{Consumer,Producer}-done-CDO + -rm -f $(top_builddir)/tests/Test\ group + -rm -f $(top_builddir)/tests/cdo[0-9]* .PHONY: all diff --git a/tests/benchmark_attributes.yaml b/tests/benchmark_attributes.yaml new file mode 100644 index 0000000000000000000000000000000000000000..4d2f49cc540b9a768aeae991c8731af542d61476 --- /dev/null +++ b/tests/benchmark_attributes.yaml @@ -0,0 +1,30 @@ +# A user-defined schema. 
The minimum is name and version +schema-name: Benchmark Attributes +schema-version: 0 + +schema-namespace: ".maestro.benchmark." + +# attributes section is optional; if given it needs to have a sequence value +maestro-attributes: + +# Top-level attributes + + - key: "b_1" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "b_2" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "b_3" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + \ No newline at end of file diff --git a/tests/check_declaration_seal.c b/tests/check_declaration_seal.c index b00a215d86f0b1f8a0b50106e23ac64930ad8796..38cab7997f9ecc5437df53657df368ba99751da2 100644 --- a/tests/check_declaration_seal.c +++ b/tests/check_declaration_seal.c @@ -39,6 +39,15 @@ #define __BASE_FILE__ __FILE__ #endif +#ifdef TOPSRCDIR +#define STRINGIFY(x) #x +#define TOSTRING(x) STRINGIFY(x) + +#define SRC_PATH TOSTRING(TOPSRCDIR) "/tests" +#else +#define SRC_PATH "." 
+#endif + #include "cheat.h" #include "maestro.h" @@ -88,16 +97,16 @@ CHEAT_TEST(cdo_declaration_seal_works, cheat_assert(!(MSTRO_OK == mstro_cdo_attribute_set( cdo, "maestro.core.cdo.scope.local-size", - &size))); // not an absolute path, not a valid relative path + &size, true))); // not an absolute path, not a valid relative path cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( cdo, ".maestro.core.cdo.scope.local-size", - &size)); + &size, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo)); - + cheat_assert(MSTRO_FAIL == mstro_cdo_attribute_set_yaml( cdo, "allocate-now: \"no\"")); @@ -106,26 +115,26 @@ CHEAT_TEST(cdo_declaration_seal_works, cdo, "allocate-now", &type, &val)); fprintf(stdout, "check: allocate-now: %d (%p)\n", *(const bool*)val, val); - + cheat_assert(MSTRO_OK == mstro_cdo_attribute_get( - cdo, "persist", &type, &val)); + cdo, "persist", &type, &val)); fprintf(stdout, "check: persist: %d (%p)\n", *(const bool*)val, val); cheat_assert(MSTRO_OK == mstro_cdo_attribute_get( - cdo, "name", &type, &val)); + cdo, "name", &type, &val)); fprintf(stdout, "check: name == %s (%p)\n", (const char*)val, val); cheat_assert(MSTRO_OK == mstro_cdo_attribute_get( - cdo, "scope.local-size", &type, &val)); + cdo, "scope.local-size", &type, &val)); fprintf(stderr, "check: scope.local-size == %d (%p)\n", *(const int*)val, val); cheat_assert(MSTRO_NOENT == mstro_cdo_attribute_get( - cdo, "totally_an_attribute", &type, &val)); + cdo, "totally_an_attribute", &type, &val)); cheat_assert(MSTRO_OK == mstro_cdo_dispose(cdo)); @@ -133,3 +142,53 @@ CHEAT_TEST(cdo_declaration_seal_works, ) +/* put a path to user-defined schema*/ + +CHEAT_TEST(user_defined_schemas_works, + + setenv("MSTRO_SCHEMA_LIST","test_attributes.yaml", 1); + setenv("MSTRO_SCHEMA_PATH",SRC_PATH, 1); + + cheat_assert(MSTRO_OK == mstro_init("Tests","DECLARE",0)); + + mstro_cdo cdo; + cheat_assert(MSTRO_OK ==mstro_cdo_declare("test cdo", MSTRO_ATTR_DEFAULT, &cdo)); + cheat_assert(MSTRO_OK 
==mstro_cdo_attribute_set(cdo, ".maestro.test.t_1", "test value",true)); + cheat_assert(MSTRO_OK ==mstro_cdo_declaration_seal(cdo)); + + cheat_assert(MSTRO_OK ==mstro_cdo_offer(cdo)); + + cheat_assert(MSTRO_OK ==mstro_cdo_withdraw(cdo)); + cheat_assert(MSTRO_OK ==mstro_cdo_dispose(cdo)); + cheat_assert(MSTRO_OK == mstro_finalize()); + + ) + +/* put a path to user-defined schema with wrong path and wrong separators*/ + +CHEAT_TEST(user_defined_schemas_doesnot_exist, + + setenv("MSTRO_SCHEMA_LIST", "user_attributes.yaml", 1); + cheat_assert(MSTRO_OK != mstro_init("Tests","DECLARE",0)); + cheat_assert(MSTRO_FAIL == mstro_finalize()); + + ) + +CHEAT_TEST(user_defined_schemas_paths, + setenv("MSTRO_SCHEMA_LIST","benchmark_attributes.yaml;test_attributes.yaml",1); + setenv("MSTRO_SCHEMA_PATH",SRC_PATH, 1); + + cheat_assert(MSTRO_OK == mstro_init("Tests","DECLARE",0)); + mstro_cdo cdo; + cheat_assert(MSTRO_OK ==mstro_cdo_declare("test cdo", MSTRO_ATTR_DEFAULT, &cdo)); + cheat_assert(MSTRO_OK ==mstro_cdo_attribute_set(cdo, ".maestro.test.t_1", "test value", true)); + cheat_assert(MSTRO_OK ==mstro_cdo_attribute_set(cdo, ".maestro.benchmark.b_1", "bechmark value", true)); + cheat_assert(MSTRO_OK ==mstro_cdo_declaration_seal(cdo)); + + cheat_assert(MSTRO_OK ==mstro_cdo_offer(cdo)); + + cheat_assert(MSTRO_OK ==mstro_cdo_withdraw(cdo)); + cheat_assert(MSTRO_OK ==mstro_cdo_dispose(cdo)); + cheat_assert(MSTRO_OK == mstro_finalize()); + + ) diff --git a/tests/check_layout.c b/tests/check_layout.c index e3d4f4190fefa72a2b88e4415d8a81ebf60d2033..3570ff2ac0c3ac7957d2ef37b17d780902f822fa 100644 --- a/tests/check_layout.c +++ b/tests/check_layout.c @@ -130,32 +130,32 @@ CHEAT_TEST(layout_attribute_works, cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_RAW_PTR, - src_data)); + src_data, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, 
MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - (void**)&bytes)); + (void**)&bytes, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, - MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, - MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims)); + MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, - MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_src)); + MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_src, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, - MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_src)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_src, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_offer(cdo_src)); /* Consumer side */ cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_dst)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_dst, - MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_dst, - MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims)); + MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_dst, - MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_dst)); + MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_dst, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_dst, - MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_dst)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_dst, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_dst)); cheat_assert(MSTRO_OK == mstro_cdo_require(cdo_dst)); @@ -167,18 +167,18 @@ CHEAT_TEST(layout_attribute_works, cheat_assert(MSTRO_OK == mstro_cdo_declare(name_unpooled, MSTRO_ATTR_DEFAULT, &cdo_unpooled)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - 
(void**)&bytes)); + (void**)&bytes, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, MSTRO_ATTR_CORE_CDO_RAW_PTR, - unpooled_data)); + unpooled_data, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, - MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, - MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims)); + MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, - MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_dst)); + MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_dst, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled, - MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_dst)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_dst, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_unpooled)); /* at this point cdo_unpooled has a mamba array (raw-ptr wrapper) */ @@ -200,18 +200,18 @@ CHEAT_TEST(layout_attribute_works, cheat_assert(MSTRO_OK == mstro_cdo_declare(name_unpooled, MSTRO_ATTR_DEFAULT, &cdo_unpooled_t)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled_t, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - (void**)&bytes)); + (void**)&bytes, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled_t, MSTRO_ATTR_CORE_CDO_RAW_PTR, - unpooled_data_t)); + unpooled_data_t, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled_t, - MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ELEMENT_SIZE, &elsz, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled_t, - MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims)); + MSTRO_ATTR_CORE_CDO_LAYOUT_NDIMS, &ndims, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_unpooled_t, - MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_src)); + MSTRO_ATTR_CORE_CDO_LAYOUT_DIMS_SIZE, dimsz_src, false)); cheat_assert(MSTRO_OK == 
mstro_cdo_attribute_set(cdo_unpooled_t, - MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_src)); + MSTRO_ATTR_CORE_CDO_LAYOUT_ORDER, &patt_src, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_unpooled_t)); cheat_assert(MSTRO_OK == mstro_transform_layout(cdo_unpooled, cdo_unpooled_t, diff --git a/tests/check_pool_local_rawptr.c b/tests/check_pool_local_rawptr.c index 24ff530aefd8ebecff4fe83465599e23c7d69381..55ba01edf81f3bbf7444eab08b5b507890222438 100644 --- a/tests/check_pool_local_rawptr.c +++ b/tests/check_pool_local_rawptr.c @@ -77,10 +77,10 @@ CHEAT_TEST(cdo_local_pool_works, cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(src_cdo, MSTRO_ATTR_CORE_CDO_RAW_PTR, - src_data)); + src_data, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(dst_cdo, MSTRO_ATTR_CORE_CDO_RAW_PTR, - dst_data)); + dst_data, false)); cheat_yield(); cheat_assert(MSTRO_OK == mstro_cdo_require(dst_cdo)); cheat_yield(); diff --git a/tests/check_pool_mamba.c b/tests/check_pool_mamba.c index d543c6d4fa7be15d08316e46f5bc5a5132935dd8..76e959ecdfbc74d303c35739f44049e9d247ba1d 100644 --- a/tests/check_pool_mamba.c +++ b/tests/check_pool_mamba.c @@ -63,10 +63,10 @@ CHEAT_TEST(cdo_local_pool_mamba_works, // TODO associate (cdo, mmb_array_ptr) cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( - cdo, ".maestro.core.cdo.raw-ptr", (void*)buf)); + cdo, ".maestro.core.cdo.raw-ptr", (void*)buf, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( - cdo, ".maestro.core.cdo.scope.local-size", &size)); + cdo, ".maestro.core.cdo.scope.local-size", &size, true)); mmbArray *mamba_ptr=NULL; /* mamba array not automatically available */ diff --git a/tests/check_transport_gfs.c b/tests/check_transport_gfs.c index 749a03415b7de2c3596240b5b1c03850fe4875e7..05e651307c9f482b65c9d59ec8f5840ce54595d5 100644 --- a/tests/check_transport_gfs.c +++ b/tests/check_transport_gfs.c @@ -87,10 +87,10 @@ CHEAT_TEST(transport_gfs_works, cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, 
&cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_RAW_PTR, - src_data)); + src_data, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - (void**)&bytes)); + (void**)&bytes, true)); cheat_yield(); enum mstro_cdo_attr_value_type ttype; @@ -141,7 +141,7 @@ CHEAT_TEST(transport_gfs_works, cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_dst, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - (void**)&bytes)); + (void**)&bytes, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_dst)); cheat_assert(MSTRO_OK == mstro_transport_gfs_dst_execute(cdo_dst, &ticket)); diff --git a/tests/check_transport_mio.c b/tests/check_transport_mio.c index a2e8a0a187da9cc750ceb891bab4bee0bddbb3f7..f2eb2e065639efb06aea6d09bab708bfe5d6d08f 100644 --- a/tests/check_transport_mio.c +++ b/tests/check_transport_mio.c @@ -89,13 +89,13 @@ CHEAT_DECLARE ( cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_RAW_PTR, - src_data)); + src_data, false)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, ".maestro.core.cdo.scope.local-size", - &bytes)); + &bytes, true)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, ".maestro.core.cdo.layout.pre-pad", - &pad)); + &pad, true)); const int64_t* tval; diff --git a/tests/demo_mvp_d3_2.c b/tests/demo_mvp_d3_2.c index 95082a69e222054b950c07c59874a1aed003c9c8..0855944df6759324aa0314465b09192a4558d073 100644 --- a/tests/demo_mvp_d3_2.c +++ b/tests/demo_mvp_d3_2.c @@ -208,7 +208,7 @@ static const cyaml_config_t config = { int mvp_config_parse(mvp_config* handle, const char* filename) { - if (handle == NULL) return MSTRO_INVARG; + if (handle == NULL) return MSTRO_INVARG; cyaml_err_t err; err = cyaml_load_file(filename, &config, @@ -237,7 +237,7 @@ wait_for_announcement(struct cdo_announcement *announcement) } s = 
mstro_cdo_attribute_set(announcement_cdo, MSTRO_ATTR_CORE_CDO_RAW_PTR, - announcement); + announcement, false); if(s!=MSTRO_OK) { ERR("Failed to set raw-ptr attribute on announcement CDO\n"); abort(); @@ -255,7 +255,7 @@ wait_for_announcement(struct cdo_announcement *announcement) ERR("Failed to require announcement CDO\n"); abort(); } - + s = mstro_cdo_demand(announcement_cdo); if(s!=MSTRO_OK) { ERR("Failed to withdraw announcement CDO\n"); @@ -286,7 +286,7 @@ do_announce(struct cdo_announcement *announcement, mstro_cdo *result) } s = mstro_cdo_attribute_set(announcement_cdo, MSTRO_ATTR_CORE_CDO_RAW_PTR, - announcement); + announcement, false); if(s!=MSTRO_OK) { ERR("Failed to set raw-ptr attribute on announcement CDO\n"); @@ -450,7 +450,7 @@ archiver_flush_to_disk(const char *name, mmbArray *a) abort(); } - size_t nt; + size_t nt; stat = mmb_tile_iterator_count(it, &nt); if(stat != MMB_OK) { ERR("Failed to get tile iterator count\n"); @@ -550,10 +550,10 @@ archiver_thread_fun(void *closure) } s = mstro_cdo_attribute_set(incoming[i], MSTRO_ATTR_CORE_CDO_RAW_PTR, - incoming_buffers[i]); + incoming_buffers[i], false); s |= mstro_cdo_attribute_set(incoming[i], MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &announcement->cdo_size); + &announcement->cdo_size, true); // INFO("archiver cdo %d incoming buffer %p\n", i, incoming_buffers[i]); if(s!=MSTRO_OK) { @@ -671,10 +671,10 @@ producer_thread_fun(void *closure) s = mstro_cdo_attribute_set(outgoing[i], MSTRO_ATTR_CORE_CDO_RAW_PTR, - outgoing_buffers[i]); + outgoing_buffers[i], false); s |= mstro_cdo_attribute_set(outgoing[i], MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &announcement->cdo_size); + &announcement->cdo_size, true); if(s!=MSTRO_OK) { ERR("Failed to add outgoing buffer to CDO %s\n", @@ -721,8 +721,8 @@ mvp_checksum(const char* name, void* rawptr, uint64_t size) unsigned char x; uint64_t i; - for (i=0,x=0;i<size; i++) - x ^= ((unsigned char*)rawptr)[i]; + for (i=0,x=0;i<size; i++) + x ^= ((unsigned char*)rawptr)[i]; 
INFO("Checksum for cdo \"%s\": %d\n", name, x); } @@ -731,7 +731,7 @@ void consumer_flush_to_disk(const char *name, void *a, uint64_t size) { char file_name [256]; - sprintf(file_name, "consumer_%s", name); + sprintf(file_name, "consumer_%s", name); FILE *dst = fopen((const char*)file_name, "w"); if(dst==NULL) { @@ -769,7 +769,7 @@ consumer_thread_fun(void *closure) mstro_status s; size_t my_idx = * (size_t*)closure; INFO("Consumer %zu starting\n", my_idx); - + struct cdo_announcement *announcement = malloc(sizeof(struct cdo_announcement)); if(announcement==NULL) { @@ -803,10 +803,10 @@ consumer_thread_fun(void *closure) } s = mstro_cdo_attribute_set(incoming[i], MSTRO_ATTR_CORE_CDO_RAW_PTR, - incoming_buffers[i]); + incoming_buffers[i], false); s |= mstro_cdo_attribute_set(incoming[i], MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &announcement->cdo_size); + &announcement->cdo_size, true); // INFO("consumer cdo %d incoming buffer %p\n", i, incoming_buffers[i]); if(s!=MSTRO_OK) { @@ -826,7 +826,7 @@ consumer_thread_fun(void *closure) /* send ACK that we're ready */ mstro_cdo ready_cdo; - + INFO("Declaring Consumer %zu ready\n", my_idx); declare_consumer_ready(my_idx,&ready_cdo); @@ -853,14 +853,14 @@ consumer_thread_fun(void *closure) /* query the size */ s = mstro_cdo_attribute_get(incoming[i], MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &type, (const void**)&size); + &type, (const void**)&size); if(s!=MSTRO_OK) { ERR("Failed to extract local size from CDO %s for archiving\n", announcement->cdo_names[i]); abort(); } - /* do some work */ + /* do some work */ mvp_checksum(announcement->cdo_names[i], rawptr, *size); /* write-out */ @@ -878,7 +878,7 @@ consumer_thread_fun(void *closure) } /* demand data and process */ - + cdo_sem_post_cleanup(ready_cdo); free(announcement); @@ -893,7 +893,15 @@ consumer_thread_fun(void *closure) #define DEFAULT_CDO_SIZE 1024 #define DEFAULT_CDO_COUNT 42 +#ifdef TOPSRCDIR +#define STRINGIFY(x) #x +#define TOSTRING(x) STRINGIFY(x) + +#define 
CONFIG_FILE_PATH TOSTRING(TOPSRCDIR) "/tests/demo_mvp_d3_2_config.yaml" +#else #define CONFIG_FILE_PATH "./demo_mvp_d3_2_config.yaml" +#endif + /* * main thread: @@ -933,7 +941,7 @@ int main(int argc, char **argv) WARN("Command line arguments ignored\n"); /* fallthrough: */ case 1: - + err = mvp_config_parse(&config_handle, CONFIG_FILE_PATH); if (err != 0) { WARN("Failed to parse a config file. Using default config.\n"); @@ -949,12 +957,12 @@ int main(int argc, char **argv) announcement.num_archivers = config_handle->num_archivers; announcement.cdo_size = config_handle->cdo_size; announcement.num_entries = config_handle->cdo_count; - } + } break; } assert(announcement.num_entries<=CDO_COUNT_MAX); - + pthread_t producers[announcement.num_producers]; pthread_t consumers[announcement.num_consumers]; pthread_t archivers[announcement.num_archivers]; @@ -1035,7 +1043,7 @@ int main(int argc, char **argv) for(size_t i=0; i<announcement.num_archivers; i++) { pthread_join(archivers[i], NULL); } - + /* withdraw announcement */ withdraw_announcement(announcement_cdo); @@ -1050,11 +1058,11 @@ int main(int argc, char **argv) announcement.num_producers, announcement.num_consumers, announcement.num_archivers); - + BAILOUT: if(config_handle) free(config_handle); - + return s; } diff --git a/tests/simple_interlock_client.c b/tests/simple_interlock_client.c index c71f47a64a6b97c969e1795fb7a923d7aae7ac03..5b4e4fde9a66581505adde34535c116bd54f8e95 100644 --- a/tests/simple_interlock_client.c +++ b/tests/simple_interlock_client.c @@ -194,11 +194,11 @@ CHEAT_DECLARE( cheat_assert(MSTRO_OK==mstro_cdo_attribute_set( e->cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &s->size)); + &s->size, true)); cheat_assert(MSTRO_OK==mstro_cdo_attribute_set( e->cdo, MSTRO_ATTR_CORE_CDO_RAW_PTR, - e->buf)); + e->buf, false)); fprintf(stderr, "declared %s, handle %x\n", e->name, e->cdo); break; diff --git a/tests/test_attributes.yaml b/tests/test_attributes.yaml new file mode 100644 index 
0000000000000000000000000000000000000000..68008c9aa1da68dd0df39002d20a627537a127f4 --- /dev/null +++ b/tests/test_attributes.yaml @@ -0,0 +1,16 @@ +# A user-defined schema. The minimum is name and version +schema-name: Test Attributes +schema-version: 0 + +schema-namespace: ".maestro.test." + +# attributes section is optional; if given it needs to have a sequence value +maestro-attributes: + +# Top-level attributes + + - key: "t_1" + type: str() + required: False + default: "" + documentation: Value length is the attribute size diff --git a/transport/gfs.c b/transport/gfs.c index a8844e5d345b9fc50130e45281a3c97888f74e87..c3b12490ba3e9d800654b5902b664bc273ab8f70 100644 --- a/transport/gfs.c +++ b/transport/gfs.c @@ -82,10 +82,14 @@ mstro_transport_gfs_src_execute(mstro_cdo src, Mstro__Pool__TransferTicket* tick return MSTRO_FAIL; } - if (fwrite(dl.data, sizeof(char), dl.len/sizeof(char), f) != dl.len) { - ERR("Partial write on %s (buf: %p) for GFS transport (errno: %d -- %s)\n", +RETRY_GFS_TRANSPORT_WRITE: ; + size_t bytes_written = fwrite(dl.data, sizeof(char), dl.len, f); + if (bytes_written != dl.len) { + if (errno == EAGAIN) + goto RETRY_GFS_TRANSPORT_WRITE; + ERR("Partial write on %s (buf: %p) for GFS transport (errno: %d -- %s)\n", path, dl.data, errno, strerror(errno)); - return MSTRO_FAIL; + return MSTRO_FAIL; } if (fclose(f) != 0) { @@ -113,10 +117,10 @@ mstro_transport_gfs_dst_execute(mstro_cdo dst, INFO("Executing gfs transport dst side for CDO %s\n", dst->name); mstro_status status; - int64_t len = ticket->data_size; + size_t len = (size_t)ticket->data_size; char* data; - DEBUG("Incoming CDO, size %" PRIi64 "\n", len); + DEBUG("Incoming CDO, size %zu\n", len); status = mstro_transport_get_dst_buffer(dst, (void*)&data); if(status!=MSTRO_OK) { @@ -140,10 +144,13 @@ mstro_transport_gfs_dst_execute(mstro_cdo dst, path, errno, strerror(errno)); return MSTRO_FAIL; } - - if (fread(data, sizeof(char), len, f) != (size_t)len) { - ERR("Partial read on %s for GFS 
transport (errno: %d -- %s)\n", - path, errno, strerror(errno)); +RETRY_GFS_TRANSPORT_READ: ; + size_t bytes_read = fread(data, sizeof(char), len, f); + if (bytes_read != (size_t)len) { + if (errno == EAGAIN) + goto RETRY_GFS_TRANSPORT_READ; + ERR("Partial read (%d out of %d bytes) on %s for GFS transport (errno: %d -- %s)\n", + bytes_read, len, path, errno, strerror(errno)); return MSTRO_FAIL; } diff --git a/transport/transport.c b/transport/transport.c index 4c3f6d73c038e50792a953809daa591e908582ca..0690af7edff5ead202d06172de6cb67b7c0c930f 100644 --- a/transport/transport.c +++ b/transport/transport.c @@ -371,9 +371,32 @@ mstro_transport_execute( if (cdo->state & MSTRO_CDO_STATE_OFFERED) { /* we are called on the source side of transport */ { - /* FIXME: should check this for robustness: */ - DEBUG("Not checking that SRC cdo has expected size (%" PRIi64 ")\n", - ticket->data_size); + /* let's check ticket size matches local CDO size */ + enum mstro_cdo_attr_value_type type; + const void *sizep=NULL; + int64_t src_size; + mstro_status s = mstro_attribute_dict_get(cdo->attributes, + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &type, &sizep, NULL, false); + if(s==MSTRO_OK) { + src_size=*(int64_t*)sizep; + } else if(s==MSTRO_NOENT && type==MSTRO_CDO_ATTR_VALUE_INVALID) { + ERR("Attribute not found or value invalid\n"); + src_size=-1; + } else { + ERR("Failed to look up %s (%d: %s)\n", + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, s, mstro_status_description(s)); + return MSTRO_FAIL; + } + + if (ticket->data_size == 0) { + DEBUG("0-transport\n"); + } + else if (src_size != ticket->data_size) { + ERR("SRC cdo has a size (%" PRIi64 ") that doesn't match ticket-specified size (%" PRIi64 ")\n", + src_size, ticket->data_size); + return MSTRO_FAIL; + } } mstro_status (*f)(mstro_cdo src, Mstro__Pool__TransferTicket *t) = g_transport_registry[ticket->method].src_func;