diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index 1b5185b1c172744ff27ff27ede82fe97a810fb73..f7b5ce32b6ac3ad7775c4aeb660baa9d8b3b68aa 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -75,8 +75,9 @@ enum mstro_pool_operation_kind { MSTRO_OP_PM_EVENT_ACK, /**< pm event ack operation */ MSTRO_OP_PM_MSG_RESOLVE, /**< pm msg resolve operation */ /**-------------------PC side operations----------------------------------------*/ - MSTRO_OP_PC_INIT_TRANSFER, /**< pc init transfer */ - MSTRO_OP_MAX + MSTRO_OP_PC_INIT_TRANSFER, /**< pc init transfer */ + MSTRO_OP_PC_TRANSFER, /**< pc transfer ticket */ + MSTRO_OP_MAX }; /** maestro pool operation steps @@ -132,11 +133,14 @@ struct mstro_pool_operation_ { } join; struct /** for init transfer*/ { - mstro_cdo src_cdo; - struct mstro_endpoint *dst_ep; - fi_addr_t dst_addr; + mstro_cdo target_cdo; + mstro_app_id target_appid; + Mstro__Pool__TransportMethods *methods; + char *target_serialized_endpoint; + struct mstro_endpoint *target_ep; + fi_addr_t target_addr; mstro_request request; - } init; + } pc_transport; }; diff --git a/maestro/i_pool_client.h b/maestro/i_pool_client.h index 00182415c0a73fd7a8e8347ce49e7405493a9383..7384abdacb62ad4face0d66b923e253d917fcfcd 100644 --- a/maestro/i_pool_client.h +++ b/maestro/i_pool_client.h @@ -75,6 +75,24 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, mstro_status mstro_pc_prepare_init_transfer(mstro_pool_operation op); +/** + * @brief Check ticket and source cdo for transfer ticket + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_prepare_transfer(mstro_pool_operation op); + +/** + * @brief Execute cdo transport + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc_transfer_execute(mstro_pool_operation op); + /** * @brief Checks dst app registry entry. @@ -109,7 +127,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op); /** - * @brief Create mstro_pool_operations from incoming msgs + * @brief Create mstro_pool_operations from pc incoming msgs * pushes the created operations to the *queue* * * @param msg message handle diff --git a/maestro/pool_client.c b/maestro/pool_client.c index df8d3a590fa1f3786a0975416040e193f3a6e590..a9165401706746cea0a5305bff8032b50254c9ad 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -30,7 +30,7 @@ #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_PC,__VA_ARGS__) -/**function stack to handle pm declare cdo msg*/ +/**function stack to handle pc init transfer msg*/ const mstro_pool_op_st_handler mstro_pc_init_transfer_steps[] = { NULL, mstro_pc_prepare_init_transfer, @@ -42,6 +42,19 @@ const mstro_pool_op_st_handler mstro_pc_init_transfer_steps[] = { NULL }; + +/**function stack to handle pc transfer ticket */ +const mstro_pool_op_st_handler mstro_pc_transfer_steps[] = { + NULL, + mstro_pc_prepare_transfer, + mstro_pc_app_befriend_op, /**optional*/ + mstro_pc_init_transfer_reg_app, /*optional*/ + mstro_pc_transfer_execute, + NULL, + NULL, + NULL + }; + static inline mstro_status mstro_pc__handle_declare_ack(const Mstro__Pool__DeclareAck *declare_ack) @@ -724,7 +737,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) /**Calculate the length of the data, src and dst offsets and number of segments */ status = mstro_pc__calculate_data_size_and_offsets( - op->init.src_cdo, init, &realsize, &src_offset, &dst_offset); + op->pc_transport.target_cdo, init, &realsize, &src_offset, &dst_offset); if (status != MSTRO_OK) { return status; } @@ -745,7 +758,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) init->methods->supported[0] = MSTRO__POOL__TRANSPORT_KIND__OFI; } - status = mstro_pc__select_transfer_method(op->init.src_cdo, init->methods, + status = mstro_pc__select_transfer_method(op->pc_transport.target_cdo, init->methods, &ticket); if(status!=MSTRO_OK) { ERR("Failed to select transport method\n"); @@ -765,7 +778,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) switch(ticket.ticket_case) { case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: NOISE("TICKET CASE GFS\n"); - status= mstro_pc__construct_gfs_path_for_cdo(op->init.src_cdo, &gfs.path); + status= mstro_pc__construct_gfs_path_for_cdo(op->pc_transport.target_cdo, &gfs.path); if(status!=MSTRO_OK) { ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n", status, mstro_status_description(status)); @@ -807,7 +820,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, DEBUG("Semaphore has ID: %s\n", idstr);); - WITH_CDO_ID_STR(idstr, &(op->init.src_cdo->gid), + WITH_CDO_ID_STR(idstr, &(op->init.target_cdo->gid), DEBUG("(CDO associated has ID: %s)\n", idstr);); @@ -817,9 +830,9 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) mio.semid.len = sizeof(struct mstro_cdo_id); mio.semid.data = (uint8_t*)semid; mio.objid.len = sizeof(struct mstro_cdo_id); - objid->qw[0] = op->init.src_cdo->gid.qw[0]; - objid->qw[1] = op->init.src_cdo->gid.qw[1]; - objid->local_id = op->init.src_cdo->gid->local_id; + objid->qw[0] = op->init.target_cdo->gid.qw[0]; + objid->qw[1] = op->init.target_cdo->gid.qw[1]; + objid->local_id = op->init.target_cdo->gid->local_id; mio.objid.data = (uint8_t*)objid; mio.keep_obj = 0; @@ -848,22 +861,22 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) return MSTRO_UNIMPL; } - if(op->init.src_cdo->attributes_msg==NULL) { + if(op->pc_transport.target_cdo->attributes_msg==NULL) { ERR("source CDO has no attributes message data -- should not happen\n"); return MSTRO_FAIL; } - ticket.attributes = op->init.src_cdo->attributes_msg; + ticket.attributes = op->pc_transport.target_cdo->attributes_msg; - INFO("Issued ticket to app %" PRIu64 " for CDO %s, and starting execute process\n", init->dst_appid->id, op->init.src_cdo->name); + INFO("Issued ticket to app %" PRIu64 " for CDO %s, and starting execute process\n", init->dst_appid->id, op->pc_transport.target_cdo->name); NOISE("TransferTicket using path %s\n", ticket.gfs->path); NOISE("TransferTicket cdo size %" PRIi64 "\n", ticket.data_size); /* Execute transport (non-blocking) */ - status = mstro_transport_execute(op->init.src_cdo, &ticket); + status = mstro_transport_execute(op->pc_transport.target_cdo, &ticket); if(MSTRO_OK != status) { ERR("Failure in transport execute for CDO %s\n", - op->init.src_cdo->name); + op->pc_transport.target_cdo->name); return MSTRO_FAIL; } @@ -916,13 +929,13 @@ mstro_pc_init_transfer_reg_app(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; - if(!mstro_request_test(op->init.request)) { + if(!mstro_request_test(op->pc_transport.request)) { DEBUG("Async request still not done, try again later\n"); status = MSTRO_WOULDBLOCK; return status; } else { /* execute wait to clean up handle */ - status = mstro_request_wait(op->init.request); + status = mstro_request_wait(op->pc_transport.request); if(status!=MSTRO_OK) { ERR("Async endpoint selection failed, could not find suitable endpoint to talk to app %" PRIappid ": %d\n", op->msg->initiate_transfer->dst_appid->id, status); @@ -930,25 +943,43 @@ mstro_pc_init_transfer_reg_app(mstro_pool_operation op) } } - assert(op->init.dst_ep!=NULL); + assert(op->pc_transport.target_ep!=NULL); + size_t len; + uint8_t *buf; /* prepare to duplicate methods structure .. the simple way (pack/unpack) */ - size_t len = mstro__pool__transport_methods__get_packed_size(op->msg->initiate_transfer->methods); - uint8_t buf[len]; - mstro__pool__transport_methods__pack(op->msg->initiate_transfer->methods,buf); + /* ensure we know how to talk to the recipient */ + /* FIXME: for transfer tickets we don't have the methods list at hand -- fake one */ + if (op->pc_transport.methods == NULL) + { + Mstro__Pool__TransportKind ofi_method[1] = { MSTRO__POOL__TRANSPORT_KIND__OFI }; + Mstro__Pool__TransportMethods m = MSTRO__POOL__TRANSPORT_METHODS__INIT; + m.n_supported = 1; + m.supported = &ofi_method[0]; + len = mstro__pool__transport_methods__get_packed_size(&m); + buf = malloc(sizeof(uint8_t)*len); + mstro__pool__transport_methods__pack(&m,buf); + } + else + { + len = mstro__pool__transport_methods__get_packed_size(op->pc_transport.methods); + buf = malloc(sizeof(uint8_t)*len); + mstro__pool__transport_methods__pack(op->pc_transport.methods,buf); + } Mstro__Pool__TransportMethods *methods = mstro__pool__transport_methods__unpack(NULL, len, buf); + free(buf); /* pass into registry */ - status = mstro_pc_app_register(op->init.dst_ep, - op->init.dst_addr, - strdup(op->msg->initiate_transfer->dst_serialized_endpoint), + status = mstro_pc_app_register(op->pc_transport.target_ep, + op->pc_transport.target_addr, + strdup(op->pc_transport.target_serialized_endpoint), methods, op->msg->initiate_transfer->dst_appid->id, NULL); if(status!=MSTRO_OK) { ERR("Failed to register peer app %zu: %d\n", op->msg->initiate_transfer->dst_appid->id); } else { - DEBUG("Registered peer app %zu\n", op->msg->initiate_transfer->dst_appid->id); + DEBUG("Registered peer app %zu\n", op->pc_transport.target_appid); } return status; @@ -959,7 +990,7 @@ mstro_pc_app_befriend_op(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; struct mstro_pm_app_registry_entry *e; - mstro_app_id appid = op->msg->initiate_transfer->dst_appid->id; + mstro_app_id appid = op->pc_transport.target_appid; status = mstro_pm_app_lookup(appid, &e); if(e) { /**found an entry ... good */ @@ -983,12 +1014,12 @@ mstro_pc_app_befriend_op(mstro_pool_operation op) /* we need to be careful to not pass in any stack-local refs */ Mstro__AppInfo *dst_epd=NULL; - status=mstro_appinfo_deserialize(op->msg->initiate_transfer->dst_serialized_endpoint, &dst_epd); + status=mstro_appinfo_deserialize(op->pc_transport.target_serialized_endpoint, &dst_epd); status=mstro_ofi__select_endpoint(dst_epd, - &(op->init.dst_ep), - &(op->init.dst_addr), - &(op->init.request)); + &(op->pc_transport.target_ep), + &(op->pc_transport.target_addr), + &(op->pc_transport.request)); if(status!=MSTRO_OK) { ERR("Failed to initiate partner lookup: %d\n", status); @@ -1026,7 +1057,7 @@ mstro_pc_prepare_init_transfer(mstro_pool_operation op) here_idstr, there_idstr);});}); if (srccdoid.local_id != MSTRO_CDO_LOCAL_ID_NONE) { /* We trust that the PM gave us the correct local-id */ - if (!(MSTRO_OK == mstro_pool__find_cdo_with_local_id(&srccdoid, &(op->init.src_cdo)))) { + if (!(MSTRO_OK == mstro_pool__find_cdo_with_local_id(&srccdoid, &(op->pc_transport.target_cdo)))) { WITH_CDO_ID_STR(idstr, &srccdoid, ERR("Cannot transfer CDO gid %s because not in local pool\n", idstr);); @@ -1034,7 +1065,7 @@ mstro_pc_prepare_init_transfer(mstro_pool_operation op) } } else { /*we do not have a valid local-id ... looking for an appropriate cdo */ - if (!(MSTRO_OK == mstro_pool__find_source_cdo(&srccdoid, init->dst_attributes, &(op->init.src_cdo)))) { + if (!(MSTRO_OK == mstro_pool__find_source_cdo(&srccdoid, init->dst_attributes, &(op->pc_transport.target_cdo)))) { WITH_CDO_ID_STR(idstr, &srccdoid, ERR("Cannot transfer CDO gid %s because not in local pool\n", idstr);); @@ -1045,14 +1076,14 @@ mstro_pc_prepare_init_transfer(mstro_pool_operation op) /* we may see the ticket before OFFER ack. In that case * we do an implicit ack by setting state to OFFERED * (PM would not send ticket if it has not handled the OFFER) */ - if(mstro_cdo_state_get(op->init.src_cdo)==MSTRO_CDO_STATE_OFFERED_LOCALLY) { + if(mstro_cdo_state_get(op->pc_transport.target_cdo)==MSTRO_CDO_STATE_OFFERED_LOCALLY) { WITH_CDO_ID_STR(idstr, &srccdoid, { DEBUG("Doing implicit OFFER-ACK for CDO %s at transport ticket creation (src side) time\n", idstr); }); - mstro_cdo_state state_flags = (mstro_cdo_state_get(op->init.src_cdo) + mstro_cdo_state state_flags = (mstro_cdo_state_get(op->pc_transport.target_cdo) & MSTRO_CDO_STATE_FLAGS); - mstro_cdo_state_set(op->init.src_cdo, + mstro_cdo_state_set(op->pc_transport.target_cdo, MSTRO_CDO_STATE_OFFERED | state_flags); } @@ -1067,7 +1098,7 @@ mstro_pc_prepare_init_transfer(mstro_pool_operation op) } DEBUG("Initiating transfer from src app %" PRIappid " (me) to dst app %" PRIappid " of CDO %s\n", - g_pool_app_id, init->dst_appid->id, op->init.src_cdo->name); + g_pool_app_id, init->dst_appid->id, op->pc_transport.target_cdo->name); if(g_pool_app_id==init->dst_appid->id) { WARN("FIXME: We will be talking to ourselves via transport, should use a shortcut\n"); @@ -1456,6 +1487,92 @@ mstro_pc__transport_send_completion(mstro_app_id srcappid, return MSTRO_OK; } +mstro_status +mstro_pc_prepare_transfer(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + Mstro__Pool__TransferTicket* ticket = op->msg->transfer_ticket; + + if(ticket==NULL || ticket->srccdoid==NULL || ticket->dstcdoid==NULL) { + ERR("Invalid transfer ticket: %p/%p/%p\n", + ticket, + ticket ? ticket->srccdoid : (void*)0xdeadbeef, + ticket ? ticket->dstcdoid : (void*)0xdeadbeef); + return MSTRO_INVMSG; + } + + + struct mstro_cdo_id cdoid = { .qw[0] = ticket->dstcdoid->qw0, + .qw[1] = ticket->dstcdoid->qw1, + .local_id = ticket->dstcdoid->local_id }; + + WITH_CDO_ID_STR(idstr, &cdoid, + INFO("Incoming ticket for CDO gid %s, ticket kind %s\n", + idstr, + (protobuf_c_enum_descriptor_get_value( + &mstro__pool__transport_kind__descriptor, + ticket->method)) + ->name);); + + status = mstro_pool__find_cdo_with_local_id(&cdoid, &(op->pc_transport.target_cdo)); + + if (status != MSTRO_OK) { + /* we did not find the cdo by local id */ + status = mstro_pool__find_sink_cdo(&cdoid, ticket->attributes, + &(op->pc_transport.target_cdo)); + if(MSTRO_OK!=status && MSTRO_NOENT!=status) { + WITH_CDO_ID_STR(idstr, &cdoid, + ERR("Cannot transfer CDO gid %s because not in local pool\n", + idstr);); + return MSTRO_FAIL; + } + } + + if(MSTRO_NOENT==status) { + WITH_CDO_ID_STR(idstr, &cdoid, { + WARN("Incoming ticket for CDO %s, but CDO not present in local pool, skipping ticket\n", + idstr);}); + if(ticket->force_offer) { + WARN("Ticket has force-offer set, but this is unimplemented\n"); + /* FIXME: create a fresh CDO, use that CDO for incoming transport, then perform OFFER */ + } + } else { + WITH_CDO_ID_STR(idstr, &(op->pc_transport.target_cdo->gid), { + DEBUG("Initiating incoming transfer to app %" PRIappid " (me) from %zu for CDO %s (%s)\n", + g_pool_app_id, ticket->srcid->id, op->pc_transport.target_cdo->name, idstr);}); + + if (ticket->ticket_case != MSTRO__POOL__TRANSFER_TICKET__TICKET_OFI) { + op->step = 3; /*skip app befriend and app register and go directly to executing transport */ + } + } + return status; +} + +mstro_status +mstro_pc_transfer_execute(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + Mstro__Pool__TransferTicket* ticket = op->msg->transfer_ticket; + /** check and update the number of segments on the dst cdo + * if n_segments of the cdo is zero, then it means this is the first ticket + * and we should update the n_segments value from the ticket + * Otherwise, then we are not the first ticket, and we should leave it alone + * with the completion of every transport the value of n_segments is decremented + * until it reaches zero, marking that all required data is filled. + */ + int64_t expected = 0; + atomic_compare_exchange_strong(&(op->pc_transport.target_cdo->n_segments), &expected, ticket->n_segments); + /* Execute transport (non-blocking) */ + status = mstro_transport_execute(op->pc_transport.target_cdo, ticket); + if(MSTRO_OK != status) { + ERR("Failure in transport execute for CDO %s\n", op->pc_transport.target_cdo->name); + return MSTRO_FAIL; + } + + /* Each (GFS and MIO TODO) method waits for the transport to complete before sending completion themselves */ + mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PC_NUM_TICKETS_IN, 1); + return status; +} static inline mstro_status @@ -1656,14 +1773,27 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) switch((*msg)->msg_case) { /* 'good' messages: */ case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: - /*fill declare specific parts */ + /*fill initiate transfer specific parts */ op->kind = MSTRO_OP_PC_INIT_TRANSFER; op->handler_steps = mstro_pc_init_transfer_steps; - op->init.dst_addr = 0; - op->init.dst_ep = NULL; - op->init.request = NULL; + op->pc_transport.target_addr = 0; + op->pc_transport.target_ep = NULL; + op->pc_transport.request = NULL; + op->pc_transport.target_appid = (*msg)->initiate_transfer->dst_appid->id; + op->pc_transport.methods = (*msg)->initiate_transfer->methods; + op->pc_transport.target_serialized_endpoint = (*msg)->initiate_transfer->dst_serialized_endpoint; break; - + case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_TICKET: + /*fill transfer specific parts*/ + op->kind = MSTRO_OP_PC_TRANSFER; + op->handler_steps = mstro_pc_transfer_steps; + op->pc_transport.target_addr = 0; + op->pc_transport.target_ep = NULL; + op->pc_transport.request = NULL; + op->pc_transport.target_appid = (*msg)->transfer_ticket->srcid->id; + op->pc_transport.methods = NULL; + op->pc_transport.target_serialized_endpoint = (*msg)->transfer_ticket->src_serialized_endpoint; + break; default: WARN("%s message received, dropping it, not even sending ACK\n", (*msg)->base.descriptor->name); @@ -1679,7 +1809,7 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) /* continue to fill general operation fields from msg and push to queue */ op->step = MSTRO_OP_ST_ANNOUNCE; // first step is to announce op->msg = *msg; - op->appid = (*msg)->token->appid->id; + op->appid = g_pool_app_id; *msg = NULL; /**consume the message here because caller (mstro_pc_handle_msg) will free it*/ DEBUG("Filled operation structure from msg\n"); status = erl_thread_team_enqueue(g_pool_operations_team, op); @@ -1812,14 +1942,13 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_cl case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: status = mstro_pc__op_maker(&msg); - //status = mstro_pc__handle_initiate_transfer(msg->initiate_transfer, - // restart_closure); //DEBUG("PC INIT-TRANSFER result: %d\n", status); break; case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_TICKET: - status = mstro_pc__handle_transfer_ticket(msg->transfer_ticket, - restart_closure); + status = mstro_pc__op_maker(&msg); + //status = mstro_pc__handle_transfer_ticket(msg->transfer_ticket, + // restart_closure); //DEBUG("PC TRANSFER-TICKET result: %d\n", status); break; diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 76012a44e35c0dbbf830037841dfd07ebae83b11..8d74fb6a6df2cc140f61752416f65da840324814 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -534,7 +534,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) op->cdoid.local_id = msg->dispose->cdoid->local_id; break; case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: - /*fill dispose specific parts */ + /*fill transfer complete specific parts */ op->kind = MSTRO_OP_PM_TRANSFER_COMPLETE; op->handler_steps = mstro_pm_transfer_completed_steps; op->cdoid.qw[0] = msg->transfer_completed->dstcdoid->qw0; /*the operation concerns the dst cdo*/ diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index 4970e7094f6251871d6b5d00e4574a37132a74d3..036c8119b65a4d7c34d3ead0a06a0480a9abc983 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -152,6 +152,9 @@ mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type) case MSTRO_OP_PC_INIT_TRANSFER: type = "INIT TRANSFER"; break; + case MSTRO_OP_PC_TRANSFER: + type = "TRANSFER TICKET"; + break; default: ERR("Unknown operation type\n"); type = "UNKNOWN"; @@ -227,6 +230,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st break; /*----------------------------------PC Operations---------------------------------------------*/ case MSTRO_OP_PC_INIT_TRANSFER: + case MSTRO_OP_PC_TRANSFER: if (state == MSTRO_OP_STATE_FAILED) { ERR("Failed to execute step %d of operation %s\n",