diff --git a/include/maestro/i_ofi.h b/include/maestro/i_ofi.h index fa503b5e6085447d30a54ce5eba2dc469804968f..bfcb2946c145330088b4826827f5e4a543c9c067 100644 --- a/include/maestro/i_ofi.h +++ b/include/maestro/i_ofi.h @@ -211,16 +211,14 @@ struct mstro_endpoint { /** * @brief register app when joining * - * @param join_msg join message - * @param ep endpoint + * @param op operation handle * @param entry_p registry entry handle * @return mstro_status return status */ mstro_status -mstro_pm__register_app_op(Mstro__Pool__Join *join_msg, - struct mstro_endpoint *ep, +mstro_pm__register_app_op(mstro_pool_operation op, struct mstro_pm_app_registry_entry **entry_p); - + mstro_status mstro_appinfo_deserialize(const char *serialized_eps, Mstro__AppInfo **result_p); diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index 7cc2025ccdca04cf62e5947940cfdbe41613b2d2..f5eccb31fb78b63fcbaccc016e517ac8d2fdacfa 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -53,6 +53,7 @@ #include "protocols/mstro_pool.pb-c.h" #include "maestro/i_cdo.h" #include "maestro/core.h" +#include <rdma/fabric.h> /** maestro pool operation kinds */ @@ -120,13 +121,18 @@ struct mstro_pool_operation_ { /**-- operation specific*/ union { - const struct mstro_endpoint *ep; /** for join operation*/ Mstro__Pool__SubscriptionHandle *subscription_handle; /**for subscribe operation*/ bool send_attribute_update; /**for seal operation */ + struct { + const struct mstro_endpoint *ep; /** for join operation*/ + fi_addr_t translated_addr; /*for join operation*/ + } join; + }; }; + /** allocate new pool operation*/ mstro_status mstro_pool_op__allocate(mstro_pool_operation *op); diff --git a/maestro/ofi.c b/maestro/ofi.c index 8d7063a199b61dcd273f639994bf33695e7944d4..c04e7ff62ff39b2903587039eb128b7c270a94c1 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -3125,11 +3125,12 @@ mstro_ofi__submit_message_nowait(struct mstro_endpoint *ep, fi_addr_t dst, } mstro_status -mstro_pm__register_app_op(Mstro__Pool__Join *join_msg, - struct mstro_endpoint *ep, - struct mstro_pm_app_registry_entry **entry_p) +mstro_pm__register_app_op(mstro_pool_operation op, + struct mstro_pm_app_registry_entry **entry_p) { mstro_status s=MSTRO_OK; + struct mstro_endpoint *ep = op->join.ep; + Mstro__Pool__Join *join_msg = op->msg->join; assert(entry_p!=NULL); assert(join_msg!=NULL); @@ -3158,8 +3159,8 @@ mstro_pm__register_app_op(Mstro__Pool__Join *join_msg, epd->eps->n_eps); } - fi_addr_t translated_addr=FI_ADDR_UNSPEC; - s = mstro_ofi__try_epd_addr(ep, epd->eps->eps[0], &translated_addr); + op->join.translated_addr =FI_ADDR_UNSPEC; + s = mstro_ofi__try_epd_addr(ep, epd->eps->eps[0], &(op->join.translated_addr)); if(s!=MSTRO_OK) { DEBUG("local EP did not accept remote addr, can't welcome app %s:%zu\n", join_msg->component_name, join_msg->component_index); @@ -3170,7 +3171,7 @@ mstro_pm__register_app_op(Mstro__Pool__Join *join_msg, join_msg->component_name, join_msg->transport_methods->n_supported); /* insert into registry table */ - s = mstro_pm_app_register(ep, translated_addr, + s = mstro_pm_app_register(ep, op->join.translated_addr, strdup(join_msg->serialized_endpoint), join_msg->transport_methods, NULL, join_msg->component_name, join_msg->component_index, diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 52ed0d025398c2e9443e7cc7add4b3b6dcd317bc..f046c02e159fddf19b66ca7e2e0d7ec152b0f001 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -1072,12 +1072,21 @@ mstro_pm_send_welcome(mstro_pool_operation op) welcome.base.descriptor->name); return status; } - - status = mstro_pmp_send_nowait(op->appid, &msg_welcome); - if(status!=MSTRO_OK) { + if (op->status != MSTRO_OK) + { + status = mstro_pmp_send_nowait_ep(op->join.ep, op->join.translated_addr, &msg_welcome); + if(status!=MSTRO_OK) { + ERR("Cannot send WELCOME-NACK reply: %d (%s)\n", + status, mstro_status_description(status)); + } + else { + status = mstro_pmp_send_nowait(op->appid, &msg_welcome); + if(status!=MSTRO_OK) { ERR("Cannot send WELCOME reply to %"PRIappid": %d (%s)\n", op->appid, status, mstro_status_description(status)); + } + } return status; } @@ -1131,7 +1140,7 @@ mstro_pm_handle_join_op(mstro_pool_operation op) } struct mstro_pm_app_registry_entry *regentry = NULL; - op->status = mstro_pm__register_app_op(op->msg->join, op->ep,®entry); + op->status = mstro_pm__register_app_op(op,®entry); if(op->status!=MSTRO_OK) { if(op->status == MSTRO_NOT_TWICE) { ERR("Failed to register caller: already have another entity registered with same workflow/component/component-id triple\n"); diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index 877f7cb6cd184167232294a1bdb7d75f6f6d7d29..f391eab5d204004be09c78cad191b10349a1eea4 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -362,7 +362,7 @@ mstro_pool_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: op->kind = MSTRO_OP_JOIN; op->handler_steps = mstro_pool_op_join_steps; - op->ep = ep; + op->join.ep = ep; break; case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE: op->kind = MSTRO_OP_SUBSCRIBE;