From ac21d5d10024df90fbea0e1f39240fe53612557d Mon Sep 17 00:00:00 2001 From: Utz-Uwe Haus <uhaus@cray.com> Date: Mon, 31 May 2021 13:39:29 +0200 Subject: [PATCH] insert int-param subscription to simple_archiver This is to reproduce Issue #16 (ECMWF) --- tests/simple_archiver.c | 181 +++++++++++++++++++++++++++------------- 1 file changed, 122 insertions(+), 59 deletions(-) diff --git a/tests/simple_archiver.c b/tests/simple_archiver.c index 6b67739e..d436a8e7 100644 --- a/tests/simple_archiver.c +++ b/tests/simple_archiver.c @@ -61,7 +61,9 @@ CHEAT_TEST(simple_archiver, cheat_yield(); mstro_cdo_selector selector=NULL; + mstro_cdo_selector ecmwf_selector=NULL; mstro_subscription cdo_subscription=NULL; + mstro_subscription ecmwf_subscription=NULL; mstro_subscription join_leave_subscription=NULL; cheat_assert(MSTRO_OK @@ -70,6 +72,12 @@ CHEAT_TEST(simple_archiver, "(has .maestro.core.cdo.name)", &selector)); cheat_yield(); + cheat_assert(MSTRO_OK + == mstro_cdo_selector_create( + NULL, NULL, + "(.maestro.ecmwf.param = 2)", + &ecmwf_selector)); + cheat_yield(); cheat_assert(MSTRO_OK == mstro_subscribe(selector, @@ -85,9 +93,19 @@ CHEAT_TEST(simple_archiver, ,true, &cdo_subscription)); cheat_yield(); + cheat_assert(MSTRO_OK + == mstro_subscribe(ecmwf_selector, + // just a few here + MSTRO_POOL_EVENT_DECLARE + |MSTRO_POOL_EVENT_OFFER + ,true, + &ecmwf_subscription)); + cheat_yield(); cheat_assert(MSTRO_OK == mstro_cdo_selector_dispose(selector)); + cheat_assert(MSTRO_OK + == mstro_cdo_selector_dispose(ecmwf_selector)); cheat_yield(); cheat_assert(MSTRO_OK @@ -104,7 +122,8 @@ CHEAT_TEST(simple_archiver, while(!done) { mstro_pool_event e=NULL; - /* poll join/leave */ + +POLL_JOIN_LEAVE: cheat_assert(MSTRO_OK==mstro_subscription_poll( join_leave_subscription, &e)); DEBUG("join/leave poll: %p\n", e); @@ -136,68 +155,110 @@ CHEAT_TEST(simple_archiver, join_leave_subscription, e)); /* dispose all */ cheat_assert(MSTRO_OK==mstro_pool_event_dispose(e)); - } else { - cheat_assert(MSTRO_OK==mstro_subscription_poll( - cdo_subscription, &e)); - DEBUG("CDO op poll: %p\n", e); - if(e) { - mstro_pool_event tmp=e; - /* handle all */ - while(tmp) { - const char *event_name=NULL; - const char *cdo_name=NULL; - event_name = mstro_pool_event_description(tmp->kind); - - switch(tmp->kind) { - case MSTRO_POOL_EVENT_OFFER: - /* FIXME: Immediately post a REQUIRE for it */ - cdo_name = tmp->offer.cdo_name; - break; - - case MSTRO_POOL_EVENT_DECLARE: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_DISPOSE: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_SEAL: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_DEMAND: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_REQUIRE: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_RETRACT: - cdo_name = tmp->offer.cdo_name; - break; - case MSTRO_POOL_EVENT_WITHDRAW: - cdo_name = tmp->offer.cdo_name; - break; - default: - fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind); - } - fprintf(stdout, "CDO event %s for CDO |%s|\n", - event_name, cdo_name ? cdo_name : "??"); - - tmp=tmp->next; + } + goto TAIL; +POLL_CDOS: + // try cdo subscription + cheat_assert(MSTRO_OK==mstro_subscription_poll( + cdo_subscription, &e)); + DEBUG("CDO op poll: %p\n", e); + if(e) { + mstro_pool_event tmp=e; + /* handle all */ + while(tmp) { + const char *event_name=NULL; + const char *cdo_name=NULL; + event_name = mstro_pool_event_description(tmp->kind); + + switch(tmp->kind) { + case MSTRO_POOL_EVENT_OFFER: + /* FIXME: Immediately post a REQUIRE for it */ + cdo_name = tmp->offer.cdo_name; + break; + + case MSTRO_POOL_EVENT_DECLARE: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_DISPOSE: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_SEAL: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_DEMAND: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_REQUIRE: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_RETRACT: + cdo_name = tmp->offer.cdo_name; + break; + case MSTRO_POOL_EVENT_WITHDRAW: + cdo_name = tmp->offer.cdo_name; + break; + default: + fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind); } - /* acknowledge all */ - cheat_assert(MSTRO_OK==mstro_subscription_ack( - cdo_subscription, e)); - /* dispose all */ - cheat_assert(MSTRO_OK==mstro_pool_event_dispose(e)); - } else { - //fprintf(stdout, "No event available, looping\n"); - /* NULL e should give an error, even if subscription - * itself would like ack */ - cheat_assert(MSTRO_OK!=mstro_subscription_ack( - join_leave_subscription, e)); - /* sleep(1); */ + fprintf(stdout, "CDO event %s for CDO |%s|\n", + event_name, cdo_name ? cdo_name : "??"); + + tmp=tmp->next; } + /* acknowledge all */ + cheat_assert(MSTRO_OK==mstro_subscription_ack( + cdo_subscription, e)); + /* dispose all */ + cheat_assert(MSTRO_OK==mstro_pool_event_dispose(e)); } + goto TAIL; + +POLL_ECMWF_PARAM: + cheat_assert(MSTRO_OK==mstro_subscription_poll( + ecmwf_subscription, &e)); + DEBUG("ECMWF int-param op poll: %p\n", e); + if(e) { + mstro_pool_event tmp=e; + /* handle all */ + while(tmp) { + const char *event_name=NULL; + const char *cdo_name=NULL; + event_name = mstro_pool_event_description(tmp->kind); + + switch(tmp->kind) { + case MSTRO_POOL_EVENT_OFFER: + cdo_name = tmp->offer.cdo_name; + break; + + case MSTRO_POOL_EVENT_DECLARE: + cdo_name = tmp->offer.cdo_name; + break; + default: + fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind); + } + fprintf(stdout, "CDO event %s for CDO |%s|, based on .mstro.ecmwf.param==2 subscription\n", + event_name, cdo_name ? cdo_name : "??"); + + tmp=tmp->next; + } + /* acknowledge all */ + cheat_assert(MSTRO_OK==mstro_subscription_ack( + ecmwf_subscription, e)); + /* dispose all */ + cheat_assert(MSTRO_OK==mstro_pool_event_dispose(e)); + } + goto TAIL; +NOTHING_AVAILABLE: + //fprintf(stdout, "No event available, looping\n"); + + /* NULL e should give an error, even if subscription + * itself would like ack */ +TRY_ILLEGAL_ACK: + cheat_assert(MSTRO_OK!=mstro_subscription_ack( + join_leave_subscription, e)); + /* sleep(1); */ +TAIL: if(mstro_clock()>starttime+MAX_WAIT) { fprintf(stderr, "Waited %" PRIu64 "s and still not done\n", MAX_WAIT/(1000*1000*1000)); @@ -213,6 +274,8 @@ CHEAT_TEST(simple_archiver, cheat_assert(MSTRO_OK == mstro_subscription_dispose(cdo_subscription)); + cheat_assert(MSTRO_OK + == mstro_subscription_dispose(ecmwf_subscription)); cheat_assert(MSTRO_OK == mstro_subscription_dispose(join_leave_subscription)); cheat_assert(MSTRO_OK == mstro_finalize()); -- GitLab