From b9e1ae3a132e1e8d7d8ca3a1200ae53187897e20 Mon Sep 17 00:00:00 2001
From: Ali Mohammed <ali.mohammed@hpe.com>
Date: Thu, 25 Nov 2021 10:52:35 +0100
Subject: [PATCH] fix Unexpected PM cdo registry state for CDO when handling
 multiple completions for distributed cdos

---
 attributes/maestro-schema.c     |   2 +-
 maestro/pool_manager_registry.c | 161 ++++++++++++++++++--------------
 2 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/attributes/maestro-schema.c b/attributes/maestro-schema.c
index fc745012..3cf0543d 100644
--- a/attributes/maestro-schema.c
+++ b/attributes/maestro-schema.c
@@ -2750,7 +2750,7 @@ mstro_attribute_pool_find_dist_layout(Mstro__Pool__Attributes *attributes, mmbLa
   for(size_t i=0; i<n; i++) {
      key = attributes->kv_map->map[i]->key;
      if(strcmp(key,".maestro.core.cdo.dist-layout")==0) {
-       DEBUG("DIST Layout found ... \n\n\n");
+       DEBUG("DIST Layout found ... \n");
        status = mstro_attribute_pool_aval_to_mmbLayout(attributes->kv_map->map[i]->val->mmblayout, dist_layout);
       break;
      }
diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c
index 4839feab..d921eb60 100644
--- a/maestro/pool_manager_registry.c
+++ b/maestro/pool_manager_registry.c
@@ -412,6 +412,7 @@ struct cdo_handle_entry {
   uint64_t local_id; /**< the local-id part of the CDO id */
   mstro_cdo_state cdo_state;            /**< state of this handle */
   Mstro__Pool__Attributes *attributes;  /**< attributes reported from this app */
+  int64_t n_segments; /**< number of transmissions required to fill this CDO -- mainly for dist CDOs*/
   mmbLayout *dist_layout; /**< distributed layout handle*/
 };
 
@@ -1017,6 +1018,51 @@ mstro_pm__find_dist_layout(
   return MSTRO_FAIL;
 }
 
+/** look up per-app cdo entry. Must be called unter CDO_REGISTRY_LOCK. */
+static inline
+mstro_status
+mstro_pm_cdo_app_lookup(const struct mstro_cdo_id* cdo_id,
+                        mstro_app_id app_id,
+                        struct per_app_cdo_entries **app_entry, struct cdo_handle_entry **handle_entry)
+{
+  struct mstro_pm_cdo_registry_entry *regentry=NULL;
+
+  struct mstro_cdo_id head = *cdo_id;
+  head.local_id = MSTRO_CDO_LOCAL_ID_NONE;
+
+  HASH_FIND(hh, g_mstro_pm_cdo_registry,
+            &head, sizeof(struct mstro_cdo_id), regentry);
+  if(!regentry) {
+    WITH_CDO_ID_STR(idstr, &head,
+                    DEBUG("No regentry for cdo %s\n", idstr););
+    *app_entry = NULL;
+    *handle_entry = NULL;
+    return MSTRO_FAIL;
+  } else {
+    struct per_app_cdo_entry *per_app_entry;
+    HASH_FIND(hh, regentry->app_to_attributes,
+              &app_id, sizeof(app_id), *app_entry);
+    if(*app_entry==NULL) {
+      WITH_CDO_ID_STR(idstr, &head,
+                      DEBUG("No regentry for app %" PRIappid " for CDO %s\n", app_id, idstr););
+
+      *handle_entry = NULL;
+      return MSTRO_FAIL;
+    } else {
+      HASH_FIND(hh, (*app_entry)->handles,
+                &cdo_id->local_id, sizeof(cdo_id->local_id), *handle_entry);
+      if(*handle_entry)
+        return MSTRO_OK;
+      else {
+        WITH_CDO_ID_STR(idstr, &head,
+                        DEBUG("No handle entry for app %" PRIappid " for CDO %s, local-id %zu\n",
+                              app_id, idstr, cdo_id->local_id););
+        return MSTRO_FAIL;
+      }
+    }
+  }
+}
+
 
 static inline
 mstro_status
@@ -1129,6 +1175,16 @@ mstro_pm__send_transfer_init_to_candidates(
       return status;
     }
   }
+  /* set the number of required transmissions on cdo handle*/
+  DEBUG("Set the number of required transmissions on cdo handle\n");
+  struct per_app_cdo_entries *app_entry;
+  struct cdo_handle_entry *handle_entry;
+  status = mstro_pm_cdo_app_lookup(&e->cdo_id, e->requestor, &app_entry, &handle_entry);
+  if(status!=MSTRO_OK) {
+    ERR("Failed to find CDO entry \n");
+      return status; 
+  }
+  handle_entry->n_segments = candidates->n_sources;
 
   DEBUG("Disposing demand queue entry that has been handled\n");
   status = mstro_pm_demand_queue_entry__destroy(e);
@@ -1984,50 +2040,6 @@ mstro_pm_app_deregister(mstro_app_id key)
   return MSTRO_OK;
 }
 
