diff --git a/transport/rdma.c b/transport/rdma.c index ef012f4a69b0588cae6424e4ba0a98ac4565fdd2..5405e60cfacfb941f6661f654809d7685b1099e3 100644 --- a/transport/rdma.c +++ b/transport/rdma.c @@ -450,6 +450,7 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* ERR("Failed to alloc RDMA transport closure\n"); return MSTRO_NOMEM; } + closure->num_fragments = 1; closure->mr = mr; closure->fresh_alloc_route = fresh_alloc_route; closure->raw_ptr = cdo_dst->raw_ptr; @@ -468,7 +469,7 @@ mstro_transport_rdma_dst_execute(mstro_cdo cdo_dst, Mstro__Pool__TransferTicket* DEBUG("Doing event creation\n"); status = mstro_event_create(g_transport_rdma_edom, mstro_transport_rdma_cb, - (void*)closure, mstro_transport_rdma_dtor, 0, &(ctx->ev)); + (void*)closure, mstro_transport_rdma_dtor, false, &(ctx->ev)); if (status != MSTRO_OK) { ERR("Couldn't create event (%s)\n", mstro_status_description(status)); return MSTRO_FAIL; @@ -545,12 +546,20 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure) srcidstr, &(args->srccdoid), WITH_CDO_ID_STR( dstidstr, &(args->dstcdoid), - DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`\n", - srcidstr, dstidstr););); + DEBUG("Callback is finishing completion msg of CDO `%s`-> `%s`, %zu fragments left\n", + srcidstr, dstidstr, args->num_fragments););); mstro_status status; args->status = MSTRO_OK; + size_t num_fragments_left = atomic_load(&(args->num_fragments)); + assert(num_fragments_left>0); + if(num_fragments_left>1) { + DEBUG("More fragments in flight, delaying rdma cleanup\n"); + args->status=MSTRO_OK; + return; + } + // unregister int err = fi_close((struct fid*)args->mr); if (err) { diff --git a/transport/transport_rdma.h b/transport/transport_rdma.h index 37f3d8584c25604ca4d68afd1cfd6acd504d7271..7ddfb23cdd09580ae44c73955de3ab9df7fb3394 100644 --- a/transport/transport_rdma.h +++ b/transport/transport_rdma.h @@ -52,6 +52,7 @@ struct mstro_transport_rdma_cb_args { int fresh_alloc_route; void* raw_ptr; size_t len; + _Atomic(size_t) num_fragments; /**< the number of operations in flight (if transfer had to be chunked) */ const char* name; struct mstro_cdo_id srccdoid; struct mstro_cdo_id dstcdoid;