diff --git a/include/maestro/i_globals.h b/include/maestro/i_globals.h index 62dfea09f968a521ffee23ea8c182b05a1cc48bc..cec6fbbc807b377c24b409075512bba3b9d401f3 100644 --- a/include/maestro/i_globals.h +++ b/include/maestro/i_globals.h @@ -95,7 +95,7 @@ extern mmbMemSpace *g_default_cdo_memspace; /** the default (DRAM) memory interface */ extern mmbMemInterface *g_default_cdo_interface; - + /** MIO can be compiled in, but run time config may be missing, or MIO may fail * to initialize. This flag is TRUE if MIO is properly initialized and usable * */ @@ -105,7 +105,7 @@ extern bool g_mio_available; /** the app token used in communicating to the pool manager. Set after successfully JOINing, cleared after LEAVEing */ extern Mstro__Pool__Apptoken g_pool_apptoken; -/** the app ID (packed version of @ref g_pool_app_id) used in communicating with the pool manager. */ +/** the app ID (packed version of @ref g_pool_app_id) used in communicating with the pool manager. */ extern Mstro__Pool__Appid g_pool_appid; /** the fundamental built-in attribute schema. 
Filled early in mstro_core_init(), then constant */ @@ -145,6 +145,8 @@ union mstro_component_descriptor { char workflow_name[MSTRO_WORKFLOW_NAME_MAX]; /** the component name */ char component_name[MSTRO_WORKFLOW_NAME_MAX]; + /** the list of user schemas */ + char schema_list[MSTRO_WORKFLOW_NAME_MAX]; /** the maestro core version */ char version[128]; }; /*<** descriptor data */ diff --git a/maestro/core.c b/maestro/core.c index accbf882ce442cf87ea08552cf5c417b5994b4c0..9a3d591743925f020344d68d2ea7e7b66b404e4c 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -182,6 +182,13 @@ mstro_status mstro_core_init__setup_schemata(void) { env_schema_list = ""; } + else + { + INFO("List of user attributes schemata %s \n", env_schema_list); + strncpy(g_component_descriptor.schema_list, env_schema_list, MSTRO_WORKFLOW_NAME_MAX-1); + g_component_descriptor.schema_list[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; + } + if(env_schema_path == NULL) { env_schema_path = ""; @@ -343,7 +350,7 @@ mstro_core_init(const char *workflow_name, } size_t l2 = strlen(data->workflow_name); size_t l3 = strlen(data->component_name); - + g_mstro_transport_gfs_dir = malloc(l1 + (need_sep? 
1 : 0) + l2 + 1 + l3 + 1 +1); @@ -360,10 +367,10 @@ mstro_core_init(const char *workflow_name, } strcpy(g_mstro_transport_gfs_dir+pos, data->workflow_name); pos += l2; - + g_mstro_transport_gfs_dir[pos] = '/'; pos++; - + strcpy(g_mstro_transport_gfs_dir+pos, data->component_name); pos += l3; g_mstro_transport_gfs_dir[pos] = '/'; @@ -376,7 +383,7 @@ mstro_core_init(const char *workflow_name, g_mstro_transport_gfs_dir); } } - + /*Reading and integrating attribute schemas*/ status = mstro_core_init__setup_schemata(); if(status!=MSTRO_OK) { diff --git a/maestro/ofi.c b/maestro/ofi.c index 72a1572df8ded128f7f66375d169c9eb9b56f951..49fb5f1a8f100850be03b74e03b0b61f2ab4e496 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -5,21 +5,21 @@ /* * Copyright (C) 2019 Cray Computer GmbH - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * + * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * + * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. 
- * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -147,9 +147,9 @@ mstro_ep_desc_create_ofi(mstro_endpoint_descriptor *result_p, return MSTRO_INVARG; enum mstro_endpoint_type ept = MSTRO_EP_INVALID; - + switch(fi->addr_format) { - /** This is the list documented for libfabric 1.5 */ + /** This is the list documented for libfabric 1.5 */ case FI_SOCKADDR_IN: ept = MSTRO_EP_OFI_IN4; break; case FI_SOCKADDR_IN6: @@ -268,13 +268,13 @@ mstro_ep_desc_serialize__in4(char *dst, const struct sockaddr_in *sin) { mstro_status stat = MSTRO_UNIMPL; - + /* struct sockaddr_in is defined platform-independent and * already using network byte order as needed, so we can handle * it as a struct */ tpl_node *tns; tpl_bin tb; - + tb.sz = sizeof(struct sockaddr_in); tb.addr = sin; tns = tpl_map("B", &tb); @@ -318,7 +318,7 @@ mstro_ep_desc_deserialize__in4(struct sockaddr_in *sin, char *b64_strval) { mstro_status stat=MSTRO_UNIMPL; - + tpl_node *tn; tpl_bin tb; tn = tpl_map( "B", &tb ); @@ -333,7 +333,7 @@ mstro_ep_desc_deserialize__in4(struct sockaddr_in *sin, goto BAILOUT; } /* DEBUG("b64decode: buf of length %zu\n", buflen); */ - + tpl_load( tn, TPL_MEM, buf, buflen); tpl_unpack( tn, 0 ); tpl_free(tn); @@ -361,7 +361,7 @@ mstro_ep_desc_serialize__in6(char *dst, tpl_bin tb; INFO("sockaddr_in6 with port %d\n", ntohs(sin->sin6_port)); - + tb.sz = sizeof(struct sockaddr_in6); tb.addr = sin; tns = tpl_map("B", &tb); @@ -388,7 +388,7 @@ mstro_ep_desc_serialize__in6(char *dst, stat=MSTRO_FAIL; goto BAILOUT; } - + stat=MSTRO_OK; BAILOUT: /* clean up */ @@ -422,7 +422,7 @@ mstro_ep_desc_deserialize__in6(struct sockaddr_in6 *sin, tpl_load( tn, TPL_MEM, buf, buflen); tpl_unpack( tn, 0 ); tpl_free(tn); - + assert(tb.sz==sizeof(struct sockaddr_in6)); memcpy(sin, tb.addr, tb.sz); @@ -455,13 +455,13 @@ 
mstro_ep_desc_serialize(char **result_p, struct serialized_endpoint_element serialized_element; assert(MSTRO_EP__MAX<=INT_MAX); - + tn = tpl_map("A(S(IssUUc#))", &serialized_element, MSTRO_OFI_KEY_LEN_MAX); char *strval = alloca(MSTRO_EP_STRING_MAX*sizeof(char)); if(strval==NULL) return MSTRO_NOMEM; - + LL_FOREACH(epd,elt) { /* each one should serialize into STRVAL buffer, base64-encoded, * NULL-terminated */ @@ -495,7 +495,7 @@ mstro_ep_desc_serialize(char **result_p, return MSTRO_FAIL; } /* b64 encode */ - + size_t needed; encoded = base64_encode(buf, buflen, &needed); if(needed>MSTRO_EP_STRING_MAX) { @@ -513,7 +513,7 @@ mstro_ep_desc_serialize(char **result_p, stat=MSTRO_OK; BAILOUT_1: /* clean up */ - NFREE(encoded); + NFREE(encoded); tpl_free(tns); free(buf); } else if(elt->type == MSTRO_EP_OFI_STR) { @@ -549,7 +549,7 @@ mstro_ep_desc_serialize(char **result_p, stat=MSTRO_NOMEM; goto BAILOUT_2; } - + stat=MSTRO_OK; BAILOUT_2: NFREE(encoded); @@ -558,7 +558,7 @@ mstro_ep_desc_serialize(char **result_p, elt->type, mstro_ep_descriptor_names[elt->type]); return MSTRO_FAIL; } - + serialized_element.type = elt->type; serialized_element.strval=strdup(strval); /* shrink wrap size */ if(serialized_element.strval==NULL) @@ -566,7 +566,7 @@ mstro_ep_desc_serialize(char **result_p, mstro_drc_get_oob_string(&serialized_element.oob_cookie, g_drc_info); DEBUG("Added OOB info %s\n", serialized_element.oob_cookie); - + /* keys */ serialized_element.info_addr = ep->component_info_addr; serialized_element.info_keysize = ep->component_info_keysize; @@ -582,7 +582,7 @@ mstro_ep_desc_serialize(char **result_p, free(serialized_element.strval); free(serialized_element.oob_cookie); } - + /* now write all to a string */ size_t len; void *buf=NULL; @@ -619,7 +619,7 @@ mstro_ep_desc_serialize(char **result_p, BAILOUT: if(buf) free(buf); - + return stat; } @@ -629,7 +629,7 @@ mstro_ofi_pm_info(char **result_p) if(result_p==NULL) return MSTRO_INVOUT; mstro_status status = MSTRO_OK; - + 
unsigned char *encoded=NULL; if(g_endpoints==NULL) { ERR("No endpoints configured -- did you call mstro_pm_start()?\n"); @@ -645,14 +645,14 @@ mstro_ofi_pm_info(char **result_p) struct serialized_endpoint_element serialized_element; assert(MSTRO_EP__MAX<=INT_MAX); - + tn = tpl_map("A(S(IssUUc#))", &serialized_element, MSTRO_OFI_KEY_LEN_MAX); char *strval = alloca(MSTRO_EP_STRING_MAX*sizeof(char)); if(strval==NULL) return MSTRO_NOMEM; mstro_status stat; - + LL_FOREACH(&(g_endpoints->eps[0]),elt) { mstro_endpoint_descriptor d = elt->descr; /* each one should serialize into STRVAL buffer, base64-encoded, @@ -686,7 +686,7 @@ mstro_ofi_pm_info(char **result_p) return MSTRO_FAIL; } /* b64 encode */ - + size_t needed; encoded = base64_encode(buf, buflen, &needed); if(needed>MSTRO_EP_STRING_MAX) { @@ -704,7 +704,7 @@ mstro_ofi_pm_info(char **result_p) stat=MSTRO_OK; BAILOUT_1: /* clean up */ - NFREE(encoded); + NFREE(encoded); tpl_free(tns); free(buf); } else if(d->type == MSTRO_EP_OFI_STR) { @@ -740,7 +740,7 @@ mstro_ofi_pm_info(char **result_p) stat=MSTRO_NOMEM; goto BAILOUT_2; } - + stat=MSTRO_OK; BAILOUT_2: NFREE(encoded); @@ -749,7 +749,7 @@ mstro_ofi_pm_info(char **result_p) d->type, mstro_ep_descriptor_names[d->type]); return MSTRO_FAIL; } - + serialized_element.type = d->type; serialized_element.strval=strdup(strval); /* shrink wrap size */ if(serialized_element.strval==NULL) @@ -804,7 +804,7 @@ mstro_ofi_pm_info(char **result_p) BAILOUT: free(buf); return status; - + } @@ -821,7 +821,7 @@ mstro_ep_desc_deserialize__str(char **dst, mstro_status stat=MSTRO_UNIMPL; - + size_t buflen; unsigned char *buf = base64_decode((unsigned char*)b64_strval, strlen(b64_strval), @@ -858,7 +858,7 @@ mstro_ep_desc_deserialize__uint64(uint64_t (*dst)[6], } mstro_status stat=MSTRO_UNIMPL; - + size_t buflen; unsigned char *buf = base64_decode((unsigned char*)b64_strval, strlen(b64_strval), @@ -880,7 +880,7 @@ mstro_ep_desc_deserialize__uint64(uint64_t (*dst)[6], BAILOUT: NFREE(buf); - 
return stat; + return stat; } @@ -893,7 +893,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, tpl_node *tn=NULL; mstro_endpoint_descriptor *next = NULL; - if(result_p==NULL) + if(result_p==NULL) return MSTRO_INVOUT; if(serialized_eps==NULL) return MSTRO_INVARG; @@ -903,7 +903,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, unsigned char *buf = base64_decode((unsigned char*)serialized_eps, strlen(serialized_eps), &buflen); - + if(buf==NULL) { stat=MSTRO_NOMEM; goto BAILOUT; @@ -937,9 +937,9 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, target->oob_cookie=NULL; DEBUG("no cookie\n"); } - + target->name=NULL; - + if(eptype==MSTRO_EP_OFI_IN4) { stat = mstro_ep_desc_deserialize__in4(&(target->in4), serialized_element.strval); @@ -956,11 +956,11 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, || eptype==MSTRO_EP_OFI_MLX) { /** transparently handles one .. six entries */ stat = mstro_ep_desc_deserialize__uint64(&target->gni, - serialized_element.strval); + serialized_element.strval); } else if(eptype==MSTRO_EP_OFI_STR) { target->str=NULL; stat = mstro_ep_desc_deserialize__str(&(target->str), - serialized_element.strval); + serialized_element.strval); } else { ERR("Unsupported EP type: %d\n", eptype); stat = MSTRO_UNIMPL; @@ -994,7 +994,7 @@ mstro_ep_desc_deserialize(mstro_endpoint_descriptor *result_p, } BAILOUT: free(buf); - + return stat; } @@ -1090,7 +1090,7 @@ mstro_ep_desc_describe(mstro_endpoint_descriptor desc) s, errno, strerror(errno)); abort(); } - + desc->name = buf; } } @@ -1107,7 +1107,7 @@ mstro_ep_desc_to_ofi_addr(uint32_t *addr_format, return MSTRO_INVOUT; if(epd==NULL) return MSTRO_INVARG; - + switch(epd->type) { case MSTRO_EP_OFI_IN4: *addr_format = FI_SOCKADDR_IN; @@ -1167,7 +1167,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, return MSTRO_INVOUT; if(fi==NULL) return MSTRO_INVARG; - + struct fid_fabric *fabric = NULL; struct fi_eq_attr eq_attr; struct fid_eq *eq = 
NULL; @@ -1178,7 +1178,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, struct fi_cq_attr cq_attr; struct fid_cq *cq = NULL; uint8_t *mr_key = NULL; - + /* create fabric object */ stat = fi_fabric(fi->fabric_attr, &fabric, NULL); if(stat!=0) { @@ -1195,15 +1195,15 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, * allocated early on in ofi_init (on pool manager) or obtained * from PM_INFO (on clients) */ assert(g_drc_info!=NULL); - + stat = mstro_drc_insert_ofi(fi, g_drc_info); if(stat!=MSTRO_OK) { ERR("Failed to insert DRC credential into fabric info\n"); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; - } + } } - + /* create domain */ stat = fi_domain(fabric, fi, &domain, NULL); if(stat!=0) { @@ -1232,7 +1232,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("fi_cq_open failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* event queue */ /* create event queue */ memset(&eq_attr, 0, sizeof(eq_attr)); @@ -1253,7 +1253,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, } /* INFO("This is a %s\n", fi_tostr(fi,FI_TYPE_INFO)); */ - + /* connectionless endpoint needs address vector */ memset(&av_attr, 0, sizeof(av_attr)); av_attr.type = FI_AV_MAP; @@ -1262,14 +1262,14 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("fi_av_open failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* bind address vector to endpoint */ stat = fi_ep_bind(ep, &av->fid, 0); if(stat!=0) { ERR("fi_ep_bind for av failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* bind event queue to endpoint */ stat = fi_ep_bind(ep, &eq->fid, 0); if(stat!=0) { @@ -1280,7 +1280,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, eq=NULL; } else { retstat=MSTRO_FAIL; goto BAILOUT_FAIL; - } + } } /* bind cq to endpoint */ stat = fi_ep_bind(ep, &cq->fid, FI_TRANSMIT|FI_RECV); @@ -1288,7 +1288,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, 
ERR("fi_ep_bind for cq failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - + /* enable! */ stat = fi_enable(ep); if(stat!=0) { @@ -1380,7 +1380,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("Failed to retrieve key for component info memory registration: %d (%s)\n"); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; } - assert(sizeof(key)<=MSTRO_OFI_KEY_LEN_MAX); + assert(sizeof(key)<=MSTRO_OFI_KEY_LEN_MAX); /* keysize is 4 on verbs ... we try to let the compiler handle byte order issues here by casting */ switch(keysize) { case 1: { @@ -1402,7 +1402,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, memcpy(mr_key, &key, sizeof(key)); break; } - default: + default: ERR("Unsupported MR key size %d\n", keysize); assert(0); } @@ -1417,13 +1417,13 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, addr=(uint64_t)&g_component_descriptor; } } - - + + { uint64_t k = 0; memcpy(&k, mr_key, keysize); /* just for printing */ - + INFO("Component info RDMA block registered%s: local %p, MR %p, addr %" PRIx64 ", desc %p, keysize %zu (key uint64 start: 0x%" PRIx64 ")\n", fi->domain_attr->mr_mode & FI_MR_RAW ? 
" (raw keys)" : "", &g_component_descriptor, mr, addr, fi_mr_desc(mr), keysize, k); @@ -1440,7 +1440,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, dst->component_info_addr = addr; dst->component_info_keysize = keysize; dst->component_info_raw_key = mr_key; - + dst->next = NULL; @@ -1450,7 +1450,7 @@ mstro_ep_build_from_ofi(struct mstro_endpoint *dst, ERR("Failed to construct worker mgmt ep description: %d\n", retstat); goto BAILOUT_FAIL; } - + retstat = mstro_ep_desc_serialize(&dst->addr_serialized, dst); if(retstat!=MSTRO_OK) { @@ -1483,7 +1483,7 @@ BAILOUT_FAIL: /* wait sets (none) */ CLEANUP(fabric,"fabric"); #undef CLEANUP - + BAILOUT: return retstat; } @@ -1502,7 +1502,7 @@ mstro_endpoint_describe(struct mstro_endpoint *ep) char addrbuf[EP_DESC_BUF_MAX]; ep_name_buf[0]='\0'; - + size=0; ret = fi_getname(&ep->ep->fid,NULL,&size); if(ret!=-FI_ETOOSMALL) { @@ -1511,16 +1511,16 @@ mstro_endpoint_describe(struct mstro_endpoint *ep) } assert(size<EP_DESC_BUF_MAX); fi_getname(&ep->ep->fid,addrbuf,&size); - + size=0; fi_av_straddr(ep->av, addrbuf, NULL, &size); assert(size<EP_DESC_BUF_MAX); fi_av_straddr(ep->av, addrbuf, strbuf, &size); - + size=snprintf(ep_name_buf, EP_DESC_BUF_MAX, "OFI EP prov %s name %s straddr %s", - ep->fi->fabric_attr->prov_name, + ep->fi->fabric_attr->prov_name, ep->fi->fabric_attr->name, strbuf); assert(size<1024); DONE: @@ -1655,7 +1655,7 @@ mstro_ofi__order_fi_list(struct fi_info **fi) { if(fi==NULL) return MSTRO_INVARG; - + struct fi_info *head = *fi; LL_SORT(head, mstro_ofi__fi_info_cmp); *fi = head; @@ -1702,11 +1702,11 @@ mstro_ofi_init(void) hints->mode = MSTRO_OFI_MODE; hints->ep_attr->type = MSTRO_OFI_EP_TYPE; hints->domain_attr->mr_mode = MSTRO_OFI_MRMODE; - + /* we really want 1.8 or above */ stat = fi_getinfo(MSTRO_OFI_VERSION, NULL, NULL, 0, hints, &fi); fi_freeinfo(hints); - + if(stat!=0) { ERR("fi_getinfo failed: %d (%s)\n", stat, fi_strerror(-stat)); retstat=MSTRO_FAIL; goto BAILOUT_FAIL; @@ -1748,7 +1748,7 @@ 
mstro_ofi_init(void) goto BAILOUT_FAIL; } - /* also do it for the PM descriptor block we'll RDMA read into */ + /* also do it for the PM descriptor block we'll RDMA read into */ assert(sizeof(g_pm_component_descriptor)%sysconf(_SC_PAGESIZE)==0); retstat = mstro_memlock(&g_pm_component_descriptor, sizeof(g_pm_component_descriptor)); if(retstat!=MSTRO_OK) { @@ -1762,11 +1762,11 @@ mstro_ofi_init(void) LL_FOREACH(fi,tmp) { NOISE("index %zu\n", i); i++; - DEBUG("potential endpoint: %s %s\n", tmp->fabric_attr->prov_name, + DEBUG("potential endpoint: %s %s\n", tmp->fabric_attr->prov_name, tmp->fabric_attr->name); retstat = mstro_ep_build_from_ofi(&(g_endpoints->eps[g_endpoints->size]), tmp); - + if(retstat!=MSTRO_OK) { WARN("Failed to build EP %zu, trying others\n", i); } else { @@ -1799,18 +1799,18 @@ mstro_ofi_init(void) } /* fix pointer into invalid last entry */ g_endpoints->eps[g_endpoints->size-1].next = NULL; - + if(g_endpoints!=NULL) { INFO("%zu usable endpoints\n", g_endpoints->size); retstat = MSTRO_OK; } else { ERR("No usable endpoints found\n"); } - + goto BAILOUT; BAILOUT_FAIL: ; /* FIXME: free resources */ - + BAILOUT: return retstat; } @@ -1842,7 +1842,7 @@ mstro_ofi_finalize(void) } /* every EP has one lock on the component descriptor */ mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); - + #define CLOSE_FID(member, descr) do { \ if(e->member) { \ DEBUG("Closing down %s for ep %zu\n", descr, i); \ @@ -1851,7 +1851,7 @@ mstro_ofi_finalize(void) descr, i, s, fi_strerror(-s)); } \ } \ } while(0) - + CLOSE_FID(ep,"EP"); CLOSE_FID(cq,"CQ"); CLOSE_FID(av,"AV"); @@ -1859,16 +1859,16 @@ mstro_ofi_finalize(void) CLOSE_FID(domain,"DOMAIN"); // CLOSE_FID(fi,"FABRIC"); #undef CLOSE_FID - + } } DEBUG("OFI endpoints closed\n"); - + /* It's safe to call this even if we've not yet locked the page; we * ignore the error that we'd get in this case */ mstro_memunlock(&g_component_descriptor, sizeof(g_component_descriptor)); 
mstro_memunlock(&g_pm_component_descriptor, sizeof(g_pm_component_descriptor)); - + return MSTRO_OK; } @@ -1890,7 +1890,7 @@ struct mstro_ofi_msg_context_ { }; bool has_completion; /**< indicates if pthread waiting * infrastructure is initialized in this - * context and should be used*/ + * context and should be used*/ pthread_cond_t completion; /**< Condition variable for originator of * message to */ pthread_mutex_t lock; /**< a lock, mostly because cond_wait @@ -1901,7 +1901,7 @@ typedef struct mstro_ofi_msg_context_ * mstro_ofi_msg_context; /** Create a message completion context for @arg msg. - + If @arg want_completion is true a mutex and a condition variable will be initialized and the completion handler can signal completion on that condition variable to the originator. @@ -1923,7 +1923,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, /* RDMA ops don't have non-NULL msg */ /* if(msg==NULL) */ /* return MSTRO_INVARG; */ - + mstro_ofi_msg_context ctx = malloc(sizeof(struct mstro_ofi_msg_context_)); if(ctx==NULL) { @@ -1939,7 +1939,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, free(ctx); ctx=NULL; goto BAILOUT; - } + } res = pthread_cond_init(&ctx->completion, NULL); if(res!=0) { ERR("Failed to init ofi msg cond var: %d\n", res); @@ -1957,7 +1957,7 @@ mstro_ofi__msg_context_create(mstro_ofi_msg_context *result_p, stat = MSTRO_OK; BAILOUT: *result_p = ctx; - return stat; + return stat; } /** destroy an OFI message context */ @@ -1975,7 +1975,46 @@ mstro_ofi__msg_context_destroy(mstro_ofi_msg_context ctx) return MSTRO_OK; } - 
+/** Check whether the pool manager supports every schema the component uses.
+ * Both lists hold ';'-separated schema names. Neither list is modified, since
+ * callers pass the live component descriptors. Entries are compared as whole
+ * names, so "foo" is not satisfied by a pm entry "foobar".
+ *
+ * @param component_schema_list schemas required by the component, or NULL
+ * @param pm_schema_list schemas offered by the pool manager, or NULL
+ * @return true if component_schema_list is NULL, else false if pm_schema_list is
+ *         NULL, else whether every non-empty component entry is a pm entry
+ */
+/* FIXME check that the schema versions are compatible between the pm and the component */
+bool schema_list_compatible_p(const char * component_schema_list, const char * pm_schema_list)
+{
+  const char *want, *have;
+  size_t want_len, have_len;
+  bool found;
+
+  if (component_schema_list == NULL)
+    return true;   /* nothing to look for */
+  if (pm_schema_list == NULL)
+    return false;  /* schemas requested, but the pm offers none */
+  for (want = component_schema_list; *want != '\0'; want += want_len) {
+    want += strspn(want, ";");      /* skip separators and empty entries */
+    want_len = strcspn(want, ";");
+    if (want_len == 0)
+      break;                        /* only separators were left */
+    DEBUG("Looking for schema %.*s \n", (int)want_len, want);
+    for (found = false, have = pm_schema_list; !found && *have != '\0'; have += have_len) {
+      have += strspn(have, ";");
+      have_len = strcspn(have, ";");
+      found = (have_len == want_len && strncmp(have, want, want_len) == 0);
+    }
+    if (!found) {
+      DEBUG("Schema %.*s not found in pm schema list\n", (int)want_len, want);
+      return false;
+    }
+    DEBUG("Found schema %.*s in %s\n", (int)want_len, want, pm_schema_list);
+  }
+  return true;
+}
 /** select the best match between remote and local endpoints */ mstro_status @@ -2053,7 +2092,7 @@ mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, stat = MSTRO_FAIL; goto BAILOUT; } - + uint64_t mr_addr=0; /* since the EP accepted the address we interpolate that * MR_RAW will be set identically (while we really should be @@ -2075,7 +2114,7 @@ mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, } DEBUG("Checking for PM config block MR at (remote addr) 0x%" PRIx64 ", key %" PRIx64 "\n", mr_addr, mr_key); - + assert(ctx->msg==NULL); assert(my_ep->peer_info_mr!=NULL); /* incoming buffer has been registered at local endpoint set creation */ void * local_buf_mr_desc = fi_mr_desc(my_ep->peer_info_mr); @@ 
-2123,8 +2162,15 @@ mstro_ofi__select_endpoint(struct mstro_endpoint_descriptor_ *remote, INFO("Component descriptor mismatch, illegal type or other workflow, skipping\n"); continue; } + DEBUG("PM schema list %s \n", g_pm_component_descriptor.schema_list); + DEBUG("Component schema list %s \n", g_component_descriptor.schema_list); + if(!schema_list_compatible_p(g_component_descriptor.schema_list, g_pm_component_descriptor.schema_list)) + { + INFO("Component descriptor mismatch, PM does not support the schema list of component, skipping\n"); + continue; + } } /* end of possible config block verification */ - + *remote_addr_p = translated_addr; *local_p = my_ep; stat=MSTRO_OK; @@ -2161,7 +2207,7 @@ mstro_ofi__submit_message(struct mstro_endpoint *ep, assert(env!=NULL); DEBUG("Sending message %s, size %zu, %p using ep %p\n", env->descriptor->name, env->payload_size, env, ep); - + int res; do { assert(env!=context); /* catch common mistake: context needs to be a @@ -2174,7 +2220,7 @@ mstro_ofi__submit_message(struct mstro_endpoint *ep, /* DEBUG("tsend with tag %"PRIu64"\n", (uint64_t)msg->type); */ } else { - res = fi_send(ep->ep, env->data, env->payload_size, + res = fi_send(ep->ep, env->data, env->payload_size, descriptor, dst, context); } if(res!=0) { @@ -2205,9 +2251,9 @@ mstro_ofi__submit_message_wait(struct mstro_endpoint *ep, fi_addr_t dst, ERR("Failed to acquire msg context lock: %d\n", ret); return MSTRO_FAIL; } - + stat = mstro_ofi__submit_message(ep, dst, msg, NULL, ctx); - if(stat!=MSTRO_OK) + if(stat!=MSTRO_OK) return stat; ret = pthread_cond_wait(&ctx->completion, &ctx->lock); @@ -2240,8 +2286,8 @@ mstro_ofi__submit_message_nowait(struct mstro_endpoint *ep, fi_addr_t dst, } else { ; } - - + + /* for non-completion contexts the message handler frees the * context */ return mstro_ofi__submit_message(ep, dst, msg, NULL, ctx); @@ -2257,7 +2303,7 @@ mstro_ofi__expect_message_wait(struct fid_ep *ep, fi_addr_t src, = mstro_ofi__msg_context_create(&ctx, msg, 
true); if(stat!=MSTRO_OK) return stat; - + int ret = pthread_mutex_lock(&ctx->lock); if(ret!=0) { ERR("Failed to acquire msg context lock: %d\n", ret); @@ -2266,7 +2312,7 @@ mstro_ofi__expect_message_wait(struct fid_ep *ep, fi_addr_t src, /* FIXME: split out do loop like for submit so that we don't need * fi_recv in arbitrary places anymore */ - + int res; do { if(MSTRO_MSG_HAS_TAG(msg)) { @@ -2354,7 +2400,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, } INFO("EPD serial %s\n", join_msg->serialized_endpoint); INFO("EPD parsed as %s\n", mstro_ep_desc_describe(epd)); - + uint32_t addr_format; void *remote_addr; size_t addrlen; @@ -2384,7 +2430,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, DEBUG("App %s advertises %zu transport methods\n", join_msg->component_name, join_msg->transport_methods->n_supported); - + /* insert into registry table */ s = mstro_pm_app_register(ep, translated_addr, strdup(join_msg->serialized_endpoint), @@ -2398,7 +2444,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, join_msg->transport_methods = NULL; /* we kept a reference to it, caller will free the message */ DEBUG("Assigned app ID %"PRIu64"\n", (*entry_p)->appid); } - + BAILOUT: return s; BAILOUT_FREE: @@ -2546,24 +2592,24 @@ mstro_ofi__check_and_handle_cq(struct mstro_endpoint *ep, INFO("Completion of op with ctx %p message %p%s of size %zu on endpoint %p\n", ctx, ctx->msg, re_post ? 
" (slot i)" : "", entry.len, ep); - + if(re_post) { DEBUG("Posting fresh recv on slot %p", expected_slot_p); status = mstro_ofi__loop_post_recv(ep->ep, expected_slot_p, expected_ctx_p); } - - + + if(entry.flags&FI_RECV) { assert(ctx->msg!=NULL); ctx->msg->payload_size = entry.len; /* this permits us to not pass ofi 'entry' to pc/pm handler */ const struct mstro_msg_envelope *msg = ctx->msg; status = incoming_msg_handler(msg, ep); - + if(status!=MSTRO_OK) { ERR("Error handling incoming message, dropping it: %d (%s)\n", status, mstro_status_description(status)); } - + if(re_post) { ; /* fresh receive will be posted later, and slot messages have no completion watchers */ } else { @@ -2571,42 +2617,42 @@ mstro_ofi__check_and_handle_cq(struct mstro_endpoint *ep, DEBUG("non-slot message, cleaning up\n"); if(ctx->has_completion) { DEBUG("ctx asks for completion notification\n"); - status = mstro_ofi__maybe_notify_completion(ctx); + status = mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("ctx wants no completion notification, cleaning up ctx\n"); mstro_msg_envelope_free(ctx->msg); status = mstro_ofi__msg_context_destroy(ctx); } } - + } else if(entry.flags&FI_SEND) { assert(ctx->msg!=NULL); struct mstro_msg_envelope *msg = ctx->msg; - + /* completion of our sent messages */ DEBUG("Send of message %p type %s completed\n", msg, msg->descriptor->name); /* otherwise clean it up */ if(ctx->has_completion) { DEBUG("ctx asks for completion notification\n"); - status = mstro_ofi__maybe_notify_completion(ctx); + status = mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("ctx wants no completion notification, cleaning up ctx\n"); mstro_msg_envelope_free(msg); status = mstro_ofi__msg_context_destroy(ctx); } - + } else if(entry.flags&FI_RMA) { assert(ctx->msg==NULL); DEBUG("RDMA op completed, ctx %p\n", ctx); assert(ctx->has_completion); /* it does not make sense to * not have this set */ - status = mstro_ofi__maybe_notify_completion(ctx); + status = 
mstro_ofi__maybe_notify_completion(ctx); } else { DEBUG("Unexpected completion, flags %ull\n", entry.flags); } - - return status; + + return status; } static inline @@ -2651,15 +2697,15 @@ mstro_ofi__loop(_Atomic bool *terminate, } while(false==atomic_load(terminate)); DEBUG("PM comm thread quits: loop status %d, global status %d\n", s, status); - + BAILOUT: if(slots) { - for(i=0; i<g_endpoints->size; i++) + for(i=0; i<g_endpoints->size; i++) mstro_msg_envelope_free(slots[i]); free(slots); } if(ctxts) { - for(i=0; i<g_endpoints->size; i++) + for(i=0; i<g_endpoints->size; i++) mstro_ofi__msg_context_destroy(ctxts[i]); free(ctxts); } @@ -2672,7 +2718,7 @@ mstro_ofi_pm_loop(_Atomic bool *terminate) g_pool_app_id = MSTRO_APP_ID_MANAGER; return mstro_ofi__loop(terminate, mstro_pm_handle_msg); -} +} mstro_status mstro_ofi_pc_loop(_Atomic bool *terminate) @@ -2756,7 +2802,7 @@ mstro_pc_thread(void *closure) s, mstro_status_description(s)); } DEBUG("pool client thread stopping\n"); - + return NULL; } */ @@ -2800,9 +2846,9 @@ mstro_pc_terminate(void) leave.base.descriptor->name); goto BAILOUT; } - + status = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); - + /* The BYE message will be handled by the normal incoming message handler * We can tell it's done by checking g_mstro_pm_attached */ switch(status) { @@ -2864,7 +2910,7 @@ mstro_pm_attach(const char *remote_pm_info) mstro_endpoint_descriptor pm_epd=NULL; mstro_status s=mstro_ep_desc_deserialize(&pm_epd, remote_pm_info); - + if(s!=MSTRO_OK) { ERR("Failed to parse pool manager info: %d (%s%s)\n", s, mstro_status_description(s), @@ -2881,7 +2927,7 @@ mstro_pm_attach(const char *remote_pm_info) { mstro_endpoint_descriptor tmp; size_t num_cookies = 0; - + LL_FOREACH(pm_epd, tmp) { INFO("Pool manager endpoint: %s\n", mstro_ep_desc_describe(tmp)); if(tmp->type==MSTRO_EP_OFI_GNI) { @@ -2900,13 +2946,13 @@ mstro_pm_attach(const char *remote_pm_info) } } } - + s=mstro_ofi_init(); if(s!=MSTRO_OK) { ERR("Failed to initialize 
OFI layer\n"); goto BAILOUT; } - + /* start handler thread: run pool client loop. It will stop when mstro_pc_terminate is called */ /* we need to start it before endpoint selection since we do RDMA * read for the config block, and that needs to report completion @@ -2919,30 +2965,30 @@ mstro_pm_attach(const char *remote_pm_info) goto BAILOUT; } g_component_descriptor.type = MSTRO_COMPONENT_TYPE_APP; - - + + s=mstro_ofi__select_endpoint(pm_epd, &g_pm_endpoint, &g_pm_addr); if(s!=MSTRO_OK) { ERR("Failed to select a suitable endpoint to talk to pool manager\n"); goto BAILOUT; } assert(g_pm_endpoint!=NULL); - + /* register pool manager as a pmp app entry */ s = mstro_pm_app_register_manager(g_pm_endpoint, g_pm_addr); if(s!=MSTRO_OK) { ERR("Failed to register Pool Manager in registry\n"); goto BAILOUT; } - - + + /* send JOIN */ - - + + /* FIXME make function set_transport_default() beware protobuf stack init */ /* if we have OFI we'll add another entry */ - + Mstro__Pool__TransportKind transport_kinds[] = { MSTRO__POOL__TRANSPORT_KIND__GFS #ifdef HAVE_MIO @@ -2950,18 +2996,18 @@ mstro_pm_attach(const char *remote_pm_info) #endif }; size_t num_available_transports = sizeof(transport_kinds)/sizeof(transport_kinds[0]); - + Mstro__Pool__TransportMethods transport_methods = MSTRO__POOL__TRANSPORT_METHODS__INIT; transport_methods.n_supported = num_available_transports; transport_methods.supported = (Mstro__Pool__TransportKind*)&transport_kinds; - + const char* env_transport_default = getenv(MSTRO_ENV_TRANSPORT_DEFAULT); if (env_transport_default != NULL) { int i; int found = 0; - for ( i = 0; i < MSTRO__POOL__TRANSPORT_KIND__NUMBER_OF_KINDS; i++ ) { + for ( i = 0; i < MSTRO__POOL__TRANSPORT_KIND__NUMBER_OF_KINDS; i++ ) { if (!strcmp(env_transport_default, g_transport_registry[i].name)) { transport_methods.supported[0] = (Mstro__Pool__TransportKind)i; found = 1; @@ -2978,13 +3024,13 @@ mstro_pm_attach(const char *remote_pm_info) for (i = 0; i < transport_methods.n_supported; 
i++ ) INFO("#%d %s \n", i, g_transport_registry[transport_methods.supported[i]].name); /**/ - + Mstro__Pool__Join join = MSTRO__POOL__JOIN__INIT; join.protocol_version = MSTRO_POOL_PROTOCOL_VERSION; join.serialized_endpoint = g_pm_endpoint->addr_serialized; join.transport_methods = &transport_methods; join.component_name = g_initdata->component_name; - + Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT; s = mstro_pmp_package(&msg, (ProtobufCMessage*)&join); if(s!=MSTRO_OK) { @@ -2992,13 +3038,13 @@ mstro_pm_attach(const char *remote_pm_info) join.base.descriptor->name); goto BAILOUT; } - + s = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); /* The WELCOME message will be handled by the normal incoming * message handler. We can tell it's done by checking * g_pool_app_id */ - + switch(s) { case MSTRO_OK: { /* spin-wait */ @@ -3010,7 +3056,7 @@ mstro_pm_attach(const char *remote_pm_info) } break; } - case MSTRO_NO_PM: + case MSTRO_NO_PM: ERR("Confusion: JOIN message failed with 'no PM info' error\n"); /* fall-thru */ default: @@ -3018,15 +3064,13 @@ mstro_pm_attach(const char *remote_pm_info) s, mstro_status_description(s)); return s; } - - + + /* Welcome received */ - INFO("Welcome message received, our app id is %"PRIu64"\n", + INFO("Welcome message received, our app id is %"PRIu64"\n", g_pool_app_id); - + BAILOUT: return s; } - -