-/** look up per-app cdo entry. Must be called unter CDO_REGISTRY_LOCK. */
-static inline
-mstro_status
-mstro_pm_cdo_app_lookup(const struct mstro_cdo_id* cdo_id,
-                        mstro_app_id app_id,
-                        struct per_app_cdo_entries **app_entry, struct cdo_handle_entry **handle_entry)
-{
-  struct mstro_pm_cdo_registry_entry *regentry=NULL;
-
-  struct mstro_cdo_id head = *cdo_id;
-  head.local_id = MSTRO_CDO_LOCAL_ID_NONE;
-
-  HASH_FIND(hh, g_mstro_pm_cdo_registry,
-            &head, sizeof(struct mstro_cdo_id), regentry);
-  if(!regentry) {
-    WITH_CDO_ID_STR(idstr, &head,
-                    DEBUG("No regentry for cdo %s\n", idstr););
-    *app_entry = NULL;
-    *handle_entry = NULL;
-    return MSTRO_FAIL;
-  } else {
-    struct per_app_cdo_entry *per_app_entry;
-    HASH_FIND(hh, regentry->app_to_attributes,
-              &app_id, sizeof(app_id), *app_entry);
-    if(*app_entry==NULL) {
-      WITH_CDO_ID_STR(idstr, &head,
-                      DEBUG("No regentry for app %" PRIappid " for CDO %s\n", app_id, idstr););
-
-      *handle_entry = NULL;
-      return MSTRO_FAIL;
-    } else {
-      HASH_FIND(hh, (*app_entry)->handles,
-                &cdo_id->local_id, sizeof(cdo_id->local_id), *handle_entry);
-      if(*handle_entry)
-        return MSTRO_OK;
-      else {
-        WITH_CDO_ID_STR(idstr, &head,
-                        DEBUG("No handle entry for app %" PRIappid " for CDO %s, local-id %zu\n",
-                              app_id, idstr, cdo_id->local_id););
-        return MSTRO_FAIL;
-      }
-    }
-  }
-}
 
 mstro_status
 mstro_pm_cdo_registry_cdo_name_lookup(const struct mstro_cdo_id *id,
@@ -2190,52 +2202,59 @@ mstro_pm_cdo_registry_transfer_completed(const struct mstro_cdo_id *cdo_id,
         ERR("Failed to find CDO entry for app %" PRIappid "\n", app_id);
         status = MSTRO_INVARG;
       } else {
-        if(! (handle_entry->cdo_state & MSTRO_CDO_STATE_IN_TRANSPORT)) {
-          if(handle_entry->cdo_state==MSTRO_CDO_STATE_RETRACTED) {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+        /* decrement the number of outstanding transmissions*/
+        handle_entry->n_segments--;
+        WITH_CDO_ID_STR(idstr, cdo_id,
+                            DEBUG("There are %zu outstanding pieces for (probably distributed) CDO  %s\n",
+                                  handle_entry->n_segments, idstr););
+        if (handle_entry->n_segments == 0) {
+          if(! (handle_entry->cdo_state & MSTRO_CDO_STATE_IN_TRANSPORT)) {
+            if(handle_entry->cdo_state==MSTRO_CDO_STATE_RETRACTED) {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             ERR("RETRACTED and not IN_TRANSPORT state for CDO %s (app %" PRIappid
                                 ": %d (%s), assuming retract overlapped with eager transfer.\n",
                                 idstr, app_id,
                                 handle_entry->cdo_state,
                                 mstro_cdo_state_describe(handle_entry->cdo_state)););
-          } else {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+            } else {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             ERR("Unexpected PM cdo registry state for CDO %s (app %" PRIappid ": %d (%s)\n",
                                 idstr, app_id,
                                 handle_entry->cdo_state,
                                 mstro_cdo_state_describe(handle_entry->cdo_state)););
-            status = MSTRO_INVARG;
-          }
-        } else {
+              status = MSTRO_INVARG;
+            }
+          } else {
           /* ok, it was in-transport */
-          if(handle_entry->cdo_state & MSTRO_CDO_STATE_DEMANDED) {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+            if(handle_entry->cdo_state & MSTRO_CDO_STATE_DEMANDED) {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             DEBUG("Clearing IN_TRANSPORT state from DEMANDED CDO %s\n",
                                   idstr););
-            handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
-          } else if(handle_entry->cdo_state & MSTRO_CDO_STATE_REQUIRED) {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+              handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
+            } else if(handle_entry->cdo_state & MSTRO_CDO_STATE_REQUIRED) {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             DEBUG("Clearing IN_TRANSPORT state from REQUIRED CDO %s\n",
                                   idstr););
-            handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
-            handle_entry->cdo_state |= MSTRO_CDO_STATE_SATISFIED;
-          } else if(handle_entry->cdo_state & MSTRO_CDO_STATE_INJECTED) {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+              handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
+              handle_entry->cdo_state |= MSTRO_CDO_STATE_SATISFIED;
+            } else if(handle_entry->cdo_state & MSTRO_CDO_STATE_INJECTED) {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             DEBUG("Clearing IN_TRANSPORT state from INJECTED CDO %s\n",
                                   idstr););
-            handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
-            handle_entry->cdo_state |= MSTRO_CDO_STATE_SATISFIED;
-          } else {
-            WITH_CDO_ID_STR(idstr, cdo_id,
+              handle_entry->cdo_state &= ~MSTRO_CDO_STATE_IN_TRANSPORT;
+              handle_entry->cdo_state |= MSTRO_CDO_STATE_SATISFIED;
+            } else {
+              WITH_CDO_ID_STR(idstr, cdo_id,
                             ERR("Unhandled CDO state %s for CDO %s\n",
                                 mstro_cdo_state_describe(handle_entry->cdo_state), idstr););
-            status = MSTRO_UNIMPL;
+              status = MSTRO_UNIMPL;
+            }
+            /* WITH_CDO_ID_STR(idstr, cdo_id, { */
+            /*     DEBUG("New state for CDO %s (app %" PRIappid ": %d (%s)\n", */
+            /*           idstr, app_id, */
+            /*           handle_entry->cdo_state, */
+            /*           mstro_cdo_state_describe(handle_entry->cdo_state));}); */
           }
-          /* WITH_CDO_ID_STR(idstr, cdo_id, { */
-          /*     DEBUG("New state for CDO %s (app %" PRIappid ": %d (%s)\n", */
-          /*           idstr, app_id, */
-          /*           handle_entry->cdo_state, */
-          /*           mstro_cdo_state_describe(handle_entry->cdo_state));}); */
         }
       }
     });
-- 
GitLab