From 82834e809eeb1d610a32391f9dbf7f1c24a896b5 Mon Sep 17 00:00:00 2001
From: Utz-Uwe Haus <uhaus@hpe.com>
Date: Tue, 23 Aug 2022 12:14:19 +0300
Subject: [PATCH] Revert RDMA read fragmentation commits for now

This commit undoes the implementation of RDMA read fragmentation as it's broken
in the context of threaded CQ handling at this time. Issue #204 will look at this.

Revert "Implement RDMA read fragmentation"
This reverts commit 86540e325bf63836d1a2e03e2ff8f1218cff9e8a.

Revert "Add support for fragments in rdma completion context"
This reverts commit 151b7b3018808cea54a1cde15f819d6d02e50c8d.

Revert "Check maximum supported transfer size of endpoints, raise error"
This reverts commit 7b5f3edf6b4f6c6d6175ac3dabb609be5cbf7dbc.
---
 maestro/ofi.c                   |  13 +--
 maestro/pool_manager_registry.c |   2 +-
 transport/rdma.c                | 167 +++++++++-----------------------
 transport/transport_rdma.h      |   1 -
 4 files changed, 47 insertions(+), 136 deletions(-)

diff --git a/maestro/ofi.c b/maestro/ofi.c
index c80408aa..f3da7ddd 100644
--- a/maestro/ofi.c
+++ b/maestro/ofi.c
@@ -1348,12 +1348,6 @@ mstro_ofi__maybe_notify_completion(mstro_ofi_msg_context ctx)
         ERR("triggering event failed: %d (%s)\n", status,
             mstro_status_description(status));
       }
-      /* closure is re-used for multiple events if we fragment, so
-       * clean up here (not in event dtor), but only if
-       * appropriate: */
-      if(result->num_fragments==0)
-	      mstro_transport_rdma_dtor(result);
-
     }
     status |= mstro_event_destroy(ctx->ev);
     if (status != MSTRO_OK) {
@@ -2375,11 +2369,6 @@ mstro_ofi__submit_component_descriptor_read(struct mstro_endpoint *my_ep,
   DEBUG("Checking for PM config block MR at (remote addr) 0x%" PRIx64 ", key of len %zu value %" PRIx64 "\n",
         mr_addr, inforeg->raw_key.len, mr_key);
   
-  if(sizeof(g_pm_component_descriptor) > my_ep->fi->ep_attr->max_msg_size) {
-            ERR("component descriptor size exceeds endpoint's max_msg_size. FIXME: should split up operation\n");
-           
-          }
-
   assert(ctx->msg==NULL);
   assert(my_ep->peer_info_mr!=NULL); /* incoming buffer has been registered at local endpoint set creation */
   void * local_buf_mr_desc = fi_mr_desc(my_ep->peer_info_mr);
@@ -2956,8 +2945,8 @@ BAILOUT_EARLY:
   return status;
 }
 
-
 /** Poll EP for one completion and handle it, if there is one
+ *
  * If expected_slot_p is non-NULL it indicates that if the
  * completion is for a message in this known slot a new RECV should be
  * posted immediately.
diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c
index 6c57015a..e9e79150 100644
--- a/maestro/pool_manager_registry.c
+++ b/maestro/pool_manager_registry.c
@@ -1841,7 +1841,7 @@ mstro_pm_cdo_registry_update_state(const struct mstro_cdo_id *cdoid,
       });
 
   if(found) {
-    mstro_status status = mstro_pm__notify_cdo_registry_change();
+  mstro_status status = mstro_pm__notify_cdo_registry_change();
     if(status!=MSTRO_OK) {
       ERR("Failed to notify waiters of registry change\n");
       return status;
diff --git a/transport/rdma.c b/transport/rdma.c
index f3a107e4..08cf8fa7 100644
--- a/transport/rdma.c
+++ b/transport/rdma.c
@@ -55,10 +55,6 @@
 #include <pthread.h>
 #include <string.h>
 
-#ifndef MIN
-#define MIN(x,y) ((x)<(y) ? (x) : (y))
-#endif
-
 /* simplify logging */
 #define NOISE(...) LOG_NOISE(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__)
 #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__)
@@ -365,72 +361,7 @@ BAILOUT_UNLOCK:
   
   return status;
 }
