diff --git a/include/maestro/core.h b/include/maestro/core.h index 71e72f5e3fbbff0385939e6fe26575bad3f570d7..4954c18343e471514f1471f8cf1de7bbc5ad4453 100644 --- a/include/maestro/core.h +++ b/include/maestro/core.h @@ -43,7 +43,8 @@ extern "C" { #include "maestro/config.h" #include "maestro/status.h" #include <stdint.h> - + #include <inttypes.h> + /**@defgroup MSTRO_Core Maestro Core API **@{ ** This is the core API, as developed for D3.2 @@ -96,7 +97,12 @@ extern "C" { ** Application-in-workflow identifier type **/ typedef uint64_t mstro_app_id; - + + /** + ** A printf format specifie suitable to print an @ref mstro_app_id value + **/ + #define PRIappid PRIu64 + /**@} (end of group MSTRO_Core) */ /* include the remaining public API */ @@ -105,6 +111,7 @@ extern "C" { #include "maestro/pool_manager.h" #include "maestro/attributes.h" #include "maestro/groups.h" +#include "maestro/env.h" #ifdef __cplusplus diff --git a/include/maestro/i_ketopt.h b/include/maestro/i_ketopt.h new file mode 100644 index 0000000000000000000000000000000000000000..166938b8311778941a0424916f59eed77a721f14 --- /dev/null +++ b/include/maestro/i_ketopt.h @@ -0,0 +1,146 @@ +/* taken from https://github.com/attractivechaos/klib/blob/master/ketopt.h */ + +/* This file is originally part of klib, which is licensed under the + * X11/MIT licnense: */ + +/* The MIT License + Copyright (c) 2008, 2009, 2011 by Attractive Chaos <attractor@live.co.uk> + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +#ifndef I_KETOPT_H +#define I_KETOPT_H + +#include <string.h> /* for strchr() and strncmp() */ + +#define ko_no_argument 0 +#define ko_required_argument 1 +#define ko_optional_argument 2 + +typedef struct { + int ind; /* equivalent to optind */ + int opt; /* equivalent to optopt */ + char *arg; /* equivalent to optarg */ + int longidx; /* index of a long option; or -1 if short */ + /* private variables not intended for external uses */ + int i, pos, n_args; +} ketopt_t; + +typedef struct { + char *name; + int has_arg; + int val; +} ko_longopt_t; + +static ketopt_t KETOPT_INIT = { 1, 0, 0, -1, 1, 0, 0 }; + +static void ketopt_permute(char *argv[], int j, int n) /* move argv[j] over n elements to the left */ +{ + int k; + char *p = argv[j]; + for (k = 0; k < n; ++k) + argv[j - k] = argv[j - k - 1]; + argv[j - k] = p; +} + +/** + * Parse command-line options and arguments + * + * This fuction has a similar interface to GNU's getopt_long(). Each call + * parses one option and returns the option name. s->arg points to the option + * argument if present. The function returns -1 when all command-line arguments + * are parsed. In this case, s->ind is the index of the first non-option + * argument. + * + * @param s status; shall be initialized to KETOPT_INIT on the first call + * @param argc length of argv[] + * @param argv list of command-line arguments; argv[0] is ignored + * @param permute non-zero to move options ahead of non-option arguments + * @param ostr option string + * @param longopts long options + * + * @return ASCII for a short option; ko_longopt_t::val for a long option; -1 if + * argv[] is fully processed; '?' for an unknown option or an ambiguous + * long option; ':' if an option argument is missing + */ +static int ketopt(ketopt_t *s, int argc, char *argv[], int permute, const char *ostr, const ko_longopt_t *longopts) +{ + int opt = -1, i0, j; + if (permute) { + while (s->i < argc && (argv[s->i][0] != '-' || argv[s->i][1] == '\0')) + ++s->i, ++s->n_args; + } + s->arg = 0, s->longidx = -1, i0 = s->i; + if (s->i >= argc || argv[s->i][0] != '-' || argv[s->i][1] == '\0') { + s->ind = s->i - s->n_args; + return -1; + } + if (argv[s->i][0] == '-' && argv[s->i][1] == '-') { /* "--" or a long option */ + if (argv[s->i][2] == '\0') { /* a bare "--" */ + ketopt_permute(argv, s->i, s->n_args); + ++s->i, s->ind = s->i - s->n_args; + return -1; + } + s->opt = 0, opt = '?', s->pos = -1; + if (longopts) { /* parse long options */ + int k, n_exact = 0, n_partial = 0; + const ko_longopt_t *o = 0, *o_exact = 0, *o_partial = 0; + for (j = 2; argv[s->i][j] != '\0' && argv[s->i][j] != '='; ++j) {} /* find the end of the option name */ + for (k = 0; longopts[k].name != 0; ++k) + if (strncmp(&argv[s->i][2], longopts[k].name, j - 2) == 0) { + if (longopts[k].name[j - 2] == 0) ++n_exact, o_exact = &longopts[k]; + else ++n_partial, o_partial = &longopts[k]; + } + if (n_exact > 1 || (n_exact == 0 && n_partial > 1)) return '?'; + o = n_exact == 1? o_exact : n_partial == 1? o_partial : 0; + if (o) { + s->opt = opt = o->val, s->longidx = o - longopts; + if (argv[s->i][j] == '=') s->arg = &argv[s->i][j + 1]; + if (o->has_arg == 1 && argv[s->i][j] == '\0') { + if (s->i < argc - 1) s->arg = argv[++s->i]; + else opt = ':'; /* missing option argument */ + } + } + } + } else { /* a short option */ + const char *p; + if (s->pos == 0) s->pos = 1; + opt = s->opt = argv[s->i][s->pos++]; + p = strchr((char*)ostr, opt); + if (p == 0) { + opt = '?'; /* unknown option */ + } else if (p[1] == ':') { + if (argv[s->i][s->pos] == 0) { + if (s->i < argc - 1) s->arg = argv[++s->i]; + else opt = ':'; /* missing option argument */ + } else s->arg = &argv[s->i][s->pos]; + s->pos = -1; + } + } + if (s->pos < 0 || argv[s->i][s->pos] == 0) { + ++s->i, s->pos = 0; + if (s->n_args > 0) /* permute */ + for (j = i0; j < s->i; ++j) + ketopt_permute(argv, j, s->n_args); + } + s->ind = s->i - s->n_args; + return opt; +} + +#endif diff --git a/include/maestro/logging.h b/include/maestro/logging.h index 9a1698e6107ac2769cad1c1e27cdd248a0799860..4c48a6550acc5b45f060e2b2034b542221bb9f00 100644 --- a/include/maestro/logging.h +++ b/include/maestro/logging.h @@ -97,6 +97,7 @@ struct mstro_log_module_descriptor { #define MSTRO_LOG_MODULE_TRANSP 1<<9 #define MSTRO_LOG_MODULE_STATS 1<<10 #define MSTRO_LOG_MODULE_USER 1<<11 +#define MSTRO_LOG_MODULE_TELEMETRY 1<<12 #define MSTRO_LOG_MODULE_ALL ((uint64_t)0xffffffffffffffff) @@ -115,7 +116,8 @@ struct mstro_log_module_descriptor { {MSTRO_LOG_MODULE_TRANSF, "transf", "CDO Transformations"}, \ {MSTRO_LOG_MODULE_TRANSP, "transp", "CDO Transport"}, \ {MSTRO_LOG_MODULE_STATS, "stats", "Statistics and telemetry"}, \ - {MSTRO_LOG_MODULE_USER, "user", "User code using maestro logging"}, \ + {MSTRO_LOG_MODULE_USER, "user", "User code using maestro logging"}, \ + {MSTRO_LOG_MODULE_USER, "telemetry", "Telemetry data"}, \ } @@ -136,6 +138,12 @@ mstro_location_aware_log(int level, } while(0) +/** @brief core logging worker function - varargs version */ + void +mstro_vlocation_aware_log(int level, + uint64_t module, + const char *func, const char* file, int line, + const char *fmtstring, va_list ap); /** @brief mamba-compatible logging interface * diff --git a/include/maestro/pool.h b/include/maestro/pool.h index 359ce2b60a5cd12c45d12e912dd3c44026214994..8f8532603950fec2dd0cc0d946cd23794cfe3d2f 100644 --- a/include/maestro/pool.h +++ b/include/maestro/pool.h @@ -376,6 +376,7 @@ struct mstro_pool_event_ { /* JOIN */ struct { char *component_name; /**< component that is trying to join */ + mstro_app_id appid; /**< the app ID assigned (MSTRO_APP_ID_INVALID if JOIN:pre is observed) */ } join; /* WELCOME */ diff --git a/maestro/ofi.c b/maestro/ofi.c index 8d77f63c5db7c703291904e6a2cebb3dc675fc9a..be9651a411089b69293245fa7fbea4d9bee9a329 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -2390,8 +2390,8 @@ mstro_pm__register_app(Mstro__Pool__Join *join_msg, goto BAILOUT_FREE; } - DEBUG("App %d advertises %zu transport methods\n", - join_msg->transport_methods->n_supported); + DEBUG("App %s advertises %zu transport methods\n", + join_msg->component_name, join_msg->transport_methods->n_supported); /* insert into registry table */ s = mstro_pm_app_register(ep, translated_addr, @@ -2874,8 +2874,10 @@ mstro_pm_attach(const char *remote_pm_info) mstro_status s=mstro_ep_desc_deserialize(&pm_epd, remote_pm_info); if(s!=MSTRO_OK) { - ERR("Failed to parse pool manager info: %d (%s)\n", - s, mstro_status_description(s)); + ERR("Failed to parse pool manager info: %d (%s%s)\n", + s, mstro_status_description(s), + /* parser errrors also if a NULL result */ + s==MSTRO_NOMEM ? " or invalid pool-manager info data" : ""); goto BAILOUT; } diff --git a/maestro/pool_manager.c b/maestro/pool_manager.c index 8897816642b7f07ec9a43a585734a7447b0acc2c..95fff9b67c9e4e78435e71ceca4a7e3e9b3a6939 100644 --- a/maestro/pool_manager.c +++ b/maestro/pool_manager.c @@ -416,8 +416,10 @@ mstro_pm__event_notify_and_continue(Mstro__Pool__Event *pool_event_msg, * the event acks have arrived. If none are needed the event will * be triggered immediately. */ - DEBUG("Advertising event kind %d (%s)\n", + DEBUG("Advertising event kind %d (%s:%s)\n", pool_event_msg->kind, + protobuf_c_enum_descriptor_get_value(&mstro__pool__event_kind__descriptor, + pool_event_msg->kind)->name, beforep ? "before" : "after"); s = mstro_pool_event_advertise(pool_event_msg, beforep, cont_event); @@ -2020,7 +2022,9 @@ mstro_pm__handle_join_phase2(mstro_event event, /* it's safe to refer to the Appid object since the Event object is * on stack and will only be alive until notify-and-continue is * done */ - ev.origin_id = cont->msg->token->appid; + Mstro__Pool__Appid aid = MSTRO__POOL__APPID__INIT; + aid.id = regentry->appid; + ev.origin_id = &aid; status = mstro_pm__event_notify_and_continue( diff --git a/maestro/pool_manager_registry.c b/maestro/pool_manager_registry.c index 5623c26e468dd6918529e1fdae6d4ef4da9d9bb3..bda4204003eeeeedaeb3976072700327dc49d180 100644 --- a/maestro/pool_manager_registry.c +++ b/maestro/pool_manager_registry.c @@ -1128,6 +1128,10 @@ mstro_pm_cdo_app_match(mstro_app_id origin, const struct mstro_cdo_id *id, return MSTRO_OK; } + /* WITH_CDO_ID_STR(str,id, { */ + /* DEBUG("Trying to match %s origin %" PRIappid " for selector %p, query |%s|\n", */ + /* str, origin, cdo_selector, cdo_selector->query); */ + /* }); */ mstro_status status = MSTRO_FAIL; /* now fetcch attributes from per-app-CDO entry, call subscription-module checker and return result */ WITH_LOCKED_CDO_REGISTRY({ diff --git a/maestro/subscription_registry.c b/maestro/subscription_registry.c index 4fb434a6a90bc99a26147ede3285a27a3c5148a0..7e4f520eb52305ce8455cbf7cbc384c57d173aac 100644 --- a/maestro/subscription_registry.c +++ b/maestro/subscription_registry.c @@ -1403,6 +1403,17 @@ mstro__pool_event_cdoid_get(const Mstro__Pool__Event *ev, ERR("Unexpected event type: %d\n", ev->payload_case); s=MSTRO_INVARG; } + + if(s==MSTRO_OK) { + /* DECLARE may leave uuid NULL */ + if(uuid) { + id->qw[0] = uuid->qw0; + id->qw[1] = uuid->qw1; + } else { + *id = MSTRO_CDO_ID_NULL; + } + } + return s; } @@ -1624,7 +1635,7 @@ mstro_subscription_selector_check(struct mstro_subscription_ *s, return MSTRO_OK; } else { /* we always need the CDO id. Unfortunately it's slightly buried in the EV */ - struct mstro_cdo_id id; + struct mstro_cdo_id id = MSTRO_CDO_ID_INVALID; status = mstro__pool_event_cdoid_get(ev, &id); if(status!=MSTRO_OK) { ERR("Failed to retrieve CDO id from event\n"); @@ -1848,7 +1859,14 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) free(ev); return MSTRO_NOMEM; } - DEBUG("Event: %s JOINed\n", ev->join.component_name); + if(eventmsg->origin_id) + ev->join.appid = eventmsg->origin_id->id; /* may be MSTRO_APP_ID_INVALID for JOIN:pre */ + else + ev->join.appid = MSTRO_APP_ID_INVALID; + + + DEBUG("Event: %s JOINed (appid %" PRIappid ")\n", + ev->join.component_name, ev->join.appid); break; case MSTRO_POOL_EVENT_APP_LEAVE: @@ -1972,6 +1990,12 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) } break; + case MSTRO_POOL_EVENT_APP_BYE: + assert(eventmsg->payload_case==MSTRO__POOL__EVENT__PAYLOAD_BYE); + ev->bye.appid = eventmsg->origin_id->id; + DEBUG("Event: %" PRIu64 " granted BYE\n", ev->bye.appid); + break; + case MSTRO_POOL_EVENT_SEAL_GROUP: case MSTRO_POOL_EVENT_DEMAND: case MSTRO_POOL_EVENT_RETRACT: @@ -1981,8 +2005,6 @@ mstro_pool_event_consume(const Mstro__Pool__Event *eventmsg) case MSTRO_POOL_EVENT_TRANSPORT_COMPLETED: /* pool-related */ - case MSTRO_POOL_EVENT_APP_WELCOME: - case MSTRO_POOL_EVENT_APP_BYE: case MSTRO_POOL_EVENT_POOL_CHECKPOINT: case MSTRO_POOL_EVENT_SUBSCRIBE: case MSTRO_POOL_EVENT_UNSUBSCRIBE: diff --git a/tests/.gitignore b/tests/.gitignore index ba74bfbb913d915e7e0a59efc8e1344f81638065..817c704ffab2d8c4b0d9b2a90f928b1828d6331d 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -20,4 +20,5 @@ foo simple_archiver simple_injector simple_group_injector +simple_telemetry_listener coverage \ No newline at end of file diff --git a/tests/Makefile.am b/tests/Makefile.am index b40b04224f18d190c7aaa772a1f8e3b5867e39f8..3a9fb1590f5055c0a86389c6ad289581ec961247 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -115,6 +115,7 @@ check_PROGRAMS = check_version check_init check_uuid \ simple_interlock_client_2 \ simple_injector \ simple_archiver \ + simple_telemetry_listener \ check_events \ coverage diff --git a/tests/check_pm_declare.sh.in b/tests/check_pm_declare.sh.in index 8f60d4e4acff394f272011ba447b754f3302d8c5..4488871cc3ce339094b5bac920686c2d4b992a66 100644 --- a/tests/check_pm_declare.sh.in +++ b/tests/check_pm_declare.sh.in @@ -43,6 +43,9 @@ export MSTRO_WORKFLOW_NAME PM_CMD="@top_builddir@/tests/simple_pool_manager" CLIENT_CMD="@top_builddir@/tests/simple_client" +# telemetry component +TELEMETRY_CMD="@top_builddir@/tests/simple_telemetry_listener" + # start pool manager, connect its output to fd 3: # (we need to run in a subshell to start a new process group) exec 3< <(env MSTRO_LOG_LEVEL=${MSTRO_LOG_LEVEL:-2} MSTRO_MIO_CONFIG=mio-config-PM1.yaml ${PM_CMD}) @@ -82,6 +85,14 @@ fi MSTRO_POOL_MANAGER_INFO="$pm_info" export MSTRO_POOL_MANAGER_INFO +# start listener +(env MSTRO_LOG_LEVEL=0 ${TELEMETRY_CMD} \ + --workflow ${MSTRO_WORKFLOW_NAME} \ + --component Telemetry-Vampire \ + --max-idle 4 --destination stderr --verbose ) || exit 99 & + +sleep 1 + # start client 1 (env MSTRO_COMPONENT_NAME="C1" MSTRO_MIO_CONFIG=mio-config-C1.yaml ${CLIENT_CMD}) || exit 99 & diff --git a/tests/simple_telemetry_listener.c b/tests/simple_telemetry_listener.c new file mode 100644 index 0000000000000000000000000000000000000000..f482bd8ae0fe5c96a8d794ff81a7828e4e34874f --- /dev/null +++ b/tests/simple_telemetry_listener.c @@ -0,0 +1,537 @@ +/* Connect to pool and subscribe to all JOIN and all CDO ID creating + * events. Log them. */ +/* + * Copyright (C) 2020 Cray Computer GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* FIXME: set signal handler to flush log and stop cleanly */ + + +#include "maestro.h" +#include "maestro/i_statistics.h" +#include "maestro/i_ketopt.h" + +#include <unistd.h> +#include <errno.h> +#include <stdlib.h> + +#include <syslog.h> +#include <stdarg.h> + + +#define NSEC_PER_SEC ((mstro_nanosec_t)1000*1000*1000) +const mstro_nanosec_t DEFAULT_MAX_WAIT = ((mstro_nanosec_t)15)*NSEC_PER_SEC; /* 15s */ + +/** Configurable settings */ + +/** workflow name to attach to (default: from env)*/ +char *g_conf_workflow_name = NULL; +/** component name to use (default: argv[0])*/ +char *g_conf_component_name = NULL; +/** component name after whose BYE event we should terminate (default: none) */ +char *g_conf_terminate_after = NULL; +/** the appid for this component, as observed during its WELCOME */ +mstro_app_id g_conf_terminate_after_appid = 0; +/** timeout after which to terminate, if no events are observed anymore (default: DEFAULT_MAX_WAIT) */ +mstro_nanosec_t g_conf_max_wait = DEFAULT_MAX_WAIT; +/** pool manager info (default: from env) */ +char *g_conf_pminfo = NULL; +#define LOGDST_STDOUT 1 +#define LOGDST_STDERR 2 +#define LOGDST_MAESTRO 3 +#define LOGDST_SYSLOG 4 +/** logging destination */ +int g_conf_logdst = LOGDST_STDOUT; +/** verbose operation */ +bool g_verbose = true; + + +/** usage */ + +void +print_usage(const char *argv0) +{ + fprintf(stderr, "Usage: %s [OPTIONS]\n", argv0); + fprintf(stderr, " Initialization OPTIONS:\n"); + fprintf(stderr, " (environment variables that are used as default in {curly brackets})\n"); + fprintf(stderr, + " --workflow NAME, -w NAME Maestro Workflow name to attach to {MSTRO_WORKFLOW_NAME}\n" + " --component NAME, -c NAME Maestro component name for this listener {MSTRO_COMPONENT_NAME}\n" + " --terminate-after NAME, -q NAME Terminate after observing LEAVE of component NAME\n" + " --max-idle SECONDS, -i SECONDS Terminate after no events for SECONDS {%g}\n" + " --destination DEST, -d DEST Logging destination: 'syslog', 'stdout', 'stderr', or 'mstro' {stdout}\n" + " --pm-info PMINFO, -p PMINFO Pool Manager OOB info {MSTRO_POOL_MANAGER_INFO}\n" + " --help, -h This help\n" + " --verbose, -v Verbose output\n" + " --version, -V Version information\n" + ,((double)DEFAULT_MAX_WAIT)/NSEC_PER_SEC); +} + +/** argument parsing and setting configuration */ +mstro_status +parse_arguments(int argc, char **argv) +{ + /* default */ + g_conf_component_name = argv[0]; + + static ko_longopt_t longopts[] = { + { "workflow", ko_required_argument, 'w' }, + { "component", ko_required_argument, 'c' }, + { "terminate-after", ko_required_argument, 'q' }, + { "max-idle", ko_required_argument, 'i' }, + { "destination", ko_required_argument, 'd' }, + { "pm-info", ko_required_argument, 'p' }, + { "pm_info", ko_required_argument, 'p' }, + { "help", ko_no_argument, 'h' }, + { "verbose", ko_no_argument, 'v' }, + { "version", ko_no_argument, 'V' }, + { NULL, 0, 0 } + }; + ketopt_t opt = KETOPT_INIT; + int c; + + while ((c = ketopt(&opt, argc, argv, 1, "w:c:q:i:d:hv", longopts)) >= 0) { + switch(c) { + case 'w': + if(!opt.arg) { + fprintf(stderr, "--workflow option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + g_conf_workflow_name = opt.arg; + break; + case 'c': + if(!opt.arg) { + fprintf(stderr, "--component option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + g_conf_component_name = opt.arg; + break; + case 'q': + if(!opt.arg) { + fprintf(stderr, "--terminate-after option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + g_conf_terminate_after = opt.arg; + break; + case 'i': + if(!opt.arg) { + fprintf(stderr, "--max-idle option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + g_conf_max_wait = atoi(opt.arg)* NSEC_PER_SEC; + break; + case 'd': { + const char *dst = opt.arg; + if(strncasecmp(dst,"sy",2)==0) { + g_conf_logdst=LOGDST_SYSLOG; + } else if(strncasecmp(dst,"stdo",4)==0) { + g_conf_logdst=LOGDST_STDOUT; + } else if(strncasecmp(dst,"stde",4)==0) { + g_conf_logdst=LOGDST_STDERR; + } else if(strncasecmp(dst,"m",1)==0) { + g_conf_logdst=LOGDST_MAESTRO; + } else { + fprintf(stderr, "Illegal output destination (or none) specified: %s\n", dst); + print_usage(argv[0]); + return MSTRO_FAIL; + } + break; + } + case 'p': { + if(!opt.arg) { + fprintf(stderr, "--pm-info option is missing its argument\n"); + print_usage(argv[0]); + exit(EXIT_FAILURE); + } + size_t len = strlen(MSTRO_ENV_POOL_INFO) + 1 + strlen(opt.arg) + 1; + g_conf_pminfo = malloc(len); + if(g_conf_pminfo==NULL) { + fprintf(stderr, "pm-info argument too large\n"); + exit(EXIT_FAILURE); + } + strcpy(g_conf_pminfo, MSTRO_ENV_POOL_INFO); + strcat(g_conf_pminfo, "="); + strcat(g_conf_pminfo, opt.arg); + if(putenv(g_conf_pminfo)!=0) { + fprintf(stderr, "Failed to set PM-INFO: %d (%s)\n", errno, strerror(errno)); + exit(EXIT_FAILURE); + } + break; + } + case 'v': + g_verbose = true; + break; + case 'h': + print_usage(argv[0]); + exit(EXIT_SUCCESS); + case 'V': + fprintf(stdout, "%s, part of %s\n", argv[0], mstro_version()); + exit(EXIT_SUCCESS); + break; + default: + fprintf(stderr, "Unrecognized option: %c\n", optopt? optopt : ' '); + print_usage(argv[0]); + return MSTRO_FAIL; + } + } + if(opt.ind<argc) { + fprintf(stderr, "Unexpected non-option arguments: %s (...)\n", argv[opt.ind]); + print_usage(argv[0]); + return MSTRO_FAIL; + } + + if(g_verbose) { + fprintf(stderr, "Configuration: %s/%s/%s/%llu/%d\n", + g_conf_workflow_name, g_conf_component_name, + g_conf_terminate_after, g_conf_max_wait, g_conf_logdst); + } + return MSTRO_OK; +} + +static void +store_telemetry_file(FILE* fp, mstro_pool_event e, const char *message, va_list args) +{ + e=e; /* avoid unused arg */ + vfprintf(fp, message, args); +} + +static void +store_telemetry_syslog(mstro_pool_event e, const char *message, va_list args) +{ + e=e; /* avoid unused arg */ + vsyslog(LOG_ERR, message, args); +} + +/** maestro/logging.h redefines LOG_ERROR from syslog.h, so we cannot include it at the top */ +#undef LOG_ERR +#undef LOG_WARNING +#undef LOG_INFO +#undef LOG_DEBUG +#include "maestro/logging.h" + +static void +store_telemetry_maestro(mstro_pool_event e, const char *message, va_list args) +{ + e=e; /* avoid unused arg */ + + mstro_vlocation_aware_log(MSTRO_LOG_INFO, MSTRO_LOG_MODULE_TELEMETRY, + __FUNCTION__, __FILE__, __LINE__, message, args); +} + +static void +store_telemetry_dispatch(mstro_pool_event e, const char *message, ...) +{ + va_list ap; + va_start(ap, message); + + switch(g_conf_logdst) { + case LOGDST_STDOUT: + store_telemetry_file(stdout, e, message, ap); + break; + case LOGDST_STDERR: + store_telemetry_file(stderr, e, message, ap); + break; + case LOGDST_MAESTRO: + store_telemetry_maestro(e, message, ap); + break; + case LOGDST_SYSLOG: + store_telemetry_syslog(e, message, ap); + break; + default: + fprintf(stderr, "Illegal telemetry log destination %d\n", g_conf_logdst); + } + va_end(ap); +} + +static void +store_telemetry(mstro_pool_event e) +{ + /* basic event type */ + const char *kind = mstro_pool_event_description(e->kind); + + /* Until that description is not detailed enough, generate extra info here for some events */ + + /* format: nested CSV "timestamp, event name, serial, { per-kind-info }" */ + switch(e->kind) { + case MSTRO_POOL_EVENT_APP_JOIN: + /* { assigned appid, "component name" } */ + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\" }\n", + mstro_clock(), + kind, e->serial, + e->join.appid, e->join.component_name); + break; + case MSTRO_POOL_EVENT_APP_WELCOME: + /* { assigned appid, "component name" } */ + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n", + mstro_clock(), + kind, e->serial, e->welcome.appid, e->welcome.component_name); + break; + + case MSTRO_POOL_EVENT_APP_LEAVE: + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 " }\n", + mstro_clock(), + kind, e->serial, e->leave.appid); + break; + case MSTRO_POOL_EVENT_APP_BYE: + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 " }\n", + mstro_clock(), + kind, e->serial, e->bye.appid); + break; + case MSTRO_POOL_EVENT_DECLARE: + /* { appid, "cdo name" } */ + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n", + mstro_clock(), + kind, e->serial, e->declare.appid, e->declare.cdo_name); + break; + case MSTRO_POOL_EVENT_SEAL: + /* { appid, "cdo name" } */ + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n", + mstro_clock(), + kind, e->serial, e->seal.appid, e->seal.cdo_name); + break; + default: + store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { }\n", + mstro_clock(), + kind, e->serial); + } +} + +/** main loop: handle events and log them */ +mstro_status +event_loop(void) +{ + /* wildcard selector: any CDO */ + mstro_cdo_selector selector=NULL; + mstro_status s=mstro_cdo_selector_create( + NULL, NULL, + "(has .maestro.core.cdo.name)", + &selector); + + + /* Subscribe to DECLARE and SEAL. + * + * That allows us to log app-id/name/local-id and later + * app-id/local-id/cdo-id + */ + mstro_subscription cdo_subscription=NULL; + s = mstro_subscribe(selector, + MSTRO_POOL_EVENT_DECLARE |MSTRO_POOL_EVENT_SEAL, + MSTRO_SUBSCRIPTION_OPTS_DEFAULT, + &cdo_subscription); + s=mstro_cdo_selector_dispose(selector); + + + /* Subscribe to JOIN and LEAVE ('after' events, after handling on the PM). + * + * That allows us to log component names and implement a termination + * criterion like "if component X has left, terminate telemetry + * listener" */ + mstro_subscription join_leave_subscription=NULL; + s=mstro_subscribe( + NULL, MSTRO_POOL_EVENT_APP_JOIN|MSTRO_POOL_EVENT_APP_LEAVE, + MSTRO_SUBSCRIPTION_OPTS_DEFAULT, + &join_leave_subscription); + + bool done = false; + + mstro_nanosec_t starttime = mstro_clock(); + + /* instead of a busy loop we could use event wait sets -- but we + * don't have them yet */ + while(!done) { + mstro_pool_event e=NULL; + /* poll join/leave */ + s=mstro_subscription_poll(join_leave_subscription, &e); + + if(e!=NULL) { + /* reset start time, since we saw some event */ + starttime = mstro_clock(); + + if(g_verbose) + fprintf(stdout, "JOIN/LEAVE event(s)\n"); + + mstro_pool_event tmp=e; + /* handle all */ + while(tmp) { + store_telemetry(tmp); + + switch(tmp->kind) { + case MSTRO_POOL_EVENT_APP_JOIN: + if(g_verbose) + fprintf(stdout, "Noticed JOIN event of app %s\n", + tmp->join.component_name); + if(g_conf_terminate_after) { + if(strcmp(g_conf_terminate_after, tmp->join.component_name)==0) { + if(g_verbose) { + fprintf(stdout, "App recognized as the one that will trigger termination of this listener\n"); + } + g_conf_terminate_after_appid = tmp->join.appid; + } + } + break; + + case MSTRO_POOL_EVENT_APP_LEAVE: + if(g_verbose) { + fprintf(stdout, "Noticed LEAVE event of app %" PRIappid "\n", + tmp->leave.appid); + } + if(g_conf_terminate_after) { + if(g_conf_terminate_after_appid==tmp->leave.appid) { + if(g_verbose) { + fprintf(stdout, "LEAVE of %" PRIappid " triggers termination of telemetry listener\n", + tmp->leave.appid); + } + done=true; + } + } + break; + + default: + fprintf(stderr, "Unexpected event %d\n", tmp->kind); + } + + if(g_verbose) + fflush(stdout); + + tmp=tmp->next; + } + + /* acknowledge all */ + /* no need to ack these events s=mstro_subscription_ack(join_leave_subscription, e); */ + /* dispose all */ + s=mstro_pool_event_dispose(e); + } else { + s=mstro_subscription_poll(cdo_subscription, &e); + if(e!=NULL) { + /* reset start time, since we saw some event */ + starttime = mstro_clock(); + + mstro_pool_event tmp=e; + /* handle all */ + while(tmp) { + store_telemetry(tmp); + + switch(tmp->kind) { + case MSTRO_POOL_EVENT_DECLARE: + case MSTRO_POOL_EVENT_SEAL: + if(g_verbose) { + fprintf(stdout, "CDO event %s\n", + mstro_pool_event_description(tmp->kind)); + } + break; + default: + fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind); + } + + tmp=tmp->next; + } + /* acknowledge all */ + /* no need to ack these events s=mstro_subscription_ack(cdo_subscription, e); */ + /* dispose all */ + s=mstro_pool_event_dispose(e); + } else { + sleep(1); /* core will queue up events for us, no need to do + * intensive busy-loop */ + } + } + + if(mstro_clock()>starttime+DEFAULT_MAX_WAIT) { + fprintf(stderr, "Waited %" PRIu64 "s and still not done\n", + DEFAULT_MAX_WAIT/(1000*1000*1000)); + done=true; + } + if(done) + break; /* from WHILE */ + + } + s= mstro_subscription_dispose(cdo_subscription); + s= mstro_subscription_dispose(join_leave_subscription); + +BAILOUT: + return s; +} + +/** main entrypoint */ +char *g_default_loglevel=NULL; + +int +main(int argc, char ** argv) +{ + + mstro_status s = parse_arguments(argc, argv); + if(s!=MSTRO_OK) { + fprintf(stderr, "Failed to parse arguments\n"); + exit(EXIT_FAILURE); + } + if(g_conf_logdst==LOGDST_SYSLOG) { + openlog(g_conf_component_name, LOG_CONS|LOG_PID, LOG_USER); + } + + /** ensure pool info is set (by user or args) */ + if(getenv(MSTRO_ENV_POOL_INFO)==NULL ) { + fprintf(stderr, "No pool manager info provided, cannot attach to anything\n"); + exit(EXIT_FAILURE); + } + + /** try to reduce logging, unless user set something */ + if(getenv(MSTRO_ENV_LOG_LEVEL)==NULL) { + int l = strlen(MSTRO_ENV_LOG_LEVEL) + 1 + strlen("err") + 1; + g_default_loglevel=malloc(l); + if(g_default_loglevel!=NULL) { + strcpy(g_default_loglevel, MSTRO_ENV_LOG_LEVEL); + strcat(g_default_loglevel, "=err"); + putenv(g_default_loglevel); + } else { + ;/* well, then try without reduced logging */ + } + } + + /** start */ + s=mstro_init(g_conf_workflow_name, g_conf_component_name, 0); + if(s!=MSTRO_OK) { + fprintf(stderr, "Failed to initialize Maestro: %d (%s)\n", + s, mstro_status_description(s)); + exit(EXIT_FAILURE); + } + + s = event_loop(); + + mstro_status s1=mstro_finalize(); + + if(s1!=MSTRO_OK || s!=MSTRO_OK) + return EXIT_FAILURE; + else + return EXIT_SUCCESS; +} +