diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index f5eccb31fb78b63fcbaccc016e517ac8d2fdacfa..a94ffe9f3f36449c4586ad6038f877807da84e12 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -118,16 +118,16 @@ struct mstro_pool_operation_ { Mstro__Pool__MstroMsg *msg; /**<msg*/ const mstro_pool_op_st_handler *handler_steps; /**<steps to execute in this operation*/ mstro_status status; /**<status of the core operation for the PoolOP ack*/ + /**-- operation specific*/ union { Mstro__Pool__SubscriptionHandle *subscription_handle; /**for subscribe operation*/ bool send_attribute_update; /**for seal operation */ struct { - const struct mstro_endpoint *ep; /** for join operation*/ - fi_addr_t translated_addr; /*for join operation*/ + const struct mstro_endpoint *ep; /** for join operation*/ + fi_addr_t translated_addr; } join; - }; }; diff --git a/maestro/ofi.c b/maestro/ofi.c index 71aadca8b13ae7cfb643b94861ffef73dcd1e272..8a8b4b113e66254d2bc6c36f4b84f6942f7d96fd 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -3129,7 +3129,7 @@ mstro_pm__register_app_op(mstro_pool_operation op, struct mstro_pm_app_registry_entry **entry_p) { mstro_status s=MSTRO_OK; - struct mstro_endpoint *ep = op->join.ep; + const struct mstro_endpoint *ep = op->join.ep; Mstro__Pool__Join *join_msg = op->msg->join; assert(entry_p!=NULL); @@ -3159,7 +3159,7 @@ mstro_pm__register_app_op(mstro_pool_operation op, epd->eps->n_eps); } - op->join.translated_addr =FI_ADDR_UNSPEC; + op->join.translated_addr = FI_ADDR_UNSPEC; s = mstro_ofi__try_epd_addr(ep, epd->eps->eps[0], &(op->join.translated_addr)); if(s!=MSTRO_OK) { DEBUG("local EP did not accept remote addr, can't welcome app %s:%zu\n", diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index f046c02e159fddf19b66ca7e2e0d7ec152b0f001..b4911a3daa0f6e97fb2d61f968babb76cbcc8dfd 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -1055,41 +1055,54 @@ mstro_pm_send_welcome(mstro_pool_operation op) token.appid = &id; Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; - if (op->status == MSTRO_OK) - { - welcome.token = &token; - } - else + welcome.token = &token; + + + if (op->status!= MSTRO_OK) { + /*Send NACK*/ welcome.token = NULL; welcome.nack = true; - } - - Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; - status = mstro_pmp_package(&msg_welcome,(ProtobufCMessage*)&welcome); - if(status!=MSTRO_OK) { - ERR("Failed to package %s into a pool manager message\n", + INFO("Sending negative acknolwedge for a join request \n"); + + Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; + status = mstro_pmp_package(&msg_welcome, + (ProtobufCMessage*)&welcome); + if(status!=MSTRO_OK) { + ERR("Failed to package %s into a pool manager message\n", welcome.base.descriptor->name); - return status; - } - if (op->status != MSTRO_OK) - { + return status; + } + status = mstro_pmp_send_nowait_ep(op->join.ep, op->join.translated_addr, &msg_welcome); if(status!=MSTRO_OK) { ERR("Cannot send WELCOME-NACK reply: %d (%s)\n", status, mstro_status_description(status)); + return status; + } } - else { - status = mstro_pmp_send_nowait(op->appid, &msg_welcome); + else + { + + INFO("Sending Welcome message to app %"PRIappid" \n", op->appid); + Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; + status = mstro_pmp_package(&msg_welcome, + (ProtobufCMessage*)&welcome); if(status!=MSTRO_OK) { - ERR("Cannot send WELCOME reply to %"PRIappid": %d (%s)\n", - op->appid, - status, mstro_status_description(status)); + ERR("Failed to package %s into a pool manager message\n", + welcome.base.descriptor->name); + return status; + } + + status = mstro_pmp_send_nowait(op->appid, + &msg_welcome); + if(status!=MSTRO_OK) { + ERR("Cannot send WELCOME reply to %zu: %d (%s)\n", + op->appid, + status, mstro_status_description(status)); + return status; } } - return status; - } - return status; } @@ -1112,25 +1125,166 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, + + + +/*** JOIN ***/ +// static +// mstro_status +// mstro_pm__handle_join_phase5(mstro_event event, +// struct mstro_pm__continuation_ctx *cont) +// { +// DEBUG("JOIN phase 5/5, event %p\n", event); +// //mstro_pm__msg_free(cont->msg); +// //mstro_pm__continuation_destroy(cont); +// mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_JOIN, 1); +// return MSTRO_OK; +// } + +// static +// mstro_status +// mstro_pm__handle_join_phase4(mstro_event event, +// struct mstro_pm__continuation_ctx *cont) +// { +// DEBUG("JOIN phase 4/5, event %p\n", event); + + + +// /* broadcast WELCOME:after event */ +// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; +// /* serial and handle set later */ +// ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; +// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; +// ev.welcome = &welcome; +// ev.origin_id = cont->msg->token->appid; + +// status = mstro_pm__event_notify_and_continue( +// &ev, +// false, +// cont, +// mstro_pm__handle_join_phase5, +// cont->msg, NULL, NULL); +// if(status!=MSTRO_OK) { +// ERR("Failed notifying subscribers of WELCOME:after event\n"); +// goto BAILOUT_FREE; +// } +// goto DONE; + +// BAILOUT_FREE: +// mstro_pm__msg_free(cont->msg); +// mstro_pm__continuation_destroy(cont); +// DONE: +// return status; +// } + +// static +// mstro_status +// mstro_pm__handle_join_phase3(mstro_event event, +// struct mstro_pm__continuation_ctx *cont) +// { +// DEBUG("JOIN phase 3/5, event %p\n", event); + +// /* broadcast WELCOME:before event */ +// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; +// /* serial and handle set later */ +// ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; +// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; +// /* FIXME: instead of a full welcome message just the app id would be enough */ +// Mstro__Pool__Appid id = MSTRO__POOL__APPID__INIT; +// id.id = cont->ctx_welcome.regentry->appid; +// Mstro__Pool__Apptoken token = MSTRO__POOL__APPTOKEN__INIT; +// token.appid = &id; +// Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; +// welcome.token = &token; +// ev.welcome = &welcome; + +// mstro_status status = mstro_pm__event_notify_and_continue( +// &ev, +// true, +// cont, +// mstro_pm__handle_join_phase4, +// cont->msg, NULL, cont->ctx_welcome.regentry); +// if(status!=MSTRO_OK) { +// ERR("Failed notifying subscribers of WELCOME:before event\n"); +// goto BAILOUT_FREE; +// } +// goto DONE; +// BAILOUT_FREE: +// mstro_pm__msg_free(cont->msg); +// mstro_pm__continuation_destroy(cont); +// DONE: +// return status; +// } + +// static +// mstro_status +// mstro_pm__handle_join_phase2(mstro_event event, +// struct mstro_pm__continuation_ctx *cont) +// { +// DEBUG("JOIN phase 2/5, event %p\n", event); + + + +// /* produce join:after event */ + +// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; +// /* serial and handle set later */ +// ev.kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; +// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; +// ev.join = cont->msg->join; +// /* it's safe to refer to the Appid object since the Event object is +// * on stack and will only be alive until notify-and-continue is +// * done */ +// Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; +// aid.id = regentry->appid; +// ev.origin_id = &aid; + +// INFO("JOIN message received. Caller %s:%d is now known as app #%" PRIu64 "\n", +// ev.join->component_name, ev.join->component_index, regentry->appid); + +// status = mstro_pm__event_notify_and_continue( +// &ev, +// false, +// NULL, +// mstro_pm__handle_join_phase3, +// cont->msg, NULL, regentry); +// if(status!=MSTRO_OK) { +// ERR("Failed notifying subscribers of JOIN:after event: %d (%s)\n", +// status, mstro_status_description(status)); +// goto BAILOUT_FREE; +// } +// goto DONE; +// BAILOUT_FREE: +// mstro_pm__msg_free(cont->msg); +// /* FIXME: free regentry*/ + +// DONE: +// mstro_pm__continuation_destroy(cont); +// return status; +// } + +/** handle JOIN message that came in on EP */ +static inline mstro_status -mstro_pm_handle_join_op(mstro_pool_operation op) +mstro_pm__handle_join(Mstro__Pool__MstroMsg *msg, + const struct mstro_endpoint *ep) { - mstro_status status = MSTRO_UNIMPL; - + Mstro__Pool__Join *join = msg->join; + mstro_status status = MSTRO_OK; + DEBUG("New component %s:%d (PM proto version %06x) advertises endpoint %s\n", - op->msg->join->component_name, op->msg->join->component_index, - op->msg->join->protocol_version, op->msg->join->serialized_endpoint); + join->component_name, join->component_index, + join->protocol_version, join->serialized_endpoint); - if(op->msg->join->transport_methods==NULL - || (op->msg->join->transport_methods->n_supported==0)) { + if(join->transport_methods==NULL + || (join->transport_methods->n_supported==0)) { ERR("No transport methods advertised\n"); - op->status = MSTRO_INVMSG; - op->step = MSTRO_OP_ST_SEND_POOLOP_ACK -1; /**Report error to app*/ - return MSTRO_OK; + status=MSTRO_INVMSG; + goto BAILOUT_FREE; } char transports[200]; - Mstro__Pool__TransportMethods *tm = op->msg->join->transport_methods; + Mstro__Pool__TransportMethods *tm = msg->join->transport_methods; transports[0] = '\0'; //initialize for (size_t i = 0; i < tm->n_supported; i++) @@ -1139,57 +1293,29 @@ mstro_pm_handle_join_op(mstro_pool_operation op) strcat(transports, mstro_pm__transport_method_name(tm->supported[i])); } + struct mstro_pm_app_registry_entry *regentry = NULL; - op->status = mstro_pm__register_app_op(op,®entry); - if(op->status!=MSTRO_OK) { - if(op->status == MSTRO_NOT_TWICE) { + status = mstro_pm__register_app(msg->join, + ep, + ®entry); + if(status!=MSTRO_OK) { + if(status==MSTRO_NOT_TWICE) { ERR("Failed to register caller: already have another entity registered with same workflow/component/component-id triple\n"); - op->step = MSTRO_OP_ST_SEND_POOLOP_ACK-1; status=MSTRO_OK; - return status; + goto BAILOUT_FREE; } else { ERR("Failed to register caller as new app\n"); - op->step = MSTRO_OP_ST_SEND_POOLOP_ACK-1; - status=MSTRO_OK; - return status; + // nack has been sent by register_app above; no events for this + goto BAILOUT_FREE; } } - status = MSTRO_OK; - DEBUG("Registered app at id %"PRIappid", Transport: %s \n", - regentry->appid, transports); - - op->appid = regentry->appid; - - INFO("JOIN message received. Caller %s:%d is now known as app #%" PRIu64 "\n", - op->msg->join->component_name, op->msg->join->component_index, op->appid); - - return status; -} - - - -/*** JOIN ***/ -static -mstro_status -mstro_pm__handle_join_phase5(mstro_event event, - struct mstro_pm__continuation_ctx *cont) -{ - DEBUG("JOIN phase 5/5, event %p\n", event); - mstro_pm__msg_free(cont->msg); - mstro_pm__continuation_destroy(cont); - mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_JOIN, 1); - return MSTRO_OK; -} - -static -mstro_status -mstro_pm__handle_join_phase4(mstro_event event, - struct mstro_pm__continuation_ctx *cont) -{ - DEBUG("JOIN phase 4/5, event %p\n", event); - + + DEBUG("Registered app at id %zu, Transport: %s \n", + regentry->appid, + transports); + Mstro__Pool__Appid id = MSTRO__POOL__APPID__INIT; - id.id = cont->ctx_welcome.regentry->appid; + id.id = regentry->appid; Mstro__Pool__Apptoken token = MSTRO__POOL__APPTOKEN__INIT; token.appid = &id; @@ -1198,7 +1324,7 @@ mstro_pm__handle_join_phase4(mstro_event event, welcome.token = &token; Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; - mstro_status status = mstro_pmp_package(&msg_welcome, + status = mstro_pmp_package(&msg_welcome, (ProtobufCMessage*)&welcome); if(status!=MSTRO_OK) { ERR("Failed to package %s into a pool manager message\n", @@ -1206,230 +1332,79 @@ mstro_pm__handle_join_phase4(mstro_event event, goto BAILOUT_FREE; } - status = mstro_pmp_send_nowait(cont->ctx_welcome.regentry->appid, + status = mstro_pmp_send_nowait(regentry->appid, &msg_welcome); if(status!=MSTRO_OK) { ERR("Cannot send WELCOME reply to %zu: %d (%s)\n", - cont->ctx_welcome.regentry->appid, + regentry->appid, status, mstro_status_description(status)); goto BAILOUT_FREE; } - - /* broadcast WELCOME:after event */ - Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; - /* serial and handle set later */ - ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; - ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; - ev.welcome = &welcome; - ev.origin_id = cont->msg->token->appid; - - status = mstro_pm__event_notify_and_continue( - &ev, - false, - cont, - mstro_pm__handle_join_phase5, - cont->msg, NULL, NULL); - if(status!=MSTRO_OK) { - ERR("Failed notifying subscribers of WELCOME:after event\n"); - goto BAILOUT_FREE; - } - goto DONE; - -BAILOUT_FREE: - mstro_pm__msg_free(cont->msg); - mstro_pm__continuation_destroy(cont); -DONE: - return status; -} +goto DONE; -static -mstro_status -mstro_pm__handle_join_phase3(mstro_event event, - struct mstro_pm__continuation_ctx *cont) -{ - DEBUG("JOIN phase 3/5, event %p\n", event); - - /* broadcast WELCOME:before event */ - Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; - /* serial and handle set later */ - ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; - ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; - /* FIXME: instead of a full welcome message just the app id would be enough */ - Mstro__Pool__Appid id = MSTRO__POOL__APPID__INIT; - id.id = cont->ctx_welcome.regentry->appid; - Mstro__Pool__Apptoken token = MSTRO__POOL__APPTOKEN__INIT; - token.appid = &id; - Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; - welcome.token = &token; - ev.welcome = &welcome; - - mstro_status status = mstro_pm__event_notify_and_continue( - &ev, - true, - cont, - mstro_pm__handle_join_phase4, - cont->msg, NULL, cont->ctx_welcome.regentry); - if(status!=MSTRO_OK) { - ERR("Failed notifying subscribers of WELCOME:before event\n"); - goto BAILOUT_FREE; - } - goto DONE; BAILOUT_FREE: - mstro_pm__msg_free(cont->msg); - mstro_pm__continuation_destroy(cont); + mstro_pm__msg_free(msg); + DONE: return status; } -static mstro_status -mstro_pm__handle_join_phase2(mstro_event event, - struct mstro_pm__continuation_ctx *cont) +mstro_pm_handle_join_op(mstro_pool_operation op) { - DEBUG("JOIN phase 2/5, event %p\n", event); + mstro_status status = MSTRO_UNIMPL; - /* yes, this is an exceedingly ugly way of getting a printout of - * the supported transports */ - const char* transports[5]= {"","","","",""}; - Mstro__Pool__TransportMethods *tm = cont->msg->join->transport_methods; - - size_t n = tm->n_supported; - if(n>4) { - transports[4] = "and more"; - n=4; - } - switch(n) { - case 4: - transports[3] - = mstro_pm__transport_method_name( - tm->supported[3]); - /* fall-thru */ - case 3: - transports[2] - = mstro_pm__transport_method_name( - tm->supported[2]); - /* fall-thru */ - case 2: - transports[1] - = mstro_pm__transport_method_name( - tm->supported[1]); - /* fall-thru */ - case 1: - transports[0] - = mstro_pm__transport_method_name( - tm->supported[0]); - break; - default: - ERR("Not handling all transport methods advertised (have %zu, handling 4)\n", n); - }; + DEBUG("New component %s:%d (PM proto version %06x) advertises endpoint %s\n", + op->msg->join->component_name, op->msg->join->component_index, + op->msg->join->protocol_version, op->msg->join->serialized_endpoint); + + if(op->msg->join->transport_methods==NULL + || (op->msg->join->transport_methods->n_supported==0)) { + ERR("No transport methods advertised\n"); + op->status = MSTRO_INVMSG; + op->step = MSTRO_OP_ST_SEND_POOLOP_ACK -1; /**Report error to app*/ + return MSTRO_OK; + } + + char transports[200]; + Mstro__Pool__TransportMethods *tm = op->msg->join->transport_methods; + transports[0] = '\0'; //initialize + for (size_t i = 0; i < tm->n_supported; i++) + { + strcat(transports, " "); + strcat(transports, mstro_pm__transport_method_name(tm->supported[i])); + } + struct mstro_pm_app_registry_entry *regentry = NULL; - mstro_status status = mstro_pm__register_app(cont->msg->join, - cont->ctx_join.ep, - ®entry); - if(status!=MSTRO_OK) { - if(status==MSTRO_NOT_TWICE) { + op->status = mstro_pm__register_app_op(op,®entry); + //op->status = mstro_pm__register_app(op->msg->join,op->join.ep,®entry); + if(op->status!=MSTRO_OK) { + if(op->status == MSTRO_NOT_TWICE) { ERR("Failed to register caller: already have another entity registered with same workflow/component/component-id triple\n"); + op->step = MSTRO_OP_ST_SEND_POOLOP_ACK-1; status=MSTRO_OK; - goto BAILOUT_FREE; + return status; } else { ERR("Failed to register caller as new app\n"); - // nack has been sent by register_app above; no events for this - goto BAILOUT_FREE; + op->step = MSTRO_OP_ST_SEND_POOLOP_ACK-1; + status=MSTRO_OK; + return status; } } - - DEBUG("Registered app at id %zu, Transport: %s %s %s %s %s\n", - regentry->appid, - transports[0], transports[1], transports[2], - transports[3], transports[4]); - - /* produce join:after event */ - - Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; - /* serial and handle set later */ - ev.kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; - ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; - ev.join = cont->msg->join; - /* it's safe to refer to the Appid object since the Event object is - * on stack and will only be alive until notify-and-continue is - * done */ - Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; - aid.id = regentry->appid; - ev.origin_id = &aid; + status = MSTRO_OK; + DEBUG("Registered app at id %"PRIappid", Transport: %s \n", + regentry->appid, transports); + + /**save app id to operation */ + op->appid = regentry->appid; INFO("JOIN message received. Caller %s:%d is now known as app #%" PRIu64 "\n", - ev.join->component_name, ev.join->component_index, regentry->appid); - - status = mstro_pm__event_notify_and_continue( - &ev, - false, - NULL, - mstro_pm__handle_join_phase3, - cont->msg, NULL, regentry); - if(status!=MSTRO_OK) { - ERR("Failed notifying subscribers of JOIN:after event: %d (%s)\n", - status, mstro_status_description(status)); - goto BAILOUT_FREE; - } - goto DONE; -BAILOUT_FREE: - mstro_pm__msg_free(cont->msg); - /* FIXME: free regentry*/ + op->msg->join->component_name, op->msg->join->component_index, op->appid); -DONE: - mstro_pm__continuation_destroy(cont); return status; } -/** handle JOIN message that came in on EP */ -static inline -mstro_status -mstro_pm__handle_join(Mstro__Pool__MstroMsg *msg, - const struct mstro_endpoint *ep) -{ - Mstro__Pool__Join *join = msg->join; - mstro_status status = MSTRO_OK; - - DEBUG("New component %s:%d (PM proto version %06x) advertises endpoint %s\n", - join->component_name, join->component_index, - join->protocol_version, join->serialized_endpoint); - - if(join->transport_methods==NULL - || (join->transport_methods->n_supported==0)) { - ERR("No transport methods advertised\n"); - status=MSTRO_INVMSG; - goto BAILOUT_FREE; - } - - /* broadcast JOIN:before event */ - Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; - /* serial and handle set later */ - ev.kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; - ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; - ev.join = msg->join; - /* origin-id not assigned yet. */ - ev.origin_id = NULL; - - - status = mstro_pm__event_notify_and_continue( - &ev, - true, - NULL, - mstro_pm__handle_join_phase2, - msg, ep, NULL); - if(status!=MSTRO_OK) { - ERR("Failed notifying subscribers of JOIN:before event: %d (%s)\n", - status, mstro_status_description(status)); - goto BAILOUT_FREE; - } - goto DONE; -BAILOUT_FREE: - mstro_pm__msg_free(msg); - -DONE: - return status; -} /*** SUBSCRIBE ***/ mstro_status diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index f391eab5d204004be09c78cad191b10349a1eea4..d62dc442ab9f79063cf616eaddbe68a82718d6c1 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -726,6 +726,7 @@ mstro_pool_op__fill_pool_event(mstro_pool_operation op, Mstro__Pool__Event *ev) ev->kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; ev->payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; ev->join = op->msg->join; + ev->origin_id = NULL; // we will put in the correct appid in after events in mstro_pool_op__event_notify status = MSTRO_OK; break; case MSTRO_OP_SUBSCRIBE: