diff --git a/maestro/ofi.c b/maestro/ofi.c index 34d4cf056ed4e1b92755f465030c41e1f8b5d84d..d23d63fc90ad63be4a5b083a8452f833bbb4230f 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -2585,33 +2585,38 @@ mstro_ofi__maybe_notify_completion(mstro_ofi_msg_context ctx) if (ctx->ev != NULL) { mstro_event_id eid; status = mstro_event_id_get(ctx->ev, &eid); - if (status != MSTRO_OK) { + if (status != MSTRO_OK) { DEBUG("Failed to query event id tied to context `%p`\n", ctx); - return MSTRO_FAIL; - } + return MSTRO_FAIL; + } DEBUG("OFI RDMA transport completion event id %" PRIx64 "\n", eid); struct mstro_transport_rdma_cb_args* result = NULL; - char* name = NULL; -DEBUG("There is a RDMA event, fetching event name...\n"); - status = mstro_event_domain_name_get(ctx->ev, &name); - if (status != MSTRO_OK) { - ERR("Couldn't get event domain name\n"); - return status; + char* name = NULL; + DEBUG("There is a RDMA event, fetching event name...\n"); + status = mstro_event_domain_name_get(ctx->ev, &name); + if (status != MSTRO_OK) { + ERR("Couldn't get event domain name\n"); + return status; } if (!strcmp(name, mstro_transport_rdma_edom_name)) { -DEBUG("Event name corresponds to a pending RDMA transport to finalize\n"); + DEBUG("Event name corresponds to a pending RDMA transport to finalize\n"); free(name); status = mstro_event_trigger(ctx->ev, (void**)&result); - assert(status == MSTRO_OK); - if (result->status != MSTRO_OK) - return result->status; + assert(status == MSTRO_OK); + if (result->status != MSTRO_OK) + return result->status; status = mstro_event_destroy(ctx->ev); - if (status != MSTRO_OK) { + if (status != MSTRO_OK) { ERR("Failed destroying event\n"); return status; - } - } + } + /* closure is re-used for multiple events if we fragment, so + * clean up here (not in event dtor), but only if + * appropriate: */ + if(result->num_fragments==0) + mstro_transport_rdma_dtor(result); + } status = mstro_ofi__msg_context_destroy(ctx); if (status != MSTRO_OK) { diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index de42534abbd2bcd9bf715e5a651329a1a09c0882..c0bc4f2c08624c0ff83997096acbf806bff4d870 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -1438,7 +1438,7 @@ mstro_pm_cdo_registry_update_state(const struct mstro_cdo_id *cdoid, }); if(found) { - mstro_status status = mstro_pm__notify_cdo_registry_change(); + mstro_status status = mstro_pm__notify_cdo_registry_change(); if(status!=MSTRO_OK) { ERR("Failed to notify waiters of registry change\n"); return status; diff --git a/transport/rdma.c b/transport/rdma.c index 5405e60cfacfb941f6661f654809d7685b1099e3..49e4f89726f79f2cff1fea2ceb1b57361f849d93 100644 --- a/transport/rdma.c +++ b/transport/rdma.c @@ -55,6 +55,10 @@ #include <pthread.h> #include <string.h> +#ifndef MIN +#define MIN(x,y) ((x)<(y) ? (x) : (y)) +#endif + /* simplify logging */ #define NOISE(...) LOG_DEBUG(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__) #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__) @@ -359,7 +363,67 @@ BAILOUT_UNLOCK: return status; } - + +#define NUM_READ_RETRIES 3 +static +mstro_status +mstro_transport_rdma__read(const struct mstro_pm_app_registry_entry *app_entry, + mstro_cdo cdo_dst, + void *mr_desc, + uint64_t mr_addr, uint64_t mr_key, + size_t offset, size_t len, + struct mstro_transport_rdma_cb_args *closure) +{ + mstro_status status; + mstro_ofi_msg_context ctx=NULL; + status = mstro_ofi__msg_context_create(&ctx, NULL, true); + if(status!=MSTRO_OK) { + ERR("Failed to init msg context for RDMA op: %d (%s)\n", + status, mstro_status_description(status)); + goto BAILOUT; + } + + DEBUG("Doing event creation\n"); + status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, + (void*)closure, NULL, false, &(ctx->ev)); + if (status != MSTRO_OK) { + ERR("Couldn't create event (%s)\n", mstro_status_description(status)); + goto BAILOUT; + } else { + mstro_event_id eid; + status = mstro_event_id_get(ctx->ev, &eid); + if (status != MSTRO_OK) { + DEBUG("Failed to query event id (event @%p)\n", ctx->ev); + goto BAILOUT; + } + DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion of at offset %zu, len %zu.\n", + eid, offset, len); + } + + size_t num_retries = NUM_READ_RETRIES; +RETRY_RDMA_TRANSPORT_READ: ; + int ret = fi_read(app_entry->ep->ep, + (uint8_t*)cdo_dst->raw_ptr+offset, len, + mr_desc, + app_entry->addr, mr_addr+offset, mr_key, ctx); + if(ret==-FI_EAGAIN) { + NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name); + sleep(1); + if(num_retries-->0) + goto RETRY_RDMA_TRANSPORT_READ; + } + if(ret<0) { + ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n", + cdo_dst->name, fi_strerror(-ret)); + status = MSTRO_FAIL; + goto BAILOUT; + } + + status=MSTRO_OK; +BAILOUT: + return status; +} + mstro_status mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ticket) { @@ -436,13 +500,6 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* } void* local_buf_mr_desc = fi_mr_desc(mr); - mstro_ofi_msg_context ctx=NULL; - status = mstro_ofi__msg_context_create(&ctx, NULL, true); - if(status!=MSTRO_OK) { - ERR("Failed to init msg context for RDMA op: %d (%s)\n", - status, mstro_status_description(status)); - return status; - } DEBUG("Doing closure creation\n"); closure = malloc(sizeof(struct mstro_transport_rdma_cb_args)); @@ -467,22 +524,6 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* closure->dstcdoid = cdo_dst->gid; closure->appid = ticket->srcid->id; - DEBUG("Doing event creation\n"); - status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, - (void*)closure, mstro_transport_rdma_dtor, false, &(ctx->ev)); - if (status != MSTRO_OK) { - ERR("Couldn't create event (%s)\n", mstro_status_description(status)); - return MSTRO_FAIL; - } else { - mstro_event_id eid; - status = mstro_event_id_get(ctx->ev, &eid); - if (status != MSTRO_OK) { - DEBUG("Failed to query event id (event @%p)\n", ctx->ev); - return MSTRO_FAIL; - } - DEBUG("Created event id %" PRIx64 " to monitor RDMA read completion.\n", eid); - } - /* make a fresh entry (for completion) */ struct mstro_transport_rdma_pending_entry* regentry = malloc(sizeof(struct mstro_transport_rdma_pending_entry)); if(regentry==NULL) { @@ -512,26 +553,47 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* goto BAILOUT; } - if(len> app_entry->ep->fi->ep_attr->max_msg_size) { - ERR("Transfer size exceeds endpoint's maximum operation size. FIXME: should split up operation\n"); - status=MSTRO_UNIMPL; - goto BAILOUT; + size_t fragment_size = app_entry->ep->fi->ep_attr->max_msg_size; + + /* mostly for debugging purposes to select an random small fragment size */ + //#define OVERRIDE_MAX_MSG_SIZE 1 +#ifdef OVERRIDE_MAX_MSG_SIZE + fragment_size = MIN(fragment_size, OVERRIDE_MAX_MSG_SIZE); +#endif + + closure->num_fragments = 1 + len / fragment_size; + + if(closure->num_fragments>1) { + DEBUG("Message too large for endpoint, splitting into %zu fragments\n", + closure->num_fragments); } - int num_retries = 3; - RETRY_RDMA_TRANSPORT_READ: ; - ret = fi_read(app_entry->ep->ep, cdo_dst->raw_ptr, len, local_buf_mr_desc, - app_entry->addr, mr_addr, mr_key, ctx); - if(ret==-FI_EAGAIN) { - NOISE("RDMA read for CDO transport (%s) needs to be retried\n", cdo_dst->name); - sleep(1); - if(num_retries-->0) - goto RETRY_RDMA_TRANSPORT_READ; + /* read full-fragment blocks */ + size_t offset=0; + size_t num_retries; + for(size_t i=closure->num_fragments; + i-->1; ) { + status = mstro_transport_rdma__read(app_entry, cdo_dst, + local_buf_mr_desc, mr_addr, mr_key, + offset, fragment_size, + closure); + if(status!=MSTRO_OK) { + ERR("Failed to read fragment %zu\n", closure->num_fragments - i); + goto BAILOUT; + } + offset+=fragment_size; } - if(ret<0) { - ERR("Failed to do RDMA read for CDO transport (%s) (err: %s)\n", cdo_dst->name,fi_strerror(-ret)); - status = MSTRO_FAIL; + + /* final (partial) read */ + status = mstro_transport_rdma__read(app_entry, cdo_dst, + local_buf_mr_desc, mr_addr, mr_key, + offset, len-offset, + closure); + if(status!=MSTRO_OK) { + ERR("Failed to read last fragment\n"); + goto BAILOUT; } + BAILOUT: return status; } @@ -552,10 +614,12 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) mstro_status status; args->status = MSTRO_OK; - size_t num_fragments_left = atomic_load(&(args->num_fragments)); + size_t num_fragments_left = atomic_fetch_sub(&(args->num_fragments), 1); assert(num_fragments_left>0); if(num_fragments_left>1) { - DEBUG("More fragments in flight, delaying rdma cleanup\n"); + DEBUG("%zu more fragments in flight, delaying rdma cleanup\n", + num_fragments_left-1); + /* closure is shared across all contexts/events, so don't free it */ args->status=MSTRO_OK; return; } @@ -635,7 +699,7 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) return; } args->status=MSTRO_OK; - + DEBUG("RDMA transport for CDO `%s` successful\n", args->name); }