diff --git a/attributes/maestro-schema.c b/attributes/maestro-schema.c index ac87c0d3e79c38504f2614a78575c289afd29493..1511d11d91ba3f64d4d5ac4b18a1aa7ae1285c2b 100644 --- a/attributes/maestro-schema.c +++ b/attributes/maestro-schema.c @@ -336,6 +336,25 @@ struct mstro_schema_ { } while(0) +static inline +mstro_status +mstro_schema_type__dispose(struct mstro_schema_type_ *t) +{ + if(t==NULL) + return MSTRO_INVARG; + + if(t->typename) + free(t->typename); + if(t->unparsed_typespec) + free(t->unparsed_typespec); + if(t->documentation) + free(t->documentation); + if(t->parsed_type) + mstro_stp_val_dispose(t->parsed_type); + free(t); + return MSTRO_OK; +} + mstro_status mstro_schema_free(mstro_schema sch) { @@ -378,6 +397,14 @@ mstro_schema_free(mstro_schema sch) free(sch->schema_type_vals); } + if(sch->attribute_table) { + struct mstro_schema_attribute_ *el,*tmp; + HASH_ITER(hh, sch->attribute_table, el, tmp) { + DEBUG("Deleting attribute %s from schema %p attribute table\n", + el->key, sch); + HASH_DELETE(hh, sch->attribute_table, el); + } + } if(sch->schema_attributes) { for(size_t i=0; i<sch->schema_attributes_count; i++) { struct mstro_schema_attribute_ * sa = &sch->schema_attributes[i]; @@ -396,14 +423,21 @@ mstro_schema_free(mstro_schema sch) free(sa->defaultval_string); if(sa->documentation) free(sa->documentation); - /* the individual sa pointers are not to be freed, since all are allocated in one chunk by cyaml */ + /* the individual sa pointers are not to be freed, since all are + * allocated in one chunk by cyaml */ } free(sch->schema_attributes); } - /* FIXME: type table */ - /* FIXME: attribute_table */ - + { + struct mstro_schema_type_ *el,*tmp; + HASH_ITER(hh, sch->type_table, el, tmp) { + DEBUG("Deleting schema type %s from schema %p type table\n", + el->typename, sch); + HASH_DELETE(hh, sch->type_table, el); + mstro_schema_type__dispose(el); + } + } mstro_symtab_destroy(&sch->symtab); int err= pthread_rwlock_destroy(&sch->lock); @@ -924,6 +958,7 @@ mstro_attribute_val_parse(const struct mstro_stp_val *parsed_type, const char *s s=MSTRO_NOMEM; goto BAILOUT; } + DEBUG("VAL-PARSE allocation %p\n", *val_p); /* numeric bound checking should happen in this switch. All the info is in the stp_val */ void *dst=*val_p; /* to make the assignments and casts below less messy wrt. '**' */ @@ -977,6 +1012,8 @@ mstro_attribute_val_parse(const struct mstro_stp_val *parsed_type, const char *s } break; case MSTRO_STP_POINTER: { + free(*val_p); /* POINTER does not need extra space, we use the + * caller's cell directly */ uintptr_t val; if(strcasecmp(string, "NIL")==0) val=0; @@ -1007,8 +1044,9 @@ mstro_schema_attribute__parse_defaultval(struct mstro_schema_attribute_ *attr) { assert(attr->type_parse_closure.info!=NULL); if(attr->defaultval_string) - return mstro_attribute_val_parse(attr->type_parse_closure.info, attr->defaultval_string, - &attr->defaultval, &attr->defaultval_size); + return mstro_attribute_val_parse( + attr->type_parse_closure.info, attr->defaultval_string, + &attr->defaultval, &attr->defaultval_size); else { DEBUG("Attribute |%s| has no default value in schema\n", attr->key); attr->defaultval = MSTRO_SCHEMA_DEFAULT_VAL_UNSET; @@ -1125,7 +1163,7 @@ mstro_schema_validate_and_instantiate(mstro_schema schema) } HASH_ADD(hh, schema->type_table, type_symbol, sizeof(mstro_symbol), current); } - + for(size_t i=0; i<schema->schema_attributes_count; i++) { const char *fqstr; struct mstro_schema_attribute_ *current = schema->schema_attributes+i; @@ -1150,7 +1188,9 @@ mstro_schema_validate_and_instantiate(mstro_schema schema) goto BAILOUT; } - HASH_ADD(hh, schema->attribute_table, key_symbol, sizeof(mstro_symbol), current); + HASH_ADD(hh, schema->attribute_table, + key_symbol, sizeof(mstro_symbol), + current); /* parse default value */ if(!current->required) @@ -1440,7 +1480,8 @@ mstro_schema_lookup_type__builtins( s=MSTRO_OK; goto BAILOUT; } else { - DEBUG("Failed to find builtin type |%s| in type table, instantiating\n", typename); + DEBUG("Failed to find builtin type |%s| in type table, instantiating\n", + typename); WITH_LOCKED_SCHEMA_WRITE_UPGRADE(schema, { /* try to parse */ struct mstro_stp_val *parsed_type; @@ -1689,7 +1730,8 @@ mstro_attribute_entry_dispose(struct mstro_attribute_entry_ *entry) switch(entry->serialized_pb.val_case) { case MSTRO__POOL__AVAL__VAL_STRING: - free(entry->serialized_pb.string); + if(entry->serialized_pb.string) + free(entry->serialized_pb.string); break; case MSTRO__POOL__AVAL__VAL_BYTES: if(entry->serialized_pb.bytes.len) @@ -2605,13 +2647,8 @@ mstro_attribute_dict_set_kventry(mstro_attribute_dict dict, MSTRO_CDO_ATTR_VALUE_double, &tmp, true); } case MSTRO__POOL__AVAL__VAL_STRING: { - char *tmp = strdup(aval->string); - if(tmp==NULL) { - ERR("Failed to duplicate string value\n"); - return MSTRO_NOMEM; - } return mstro_attribute_dict_set(dict, key, - MSTRO_CDO_ATTR_VALUE_cstring, tmp, false); + MSTRO_CDO_ATTR_VALUE_cstring, aval->string, true); } case MSTRO__POOL__AVAL__VAL_BYTES: { /* FIXME: blobs still have issues in their handling. For now we @@ -2718,7 +2755,7 @@ mstro_attribute_entry_to_mapentry(const struct mstro_attribute_entry_ *entry, * when the message is deallocated (or vice versa) */ res->val->string = strdup((char *)entry->val); if(res->val->string==NULL) { - ERR("Failed to allocat string value for KV entry\n"); + ERR("Failed to allocate string value for KV entry\n"); free(res->val); free(res); s=MSTRO_NOMEM; @@ -2763,6 +2800,7 @@ mstro_attribute_entry_to_mapentry(const struct mstro_attribute_entry_ *entry, break; } if(s!=MSTRO_OK) { + free(res->key); free(res->val); free(res); res=NULL; @@ -2777,9 +2815,8 @@ static inline void mstro_attribute_map__mapentry_destroy(Mstro__Pool__KvEntry *entry) { - WARN("Leaking memory"); - /* key is shared with symbol-name */ - entry->key=NULL; + /* this function's idea of object ownership needs to match mstro_attribute_entry_to_mapentry() */ + /* assumes key is not shared with symbol-name */ switch(entry->val->val_case) { case MSTRO__POOL__AVAL__VAL__NOT_SET: case MSTRO__POOL__AVAL__VAL_BOOL: @@ -2792,7 +2829,8 @@ mstro_attribute_map__mapentry_destroy(Mstro__Pool__KvEntry *entry) /* immediate values */ break; case MSTRO__POOL__AVAL__VAL_STRING: - /* shared with dict */ + /* NOT shared with dict */ + free(entry->val->string); entry->val->string = NULL; break; case MSTRO__POOL__AVAL__VAL_BYTES: @@ -2801,7 +2839,8 @@ mstro_attribute_map__mapentry_destroy(Mstro__Pool__KvEntry *entry) entry->val->bytes.data = NULL; break; case MSTRO__POOL__AVAL__VAL_TIMESTAMP: - /* shared with dict */ + /* NOT shared with dict */ + free(entry->val->timestamp); entry->val->timestamp = NULL; break; default: @@ -2821,7 +2860,8 @@ mstro_attribute_dict_to_kvmap(mstro_attribute_dict dict, { mstro_status s=MSTRO_UNIMPL; - Mstro__Pool__Attributes__Map *res = malloc(sizeof(Mstro__Pool__Attributes__Map)); + Mstro__Pool__Attributes__Map *res + = malloc(sizeof(Mstro__Pool__Attributes__Map)); if(res==NULL) { ERR("Cannot allocate k-v-map\n"); s=MSTRO_NOMEM; @@ -2856,7 +2896,8 @@ mstro_attribute_dict_to_kvmap(mstro_attribute_dict dict, if(s==MSTRO_NOENT) { WARN("Skipped dictionary entry %s\n", mstro_symbol_name(el->key)); } else { - ERR("Failed to dictionary entry %s\n", mstro_symbol_name(el->key)); + ERR("Failed to serialize dictionary entry %s\n", + mstro_symbol_name(el->key)); res->n_map = 0; free(res->map); free(res); @@ -2926,9 +2967,22 @@ mstro_attribute_dict_message_dispose(mstro_attribute_dict dict, return MSTRO_INVARG; if(msg==NULL) return MSTRO_INVARG; - - /* FIXME: leaking here, but better safe than sorry */ - free(msg); + + /* It's nontrivial to free the message, as some of its content + * overlaps with dictionary entries */ + if(msg) { + /* certain types share pointers with dictionary entries. We'll + * kill them here, then let protobuf clean up around us */ + assert(msg->val_case == MSTRO__POOL__ATTRIBUTES__VAL_KV_MAP); + for(size_t i=0; i<msg->kv_map->n_map; i++) { + mstro_attribute_map__mapentry_destroy(msg->kv_map->map[i]); + } + msg->kv_map->n_map = 0; + free(msg->kv_map->map); + msg->kv_map->map=NULL; + + mstro__pool__attributes__free_unpacked(msg, NULL); + } return MSTRO_OK; } diff --git a/include/maestro/i_statistics.h b/include/maestro/i_statistics.h index ade827fb109e3557789c753543baee692c282cf9..a44d25004b93b4a799d0783f2b4f63323da8a123 100644 --- a/include/maestro/i_statistics.h +++ b/include/maestro/i_statistics.h @@ -92,6 +92,9 @@ mstro_stats_finalize(void); /** type to express nanosecond time points (since an arbitrary point in time) */ typedef uint64_t mstro_nanosec_t; +/** Printing support */ +#define PRInanosec PRIu64 + /** Return the current time */ mstro_nanosec_t mstro_clock(void); diff --git a/maestro/cdo.c b/maestro/cdo.c index 4d87779d964fc4f0e0d8748082e52a6877e5514a..87419eae0e2050dcab5bc75d63885e5064e25733 100644 --- a/maestro/cdo.c +++ b/maestro/cdo.c @@ -211,7 +211,6 @@ mstro_cdo__free(mstro_cdo *cdoptr) goto BAILOUT; } } - free(*cdoptr); *cdoptr=NULL; BAILOUT: diff --git a/maestro/cdo_sel_parse.c b/maestro/cdo_sel_parse.c index c6f1aa9215996e53817aa83f3e60c16cfca2cb2b..9a34d4ef6160d9f8229b6beeccf7215a31db753d 100644 --- a/maestro/cdo_sel_parse.c +++ b/maestro/cdo_sel_parse.c @@ -111,7 +111,7 @@ mstro_csq_val__free(struct mstro_csq_val * val) if(val->kv_op.lhs) free(val->kv_op.lhs); if(val->kv_op.rhs) free(val->kv_op.rhs); break; - } + } free(val); mstro_csq_val__free(nxt); } @@ -1353,13 +1353,15 @@ static void pcc_action_kv_op_5(mstro_csq_context_t *__pcc_ctx, pcc_thunk_t *__pc static void pcc_action_kv_op_6(mstro_csq_context_t *__pcc_ctx, pcc_thunk_t *__pcc_in, pcc_value_t *__pcc_out) { #define auxil (__pcc_ctx->auxil) #define __ (*__pcc_out) +#define r (*__pcc_in->data.leaf.values.buf[0]) #define _0 pcc_get_capture_string(__pcc_ctx, &__pcc_in->data.leaf.capt0) #define _0s ((const)__pcc_in->data.leaf.capt0.range.start) #define _0e ((const)__pcc_in->data.leaf.capt0.range.end) - __ = mstro_csq_val__alloc(MSTRO_CSQ_OP_RMATCH_ICASE); + __ = mstro_csq_val__alloc(MSTRO_CSQ_OP_RMATCH_ICASE); mstro_csq_val__free(r); #undef _0e #undef _0s #undef _0 +#undef r #undef __ #undef auxil } @@ -2363,7 +2365,7 @@ L0000:; static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { pcc_thunk_chunk_t *const chunk = pcc_thunk_chunk__create(ctx->auxil); chunk->pos = ctx->pos; - pcc_value_table__resize(ctx->auxil, &chunk->values, 0); + pcc_value_table__resize(ctx->auxil, &chunk->values, 1); pcc_capture_table__resize(ctx->auxil, &chunk->capts, 0); { const int p = ctx->pos; @@ -2380,7 +2382,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ctx->pos++; L0003:; { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_0, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_0, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2398,7 +2400,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ctx->pos += 2; } { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_1, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_1, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2413,7 +2415,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ) goto L0005; ctx->pos++; { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_2, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_2, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2431,7 +2433,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ctx->pos += 2; } { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_3, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_3, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2446,7 +2448,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ) goto L0007; ctx->pos++; { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_4, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_4, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2464,7 +2466,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ctx->pos += 2; } { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_5, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_5, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2490,7 +2492,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { } L0012:; } - if (!pcc_apply_rule(ctx, pcc_evaluate_rule_regex_i, &chunk->thunks, NULL)) goto L0010; + if (!pcc_apply_rule(ctx, pcc_evaluate_rule_regex_i, &chunk->thunks, &(chunk->values.buf[0]))) goto L0010; ctx->pos = p; goto L0011; L0010:; @@ -2499,7 +2501,8 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { L0011:; } { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_6, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_6, 1, 0); + thunk->data.leaf.values.buf[0] = &(chunk->values.buf[0]); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -2517,7 +2520,7 @@ static pcc_thunk_chunk_t *pcc_evaluate_rule_kv_op(mstro_csq_context_t *ctx) { ctx->pos += 2; } { - pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_7, 0, 0); + pcc_thunk_t *const thunk = pcc_thunk__create_leaf(ctx->auxil, pcc_action_kv_op_7, 1, 0); thunk->data.leaf.capt0.range.start = chunk->pos; thunk->data.leaf.capt0.range.end = ctx->pos; pcc_thunk_array__add(ctx->auxil, &chunk->thunks, thunk); @@ -3327,11 +3330,7 @@ mstro_csq_val_dispose(struct mstro_csq_val *v) ERR("NULL stp_val argument\n"); return MSTRO_INVARG; } - switch(v->kind) { - default: - break; - } - free(v); + mstro_csq_val__free(v); return MSTRO_OK; } diff --git a/maestro/cdo_sel_parse.peg b/maestro/cdo_sel_parse.peg index 8360d84e731726e0f8172c28d1cacf0d1b4d08b1..79a43ae36f2d0ff5e5f6f365705a4d5d855934e0 100644 --- a/maestro/cdo_sel_parse.peg +++ b/maestro/cdo_sel_parse.peg @@ -96,7 +96,7 @@ mstro_csq_val__free(struct mstro_csq_val * val) if(val->kv_op.lhs) free(val->kv_op.lhs); if(val->kv_op.rhs) free(val->kv_op.rhs); break; - } + } free(val); mstro_csq_val__free(nxt); } @@ -285,7 +285,7 @@ kv_op <- "=" "="? { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_EQ); } / "<=" { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_LE); } / ">" { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_GT); } / ">=" { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_GE); } - / "~=" & (space* regex_i) { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_RMATCH_ICASE); } + / "~=" & (space* r:regex_i) { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_RMATCH_ICASE); mstro_csq_val__free(r); } / "~=" { $$ = mstro_csq_val__alloc(MSTRO_CSQ_OP_RMATCH); } ## attribute key, matching what Maestro Schema is using @@ -479,11 +479,7 @@ mstro_csq_val_dispose(struct mstro_csq_val *v) ERR("NULL stp_val argument\n"); return MSTRO_INVARG; } - switch(v->kind) { - default: - break; - } - free(v); + mstro_csq_val__free(v); return MSTRO_OK; } diff --git a/maestro/core.c b/maestro/core.c index dc6a967ee0af82ef8d15a00e9f2149e5d39d271e..60308871e28ac66507f3f72dc63cb4199d3f5d72 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -195,20 +195,18 @@ mstro_status mstro_core_init__setup_schemata(void) char * env_schema_list = getenv(MSTRO_ENV_SCHEMA_LIST); char * env_schema_path = getenv(MSTRO_ENV_SCHEMA_PATH); + char *env_schema_path_plus_default = NULL; + //check that neither are null ...if null make them empty strings - if(env_schema_list == NULL) - { + if(env_schema_list == NULL) { env_schema_list = ""; - } - else - { + } else { INFO("List of user attributes schemata %s \n", env_schema_list); strncpy(g_component_descriptor.schema_list, env_schema_list, MSTRO_WORKFLOW_NAME_MAX-1); g_component_descriptor.schema_list[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; } - if(env_schema_path == NULL) - { + if(env_schema_path == NULL) { env_schema_path = ""; } @@ -223,7 +221,7 @@ mstro_status mstro_core_init__setup_schemata(void) &g_mstro_core_schema_instance); if(status!=MSTRO_OK) { ERR("Failed to parse built-in core schema\n"); - return status; + goto BAILOUT; } DEBUG("Parsing ECMWF schema\nFIXME for 0.3.0 ... ECMWF schema shoud not be loaded by default\n"); @@ -233,31 +231,34 @@ mstro_status mstro_core_init__setup_schemata(void) &ecmwf); if(status!=MSTRO_OK) { ERR("Failed to parse built-in ecmwf schema\n"); - return status; + goto BAILOUT; } status=mstro_schema_merge(g_mstro_core_schema_instance, ecmwf); if(status!=MSTRO_OK) { ERR("Failed to merge core and ECMWF schema\n"); - return status; + goto BAILOUT; } // start reading user-defined schemas and merge them. char *end_list_token; char *end_path_token; char *schema_full_path; - char *env_schema_path_plus_default; // the lenght of all paths = length of paths exported by user + separator + default path "." int path_len = strlen(env_schema_path) + 2 + 2; - env_schema_path_plus_default = (char *) malloc(path_len*sizeof(char)); + env_schema_path_plus_default = malloc(path_len*sizeof(char)); + if(env_schema_path_plus_default==NULL) { + ERR("Failed to allocate schema path tempspace\n"); + status=MSTRO_NOMEM; + goto BAILOUT; + } /* get the first schema */ schema_list_token = strtok_r(env_schema_list, SCHEMA_LIST_SEP, &end_list_token); DEBUG("first schema_list_token: %s \n", schema_list_token); /* walk through other tokens */ - while( schema_list_token != NULL ) - { + while( schema_list_token != NULL ) { // parse a schema from the user mstro_schema user_schema; DEBUG("looking for schema_list_token: %s \n", schema_list_token); @@ -270,8 +271,7 @@ mstro_status mstro_core_init__setup_schemata(void) DEBUG("first schema_path_token: %s\n", schema_path_token); /* walk through other paths */ - while( schema_path_token != NULL ) - { + while( schema_path_token != NULL ) { // forming the full path: path token + / + list name + '\0' int schema_full_path_len = strlen(schema_path_token) + 2 + strlen(schema_list_token) + 2; schema_full_path = (char *) malloc(sizeof(char)*schema_full_path_len); @@ -295,7 +295,7 @@ mstro_status mstro_core_init__setup_schemata(void) if(status!=MSTRO_OK) { ERR("Failed to parse user_schema from file: %s \n", schema_list_token); - return status; + goto BAILOUT; } // merge the schema DEBUG("Merging user schema: %s\n", schema_list_token); @@ -304,13 +304,17 @@ mstro_status mstro_core_init__setup_schemata(void) if(status!=MSTRO_OK) { ERR("Failed to merge core and user schema from file %s\n", schema_list_token); - return status; + goto BAILOUT; } // read the next schema name schema_list_token = strtok_r(NULL, SCHEMA_LIST_SEP, &end_list_token); } - return status; +BAILOUT: + if(env_schema_path_plus_default!=NULL) + free(env_schema_path_plus_default); + + return status; } /** the number of RLIMIT_ values in the table of @ref MSTRO_DEFAULT_RLIMIT definitions */ @@ -373,6 +377,9 @@ mstro_core__adapt_rlimits(void) } +/** indicator for earliest entry into init phase, reset as last step in finalize: */ +static _Atomic(bool) g_core_init = false; + mstro_status mstro_core_init(const char *workflow_name, const char *component_name, @@ -380,6 +387,12 @@ mstro_core_init(const char *workflow_name, { mstro_status status = MSTRO_UNIMPL; + bool ival = atomic_exchange(&g_core_init, true); + if(ival) { + ERR("Init called twice without finalization in-between.\n"); + return MSTRO_NOT_TWICE; + } + mstro_core__adapt_rlimits(); status = mstro_memlock_init(MSTRO_MIN_MEMLOCK); @@ -643,6 +656,7 @@ mstro_core_finalize(void) status = mstro_memlock_finalize(); BAILOUT: + g_core_init = false; return status; } diff --git a/maestro/init.c b/maestro/init.c index c286b605d4d3123dda09ebc1ea64b4a35a70c58d..ef883b42fcaff1b083642d16d933be248d383e29 100644 --- a/maestro/init.c +++ b/maestro/init.c @@ -54,6 +54,20 @@ static mstro_nanosec_t g_startup_time; +static +void free_thread_descriptor(void* x) +{ + /* this is not a good place to call DEBUG and friends anymore */ + free(x); +} + +static void +mstro__atexit(void) +{ + /* nothing currently */ + ; +} + mstro_status mstro_init(const char *workflow_name, const char *component_name, @@ -62,7 +76,7 @@ mstro_init(const char *workflow_name, mstro_status status = MSTRO_UNIMPL; /* FIXME: should move to a threading-init function eventually */ - int s = pthread_key_create(&g_thread_descriptor_key, free); + int s = pthread_key_create(&g_thread_descriptor_key, free_thread_descriptor); if(s!=0) { /* logging quality may be a bit off this early in a failure case, but we * can't help it */ @@ -71,6 +85,12 @@ mstro_init(const char *workflow_name, abort(); } + if(atexit(mstro__atexit)!=0) { + ERR("Failed to register maestro at-exit handler: %d (%s)\n", + errno, strerror(errno)); + abort(); + } + INFO("Maestro Middleware startup: %s\n", mstro_version()); @@ -132,6 +152,13 @@ mstro_finalize(void) goto BAILOUT; } + /* dedicated threads will clean up at exit, but we need to do it manually */ + char *tid=pthread_getspecific(g_thread_descriptor_key); + if(tid) { + pthread_setspecific(g_thread_descriptor_key, NULL); + free(tid); + } + int s = pthread_key_delete(g_thread_descriptor_key); if(s!=0) { ERR("Failed to perform thread infrastructure teardown: %d (%s)\n", diff --git a/maestro/logging.c b/maestro/logging.c index a1fff5c616e4dae5bb93ac68ec1707f575c4c173..d9fd9fa9dcd65e5b265316245ead412698ff0bf6 100644 --- a/maestro/logging.c +++ b/maestro/logging.c @@ -219,9 +219,10 @@ static const char* mstro__ensure_threadid(void) { const char *tid=pthread_getspecific(g_thread_descriptor_key); + /* on a given thread there is only one instance of this, so there's + * no need for a mutex here */ if(!tid) { /* first call -- compute and store */ - /* valgrind users: this will leak. Sorry :) (we could use C11 _Tread_local instead, but need to check all compilers we care about do it well) */ char *tmp = malloc(16+2); if(tmp==NULL) { /* cannot use our logging infrastructure yet, so use stderr */ @@ -270,7 +271,8 @@ mstro__ensure_threadid(void) return tid; } -/* Calling getenv more than 6236 times on MSTRO_LOG_LEVEL gives a segfault on OSx */ +/* Calling getenv more than 6236 times on MSTRO_LOG_LEVEL gives a + * segfault on OSx */ static _Atomic(bool) g_queried_env = false; /* log destination choice */ static int g_log_dst = MSTRO_LOG_DST_STDERR; @@ -369,7 +371,7 @@ void mstro_log__init() { - pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; + static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; int err = pthread_mutex_lock(&mtx); if(err!=0) diff --git a/maestro/ofi.c b/maestro/ofi.c index 9cdc0b1b428013b91f480c97636f3024c71d1421..03c9e1245aaef669c6bb0bbb9694c7b29c5c712e 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -119,6 +119,9 @@ static mstro_drc_info g_drc_info = NULL; * worker may include other endpoint types */ static struct mstro_endpoint_set *g_endpoints = NULL; +/** the results of fi_getinfo after successful mstro_ofi_init */ +static struct fi_info *g_fi = NULL; + @@ -131,6 +134,8 @@ mstro_ep_desc_free(mstro_endpoint_descriptor desc) LL_FOREACH_SAFE(desc,el,tmp) { if(el->name) free(el->name); + if(el->oob_cookie) + free(el->oob_cookie); free(el); } } @@ -450,7 +455,7 @@ mstro_ep_desc_serialize(char **result_p, * https://troydhanson.github.io/tpl/userguide.html#_linked_lists * for details */ - tpl_node *tn; + tpl_node *tn=NULL; const struct mstro_endpoint_descriptor_ *elt; struct serialized_endpoint_element serialized_element; @@ -619,13 +624,16 @@ mstro_ep_desc_serialize(char **result_p, BAILOUT: if(buf) free(buf); - + if(tn) + tpl_free(tn); return stat; } mstro_status mstro_ofi_pm_info(char **result_p) { + void *strbuf=NULL; + if(result_p==NULL) return MSTRO_INVOUT; mstro_status status = MSTRO_OK; @@ -664,7 +672,8 @@ mstro_ofi_pm_info(char **result_p) stat = mstro_ep_desc_serialize__in6(strval, &d->in6); } else if(d->type == MSTRO_EP_OFI_IB) { ERR("Infiniband endpoints unsupported\n"); - return MSTRO_FAIL; + stat=MSTRO_FAIL; + goto BAILOUT; } else if(d->type == MSTRO_EP_OFI_PSMX || d->type == MSTRO_EP_OFI_PSMX2 || d->type == MSTRO_EP_OFI_GNI @@ -683,7 +692,8 @@ mstro_ofi_pm_info(char **result_p) size_t buflen; if(-1==tpl_dump(tns,TPL_MEM, &buf, &buflen)) { ERR("Not enough space to pack fabric endpoint\n"); - return MSTRO_FAIL; + stat = MSTRO_FAIL; + goto BAILOUT; } /* b64 encode */ @@ -692,7 +702,7 @@ mstro_ofi_pm_info(char **result_p) if(needed>MSTRO_EP_STRING_MAX) { ERR("Cannot b64 encode fabric endpoint data, need %d\n", needed); stat=MSTRO_FAIL; - return stat; + goto BAILOUT; } if(encoded!=NULL) { strcpy(strval, (char*)encoded); @@ -717,12 +727,14 @@ mstro_ofi_pm_info(char **result_p) break; default: ERR("Confusion\n"); - return MSTRO_FAIL; + stat=MSTRO_FAIL; + goto BAILOUT; } size_t len = strlen(tmp)+1; if(len>MSTRO_EP_STRING_MAX) { ERR("config endpoint string too long (%d) for serialization\n", len); - return MSTRO_FAIL; + stat=MSTRO_FAIL; + goto BAILOUT; } /* b64 encode */ size_t needed; @@ -747,13 +759,16 @@ mstro_ofi_pm_info(char **result_p) } else { ERR("Unsupported MSTRO_EP type: %d\n", d->type, mstro_ep_descriptor_names[d->type]); - return MSTRO_FAIL; + stat = MSTRO_FAIL; + goto BAILOUT; } serialized_element.type = d->type; serialized_element.strval=strdup(strval); /* shrink wrap size */ - if(serialized_element.strval==NULL) - return MSTRO_NOMEM; + if(serialized_element.strval==NULL) { + stat=MSTRO_NOMEM; + goto BAILOUT; + } mstro_drc_get_oob_string(&serialized_element.oob_cookie, g_drc_info); DEBUG("Added OOB info %s\n", serialized_element.oob_cookie); @@ -770,19 +785,20 @@ mstro_ofi_pm_info(char **result_p) /* serialized_element.strval); */ tpl_pack(tn,1); free(serialized_element.strval); + free(serialized_element.oob_cookie); } /* now write all to a string */ size_t len; - void *buf; - tpl_dump(tn,TPL_MEM, &buf, &len); + tpl_dump(tn,TPL_MEM, &strbuf, &len); if(len>MSTRO_EP_STRING_MAX) { ERR("EP serialization took %zu bytes, but only %zu are supported\n", len, MSTRO_EP_STRING_MAX); - return MSTRO_FAIL; + stat = MSTRO_FAIL; + goto BAILOUT; } /* b64 encode */ size_t needed; - encoded = base64_encode(buf, len, &needed); + encoded = base64_encode(strbuf, len, &needed); if(needed>MSTRO_EP_STRING_MAX) { ERR("Cannot b64 encode endpoint descriptor list, need %d\n", needed); stat=MSTRO_FAIL; @@ -802,7 +818,10 @@ mstro_ofi_pm_info(char **result_p) } #endif BAILOUT: - free(buf); + if(strbuf) + free(strbuf); + if(tn) + tpl_free(tn); return status; } @@ -1814,6 +1833,7 @@ mstro_ofi_init(void) } } } + /* fix pointer into invalid last entry */ g_endpoints->eps[g_endpoints->size-1].next = NULL; @@ -1829,6 +1849,11 @@ BAILOUT_FAIL: ; /* FIXME: free resources */ BAILOUT: + + if(fi) { + g_fi = fi; + } + return retstat; } @@ -1843,6 +1868,9 @@ mstro_ofi_finalize(bool destroy_drc_info) int s; struct mstro_endpoint *e = &g_endpoints->eps[i]; + if(e->addr_serialized) + free(e->addr_serialized); + if(e->peer_info_mr) { DEBUG("closing RDMA peer_info MR for ep %zu\n", i); s = fi_close((struct fid*)e->peer_info_mr); @@ -1860,8 +1888,11 @@ mstro_ofi_finalize(bool destroy_drc_info) } } /* every EP has one lock on the component descriptor */ - mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); - + mstro_memunlock(&g_component_descriptor, + sizeof(g_component_descriptor)); + if(e->component_info_raw_key) + free(e->component_info_raw_key); + #define CLOSE_FID(member, descr) do { \ if(e->member) { \ DEBUG("Closing down %s for ep %zu\n", descr, i); \ @@ -1879,10 +1910,14 @@ mstro_ofi_finalize(bool destroy_drc_info) // CLOSE_FID(fi,"FABRIC"); #undef CLOSE_FID + mstro_ep_desc_free(e->descr); } + free(g_endpoints); } DEBUG("OFI endpoints closed\n"); - + if(g_fi) + fi_freeinfo(g_fi); + /* It's safe to call this even if we've not yet locked the page; we * ignore the error that we'd get in this case */ mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); diff --git a/maestro/pool_manager_ofi.c b/maestro/pool_manager_ofi.c index f56804d8cc39ffd80e6e1f9cb3ce302db5101537..a154daafdb8729da323c38028261834e54f6fdc3 100644 --- a/maestro/pool_manager_ofi.c +++ b/maestro/pool_manager_ofi.c @@ -224,6 +224,9 @@ mstro_pm_terminate(void) if(s!=0) status = MSTRO_FAIL; } + + status |= mstro_event_domain_destroy(g_mstro_pm_continuations); + return status; } diff --git a/tests/.gitignore b/tests/.gitignore index 817c704ffab2d8c4b0d9b2a90f928b1828d6331d..af165d10e70513132315c15d7e3763d3c461635b 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -21,4 +21,6 @@ simple_archiver simple_injector simple_group_injector simple_telemetry_listener -coverage \ No newline at end of file +coverage +massif.* +xtleak.* diff --git a/tests/check_layout.c b/tests/check_layout.c index 3570ff2ac0c3ac7957d2ef37b17d780902f822fa..04f8f5c4e3fa1ee9e65d82a14e54a721c913fbd2 100644 --- a/tests/check_layout.c +++ b/tests/check_layout.c @@ -228,6 +228,7 @@ CHEAT_TEST(layout_attribute_works, cheat_assert(MSTRO_OK == mstro_finalize()); + free(dimsz_src); free(dimsz_dst); free(unpooled_data_t); free(unpooled_data); free(src_data); diff --git a/tests/check_schema_parse.c b/tests/check_schema_parse.c index cd12f1b327dfa40c2f374f5f3b038c0b5c3b51fc..2283d7b4bbd2b329aaf3e1241cb53752ec7cf54f 100644 --- a/tests/check_schema_parse.c +++ b/tests/check_schema_parse.c @@ -111,6 +111,8 @@ CHEAT_TEST(core_schema_parse, cheat_assert(MSTRO_OK==mstro_schema_merge(s1, s2)); cheat_yield(); + cheat_assert(MSTRO_OK==mstro_schema_merge(s1, s3)); + cheat_yield(); /* now use the merged schema to parse some sample defs */ cheat_assert(MSTRO_OK==mstro_attributes_parse(s1, @@ -118,6 +120,8 @@ CHEAT_TEST(core_schema_parse, &dict, NULL)); cheat_yield(); + cheat_assert(MSTRO_OK==mstro_attribute_dict_dispose(dict)); + /* s1 is the merged one, and freeing it will recurse */ cheat_assert(MSTRO_OK==mstro_schema_free(s1)); @@ -206,7 +210,8 @@ CHEAT_TEST(core_schema_parse_builtin, cheat_assert(tsval_cast->sec==ts.sec && tsval_cast->nsec==ts.nsec && tsval_cast->offset==ts.offset); - + cheat_assert(MSTRO_OK==mstro_attribute_dict_dispose(dict)); + /* s1 is the merged one, and freeing it will recurse */ cheat_assert(MSTRO_OK==mstro_schema_free(s1)); diff --git a/tests/check_transport_gfs.c b/tests/check_transport_gfs.c index 1200546b95b1409201724d2771c40389b40ee2b2..7198581f0d5a9ff76a2ae484d2d585a92b0c8b7a 100644 --- a/tests/check_transport_gfs.c +++ b/tests/check_transport_gfs.c @@ -188,6 +188,8 @@ CHEAT_TEST(transport_gfs_works, cheat_assert(MSTRO_OK == mstro_cdo_dispose(cdo_src)); cheat_assert(MSTRO_OK == mstro_finalize()); + free(src_data); + free(dst_data); ) diff --git a/tests/check_uuid.c b/tests/check_uuid.c index 8b1f7df483e295825c075e29df0d1a6a855324f7..5f6a77ca8bebe54c497f038222a742f8f8e55fe0 100644 --- a/tests/check_uuid.c +++ b/tests/check_uuid.c @@ -120,5 +120,8 @@ CHEAT_TEST(uuid_import_export, printf("%s", uuid_error(msg)); cheat_assert(UUID_RC_OK==uuid_destroy(uuid1)); + cheat_assert(UUID_RC_OK==uuid_destroy(uuid2)); + free(data); + free(data2); ) diff --git a/tests/simple_telemetry_listener.c b/tests/simple_telemetry_listener.c index b9e2ad8e6189012afc106b8c6c015f798173072c..93273138e32b7befa4df09269859602e196ee880 100644 --- a/tests/simple_telemetry_listener.c +++ b/tests/simple_telemetry_listener.c @@ -203,7 +203,7 @@ parse_arguments(int argc, char **argv) } if(g_verbose) { - fprintf(stderr, "Configuration: %s/%s/%s/%" PRIu64 "/%d\n", + fprintf(stderr, "Configuration: %s/%s/%s/%" PRInanosec "/%d\n", g_conf_workflow_name, g_conf_component_name, g_conf_terminate_after, g_conf_max_wait, g_conf_logdst); }