diff --git a/include/maestro/i_globals.h b/include/maestro/i_globals.h index a9cf4723d0b862193d93184e6322a9960017dfb4..70f1868484fef00ffa20df967056b2a6cc2c92f2 100644 --- a/include/maestro/i_globals.h +++ b/include/maestro/i_globals.h @@ -190,8 +190,4 @@ extern struct erl_thread_team *g_ofi_cq_team; /** handle to the pool operations handling thread team */ extern struct erl_thread_team *g_pool_operations_team; -/** Event domain for handler continuations */ -extern mstro_event_domain g_mstro_pm_continuations; - - #endif /* MAESTRO_I_GLOBALS_H_ */ diff --git a/include/maestro/i_pool_manager.h b/include/maestro/i_pool_manager.h index d11e21d19f0e78b6a3af7ea6ab9c0bcb73a5f90e..0e553424a2a6cf20d499cef28dd91c5fff3b8a87 100644 --- a/include/maestro/i_pool_manager.h +++ b/include/maestro/i_pool_manager.h @@ -67,9 +67,21 @@ mstro_pm__possibly_fill_event_cdoname(const struct mstro_cdo_id *cdoid, Mstro__Pool__Event *ev); +mstro_status +mstro_pm__event_notify_before(mstro_pool_operation op); + +mstro_status +mstro_pm__event_notify_after(mstro_pool_operation op); + +mstro_status +mstro_pm__check_acks(mstro_pool_operation op); + +mstro_status +mstro_pm_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep); + /**Handle pm cdo seal operation and send ack*/ mstro_status -mstro_pm_cdo_seal(mstro_pool_operation op); +mstro_pm__cdo_seal(mstro_pool_operation op); /** * @brief wrapper for mstro_pm__send_ack to use within operations @@ -78,7 +90,7 @@ mstro_pm_cdo_seal(mstro_pool_operation op); * @return mstro_status */ mstro_status -mstro_pm_send_ack_op(mstro_pool_operation op); +mstro_pm__send_ack_op(mstro_pool_operation op); /** * @brief perm pm cdo demand @@ -87,7 +99,7 @@ mstro_pm_send_ack_op(mstro_pool_operation op); * @return mstro_status */ mstro_status -mstro_pm_handle_demand(mstro_pool_operation op); +mstro_pm__cdo_demand(mstro_pool_operation op); /** * @brief Send bye reply for application @@ -96,7 +108,7 @@ mstro_pm_handle_demand(mstro_pool_operation op); * @return mstro_status out status */ mstro_status -mstro_pm_send_bye(mstro_pool_operation op); +mstro_pm__send_bye(mstro_pool_operation op); /** * @brief Send welcome msg to joining apps @@ -105,7 +117,7 @@ mstro_pm_send_bye(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_send_welcome(mstro_pool_operation op); +mstro_pm__send_welcome(mstro_pool_operation op); /** * @brief Handle subscribe operation at the pool manager @@ -114,7 +126,7 @@ mstro_pm_send_welcome(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_subscribe(mstro_pool_operation op); +mstro_pm__subscribe(mstro_pool_operation op); /** * @brief wrapper for mstro_subscription_message_unregister @@ -123,7 +135,7 @@ mstro_pm_handle_subscribe(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_unsubscribe(mstro_pool_operation op); +mstro_pm__unsubscribe(mstro_pool_operation op); /** * @brief Send subscribe ack message @@ -132,7 +144,7 @@ mstro_pm_handle_unsubscribe(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_send_subscribe_ack(mstro_pool_operation op); +mstro_pm__send_subscribe_ack(mstro_pool_operation op); /** * @brief Update the statistics for completed PM operations @@ -141,7 +153,7 @@ mstro_pm_send_subscribe_ack(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_update_stats(mstro_pool_operation op); +mstro_pm__update_stats(mstro_pool_operation op); /** * @brief handle ack msg on the PM @@ -150,7 +162,7 @@ mstro_pm_update_stats(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_event_ack(mstro_pool_operation op); +mstro_pm__event_ack(mstro_pool_operation op); /** * @brief handle msg resolve at pm @@ -159,7 +171,7 @@ mstro_pm_handle_event_ack(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_msg_resolve(mstro_pool_operation op); +mstro_pm__msg_resolve(mstro_pool_operation op); /** * @brief handle join operation @@ -168,7 +180,7 @@ mstro_pm_handle_msg_resolve(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_join(mstro_pool_operation op); +mstro_pm__app_join(mstro_pool_operation op); diff --git a/include/maestro/i_pool_manager_protocol.h b/include/maestro/i_pool_manager_protocol.h index 9b67613ce4ccbca05402bac2d017d91b164752b1..cf785723b1fb04ae3a877d34596f1de79b4512ff 100644 --- a/include/maestro/i_pool_manager_protocol.h +++ b/include/maestro/i_pool_manager_protocol.h @@ -113,7 +113,7 @@ mstro_pmp_send_nowait(mstro_app_id target, const Mstro__Pool__MstroMsg *msg); * local-scope object at the call-site of this function (it will be * serialized immediately on entry and not referenced afterwards) */ mstro_status -mstro_pmp_send_nowait_ep(struct mstro_endpoint *ep, fi_addr_t addr, const Mstro__Pool__MstroMsg *msg); +mstro_pmp_send_nowait_ep(const struct mstro_endpoint *ep, fi_addr_t addr, const Mstro__Pool__MstroMsg *msg); /** Take protobuf-based message and pack it into a message envelope * and send it to the pool manager. The @arg msg argument can be a diff --git a/include/maestro/i_pool_manager_registry.h b/include/maestro/i_pool_manager_registry.h index 3c620249249d8eecf6a1321ea1f43c1af4e42a47..d64f55058c48dd5df65ec086e40b8946247b04b9 100644 --- a/include/maestro/i_pool_manager_registry.h +++ b/include/maestro/i_pool_manager_registry.h @@ -130,7 +130,7 @@ typedef struct mstro_pm_candidates { **/ mstro_status -mstro_pm_app_register(struct mstro_endpoint *ep, +mstro_pm_app_register(const struct mstro_endpoint *ep, fi_addr_t addr, char* serialized_desc, const Mstro__Pool__TransportMethods *transport_methods, diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index c09d990474d7f8184363545a7be7fd74a8551fb2..a8b42c050992c2ac79b253ce46cbc9cb04d7c061 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -58,22 +58,22 @@ /** maestro pool operation kinds */ enum mstro_pool_operation_kind { - MSTRO_OP_INVALID = 0, /**< invalid pool operation kind */ - MSTRO_OP_DECLARE, /**< cdo declare operation */ - MSTRO_OP_SEAL, /**< cdo seal operation */ - MSTRO_OP_OFFER, /**< cdo offer operation */ - MSTRO_OP_REQUIRE, /**< cdo require operation */ - MSTRO_OP_RETRACT, /**< cdo retract operation */ - MSTRO_OP_DEMAND, /**< cdo demand operation */ - MSTRO_OP_WITHDRAW, /**< cdo withdraw operation */ - MSTRO_OP_DISPOSE, /**< cdo dispose operation */ - MSTRO_OP_LEAVE, /**< app leave operation */ - MSTRO_OP_JOIN, /**< app join operation */ - MSTRO_OP_TRANSFER_COMPLETE, /**<transfer complete operation */ - MSTRO_OP_SUBSCRIBE, /**< subscribe operation */ - MSTRO_OP_UNSUBSCRIBE, /**< unsubscribe operation */ - MSTRO_OP_EVENT_ACK, /**< event ack operation */ - MSTRO_OP_MSG_RESOLVE, /**< msg resolve operation */ + MSTRO_OP_INVALID = 0, /**< invalid pool operation kind */ + MSTRO_OP_PM_DECLARE, /**< cdo pm declare operation */ + MSTRO_OP_PM_SEAL, /**< cdo pm seal operation */ + MSTRO_OP_PM_OFFER, /**< cdo pm offer operation */ + MSTRO_OP_PM_REQUIRE, /**< cdo pm require operation */ + MSTRO_OP_PM_RETRACT, /**< cdo pm retract operation */ + MSTRO_OP_PM_DEMAND, /**< cdo pm demand operation */ + MSTRO_OP_PM_WITHDRAW, /**< cdo pm withdraw operation */ + MSTRO_OP_PM_DISPOSE, /**< cdo pm dispose operation */ + MSTRO_OP_PM_LEAVE, /**< app pm leave operation */ + MSTRO_OP_PM_JOIN, /**< app pm join operation */ + MSTRO_OP_PM_TRANSFER_COMPLETE, /**< pm cdo transfer complete operation */ + MSTRO_OP_PM_SUBSCRIBE, /**< pm subscribe operation */ + MSTRO_OP_PM_UNSUBSCRIBE, /**< pm unsubscribe operation */ + MSTRO_OP_PM_EVENT_ACK, /**< pm event ack operation */ + MSTRO_OP_PM_MSG_RESOLVE, /**< pm msg resolve operation */ MSTRO_OP_MAX }; @@ -81,14 +81,14 @@ enum mstro_pool_operation_kind { * This list can grow as we need, and we can insert NULL (no-op) for the steps that do not apply for a certain op kind. */ enum mstro_pool_operation_step { - MSTRO_OP_ST_INVALID = 0, /**<invalid pool operation kind*/ + MSTRO_OP_ST_INVALID = 0, /**<invalid pool operation step*/ MSTRO_OP_ST_ANNOUNCE, /**<announce before operation */ MSTRO_OP_ST_CHECK_ACK, /**<check acks before */ MSTRO_OP_ST_EXE, /**<execute operation */ MSTRO_OP_ST_AFTR, /**<announce after operation */ MSTRO_OP_ST_CHECK_AFTR_ACK, /**<check acks after */ MSTRO_OP_ST_SEND_POOLOP_ACK,/**<PM ack operation completion*/ - MSTRO_OP_ST_UPDATE_STATS, /**Update stats*/ + MSTRO_OP_ST_UPDATE_STATS, /**Update stats */ MSTRO_OP_ST_MAX }; @@ -141,16 +141,6 @@ mstro_pool_op__allocate(mstro_pool_operation *op); mstro_status mstro_pool_op__free(mstro_pool_operation op); -mstro_status -mstro_pool_op__event_notify_before(mstro_pool_operation op); - -mstro_status -mstro_pool_op__event_notify_after(mstro_pool_operation op); - -mstro_status -mstro_pool_op__check_acks(mstro_pool_operation op); - - /** * @brief convenience function to print the operation kind * Useful for debugging messages @@ -166,9 +156,6 @@ mstro_status mstro_pool_op_engine(void *operation); -mstro_status -mstro_pool_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep); - /**@} (end of group MSTRO_I_POOL_OP) */ /**@} (end of group MSTRO_Internal) */ diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index c8f1818f74cae77cc3ffb8bb9d017fa025570dd5..261f52ddd2fcf3048640730c85e497ce24ac79c0 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -43,6 +43,7 @@ #include "maestro/i_statistics.h" #include "maestro/i_pool_manager.h" #include "maestro/i_ofi.h" +#include "i_thread_team.h" #include <assert.h> @@ -58,9 +59,579 @@ #define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_PM,__VA_ARGS__) #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_PM,__VA_ARGS__) +/** + * Function stacks to handle various pm operations + * steps are coming from @ref mstro_pool_operation_step in i_pool_operations.h + * steps are + * MSTRO_OP_ST_INVALID = 0, invalid pool operation kind + * MSTRO_OP_ST_ANNOUNCE, announce before operation + * MSTRO_OP_ST_CHECK_ACK, check acks before + * MSTRO_OP_ST_EXE, execute operation + * MSTRO_OP_ST_AFTR, announce after operation + * MSTRO_OP_ST_CHECK_AFTR_ACK, check acks after + * MSTRO_OP_ST_SEND_POOLOP_ACK, PM ack operation completion + * MSTRO_OP_ST_UPDATE_STATS, Update stats + * MSTRO_OP_ST_MAX + */ + +/**function stack to handle pm dispose cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_dispose_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_dispose, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + NULL, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm declare cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_declare_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_declare_op, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm_cdo_registry_send_declare_ack, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + + +/**function stack to handle pm seal cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_seal_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm__cdo_seal, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm offer cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_offer_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_update_state_op, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm require cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_require_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_update_state_op, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm retract cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_retract_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_update_state_op, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + + /**function stack to handle pm demand cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_demand_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm__cdo_demand, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + NULL, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm withdraw cdo msg*/ +const mstro_pool_op_st_handler mstro_pm_withdraw_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_withdraw, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle pm transfer completed msg*/ +const mstro_pool_op_st_handler mstro_pm_transfer_completed_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_cdo_registry_transfer_completed,/**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + NULL, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle app leave msg*/ +const mstro_pool_op_st_handler mstro_pm_leave_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm_app_deregister, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_bye, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle app leave msg*/ +const mstro_pool_op_st_handler mstro_pm_join_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm__app_join, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_welcome, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle app subscribe msg*/ +const mstro_pool_op_st_handler mstro_pm_subscribe_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm__subscribe, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_subscribe_ack, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + +/**function stack to handle unsubscribe msg*/ +const mstro_pool_op_st_handler mstro_pm_unsubscribe_steps[] = { + NULL, + mstro_pm__event_notify_before, /**<announce before operation */ + mstro_pm__check_acks, /**<check acks before */ + mstro_pm__unsubscribe, /**<execute operation */ + mstro_pm__event_notify_after, /**<announce after operation */ + mstro_pm__check_acks, /**<check acks after */ + mstro_pm__send_ack_op, /**<PM ack operation completion*/ + NULL /**Update stats */ + }; + +/**function stack to handle event ack msg*/ +const mstro_pool_op_st_handler mstro_pm_ack_steps[] = { + NULL, + NULL, /**<announce before operation */ + NULL, /**<check acks before */ + mstro_pm__event_ack, /**<execute operation */ + NULL, /**<announce after operation */ + NULL, /**<check acks after */ + NULL, /**<PM ack operation completion*/ + NULL /**Update stats */ + }; + +/**function stack to handle msg resolve msg*/ +const mstro_pool_op_st_handler mstro_pm_resolve_steps[] = { + NULL, + NULL, /**<announce before operation */ + NULL, /**<check acks before */ + mstro_pm__msg_resolve, /**<execute operation */ + NULL, /**<announce after operation */ + NULL, /**<check acks after */ + NULL, /**<PM ack operation completion*/ + mstro_pm__update_stats /**Update stats */ + }; + + +static inline +mstro_status +mstro_pm__fill_pool_event(mstro_pool_operation op, Mstro__Pool__Event *ev) +{ + mstro_status status = MSTRO_UNIMPL; + + ev->origin_id = op->msg->token->appid; + switch (op->kind) + { + case MSTRO_OP_PM_DECLARE: + ev->kind = MSTRO__POOL__EVENT_KIND__DECLARE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DECLARE; + ev->declare = op->msg->declare; + ev->cdo_name = op->msg->declare->cdo_name; + status = MSTRO_OK; + break; + case MSTRO_OP_PM_SEAL: + ev->kind = MSTRO__POOL__EVENT_KIND__SEAL; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_SEAL; + ev->seal = op->msg->seal; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_OFFER: + ev->kind = MSTRO__POOL__EVENT_KIND__OFFER; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_OFFER; + ev->offer = op->msg->offer; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_REQUIRE: + ev->kind = MSTRO__POOL__EVENT_KIND__REQUIRE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_REQUIRE; + ev->require = op->msg->require; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_RETRACT: + ev->kind = MSTRO__POOL__EVENT_KIND__RETRACT; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_RETRACT; + ev->retract = op->msg->retract; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_DEMAND: + ev->kind = MSTRO__POOL__EVENT_KIND__DEMAND; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DEMAND; + ev->demand = op->msg->demand; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_WITHDRAW: + ev->kind = MSTRO__POOL__EVENT_KIND__WITHDRAW; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_WITHDRAW; + ev->withdraw = op->msg->withdraw; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_DISPOSE: + ev->kind = MSTRO__POOL__EVENT_KIND__DISPOSE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DISPOSE; + ev->dispose = op->msg->dispose; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_TRANSFER_COMPLETE: + ev->kind = MSTRO__POOL__EVENT_KIND__TRANSFER_COMPLETED; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_TRANSFER_COMPLETED; + ev->transfer_completed = op->msg->transfer_completed; + status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); + if(status!=MSTRO_OK) { + status = MSTRO_FAIL; + } + else { + status = MSTRO_OK; + } + break; + case MSTRO_OP_PM_LEAVE: + ev->kind = MSTRO__POOL__EVENT_KIND__APP_LEAVE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_LEAVE; + ev->leave = op->msg->leave; + status = MSTRO_OK; + break; + case MSTRO_OP_PM_JOIN: + ev->kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; + ev->join = op->msg->join; + ev->origin_id = NULL; // we will put in the correct appid in after events in mstro_pool_op__event_notify + status = MSTRO_OK; + break; + case MSTRO_OP_PM_SUBSCRIBE: + ev->kind = MSTRO__POOL__EVENT_KIND__SUBSCRIBE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_SUBSCRIBE; + ev->subscribe = op->msg->subscribe; + status = MSTRO_OK; + break; + case MSTRO_OP_PM_UNSUBSCRIBE: + ev->kind = MSTRO__POOL__EVENT_KIND__UNSUBSCRIBE; + ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_UNSUBSCRIBE; + ev->unsubscribe = op->msg->unsubscribe; + status = MSTRO_OK; + break; + /**FIXME add more operations */ + default: + ERR("Undefined operation \n"); + status = MSTRO_FAIL; + break; + } + + return status; +} + +static inline +mstro_status +mstro_pm__event_notify(mstro_pool_operation op, bool before) +{ + mstro_status status = MSTRO_UNIMPL; + /** create an event msg*/ + Mstro__Pool__Event pool_event_msg = MSTRO__POOL__EVENT__INIT; + Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; + + status = mstro_pm__fill_pool_event(op, &pool_event_msg); + if(status != MSTRO_OK) + { + ERR("Failed to fill pool event from pool operation structure\n"); + return MSTRO_FAIL; + } + + /**Add time stamp */ + Mstro__Pool__Timestamp ctime = MSTRO__POOL__TIMESTAMP__INIT; + ctime.offset = 0; + mstro_nanosec_t tick = mstro_clock(); + ctime.sec = tick/NSEC_PER_SEC; + ctime.nsec = tick-ctime.sec; + pool_event_msg.ctime = &ctime; + + if ((op->kind == MSTRO_OP_PM_JOIN) && (!before)) + { + aid.id = op->appid; + pool_event_msg.origin_id = &aid; + } + + + DEBUG("Advertising event kind %d (%s:%s)\n", + pool_event_msg.kind, + protobuf_c_enum_descriptor_get_value(&mstro__pool__event_kind__descriptor, + pool_event_msg.kind)->name, before ? "before" : "after" ); + + status = mstro_pool_event_advertise(&pool_event_msg, before, &(op->nr_outstanding_acks)); + mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_POOL_EVENTS, 1); + + if(status!=MSTRO_OK) { + ERR("Failed to advertise event: %d (%s)\n", + status, mstro_status_description(status)); + } + + return status; + +} + +/**Create mstro_pool_operations from incoming msgs + * pushes the created operations to the *queue* +*/ +mstro_status +mstro_pm_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) +{ + mstro_status status=MSTRO_OK; + mstro_pool_operation op; + + /*Create an operation*/ + status = mstro_pool_op__allocate(&op); + if (status != MSTRO_OK) + { + mstro_pm__msg_free(msg); + return status; + } + + /**handle msg */ + switch(msg->msg_case) { + /* 'good' messages: */ + case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE: + /*fill declare specific parts */ + op->kind = MSTRO_OP_PM_DECLARE; + op->handler_steps = mstro_pm_declare_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_SEAL: + /**fill seal parts*/ + op->kind = MSTRO_OP_PM_SEAL; + op->handler_steps = mstro_pm_seal_steps; + op->cdoid.qw[0] = msg->seal->cdoid->qw0; + op->cdoid.qw[1] = msg->seal->cdoid->qw1; + op->cdoid.local_id = msg->seal->cdoid->local_id; + op->send_attribute_update = false; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_OFFER: + /**fill offer parts*/ + op->kind = MSTRO_OP_PM_OFFER; + op->handler_steps = mstro_pm_offer_steps; + op->cdoid.qw[0] = msg->offer->cdoid->qw0; + op->cdoid.qw[1] = msg->offer->cdoid->qw1; + op->cdoid.local_id = msg->offer->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_REQUIRE: + /**fill require parts*/ + op->kind = MSTRO_OP_PM_REQUIRE; + op->handler_steps = mstro_pm_require_steps; + op->cdoid.qw[0] = msg->require->cdoid->qw0; + op->cdoid.qw[1] = msg->require->cdoid->qw1; + op->cdoid.local_id = msg->require->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_RETRACT: + /**fill retract parts*/ + op->kind = MSTRO_OP_PM_RETRACT; + op->handler_steps = mstro_pm_retract_steps; + op->cdoid.qw[0] = msg->retract->cdoid->qw0; + op->cdoid.qw[1] = msg->retract->cdoid->qw1; + op->cdoid.local_id = msg->retract->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_DEMAND: + /**fill demand parts*/ + op->kind = MSTRO_OP_PM_DEMAND; + op->handler_steps = mstro_pm_demand_steps; + op->cdoid.qw[0] = msg->demand->cdoid->qw0; + op->cdoid.qw[1] = msg->demand->cdoid->qw1; + op->cdoid.local_id = msg->demand->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_WITHDRAW: + /**fill withdraw parts*/ + op->kind = MSTRO_OP_PM_WITHDRAW; + op->handler_steps = mstro_pm_withdraw_steps; + op->cdoid.qw[0] = msg->withdraw->cdoid->qw0; + op->cdoid.qw[1] = msg->withdraw->cdoid->qw1; + op->cdoid.local_id = msg->withdraw->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_DISPOSE: + /*fill dispose specific parts */ + op->kind = MSTRO_OP_PM_DISPOSE; + op->handler_steps = mstro_pm_dispose_steps; // cdo pm dispose stack + op->cdoid.qw[0] = msg->dispose->cdoid->qw0; + op->cdoid.qw[1] = msg->dispose->cdoid->qw1; + op->cdoid.local_id = msg->dispose->cdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: + /*fill dispose specific parts */ + op->kind = MSTRO_OP_PM_TRANSFER_COMPLETE; + op->handler_steps = mstro_pm_transfer_completed_steps; + op->cdoid.qw[0] = msg->transfer_completed->dstcdoid->qw0; /*the operation concerns the dst cdo*/ + op->cdoid.qw[1] = msg->transfer_completed->dstcdoid->qw1; + op->cdoid.local_id = msg->transfer_completed->dstcdoid->local_id; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_LEAVE: + op->kind = MSTRO_OP_PM_LEAVE; + op->handler_steps = mstro_pm_leave_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: + op->kind = MSTRO_OP_PM_JOIN; + op->handler_steps = mstro_pm_join_steps; + op->join.ep = ep; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE: + op->kind = MSTRO_OP_PM_SUBSCRIBE; + op->handler_steps = mstro_pm_subscribe_steps; + op->subscription_handle = NULL; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_UNSUBSCRIBE: + op->kind = MSTRO_OP_PM_UNSUBSCRIBE; + op->handler_steps = mstro_pm_unsubscribe_steps; + op->subscription_handle = msg->unsubscribe->subscription_handle; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_EVENT_ACK: + op->kind = MSTRO_OP_PM_EVENT_ACK; + op->handler_steps = mstro_pm_ack_steps; + break; + case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE: + op->kind = MSTRO_OP_PM_MSG_RESOLVE; + op->handler_steps = mstro_pm_resolve_steps; + break; + default: + WARN("%s message received, dropping it, not even sending ACK\n", + msg->base.descriptor->name); + /*cleanup*/ + mstro_pool_op__free(op); + mstro_pm__msg_free(msg); + status = MSTRO_UNIMPL; + break; + } + + if (status == MSTRO_OK) + { + /* continue to fill general operation fields from msg and push to queue */ + op->step = MSTRO_OP_ST_ANNOUNCE; // first step is to announce + op->msg = msg; + if (op->kind != MSTRO_OP_PM_JOIN) + { + op->appid = msg->token->appid->id; + } + DEBUG("Filled operation structure from msg\n"); + status = erl_thread_team_enqueue(g_pool_operations_team, op); + assert(status == MSTRO_OK); + } + + return status; +} + + +mstro_status +mstro_pm__check_acks(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + + uint64_t nr_left = atomic_load_explicit(&(op->nr_outstanding_acks), memory_order_acquire); + if(nr_left==0) { + DEBUG("No outstanding acks\n"); + status = MSTRO_OK; + } + else + { + DEBUG("Remaining %" PRIu64" acks\n", nr_left); + status = MSTRO_WOULDBLOCK; + } + return status; +} + + +mstro_status +mstro_pm__event_notify_before(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + status = mstro_pm__event_notify(op, true); + return status; +} + +mstro_status +mstro_pm__event_notify_after(mstro_pool_operation op) +{ + mstro_status status = MSTRO_UNIMPL; + status = mstro_pm__event_notify(op, false); + return status; +} + -/** Event domain for handler continuations */ -mstro_event_domain g_mstro_pm_continuations=NULL; /** Group registry */ @@ -199,48 +770,48 @@ mstro_pm__send_ack(mstro_app_id app_id, } mstro_status -mstro_pm_update_stats(mstro_pool_operation op) +mstro_pm__update_stats(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; switch (op->kind) { - case MSTRO_OP_DISPOSE: + case MSTRO_OP_PM_DISPOSE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_DISPOSE, 1); break; - case MSTRO_OP_DECLARE: + case MSTRO_OP_PM_DECLARE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_DECLARE, 1); break; - case MSTRO_OP_SEAL: + case MSTRO_OP_PM_SEAL: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_SEAL, 1); break; - case MSTRO_OP_OFFER: + case MSTRO_OP_PM_OFFER: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_OFFER, 1); break; - case MSTRO_OP_REQUIRE: + case MSTRO_OP_PM_REQUIRE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_REQUIRE, 1); break; - case MSTRO_OP_RETRACT: + case MSTRO_OP_PM_RETRACT: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_RETRACT, 1); break; - case MSTRO_OP_DEMAND: + case MSTRO_OP_PM_DEMAND: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_DEMAND, 1); break; - case MSTRO_OP_WITHDRAW: + case MSTRO_OP_PM_WITHDRAW: status = mstro_stats_add_counter(MSTRO_STATS_CAT_POOL, MSTRO_STATS_L_PM_NUM_IMM_WITHDRAWS, 1); break; - case MSTRO_OP_TRANSFER_COMPLETE: + case MSTRO_OP_PM_TRANSFER_COMPLETE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_TRANSFER_COMPLETIONS, 1); break; - case MSTRO_OP_LEAVE: + case MSTRO_OP_PM_LEAVE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_LEAVE, 1); break; - case MSTRO_OP_JOIN: + case MSTRO_OP_PM_JOIN: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_JOIN, 1); break; - case MSTRO_OP_SUBSCRIBE: + case MSTRO_OP_PM_SUBSCRIBE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_SUBSCRIBE, 1); break; - case MSTRO_OP_MSG_RESOLVE: + case MSTRO_OP_PM_MSG_RESOLVE: status = mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_RESOLVE, 1); break; default: @@ -253,12 +824,12 @@ mstro_pm_update_stats(mstro_pool_operation op) mstro_status -mstro_pm_send_ack_op(mstro_pool_operation op) +mstro_pm__send_ack_op(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; switch (op->kind) { - case MSTRO_OP_SEAL: + case MSTRO_OP_PM_SEAL: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__SEAL, op->msg->seal->cdoid, NULL, op->send_attribute_update?op->msg->seal->attributes:NULL, op->status); @@ -268,27 +839,27 @@ mstro_pm_send_ack_op(mstro_pool_operation op) * NULL it and things will work. */ op->msg->seal->attributes = NULL; break; - case MSTRO_OP_OFFER: + case MSTRO_OP_PM_OFFER: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__OFFER, op->msg->offer->cdoid, NULL, NULL, op->status); break; - case MSTRO_OP_REQUIRE: + case MSTRO_OP_PM_REQUIRE: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__REQUIRE, op->msg->require->cdoid, NULL, NULL, op->status); break; - case MSTRO_OP_RETRACT: + case MSTRO_OP_PM_RETRACT: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__RETRACT, op->msg->retract->cdoid, NULL, NULL, op->status); break; - case MSTRO_OP_WITHDRAW: + case MSTRO_OP_PM_WITHDRAW: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__WITHDRAW, op->msg->withdraw->cdoid, NULL, NULL, op->status); break; - case MSTRO_OP_UNSUBSCRIBE: + case MSTRO_OP_PM_UNSUBSCRIBE: status = mstro_pm__send_ack(op->appid, MSTRO__POOL__POOL_OP_ACK__POOL_OP__UNSUBSCRIBE, NULL, op->subscription_handle, NULL, op->status); @@ -306,7 +877,7 @@ mstro_pm_send_ack_op(mstro_pool_operation op) } mstro_status -mstro_pm_send_subscribe_ack(mstro_pool_operation op) +mstro_pm__send_subscribe_ack(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -678,7 +1249,7 @@ BAILOUT: /*** SEAL ***/ /**Handled by operations*/ mstro_status -mstro_pm_cdo_seal(mstro_pool_operation op) +mstro_pm__cdo_seal(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -770,7 +1341,7 @@ mstro_pm_cdo_seal(mstro_pool_operation op) /*** DEMAND ***/ mstro_status -mstro_pm_handle_demand(mstro_pool_operation op) +mstro_pm__cdo_demand(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; status = mstro_pm_cdo_registry_update_state_op(op); @@ -801,7 +1372,7 @@ mstro_pm_handle_demand(mstro_pool_operation op) /*** LEAVE ***/ mstro_status -mstro_pm_send_bye(mstro_pool_operation op) +mstro_pm__send_bye(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; INFO("Granting LEAVE request from app %" PRIappid "\n", op->appid); @@ -827,7 +1398,7 @@ mstro_pm_send_bye(mstro_pool_operation op) /*** JOIN ***/ mstro_status -mstro_pm_send_welcome(mstro_pool_operation op) +mstro_pm__send_welcome(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -899,7 +1470,7 @@ mstro_pm__transport_method_name(Mstro__Pool__TransportKind tk) } mstro_status -mstro_pm_handle_join(mstro_pool_operation op) +mstro_pm__app_join(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -956,7 +1527,7 @@ mstro_pm_handle_join(mstro_pool_operation op) /*** SUBSCRIBE ***/ mstro_status -mstro_pm_handle_subscribe(mstro_pool_operation op) +mstro_pm__subscribe(mstro_pool_operation op) { DEBUG("SUBSCRIBE from app %" PRIappid "\n", op->appid); Mstro__Pool__Subscribe *subscribe = op->msg->subscribe; @@ -977,7 +1548,7 @@ mstro_pm_handle_subscribe(mstro_pool_operation op) /*** UNSUBSCRIBE ***/ mstro_status -mstro_pm_handle_unsubscribe(mstro_pool_operation op) +mstro_pm__unsubscribe(mstro_pool_operation op) { op->status = mstro_subscription_message_unregister(op->msg->unsubscribe,op->appid); if (op->status !=MSTRO_OK) @@ -990,7 +1561,7 @@ mstro_pm_handle_unsubscribe(mstro_pool_operation op) /*** EVENT_ACK ***/ mstro_status -mstro_pm_handle_event_ack(mstro_pool_operation op) +mstro_pm__event_ack(mstro_pool_operation op) { /* pass in to subscription engine. It will do the accounting, and in * the end trigger continuations for those handlers that sent the @@ -1005,7 +1576,7 @@ mstro_pm_handle_event_ack(mstro_pool_operation op) /* FIXME: missing event handling here */ mstro_status -mstro_pm_handle_msg_resolve(mstro_pool_operation op) +mstro_pm__msg_resolve(mstro_pool_operation op) { Mstro__Pool__Resolve *resolve = op->msg->resolve; mstro_app_id app_id = op->appid;; @@ -1192,12 +1763,12 @@ mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope, //DEBUG("PM MSG-EVENT-ACK result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE: //DEBUG("PM MSG-RESOLVE result: %d\n", status); - status = mstro_pool_op_maker(msg, NULL); + status = mstro_pm_op_maker(msg, NULL); break; case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: //DEBUG("PM MSG-JOIN result: %d\n", status); - status = mstro_pool_op_maker(msg, ep); + status = mstro_pm_op_maker(msg, ep); break; /* currently unimplemented: */ diff --git a/maestro/pool_manager_ofi.c b/maestro/pool_manager_ofi.c index 858a565d166ef8f95a44ad5e440e0348dbdcbda4..bbe2cfaa3a322036c0694348862b18fbeea9564c 100644 --- a/maestro/pool_manager_ofi.c +++ b/maestro/pool_manager_ofi.c @@ -155,13 +155,6 @@ mstro_pm_start(void) goto BAILOUT; } - stat = mstro_event_domain_create("Pool Manager message handler continuations", - &g_mstro_pm_continuations); - if(stat!=MSTRO_OK) { - ERR("Failed to create message handler continuation domain\n"); - goto BAILOUT; - } - /* organize quorum for pool_manager_size */ /* create connection */ @@ -241,8 +234,6 @@ mstro_pm_terminate(void) status = MSTRO_FAIL; } - status |= mstro_event_domain_destroy(g_mstro_pm_continuations); - status |= mstro_pm_reg_finalize(); return status; diff --git a/maestro/pool_manager_protocol.c b/maestro/pool_manager_protocol.c index bf42d9c32ff0a6040516eb511c77ee4f97d18b93..c587f0247139328638d23d6f6078d99df12daf5c 100644 --- a/maestro/pool_manager_protocol.c +++ b/maestro/pool_manager_protocol.c @@ -223,7 +223,7 @@ BAILOUT: /* same as mstro_pmp_send_nowait, but with immediate EP argument, and less case checking regarding target */ mstro_status -mstro_pmp_send_nowait_ep(struct mstro_endpoint *ep, fi_addr_t addr, const Mstro__Pool__MstroMsg *msg) +mstro_pmp_send_nowait_ep(const struct mstro_endpoint *ep, fi_addr_t addr, const Mstro__Pool__MstroMsg *msg) { size_t msgsize = mstro__pool__mstro_msg__get_packed_size(msg); diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index 2ba7ea13ccf27f1452b4a8f709dc499849c6ea44..1b85223b120fc3ed38f0cdfb567fc6a799ecf028 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -161,7 +161,7 @@ mstro_pm_app_reg__entry_dispose(struct mstro_pm_app_registry_entry *e) } mstro_status -mstro_pm_app_register(struct mstro_endpoint *ep, +mstro_pm_app_register(const struct mstro_endpoint *ep, fi_addr_t addr, char* serialized_desc, const Mstro__Pool__TransportMethods *transport_methods, @@ -1780,23 +1780,23 @@ mstro_pm_cdo_registry_update_state_op(mstro_pool_operation op) { switch (op->kind) { - case MSTRO_OP_OFFER: + case MSTRO_OP_PM_OFFER: op->status = mstro_pm_cdo_registry_update_state(&(op->cdoid), op->appid, MSTRO_CDO_STATE_OFFERED); // FIXME: WARN("If this is a group CDO, members will not be OFFERED at this time\n"); break; - case MSTRO_OP_REQUIRE: + case MSTRO_OP_PM_REQUIRE: op->status = mstro_pm_cdo_registry_update_state(&(op->cdoid), op->appid, MSTRO_CDO_STATE_REQUIRED); WARN("Not scheduling any action at REQUIRE time\n"); // FIXME: WARN("If this is a group CDO, members will not be REQUIRED at this time\n"); break; - case MSTRO_OP_RETRACT: + case MSTRO_OP_PM_RETRACT: op->status = mstro_pm_cdo_registry_update_state(&(op->cdoid), op->appid, MSTRO_CDO_STATE_RETRACTED); /* for groups, do recursion ? */ WARN("If this is a group CDO, members will not be RETRACTED at this time\n"); break; - case MSTRO_OP_DEMAND: + case MSTRO_OP_PM_DEMAND: op->status = mstro_pm_cdo_registry_update_state(&(op->cdoid), op->appid, MSTRO_CDO_STATE_DEMANDED|MSTRO_CDO_STATE_IN_TRANSPORT); break; default: diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index e823757e6120c8b199785cff0096428730e28245..75150096c65c897583fd8723c11b036c27510f0e 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -36,15 +36,11 @@ #include "maestro/i_pool_operations.h" -#include "i_thread_team.h" #include "i_fifo.h" #include "maestro/i_globals.h" #include "maestro/logging.h" -#include "i_subscription_registry.h" #include "i_maestro_numa.h" -#include "maestro/i_pool_manager.h" #include <assert.h> -#include "maestro/i_statistics.h" @@ -56,171 +52,6 @@ #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_PM,__VA_ARGS__) -/**function stack to handle pm dispose cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_dispose_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_dispose, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - NULL, - mstro_pm_update_stats}; - -/**function stack to handle pm declare cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_declare_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_declare_op, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_send_declare_ack, - mstro_pm_update_stats}; - - -/**function stack to handle pm seal cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_seal_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_seal, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - mstro_pm_update_stats}; - -/**function stack to handle pm offer cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_offer_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_update_state_op, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - mstro_pm_update_stats}; - -/**function stack to handle pm require cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_require_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_update_state_op, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - mstro_pm_update_stats}; - -/**function stack to handle pm retract cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_retract_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_update_state_op, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - mstro_pm_update_stats}; - - /**function stack to handle pm demand cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_demand_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_handle_demand, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - NULL, - mstro_pm_update_stats}; - -/**function stack to handle pm withdraw cdo msg*/ -const mstro_pool_op_st_handler mstro_pool_op_withdraw_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_withdraw, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - mstro_pm_update_stats}; - -/**function stack to handle pm transfer completed msg*/ -const mstro_pool_op_st_handler mstro_pool_op_transfer_completed_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_cdo_registry_transfer_completed, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - NULL, - mstro_pm_update_stats}; - -/**function stack to handle app leave msg*/ -const mstro_pool_op_st_handler mstro_pool_op_leave_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_app_deregister, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_bye, - mstro_pm_update_stats}; - -/**function stack to handle app leave msg*/ -const mstro_pool_op_st_handler mstro_pool_op_join_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_handle_join, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_welcome, - mstro_pm_update_stats}; - -/**function stack to handle app subscribe msg*/ -const mstro_pool_op_st_handler mstro_pool_op_subscribe_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_handle_subscribe, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_subscribe_ack, - mstro_pm_update_stats}; - -/**function stack to handle unsubscribe msg*/ -const mstro_pool_op_st_handler mstro_pool_op_unsubscribe_steps[] = { - NULL, - mstro_pool_op__event_notify_before, - mstro_pool_op__check_acks, - mstro_pm_handle_unsubscribe, - mstro_pool_op__event_notify_after, - mstro_pool_op__check_acks, - mstro_pm_send_ack_op, - NULL}; - -/**function stack to handle event ack msg*/ -const mstro_pool_op_st_handler mstro_pool_op_ack_steps[] = { - NULL, - NULL, - NULL, - mstro_pm_handle_event_ack, - NULL, - NULL, - NULL, - NULL}; - -/**function stack to handle msg resolve msg*/ -const mstro_pool_op_st_handler mstro_pool_op_resolve_steps[] = { - NULL, - NULL, - NULL, - mstro_pm_handle_msg_resolve, - NULL, - NULL, - NULL, - mstro_pm_update_stats}; /** allocate new pool operation */ mstro_status @@ -265,149 +96,6 @@ mstro_pool_op__free(mstro_pool_operation op) return status; } -/**Create mstro_pool_operations from incoming msgs - * pushes the created operations to the *queue* -*/ -mstro_status -mstro_pool_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) -{ - mstro_status status=MSTRO_OK; - mstro_pool_operation op; - - /*Create an operation*/ - status = mstro_pool_op__allocate(&op); - if (status != MSTRO_OK) - { - mstro_pm__msg_free(msg); - return status; - } - - /**handle msg */ - switch(msg->msg_case) { - /* 'good' messages: */ - case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE: - /*fill declare specific parts */ - op->kind = MSTRO_OP_DECLARE; - op->handler_steps = mstro_pool_op_declare_steps; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_SEAL: - /**fill seal parts*/ - op->kind = MSTRO_OP_SEAL; - op->handler_steps = mstro_pool_op_seal_steps; - op->cdoid.qw[0] = msg->seal->cdoid->qw0; - op->cdoid.qw[1] = msg->seal->cdoid->qw1; - op->cdoid.local_id = msg->seal->cdoid->local_id; - op->send_attribute_update = false; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_OFFER: - /**fill offer parts*/ - op->kind = MSTRO_OP_OFFER; - op->handler_steps = mstro_pool_op_offer_steps; - op->cdoid.qw[0] = msg->offer->cdoid->qw0; - op->cdoid.qw[1] = msg->offer->cdoid->qw1; - op->cdoid.local_id = msg->offer->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_REQUIRE: - /**fill require parts*/ - op->kind = MSTRO_OP_REQUIRE; - op->handler_steps = mstro_pool_op_require_steps; - op->cdoid.qw[0] = msg->require->cdoid->qw0; - op->cdoid.qw[1] = msg->require->cdoid->qw1; - op->cdoid.local_id = msg->require->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_RETRACT: - /**fill retract parts*/ - op->kind = MSTRO_OP_RETRACT; - op->handler_steps = mstro_pool_op_retract_steps; - op->cdoid.qw[0] = msg->retract->cdoid->qw0; - op->cdoid.qw[1] = msg->retract->cdoid->qw1; - op->cdoid.local_id = msg->retract->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_DEMAND: - /**fill demand parts*/ - op->kind = MSTRO_OP_DEMAND; - op->handler_steps = mstro_pool_op_demand_steps; - op->cdoid.qw[0] = msg->demand->cdoid->qw0; - op->cdoid.qw[1] = msg->demand->cdoid->qw1; - op->cdoid.local_id = msg->demand->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_WITHDRAW: - /**fill withdraw parts*/ - op->kind = MSTRO_OP_WITHDRAW; - op->handler_steps = mstro_pool_op_withdraw_steps; - op->cdoid.qw[0] = msg->withdraw->cdoid->qw0; - op->cdoid.qw[1] = msg->withdraw->cdoid->qw1; - op->cdoid.local_id = msg->withdraw->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_DISPOSE: - /*fill dispose specific parts */ - op->kind = MSTRO_OP_DISPOSE; - op->handler_steps = mstro_pool_op_dispose_steps; // cdo pm dispose stack - op->cdoid.qw[0] = msg->dispose->cdoid->qw0; - op->cdoid.qw[1] = msg->dispose->cdoid->qw1; - op->cdoid.local_id = msg->dispose->cdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: - /*fill dispose specific parts */ - op->kind = MSTRO_OP_TRANSFER_COMPLETE; - op->handler_steps = mstro_pool_op_transfer_completed_steps; - op->cdoid.qw[0] = msg->transfer_completed->dstcdoid->qw0; /*the operation concerns the dst cdo*/ - op->cdoid.qw[1] = msg->transfer_completed->dstcdoid->qw1; - op->cdoid.local_id = msg->transfer_completed->dstcdoid->local_id; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_LEAVE: - op->kind = MSTRO_OP_LEAVE; - op->handler_steps = mstro_pool_op_leave_steps; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: - op->kind = MSTRO_OP_JOIN; - op->handler_steps = mstro_pool_op_join_steps; - op->join.ep = ep; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE: - op->kind = MSTRO_OP_SUBSCRIBE; - op->handler_steps = mstro_pool_op_subscribe_steps; - op->subscription_handle = NULL; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_UNSUBSCRIBE: - op->kind = MSTRO_OP_UNSUBSCRIBE; - op->handler_steps = mstro_pool_op_unsubscribe_steps; - op->subscription_handle = msg->unsubscribe->subscription_handle; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_EVENT_ACK: - op->kind = MSTRO_OP_EVENT_ACK; - op->handler_steps = mstro_pool_op_ack_steps; - break; - case MSTRO__POOL__MSTRO_MSG__MSG_RESOLVE: - op->kind = MSTRO_OP_MSG_RESOLVE; - op->handler_steps = mstro_pool_op_resolve_steps; - break; - default: - WARN("%s message received, dropping it, not even sending ACK\n", - msg->base.descriptor->name); - /*cleanup*/ - mstro_pool_op__free(op); - mstro_pm__msg_free(msg); - status = MSTRO_UNIMPL; - break; - } - - if (status == MSTRO_OK) - { - /* continue to fill general operation fields from msg and push to queue */ - op->step = MSTRO_OP_ST_ANNOUNCE; // first step is to announce - op->msg = msg; - if (op->kind != MSTRO_OP_JOIN) - { - op->appid = msg->token->appid->id; - } - DEBUG("Filled operation structure from msg\n"); - status = erl_thread_team_enqueue(g_pool_operations_team, op); - assert(status == MSTRO_OK); - } - - return status; -} /** Convenience function to print operation type for debugging */ char * @@ -416,49 +104,49 @@ mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type) char *type = NULL; switch (operation_type) { - case MSTRO_OP_DECLARE: + case MSTRO_OP_PM_DECLARE: type = "DECLARE"; break; - case MSTRO_OP_SEAL: + case MSTRO_OP_PM_SEAL: type = "SEAL"; break; - case MSTRO_OP_OFFER: + case MSTRO_OP_PM_OFFER: type = "OFFER"; break; - case MSTRO_OP_REQUIRE: + case MSTRO_OP_PM_REQUIRE: type = "REQUIRE"; break; - case MSTRO_OP_RETRACT: + case MSTRO_OP_PM_RETRACT: type = "RETRACT"; break; - case MSTRO_OP_DEMAND: + case MSTRO_OP_PM_DEMAND: type = "DEMAND"; break; - case MSTRO_OP_WITHDRAW: + case MSTRO_OP_PM_WITHDRAW: type = "WITHDRAW"; break; - case MSTRO_OP_DISPOSE: + case MSTRO_OP_PM_DISPOSE: type = "DISPOSE"; break; - case MSTRO_OP_LEAVE: + case MSTRO_OP_PM_LEAVE: type = "LEAVE"; break; - case MSTRO_OP_JOIN: + case MSTRO_OP_PM_JOIN: type = "JOIN"; break; - case MSTRO_OP_TRANSFER_COMPLETE: + case MSTRO_OP_PM_TRANSFER_COMPLETE: type = "TRANSFER_COMPLETED"; break; - case MSTRO_OP_SUBSCRIBE: + case MSTRO_OP_PM_SUBSCRIBE: type = "SUBSCRIBE"; break; - case MSTRO_OP_UNSUBSCRIBE: + case MSTRO_OP_PM_UNSUBSCRIBE: type = "UNSUBSCRIBE"; break; - case MSTRO_OP_EVENT_ACK: + case MSTRO_OP_PM_EVENT_ACK: type = "EVENT_ACK"; break; - case MSTRO_OP_MSG_RESOLVE: + case MSTRO_OP_PM_MSG_RESOLVE: type = "MSG_RESOLVE"; break; default: @@ -482,16 +170,16 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st switch (op->kind) { /**CDO operations */ - case MSTRO_OP_DECLARE: + case MSTRO_OP_PM_DECLARE: cdo_name = op->msg->declare->cdo_name; - case MSTRO_OP_SEAL: - case MSTRO_OP_OFFER: - case MSTRO_OP_REQUIRE: - case MSTRO_OP_RETRACT: - case MSTRO_OP_DEMAND: - case MSTRO_OP_WITHDRAW: - case MSTRO_OP_DISPOSE: - case MSTRO_OP_TRANSFER_COMPLETE: + case MSTRO_OP_PM_SEAL: + case MSTRO_OP_PM_OFFER: + case MSTRO_OP_PM_REQUIRE: + case MSTRO_OP_PM_RETRACT: + case MSTRO_OP_PM_DEMAND: + case MSTRO_OP_PM_WITHDRAW: + case MSTRO_OP_PM_DISPOSE: + case MSTRO_OP_PM_TRANSFER_COMPLETE: mstro_cdo_id__str(&(op->cdoid), &cdo_id_str); if (state == MSTRO_OP_STATE_FAILED) { @@ -512,12 +200,12 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st } break; /** normal msg processing */ - case MSTRO_OP_LEAVE: - case MSTRO_OP_JOIN: - case MSTRO_OP_SUBSCRIBE: - case MSTRO_OP_UNSUBSCRIBE: - case MSTRO_OP_EVENT_ACK: - case MSTRO_OP_MSG_RESOLVE: + case MSTRO_OP_PM_LEAVE: + case MSTRO_OP_PM_JOIN: + case MSTRO_OP_PM_SUBSCRIBE: + case MSTRO_OP_PM_UNSUBSCRIBE: + case MSTRO_OP_PM_EVENT_ACK: + case MSTRO_OP_PM_MSG_RESOLVE: if (state == MSTRO_OP_STATE_FAILED) { ERR("Failed to execute step %d of operation %s (from app %"PRIappid")\n", @@ -602,237 +290,3 @@ mstro_pool_op_engine(void *operation) } return status; } - - -static inline -mstro_status -mstro_pool_op__fill_pool_event(mstro_pool_operation op, Mstro__Pool__Event *ev) -{ - mstro_status status = MSTRO_UNIMPL; - - ev->origin_id = op->msg->token->appid; - switch (op->kind) - { - case MSTRO_OP_DECLARE: - ev->kind = MSTRO__POOL__EVENT_KIND__DECLARE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DECLARE; - ev->declare = op->msg->declare; - ev->cdo_name = op->msg->declare->cdo_name; - status = MSTRO_OK; - break; - case MSTRO_OP_SEAL: - ev->kind = MSTRO__POOL__EVENT_KIND__SEAL; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_SEAL; - ev->seal = op->msg->seal; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_OFFER: - ev->kind = MSTRO__POOL__EVENT_KIND__OFFER; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_OFFER; - ev->offer = op->msg->offer; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_REQUIRE: - ev->kind = MSTRO__POOL__EVENT_KIND__REQUIRE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_REQUIRE; - ev->require = op->msg->require; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_RETRACT: - ev->kind = MSTRO__POOL__EVENT_KIND__RETRACT; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_RETRACT; - ev->retract = op->msg->retract; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_DEMAND: - ev->kind = MSTRO__POOL__EVENT_KIND__DEMAND; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DEMAND; - ev->demand = op->msg->demand; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_WITHDRAW: - ev->kind = MSTRO__POOL__EVENT_KIND__WITHDRAW; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_WITHDRAW; - ev->withdraw = op->msg->withdraw; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_DISPOSE: - ev->kind = MSTRO__POOL__EVENT_KIND__DISPOSE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_DISPOSE; - ev->dispose = op->msg->dispose; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_TRANSFER_COMPLETE: - ev->kind = MSTRO__POOL__EVENT_KIND__TRANSFER_COMPLETED; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_TRANSFER_COMPLETED; - ev->transfer_completed = op->msg->transfer_completed; - status = mstro_pm__possibly_fill_event_cdoname(&(op->cdoid), op->appid, ev); - if(status!=MSTRO_OK) { - status = MSTRO_FAIL; - } - else { - status = MSTRO_OK; - } - break; - case MSTRO_OP_LEAVE: - ev->kind = MSTRO__POOL__EVENT_KIND__APP_LEAVE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_LEAVE; - ev->leave = op->msg->leave; - status = MSTRO_OK; - break; - case MSTRO_OP_JOIN: - ev->kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; - ev->join = op->msg->join; - ev->origin_id = NULL; // we will put in the correct appid in after events in mstro_pool_op__event_notify - status = MSTRO_OK; - break; - case MSTRO_OP_SUBSCRIBE: - ev->kind = MSTRO__POOL__EVENT_KIND__SUBSCRIBE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_SUBSCRIBE; - ev->subscribe = op->msg->subscribe; - status = MSTRO_OK; - break; - case MSTRO_OP_UNSUBSCRIBE: - ev->kind = MSTRO__POOL__EVENT_KIND__UNSUBSCRIBE; - ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_UNSUBSCRIBE; - ev->unsubscribe = op->msg->unsubscribe; - status = MSTRO_OK; - break; - /**FIXME add more operations */ - default: - ERR("Undefined operation \n"); - status = MSTRO_FAIL; - break; - } - - return status; -} - -static inline -mstro_status -mstro_pool_op__event_notify(mstro_pool_operation op, bool before) -{ - mstro_status status = MSTRO_UNIMPL; - /** create an event msg*/ - Mstro__Pool__Event pool_event_msg = MSTRO__POOL__EVENT__INIT; - Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; - - status = mstro_pool_op__fill_pool_event(op, &pool_event_msg); - if(status != MSTRO_OK) - { - ERR("Failed to fill pool event from pool operation structure\n"); - return MSTRO_FAIL; - } - - /**Add time stamp */ - Mstro__Pool__Timestamp ctime = MSTRO__POOL__TIMESTAMP__INIT; - ctime.offset = 0; - mstro_nanosec_t tick = mstro_clock(); - ctime.sec = tick/NSEC_PER_SEC; - ctime.nsec = tick-ctime.sec; - pool_event_msg.ctime = &ctime; - - if ((op->kind == MSTRO_OP_JOIN) && (!before)) - { - aid.id = op->appid; - pool_event_msg.origin_id = &aid; - } - - - DEBUG("Advertising event kind %d (%s:%s)\n", - pool_event_msg.kind, - protobuf_c_enum_descriptor_get_value(&mstro__pool__event_kind__descriptor, - pool_event_msg.kind)->name, before ? "before" : "after" ); - - status = mstro_pool_event_advertise(&pool_event_msg, before, &(op->nr_outstanding_acks)); - mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_POOL_EVENTS, 1); - - if(status!=MSTRO_OK) { - ERR("Failed to advertise event: %d (%s)\n", - status, mstro_status_description(status)); - } - - return status; - -} - - - -mstro_status -mstro_pool_op__check_acks(mstro_pool_operation op) -{ - mstro_status status = MSTRO_UNIMPL; - - uint64_t nr_left = atomic_load_explicit(&(op->nr_outstanding_acks), memory_order_acquire); - if(nr_left==0) { - DEBUG("No outstanding acks\n"); - status = MSTRO_OK; - } - else - { - DEBUG("Remaining %" PRIu64" acks\n", nr_left); - status = MSTRO_WOULDBLOCK; - } - return status; -} - - -mstro_status -mstro_pool_op__event_notify_before(mstro_pool_operation op) -{ - mstro_status status = MSTRO_UNIMPL; - status = mstro_pool_op__event_notify(op, true); - return status; -} - - -mstro_status -mstro_pool_op__event_notify_after(mstro_pool_operation op) -{ - mstro_status status = MSTRO_UNIMPL; - status = mstro_pool_op__event_notify(op, false); - return status; -}