diff --git a/include/maestro/i_ofi.h b/include/maestro/i_ofi.h index bfcb2946c145330088b4826827f5e4a543c9c067..e526da687396aaa0f53f7d7af9ef492e9ee9ac8b 100644 --- a/include/maestro/i_ofi.h +++ b/include/maestro/i_ofi.h @@ -216,7 +216,7 @@ struct mstro_endpoint { * @return mstro_status return status */ mstro_status -mstro_pm__register_app_op(mstro_pool_operation op, +mstro_pm__register_app(mstro_pool_operation op, struct mstro_pm_app_registry_entry **entry_p); mstro_status diff --git a/include/maestro/i_pool_manager.h b/include/maestro/i_pool_manager.h index 08723b20161ae2b2393742fdc08e6e7b8fc58197..d11e21d19f0e78b6a3af7ea6ab9c0bcb73a5f90e 100644 --- a/include/maestro/i_pool_manager.h +++ b/include/maestro/i_pool_manager.h @@ -52,49 +52,6 @@ /** endpoint abstraction */ struct mstro_endpoint; -/** forward declaration of continuation function context */ -struct mstro_pm__continuation_ctx; - -/** function that can be called as continuation of a message handler */ -typedef mstro_status(*mstro_pm__continuation)( - mstro_event event, struct mstro_pm__continuation_ctx *cont); - -/** structure keeping context for bottom-half handlers of message - * handling functions that are preempted (waiting) for subscription - * acknowledgement. - * - * The continuation handler is expected to either schedule another - * continuation, re-using the structure passed to it (like a private - * stack frame), or free it using mstro_pm__continuation_free() before - * returning. - */ -struct mstro_pm__continuation_ctx { - /** the continuation function to call. It will be passed the event - * that triggered it and the cont structure passed in the - * mstro_pm__schedule_continuation() call. 
*/ - mstro_pm__continuation bh_handler; - /* The data that is needed to express the 'current continuation' - * context, mainly message-dependent info that the original message - * handler had in arguments or derived from it, but that have gone - * out of scope with it, and would have been recycled by the main - * message dispatcher by now. - */ - /** The original message that was handled when things were - * interrupted */ - Mstro__Pool__MstroMsg *msg; - - /* extra information needed for only some specific handlers */ - union { - struct { - const struct mstro_endpoint *ep; - } ctx_join; - struct { - struct mstro_pm_app_registry_entry *regentry; - } ctx_welcome; - }; -}; - - /* Pool manager "Handle incoming Message" dispatcher function */ mstro_status @@ -109,19 +66,10 @@ mstro_pm__possibly_fill_event_cdoname(const struct mstro_cdo_id *cdoid, mstro_app_id recipient, Mstro__Pool__Event *ev); -mstro_status -mstro_pm__event_notify_and_continue(Mstro__Pool__Event *pool_event_msg, - bool beforep, - struct mstro_pm__continuation_ctx *ctx, - mstro_pm__continuation cont_fun, - Mstro__Pool__MstroMsg *msg, - const struct mstro_endpoint *ep, - struct mstro_pm_app_registry_entry *regentry - ); /**Handle pm cdo seal operation and send ack*/ mstro_status -mstro_pm_cdo_seal_op(mstro_pool_operation op); +mstro_pm_cdo_seal(mstro_pool_operation op); /** * @brief wrapper for mstro_pm__send_ack to use within operations @@ -139,7 +87,7 @@ mstro_pm_send_ack_op(mstro_pool_operation op); * @return mstro_status */ mstro_status -mstro_pm_handle_demand_op(mstro_pool_operation op); +mstro_pm_handle_demand(mstro_pool_operation op); /** * @brief Send bye reply for application @@ -220,7 +168,7 @@ mstro_pm_handle_msg_resolve(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pm_handle_join_op(mstro_pool_operation op); +mstro_pm_handle_join(mstro_pool_operation op); diff --git a/include/maestro/i_pool_manager_registry.h b/include/maestro/i_pool_manager_registry.h 
index 5811d1d9b817735ce4daf983b0d1f06117a709c7..3c620249249d8eecf6a1321ea1f43c1af4e42a47 100644 --- a/include/maestro/i_pool_manager_registry.h +++ b/include/maestro/i_pool_manager_registry.h @@ -252,13 +252,6 @@ mstro_pm_cdo_registry_withdraw(mstro_pool_operation op); mstro_status mstro_pm_cdo_registry_send_declare_ack(mstro_pool_operation op); -/** @brief Update CDO state on CDOID for APP_ID. - ** - **/ -mstro_status mstro_pm_cdo_registry_update_state( - const struct mstro_cdo_id *cdoid, - const mstro_app_id app_id, - mstro_cdo_state new_state); /** @brief Handle some or all outstanding demand queue entries. ** diff --git a/include/maestro/i_pool_operations.h b/include/maestro/i_pool_operations.h index a94ffe9f3f36449c4586ad6038f877807da84e12..c09d990474d7f8184363545a7be7fd74a8551fb2 100644 --- a/include/maestro/i_pool_operations.h +++ b/include/maestro/i_pool_operations.h @@ -150,10 +150,6 @@ mstro_pool_op__event_notify_after(mstro_pool_operation op); mstro_status mstro_pool_op__check_acks(mstro_pool_operation op); -mstro_status -mstro_pool_op_event_advertise(Mstro__Pool__Event *ev, - bool beforep, - _Atomic(uint64_t) *nr_outstanding_acks); /** * @brief convenience function to print the operation kind @@ -163,7 +159,7 @@ mstro_pool_op_event_advertise(Mstro__Pool__Event *ev, * @return char* string represents the operation type */ char * -mstro_pool_op__kind_to_string(enum mstro_pool_operation_kind operation_type); +mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type); /*Function to execute an operation*/ mstro_status diff --git a/maestro/i_subscription_registry.h b/maestro/i_subscription_registry.h index 42386b5e276a155709efd4a5abafd886903e844b..6278aeae56d38026431f67a014387b18b8537b7e 100644 --- a/maestro/i_subscription_registry.h +++ b/maestro/i_subscription_registry.h @@ -157,7 +157,7 @@ mstro_subscription_message_event_ack(const Mstro__Pool__EventAck *msg); mstro_status mstro_pool_event_advertise(Mstro__Pool__Event *e, bool beforep, - 
mstro_event continuation); + _Atomic(uint64_t) *nr_outstanding_acks); /** @brief Handle an incoming event notification */ diff --git a/maestro/ofi.c b/maestro/ofi.c index 8a8b4b113e66254d2bc6c36f4b84f6942f7d96fd..242ea91aacc7998f52abd0c538faa90686494c92 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -3124,8 +3124,16 @@ mstro_ofi__submit_message_nowait(struct mstro_endpoint *ep, fi_addr_t dst, return stat; } +/* FIXME: this needs to move to pool manager protocol code regions */ +/* register a new app using its JOIN message. Allocates a new entry in + * @ref g_mstro_pm_app_table and returns the key to identify it in + * *entry_p + * + * The transport_methods slot will be consumed by the registration and + * NULLed. + */ mstro_status -mstro_pm__register_app_op(mstro_pool_operation op, +mstro_pm__register_app(mstro_pool_operation op, struct mstro_pm_app_registry_entry **entry_p) { mstro_status s=MSTRO_OK; @@ -3189,101 +3197,6 @@ mstro_pm__register_app_op(mstro_pool_operation op, } -/* FIXME: this needs to move to pool manager protocol code regions */ -/* register a new app using its JOIN message. Allocates a new entry in - * @ref g_mstro_pm_app_table and returns the key to identify it in - * *entry_p - * - * The transport_methods slot will be consumed by the registration and - * NULLed. 
- */ -mstro_status -mstro_pm__register_app(Mstro__Pool__Join *join_msg, - struct mstro_endpoint *ep, - struct mstro_pm_app_registry_entry **entry_p) -{ - mstro_status s=MSTRO_OK; - - assert(entry_p!=NULL); - assert(join_msg!=NULL); - - /* unpack serialized address */ - Mstro__AppInfo *epd=NULL; - s = mstro_appinfo_deserialize(join_msg->serialized_endpoint, &epd); - if(s!=MSTRO_OK) { - ERR("Failed to deserialize endpoint in JOIN message\n"); - goto BAILOUT_FREE; - } - DEBUG("incoming serialized EPD: %s\n", join_msg->serialized_endpoint); - if(epd->eps==NULL) { - ERR("Empty EPD\n"); - s=MSTRO_FAIL; - goto BAILOUT_FREE; - } - - - WITH_MSTRO_EPL_DESCRIPTION(str,epd->eps,{ - DEBUG("EPD parsed as %s\n", str); - };); - - assert(epd->eps->n_eps>0); - if(epd->eps->n_eps>1) { - WARN("App provided %zu contact endpoints, only trying #0 -- FIXME\n", - epd->eps->n_eps); - } - - fi_addr_t translated_addr=FI_ADDR_UNSPEC; - s = mstro_ofi__try_epd_addr(ep, epd->eps->eps[0], &translated_addr); - if(s!=MSTRO_OK) { - DEBUG("local EP did not accept remote addr, can't welcome app %s:%zu\n", - join_msg->component_name, join_msg->component_index); - goto BAILOUT_FREE; - } - - DEBUG("App %s advertises %zu transport methods\n", - join_msg->component_name, join_msg->transport_methods->n_supported); - - /* insert into registry table */ - s = mstro_pm_app_register(ep, translated_addr, - strdup(join_msg->serialized_endpoint), - join_msg->transport_methods, - NULL, join_msg->component_name, join_msg->component_index, - entry_p); - if(s!=MSTRO_OK) { - ERR("Failed to register application: %d (%s)\n", s, mstro_status_description(s)); - DEBUG("Sending WELCOME-NACK\n"); - Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; - welcome.token = NULL; - welcome.nack = true; - - Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; - mstro_status status = mstro_pmp_package(&msg_welcome, - (ProtobufCMessage*)&welcome); - if(status!=MSTRO_OK) { - ERR("Failed to package %s into a pool 
manager message\n", - welcome.base.descriptor->name); - goto BAILOUT_FREE; - } - status = mstro_pmp_send_nowait_ep(ep, translated_addr, &msg_welcome); - if(status!=MSTRO_OK) { - ERR("Cannot send WELCOME-NACK reply: %d (%s)\n", - status, mstro_status_description(status)); - goto BAILOUT_FREE; - } - goto BAILOUT_FREE; - } else { - join_msg->transport_methods = NULL; /* we kept a reference to it, caller will free the message */ - //DEBUG("Assigned app ID %"PRIu64"\n", (*entry_p)->appid); - } - -BAILOUT_FREE: - ; -BAILOUT: - mstro__app_info__free_unpacked(epd, NULL); - return s; -} - - /** allocate a fresh 'short' message envelope, context, and post a receive for it on EP. Returns the message envelope in *envelope_p and the context in *ctxt_p. */ diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index b4911a3daa0f6e97fb2d61f968babb76cbcc8dfd..c8f1818f74cae77cc3ffb8bb9d017fa025570dd5 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -300,7 +300,7 @@ mstro_pm_send_ack_op(mstro_pool_operation op) if(status!=MSTRO_OK) { ERR("Failed to send %s-ACK: %d (%s)\n", - mstro_pool_op__kind_to_string(op->kind), status, mstro_status_description(status)); + mstro_pool_op_kind_to_string(op->kind), status, mstro_status_description(status)); } return status; } @@ -333,225 +333,6 @@ mstro_pm_send_subscribe_ack(mstro_pool_operation op) return status; } -/** Deallocate a continuation structure - * - * This automatically deallocated the mstro_pm__continuation.msg, but - * all other slots will not be touched (caller has to clear them - * before if desired). 
- */ - -static inline -void -mstro_pm__continuation_free(struct mstro_pm__continuation_ctx *c) -{ - assert(c!=NULL); - assert(c->msg!=NULL); - assert(c->msg->token!=&g_pool_apptoken); - mstro__pool__mstro_msg__free_unpacked(c->msg, NULL); - free(c); -} - -/** the event handler dispatch function - * - * All subscriptions that require an ACK are dispatched through the i_event.h - * infrastructure, with this function as the event handler - */ -static -void -mstro_pm__event_bh(mstro_event event, - void *closure) -{ - if(event==NULL) { - ERR("NULL event\n"); abort(); - } - if(closure==NULL) { - ERR("NULL closure\n"); abort(); - } - struct mstro_pm__continuation_ctx *cont - = (struct mstro_pm__continuation_ctx*)closure; - - DEBUG("continuation %p, handler %p\n", cont, cont->bh_handler); - mstro_status s = cont->bh_handler(event, cont); - if(s!=MSTRO_OK) { - ERR("Executing continuation handler failed: %d (%s)\n", - s, mstro_status_description(s)); - } -} - - -/** Create a new event and schedule a continuation handler for it. - * - * Returns the event that can trigger the event. - */ -static inline -mstro_status -mstro_pm__schedule_continuation( - struct mstro_pm__continuation_ctx *cont, - mstro_event *result) -{ - assert(g_mstro_pm_continuations!=NULL); - mstro_event event; - mstro_status s = mstro_event_create(g_mstro_pm_continuations, - mstro_pm__event_bh, - cont, NULL, true, &event); - if(s!=MSTRO_OK) { - ERR("Failed to schedule continuation event: %d (%s)\n", - s, mstro_status_description(s)); - goto BAILOUT; - } - - mstro_event_id id; - s = mstro_event_id_get(event, &id); - if(s!=MSTRO_OK) { - ERR("Failed to obtain event id\n"); - goto BAILOUT; - } - - DEBUG("Scheduled continuation, event %p, id %" PRIu64 "\n", - event, id); - *result = event; - -BAILOUT: - return s; -} - -/** create a new continuation object - * - * content beyond handler and msg must be filled in by caller as - * needed. 
- */ -static inline -struct mstro_pm__continuation_ctx * -mstro_pm__continuation_create( - mstro_pm__continuation bh_handler, - Mstro__Pool__MstroMsg *msg) -{ - assert(bh_handler!=NULL); - assert(msg!=NULL); - struct mstro_pm__continuation_ctx *res - = malloc(sizeof(struct mstro_pm__continuation_ctx)); - if(res) { - res->bh_handler=bh_handler; - res->msg = msg; - } - - return res; -} - -static inline -void -mstro_pm__continuation_destroy( - struct mstro_pm__continuation_ctx *ctx) -{ - assert(ctx!=NULL); - free(ctx); -} - -/* send event notification to subscribers. - * - * If the subscription requires an acknowledgement, schedule the - * continuation, otherwise execute it immediately. - * - * If @arg ctx is non-null it will be reused as context, otherwise a - * new one will be allocated. If reuse occurs, the other arguments - * will still be set in the context (caller does not need to do it). - * - * The @arg pool_even_msg will not be referenced after return from - * this function, so it's safe to allocate it on the stack of the - * caller and have components in it that may be short-lived. 
- */ -mstro_status -mstro_pm__event_notify_and_continue(Mstro__Pool__Event *pool_event_msg, - bool beforep, - struct mstro_pm__continuation_ctx *ctx, - mstro_pm__continuation cont_fun, - Mstro__Pool__MstroMsg *msg, - const struct mstro_endpoint *ep, - struct mstro_pm_app_registry_entry *regentry - ) -{ - if(pool_event_msg==NULL) { - ERR("NULL pool event data\n"); - return MSTRO_INVARG; - } - if(cont_fun==NULL) { - ERR("No continuation function given\n"); - return MSTRO_INVARG; - } else { - DEBUG("Cont-fun is %p\n", cont_fun); - } - - if(msg==NULL) { - ERR("No message available\n"); - return MSTRO_INVARG; - } - - /* ep==NULL is permitted; only JOIN sets it */ - - if(ctx==NULL) { - ctx = mstro_pm__continuation_create(cont_fun, msg); - if(ctx==NULL) { - ERR("Failed to allocate continuation context\n"); - return MSTRO_NOMEM; - } - } else { - DEBUG("Reusing caller-provided context %p\n", ctx); - ctx->bh_handler = cont_fun; - ctx->msg = msg; - } - /* special cases */ - if(ep) { - assert(regentry==NULL); - ctx->ctx_join.ep = ep; - } - if(regentry) { - assert(ep==NULL); - ctx->ctx_welcome.regentry = regentry; - } - - mstro_event cont_event=NULL; - mstro_status s = mstro_pm__schedule_continuation(ctx, &cont_event); - if(s!=MSTRO_OK) { - ERR("Failed to schedule event handler: %d (%s)\n", - s, mstro_status_description(s)); - goto BAILOUT_FREE; - } - - Mstro__Pool__Timestamp ctime = MSTRO__POOL__TIMESTAMP__INIT; - ctime.offset = 0; - mstro_nanosec_t tick = mstro_clock(); - ctime.sec = tick/NSEC_PER_SEC; - ctime.nsec = tick-ctime.sec; - pool_event_msg->ctime = &ctime; - - - /* Send subscription info. We pass our continuation event so that - * the subscription engine can trigger our continuation when all - * the event acks have arrived. If none are needed the event will - * be triggered immediately. 
- */ - DEBUG("Advertising event kind %d (%s:%s)\n", - pool_event_msg->kind, - protobuf_c_enum_descriptor_get_value(&mstro__pool__event_kind__descriptor, - pool_event_msg->kind)->name, - beforep ? "before" : "after"); - s = mstro_pool_event_advertise(pool_event_msg, beforep, cont_event); - mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_POOL_EVENTS, 1); - - if(s!=MSTRO_OK) { - ERR("Failed to advertise event: %d (%s)\n", - s, mstro_status_description(s)); - goto BAILOUT_FREE; - } - return MSTRO_OK; - -BAILOUT_FREE: - mstro_pm__continuation_destroy(ctx); - return s; -} - - - void mstro_pm__msg_free(Mstro__Pool__MstroMsg *msg) { @@ -897,7 +678,7 @@ BAILOUT: /*** SEAL ***/ /**Handled by operations*/ mstro_status -mstro_pm_cdo_seal_op(mstro_pool_operation op) +mstro_pm_cdo_seal(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -989,7 +770,7 @@ mstro_pm_cdo_seal_op(mstro_pool_operation op) /*** DEMAND ***/ mstro_status -mstro_pm_handle_demand_op(mstro_pool_operation op) +mstro_pm_handle_demand(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; status = mstro_pm_cdo_registry_update_state_op(op); @@ -1043,6 +824,8 @@ mstro_pm_send_bye(mstro_pool_operation op) return status; } + +/*** JOIN ***/ mstro_status mstro_pm_send_welcome(mstro_pool_operation op) { @@ -1115,242 +898,8 @@ mstro_pm__transport_method_name(Mstro__Pool__TransportKind tk) ->name; } -/* FIXME: this function needs to be refactored and moved to pool - * manager proto; it currently is in ofi.c */ -mstro_status -mstro_pm__register_app(Mstro__Pool__Join *join_msg, - const struct mstro_endpoint *ep, - struct mstro_pm_app_registry_entry **entry_p); - - - - - - - -/*** JOIN ***/ -// static -// mstro_status -// mstro_pm__handle_join_phase5(mstro_event event, -// struct mstro_pm__continuation_ctx *cont) -// { -// DEBUG("JOIN phase 5/5, event %p\n", event); -// //mstro_pm__msg_free(cont->msg); -// //mstro_pm__continuation_destroy(cont); -// 
mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_JOIN, 1); -// return MSTRO_OK; -// } - -// static -// mstro_status -// mstro_pm__handle_join_phase4(mstro_event event, -// struct mstro_pm__continuation_ctx *cont) -// { -// DEBUG("JOIN phase 4/5, event %p\n", event); - - - -// /* broadcast WELCOME:after event */ -// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; -// /* serial and handle set later */ -// ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; -// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; -// ev.welcome = &welcome; -// ev.origin_id = cont->msg->token->appid; - -// status = mstro_pm__event_notify_and_continue( -// &ev, -// false, -// cont, -// mstro_pm__handle_join_phase5, -// cont->msg, NULL, NULL); -// if(status!=MSTRO_OK) { -// ERR("Failed notifying subscribers of WELCOME:after event\n"); -// goto BAILOUT_FREE; -// } -// goto DONE; - -// BAILOUT_FREE: -// mstro_pm__msg_free(cont->msg); -// mstro_pm__continuation_destroy(cont); -// DONE: -// return status; -// } - -// static -// mstro_status -// mstro_pm__handle_join_phase3(mstro_event event, -// struct mstro_pm__continuation_ctx *cont) -// { -// DEBUG("JOIN phase 3/5, event %p\n", event); - -// /* broadcast WELCOME:before event */ -// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; -// /* serial and handle set later */ -// ev.kind = MSTRO__POOL__EVENT_KIND__APP_WELCOME; -// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_WELCOME; -// /* FIXME: instead of a full welcome message just the app id would be enough */ -// Mstro__Pool__Appid id = MSTRO__POOL__APPID__INIT; -// id.id = cont->ctx_welcome.regentry->appid; -// Mstro__Pool__Apptoken token = MSTRO__POOL__APPTOKEN__INIT; -// token.appid = &id; -// Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; -// welcome.token = &token; -// ev.welcome = &welcome; - -// mstro_status status = mstro_pm__event_notify_and_continue( -// &ev, -// true, -// cont, -// mstro_pm__handle_join_phase4, -// cont->msg, NULL, 
cont->ctx_welcome.regentry); -// if(status!=MSTRO_OK) { -// ERR("Failed notifying subscribers of WELCOME:before event\n"); -// goto BAILOUT_FREE; -// } -// goto DONE; -// BAILOUT_FREE: -// mstro_pm__msg_free(cont->msg); -// mstro_pm__continuation_destroy(cont); -// DONE: -// return status; -// } - -// static -// mstro_status -// mstro_pm__handle_join_phase2(mstro_event event, -// struct mstro_pm__continuation_ctx *cont) -// { -// DEBUG("JOIN phase 2/5, event %p\n", event); - - - -// /* produce join:after event */ - -// Mstro__Pool__Event ev = MSTRO__POOL__EVENT__INIT; -// /* serial and handle set later */ -// ev.kind = MSTRO__POOL__EVENT_KIND__APP_JOIN; -// ev.payload_case = MSTRO__POOL__EVENT__PAYLOAD_JOIN; -// ev.join = cont->msg->join; -// /* it's safe to refer to the Appid object since the Event object is -// * on stack and will only be alive until notify-and-continue is -// * done */ -// Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; -// aid.id = regentry->appid; -// ev.origin_id = &aid; - -// INFO("JOIN message received. 
Caller %s:%d is now known as app #%" PRIu64 "\n", -// ev.join->component_name, ev.join->component_index, regentry->appid); -// status = mstro_pm__event_notify_and_continue( -// &ev, -// false, -// NULL, -// mstro_pm__handle_join_phase3, -// cont->msg, NULL, regentry); -// if(status!=MSTRO_OK) { -// ERR("Failed notifying subscribers of JOIN:after event: %d (%s)\n", -// status, mstro_status_description(status)); -// goto BAILOUT_FREE; -// } -// goto DONE; -// BAILOUT_FREE: -// mstro_pm__msg_free(cont->msg); -// /* FIXME: free regentry*/ -// DONE: -// mstro_pm__continuation_destroy(cont); -// return status; -// } -/** handle JOIN message that came in on EP */ -static inline -mstro_status -mstro_pm__handle_join(Mstro__Pool__MstroMsg *msg, - const struct mstro_endpoint *ep) -{ - Mstro__Pool__Join *join = msg->join; - mstro_status status = MSTRO_OK; - - DEBUG("New component %s:%d (PM proto version %06x) advertises endpoint %s\n", - join->component_name, join->component_index, - join->protocol_version, join->serialized_endpoint); - - if(join->transport_methods==NULL - || (join->transport_methods->n_supported==0)) { - ERR("No transport methods advertised\n"); - status=MSTRO_INVMSG; - goto BAILOUT_FREE; - } - - char transports[200]; - Mstro__Pool__TransportMethods *tm = msg->join->transport_methods; - - transports[0] = '\0'; //initialize - for (size_t i = 0; i < tm->n_supported; i++) - { - strcat(transports, " "); - strcat(transports, mstro_pm__transport_method_name(tm->supported[i])); - } - - - struct mstro_pm_app_registry_entry *regentry = NULL; - status = mstro_pm__register_app(msg->join, - ep, - &regentry); - if(status!=MSTRO_OK) { - if(status==MSTRO_NOT_TWICE) { - ERR("Failed to register caller: already have another entity registered with same workflow/component/component-id triple\n"); - status=MSTRO_OK; - goto BAILOUT_FREE; - } else { - ERR("Failed to register caller as new app\n"); - // nack has been sent by register_app above; no events for this - goto 
BAILOUT_FREE; - } - } - - DEBUG("Registered app at id %zu, Transport: %s \n", - regentry->appid, - transports); - - Mstro__Pool__Appid id = MSTRO__POOL__APPID__INIT; - id.id = regentry->appid; - - Mstro__Pool__Apptoken token = MSTRO__POOL__APPTOKEN__INIT; - token.appid = &id; - - Mstro__Pool__Welcome welcome = MSTRO__POOL__WELCOME__INIT; - welcome.token = &token; - - Mstro__Pool__MstroMsg msg_welcome = MSTRO__POOL__MSTRO_MSG__INIT; - status = mstro_pmp_package(&msg_welcome, - (ProtobufCMessage*)&welcome); - if(status!=MSTRO_OK) { - ERR("Failed to package %s into a pool manager message\n", - welcome.base.descriptor->name); - goto BAILOUT_FREE; - } - - status = mstro_pmp_send_nowait(regentry->appid, - &msg_welcome); - if(status!=MSTRO_OK) { - ERR("Cannot send WELCOME reply to %zu: %d (%s)\n", - regentry->appid, - status, mstro_status_description(status)); - goto BAILOUT_FREE; - } -goto DONE; - -BAILOUT_FREE: - mstro_pm__msg_free(msg); - -DONE: - return status; -} - mstro_status -mstro_pm_handle_join_op(mstro_pool_operation op) +mstro_pm_handle_join(mstro_pool_operation op) { mstro_status status = MSTRO_UNIMPL; @@ -1377,8 +926,7 @@ mstro_pm_handle_join_op(mstro_pool_operation op) } struct mstro_pm_app_registry_entry *regentry = NULL; - op->status = mstro_pm__register_app_op(op,&regentry); - //op->status = mstro_pm__register_app(op->msg->join,op->join.ep,&regentry); + op->status = mstro_pm__register_app(op,&regentry); if(op->status!=MSTRO_OK) { if(op->status == MSTRO_NOT_TWICE) { ERR("Failed to register caller: already have another entity registered with same workflow/component/component-id triple\n"); @@ -1648,9 +1196,8 @@ mstro_pm_handle_msg(const struct mstro_msg_envelope *envelope, break; case MSTRO__POOL__MSTRO_MSG__MSG_JOIN: - status = mstro_pool_op_maker(msg, ep); - //status = mstro_pm__handle_join(msg, ep); //DEBUG("PM MSG-JOIN result: %d\n", status); + status = mstro_pool_op_maker(msg, ep); break; /* currently unimplemented: */ diff --git 
a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index a992d79cdf1c2c24633ca6fcd08d9189a4a1ff4d..2ba7ea13ccf27f1452b4a8f709dc499849c6ea44 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -1369,7 +1369,7 @@ mstro_pm_cdo_registry_send_declare_ack(mstro_pool_operation op) } -/**Wrapper for mstro_pm_cdo_registry_declare + send ack */ +/**Wrapper for mstro_pm_cdo_registry_declare */ mstro_status mstro_pm_cdo_registry_declare_op(mstro_pool_operation op) { @@ -1733,6 +1733,47 @@ mstro_pm_cdo_registry__set_state(const struct mstro_cdo_id *cdoid, return MSTRO_OK; } +static inline +mstro_status +mstro_pm_cdo_registry_update_state(const struct mstro_cdo_id *cdoid, + const mstro_app_id app_id, + mstro_cdo_state new_state) +{ + struct mstro_pm_cdo_registry_entry *regentry=NULL; + bool found=false; + + struct mstro_cdo_id head = *cdoid; + head.local_id = MSTRO_CDO_LOCAL_ID_NONE; + + WITH_LOCKED_CDO_REGISTRY( + HASH_FIND(hh, g_mstro_pm_cdo_registry, + &head, sizeof(struct mstro_cdo_id), regentry); + if(regentry) { + struct per_app_cdo_entries *per_app_entries; + HASH_FIND(hh, regentry->app_to_attributes, + &app_id, sizeof(app_id), per_app_entries); + if(per_app_entries) { + mstro_status s = + mstro_pm_cdo_registry__set_state(cdoid, app_id, + new_state, per_app_entries); + if(s==MSTRO_OK) { + found=true; + } + } + }); + + if(found) { + mstro_status status = mstro_pm__notify_cdo_registry_change(); + if(status!=MSTRO_OK) { + ERR("Failed to notify waiters of registry change\n"); + return status; + } + } + + return found? 
MSTRO_OK : MSTRO_FAIL; +} + + /*wrapper for mstro_pm_cdo_registry_update_state to work with operations */ mstro_status mstro_pm_cdo_registry_update_state_op(mstro_pool_operation op) @@ -1766,7 +1807,7 @@ mstro_pm_cdo_registry_update_state_op(mstro_pool_operation op) { WITH_CDO_ID_STR(idstr, &(op->cdoid), INFO("app %" PRIappid " %sED CDO (global id: `%s')\n", - op->appid, mstro_pool_op__kind_to_string(op->kind),idstr); + op->appid, mstro_pool_op_kind_to_string(op->kind),idstr); ); } else /**op->status == MSTRO_FAIL */ @@ -1781,44 +1822,6 @@ mstro_pm_cdo_registry_update_state_op(mstro_pool_operation op) return MSTRO_OK; /**operation status will be probagated to operation issuer*/ } -mstro_status -mstro_pm_cdo_registry_update_state(const struct mstro_cdo_id *cdoid, - const mstro_app_id app_id, - mstro_cdo_state new_state) -{ - struct mstro_pm_cdo_registry_entry *regentry=NULL; - bool found=false; - - struct mstro_cdo_id head = *cdoid; - head.local_id = MSTRO_CDO_LOCAL_ID_NONE; - - WITH_LOCKED_CDO_REGISTRY( - HASH_FIND(hh, g_mstro_pm_cdo_registry, - &head, sizeof(struct mstro_cdo_id), regentry); - if(regentry) { - struct per_app_cdo_entries *per_app_entries; - HASH_FIND(hh, regentry->app_to_attributes, - &app_id, sizeof(app_id), per_app_entries); - if(per_app_entries) { - mstro_status s = - mstro_pm_cdo_registry__set_state(cdoid, app_id, - new_state, per_app_entries); - if(s==MSTRO_OK) { - found=true; - } - } - }); - - if(found) { - mstro_status status = mstro_pm__notify_cdo_registry_change(); - if(status!=MSTRO_OK) { - ERR("Failed to notify waiters of registry change\n"); - return status; - } - } - - return found? 
MSTRO_OK : MSTRO_FAIL; -} static inline mstro_status diff --git a/maestro/pool_operations.c b/maestro/pool_operations.c index d62dc442ab9f79063cf616eaddbe68a82718d6c1..e823757e6120c8b199785cff0096428730e28245 100644 --- a/maestro/pool_operations.c +++ b/maestro/pool_operations.c @@ -84,7 +84,7 @@ const mstro_pool_op_st_handler mstro_pool_op_seal_steps[] = { NULL, mstro_pool_op__event_notify_before, mstro_pool_op__check_acks, - mstro_pm_cdo_seal_op, + mstro_pm_cdo_seal, mstro_pool_op__event_notify_after, mstro_pool_op__check_acks, mstro_pm_send_ack_op, @@ -128,7 +128,7 @@ const mstro_pool_op_st_handler mstro_pool_op_demand_steps[] = { NULL, mstro_pool_op__event_notify_before, mstro_pool_op__check_acks, - mstro_pm_handle_demand_op, + mstro_pm_handle_demand, mstro_pool_op__event_notify_after, mstro_pool_op__check_acks, NULL, @@ -172,7 +172,7 @@ const mstro_pool_op_st_handler mstro_pool_op_join_steps[] = { NULL, mstro_pool_op__event_notify_before, mstro_pool_op__check_acks, - mstro_pm_handle_join_op, + mstro_pm_handle_join, mstro_pool_op__event_notify_after, mstro_pool_op__check_acks, mstro_pm_send_welcome, @@ -411,7 +411,7 @@ mstro_pool_op_maker(Mstro__Pool__MstroMsg *msg, const struct mstro_endpoint *ep) /** Convenience function to print operation type for debugging */ char * -mstro_pool_op__kind_to_string(enum mstro_pool_operation_kind operation_type) +mstro_pool_op_kind_to_string(enum mstro_pool_operation_kind operation_type) { char *type = NULL; switch (operation_type) @@ -497,7 +497,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st { ERR("Failed to execute step %d of operation %s on CDO %s (from app %"PRIappid")\n", op->step, - mstro_pool_op__kind_to_string(op->kind), + mstro_pool_op_kind_to_string(op->kind), (cdo_name != NULL)?cdo_name:cdo_id_str, op->appid); } @@ -506,7 +506,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st DEBUG("%s%s operation %s on CDO %s (from app %"PRIappid")\n", 
(state == MSTRO_OP_STATE_BLOCKED)?"Blocked":"Completed", (state == MSTRO_OP_STATE_COMPLETED)?"":op_step_str, - mstro_pool_op__kind_to_string(op->kind), + mstro_pool_op_kind_to_string(op->kind), (cdo_name != NULL)?cdo_name:cdo_id_str, op->appid); } @@ -522,7 +522,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st { ERR("Failed to execute step %d of operation %s (from app %"PRIappid")\n", op->step, - mstro_pool_op__kind_to_string(op->kind), + mstro_pool_op_kind_to_string(op->kind), op->appid); } else @@ -530,7 +530,7 @@ mstro_pool_op__print_state(mstro_pool_operation op, enum mstro_pool_operation_st DEBUG("%s%s operation %s (from app %"PRIappid")\n", (state == MSTRO_OP_STATE_BLOCKED)?"Blocked":"Completed", (state == MSTRO_OP_STATE_COMPLETED)?"":op_step_str, - mstro_pool_op__kind_to_string(op->kind), + mstro_pool_op_kind_to_string(op->kind), op->appid); } break; @@ -787,7 +787,7 @@ mstro_pool_op__event_notify(mstro_pool_operation op, bool before) protobuf_c_enum_descriptor_get_value(&mstro__pool__event_kind__descriptor, pool_event_msg.kind)->name, before ? 
"before" : "after" ); - status = mstro_pool_op_event_advertise(&pool_event_msg, before, &(op->nr_outstanding_acks)); + status = mstro_pool_event_advertise(&pool_event_msg, before, &(op->nr_outstanding_acks)); mstro_stats_add_counter(MSTRO_STATS_CAT_PROTOCOL, MSTRO_STATS_L_PM_NUM_POOL_EVENTS, 1); if(status!=MSTRO_OK) { diff --git a/maestro/subscription_registry.c b/maestro/subscription_registry.c index b9c2891680f111dcf9ab10bdb707a0cc7f5dc96b..8a11dec29c237b0b294daab090ae17462d0be2ce 100644 --- a/maestro/subscription_registry.c +++ b/maestro/subscription_registry.c @@ -1316,18 +1316,9 @@ mstro_subscription_message_event_ack(const Mstro__Pool__EventAck *msg) msg->serial, &result); } -/* Acknowledgement management */ -/** A structure holding the context info for handling incoming event ack messages */ -struct event_ack_ctx { - /** The count-down of outstanding acks */ - _Atomic(uint64_t) nr_outstanding_acks; - /** the continuation event to trigger when all acks have occured */ - mstro_event continuation; -}; -/**FIXME to replace event_ack_cb */ static void -mstro_pool_op_event_ack_cb(mstro_event event, void *ctx) +event_ack_cb(mstro_event event, void *ctx) { assert(event!=NULL); assert(ctx!=NULL); _Atomic(uint64_t) *nr_outstanding_acks = (_Atomic(uint64_t) *) ctx; @@ -1340,27 +1331,6 @@ mstro_pool_op_event_ack_cb(mstro_event event, void *ctx) event, ctx, nr_left); } -static void -event_ack_cb(mstro_event event, void *ctx) -{ - assert(event!=NULL); assert(ctx!=NULL); - struct event_ack_ctx *ectx = (struct event_ack_ctx*) ctx; - - uint64_t nr_left = atomic_fetch_sub_explicit(&(ectx->nr_outstanding_acks), - 1, - memory_order_acquire); - DEBUG("event ack on event %p, ctx %p, (pre-) nr_left=%" PRIu64 "\n", - event, ectx, nr_left); - if(nr_left==1) { - /* we were the last ones */ - mstro_event event = ectx->continuation; - void *result; - free(ectx); - DEBUG("Last outstanding ack, triggering continuation event %p\n", event); - mstro_status s = mstro_event_trigger(event, 
&result); - assert(s==MSTRO_OK); - } -} /** mask of the events that support a CDO selector */ #define MSTRO_POOL_EVENT_CDO_MASK ( \ @@ -1707,165 +1677,8 @@ mstro_subscription_selector_check(struct mstro_subscription_ *s, * Local subscriptions will be handled directly, remote subscriptions * will receive event messages. */ - mstro_status mstro_pool_event_advertise(Mstro__Pool__Event *ev, - bool beforep, - mstro_event continuation) -{ - mstro_status status = MSTRO_OK; - - if(ev==NULL) { - ERR("NULL pool event data\n"); - return MSTRO_INVARG; - } - /* caller needs to set these */ - assert(ev->kind!=MSTRO__POOL__EVENT_KIND__INVALID_EVENT); - assert(ev->payload_case!=MSTRO__POOL__EVENT__PAYLOAD__NOT_SET); - - struct event_ack_ctx *ctx = malloc(sizeof(struct event_ack_ctx)); - if(ctx==NULL) { - ERR("Failed to allocate event ack context\n"); - return MSTRO_NOMEM; - } - ctx->continuation = continuation; - atomic_store_explicit(&ctx->nr_outstanding_acks, - 1, memory_order_release); /* one for ourselves */ - - mstro_event ads_done; - assert(g_subscription_table.edom!=NULL); - status = mstro_event_create(g_subscription_table.edom, - event_ack_cb, ctx, NULL, true, &ads_done); - if(status!=MSTRO_OK) { - ERR("Failed to create ads-done event\n"); - return MSTRO_FAIL; - } - - enum mstro_pool_event_kind kind; - status=mstro_pool_event__eventmsgs_translate(1, &ev->kind, &kind); - if(status!=MSTRO_OK) { - ERR("Failed to translate event type: incoming %" PRIu64 "\n", ev->kind); - return status; - } - - WITH_LOCKED_SUBSCRIPTIONS({ - struct mstro_subscription_table_entry *s=NULL; - - LL_FOREACH(g_subscription_table.entries, s) { - if( (s->subscription->event_mask & kind) - && (s->subscription->signal_before == beforep)) { - assert(s->subscription); - - DEBUG("Subscription %p from %" PRIu64 " matches event %d (beforep is %d)\n", - s, s->subscriber, kind, beforep); - if(s->subscription->event_mask & MSTRO_POOL_EVENT_CDO_MASK) { - DEBUG("This is a CDO event, checking CDO selector\n"); - /* 
FIXME: Due to ev being a protobuf event structure this - * is less efficient than it could be. We should instead - * convert the event content to cdo, attributes, in - * schema/dict format, and check against those for every - * candidate subscription, rather than the other way - * around */ - status = mstro_subscription_selector_check(s->subscription, ev); - switch(status) { - case MSTRO_NOMATCH: - DEBUG("subscription selector rejected this event\n"); - continue; - case MSTRO_OK: - DEBUG("Subscription predicate matches, will notify\n"); - break; - default: - ERR("Failed to evaluate subscription predicate: %d (%s)\n", - status, mstro_status_description(status)); - continue; - } - } - - ev->subscription_handle = &s->subscription->handle; - - if(s->subscriber != MSTRO_APP_ID_MANAGER - && s->subscriber != MSTRO_APP_ID_INVALID) { - /* send via PMP */ - - if(s->subscription->needs_ack) { - uint64_t num_outstanding - = atomic_fetch_add_explicit(&ctx->nr_outstanding_acks, - 1, memory_order_release); - DEBUG("Now %" PRIu64 " outstanding acks for this event\n", - num_outstanding); - - mstro_event ack_event; - status = mstro_event_create(g_subscription_table.edom, - event_ack_cb, ctx, NULL, true, - &ack_event); - if(status!=MSTRO_OK) { - ERR("Failed to create event for subscription ack, skipping subscriber %" PRIu64 "\n", - s->subscriber); - goto ABORT; - } - - status = mstro_event_id_get(ack_event, &ev->serial); - if(status!=MSTRO_OK) { - ERR("Failed to obtain event serial for subscriber %" PRIu64 "\n"); - goto ABORT; - } - DEBUG("Subscriber %" PRIu64 " for subscription %p assigned event ack id %" PRIu64 "\n", - s->subscriber, s, ev->serial); - } else { - DEBUG("Subscriber %" PRIu64 " for subscription %p needs no ack id\n", - s->subscriber, s); - ev->serial = 0; - } - - Mstro__Pool__MstroMsg msg=MSTRO__POOL__MSTRO_MSG__INIT; - status = mstro_pmp_package(&msg, (ProtobufCMessage*)ev); - if(status !=MSTRO_OK) { - ERR("Failed to package %s into a pool manager message\n", - 
ev->base.descriptor->name); - goto ABORT; - } - status = mstro_pmp_send_nowait(s->subscriber, &msg); - if(status!=MSTRO_OK) { - ERR("Cannot send event notification to app %" PRIu64 ": %d (%s)\n", - s->subscriber, status, mstro_status_description(status)); - goto ABORT; - } - } else { - /* local-only or local to PM */ - if(s->subscription->needs_ack) { - /* FIXME: store something on outstanding-ack table */ - - ERR("Not waiting for ACK on local subscription %p\n", - s); - } - status = mstro_pool_event_consume(ev); - if(status!=MSTRO_OK) { - ERR("Cannot perform local event notification: %d (%s)\n", - status, mstro_status_description(status)); - continue; - } - } - } - /* continue FOREACH */ - continue; - /* abort target for cases where an ack setup failed */ - ABORT: - atomic_fetch_sub_explicit(&ctx->nr_outstanding_acks, 1, memory_order_release); - } - }); - - DEBUG("Done informing subscribers\n"); - - void *result; - status = mstro_event_trigger(ads_done, &result); - assert(result==NULL); -BAILOUT: - return status; -} - -/**Operations version of mstro_pool_event_advertise*/ -mstro_status -mstro_pool_op_event_advertise(Mstro__Pool__Event *ev, bool beforep, _Atomic(uint64_t) *nr_outstanding_acks) { @@ -1937,7 +1750,7 @@ mstro_pool_op_event_advertise(Mstro__Pool__Event *ev, mstro_event ack_event; status = mstro_event_create(g_subscription_table.edom, - mstro_pool_op_event_ack_cb, nr_outstanding_acks, NULL, true, + event_ack_cb, nr_outstanding_acks, NULL, true, &ack_event); if(status!=MSTRO_OK) { ERR("Failed to create event for subscription ack, skipping subscriber %" PRIu64 "\n",