From d846f90752dcb9f61f95e90ea816b3f1c73ca609 Mon Sep 17 00:00:00 2001
From: Utz-Uwe Haus <uhaus@cray.com>
Date: Tue, 27 Jul 2021 13:04:41 +0200
Subject: [PATCH] Implement component-index handling

---
 include/maestro/i_pool_manager_registry.h |  5 +++--
 include/maestro/i_state.h                 |  6 +++---
 maestro/ofi.c                             |  3 ++-
 maestro/pool_manager.c                    |  4 ++--
 maestro/pool_manager_registry.c           | 14 +++++---------
 protocols/mstro_pool.pb-c.c               | 19 ++++++++++++++++---
 protocols/mstro_pool.pb-c.h               |  6 +++++-
 protocols/mstro_pool.proto                |  2 ++
 8 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/include/maestro/i_pool_manager_registry.h b/include/maestro/i_pool_manager_registry.h
index db627980..f6f4b29c 100644
--- a/include/maestro/i_pool_manager_registry.h
+++ b/include/maestro/i_pool_manager_registry.h
@@ -80,10 +80,11 @@ struct mstro_pm_app_registry_entry {
   /* FIMXE: this should be OFI-independent */
   fi_addr_t addr;            /**< address to use on endpoint to talk to it */
   char* serialized_desc;	/**< needed to ship with InitiateTransfer,
-  									so apps can talk to each other without PM */
+                                   so apps can talk to each other without PM */
   /** transport methods advertised at JOIN time */
   Mstro__Pool__TransportMethods *transport_methods;
   char * component_name; /**< the component name provided at JOIN time */
+  uint64_t component_index; /**< the component index provided at JOIN time */
 };
 
 
@@ -105,7 +106,7 @@ mstro_pm_app_register(struct mstro_endpoint *ep,
                       char* serialized_desc,
                       const Mstro__Pool__TransportMethods *transport_methods,
                       mstro_app_id *id_p,
-                      const char *component_name,
+                      const char *component_name, uint64_t component_index,
                       struct mstro_pm_app_registry_entry **entry_p);
 
 
diff --git a/include/maestro/i_state.h b/include/maestro/i_state.h
index 8a146170..e30d3678 100644
--- a/include/maestro/i_state.h
+++ b/include/maestro/i_state.h
@@ -46,9 +46,9 @@
 
 /** State of each participant in a Maestro run: */
 struct mstro_core_initdata {
-  char *workflow_name;     /**< The workflow name (from init) */
-  char *component_name;    /**< The component name (from init) */
-  int64_t component_index; /**< The component index (from init) */
+  char *workflow_name;      /**< The workflow name (from init) */
+  char *component_name;     /**< The component name (from init) */
+  uint64_t component_index; /**< The component index (from init) */
   /* more to come ...*/
 };
 
