diff --git a/include/maestro/i_pool_manager_registry.h b/include/maestro/i_pool_manager_registry.h index 714ebc8fc046682b6547258f26e43fb701386fe2..90452dea2c07f06c8ddc58c3845bd00820da22c3 100644 --- a/include/maestro/i_pool_manager_registry.h +++ b/include/maestro/i_pool_manager_registry.h @@ -88,6 +88,7 @@ struct mstro_pm_app_registry_entry { char * component_name; /**< the component name provided at JOIN time */ uint64_t component_index; /**< the component index provided at JOIN time */ bool dead; /**< If an app LEAVES the app-registry entry stays around in 'dead' state. That allows us to resurrect it on re-JOIN */ + bool pending; /**< Indicates that an fi_read to fetch this entry has already been submitted; avoids issuing multiple reads for the same config block */ }; /** initialize PM registry infrastructure */ @@ -141,6 +142,11 @@ mstro_pm_app_register(struct mstro_endpoint *ep, mstro_status mstro_pc_app_befriend(mstro_app_id appid, const char* serialized_ep, const Mstro__Pool__TransportMethods *methods); +/** @brief Add a placeholder record (pending flag set) for @p id to the application registry, to flag that this entry is being read elsewhere and + * will be added when fi_read completes. Returns the status reported by mstro_pm_app_reg__new_entry(). NOTE(review): the call site added in mstro_pc__app_befriend() overwrites this status without checking it. */ +mstro_status +mstro_pc_app_register_pending(mstro_app_id id); + /** @brief Register peer application ID ** ** This is similar to mstro_pm_app_register(), except that it needs diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 02cb60f41362de98f00e65b8254b90c211b6668b..cc8b124e09a5de35cd26f6747c536a8ea07080d0 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -475,6 +475,7 @@ struct init_transfer_closure { /* handles to data filled in by async operation */ mstro_request request_handle; /**< handle of outstanding endpoint selection request */ struct mstro_endpoint *dst_ep; /**< endpoint selected */ + bool pending; /**< set when another fi_read for the same app entry is already ongoing; the resume path then only polls the registry and waits */ }; static @@ -485,7 +486,7 @@ mstro_pc__handle_init_transfer__closure_alloc(struct init_transfer_closure**resu *result = 
calloc(1,sizeof(struct init_transfer_closure)); if(*result==NULL) return MSTRO_NOMEM; - else + else return MSTRO_OK; } @@ -495,9 +496,25 @@ mstro_pc__app_befriend__resume(mstro_app_id appid, struct init_transfer_closure *restart_closure) { assert(restart_closure!=NULL); assert(appid!=MSTRO_APP_ID_INVALID); - mstro_status status = MSTRO_OK; - + struct mstro_pm_app_registry_entry *e; + if(restart_closure->pending) + { + + /* try to read the entry from registry: a non-pending entry means the other read has finished, while a pending or missing entry means we keep waiting (MSTRO_WOULDBLOCK). NOTE(review): e is declared without an initializer, so the if(e) test below relies on mstro_pm_app_lookup() always storing to its output argument, also on a miss; confirm */ + status = mstro_pm_app_lookup(appid, &e); + if(e) { + if(!e->pending) { + status = MSTRO_OK; + DEBUG("Bingo, app entry appeared in registry\n"); + goto BAILOUT; + } + } + status = MSTRO_WOULDBLOCK; + DEBUG("We are waiting for the other fi_read to complete\n"); + goto BAILOUT; + } + /* called in continuation of previous attempt */ if(restart_closure->appid!=appid) { ERR("Called with nonempty mismatching closure\n"); @@ -606,30 +623,49 @@ mstro_pc__app_befriend(mstro_app_id appid, const char* serialized_ep, assert(restart_closure!=NULL); assert(methods!=NULL); assert(serialized_ep!=NULL); - struct mstro_pm_app_registry_entry *e; - mstro_status status = mstro_pm_app_lookup(appid, &e); - if(e) { - DEBUG("Found app %zu in local registry, good\n", appid); - goto BAILOUT; - } + mstro_status status = MSTRO_UNIMPL; + /* check if this is a restart */ if(*restart_closure!=NULL) { DEBUG("Resuming preempted call for appid %" PRIappid "\n", appid); status = mstro_pc__app_befriend__resume(appid, *restart_closure); } else { - DEBUG("Unknown app %zu, let's make friends\n", appid); - status = mstro_pc__handle_init_transfer__closure_alloc(restart_closure); - if(status!=MSTRO_OK) { - ERR("Cannot allocate closure\n"); - goto BAILOUT; - } - - status = mstro_pc__app_befriend__start(appid, serialized_ep, - methods, *restart_closure); - } - + /* try to read the entry from registry */ + status = mstro_pm_app_lookup(appid, &e); + if(e) { + if(e->pending) { + DEBUG("app entry is already being read ... 
wait until it completes\n"); + /*FIXME we do not need a restart closure in this case, but we check that it is not null everywhere*/ + status = mstro_pc__handle_init_transfer__closure_alloc(restart_closure); + if(status!=MSTRO_OK) { + ERR("Cannot allocate closure\n"); + goto BAILOUT; + } + (*restart_closure)->pending = true; /* mark that there is a pending read */ + status = MSTRO_WOULDBLOCK; /* we need to wait until the read is complete */ + goto BAILOUT; + + } else + { + DEBUG("Found app %zu in local registry, good\n", appid); + goto BAILOUT; + } + } else { /* we could not find the app in registry */ + DEBUG("Unknown app %zu, let's make friends\n", appid); + status = mstro_pc__handle_init_transfer__closure_alloc(restart_closure); + if(status!=MSTRO_OK) { + ERR("Cannot allocate closure\n"); + goto BAILOUT; + } + (*restart_closure)->pending = false; /*we are going to do the fi_read */ + /* mark that this app entry is being read now for others */ + status = mstro_pc_app_register_pending(appid); + status = mstro_pc__app_befriend__start(appid, serialized_ep, + methods, *restart_closure); + } + } if(status!=MSTRO_WOULDBLOCK) { NOISE("Deallocating restart closure for lookup of peer app %" PRIappid "\n", appid); assert(*restart_closure!=NULL); diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index 6c57015ab5e613f928fd94d7e640914194d73a22..1094b71d7d0d911e641ca55072e8a8cd60431f79 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -128,6 +128,7 @@ mstro_pm_app_reg__new_entry(struct mstro_pm_app_registry_entry **entry_p, mstro_ e->component_name = NULL; e->component_index = UINT64_MAX; e->dead = false; + e->pending = false; *entry_p = e; return MSTRO_OK; @@ -190,6 +191,7 @@ mstro_pm_app_register(struct mstro_endpoint *ep, e->transport_methods = (Mstro__Pool__TransportMethods *)transport_methods; e->component_name = strdup(component_name); e->component_index = component_index; + e->pending =false; /* check for 
duplicates */ WITH_LOCKED_APP_REGISTRY({ @@ -253,7 +255,21 @@ BAILOUT: return status; } - +mstro_status +mstro_pc_app_register_pending(mstro_app_id id) +{ + struct mstro_pm_app_registry_entry *e=NULL; + mstro_status status = MSTRO_UNIMPL; + status = mstro_pm_app_reg__new_entry(&e, id); + if(status==MSTRO_OK) { + e->pending = true; /* flag the fresh entry as a placeholder; NOTE(review): it is added below without a prior HASH_FIND for an existing entry with this appid */ + WITH_LOCKED_APP_REGISTRY({ + HASH_ADD(hh, g_mstro_pm_app_registry, + appid, sizeof(mstro_app_id), e); + }); + } + return status; +} mstro_status mstro_pc_app_register(struct mstro_endpoint *ep, @@ -274,6 +290,7 @@ mstro_pc_app_register(struct mstro_endpoint *ep, e->ep = ep; e->addr = addr; e->serialized_desc = serialized_desc; + e->pending = false; e->transport_methods=malloc(sizeof(Mstro__Pool__TransportMethods)); if(e==NULL) { @@ -292,6 +309,24 @@ mstro_pc_app_register(struct mstro_endpoint *ep, /* FIXME: we should check that there is no previous entry and * document whether that's an error or a legal way of overriding * things */ + /* remove any placeholder (pending) entry registered for the same appid before the real entry is added below. NOTE(review): a removed placeholder is only unlinked with HASH_DEL in this hunk and no free is visible here, so confirm who releases it; the removal and the HASH_ADD below also take the registry lock in two separate sections */ + WITH_LOCKED_APP_REGISTRY({ + struct mstro_pm_app_registry_entry *elt=NULL; + struct mstro_pm_app_registry_entry *tmp=NULL; + HASH_ITER(hh, g_mstro_pm_app_registry, elt, tmp) { + if(elt->appid == e->appid) { + if(elt->pending) { + // remove fake pending entry + DEBUG("Remove pending appid %" PRIappid " (index %" PRIu64 ")\n", + elt->appid, elt->component_index); + HASH_DEL(g_mstro_pm_app_registry, elt); + } + } + } +unlock: + ;}); + + WITH_LOCKED_APP_REGISTRY({ HASH_ADD(hh, g_mstro_pm_app_registry, appid, sizeof(mstro_app_id), e); @@ -359,7 +394,6 @@ mstro_pm_app_lookup(mstro_app_id appid, } - /** pair of CDO name and chosen CDO ID */ struct cdo_name_id_pair {