diff --git a/maestro/ofi.c b/maestro/ofi.c index c80408aafb10a02366c10115121dfa9edfe62262..f3da7ddd82462377718e11e51455ef6513ac44f1 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -1348,12 +1348,6 @@ mstro_ofi__maybe_notify_completion(mstro_ofi_msg_context ctx) ERR("triggering event failed: %d (%s)\n", status, mstro_status_description(status)); } - /* closure is re-used for multiple events if we fragment, so - * clean up here (not in event dtor), but only if - * appropriate: */ - if(result->num_fragments==0) - mstro_transport_rdma_dtor(result); - } status |= mstro_event_destroy(ctx->ev); if (status != MSTRO_OK) { @@ -2375,11 +2369,6 @@ mstro_ofi__submit_component_descriptor_read(struct mstro_endpoint *my_ep, DEBUG("Checking for PM config block MR at (remote addr) 0x%" PRIx64 ", key of len %zu value %" PRIx64 "\n", mr_addr, inforeg->raw_key.len, mr_key); - if(sizeof(g_pm_component_descriptor) > my_ep->fi->ep_attr->max_msg_size) { - ERR("component descriptor size exceeds endpoint's max_msg_size. FIXME: should split up operation\n"); - - } - assert(ctx->msg==NULL); assert(my_ep->peer_info_mr!=NULL); /* incoming buffer has been registered at local endpoint set creation */ void * local_buf_mr_desc = fi_mr_desc(my_ep->peer_info_mr); @@ -2956,8 +2945,8 @@ BAILOUT_EARLY: return status; } - /** Poll EP for one completion and handle it, if there is one + * * If expected_slot_p is non-NULL it indicates that if the * completion is for a message in this known slot a new RECV should be * posted immediately. diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index 6c57015ab5e613f928fd94d7e640914194d73a22..e9e79150398e0d7bbc4679e3c949d6f9907d3971 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -1841,7 +1841,7 @@ mstro_pm_cdo_registry_update_state(const struct mstro_cdo_id *cdoid, }); if(found) { - mstro_status status = mstro_pm__notify_cdo_registry_change(); + mstro_status status = mstro_pm__notify_cdo_registry_change(); if(status!=MSTRO_OK) { ERR("Failed to notify waiters of registry change\n"); return status; diff --git a/transport/rdma.c b/transport/rdma.c index f3a107e4a03da7d8ca897fb358904580eea986e0..08cf8fa763718fae9d15eb3d6a3563e1db18d1f5 100644 --- a/transport/rdma.c +++ b/transport/rdma.c @@ -55,10 +55,6 @@ #include <pthread.h> #include <string.h> -#ifndef MIN -#define MIN(x,y) ((x)<(y) ? (x) : (y)) -#endif - /* simplify logging */ #define NOISE(...) LOG_NOISE(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__) #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__) @@ -365,72 +361,7 @@ BAILOUT_UNLOCK: return status; } - -#define NUM_READ_RETRIES 3 -static -mstro_status -mstro_transport_rdma__read(const struct mstro_pm_app_registry_entry *app_entry, - mstro_cdo cdo_dst, - void *mr_desc, - uint64_t mr_addr, uint64_t mr_key, - size_t offset, size_t len, - struct mstro_transport_rdma_cb_args *closure) -{ - mstro_status status; - mstro_ofi_msg_context ctx=NULL; - status = mstro_ofi__msg_context_create(&ctx, NULL, true, false); - if(status!=MSTRO_OK) { - ERR("Failed to init msg context for RDMA op: %d (%s)\n", - status, mstro_status_description(status)); - goto BAILOUT; - } - - DEBUG("Doing event creation\n"); - status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, - (void*)closure, NULL, false, &(ctx->ev)); - if (status != MSTRO_OK) { - ERR("Couldn't create event (%s)\n", mstro_status_description(status)); - goto BAILOUT; - } else { - mstro_event_id eid; - status = mstro_event_id_get(ctx->ev, &eid); - if (status != MSTRO_OK) { - DEBUG("Failed to query event id (event @%p)\n", ctx->ev); - goto BAILOUT; - } - DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion of at offset %zu, len %zu.\n", - eid, offset, len); - } - - - ctx->ep = app_entry->ep; - mstro_ofi__remember_ctx(app_entry->ep, ctx); - - - size_t num_retries = NUM_READ_RETRIES; -RETRY_RDMA_TRANSPORT_READ: ; - int ret = fi_read(app_entry->ep->ep, - (uint8_t*)cdo_dst->raw_ptr+offset, len, - mr_desc, - app_entry->addr, mr_addr+offset, mr_key, ctx); - if(ret==-FI_EAGAIN) { - NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name); - sleep(1); - if(num_retries-->0) - goto RETRY_RDMA_TRANSPORT_READ; - } - if(ret<0) { - ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n", - cdo_dst->name, fi_strerror(-ret)); - status = MSTRO_FAIL; - goto BAILOUT; - } - - status=MSTRO_OK; -BAILOUT: - return status; -} - + mstro_status mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ticket) { @@ -498,8 +429,11 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* } */ + /*write data with correct offset at dst for dist_cdos --silence compiler warnings*/ + void * dst_ptr = (void *) ((char *) cdo_dst->raw_ptr+ticket->dst_offset); + uint64_t requested_key = app_entry->ep->fi->domain_attr->mr_mode & FI_MR_PROV_KEY ? 0 : mstro_memory_new_key(); - int err = fi_mr_reg(app_entry->ep->domain, cdo_dst->raw_ptr, len, + int err = fi_mr_reg(app_entry->ep->domain, dst_ptr, len, FI_READ, 0, requested_key, 0, &mr, NULL); if (err) { ERR("Couldn't register memory region for RDMA transport (err: %d, %s)\n", @@ -508,6 +442,13 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* } void* local_buf_mr_desc = fi_mr_desc(mr); + mstro_ofi_msg_context ctx=NULL; + status = mstro_ofi__msg_context_create(&ctx, NULL, true, false); + if(status!=MSTRO_OK) { + ERR("Failed to init msg context for RDMA op: %d (%s)\n", + status, mstro_status_description(status)); + return status; + } DEBUG("Doing closure creation\n"); closure = malloc(sizeof(struct mstro_transport_rdma_cb_args)); @@ -515,7 +456,6 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ERR("Failed to alloc RDMA transport closure\n"); return MSTRO_NOMEM; } - closure->num_fragments = 1; closure->mr = mr; closure->fresh_alloc_route = fresh_alloc_route; closure->raw_ptr = cdo_dst->raw_ptr; @@ -532,6 +472,21 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* closure->dstcdoid = cdo_dst->gid; closure->appid = ticket->srcid->id; + DEBUG("Doing event creation\n"); + status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, + (void*)closure, mstro_transport_rdma_dtor, 0, &(ctx->ev)); + if (status != MSTRO_OK) { + ERR("Couldn't create event (%s)\n", mstro_status_description(status)); + return MSTRO_FAIL; + } else { + mstro_event_id eid; + status = mstro_event_id_get(ctx->ev, &eid); + if (status != MSTRO_OK) { + DEBUG("Failed to query event id (event @%p)\n", ctx->ev); + return MSTRO_FAIL; + } + DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion.\n", eid); + } /* make a fresh entry (for completion) */ struct mstro_transport_rdma_pending_entry* regentry = malloc(sizeof(struct mstro_transport_rdma_pending_entry)); @@ -563,47 +518,25 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* } - size_t fragment_size = app_entry->ep->fi->ep_attr->max_msg_size; - - /* mostly for debugging purposes to select an random small fragment size */ - //#define OVERRIDE_MAX_MSG_SIZE 1 -#ifdef OVERRIDE_MAX_MSG_SIZE - fragment_size = MIN(fragment_size, OVERRIDE_MAX_MSG_SIZE); -#endif - - closure->num_fragments = 1 + len / fragment_size; + ctx->ep = app_entry->ep; + mstro_ofi__remember_ctx(app_entry->ep, ctx); - if(closure->num_fragments>1) { - DEBUG("Message too large for endpoint, splitting into %zu fragments\n", - closure->num_fragments); - } + int num_retries = 3; + + RETRY_RDMA_TRANSPORT_READ: ; + ret = fi_read(app_entry->ep->ep, dst_ptr, len, local_buf_mr_desc, + app_entry->addr, mr_addr, mr_key, ctx); - /* read full-fragment blocks */ - size_t offset=ticket->dst_offset; // write dst cdos at the correct position - size_t num_retries; - for(size_t i=closure->num_fragments; - i-->1; ) { - status = mstro_transport_rdma__read(app_entry, cdo_dst, - local_buf_mr_desc, mr_addr, mr_key, - offset, fragment_size, - closure); - if(status!=MSTRO_OK) { - ERR("Failed to read fragment %zu\n", closure->num_fragments - i); - goto BAILOUT; - } - offset+=fragment_size; + if(ret==-FI_EAGAIN) { + NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name); + sleep(1); + if(num_retries-->0) + goto RETRY_RDMA_TRANSPORT_READ; } - - /* final (partial) read */ - status = mstro_transport_rdma__read(app_entry, cdo_dst, - local_buf_mr_desc, mr_addr, mr_key, - offset, len-offset, - closure); - if(status!=MSTRO_OK) { - ERR("Failed to read last fragment\n"); - goto BAILOUT; + if(ret<0) { + ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n", cdo_dst->name,fi_strerror(-ret)); + status = MSTRO_FAIL; } - BAILOUT: return status; } @@ -618,22 +551,12 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) srcidstr, &(args->srccdoid), WITH_CDO_ID_STR( dstidstr, &(args->dstcdoid), - DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`, %zu fragments left\n", - srcidstr, dstidstr, args->num_fragments););); + DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`\n", + srcidstr, dstidstr););); mstro_status status; args->status = MSTRO_OK; - size_t num_fragments_left = atomic_fetch_sub(&(args->num_fragments), 1); - assert(num_fragments_left>0); - if(num_fragments_left>1) { - DEBUG("%zu more fragments in flight, delaying rdma cleanup\n", - num_fragments_left-1); - /* closure is shared across all contexts/events, so don't free it */ - args->status=MSTRO_OK; - return; - } - // unregister int err = fi_close((struct fid*)args->mr); if (err) { @@ -710,7 +633,7 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) return; } args->status=MSTRO_OK; - + DEBUG("RDMA transport for CDO `%s` successful\n", args->name); } diff --git a/transport/transport_rdma.h b/transport/transport_rdma.h index da64fc9b95e2ca17930448d22cfeb54df67044aa..aede2229a932c83791a23343d1074262076127a0 100644 --- a/transport/transport_rdma.h +++ b/transport/transport_rdma.h @@ -63,7 +63,6 @@ struct mstro_transport_rdma_cb_args { void* raw_ptr; size_t len; char* name; - _Atomic(size_t) num_fragments; /**< the number of operations in flight (if transfer had to be chunked) */ struct mstro_cdo_id srccdoid; struct mstro_cdo_id dstcdoid; mstro_app_id appid;