diff --git a/maestro/subscription_registry.c b/maestro/subscription_registry.c index 88851bf4051227ed2cb891d45e6dec954006fb56..11a8c679f67fdd23261723cc3a8a112c6358e3a1 100644 --- a/maestro/subscription_registry.c +++ b/maestro/subscription_registry.c @@ -12,13 +12,13 @@ #include "attributes/maestro-schema.h" - + /* simplify logging */ #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_EVENT,__VA_ARGS__) #define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_EVENT,__VA_ARGS__) #define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_EVENT,__VA_ARGS__) #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_EVENT,__VA_ARGS__) - + /** A table storing a set of subscriptions */ @@ -30,7 +30,7 @@ struct mstro_subscription_table_entry { /** the subscription */ struct mstro_subscription_ *subscription; /** (linked list:) next entry */ - struct mstro_subscription_table_entry *next; + struct mstro_subscription_table_entry *next; }; struct mstro_subscription_table { @@ -403,7 +403,7 @@ mstro_subscription_register__local(mstro_subscription subscription, mstro_app_id origin) { mstro_status status = MSTRO_UNIMPL; - + struct mstro_subscription_table_entry *e = malloc(sizeof(struct mstro_subscription_table_entry)); if(!e) { @@ -411,7 +411,7 @@ mstro_subscription_register__local(mstro_subscription subscription, goto BAILOUT; } e->subscriber = origin; - + e->subscription = subscription; subscription->refcount++; e->next = NULL; @@ -423,7 +423,7 @@ mstro_subscription_register__local(mstro_subscription subscription, goto BAILOUT; } assert(g_subscription_table.edom!=NULL); - + if(subscription->handle.id==MSTRO_SUBSCRIPTION_HANDLE_INVALID) { /* local-only subscriptions: add local handler */ status = mstro_event_create(g_subscription_table.edom, @@ -442,7 +442,7 @@ mstro_subscription_register__local(mstro_subscription subscription, goto BAILOUT_UNLOCK; } subscription->handle.id = id; - + DEBUG("Assigned local unique id %" PRIu64 " (event %p) to subscription %p\n", subscription->handle.id, e->subscription_event_handle, subscription); } else { @@ -451,7 +451,7 @@ mstro_subscription_register__local(mstro_subscription subscription, DEBUG("Accepted pre-assigned id %" PRIu64 " for subscription %p\n", subscription->handle.id, subscription); } - + LL_PREPEND(g_subscription_table.entries, e); if(subscription->handle.id == UINT64_MAX) { ERR("FIXME: overflow of subscription table\n"); @@ -460,7 +460,7 @@ mstro_subscription_register__local(mstro_subscription subscription, BAILOUT_UNLOCK: - + err = pthread_mutex_unlock(&g_subscription_table_mtx); if(err!=0) { ERR("Failed to unlock subscription table: %d (%s)\n", err, strerror(err)); @@ -468,7 +468,7 @@ BAILOUT_UNLOCK: goto BAILOUT; } status=MSTRO_OK; - + BAILOUT: return status; } @@ -535,7 +535,7 @@ mstro_pool_event__translate_to_eventmsg(enum mstro_pool_event_kind e, case MSTRO_POOL_EVENT_TRANSPORT_COMPLETED: *ek = MSTRO__POOL__EVENT_KIND__TRANSFER_COMPLETED; break; - + case MSTRO_POOL_EVENT_APP_JOIN: *ek = MSTRO__POOL__EVENT_KIND__APP_JOIN; break; @@ -570,10 +570,10 @@ mstro_status mstro_subscription_register__with_pm(mstro_subscription subscription) { mstro_status status = MSTRO_UNIMPL; - + if(subscription==NULL) return MSTRO_INVARG; - + unsigned int mask = subscription->event_mask & MSTRO_POOL_EVENT_VALID_MASK; size_t numevents = popcount(mask); if(numevents==0) { @@ -593,7 +593,7 @@ mstro_subscription_register__with_pm(mstro_subscription subscription) int bit = ffs(mask); assert(bit!=0); status = mstro_pool_event__translate_to_eventmsg(1<<(bit-1), events+i); - if(status!=MSTRO_OK) + if(status!=MSTRO_OK) goto BAILOUT; mask ^= 1<<(bit-1); /* clear it */ } @@ -640,7 +640,7 @@ mstro_subscription_register__with_pm(mstro_subscription subscription) } smsg.local_id = sub->request_id; - + Mstro__Pool__MstroMsg msg=MSTRO__POOL__MSTRO_MSG__INIT; status = mstro_pmp_package(&msg, (ProtobufCMessage*)&smsg); if(status !=MSTRO_OK) { @@ -650,7 +650,7 @@ mstro_subscription_register__with_pm(mstro_subscription subscription) } DEBUG("Packaged subscribe message\n"); - + status = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); if(status!=MSTRO_OK) { ERR("Cannot send subscription request to pool manager: %d (%s)\n", @@ -675,7 +675,7 @@ mstro_subscription_register__with_pm(mstro_subscription subscription) DEBUG("Subscription %p has been assigned handle %" PRIu64 "\n", subscription, subscription->handle.id); - + /* ok, valid handle found */ /* the outstanding subscription allocation is freed by the incoming * message handler */ @@ -688,7 +688,7 @@ mstro_subscription_register__with_pm(mstro_subscription subscription) } status = MSTRO_OK; goto BAILOUT; - + BAILOUT_FREE: WITH_LOCKED_OUTSTANDING_SUBSCRIPTIONS({ if(sub) { @@ -697,7 +697,7 @@ BAILOUT_FREE: }}); BAILOUT: - return status; + return status; } static inline @@ -738,7 +738,7 @@ mstro_subscription__destroy(mstro_subscription subscription) } if(subscription->cdo_selector) s|= mstro_cdo_selector_dispose(subscription->cdo_selector); - + free(subscription); } return s; @@ -803,7 +803,7 @@ mstro_subscription_unregister__local(mstro_subscription subscription) status |= MSTRO_FAIL; break; } - + status |= mstro_subscription__destroy(subscription); if(status!=MSTRO_OK) { ERR("Failed to destroy subscription\n"); @@ -842,7 +842,7 @@ mstro_subscription_unregister__with_pm(mstro_subscription subscription) /* Send to PM; it will return a PoolOpAck for unsubscribe, which * will trigger our mstro_subscription_unregister_bh, which signals * an event for the subscription. */ - + Mstro__Pool__MstroMsg msg=MSTRO__POOL__MSTRO_MSG__INIT; status = mstro_pmp_package(&msg, (ProtobufCMessage*)&smsg); if(status !=MSTRO_OK) { @@ -852,16 +852,16 @@ mstro_subscription_unregister__with_pm(mstro_subscription subscription) } DEBUG("Packaged unsubscribe message\n"); - + status = mstro_pmp_send_nowait(MSTRO_APP_ID_MANAGER, &msg); if(status!=MSTRO_OK) { ERR("Cannot send subscription request to pool manager: %d (%s)\n", status, mstro_status_description(status)); goto BAILOUT_UNLOCK; } - + DEBUG("Waiting for pool manager to confirm un-subscription\n"); - + /* we still hold the lock on the subscription, so let's go to sleep * and wait what happens */ do { @@ -889,8 +889,8 @@ BAILOUT_UNLOCK: status = mstro_subscription_unregister__local(subscription); BAILOUT: - return status; -} + return status; +} mstro_status mstro_subscription_unregister(mstro_subscription subscription) @@ -970,16 +970,16 @@ mstro_subscription_register_bh(const Mstro__Pool__SubscribeAck *ack) err, strerror(err)); goto BAILOUT_FREE; } - + sub->subscription->handle.id = ack->handle->id; - + err = pthread_mutex_unlock(&sub->subscription->event_mtx); if(err!=0) { ERR("Failed to unlock subscription mutex: %d (%s)\n", err, strerror(err)); goto BAILOUT_FREE; } - + status = mstro_subscription_register__local(sub->subscription, g_pool_app_id); if(status!=MSTRO_OK) { ERR("Failed to insert subscription into local registry\n"); @@ -997,7 +997,7 @@ mstro_subscription_register_bh(const Mstro__Pool__SubscribeAck *ack) err, strerror(err)); goto BAILOUT_FREE; } - + err = pthread_mutex_unlock(&sub->subscription->event_mtx); if(err!=0) { ERR("Failed to unlock subscription mutex: %d (%s)\n", @@ -1005,7 +1005,7 @@ mstro_subscription_register_bh(const Mstro__Pool__SubscribeAck *ack) goto BAILOUT_FREE; } } - + BAILOUT_FREE: free(sub); } @@ -1029,7 +1029,7 @@ mstro_pool_event__eventmsgs_translate(size_t nevents, mstro_pool_event_kinds res = MSTRO_POOL_EVENT_NONE; mstro_status s=MSTRO_UNIMPL; - + for(size_t i=0; i<nevents; i++) { switch(events[i]) { case MSTRO__POOL__EVENT_KIND__APP_JOIN: @@ -1077,7 +1077,7 @@ mstro_pool_event__eventmsgs_translate(size_t nevents, events[i]); } } - + if(res & (~MSTRO_POOL_EVENT_VALID_MASK)) { ERR("Invalid event mask constructed, check protocol\n"); s=MSTRO_INVARG; @@ -1085,7 +1085,7 @@ mstro_pool_event__eventmsgs_translate(size_t nevents, s=MSTRO_OK; *event_mask = res; } - + BAILOUT: return s; } @@ -1114,7 +1114,7 @@ mstro_subscription_message_register(const Mstro__Pool__Subscribe *smsg, free(s); status=MSTRO_FAIL; goto BAILOUT; - } + } err = pthread_cond_init(&s->event_cond, NULL); if(err!=0) { ERR("Failed to initialize cond var: %d (%s)\n", err, strerror(err)); @@ -1122,8 +1122,8 @@ mstro_subscription_message_register(const Mstro__Pool__Subscribe *smsg, free(s); status=MSTRO_FAIL; goto BAILOUT; - } - + } + s->refcount=1; mstro__pool__subscription_handle__init(&s->handle); @@ -1138,7 +1138,7 @@ mstro_subscription_message_register(const Mstro__Pool__Subscribe *smsg, status=MSTRO_UNIMPL; goto BAILOUT; } - + status = mstro_cdo_selector_create(NULL, NULL, smsg->selector[0]->query, &s->cdo_selector); @@ -1158,7 +1158,7 @@ mstro_subscription_message_register(const Mstro__Pool__Subscribe *smsg, ERR("Failed to register subscription\n"); goto BAILOUT; } - + *new_handle = &s->handle; BAILOUT: return status; @@ -1322,7 +1322,7 @@ event_ack_cb(mstro_event event, void *ctx) struct event_ack_ctx *ectx = (struct event_ack_ctx*) ctx; uint64_t nr_left = atomic_fetch_sub_explicit(&(ectx->nr_outstanding_acks), - 1, + 1, memory_order_acquire); DEBUG("event ack on event %p, ctx %p, (pre-) nr_left=%" PRIu64 "\n", event, ectx, nr_left); @@ -1363,7 +1363,7 @@ mstro__pool_event_cdoid_get(const Mstro__Pool__Event *ev, assert(ev!=NULL); assert(id!=NULL); Mstro__Pool__CDOID *cdoid=NULL; - + switch(ev->payload_case) { case MSTRO__POOL__EVENT__PAYLOAD_DECLARE: break; @@ -1391,7 +1391,7 @@ mstro__pool_event_cdoid_get(const Mstro__Pool__Event *ev, cdoid = ev->transport_ticket->dstcdoid; break; case MSTRO__POOL__EVENT__PAYLOAD_TRANSFER_COMPLETED: cdoid = ev->transfer_completed->dstcdoid; break; - + case MSTRO__POOL__EVENT__PAYLOAD_JOIN: /* fallthrough */ case MSTRO__POOL__EVENT__PAYLOAD_WELCOME: /* fallthrough */ case MSTRO__POOL__EVENT__PAYLOAD_LEAVE: /* fallthrough */ @@ -1418,7 +1418,7 @@ mstro__pool_event_cdoid_get(const Mstro__Pool__Event *ev, *id = MSTRO_CDO_ID_NULL; } } - + return s; } @@ -1437,7 +1437,7 @@ mstro_subscription__pool_attr_find(const Mstro__Pool__Attributes__Map *kv_map, } } return NULL; - } + } } static inline @@ -1450,7 +1450,7 @@ mstro_subscription__csq_eval(const struct mstro_csq_val *csq, ERR("Cannot handle non-KV-MAP attributes\n"); return MSTRO_FAIL; } - + if(csq==NULL) { return MSTRO_OK; } else { @@ -1469,7 +1469,7 @@ mstro_subscription__csq_eval(const struct mstro_csq_val *csq, } return s; } - + case MSTRO_CSQ_AND: { const struct mstro_csq_val *tmp = csq->and.terms; mstro_status s=MSTRO_OK; /* empty AND is true */ @@ -1507,7 +1507,7 @@ mstro_subscription__csq_eval(const struct mstro_csq_val *csq, return MSTRO_OK; } } - } + } /* binary ops */ case MSTRO_CSQ_OP_LT: /* fallthrough */ case MSTRO_CSQ_OP_LE: /* fallthrough */ @@ -1591,7 +1591,7 @@ mstro_subscription__csq_eval(const struct mstro_csq_val *csq, * * Return MSTRO_OK for 'matched', MSTRO_NOMATCH if not, or an error * code. - * + * */ mstro_status mstro_subscription_selector_eval(const struct mstro_cdo_id *cdoid, @@ -1600,7 +1600,7 @@ mstro_subscription_selector_eval(const struct mstro_cdo_id *cdoid, { if(cdoid==NULL || attributes==NULL) return MSTRO_INVARG; - + WITH_CDO_ID_STR(idstr, cdoid, { DEBUG("Checking selector %p for CDO ID %s\n", sel, idstr);}); @@ -1618,7 +1618,7 @@ mstro_subscription_selector_eval(const struct mstro_cdo_id *cdoid, } return mstro_subscription__csq_eval(csq, attributes); } - + /** check whether EV needs to be reported for the event selector in S. * * (Does not check the event kind matches). @@ -1635,7 +1635,7 @@ mstro_subscription_selector_check(struct mstro_subscription_ *s, assert(ev->origin_id!=NULL); mstro_status status = MSTRO_NOMATCH; - + if(ev->payload_case==MSTRO__POOL__EVENT__PAYLOAD_DECLARE) { /* declare is special: no CDOID known yet, no attribuites, only * string name. Efficient subscriptions will subscribe to @@ -1650,13 +1650,13 @@ mstro_subscription_selector_check(struct mstro_subscription_ *s, ERR("Failed to retrieve CDO id from event\n"); return status; } - + /* fetch CDO attributes reference, local or PM */ if(g_pool_app_id==MSTRO_APP_ID_MANAGER) { mstro_app_id origin = ev->origin_id->id; status = mstro_pm_cdo_app_match(origin, &id, s->cdo_selector); } else { - + Mstro__Pool__Attributes *attributes=NULL; status = mstro_cdo_attribute_msg_get(&id, &attributes); if(status!=MSTRO_OK) { @@ -1670,7 +1670,7 @@ mstro_subscription_selector_check(struct mstro_subscription_ *s, return status; } - + /** @brief Let all subscriptions in @arg tab notice that event @arg kind/@arg iev/@arg data occured. * * If any of the notifications requires an ACK, this function will @@ -1697,7 +1697,7 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, /* caller needs to set these */ assert(ev->kind!=MSTRO__POOL__EVENT_KIND__INVALID_EVENT); assert(ev->payload_case!=MSTRO__POOL__EVENT__PAYLOAD__NOT_SET); - + struct event_ack_ctx *ctx = malloc(sizeof(struct event_ack_ctx)); if(ctx==NULL) { ERR("Failed to allocate event ack context\n"); @@ -1722,7 +1722,7 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, ERR("Failed to translate event type: incoming %" PRIu64 "\n", ev->kind); return status; } - + WITH_LOCKED_SUBSCRIPTIONS({ struct mstro_subscription_table_entry *s=NULL; @@ -1757,11 +1757,11 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, } ev->subscription_handle = &s->subscription->handle; - + if(s->subscriber != MSTRO_APP_ID_MANAGER && s->subscriber != MSTRO_APP_ID_INVALID) { /* send via PMP */ - + if(s->subscription->needs_ack) { uint64_t num_outstanding = atomic_fetch_add_explicit(&ctx->nr_outstanding_acks, @@ -1778,12 +1778,12 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, s->subscriber); goto ABORT; } - + status = mstro_event_id_get(ack_event, &ev->serial); if(status!=MSTRO_OK) { ERR("Failed to obtain event serial for subscriber %" PRIu64 "\n"); goto ABORT; - } + } DEBUG("Subscriber %" PRIu64 " for subscription %p assigned event ack id %" PRIu64 "\n", s->subscriber, s, ev->serial); } else { @@ -1791,7 +1791,7 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, s->subscriber, s); ev->serial = 0; } - + Mstro__Pool__MstroMsg msg=MSTRO__POOL__MSTRO_MSG__INIT; status = mstro_pmp_package(&msg, (ProtobufCMessage*)ev); if(status !=MSTRO_OK) { @@ -1809,7 +1809,7 @@ mstro_pool_event_advertise(Mstro__Pool__Event *ev, /* local-only or local to PM */ if(s->subscription->needs_ack) { /* FIXME: store something on outstanding-ack table */ - + ERR("Not waiting for ACK on local subscription %p\n", s); } @@ -1864,7 +1864,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) ev->serial = eventmsg->serial; assert(eventmsg->ctime!=NULL); - + ev->ctime = (eventmsg->ctime->sec * NSEC_PER_SEC) + eventmsg->ctime->nsec; switch(ev->kind) { @@ -1880,18 +1880,18 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) ev->join.appid = eventmsg->origin_id->id; /* may be MSTRO_APP_ID_INVALID for JOIN:pre */ else ev->join.appid = MSTRO_APP_ID_INVALID; - - + + DEBUG("Event: %s JOINed (appid %" PRIappid ")\n", ev->join.component_name, ev->join.appid); break; - + case MSTRO_POOL_EVENT_APP_LEAVE: assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_LEAVE); ev->leave.appid = eventmsg->origin_id->id; DEBUG("Event: %" PRIu64 " asked to LEAVE\n", ev->leave.appid); break; - + case MSTRO_POOL_EVENT_DECLARE: assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_DECLARE); assert(eventmsg->origin_id!=NULL @@ -1939,7 +1939,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) DEBUG("Event: OFFER for |%s| from %" PRIu64 "\n", ev->offer.cdo_name, ev->offer.appid); break; - + case MSTRO_POOL_EVENT_SEAL: assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_SEAL); assert(eventmsg->origin_id!=NULL @@ -1962,7 +1962,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) DEBUG("Event: SEAL for |%s| from %" PRIu64 "\n", ev->seal.cdo_name, ev->seal.appid); break; - + case MSTRO_POOL_EVENT_REQUIRE: assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_REQUIRE); assert(eventmsg->origin_id!=NULL @@ -1981,7 +1981,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) return MSTRO_NOMEM; } } - + DEBUG("Event: REQUIRE for |%s| from %" PRIu64 "\n", ev->require.cdo_name, ev->require.appid); break; @@ -2004,10 +2004,35 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) return MSTRO_NOMEM; } } - + DEBUG("Event: DEMAND for |%s| from %" PRIu64 "\n", ev->demand.cdo_name, ev->demand.appid); break; + + case MSTRO_POOL_EVENT_DISPOSE: + assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_DISPOSE); + assert(eventmsg->origin_id!=NULL + && eventmsg->origin_id->id!=MSTRO_APP_ID_INVALID); + assert(eventmsg->dispose->cdoid!=NULL); + ev->dispose.appid = eventmsg->origin_id->id; + if(eventmsg->cdo_name==NULL) { + ERR("DISPOSE event missing a CDO name\n"); + free(ev); + return MSTRO_FAIL; + } else { + ev->dispose.cdo_name = strdup(eventmsg->cdo_name); + if(ev->dispose.cdo_name == NULL) { + ERR("Failed to allocate event data\n"); + free(ev); + return MSTRO_NOMEM; + } + } + + DEBUG("Event: DEMAND for |%s| from %" PRIu64 "\n", + ev->demand.cdo_name, ev->demand.appid); + break; + + case MSTRO_POOL_EVENT_WITHDRAW: assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_WITHDRAW); assert(eventmsg->origin_id!=NULL @@ -2027,7 +2052,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) return MSTRO_NOMEM; } } - + DEBUG("Event: WITHDRAW for |%s| from %" PRIu64 "\n", ev->withdraw.cdo_name, ev->withdraw.appid); break; @@ -2039,13 +2064,12 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) break; case MSTRO_POOL_EVENT_SEAL_GROUP: - + case MSTRO_POOL_EVENT_RETRACT: - case MSTRO_POOL_EVENT_DISPOSE: case MSTRO_POOL_EVENT_TRANSPORT_INIT: case MSTRO_POOL_EVENT_TRANSPORT_TICKET: case MSTRO_POOL_EVENT_TRANSPORT_COMPLETED: - + /* pool-related */ case MSTRO_POOL_EVENT_POOL_CHECKPOINT: case MSTRO_POOL_EVENT_SUBSCRIBE: @@ -2059,8 +2083,8 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) free(ev); return MSTRO_FAIL; } - - + + /* check that we know about the subscription */ WITH_LOCKED_SUBSCRIPTIONS({ struct mstro_subscription_table_entry *s=NULL; @@ -2112,7 +2136,7 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) status = MSTRO_FAIL; } }); - + return status; } @@ -2121,7 +2145,7 @@ mstro_status mstro_subscriptions_init(void) { mstro_status status=MSTRO_OK; - + WITH_LOCKED_SUBSCRIPTIONS( if(g_subscription_table.edom==NULL) { status = mstro_event_domain_create("Subscription Events", @@ -2142,7 +2166,7 @@ mstro_status mstro_subscriptions_finalize(void) { mstro_status status=MSTRO_OK; - + WARN("Not properly de-initializing subscription registry\n"); WITH_LOCKED_OUTSTANDING_SUBSCRIPTIONS( @@ -2226,12 +2250,12 @@ mstro_pool_resolve_reply_bh(const Mstro__Pool__ResolveReply *reply) ERR("No name in resolver reply\n"); return MSTRO_INVMSG; } - + struct mstro_cdo_id id = {.qw[0] = reply->query->cdoid->qw0, .qw[1] = reply->query->cdoid->qw1, //, .local_id = reply->query->cdoid->local_id }; - + mstro_status s= mstro__subscr_resolver_notice_cdoid(reply->name, &id); if(s!=MSTRO_OK) {