-
-#define NUM_READ_RETRIES 3
-static
-mstro_status
-mstro_transport_rdma__read(const struct mstro_pm_app_registry_entry *app_entry,
-                           mstro_cdo cdo_dst,
-                           void *mr_desc,
-                           uint64_t mr_addr, uint64_t mr_key,
-                           size_t offset, size_t len,
-                           struct mstro_transport_rdma_cb_args *closure)
-{
-  mstro_status status;
-  mstro_ofi_msg_context ctx=NULL;
-  status = mstro_ofi__msg_context_create(&ctx, NULL, true, false);
-  if(status!=MSTRO_OK) {
-    ERR("Failed to init msg context for RDMA op: %d (%s)\n",
-        status, mstro_status_description(status));
-    goto BAILOUT;
-  }
-
-  DEBUG("Doing event creation\n");
-  status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb,
-                              (void*)closure, NULL, false, &(ctx->ev));
-  if (status != MSTRO_OK) {
-    ERR("Couldn't create event (%s)\n", mstro_status_description(status));
-    goto BAILOUT;
-  } else {
-    mstro_event_id eid;
-    status = mstro_event_id_get(ctx->ev, &eid);
-    if (status != MSTRO_OK) {
-      DEBUG("Failed to query event id (event @%p)\n", ctx->ev);
-      goto BAILOUT;
-    }
-    DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion of at offset %zu, len %zu.\n",
-          eid, offset, len);
-  }
-
-
-  ctx->ep = app_entry->ep;
-  mstro_ofi__remember_ctx(app_entry->ep, ctx);
-
-
-  size_t num_retries = NUM_READ_RETRIES;
-RETRY_RDMA_TRANSPORT_READ: ;
-  int ret = fi_read(app_entry->ep->ep,
-                    (uint8_t*)cdo_dst->raw_ptr+offset, len,
-                    mr_desc,
-                    app_entry->addr, mr_addr+offset, mr_key, ctx);
-  if(ret==-FI_EAGAIN) {
-    NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name);
-    sleep(1);
-    if(num_retries-->0)
-      goto RETRY_RDMA_TRANSPORT_READ;
-  }
-  if(ret<0) {
-    ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n",
-        cdo_dst->name, fi_strerror(-ret));
-    status = MSTRO_FAIL;
-    goto BAILOUT;
-  }
-
-  status=MSTRO_OK;
-BAILOUT:
-  return status;
-}
-
+  
 mstro_status
 mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ticket)
 {
@@ -498,8 +429,11 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket*
   }
 */
 
+  /*write data with correct offset at dst for dist_cdos --silence compiler warnings*/
+  void * dst_ptr = (void *) ((char *) cdo_dst->raw_ptr+ticket->dst_offset); 
+
   uint64_t requested_key = app_entry->ep->fi->domain_attr->mr_mode & FI_MR_PROV_KEY ? 0 : mstro_memory_new_key();
-  int err = fi_mr_reg(app_entry->ep->domain, cdo_dst->raw_ptr, len,
+  int err = fi_mr_reg(app_entry->ep->domain, dst_ptr, len,
                   FI_READ, 0, requested_key, 0, &mr, NULL);
   if (err) {
 	ERR("Couldn't register memory region for RDMA transport (err: %d, %s)\n", 
@@ -508,6 +442,13 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket*
   }
 
   void* local_buf_mr_desc = fi_mr_desc(mr);
+  mstro_ofi_msg_context ctx=NULL;
+  status = mstro_ofi__msg_context_create(&ctx, NULL, true, false);
+  if(status!=MSTRO_OK) {
+    ERR("Failed to init msg context for RDMA op: %d (%s)\n",
+        status, mstro_status_description(status));
+    return status;
+  }
 
   DEBUG("Doing closure creation\n");
   closure = malloc(sizeof(struct mstro_transport_rdma_cb_args));
@@ -515,7 +456,6 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket*
     ERR("Failed to alloc RDMA transport closure\n");
     return MSTRO_NOMEM;
   }
-  closure->num_fragments = 1;
   closure->mr = mr;
   closure->fresh_alloc_route = fresh_alloc_route;
   closure->raw_ptr = cdo_dst->raw_ptr;
@@ -532,6 +472,21 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket*
   closure->dstcdoid = cdo_dst->gid;
   closure->appid = ticket->srcid->id;
 
+  DEBUG("Doing event creation\n");
+  status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb,
+                              (void*)closure, mstro_transport_rdma_dtor, 0, &(ctx->ev));
+  if (status != MSTRO_OK) {
+    ERR("Couldn't create event (%s)\n", mstro_status_description(status));
+    return MSTRO_FAIL;
+  } else {
+    mstro_event_id eid;
+    status = mstro_event_id_get(ctx->ev, &eid);
+    if (status != MSTRO_OK) {
+      DEBUG("Failed to query event id (event @%p)\n", ctx->ev);
+      return MSTRO_FAIL;
+    }
+    DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion.\n", eid);
+  }
 
   /* make a fresh entry (for completion) */
   struct mstro_transport_rdma_pending_entry* regentry = malloc(sizeof(struct mstro_transport_rdma_pending_entry));