diff --git a/maestro/ofi.c b/maestro/ofi.c
index 03c9e124..bd7af23d 100644
--- a/maestro/ofi.c
+++ b/maestro/ofi.c
@@ -2473,7 +2473,7 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg,
   s = mstro_pm_app_register(ep, translated_addr,
                             strdup(join_msg->serialized_endpoint),
                             join_msg->transport_methods,
-                            NULL, join_msg->component_name,
+                            NULL, join_msg->component_name, join_msg->component_index,
                             entry_p);
   if(s!=MSTRO_OK) {
     ERR("Failed to register application: %s\n", s);
@@ -3126,6 +3126,7 @@ mstro_pm_attach(const char *remote_pm_info)
   join.serialized_endpoint = g_pm_endpoint->addr_serialized;
   join.transport_methods = &transport_methods;
   join.component_name = g_initdata->component_name;
+  join.component_index = g_initdata->component_index;
 
   Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT;
   s = mstro_pmp_package(&msg, (ProtobufCMessage*)&join);
diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c
index 28b82cac..e71f1885 100644
--- a/maestro/pool_manager.c
+++ b/maestro/pool_manager.c
@@ -2223,8 +2223,8 @@ mstro_pm__handle_join(Mstro__Pool__MstroMsg *msg,
   Mstro__Pool__Join *join = msg->join;
   mstro_status status = MSTRO_OK;
   
-  INFO("JOIN message received. Caller %s (PM proto version %d) advertises endpoint %s\n",
-       join->component_name, join->protocol_version, join->serialized_endpoint);
+  INFO("JOIN message received. Caller %s[%d] (PM proto version %d) advertises endpoint %s\n",
+       join->component_name, join->component_index, join->protocol_version, join->serialized_endpoint);
   
   if(join->transport_methods==NULL
      || (join->transport_methods->n_supported==0)) {
diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c
index ec7aaae0..f5cb5ad6 100644
--- a/maestro/pool_manager_registry.c
+++ b/maestro/pool_manager_registry.c
@@ -124,18 +124,12 @@ mstro_pm_app_reg__new_entry(struct mstro_pm_app_registry_entry **entry_p, mstro_
   e->serialized_desc = NULL;
   e->transport_methods = NULL;
   e->component_name = NULL;
+  e->component_index = UINT64_MAX;
 
   *entry_p = e;
   return MSTRO_OK;
 }
 
-
-static const Mstro__Pool__TransportKind default_transport_kinds[2] =   { MSTRO__POOL__TRANSPORT_KIND__GFS, MSTRO__POOL__TRANSPORT_KIND__MIO};
-
-static const Mstro__Pool__TransportMethods default_transport_methods = {
-  PROTOBUF_C_MESSAGE_INIT (&mstro__pool__transport_methods__descriptor), \
-  2, (Mstro__Pool__TransportKind*)&default_transport_kinds};
-
 mstro_status
 mstro_pm_app_register(struct mstro_endpoint *ep,
                       fi_addr_t addr,
@@ -143,6 +137,7 @@ mstro_pm_app_register(struct mstro_endpoint *ep,
                       const Mstro__Pool__TransportMethods *transport_methods,
                       mstro_app_id *id_p,
                       const char *component_name,
+                      uint64_t component_index,
                       struct mstro_pm_app_registry_entry **entry_p)
 {
   if(ep==NULL)
@@ -160,9 +155,10 @@ mstro_pm_app_register(struct mstro_endpoint *ep,
 	e->serialized_desc = serialized_desc;
     e->transport_methods = (Mstro__Pool__TransportMethods *)transport_methods;
     e->component_name = strdup(component_name);
+    e->component_index = component_index;
 
-    INFO("Registered app with transport methods %p\n",
-         e->transport_methods);
+    INFO("Registered app %" PRIappid " for %s[%" PRIu64 "] with transport methods %p\n",
+         e->appid, e->component_name, e->component_index, e->transport_methods);
 
     WITH_LOCKED_APP_REGISTRY({
         HASH_ADD(hh, g_mstro_pm_app_registry,
diff --git a/protocols/mstro_pool.pb-c.c b/protocols/mstro_pool.pb-c.c
index b538d32d..484875e6 100644
--- a/protocols/mstro_pool.pb-c.c
+++ b/protocols/mstro_pool.pb-c.c
@@ -2621,7 +2621,7 @@ const ProtobufCMessageDescriptor mstro__pool__vsmannouncement__descriptor =
   (ProtobufCMessageInit) mstro__pool__vsmannouncement__init,
   NULL,NULL,NULL    /* reserved[123] */
 };
-static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[4] =
+static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[5] =
 {
   {
     "protocol_version",
@@ -2671,8 +2671,21 @@ static const ProtobufCFieldDescriptor mstro__pool__join__field_descriptors[4] =
     0,             /* flags */
     0,NULL,NULL    /* reserved1,reserved2, etc */
   },
+  {
+    "component_index",
+    5,
+    PROTOBUF_C_LABEL_NONE,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(Mstro__Pool__Join, component_index),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
 };
 static const unsigned mstro__pool__join__field_indices_by_name[] = {
+  4,   /* field[4] = component_index */
   3,   /* field[3] = component_name */
   0,   /* field[0] = protocol_version */
   1,   /* field[1] = serialized_endpoint */
@@ -2681,7 +2694,7 @@ static const unsigned mstro__pool__join__field_indices_by_name[] = {
 static const ProtobufCIntRange mstro__pool__join__number_ranges[1 + 1] =
 {
   { 1, 0 },
-  { 0, 4 }
+  { 0, 5 }
 };
 const ProtobufCMessageDescriptor mstro__pool__join__descriptor =
 {
@@ -2691,7 +2704,7 @@ const ProtobufCMessageDescriptor mstro__pool__join__descriptor =
   "Mstro__Pool__Join",
   "mstro.pool",
   sizeof(Mstro__Pool__Join),
-  4,
+  5,
   mstro__pool__join__field_descriptors,
   mstro__pool__join__field_indices_by_name,
   1,  mstro__pool__join__number_ranges,
diff --git a/protocols/mstro_pool.pb-c.h b/protocols/mstro_pool.pb-c.h
index 78387b20..54be84ec 100644
--- a/protocols/mstro_pool.pb-c.h
+++ b/protocols/mstro_pool.pb-c.h
@@ -332,10 +332,14 @@ struct  _Mstro__Pool__Join
    ** the name of the component 
    */
   char *component_name;
+  /*
+   ** the index among all JOINs using the same component_name (for multiple 'ranks' inside one component) 
+   */
+  uint64_t component_index;
 };
 #define MSTRO__POOL__JOIN__INIT \
  { PROTOBUF_C_MESSAGE_INIT (&mstro__pool__join__descriptor) \
-    , 0, (char *)protobuf_c_empty_string, NULL, (char *)protobuf_c_empty_string }
+    , 0, (char *)protobuf_c_empty_string, NULL, (char *)protobuf_c_empty_string, 0 }
 
 
 /*
diff --git a/protocols/mstro_pool.proto b/protocols/mstro_pool.proto
index 38a21e59..743ce97f 100644
--- a/protocols/mstro_pool.proto
+++ b/protocols/mstro_pool.proto
@@ -102,6 +102,8 @@ message Join {
   TransportMethods transport_methods = 3;
   /** the name of the component */
   string component_name = 4;
+  /** the index among all JOINs using the same component_name (for multiple 'ranks' inside one component) */
+  fixed64 component_index = 5;      
 };
 
 /** Welcome message: assigns the app token **/
-- 
GitLab