diff --git a/include/maestro/i_pool_manager.h b/include/maestro/i_pool_manager.h index 0e553424a2a6cf20d499cef28dd91c5fff3b8a87..48bb08dcbae8da5ffe79b564bb3b8ca1dbde64c3 100644 --- a/include/maestro/i_pool_manager.h +++ b/include/maestro/i_pool_manager.h @@ -77,7 +77,7 @@ mstro_status mstro_pm__check_acks(mstro_pool_operation op); mstro_status -mstro_pm_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep); +mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep); /**Handle pm cdo seal operation and send ack*/ mstro_status diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index a8b42c050992c2ac79b253ce46cbc9cb04d7c061..1b5185b1c172744ff27ff27ede82fe97a810fb73 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -74,6 +74,8 @@ enum mstro_pool_operation_kind { MSTRO_OP_PM_UNSUBSCRIBE, /**< pm unsubscribe operation */ MSTRO_OP_PM_EVENT_ACK, /**< pm event ack operation */ MSTRO_OP_PM_MSG_RESOLVE, /**< pm msg resolve operation */ + /**-------------------PC side operations----------------------------------------*/ + MSTRO_OP_PC_INIT_TRANSFER, /**< pc init transfer */ MSTRO_OP_MAX }; @@ -124,10 +126,19 @@ struct mstro_pool_operation_ { { Mstro__Pool__SubscriptionHandle *subscription_handle; /**for subscribe operation*/ bool send_attribute_update; /**for seal operation */ - struct { - const struct mstro_endpoint *ep; /** for join operation*/ + struct { /** for join operation*/ + const struct mstro_endpoint *ep; fi_addr_t translated_addr; } join; + struct /** for init transfer*/ + { + mstro_cdo src_cdo; + struct mstro_endpoint *dst_ep; + fi_addr_t dst_addr; + mstro_request request; + } init; + + }; }; diff --git a/maestro/i_pool_client.h b/maestro/i_pool_client.h index c4594a024dc3ded0defea251835aa204007ec954..00182415c0a73fd7a8e8347ce49e7405493a9383 100644 --- a/maestro/i_pool_client.h +++ b/maestro/i_pool_client.h @@ -65,6 +65,59 @@ mstro_status mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void ** restart_closure); + +/** + * @brief Check ticket and source cdo + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_prepare_init_transfer(mstro_pool_operation op); + + +/** + * @brief Checks dst app registry entry. + * If dst app is in the registry returns MSTRO_OK, and jumps directly to @ref mstro_pc_init_transfer_reg_app for writing the ticket + * If dst app info is currently being read in another thread, returns MSTRO_WOULDBLOCK + * If dst app is not in registry, issues a read (mstro_ofi__select_endpoint) and returns MSTRO_OK + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_app_befriend_op(mstro_pool_operation op); + + +/** + * @brief Waits for app info to be read and registers the app when the read is completed + * returns MSTRO_WOULDBLOCK is the read is not complete yet + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_init_transfer_reg_app(mstro_pool_operation op); + +/** + * @brief Writes and sends the transfer ticket + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_init_transfer_send_ticket(mstro_pool_operation op); + + +/** + * @brief Create mstro_pool_operations from incoming msgs + * pushes the created operations to the *queue* + * + * @param msg message handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg); + /**@} (end of group MSTRO_I_POOL_CLIENT) */ /**@} (end of group MSTRO_Internal) */ diff --git a/maestro/pool_client.c b/maestro/pool_client.c index cc8b124e09a5de35cd26f6747c536a8ea07080d0..df8d3a590fa1f3786a0975416040e193f3a6e590 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -11,6 +11,7 @@ #include "maestro/i_statistics.h" #include "i_subscription_registry.h" #include "maestro/i_misc.h" +#include "i_thread_team.h" #include <unistd.h> #include <inttypes.h> @@ -20,14 +21,26 @@ #include "transport/transport_mio.h" #endif - + /* simplify logging */ #define NOISE(...) LOG_NOISE(MSTRO_LOG_MODULE_PC,__VA_ARGS__) #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_PC,__VA_ARGS__) #define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_PC,__VA_ARGS__) #define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_PC,__VA_ARGS__) #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_PC,__VA_ARGS__) - + + +/**function stack to handle pm declare cdo msg*/ +const mstro_pool_op_st_handler mstro_pc_init_transfer_steps[] = { + NULL, + mstro_pc_prepare_init_transfer, + mstro_pc_app_befriend_op, + mstro_pc_init_transfer_reg_app, + mstro_pc_init_transfer_send_ticket, + NULL, + NULL, + NULL + }; static inline mstro_status @@ -683,6 +696,387 @@ BAILOUT: return status; } +mstro_status +mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + const Mstro__Pool__InitiateTransfer* init = op->msg->initiate_transfer; + + /* Prepare the ticket */ + Mstro__Pool__CDOID srcc = MSTRO__POOL__CDOID__INIT; + srcc.qw0 = init->srccdoid->qw0; + srcc.qw1 = init->srccdoid->qw1; + srcc.local_id = init->srccdoid->local_id; + Mstro__Pool__CDOID dstc = MSTRO__POOL__CDOID__INIT; + dstc.qw0 = init->dstcdoid->qw0; + dstc.qw1 = init->dstcdoid->qw1; + dstc.local_id = init->dstcdoid->local_id; + + Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; + ticket.srccdoid = &srcc; + ticket.dstcdoid = &dstc; + Mstro__Pool__Appid myid = MSTRO__POOL__APPID__INIT; + myid.id = g_pool_app_id; + ticket.srcid = &myid; + + + int64_t realsize, src_offset, dst_offset; + + /**Calculate the length of the data, src and dst offsets and number of segments */ + status = mstro_pc__calculate_data_size_and_offsets( + op->init.src_cdo, init, &realsize, &src_offset, &dst_offset); + if (status != MSTRO_OK) { + return status; + } + + /*fill ticket with the gathered information */ + ticket.src_offset = src_offset; + ticket.dst_offset = dst_offset; + ticket.n_segments = init->n_segments; + ticket.distributed_cdo = init->distributed_cdo; + + if (init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO + && (!g_mio_available || (realsize % getpagesize()) != 0 ) + ){ + WARN("Not issuing a ticket with MIO. Either not available or CDO size (%zu)" + " is not a multiple of the page size (%d)." + " Will use maestro-core default transport\n", + realsize, getpagesize()); + init->methods->supported[0] = MSTRO__POOL__TRANSPORT_KIND__OFI; + } + + status = mstro_pc__select_transfer_method(op->init.src_cdo, init->methods, + &ticket); + if(status!=MSTRO_OK) { + ERR("Failed to select transport method\n"); + return MSTRO_FAIL; + } + + Mstro__Pool__TransferTicketGFS gfs = MSTRO__POOL__TRANSFER_TICKET_GFS__INIT; + Mstro__Pool__TransferTicketMIO mio = MSTRO__POOL__TRANSFER_TICKET_MIO__INIT; + Mstro__Pool__TransferTicketOFI ofi = MSTRO__POOL__TRANSFER_TICKET_OFI__INIT; + Mstro__Pool__RDMAHandle rh = MSTRO__POOL__RDMAHANDLE__INIT; + Mstro__Pool__Appid appid = MSTRO__POOL__APPID__INIT; + + ticket.data_size = realsize; + ticket.gfs = &gfs; + + /* even conditionals are problematic, had to take out protobuf INITs ----^ */ + switch(ticket.ticket_case) { + case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: + NOISE("TICKET CASE GFS\n"); + status= mstro_pc__construct_gfs_path_for_cdo(op->init.src_cdo, &gfs.path); + if(status!=MSTRO_OK) { + ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n", + status, mstro_status_description(status)); + return status; + } + gfs.keep_file = 1; // don't arbitrarily rm the transport file on dst + break; + case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: + NOISE("TICKET CASE MIO\n"); +#ifdef HAVE_MIO + struct mstro_cdo_id* semid; + semid = malloc(sizeof(struct mio_obj_id)); + if (semid == NULL) { + ERR("No more memory.\n"); + return MSTRO_NOMEM; + } + struct mstro_cdo_id* objid; + objid = malloc(sizeof(struct mio_obj_id)); + if (objid == NULL) { + ERR("No more memory.\n"); + return MSTRO_NOMEM; + } + char* semname = NULL; +#define MSTRO_MIO_SEM_STR_MAXLEN 128 +#warning semname should not me constructed here with MIO-specific length + mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); + if (semname == NULL) { + ERR("Couldn't prepare an id for semaphore obj\n"); + return MSTRO_FAIL; + } + status = mstro_cdo_id_from_name(semname, semid); /* So we do collision + detection in only + one place */ + if (status != MSTRO_OK) { + ERR("Couldn't make an id from name for semaphore obj\n"); + return MSTRO_FAIL; + } + + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, + DEBUG("Semaphore has ID: %s\n", + idstr);); + WITH_CDO_ID_STR(idstr, &(op->init.src_cdo->gid), + DEBUG("(CDO associated has ID: %s)\n", + idstr);); + + assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); + assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); + + mio.semid.len = sizeof(struct mstro_cdo_id); + mio.semid.data = (uint8_t*)semid; + mio.objid.len = sizeof(struct mstro_cdo_id); + objid->qw[0] = op->init.src_cdo->gid.qw[0]; + objid->qw[1] = op->init.src_cdo->gid.qw[1]; + objid->local_id = op->init.src_cdo->gid->local_id; + mio.objid.data = (uint8_t*)objid; + + mio.keep_obj = 0; + + ticket.mio = &mio; +#else + ERR("Request to issue an MIO ticket, but built without MIO support\n"); + return MSTRO_FAIL; +#endif + break; + case MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI: { + NOISE("TICKET CASE RDMA\n"); + appid.id = init->dst_appid->id; + ofi.dstid = &appid; + struct mstro_pm_app_registry_entry *e; + mstro_status status = mstro_pm_app_lookup(appid.id, &e); + assert(e!=NULL); // we befriended dst app earlier, it should be in the registry + ticket.src_serialized_endpoint = e->ep->serialized; + ofi.h = &rh; /* rest to be filled in transport_execute */ + ticket.want_completion = 1; /* so we can refcount-- */ + ticket.ofi = &ofi; + break; + } + default: + ERR("Unsupported ticket kind %d\n", ticket.ticket_case); + return MSTRO_UNIMPL; + } + + if(op->init.src_cdo->attributes_msg==NULL) { + ERR("source CDO has no attributes message data -- should not happen\n"); + return MSTRO_FAIL; + } + ticket.attributes = op->init.src_cdo->attributes_msg; + + INFO("Issued ticket to app %" PRIu64 " for CDO %s, and starting execute process\n", init->dst_appid->id, op->init.src_cdo->name); + + NOISE("TransferTicket using path %s\n", ticket.gfs->path); + NOISE("TransferTicket cdo size %" PRIi64 "\n", ticket.data_size); + + /* Execute transport (non-blocking) */ + status = mstro_transport_execute(op->init.src_cdo, &ticket); + if(MSTRO_OK != status) { + ERR("Failure in transport execute for CDO %s\n", + op->init.src_cdo->name); + return MSTRO_FAIL; + } + + /* do some clean-up if applicable */ + if (ticket.ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS) { + if (init->cp == 0) { // That means mv + // TODO Detach raw pointer from CDO handle for Maestro resource + // management while the CDO stays OFFERED by the new location? Or + // is this part of a resource management protocol that originates + // from pool manager and utilizes existing infrastructure for + // movement? + DEBUG("Unimplemented ticket case cp==0\n"); + } + } + + /* Send ticket via pmp_send_nowait() */ + Mstro__Pool__MstroMsg msg = MSTRO__POOL__MSTRO_MSG__INIT; + + if (! (MSTRO_OK == mstro_pmp_package(&msg, (ProtobufCMessage*)&ticket))) { + ERR("Failed to package %s into a pool manager message\n", + ticket.base.descriptor->name); + return MSTRO_FAIL; + } + + status = mstro_pmp_send_nowait(init->dst_appid->id, &msg); + if(status!=MSTRO_OK) { + ERR("Cannot send ticket to %zu: %d (%s)\n", + init->dst_appid->id, status, mstro_status_description(status)); + return MSTRO_FAIL; + } + + mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PC_NUM_TICKETS_OUT, 1); + + /* cleanup ticket content that is not stack-allocated */ + switch(ticket.ticket_case) { + case MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI: { + free(ticket.ofi->h->mr_key.data); + ticket.ofi->h->mr_key.len = 0; + break; + } + default: + break; + } + + return MSTRO_OK; +} + +mstro_status +mstro_pc_init_transfer_reg_app(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + + if(!mstro_request_test(op->init.request)) { + DEBUG("Async request still not done, try again later\n"); + status = MSTRO_WOULDBLOCK; + return status; + } else { + /* execute wait to clean up handle */ + status = mstro_request_wait(op->init.request); + if(status!=MSTRO_OK) { + ERR("Async endpoint selection failed, could not find suitable endpoint to talk to app %" PRIappid ": %d\n", + op->msg->initiate_transfer->dst_appid->id, status); + return status; + } + } + + assert(op->init.dst_ep!=NULL); + /* prepare to duplicate methods structure .. the simple way (pack/unpack) */ + size_t len = mstro__pool__transport_methods__get_packed_size(op->msg->initiate_transfer->methods); + uint8_t buf[len]; + mstro__pool__transport_methods__pack(op->msg->initiate_transfer->methods,buf); + + Mstro__Pool__TransportMethods *methods = mstro__pool__transport_methods__unpack(NULL, len, buf); + + /* pass into registry */ + status = mstro_pc_app_register(op->init.dst_ep, + op->init.dst_addr, + strdup(op->msg->initiate_transfer->dst_serialized_endpoint), + methods, + op->msg->initiate_transfer->dst_appid->id, + NULL); + if(status!=MSTRO_OK) { + ERR("Failed to register peer app %zu: %d\n", op->msg->initiate_transfer->dst_appid->id); + } else { + DEBUG("Registered peer app %zu\n", op->msg->initiate_transfer->dst_appid->id); + } + + return status; +} + +mstro_status +mstro_pc_app_befriend_op(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + struct mstro_pm_app_registry_entry *e; + mstro_app_id appid = op->msg->initiate_transfer->dst_appid->id; + status = mstro_pm_app_lookup(appid, &e); + if(e) { + /**found an entry ... good */ + if(e->pending) { + DEBUG("app entry is already being read ... wait until it completes\n"); + status = MSTRO_WOULDBLOCK; /* we need to wait until the read is complete */ + return status; + } else + { + DEBUG("Found app %zu in local registry, good\n", appid); + op->step++; /**jump app reg step to go directly to writing and sending the ticket */ + return MSTRO_OK; /**we can continue execution */ + } + } + else { + /**Did not find an entry, will submit read*/ + DEBUG("Unknown app %zu, let's make friends\n", appid); + /* mark that this app entry is being read now for others */ + status = mstro_pc_app_register_pending(appid); + /**Submit read */ + + /* we need to be careful to not pass in any stack-local refs */ + Mstro__AppInfo *dst_epd=NULL; + status=mstro_appinfo_deserialize(op->msg->initiate_transfer->dst_serialized_endpoint, &dst_epd); + + status=mstro_ofi__select_endpoint(dst_epd, + &(op->init.dst_ep), + &(op->init.dst_addr), + &(op->init.request)); + + if(status!=MSTRO_OK) { + ERR("Failed to initiate partner lookup: %d\n", status); + } else { + /* return MSTRO_OK to advance the op step and wait for read to complete in the next step */ + status = MSTRO_OK; + } + } + + return status; +} +mstro_status +mstro_pc_prepare_init_transfer(mstro_pool_operation op) +{ + const Mstro__Pool__InitiateTransfer* init = op->msg->initiate_transfer; + assert( init!= NULL); + + if (init->dst_serialized_endpoint== NULL) { + ERR("Cannot initiate transfer without dst app endpoint\n"); + return MSTRO_INVMSG; + } + + NOISE("Initiating a transfer to endpoint %s\n", + init->dst_serialized_endpoint); + + struct mstro_cdo_id srccdoid = { .qw[0] = init->srccdoid->qw0, + .qw[1] = init->srccdoid->qw1, + .local_id = init->srccdoid->local_id }; + struct mstro_cdo_id dstcdoid = { .qw[0] = init->dstcdoid->qw0, + .qw[1] = init->dstcdoid->qw1, + .local_id = init->dstcdoid->local_id }; + WITH_CDO_ID_STR(here_idstr, &srccdoid, { + WITH_CDO_ID_STR(there_idstr, &dstcdoid, { + NOISE("ticket for CDO gid %s (here), gid %s (there)\n", + here_idstr, there_idstr);});}); + if (srccdoid.local_id != MSTRO_CDO_LOCAL_ID_NONE) { + /* We trust that the PM gave us the correct local-id */ + if (!(MSTRO_OK == mstro_pool__find_cdo_with_local_id(&srccdoid, &(op->init.src_cdo)))) { + WITH_CDO_ID_STR(idstr, &srccdoid, + ERR("Cannot transfer CDO gid %s because not in local pool\n", + idstr);); + return MSTRO_FAIL; + } + } + else { /*we do not have a valid local-id ... looking for an appropriate cdo */ + if (!(MSTRO_OK == mstro_pool__find_source_cdo(&srccdoid, init->dst_attributes, &(op->init.src_cdo)))) { + WITH_CDO_ID_STR(idstr, &srccdoid, + ERR("Cannot transfer CDO gid %s because not in local pool\n", + idstr);); + return MSTRO_FAIL; + } + } + + /* we may see the ticket before OFFER ack. In that case + * we do an implicit ack by setting state to OFFERED + * (PM would not send ticket if it has not handled the OFFER) */ + if(mstro_cdo_state_get(op->init.src_cdo)==MSTRO_CDO_STATE_OFFERED_LOCALLY) { + WITH_CDO_ID_STR(idstr, &srccdoid, { + DEBUG("Doing implicit OFFER-ACK for CDO %s at transport ticket creation (src side) time\n", + idstr); + }); + mstro_cdo_state state_flags = (mstro_cdo_state_get(op->init.src_cdo) + & MSTRO_CDO_STATE_FLAGS); + mstro_cdo_state_set(op->init.src_cdo, + MSTRO_CDO_STATE_OFFERED | state_flags); + } + + if(init->dst_attributes==NULL) { + WARN("No attributes on CDO\n"); + } else { + if(init->dst_attributes->val_case==MSTRO__POOL__ATTRIBUTES__VAL_KV_MAP) { + DEBUG("%zu attributes in kv-map\n", init->dst_attributes->kv_map->n_map); + } else { + WARN("non-kv attributes\n"); + } + } + + DEBUG("Initiating transfer from src app %" PRIappid " (me) to dst app %" PRIappid " of CDO %s\n", + g_pool_app_id, init->dst_appid->id, op->init.src_cdo->name); + + if(g_pool_app_id==init->dst_appid->id) { + WARN("FIXME: We will be talking to ourselves via transport, should use a shortcut\n"); + } + + + return MSTRO_OK; +} + static inline mstro_status mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init, @@ -1242,7 +1636,58 @@ mstro_pc__handle_event(const Mstro__Pool__Event *ev) return mstro_pool_event_consume(ev); } - +/**Create mstro_pool_operations from incoming msgs + * pushes the created operations to the *queue* +*/ +mstro_status +mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) +{ + mstro_status status=MSTRO_OK; + mstro_pool_operation op; + + /*Create an operation*/ + status = mstro_pool_op__allocate(&op); + if (status != MSTRO_OK) + { + return status; + } + + /**handle msg */ + switch((*msg)->msg_case) { + /* 'good' messages: */ + case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: + /*fill declare specific parts */ + op->kind = MSTRO_OP_PC_INIT_TRANSFER; + op->handler_steps = mstro_pc_init_transfer_steps; + op->init.dst_addr = 0; + op->init.dst_ep = NULL; + op->init.request = NULL; + break; + + default: + WARN("%s message received, dropping it, not even sending ACK\n", + (*msg)->base.descriptor->name); + /*cleanup*/ + mstro_pool_op__free(op); + mstro_pm__msg_free(*msg); + status = MSTRO_UNIMPL; + break; + } + + if (status == MSTRO_OK) + { + /* continue to fill general operation fields from msg and push to queue */ + op->step = MSTRO_OP_ST_ANNOUNCE; // first step is to announce + op->msg = *msg; + op->appid = (*msg)->token->appid->id; + *msg = NULL; /**consume the message here because caller (mstro_pc_handle_msg) will free it*/ + DEBUG("Filled operation structure from msg\n"); + status = erl_thread_team_enqueue(g_pool_operations_team, op); + assert(status == MSTRO_OK); + } + + return status; +} /** unpack envelope and handle PC-side message. * @@ -1366,8 +1811,9 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_cl break; case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: - status = mstro_pc__handle_initiate_transfer(msg->initiate_transfer, - restart_closure); + status = mstro_pc__op_maker(&msg); + //status = mstro_pc__handle_initiate_transfer(msg->initiate_transfer, + // restart_closure); //DEBUG("PC INIT-TRANSFER result: %d\n", status); break; diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 261f52ddd2fcf3048640730c85e497ce24ac79c0..76012a44e35c0dbbf830037841dfd07ebae83b11 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -455,7 +455,7 @@ mstro_pm__event_notify(mstro_pool_operation op, bool before) * pushes the created operations to the *queue* */ mstro_status -mstro_pm_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) +mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) { mstro_status status=MSTRO_OK; mstro_pool_operation op; @@ -1763,12 +1763,12 @@ mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope, //DEBUG("PM MSG-EVENT-ACK result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE: //DEBUG("PM MSG-RESOLVE result: %d\n", status); - status = mstro_pm_op_maker(msg, NULL); + status = mstro_pm__op_maker(msg, NULL); break; case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: //DEBUG("PM MSG-JOIN result: %d\n", status); - status = mstro_pm_op_maker(msg, ep); + status = mstro_pm__op_maker(msg, ep); break; /* currently unimplemented: */ diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index 75150096c65c897583fd8723c11b036c27510f0e..4970e7094f6251871d6b5d00e4574a37132a74d3 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -149,6 +149,9 @@ mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type) case MSTRO_OP_PM_MSG_RESOLVE: type = "MSG_RESOLVE"; break; + case MSTRO_OP_PC_INIT_TRANSFER: + type = "INIT TRANSFER"; + break; default: ERR("Unknown operation type\n"); type = "UNKNOWN"; @@ -169,7 +172,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st switch (op->kind) { - /**CDO operations */ + /**PM CDO operations */ case MSTRO_OP_PM_DECLARE: cdo_name = op->msg->declare->cdo_name; case MSTRO_OP_PM_SEAL: @@ -199,7 +202,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st op->appid); } break; - /** normal msg processing */ + /** PM other msg processing */ case MSTRO_OP_PM_LEAVE: case MSTRO_OP_PM_JOIN: case MSTRO_OP_PM_SUBSCRIBE: @@ -222,6 +225,22 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st op->appid); } break; + /*----------------------------------PC Operations---------------------------------------------*/ + case MSTRO_OP_PC_INIT_TRANSFER: + if (state == MSTRO_OP_STATE_FAILED) + { + ERR("Failed to execute step %d of operation %s\n", + op->step, + mstro_pool_op_kind_to_string(op->kind)); + } + else + { + DEBUG("%s%s operation %s\n", + (state == MSTRO_OP_STATE_BLOCKED)?"Blocked":"Completed", + (state == MSTRO_OP_STATE_COMPLETED)?"":op_step_str, + mstro_pool_op_kind_to_string(op->kind)); + } + break; default: ERR("Unknown operation \n"); break;