diff --git a/transport/rdma.c b/transport/rdma.c index dd1ad574dc59dbd9039094bbfea1fbb077b8776f..864464123f9932b88573751bd8e48fcac3fe4364 100644 --- a/transport/rdma.c +++ b/transport/rdma.c @@ -478,6 +478,7 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ERR("Failed to alloc RDMA transport closure\n"); return MSTRO_NOMEM; } + closure->num_fragments = 1; closure->mr = mr; closure->fresh_alloc_route = fresh_alloc_route; closure->raw_ptr = cdo_dst->raw_ptr; @@ -496,7 +497,7 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* DEBUG("Doing event creation\n"); status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, - (void*)closure, mstro_transport_rdma_dtor, 0, &(ctx->ev)); + (void*)closure, mstro_transport_rdma_dtor, false, &(ctx->ev)); if (status != MSTRO_OK) { ERR("Couldn't create event (%s)\n", mstro_status_description(status)); return MSTRO_FAIL; @@ -579,12 +580,20 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) srcidstr, &(args->srccdoid), WITH_CDO_ID_STR( dstidstr, &(args->dstcdoid), - DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`\n", - srcidstr, dstidstr););); + DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`, %zu fragments left\n", + srcidstr, dstidstr, args->num_fragments););); mstro_status status; args->status = MSTRO_OK; + size_t num_fragments_left = atomic_load(&(args->num_fragments)); + assert(num_fragments_left>0); + if(num_fragments_left>1) { + DEBUG("More fragments in flight, delaying rdma cleanup\n"); + args->status=MSTRO_OK; + return; + } + // unregister int err = fi_close((struct fid*)args->mr); if (err) { diff --git a/transport/transport_rdma.h b/transport/transport_rdma.h index ad81c7fabaff2cae4ad6da667b81304016fb74db..11e57218cb18cf81556cd47060ef96b677b17373 100644 --- a/transport/transport_rdma.h +++ b/transport/transport_rdma.h @@ -63,6 +63,7 @@ struct mstro_transport_rdma_cb_args { void* raw_ptr; size_t len; char* name; + _Atomic(size_t) num_fragments; /**< the number of operations in flight (if transfer had to be chunked) */ struct mstro_cdo_id srccdoid; struct mstro_cdo_id dstcdoid; mstro_app_id appid;