diff --git a/tests/simple_telemetry_listener.c b/tests/simple_telemetry_listener.c index b4ca4ca7ec112b9019c7c89abbe908913d7b0c8c..d078cc449f51895787537356d1cb40458cf2ba29 100644 --- a/tests/simple_telemetry_listener.c +++ b/tests/simple_telemetry_listener.c @@ -60,6 +60,9 @@ char *g_conf_terminate_after = NULL; mstro_app_id g_conf_terminate_after_appid = 0; /** timeout after which to terminate, if no events are observed anymore (default: DEFAULT_MAX_WAIT) */ mstro_nanosec_t g_conf_max_wait = DEFAULT_MAX_WAIT; + +int g_conf_number_of_joins = -1; + /** pool manager info (default: from env) */ char *g_conf_pminfo = NULL; #define LOGDST_STDOUT 1 @@ -71,6 +74,7 @@ int g_conf_logdst = LOGDST_STDOUT; /** verbose operation */ bool g_verbose = false; +bool g_conf_read_all_sent = false; /** usage */ @@ -85,6 +89,7 @@ print_usage(const char *argv0) " --component NAME, -c NAME Maestro component name for this listener {MSTRO_COMPONENT_NAME}\n" " --terminate-after NAME, -q NAME Terminate after observing LEAVE of component NAME\n" " --max-idle SECONDS, -i SECONDS Terminate after no events for SECONDS {%g}\n" + " --number-of-joins, -n JOIN Wait for JOIN join events before sending a all-ready message\n" " --destination DEST, -d DEST Logging destination: 'syslog', 'stdout', 'stderr', or 'mstro' {stdout}\n" " --pm-info PMINFO, -p PMINFO Pool Manager OOB info {MSTRO_POOL_MANAGER_INFO}\n" " --help, -h This help\n" @@ -108,6 +113,7 @@ parse_arguments(int argc, char **argv) { "destination", ko_required_argument, 'd' }, { "pm-info", ko_required_argument, 'p' }, { "pm_info", ko_required_argument, 'p' }, + { "number-of-joins", ko_required_argument, 'n' }, { "help", ko_no_argument, 'h' }, { "verbose", ko_no_argument, 'v' }, { "version", ko_no_argument, 'V' }, @@ -115,7 +121,7 @@ parse_arguments(int argc, char **argv) }; ketopt_t opt = KETOPT_INIT; int c; - + while ((c = ketopt(&opt, argc, argv, 1, "w:c:q:i:d:hv", longopts)) >= 0) { switch(c) { case 'w': @@ -167,6 +173,14 @@ parse_arguments(int argc, char **argv) } break; } + case 'n': + if(!opt.arg) { + fprintf(stderr, "--number-of-joins option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + g_conf_number_of_joins = atoi(opt.arg); + break; case 'p': { if(!opt.arg) { fprintf(stderr, "--pm-info option is missing its argument\n"); @@ -236,7 +250,7 @@ store_telemetry_maestro(mstro_pool_event e, const char *message, va_list args) e=e; /* avoid unused arg */ mstro_vlocation_aware_log(MSTRO_LOG_INFO, MSTRO_LOG_MODULE_TELEMETRY, - __func__, __FILE__, __LINE__, message, args); + __func__, __FILE__, __LINE__, message, args); } static void @@ -406,7 +420,17 @@ event_loop(void) s=mstro_pool_event_dispose(e); } + mstro_cdo ready_all_cdo = NULL; + if (g_conf_number_of_joins > 0 && joinCount == g_conf_number_of_joins && !g_conf_read_all_sent) { + mstro_cdo_declare("allClientsReady", MSTRO_ATTR_DEFAULT, &ready_all_cdo); + mstro_cdo_seal(ready_all_cdo); + mstro_cdo_offer(ready_all_cdo); + g_conf_read_all_sent = true; + } + if((0 < leaveCount) && (joinCount == leaveCount)) { + mstro_cdo_withdraw(ready_all_cdo); + mstro_cdo_dispose(ready_all_cdo); done=true; }