@@ -563,47 +518,25 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket*
   }
 
 
-  size_t fragment_size = app_entry->ep->fi->ep_attr->max_msg_size;
-
-  /* mostly for debugging purposes to select an random small fragment size */
-  //#define OVERRIDE_MAX_MSG_SIZE 1
-#ifdef OVERRIDE_MAX_MSG_SIZE
-  fragment_size = MIN(fragment_size, OVERRIDE_MAX_MSG_SIZE);
-#endif
-
-  closure->num_fragments = 1 + len / fragment_size;
+  ctx->ep = app_entry->ep;
+  mstro_ofi__remember_ctx(app_entry->ep, ctx);
 
-  if(closure->num_fragments>1) {
-    DEBUG("Message too large for endpoint, splitting into %zu fragments\n",
-          closure->num_fragments);
-  }
+  int num_retries = 3;
+  
+ RETRY_RDMA_TRANSPORT_READ: ;
+  ret = fi_read(app_entry->ep->ep, dst_ptr, len, local_buf_mr_desc,
+                    app_entry->addr, mr_addr, mr_key, ctx);
 
-  /* read full-fragment blocks */
-  size_t offset=ticket->dst_offset; // write dst cdos at the correct position
-  size_t num_retries;
-  for(size_t i=closure->num_fragments;
-      i-->1; ) {
-    status = mstro_transport_rdma__read(app_entry, cdo_dst,
-                                        local_buf_mr_desc, mr_addr, mr_key,
-                                        offset, fragment_size,
-                                        closure);
-    if(status!=MSTRO_OK) {
-      ERR("Failed to read fragment %zu\n", closure->num_fragments - i);
-      goto BAILOUT;
-    }
-    offset+=fragment_size;
+  if(ret==-FI_EAGAIN) {
+    NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name);
+    sleep(1);
+    if(num_retries-->0)
+      goto RETRY_RDMA_TRANSPORT_READ;
   }
-  
-  /* final (partial) read */
-  status = mstro_transport_rdma__read(app_entry, cdo_dst,
-                                      local_buf_mr_desc, mr_addr, mr_key,
-                                      offset, len-offset,
-                                      closure);
-  if(status!=MSTRO_OK) {
-    ERR("Failed to read last fragment\n");
-    goto BAILOUT;
+  if(ret<0) {
+    ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n", cdo_dst->name,fi_strerror(-ret));
+	status = MSTRO_FAIL;
   }
-
 BAILOUT:
   return status;
 }
@@ -618,22 +551,12 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure)
       srcidstr, &(args->srccdoid),
       WITH_CDO_ID_STR(
           dstidstr, &(args->dstcdoid),
-          DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`, %zu fragments left\n",
-                srcidstr, dstidstr, args->num_fragments);););
+          DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`\n",
+                srcidstr, dstidstr);););
 
   mstro_status status;
   args->status = MSTRO_OK;
 
-  size_t num_fragments_left = atomic_fetch_sub(&(args->num_fragments), 1);
-  assert(num_fragments_left>0);
-  if(num_fragments_left>1) {
-    DEBUG("%zu more fragments in flight, delaying rdma cleanup\n",
-          num_fragments_left-1);
-    /* closure is shared across all contexts/events, so don't free it */
-    args->status=MSTRO_OK;
-    return;
-  }
-
   // unregister
   int err = fi_close((struct fid*)args->mr);
   if (err) {
@@ -710,7 +633,7 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure)
     return;
   }
   args->status=MSTRO_OK;
-
+  
   DEBUG("RDMA transport for CDO `%s` successful\n", args->name);
 }
 
diff --git a/transport/transport_rdma.h b/transport/transport_rdma.h
index da64fc9b..aede2229 100644
--- a/transport/transport_rdma.h
+++ b/transport/transport_rdma.h
@@ -63,7 +63,6 @@ struct mstro_transport_rdma_cb_args {
   void* raw_ptr;
   size_t len;
   char* name;
-  _Atomic(size_t) num_fragments; /**< the number of operations in flight (if transfer had to be chunked) */
   struct mstro_cdo_id srccdoid;
   struct mstro_cdo_id dstcdoid;
   mstro_app_id appid;
-- 
GitLab