diff --git a/include/maestro/i_ofi.h b/include/maestro/i_ofi.h index e526da687396aaa0f53f7d7af9ef492e9ee9ac8b..987a1ed07dadea63eaae4b6b8f2a6e2de0594821 100644 --- a/include/maestro/i_ofi.h +++ b/include/maestro/i_ofi.h @@ -70,8 +70,7 @@ #include "maestro/i_pool_manager_protocol.h" /** function prototype of message handlers */ -typedef mstro_status (*mstro_msg_handler)(const struct mstro_msg_envelope *envelope, - void **restart_closure); +typedef mstro_status (*mstro_msg_handler)(const struct mstro_msg_envelope *envelope); /** messaging context: To make the communication fully asynchronous we need to store some information about the send/receive message for diff --git a/include/maestro/i_pool_client.h b/include/maestro/i_pool_client.h index ae3dca9460ba97c6ddef68cb7be33ae6d4cecea8..1614190d3edf7fdebd45dd61271826853f3f73cf 100644 --- a/include/maestro/i_pool_client.h +++ b/include/maestro/i_pool_client.h @@ -54,16 +54,9 @@ #include <stdatomic.h> -/* Pool client "Handle incoming Message" dispatcher function - * - * If the message handling needs to be voluntarily preempted, the - * handler returns MSTRO_WOULDBKLOCK and fills in @arg - * *restart_closure, expecting to be called again with the same - * envelope and the same restart_closure to retry processing. - */ +/* Pool client "Handle incoming Message" dispatcher function*/ mstro_status -mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, - void ** restart_closure); +mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope); /** diff --git a/include/maestro/i_pool_manager.h b/include/maestro/i_pool_manager.h index 48bb08dcbae8da5ffe79b564bb3b8ca1dbde64c3..56019756d8de39840cc0cec67f784a686f0c3ff3 100644 --- a/include/maestro/i_pool_manager.h +++ b/include/maestro/i_pool_manager.h @@ -55,8 +55,7 @@ struct mstro_endpoint; /* Pool manager "Handle incoming Message" dispatcher function */ mstro_status -mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope, - void **restart_closure); +mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope); void mstro_pm__msg_free(Mstro__Pool__MstroMsg *msg); diff --git a/include/maestro/i_pool_manager_protocol.h b/include/maestro/i_pool_manager_protocol.h index cf785723b1fb04ae3a877d34596f1de79b4512ff..5543bdcf6745ad184b915b53208224915f4d2d3f 100644 --- a/include/maestro/i_pool_manager_protocol.h +++ b/include/maestro/i_pool_manager_protocol.h @@ -75,13 +75,6 @@ struct mstro_msg_envelope { uint64_t tag; /**< possibly a tag to use for communication (default: 0 for no tag) */ const ProtobufCMessageDescriptor *descriptor; /**< Introspection: what has been packed here? */ const struct mstro_endpoint *ep; /**< endpoint that this message is associated with. E.g., which one did it come in on? */ - /** if non-NULL: indicates this message has been through some - processing, but got re-queued for retry. The closure needs to - contain any info that might be needed to resume processing where - it got preempted last time. mstro_msg_envelope_free() will call - 'free' on this slot if non-NULL; any other cleanup must be done - by the function handling the processing */ - void *restart_closure; /* possible padding in this region */ _Alignas(max_align_t) uint8_t data[MSTRO_MSG_SHORT_SIZE]; /** data region, minimum: a small msg */ }; diff --git a/maestro/ofi.c b/maestro/ofi.c index 95659a1d8baaf0b3db9e7dfa3d9252e3fb4db0c6..4c7c72dfdfd21d2aa6d80ee5ffdea4c64aa54875 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -1667,14 +1667,11 @@ mstro_ofi__handle_cq_entry__recv(mstro_ofi_msg_context ctx) assert(ctx->msg!=NULL); struct mstro_msg_envelope *msg = ctx->msg; - mstro_status status = ctx->msg_handler(msg, &ctx->msg->restart_closure); + mstro_status status = ctx->msg_handler(msg); if(status!=MSTRO_OK) { if(status==MSTRO_WOULDBLOCK) { - DEBUG("Message handler preempted, restart closure %p\n", - ctx->msg->restart_closure); - assert(ctx->msg->restart_closure!=NULL); - /* resubmit is done by dispatcher above us, reusing ctx */ + ERR("Message handler should not block, this should use operations to restart properly\n"); return status; } else { ERR("Error handling incoming message, dropping it: %d (%s)\n", @@ -2338,11 +2335,9 @@ mstro_ofi_finalize(bool destroy_drc_info) } mstro_status -mstro_ofi__failing_msg_handler(const struct mstro_msg_envelope *e, - void **restart_closure) +mstro_ofi__failing_msg_handler(const struct mstro_msg_envelope *e) { const void * dummy1 = e; - void ** dummy2 = restart_closure; ERR("Invoking default message handler that should never run, likely the context was not initialized at receive time\n"); abort(); return MSTRO_FAIL; diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 70e018ee16d3ba80ed87ff9f784420ac39f458a5..d048ffdd3c74f86f8584a0d1d0138308cee821a3 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -576,227 +576,6 @@ mstro_pc__calculate_data_size_and_offsets( } -/** structure to re-start init transfer operation if it has to wait for an async pre-dependency */ -struct init_transfer_closure { - /* basically the arguments of the mstro_pc_app_befriend() call: */ - mstro_app_id appid; /**< remote appid */ - char* serialized_ep; /**< endpoint info */ - Mstro__Pool__TransportMethods *methods; /**< methods supported */ - - /* state variables of mstro_pc_app_befriend() preemption */ - Mstro__AppInfo *dst_epd; /**< unpacked remote endpoint info */ - fi_addr_t dst_addr; /**< remote address */ - - /* handles to data filled in by async operation */ - mstro_request request_handle; /**< handle of outstanding endpoint selection request */ - struct mstro_endpoint *dst_ep; /**< endpoint selected */ - bool pending; /**< another fi_read is ongoing, we should wait */ -}; - -static -mstro_status -mstro_pc__handle_init_transfer__closure_alloc(struct init_transfer_closure**result) -{ - assert(result!=NULL); - *result = calloc(1,sizeof(struct init_transfer_closure)); - if(*result==NULL) - return MSTRO_NOMEM; - else - return MSTRO_OK; -} - -static -mstro_status -mstro_pc__app_befriend__resume(mstro_app_id appid, - struct init_transfer_closure *restart_closure) -{ - assert(restart_closure!=NULL); assert(appid!=MSTRO_APP_ID_INVALID); - mstro_status status = MSTRO_OK; - struct mstro_pm_app_registry_entry *e; - if(restart_closure->pending) - { - - /* try to read the entry from registry */ - status = mstro_pm_app_lookup(appid, &e); - if(e) { - if(!e->pending) { - status = MSTRO_OK; - DEBUG("Bingo, app entry appeared in registry\n"); - goto BAILOUT; - } - } - status = MSTRO_WOULDBLOCK; - DEBUG("We are waiting for the other fi_read to complete\n"); - goto BAILOUT; - } - - /* called in continuation of previous attempt */ - if(restart_closure->appid!=appid) { - ERR("Called with nonempty mismatching closure\n"); - abort(); - } - - /* is it done? */ - if(!mstro_request_test(restart_closure->request_handle)) { - DEBUG("Async request still not done, try again later\n"); - status = MSTRO_WOULDBLOCK; - goto BAILOUT; - } else { - /* execute wait to clean up handle */ - status = mstro_request_wait(restart_closure->request_handle); - if(status!=MSTRO_OK) { - ERR("Async endpoint selection failed, could not find suitable endpoint to talk to app %" PRIappid ": %d\n", - appid, status); - goto BAILOUT; - } - } - - assert(restart_closure->dst_ep!=NULL); - /* pass into registry */ - status = mstro_pc_app_register(restart_closure->dst_ep, - restart_closure->dst_addr, - strdup(restart_closure->serialized_ep), - restart_closure->methods, - restart_closure->appid, - NULL); - if(status!=MSTRO_OK) { - ERR("Failed to register peer app %zu: %d\n", appid); - } else { - DEBUG("Registered peer app %zu\n", appid); - } - -BAILOUT: - return status; -} - -static -mstro_status -mstro_pc__app_befriend__start(mstro_app_id appid, const char* serialized_ep, - const Mstro__Pool__TransportMethods *methods, - struct init_transfer_closure *restart_closure) -{ - mstro_status status = MSTRO_OK; - - assert(restart_closure!=NULL); /* allocated by caller */ - - /* prepare to duplicate methods structure .. the simple way (pack/unpack) */ - size_t len = mstro__pool__transport_methods__get_packed_size(methods); - uint8_t buf[len]; - mstro__pool__transport_methods__pack(methods,buf); - - restart_closure->appid = appid; - - Mstro__AppInfo *dst_epd=NULL; - status=mstro_appinfo_deserialize(serialized_ep, &dst_epd); - if(status!=MSTRO_OK) { - ERR("Failed to parse destination endpoint descriptor\n"); - status=MSTRO_INVMSG; - goto BAILOUT; - } - restart_closure->dst_epd = dst_epd; - - restart_closure->serialized_ep = strdup(serialized_ep); - if(restart_closure->serialized_ep==NULL) { - ERR("Failed to duplicate serialized EP\n"); - status = MSTRO_NOMEM; - goto BAILOUT; - } - - restart_closure->methods = mstro__pool__transport_methods__unpack(NULL, len, buf); - if(restart_closure->methods==NULL) { - ERR("Failed to duplicate transport methods\n"); - status = MSTRO_NOMEM; - goto BAILOUT; - } - - restart_closure->dst_addr = FI_ADDR_UNSPEC; /* will be filled */ - restart_closure->request_handle = NULL; /* will be filled */ - restart_closure->dst_ep = NULL; /* will be filled */ - - /* we need to be careful to not pass in any stack-local refs */ - status=mstro_ofi__select_endpoint(restart_closure->dst_epd, - &(restart_closure->dst_ep), - &(restart_closure->dst_addr), - &(restart_closure->request_handle)); - - if(status!=MSTRO_OK) { - ERR("Failed to initiate partner lookup: %d\n", status); - } else { - /* can't wait for completion here */ - status = MSTRO_WOULDBLOCK; - } -BAILOUT: - return status; -} - -static -mstro_status -mstro_pc__app_befriend(mstro_app_id appid, const char* serialized_ep, - const Mstro__Pool__TransportMethods *methods, - struct init_transfer_closure **restart_closure) -{ - assert(restart_closure!=NULL); - assert(methods!=NULL); - assert(serialized_ep!=NULL); - struct mstro_pm_app_registry_entry *e; - mstro_status status = MSTRO_UNIMPL; - - /* check if this is a restart */ - if(*restart_closure!=NULL) { - DEBUG("Resuming preempted call for appid %" PRIappid "\n", appid); - status = mstro_pc__app_befriend__resume(appid, *restart_closure); - - } else { - /* try to read the entry from registry */ - status = mstro_pm_app_lookup(appid, &e); - if(e) { - if(e->pending) { - DEBUG("app entry is already being read ... wait until it completes\n"); - /*FIXME we do not need a restart closure in this case, but we check that it is not null everywhere*/ - status = mstro_pc__handle_init_transfer__closure_alloc(restart_closure); - if(status!=MSTRO_OK) { - ERR("Cannot allocate closure\n"); - goto BAILOUT; - } - (*restart_closure)->pending = true; /* mark that there is a pending read */ - status = MSTRO_WOULDBLOCK; /* we need to wait until the read is complete */ - goto BAILOUT; - - } else - { - DEBUG("Found app %zu in local registry, good\n", appid); - goto BAILOUT; - } - } else { /* we could not find the app in registry */ - DEBUG("Unknown app %zu, let's make friends\n", appid); - status = mstro_pc__handle_init_transfer__closure_alloc(restart_closure); - if(status!=MSTRO_OK) { - ERR("Cannot allocate closure\n"); - goto BAILOUT; - } - (*restart_closure)->pending = false; /*we are going to do the fi_read */ - /* mark that this app entry is being read now for others */ - status = mstro_pc_app_register_pending(appid); - status = mstro_pc__app_befriend__start(appid, serialized_ep, - methods, *restart_closure); - } - } - if(status!=MSTRO_WOULDBLOCK) { - NOISE("Deallocating restart closure for lookup of peer app %" PRIappid "\n", appid); - assert(*restart_closure!=NULL); - if((*restart_closure)->dst_epd) - mstro__app_info__free_unpacked((*restart_closure)->dst_epd, NULL); - if((*restart_closure)->serialized_ep) - free((*restart_closure)->serialized_ep); - if((*restart_closure)->methods) - mstro__pool__transport_methods__free_unpacked((*restart_closure)->methods, NULL); - free(*restart_closure); - (*restart_closure)=NULL; - } - -BAILOUT: - return status; -} mstro_status mstro_pc__init_transfer_send_ticket(mstro_pool_operation op) @@ -1197,307 +976,6 @@ mstro_pc__prepare_init_transfer(mstro_pool_operation op) return MSTRO_OK; } -static inline -mstro_status -mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init, - void** closure_p) -{ - assert(init != NULL); - assert(closure_p!=NULL); - struct init_transfer_closure** restart_closure - = (struct init_transfer_closure**)closure_p; - - mstro_cdo src_cdo = NULL; - if (init->dst_serialized_endpoint == NULL) { - ERR("Cannot initiate transfer without dst app endpoint\n"); - return MSTRO_INVMSG; - } - - NOISE("Initiating a transfer to endpoint %s\n", - init->dst_serialized_endpoint); - - struct mstro_cdo_id srccdoid = { .qw[0] = init->srccdoid->qw0, - .qw[1] = init->srccdoid->qw1, - .local_id = init->srccdoid->local_id }; - struct mstro_cdo_id dstcdoid = { .qw[0] = init->dstcdoid->qw0, - .qw[1] = init->dstcdoid->qw1, - .local_id = init->dstcdoid->local_id }; - WITH_CDO_ID_STR(here_idstr, &srccdoid, { - WITH_CDO_ID_STR(there_idstr, &dstcdoid, { - NOISE("ticket for CDO gid %s (here), gid %s (there)\n", - here_idstr, there_idstr);});}); - if (srccdoid.local_id != MSTRO_CDO_LOCAL_ID_NONE) { - /* We trust that the PM gave us the correct local-id */ - if (!(MSTRO_OK == mstro_pool__find_cdo_with_local_id(&srccdoid, &src_cdo))) { - WITH_CDO_ID_STR(idstr, &srccdoid, - ERR("Cannot transfer CDO gid %s because not in local pool\n", - idstr);); - return MSTRO_FAIL; - } - } - else { /*we do not have a valid local-id ... looking for an appropriate cdo */ - if (!(MSTRO_OK == mstro_pool__find_source_cdo(&srccdoid, init->dst_attributes, &src_cdo))) { - WITH_CDO_ID_STR(idstr, &srccdoid, - ERR("Cannot transfer CDO gid %s because not in local pool\n", - idstr);); - return MSTRO_FAIL; - } - } - - /* we may see the ticket before OFFER ack. In that case - * we do an implicit ack by setting state to OFFERED - * (PM would not send ticket if it has not handled the OFFER) */ - if(mstro_cdo_state_get(src_cdo)==MSTRO_CDO_STATE_OFFERED_LOCALLY) { - WITH_CDO_ID_STR(idstr, &srccdoid, { - DEBUG("Doing implicit OFFER-ACK for CDO %s at transport ticket creation (src side) time\n", - idstr); - }); - mstro_cdo_state state_flags = (mstro_cdo_state_get(src_cdo) - & MSTRO_CDO_STATE_FLAGS); - mstro_cdo_state_set(src_cdo, - MSTRO_CDO_STATE_OFFERED | state_flags); - } - - if(init->dst_attributes==NULL) { - WARN("No attributes on CDO\n"); - } else { - if(init->dst_attributes->val_case==MSTRO__POOL__ATTRIBUTES__VAL_KV_MAP) { - DEBUG("%zu attributes in kv-map\n", init->dst_attributes->kv_map->n_map); - } else { - WARN("non-kv attributes\n"); - } - } - - DEBUG("Initiating transfer from src app %" PRIappid " (me) to dst app %" PRIappid " of CDO %s\n", - g_pool_app_id, init->dst_appid->id, src_cdo->name); - - if(g_pool_app_id==init->dst_appid->id) { - WARN("FIXME: We will be talking to ourselves via transport, should use a shortcut\n"); - } - - mstro_status status = mstro_pc__app_befriend(init->dst_appid->id, - init->dst_serialized_endpoint, - init->methods, - restart_closure); - if (status != MSTRO_OK) { - if(status==MSTRO_WOULDBLOCK) { - WARN("Need to re-try this operation because of outstanding async partner lookup\n"); - /* some members of the message have been *copied* into the - * restart closure, but not moved */ - assert(*restart_closure!=NULL); - } - return status; - } - - /* Prepare the ticket */ - Mstro__Pool__CDOID srcc = MSTRO__POOL__CDOID__INIT; - srcc.qw0 = srccdoid.qw[0]; - srcc.qw1 = srccdoid.qw[1]; - srcc.local_id = srccdoid.local_id; - Mstro__Pool__CDOID dstc = MSTRO__POOL__CDOID__INIT; - dstc.qw0 = dstcdoid.qw[0]; - dstc.qw1 = dstcdoid.qw[1]; - dstc.local_id = dstcdoid.local_id; - - Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; - ticket.srccdoid = &srcc; - ticket.dstcdoid = &dstc; - Mstro__Pool__Appid myid = MSTRO__POOL__APPID__INIT; - myid.id = g_pool_app_id; - ticket.srcid = &myid; - - mstro_status s=MSTRO_UNIMPL; - int64_t realsize, src_offset, dst_offset; - - /**Calculate the length of the data, src and dst offsets and number of segments */ - s = mstro_pc__calculate_data_size_and_offsets( - src_cdo, init, &realsize, &src_offset, &dst_offset); - if (s != MSTRO_OK) { - return s; - } - - /*fill ticket with the gathered information */ - ticket.src_offset = src_offset; - ticket.dst_offset = dst_offset; - ticket.n_segments = init->n_segments; - ticket.distributed_cdo = init->distributed_cdo; - - if (init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO - && (!g_mio_available || (realsize % getpagesize()) != 0 ) - ){ - WARN("Not issuing a ticket with MIO. Either not available or CDO size (%zu)" - " is not a multiple of the page size (%d)." - " Will use maestro-core default transport\n", - realsize, getpagesize()); - init->methods->supported[0] = MSTRO__POOL__TRANSPORT_KIND__OFI; - } - - status = mstro_pc__select_transfer_method(src_cdo, init->methods, - &ticket); - if(status!=MSTRO_OK) { - ERR("Failed to select transport method\n"); - return MSTRO_FAIL; - } - - Mstro__Pool__TransferTicketGFS gfs = MSTRO__POOL__TRANSFER_TICKET_GFS__INIT; - Mstro__Pool__TransferTicketMIO mio = MSTRO__POOL__TRANSFER_TICKET_MIO__INIT; - Mstro__Pool__TransferTicketOFI ofi = MSTRO__POOL__TRANSFER_TICKET_OFI__INIT; - Mstro__Pool__RDMAHandle rh = MSTRO__POOL__RDMAHANDLE__INIT; - Mstro__Pool__Appid appid = MSTRO__POOL__APPID__INIT; - - ticket.data_size = realsize; - ticket.gfs = &gfs; - - /* even conditionals are problematic, had to take out protobuf INITs ----^ */ - switch(ticket.ticket_case) { - case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: - NOISE("TICKET CASE GFS\n"); - status= mstro_pc__construct_gfs_path_for_cdo(src_cdo, &gfs.path); - if(status!=MSTRO_OK) { - ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n", - status, mstro_status_description(status)); - return status; - } - gfs.keep_file = 1; // don't arbitrarily rm the transport file on dst - break; - case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: - NOISE("TICKET CASE MIO\n"); -#ifdef HAVE_MIO - struct mstro_cdo_id* semid; - semid = malloc(sizeof(struct mio_obj_id)); - if (semid == NULL) { - ERR("No more memory.\n"); - return MSTRO_NOMEM; - } - struct mstro_cdo_id* objid; - objid = malloc(sizeof(struct mio_obj_id)); - if (objid == NULL) { - ERR("No more memory.\n"); - return MSTRO_NOMEM; - } - char* semname = NULL; -#define MSTRO_MIO_SEM_STR_MAXLEN 128 -#warning semname should not me constructed here with MIO-specific length - mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); - if (semname == NULL) { - ERR("Couldn't prepare an id for semaphore obj\n"); - return MSTRO_FAIL; - } - status = mstro_cdo_id_from_name(semname, semid); /* So we do collision - detection in only - one place */ - if (status != MSTRO_OK) { - ERR("Couldn't make an id from name for semaphore obj\n"); - return MSTRO_FAIL; - } - - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, - DEBUG("Semaphore has ID: %s\n", - idstr);); - WITH_CDO_ID_STR(idstr, &(src_cdo->gid), - DEBUG("(CDO associated has ID: %s)\n", - idstr);); - - assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); - assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); - - mio.semid.len = sizeof(struct mstro_cdo_id); - mio.semid.data = (uint8_t*)semid; - mio.objid.len = sizeof(struct mstro_cdo_id); - objid->qw[0] = src_cdo->gid.qw[0]; - objid->qw[1] = src_cdo->gid.qw[1]; - objid->local_id = src_cdo->gid->local_id; - mio.objid.data = (uint8_t*)objid; - - mio.keep_obj = 0; - - ticket.mio = &mio; -#else - ERR("Request to issue an MIO ticket, but built without MIO support\n"); - return MSTRO_FAIL; -#endif - break; - case MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI: { - NOISE("TICKET CASE RDMA\n"); - appid.id = init->dst_appid->id; - ofi.dstid = &appid; - struct mstro_pm_app_registry_entry *e; - mstro_status status = mstro_pm_app_lookup(appid.id, &e); - assert(e!=NULL); // we befriended dst app earlier, it should be in the registry - ticket.src_serialized_endpoint = e->ep->serialized; - ofi.h = &rh; /* rest to be filled in transport_execute */ - ticket.want_completion = 1; /* so we can refcount-- */ - ticket.ofi = &ofi; - - break; - } - default: - ERR("Unsupported ticket kind %d\n", ticket.ticket_case); - return MSTRO_UNIMPL; - } - - if(src_cdo->attributes_msg==NULL) { - ERR("source CDO has no attributes message data -- should not happen\n"); - return MSTRO_FAIL; - } - ticket.attributes = src_cdo->attributes_msg; - - INFO("Issued ticket to app %" PRIu64 " for CDO %s, and starting execute process\n", init->dst_appid->id, src_cdo->name); - - NOISE("TransferTicket using path %s\n", ticket.gfs->path); - NOISE("TransferTicket cdo size %" PRIi64 "\n", ticket.data_size); - - /* Execute transport (non-blocking) */ - status = mstro_transport_execute(src_cdo, &ticket); - if(MSTRO_OK != status) { - ERR("Failure in transport execute for CDO %s\n", - src_cdo->name); - return MSTRO_FAIL; - } - - /* do some clean-up if applicable */ - if (ticket.ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS) { - if (init->cp == 0) { // That means mv - // TODO Detach raw pointer from CDO handle for Maestro resource - // management while the CDO stays OFFERED by the new location? Or - // is this part of a resource management protocol that originates - // from pool manager and utilizes existing infrastructure for - // movement? - DEBUG("Unimplemented ticket case cp==0\n"); - } - } - - /* Send ticket via pmp_send_nowait() */ - Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT; - - if (! (MSTRO_OK == mstro_pmp_package(&msg, (ProtobufCMessage*)&ticket))) { - ERR("Failed to package %s into a pool manager message\n", - ticket.base.descriptor->name); - return MSTRO_FAIL; - } - - status = mstro_pmp_send_nowait(init->dst_appid->id, &msg); - if(status!=MSTRO_OK) { - ERR("Cannot send ticket to %zu: %d (%s)\n", - init->dst_appid->id, status, mstro_status_description(status)); - return MSTRO_FAIL; - } - - mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PC_NUM_TICKETS_OUT, 1); - - /* cleanup ticket content that is not stack-allocated */ - switch(ticket.ticket_case) { - case MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI: { - free(ticket.ofi->h->mr_key.data); - ticket.ofi->h->mr_key.len = 0; - break; - } - default: - break; - } - - return MSTRO_OK; -} mstro_status mstro_pc__bye(mstro_pool_operation op) @@ -1664,109 +1142,6 @@ mstro_pc__transfer_execute(mstro_pool_operation op) return status; } -static inline -mstro_status -mstro_pc__handle_transfer_ticket(Mstro__Pool__TransferTicket* ticket, - void** closure_p) -{ - if(ticket==NULL || ticket->srccdoid==NULL || ticket->dstcdoid==NULL) { - ERR("Invalid transfer ticket: %p/%p/%p\n", - ticket, - ticket ? ticket->srccdoid : (void*)0xdeadbeef, - ticket ? ticket->dstcdoid : (void*)0xdeadbeef); - return MSTRO_INVMSG; - } - mstro_status status; - - struct init_transfer_closure** restart_closure - = (struct init_transfer_closure**)closure_p; - - - struct mstro_cdo_id cdoid = { .qw[0] = ticket->dstcdoid->qw0, - .qw[1] = ticket->dstcdoid->qw1, - .local_id = ticket->dstcdoid->local_id }; - - WITH_CDO_ID_STR(idstr, &cdoid, - INFO("Incoming ticket for CDO gid %s, ticket kind %s\n", - idstr, - (protobuf_c_enum_descriptor_get_value( - &mstro__pool__transport_kind__descriptor, - ticket->method)) - ->name);); - mstro_cdo dst_cdo; - status = mstro_pool__find_cdo_with_local_id(&cdoid, &dst_cdo); - - if (status != MSTRO_OK) { - /* we did not find the cdo by local id */ - status = mstro_pool__find_sink_cdo(&cdoid, ticket->attributes, - &dst_cdo); - if(MSTRO_OK!=status && MSTRO_NOENT!=status) { - WITH_CDO_ID_STR(idstr, &cdoid, - ERR("Cannot transfer CDO gid %s because not in local pool\n", - idstr);); - return MSTRO_FAIL; - } - } - - if(MSTRO_NOENT==status) { - WITH_CDO_ID_STR(idstr, &cdoid, { - WARN("Incoming ticket for CDO %s, but CDO not present in local pool, skipping ticket\n", - idstr);}); - if(ticket->force_offer) { - WARN("Ticket has force-offer set, but this is unimplemented\n"); - /* FIXME: create a fresh CDO, use that CDO for incoming transport, then perform OFFER */ - } - } else { - WITH_CDO_ID_STR(idstr, &dst_cdo->gid, { - DEBUG("Initiating incoming transfer to app %" PRIappid " (me) from %zu for CDO %s (%s)\n", - g_pool_app_id, ticket->srcid->id, dst_cdo->name, idstr);}); - - /* ensure we know how to talk to the recipient */ - /* FIXME: we don't have the methods list at hand -- fake one */ - Mstro__Pool__TransportKind ofi_method[1] = { MSTRO__POOL__TRANSPORT_KIND__OFI }; - Mstro__Pool__TransportMethods m = MSTRO__POOL__TRANSPORT_METHODS__INIT; - m.n_supported = 1; - m.supported = &ofi_method[0]; - - if (ticket->ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI) { - status = mstro_pc__app_befriend(ticket->srcid->id, - ticket->src_serialized_endpoint, - &m, - restart_closure); - if(status != MSTRO_OK) { - if(status==MSTRO_WOULDBLOCK) { - WARN("Need to re-try this operation because of outstanding async partner lookup\n"); - assert(*restart_closure!=NULL); - } - return status; - } - } - - /** check and update the number of segments on the dst cdo - * if n_segments of the cdo is zero, then it means this is the first ticket - * and we should update the n_segments value from the ticket - * Otherwise, then we are not the first ticket, and we should leave it alone - * with the completion of every transport the value of n_segments is decremented - * until it reaches zero, marking that all required data is filled. - */ - int64_t expected = 0; - atomic_compare_exchange_strong(&dst_cdo->n_segments, &expected, ticket->n_segments); - /* Execute transport (non-blocking) */ - status = mstro_transport_execute(dst_cdo, ticket); - if(MSTRO_OK != status) { - ERR("Failure in transport execute for CDO %s\n", - dst_cdo->name); - return MSTRO_FAIL; - } - } - - /* Each (GFS and MIO TODO) method waits for the transport to complete before sending completion themselves */ - mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PC_NUM_TICKETS_IN, 1); - - return MSTRO_OK; -} - - mstro_status mstro_pc__transfer_completed(mstro_pool_operation op) @@ -1940,19 +1315,10 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) return status; } -/** unpack envelope and handle PC-side message. - * - * Handlers must not block. If they can't handle the message in one - * go, they need to return MSTRO_WOULDBLOCK and a closure through - * *restart_closure. - * - * In this case this function will return MSTRO_WOULDBLOCK. - */ +/** unpack envelope and handle PC-side message*/ mstro_status -mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_closure) -{ - assert(restart_closure!=NULL); - +mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope) +{ mstro_status status=MSTRO_OK; NOISE("PC Handling incoming message size %" PRIu64 "\n", @@ -1991,11 +1357,6 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_cl goto BAILOUT; } - if(*restart_closure!=NULL) { - DEBUG("This is a restart of a previously preempted message handling, closure %p\n", - *restart_closure); - } - /* early check for NACK */ if(msg->msg_case == MSTRO__POOL__MSTRO_MSG__MSG_WELCOME && msg->welcome->nack) { @@ -2097,9 +1458,7 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_cl } BAILOUT: - /* kill message. If we are in MSTRO_WOULDBLOCK, any parts of the - * message that are needed in the restart_closure will have been - * *copied*, so no hold no ref to parts of msg */ + /* kill message. Everything should already be contained in the operation */ if(msg!=NULL) { assert(msg->token!=&g_pool_apptoken); mstro__pool__mstro_msg__free_unpacked(msg, NULL); diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 172205f2bc3288dc1450fd40478978f5c032028e..1715e4d52c53897bd68aecfee8a6674d54436c80 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -1654,21 +1654,13 @@ mstro_pm__msg_resolve(mstro_pool_operation op) } -/** unpack envelope and handle PM-side message. - * - * Handlers must not block. If they can't handle the message in one - * go, they need to return MSTRO_WOULDBLOCK and a closure through - * *restart_closure. - * - * In this case this function will return MSTRO_WOULDBLOCK. - */ +/** unpack envelope and handle PM-side message*/ /* Pool manager "Handle incoming Message" dispatcher function */ mstro_status -mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope, - void **restart_closure) +mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope) { - assert(restart_closure!=NULL); + /* for now we serialize things in here. Instead we could push to a * queue and have a queue runner in a different thread pick things diff --git a/maestro/pool_manager_protocol.c b/maestro/pool_manager_protocol.c index c587f0247139328638d23d6f6078d99df12daf5c..b677251444512a23a15c880364e8dcff3f7905ab 100644 --- a/maestro/pool_manager_protocol.c +++ b/maestro/pool_manager_protocol.c @@ -107,7 +107,6 @@ mstro_msg_envelope_allocate(struct mstro_msg_envelope **res) (*res)->size = MSTRO_MSG_SHORT_SIZE; (*res)->tag = 0; (*res)->descriptor = NULL; - (*res)->restart_closure = NULL; /* if this fails you need to fix the definition of MSTRO_MSG_SHORT_SIZE */ assert(offsetof(struct mstro_msg_envelope,data)==48); NOISE("Allocated envelope at %p\n", *res); @@ -121,11 +120,7 @@ mstro_msg_envelope_free(struct mstro_msg_envelope *e) { if(e==NULL) return MSTRO_INVARG; - - if(e->restart_closure) { - free(e->restart_closure); - } - + mstro_mempool_free(g_msg_env_pool_small, e); NOISE("De-Allocated envelope at %p\n", e); return MSTRO_OK;