diff --git a/include/maestro/i_pool_manager_registry.h b/include/maestro/i_pool_manager_registry.h index db62798084c77caa946f88f1b178ffd04433d403..f6f4b29c5c8eda5a14a157ae6ce43611b146699e 100644 --- a/include/maestro/i_pool_manager_registry.h +++ b/include/maestro/i_pool_manager_registry.h @@ -80,10 +80,11 @@ struct mstro_pm_app_registry_entry { /* FIMXE: this should be OFI-independent */ fi_addr_t addr; /**< address to use on endpoint to talk to it */ char* serialized_desc; /**< needed to ship with InitiateTransfer, - so apps can talk to each other without PM */ + so apps can talk to each other without PM */ /** transport methods advertised at JOIN time */ Mstro__Pool__TransportMethods *transport_methods; char * component_name; /**< the component name provided at JOIN time */ + uint64_t component_index; /**< the component index provided at JOIN time */ }; @@ -105,7 +106,7 @@ mstro_pm_app_register(struct mstro_endpoint *ep, char* serialized_desc, const Mstro__Pool__TransportMethods *transport_methods, mstro_app_id *id_p, - const char *component_name, + const char *component_name, uint64_t component_index, struct mstro_pm_app_registry_entry **entry_p); diff --git a/include/maestro/i_state.h b/include/maestro/i_state.h index 8a146170c050939204327bf790cc03989f32a18e..e30d36782f0cb56c227c6c39823957cca400b4b9 100644 --- a/include/maestro/i_state.h +++ b/include/maestro/i_state.h @@ -46,9 +46,9 @@ /** State of each participant in a Maestro run: */ struct mstro_core_initdata { - char *workflow_name; /**< The workflow name (from init) */ - char *component_name; /**< The component name (from init) */ - int64_t component_index; /**< The component index (from init) */ + char *workflow_name; /**< The workflow name (from init) */ + char *component_name; /**< The component name (from init) */ + uint64_t component_index; /**< The component index (from init) */ /* more to come ...*/ }; diff --git a/maestro/ofi.c b/maestro/ofi.c index 03c9e1245aaef669c6bb0bbb9694c7b29c5c712e..bd7af23d7c095c0283070ffb8a2a2df0d62a720e 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -2473,7 +2473,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, s = mstro_pm_app_register(ep, translated_addr, strdup(join_msg->serialized_endpoint), join_msg->transport_methods, - NULL, join_msg->component_name, + NULL, join_msg->component_name, join_msg->component_index, entry_p); if(s!=MSTRO_OK) { ERR("Failed to register application: %s\n", s); @@ -3126,6 +3126,7 @@ mstro_pm_attach(const char *remote_pm_info) join.serialized_endpoint = g_pm_endpoint->addr_serialized; join.transport_methods = &transport_methods; join.component_name = g_initdata->component_name; + join.component_index = g_initdata->component_index; Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT; s = mstro_pmp_package(&msg, (ProtobufCMessage*)&join); diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 28b82caca630c11764b6bcc932b946b2005f25a3..e71f1885c5895dd657b9f50b7539ddd938616eb1 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -2223,8 +2223,8 @@ mstro_pm__handle_join(Mstro__Pool__MstroMsg *msg, Mstro__Pool__Join *join = msg->join; mstro_status status = MSTRO_OK; - INFO("JOIN message received. Caller %s (PM proto version %d) advertises endpoint %s\n", - join->component_name, join->protocol_version, join->serialized_endpoint); + INFO("JOIN message received. Caller %s[%d] (PM proto version %d) advertises endpoint %s\n", + join->component_name, join->component_index, join->protocol_version, join->serialized_endpoint); if(join->transport_methods==NULL || (join->transport_methods->n_supported==0)) { diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index ec7aaae03bd20d2161f5dfc755ad81194049de9c..f5cb5ad63439c1660cb1d10f20ec0a9f3461c2e1 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -124,18 +124,12 @@ mstro_pm_app_reg__new_entry(struct mstro_pm_app_registry_entry **entry_p, mstro_ e->serialized_desc = NULL; e->transport_methods = NULL; e->component_name = NULL; + e->component_index = UINT64_MAX; *entry_p = e; return MSTRO_OK; } - -static const Mstro__Pool__TransportKind default_transport_kinds[2] = { MSTRO__POOL__TRANSPORT_KIND__GFS, MSTRO__POOL__TRANSPORT_KIND__MIO}; - -static const Mstro__Pool__TransportMethods default_transport_methods = { - PROTOBUF_C_MESSAGE_INIT (&mstro__pool__transport_methods__descriptor), \ - 2, (Mstro__Pool__TransportKind*)&default_transport_kinds}; - mstro_status mstro_pm_app_register(struct mstro_endpoint *ep, fi_addr_t addr, @@ -143,6 +137,7 @@ mstro_pm_app_register(struct mstro_endpoint *ep, const Mstro__Pool__TransportMethods *transport_methods, mstro_app_id *id_p, const char *component_name, + uint64_t component_index, struct mstro_pm_app_registry_entry **entry_p) { if(ep==NULL) @@ -160,9 +155,10 @@ mstro_pm_app_register(struct mstro_endpoint *ep, e->serialized_desc = serialized_desc; e->transport_methods = (Mstro__Pool__TransportMethods *)transport_methods; e->component_name = strdup(component_name); + e->component_index = component_index; - INFO("Registered app with transport methods %p\n", - e->transport_methods); + INFO("Registered app %" PRIappid " for %s[%" PRIu64 "] with transport methods %p\n", + e->appid, e->component_name, e->component_index, e->transport_methods); WITH_LOCKED_APP_REGISTRY({ HASH_ADD(hh, g_mstro_pm_app_registry, diff --git a/protocols/mstro_pool.pb-c.c b/protocols/mstro_pool.pb-c.c index b538d32d27ffcd8b7d7ea0eb2f62acb4fb43b386..484875e6b1c7c4eb91718286d9c510b4de6b2d22 100644 --- a/protocols/mstro_pool.pb-c.c +++ b/protocols/mstro_pool.pb-c.c @@ -2621,7 +2621,7 @@ const ProtobufCMessageDescriptor mstro__pool__vsmannouncement__descriptor = (ProtobufCMessageInit) mstro__pool__vsmannouncement__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[4] = +static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[5] = { { "protocol_version", @@ -2671,8 +2671,21 @@ static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[4] = 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "component_index", + 5, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_FIXED64, + 0, /* quantifier_offset */ + offsetof(Mstro__Pool__Join, component_index), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned mstro__pool__join__field_indices_by_name[] = { + 4, /* field[4] = component_index */ 3, /* field[3] = component_name */ 0, /* field[0] = protocol_version */ 1, /* field[1] = serialized_endpoint */ @@ -2681,7 +2694,7 @@ static const unsigned mstro__pool__join__field_indices_by_name[] = { static const ProtobufCIntRange mstro__pool__join__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 4 } + { 0, 5 } }; const ProtobufCMessageDescriptor mstro__pool__join__descriptor = { @@ -2691,7 +2704,7 @@ const ProtobufCMessageDescriptor mstro__pool__join__descriptor = "Mstro__Pool__Join", "mstro.pool", sizeof(Mstro__Pool__Join), - 4, + 5, mstro__pool__join__field_descriptors, mstro__pool__join__field_indices_by_name, 1, mstro__pool__join__number_ranges, diff --git a/protocols/mstro_pool.pb-c.h b/protocols/mstro_pool.pb-c.h index 78387b208e87d0b2a997b1c3e58ea88957adcdde..54be84ece428ce22e9b8eae940d67e640cf399b3 100644 --- a/protocols/mstro_pool.pb-c.h +++ b/protocols/mstro_pool.pb-c.h @@ -332,10 +332,14 @@ struct _Mstro__Pool__Join ** the name of the component */ char *component_name; + /* + ** the index among all JOINs using the same component_name (for multiple 'ranks' inside one component) + */ + uint64_t component_index; }; #define MSTRO__POOL__JOIN__INIT \ { PROTOBUF_C_MESSAGE_INIT (&mstro__pool__join__descriptor) \ - , 0, (char *)protobuf_c_empty_string, NULL, (char *)protobuf_c_empty_string } + , 0, (char *)protobuf_c_empty_string, NULL, (char *)protobuf_c_empty_string, 0 } /* diff --git a/protocols/mstro_pool.proto b/protocols/mstro_pool.proto index 38a21e59484e307f1bd301b1dc214ac7ee1e31ae..743ce97f1eaa2ef00237d0b101cdd71c454ef4bd 100644 --- a/protocols/mstro_pool.proto +++ b/protocols/mstro_pool.proto @@ -102,6 +102,8 @@ message Join { TransportMethods transport_methods = 3; /** the name of the component */ string component_name = 4; + /** the index among all JOINs using the same component_name (for multiple 'ranks' inside one component) */ + fixed64 component_index = 5; }; /** Welcome message: assigns the app token **/