diff --git a/include/maestro/i_pool_client.h b/include/maestro/i_pool_client.h index 7384abdacb62ad4face0d66b923e253d917fcfcd..ae3dca9460ba97c6ddef68cb7be33ae6d4cecea8 100644 --- a/include/maestro/i_pool_client.h +++ b/include/maestro/i_pool_client.h @@ -73,7 +73,7 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, * @return mstro_status return status */ mstro_status -mstro_pc_prepare_init_transfer(mstro_pool_operation op); +mstro_pc__prepare_init_transfer(mstro_pool_operation op); /** * @brief Check ticket and source cdo for transfer ticket @@ -82,7 +82,7 @@ mstro_pc_prepare_init_transfer(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pc_prepare_transfer(mstro_pool_operation op); +mstro_pc__prepare_transfer(mstro_pool_operation op); /** * @brief Execute cdo transport @@ -91,19 +91,36 @@ mstro_pc_prepare_transfer(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pc_transfer_execute(mstro_pool_operation op); +mstro_pc__transfer_execute(mstro_pool_operation op); +/** + * @brief handle event at the PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__event(mstro_pool_operation op); + +/** + * @brief handle transfer completed at the PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__transfer_completed(mstro_pool_operation op); /** * @brief Checks dst app registry entry. - * If dst app is in the registry returns MSTRO_OK, and jumps directly to @ref mstro_pc_init_transfer_reg_app for writing the ticket + * If dst app is in the registry returns MSTRO_OK, and jumps directly to @ref mstro_pc__init_transfer_reg_app for writing the ticket * If dst app info is currently being read in another thread, returns MSTRO_WOULDBLOCK * If dst app is not in registry, issues a read (mstro_ofi__select_endpoint) and returns MSTRO_OK * @param op operation handle * @return mstro_status return status */ mstro_status -mstro_pc_app_befriend_op(mstro_pool_operation op); +mstro_pc__app_befriend_op(mstro_pool_operation op); /** @@ -114,7 +131,7 @@ mstro_pc_app_befriend_op(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pc_init_transfer_reg_app(mstro_pool_operation op); +mstro_pc__init_transfer_reg_app(mstro_pool_operation op); /** * @brief Writes and sends the transfer ticket @@ -123,8 +140,52 @@ mstro_pc_init_transfer_reg_app(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pc_init_transfer_send_ticket(mstro_pool_operation op); +mstro_pc__init_transfer_send_ticket(mstro_pool_operation op); + +/** + * @brief handle declare ack at pc side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__declare_ack(mstro_pool_operation op); +/** + * @brief handle subscribe ack at PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__subscribe_ack(mstro_pool_operation op); + +/** + * @brief handle resolve reply at the PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__resolve_reply(mstro_pool_operation op); + +/** + * @brief Handle PoolOp ack at PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__poolop_ack(mstro_pool_operation op); + +/** + * @brief handle bye operation at the PC side + * + * @param op operation handle + * @return mstro_status return status + */ +mstro_status +mstro_pc__bye(mstro_pool_operation op); /** * @brief Create mstro_pool_operations from pc incoming msgs diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index f7b5ce32b6ac3ad7775c4aeb660baa9d8b3b68aa..bb61138ded7b37fb4d617438e3de19881c9ff6ec 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -77,6 +77,13 @@ enum mstro_pool_operation_kind { /**-------------------PC side operations----------------------------------------*/ MSTRO_OP_PC_INIT_TRANSFER, /**< pc init transfer */ MSTRO_OP_PC_TRANSFER, /**< pc transfer ticket */ + MSTRO_OP_PC_DECLARE_ACK, /**< pc declare ack */ + MSTRO_OP_PC_POOL_ACK, /**< pc pool operation ack */ + MSTRO_OP_PC_SUBSCRIBE_ACK, /**< pc subscribe ack */ + MSTRO_OP_PC_BYE, /**< pc bye (leave ack) */ + MSTRO_OP_PC_EVENT, /**< pc event */ + MSTRO_OP_PC_RESOLVE_REPLY, /**< pc resolve reply */ + MSTRO_OP_PC_TRANSFER_COMPLETED, /**< pc transfer completed */ MSTRO_OP_MAX }; diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 7ed218348b359670d8bd46864cfd8f2e09836f20..70e018ee16d3ba80ed87ff9f784420ac39f458a5 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -31,12 +31,12 @@ /**function stack to handle pc init transfer msg*/ -const mstro_pool_op_st_handler mstro_pc_init_transfer_steps[] = { +const mstro_pool_op_st_handler mstro_pc__init_transfer_steps[] = { NULL, - mstro_pc_prepare_init_transfer, - mstro_pc_app_befriend_op, - mstro_pc_init_transfer_reg_app, - mstro_pc_init_transfer_send_ticket, + mstro_pc__prepare_init_transfer, + mstro_pc__app_befriend_op, + mstro_pc__init_transfer_reg_app, + mstro_pc__init_transfer_send_ticket, NULL, NULL, NULL @@ -44,21 +44,106 @@ const mstro_pool_op_st_handler mstro_pc_init_transfer_steps[] = { /**function stack to handle pc transfer ticket */ -const mstro_pool_op_st_handler mstro_pc_transfer_steps[] = { +const mstro_pool_op_st_handler mstro_pc__transfer_steps[] = { NULL, - mstro_pc_prepare_transfer, - mstro_pc_app_befriend_op, /**optional*/ - mstro_pc_init_transfer_reg_app, /*optional*/ - mstro_pc_transfer_execute, + mstro_pc__prepare_transfer, + mstro_pc__app_befriend_op, /**optional*/ + mstro_pc__init_transfer_reg_app, /*optional*/ + mstro_pc__transfer_execute, NULL, NULL, NULL }; -static inline +/**function stack to handle pc declare ack */ +const mstro_pool_op_st_handler mstro_pc__declare_ack_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__declare_ack, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc subscribe ack*/ +const mstro_pool_op_st_handler mstro_pc__subscribe_ack_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__subscribe_ack, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc resolve reply*/ +const mstro_pool_op_st_handler mstro_pc__resolve_reply_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__resolve_reply, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc PoolOp Ack steps*/ +const mstro_pool_op_st_handler mstro_pc__poolop_ack_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__poolop_ack, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc bye operation steps*/ +const mstro_pool_op_st_handler mstro_pc__bye_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__bye, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc event operation steps*/ +const mstro_pool_op_st_handler mstro_pc__event_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__event, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + +/**function stack to handle pc transfer completed operation steps*/ +const mstro_pool_op_st_handler mstro_pc__transfer_completed_steps[] = { + NULL, + NULL, + NULL, + mstro_pc__transfer_completed, /*execute*/ + NULL, + NULL, + NULL, + NULL + }; + mstro_status -mstro_pc__handle_declare_ack(const Mstro__Pool__DeclareAck *declare_ack) +mstro_pc__declare_ack(mstro_pool_operation op) { + + const Mstro__Pool__DeclareAck *declare_ack = op->msg->declare_ack; assert(declare_ack!=NULL); DEBUG("CDO DeclareAck for serial %zu\n", declare_ack->serial); @@ -75,10 +160,11 @@ mstro_pc__handle_declare_ack(const Mstro__Pool__DeclareAck *declare_ack) return mstro_cdo_declare_completion(declare_ack->serial, &cdoid, declare_ack->channel); } -static inline + mstro_status -mstro_pc__handle_subscribe_ack(const Mstro__Pool__SubscribeAck *sack) +mstro_pc__subscribe_ack(mstro_pool_operation op) { + const Mstro__Pool__SubscribeAck *sack = op->msg->subscribe_ack; assert(sack!=NULL); assert(sack->handle!=NULL); DEBUG("SubscribeAck received, local id %" PRIu64", handle %" PRIu64 "\n", @@ -93,10 +179,11 @@ mstro_pc__handle_subscribe_ack(const Mstro__Pool__SubscribeAck *sack) return s; } -static inline + mstro_status -mstro_pc__handle_resolve_reply(const Mstro__Pool__ResolveReply *reply) +mstro_pc__resolve_reply(mstro_pool_operation op) { + const Mstro__Pool__ResolveReply *reply = op->msg->resolve_reply; assert(reply!=NULL); assert(reply->query!=NULL); switch(reply->query->query_case) { @@ -177,10 +264,12 @@ BAILOUT: return s; } -static inline + mstro_status -mstro_pc__handle_poolop_ack(Mstro__Pool__PoolOpAck *poack) +mstro_pc__poolop_ack(mstro_pool_operation op) { + Mstro__Pool__PoolOpAck *poack = op->msg->ack; + assert(poack!=NULL); DEBUG("PoolOpAck for a %s operation\n", @@ -710,7 +799,7 @@ BAILOUT: } mstro_status -mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) +mstro_pc__init_transfer_send_ticket(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; const Mstro__Pool__InitiateTransfer* init = op->msg->initiate_transfer; @@ -925,7 +1014,7 @@ mstro_pc_init_transfer_send_ticket(mstro_pool_operation op) } mstro_status -mstro_pc_init_transfer_reg_app(mstro_pool_operation op) +mstro_pc__init_transfer_reg_app(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -986,7 +1075,7 @@ mstro_pc_init_transfer_reg_app(mstro_pool_operation op) } mstro_status -mstro_pc_app_befriend_op(mstro_pool_operation op) +mstro_pc__app_befriend_op(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; struct mstro_pm_app_registry_entry *e; @@ -1032,7 +1121,7 @@ mstro_pc_app_befriend_op(mstro_pool_operation op) return status; } mstro_status -mstro_pc_prepare_init_transfer(mstro_pool_operation op) +mstro_pc__prepare_init_transfer(mstro_pool_operation op) { const Mstro__Pool__InitiateTransfer* init = op->msg->initiate_transfer; assert( init!= NULL); @@ -1410,10 +1499,11 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init, return MSTRO_OK; } -static inline mstro_status -mstro_pc__handle_bye(const Mstro__Pool__Bye *bye) +mstro_pc__bye(mstro_pool_operation op) { + const Mstro__Pool__Bye *bye = op->msg->bye; + assert(bye!=NULL); DEBUG("Bye message on app %zu incoming\n", g_pool_app_id); @@ -1488,7 +1578,7 @@ mstro_pc__transport_send_completion(mstro_app_id srcappid, return MSTRO_OK; } mstro_status -mstro_pc_prepare_transfer(mstro_pool_operation op) +mstro_pc__prepare_transfer(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; Mstro__Pool__TransferTicket* ticket = op->msg->transfer_ticket; @@ -1549,7 +1639,7 @@ mstro_pc_prepare_transfer(mstro_pool_operation op) } mstro_status -mstro_pc_transfer_execute(mstro_pool_operation op) +mstro_pc__transfer_execute(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; Mstro__Pool__TransferTicket* ticket = op->msg->transfer_ticket; @@ -1677,12 +1767,13 @@ mstro_pc__handle_transfer_ticket(Mstro__Pool__TransferTicket* ticket, } -static inline + mstro_status -mstro_pc__handle_transfer_completed(Mstro__Pool__MstroMsg *msg) +mstro_pc__transfer_completed(mstro_pool_operation op) { - Mstro__Pool__TransferCompleted *completion = msg->transfer_completed; - mstro_app_id app_id = msg->token->appid->id; + + Mstro__Pool__TransferCompleted *completion = op->msg->transfer_completed; + mstro_app_id app_id = op->msg->token->appid->id; assert(completion!=NULL); assert(app_id!=MSTRO_APP_ID_INVALID); mstro_status s = MSTRO_UNIMPL; @@ -1732,8 +1823,10 @@ DONE: mstro_status -mstro_pc__handle_event(const Mstro__Pool__Event *ev) +mstro_pc__event(mstro_pool_operation op) { + const Mstro__Pool__Event *ev = op->msg->event; + if(ev==NULL) { ERR("Invalid event\n"); return MSTRO_INVMSG; @@ -1775,7 +1868,7 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: /*fill initiate transfer specific parts */ op->kind = MSTRO_OP_PC_INIT_TRANSFER; - op->handler_steps = mstro_pc_init_transfer_steps; + op->handler_steps = mstro_pc__init_transfer_steps; op->pc_transport.target_addr = 0; op->pc_transport.target_ep = NULL; op->pc_transport.request = NULL; @@ -1786,7 +1879,7 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_TICKET: /*fill transfer specific parts*/ op->kind = MSTRO_OP_PC_TRANSFER; - op->handler_steps = mstro_pc_transfer_steps; + op->handler_steps = mstro_pc__transfer_steps; op->pc_transport.target_addr = 0; op->pc_transport.target_ep = NULL; op->pc_transport.request = NULL; @@ -1794,6 +1887,34 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) op->pc_transport.methods = NULL; op->pc_transport.target_serialized_endpoint = (*msg)->transfer_ticket->src_serialized_endpoint; break; + case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE_ACK: + op->kind = MSTRO_OP_PC_DECLARE_ACK; + op->handler_steps = mstro_pc__declare_ack_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_ACK: + op->kind = MSTRO_OP_PC_POOL_ACK; + op->handler_steps = mstro_pc__poolop_ack_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE_ACK: + op->kind = MSTRO_OP_PC_SUBSCRIBE_ACK; + op->handler_steps = mstro_pc__subscribe_ack_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_BYE: + op->kind = MSTRO_OP_PC_BYE; + op->handler_steps = mstro_pc__bye_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_EVENT: + op->kind = MSTRO_OP_PC_EVENT; + op->handler_steps = mstro_pc__event_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE_REPLY: + op->kind = MSTRO_OP_PC_RESOLVE_REPLY; + op->handler_steps = mstro_pc__resolve_reply_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: + op->kind = MSTRO_OP_PC_TRANSFER_COMPLETED; + op->handler_steps = mstro_pc__transfer_completed_steps; + break; default: WARN("%s message received, dropping it, not even sending ACK\n", (*msg)->base.descriptor->name); @@ -1921,50 +2042,24 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope, void** restart_cl } case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE_ACK: - status = mstro_pc__handle_declare_ack(msg->declare_ack); //DEBUG("PC DECLARE-ACK result: %d\n", status); - break; - - case MSTRO__POOL__MSTRO_MSG__MSG_ACK: - status = mstro_pc__handle_poolop_ack(msg->ack); - //DEBUG("PC MSG-ACK result: %d\n", status); - break; - case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE_ACK: - status = mstro_pc__handle_subscribe_ack(msg->subscribe_ack); //DEBUG("PC SUBSCRIBE-ACK result: %d\n", status); - break; - + case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE_REPLY: + //DEBUG("PC MSG-RESOLVE-REPLY result: %d\n", status); + case MSTRO__POOL__MSTRO_MSG__MSG_ACK: + //DEBUG("PC MSG-ACK result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_BYE: - status = mstro_pc__handle_bye(msg->bye); //DEBUG("PC MSG-BYE result: %d\n", status); - break; - - case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: - status = mstro_pc__op_maker(&msg); - //DEBUG("PC INIT-TRANSFER result: %d\n", status); - break; - case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_TICKET: - status = mstro_pc__op_maker(&msg); - //status = mstro_pc__handle_transfer_ticket(msg->transfer_ticket, - // restart_closure); //DEBUG("PC TRANSFER-TICKET result: %d\n", status); - break; - + case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: + //DEBUG("PC INIT-TRANSFER result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_EVENT: - status = mstro_pc__handle_event(msg->event); //DEBUG("PC MSG-EVENT result: %d\n", status); - break; - - case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE_REPLY: - status = mstro_pc__handle_resolve_reply(msg->resolve_reply); - //DEBUG("PC MSG-RESOLVE-REPLY result: %d\n", status); - break; - case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: - status = mstro_pc__handle_transfer_completed(msg); //DEBUG("PC MSG-TRANSFER-COMPLETED result: %d\n", status); + status = mstro_pc__op_maker(&msg); break; case MSTRO__POOL__MSTRO_MSG__MSG_DEMAND_ATTR_RES: diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 8d74fb6a6df2cc140f61752416f65da840324814..172205f2bc3288dc1450fd40478978f5c032028e 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -75,7 +75,7 @@ */ /**function stack to handle pm dispose cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_dispose_steps[] = { +const mstro_pool_op_st_handler mstro_pm__dispose_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -87,7 +87,7 @@ const mstro_pool_op_st_handler mstro_pm_dispose_steps[] = { }; /**function stack to handle pm declare cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_declare_steps[] = { +const mstro_pool_op_st_handler mstro_pm__declare_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -100,7 +100,7 @@ const mstro_pool_op_st_handler mstro_pm_declare_steps[] = { /**function stack to handle pm seal cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_seal_steps[] = { +const mstro_pool_op_st_handler mstro_pm__seal_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -112,7 +112,7 @@ const mstro_pool_op_st_handler mstro_pm_seal_steps[] = { }; /**function stack to handle pm offer cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_offer_steps[] = { +const mstro_pool_op_st_handler mstro_pm__offer_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -124,7 +124,7 @@ const mstro_pool_op_st_handler mstro_pm_offer_steps[] = { }; /**function stack to handle pm require cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_require_steps[] = { +const mstro_pool_op_st_handler mstro_pm__require_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -136,7 +136,7 @@ const mstro_pool_op_st_handler mstro_pm_require_steps[] = { }; /**function stack to handle pm retract cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_retract_steps[] = { +const mstro_pool_op_st_handler mstro_pm__retract_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -148,7 +148,7 @@ const mstro_pool_op_st_handler mstro_pm_retract_steps[] = { }; /**function stack to handle pm demand cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_demand_steps[] = { +const mstro_pool_op_st_handler mstro_pm__demand_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -160,7 +160,7 @@ const mstro_pool_op_st_handler mstro_pm_demand_steps[] = { }; /**function stack to handle pm withdraw cdo msg*/ -const mstro_pool_op_st_handler mstro_pm_withdraw_steps[] = { +const mstro_pool_op_st_handler mstro_pm__withdraw_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -172,7 +172,7 @@ const mstro_pool_op_st_handler mstro_pm_withdraw_steps[] = { }; /**function stack to handle pm transfer completed msg*/ -const mstro_pool_op_st_handler mstro_pm_transfer_completed_steps[] = { +const mstro_pool_op_st_handler mstro_pm__transfer_completed_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -184,7 +184,7 @@ const mstro_pool_op_st_handler mstro_pm_transfer_completed_steps[] = { }; /**function stack to handle app leave msg*/ -const mstro_pool_op_st_handler mstro_pm_leave_steps[] = { +const mstro_pool_op_st_handler mstro_pm__leave_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -196,7 +196,7 @@ const mstro_pool_op_st_handler mstro_pm_leave_steps[] = { }; /**function stack to handle app leave msg*/ -const mstro_pool_op_st_handler mstro_pm_join_steps[] = { +const mstro_pool_op_st_handler mstro_pm__join_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -208,7 +208,7 @@ const mstro_pool_op_st_handler mstro_pm_join_steps[] = { }; /**function stack to handle app subscribe msg*/ -const mstro_pool_op_st_handler mstro_pm_subscribe_steps[] = { +const mstro_pool_op_st_handler mstro_pm__subscribe_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -220,7 +220,7 @@ const mstro_pool_op_st_handler mstro_pm_subscribe_steps[] = { }; /**function stack to handle unsubscribe msg*/ -const mstro_pool_op_st_handler mstro_pm_unsubscribe_steps[] = { +const mstro_pool_op_st_handler mstro_pm__unsubscribe_steps[] = { NULL, mstro_pm__event_notify_before, /**<announce before operation */ mstro_pm__check_acks, /**<check acks before */ @@ -232,7 +232,7 @@ const mstro_pool_op_st_handler mstro_pm_unsubscribe_steps[] = { }; /**function stack to handle event ack msg*/ -const mstro_pool_op_st_handler mstro_pm_ack_steps[] = { +const mstro_pool_op_st_handler mstro_pm__ack_steps[] = { NULL, NULL, /**<announce before operation */ NULL, /**<check acks before */ @@ -244,7 +244,7 @@ const mstro_pool_op_st_handler mstro_pm_ack_steps[] = { }; /**function stack to handle msg resolve msg*/ -const mstro_pool_op_st_handler mstro_pm_resolve_steps[] = { +const mstro_pool_op_st_handler mstro_pm__resolve_steps[] = { NULL, NULL, /**<announce before operation */ NULL, /**<check acks before */ @@ -474,12 +474,12 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE: /*fill declare specific parts */ op->kind = MSTRO_OP_PM_DECLARE; - op->handler_steps = mstro_pm_declare_steps; + op->handler_steps = mstro_pm__declare_steps; break; case MSTRO__POOL__MSTRO_MSG__MSG_SEAL: /**fill seal parts*/ op->kind = MSTRO_OP_PM_SEAL; - op->handler_steps = mstro_pm_seal_steps; + op->handler_steps = mstro_pm__seal_steps; op->cdoid.qw[0] = msg->seal->cdoid->qw0; op->cdoid.qw[1] = msg->seal->cdoid->qw1; op->cdoid.local_id = msg->seal->cdoid->local_id; @@ -488,7 +488,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_OFFER: /**fill offer parts*/ op->kind = MSTRO_OP_PM_OFFER; - op->handler_steps = mstro_pm_offer_steps; + op->handler_steps = mstro_pm__offer_steps; op->cdoid.qw[0] = msg->offer->cdoid->qw0; op->cdoid.qw[1] = msg->offer->cdoid->qw1; op->cdoid.local_id = msg->offer->cdoid->local_id; @@ -496,7 +496,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_REQUIRE: /**fill require parts*/ op->kind = MSTRO_OP_PM_REQUIRE; - op->handler_steps = mstro_pm_require_steps; + op->handler_steps = mstro_pm__require_steps; op->cdoid.qw[0] = msg->require->cdoid->qw0; op->cdoid.qw[1] = msg->require->cdoid->qw1; op->cdoid.local_id = msg->require->cdoid->local_id; @@ -504,7 +504,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_RETRACT: /**fill retract parts*/ op->kind = MSTRO_OP_PM_RETRACT; - op->handler_steps = mstro_pm_retract_steps; + op->handler_steps = mstro_pm__retract_steps; op->cdoid.qw[0] = msg->retract->cdoid->qw0; op->cdoid.qw[1] = msg->retract->cdoid->qw1; op->cdoid.local_id = msg->retract->cdoid->local_id; @@ -512,7 +512,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_DEMAND: /**fill demand parts*/ op->kind = MSTRO_OP_PM_DEMAND; - op->handler_steps = mstro_pm_demand_steps; + op->handler_steps = mstro_pm__demand_steps; op->cdoid.qw[0] = msg->demand->cdoid->qw0; op->cdoid.qw[1] = msg->demand->cdoid->qw1; op->cdoid.local_id = msg->demand->cdoid->local_id; @@ -520,7 +520,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_WITHDRAW: /**fill withdraw parts*/ op->kind = MSTRO_OP_PM_WITHDRAW; - op->handler_steps = mstro_pm_withdraw_steps; + op->handler_steps = mstro_pm__withdraw_steps; op->cdoid.qw[0] = msg->withdraw->cdoid->qw0; op->cdoid.qw[1] = msg->withdraw->cdoid->qw1; op->cdoid.local_id = msg->withdraw->cdoid->local_id; @@ -528,7 +528,7 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_DISPOSE: /*fill dispose specific parts */ op->kind = MSTRO_OP_PM_DISPOSE; - op->handler_steps = mstro_pm_dispose_steps; // cdo pm dispose stack + op->handler_steps = mstro_pm__dispose_steps; // cdo pm dispose stack op->cdoid.qw[0] = msg->dispose->cdoid->qw0; op->cdoid.qw[1] = msg->dispose->cdoid->qw1; op->cdoid.local_id = msg->dispose->cdoid->local_id; @@ -536,37 +536,37 @@ mstro_pm__op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: /*fill transfer complete specific parts */ op->kind = MSTRO_OP_PM_TRANSFER_COMPLETE; - op->handler_steps = mstro_pm_transfer_completed_steps; + op->handler_steps = mstro_pm__transfer_completed_steps; op->cdoid.qw[0] = msg->transfer_completed->dstcdoid->qw0; /*the operation concerns the dst cdo*/ op->cdoid.qw[1] = msg->transfer_completed->dstcdoid->qw1; op->cdoid.local_id = msg->transfer_completed->dstcdoid->local_id; break; case MSTRO__POOL__MSTRO_MSG__MSG_LEAVE: op->kind = MSTRO_OP_PM_LEAVE; - op->handler_steps = mstro_pm_leave_steps; + op->handler_steps = mstro_pm__leave_steps; break; case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: op->kind = MSTRO_OP_PM_JOIN; - op->handler_steps = mstro_pm_join_steps; + op->handler_steps = mstro_pm__join_steps; op->join.ep = ep; break; case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE: op->kind = MSTRO_OP_PM_SUBSCRIBE; - op->handler_steps = mstro_pm_subscribe_steps; + op->handler_steps = mstro_pm__subscribe_steps; op->subscription_handle = NULL; break; case MSTRO__POOL__MSTRO_MSG__MSG_UNSUBSCRIBE: op->kind = MSTRO_OP_PM_UNSUBSCRIBE; - op->handler_steps = mstro_pm_unsubscribe_steps; + op->handler_steps = mstro_pm__unsubscribe_steps; op->subscription_handle = msg->unsubscribe->subscription_handle; break; case MSTRO__POOL__MSTRO_MSG__MSG_EVENT_ACK: op->kind = MSTRO_OP_PM_EVENT_ACK; - op->handler_steps = mstro_pm_ack_steps; + op->handler_steps = mstro_pm__ack_steps; break; case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE: op->kind = MSTRO_OP_PM_MSG_RESOLVE; - op->handler_steps = mstro_pm_resolve_steps; + op->handler_steps = mstro_pm__resolve_steps; break; default: WARN("%s message received, dropping it, not even sending ACK\n", diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index 036c8119b65a4d7c34d3ead0a06a0480a9abc983..af0282e7a6e89b6db5d6c256f49599c2fa27a765 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -155,6 +155,27 @@ mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type) case MSTRO_OP_PC_TRANSFER: type = "TRANSFER TICKET"; break; + case MSTRO_OP_PC_DECLARE_ACK: + type = "DECLARE ACK"; + break; + case MSTRO_OP_PC_POOL_ACK: + type = "POOL OP ACK"; + break; + case MSTRO_OP_PC_SUBSCRIBE_ACK: + type = "SUBSCRIBE ACK"; + break; + case MSTRO_OP_PC_BYE: + type = "BYE"; + break; + case MSTRO_OP_PC_EVENT: + type = "EVENT"; + break; + case MSTRO_OP_PC_RESOLVE_REPLY: + type = "RESOLVE REPLY"; + break; + case MSTRO_OP_PC_TRANSFER_COMPLETED: + type = "TRANSFER COMPLETED"; + break; default: ERR("Unknown operation type\n"); type = "UNKNOWN"; @@ -231,6 +252,13 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st /*----------------------------------PC Operations---------------------------------------------*/ case MSTRO_OP_PC_INIT_TRANSFER: case MSTRO_OP_PC_TRANSFER: + case MSTRO_OP_PC_DECLARE_ACK: + case MSTRO_OP_PC_POOL_ACK: + case MSTRO_OP_PC_SUBSCRIBE_ACK: + case MSTRO_OP_PC_BYE: + case MSTRO_OP_PC_EVENT: + case MSTRO_OP_PC_RESOLVE_REPLY: + case MSTRO_OP_PC_TRANSFER_COMPLETED: if (state == MSTRO_OP_STATE_FAILED) { ERR("Failed to execute step %d of operation %s\n",