diff --git a/include/maestro/logging.h b/include/maestro/logging.h
index 9a1698e6107ac2769cad1c1e27cdd248a0799860..4c48a6550acc5b45f060e2b2034b542221bb9f00 100644
--- a/include/maestro/logging.h
+++ b/include/maestro/logging.h
@@ -97,6 +97,7 @@ struct mstro_log_module_descriptor {
 #define MSTRO_LOG_MODULE_TRANSP 1<<9
 #define MSTRO_LOG_MODULE_STATS 1<<10
 #define MSTRO_LOG_MODULE_USER 1<<11
+#define MSTRO_LOG_MODULE_TELEMETRY (1<<12)
 
 #define MSTRO_LOG_MODULE_ALL ((uint64_t)0xffffffffffffffff)
 
@@ -115,7 +116,8 @@ struct mstro_log_module_descriptor {
   {MSTRO_LOG_MODULE_TRANSF, "transf", "CDO Transformations"}, \
   {MSTRO_LOG_MODULE_TRANSP, "transp", "CDO Transport"}, \
   {MSTRO_LOG_MODULE_STATS, "stats", "Statistics and telemetry"}, \
   {MSTRO_LOG_MODULE_USER, "user", "User code using maestro logging"}, \
+  {MSTRO_LOG_MODULE_TELEMETRY, "telemetry", "Telemetry data"}, \
   }
 
 
@@ -136,6 +138,19 @@ mstro_location_aware_log(int level,
   } while(0)
 
 
+/** @brief core logging worker function - varargs version
+ *
+ * Counterpart of mstro_location_aware_log() that takes the arguments
+ * for @p fmtstring as a @c va_list, so variadic wrappers can forward them.
+ *
+ * NOTE(review): @c va_list needs <stdarg.h>; confirm this header
+ * includes it rather than relying on the including file.
+ */
+void
+mstro_vlocation_aware_log(int level,
+                          uint64_t module,
+                          const char *func, const char* file, int line,
+                          const char *fmtstring, va_list ap);
 
 /** @brief mamba-compatible logging interface
  *
diff --git a/tests/check_pm_declare.sh.in b/tests/check_pm_declare.sh.in
index 8f60d4e4acff394f272011ba447b754f3302d8c5..4488871cc3ce339094b5bac920686c2d4b992a66 100644
--- a/tests/check_pm_declare.sh.in
+++ b/tests/check_pm_declare.sh.in
@@ -43,6 +43,9 @@ export MSTRO_WORKFLOW_NAME
 PM_CMD="@top_builddir@/tests/simple_pool_manager"
 CLIENT_CMD="@top_builddir@/tests/simple_client"
 
+# telemetry component
+TELEMETRY_CMD="@top_builddir@/tests/simple_telemetry_listener"
+
 # start pool manager, connect its output to fd 3:
 # (we need to run in a subshell to start a new process group)
 exec 3< <(env MSTRO_LOG_LEVEL=${MSTRO_LOG_LEVEL:-2} MSTRO_MIO_CONFIG=mio-config-PM1.yaml ${PM_CMD})
@@ -82,6 +85,14 @@ fi
 MSTRO_POOL_MANAGER_INFO="$pm_info"
 export MSTRO_POOL_MANAGER_INFO
 
+# start listener
+(env MSTRO_LOG_LEVEL=0 ${TELEMETRY_CMD} \
+     --workflow "${MSTRO_WORKFLOW_NAME}" \
+     --component Telemetry-Vampire \
+     --max-idle 4 --destination stderr --verbose ) || exit 99 &
+
+sleep 1
+
 # start client 1
 (env MSTRO_COMPONENT_NAME="C1" MSTRO_MIO_CONFIG=mio-config-C1.yaml ${CLIENT_CMD}) || exit 99 &
 
diff --git a/tests/simple_telemetry_listener.c b/tests/simple_telemetry_listener.c
index fc30d5656111f285ac3f952b90463be0f6e266f8..f482bd8ae0fe5c96a8d794ff81a7828e4e34874f 100644
--- a/tests/simple_telemetry_listener.c
+++ b/tests/simple_telemetry_listener.c
@@ -42,20 +42,34 @@
 #include <errno.h>
 #include <stdlib.h>
 
+#include <syslog.h>
+#include <stdarg.h>
+
+
 #define NSEC_PER_SEC ((mstro_nanosec_t)1000*1000*1000)
-const mstro_nanosec_t MAX_WAIT = ((mstro_nanosec_t)15)*NSEC_PER_SEC; /* 15s */
+const mstro_nanosec_t DEFAULT_MAX_WAIT = ((mstro_nanosec_t)15)*NSEC_PER_SEC; /* 15s */
 
 /** Configurable settings */
+
+/** workflow name to attach to (default: from env)*/
 char *g_conf_workflow_name = NULL;
+/** component name to use (default: argv[0])*/
 char *g_conf_component_name = NULL;
+/** component name after whose LEAVE event we should terminate (default: none) */
 char *g_conf_terminate_after = NULL;
-mstro_nanosec_t g_conf_max_wait = MAX_WAIT;
+/** the appid for this component, as observed in its JOIN event */
+mstro_app_id g_conf_terminate_after_appid = 0;
+/** timeout after which to terminate, if no events are observed anymore (default: DEFAULT_MAX_WAIT) */
+mstro_nanosec_t g_conf_max_wait = DEFAULT_MAX_WAIT;
+/** pool manager info (default: from env) */
 char *g_conf_pminfo = NULL;
 #define LOGDST_STDOUT 1
 #define LOGDST_STDERR 2
 #define LOGDST_MAESTRO 3
 #define LOGDST_SYSLOG 4
+/** logging destination */
 int g_conf_logdst = LOGDST_STDOUT;
+/** verbose operation */
 bool g_verbose = true;
 
 
@@ -77,8 +91,9 @@ print_usage(const char *argv0)
           " --help, -h This help\n"
           " --verbose, -v Verbose output\n"
           " --version, -V Version information\n"
-          ,((double)MAX_WAIT)/NSEC_PER_SEC);
+          ,((double)DEFAULT_MAX_WAIT)/NSEC_PER_SEC);
 }
+
 /** argument parsing and setting configuration */
 mstro_status
 parse_arguments(int argc, char **argv)
@@ -138,13 +153,13 @@ parse_arguments(int argc, char **argv)
         break;
       case 'd': {
         const char *dst = opt.arg;
-        if(strcasecmp(dst,"sy")==0) {
+        if(strncasecmp(dst,"sy",2)==0) {
           g_conf_logdst=LOGDST_SYSLOG;
-        } else if(strcasecmp(dst,"stdo")==0) {
+        } else if(strncasecmp(dst,"stdo",4)==0) {
           g_conf_logdst=LOGDST_STDOUT;
-        } else if(strcasecmp(dst,"stde")==0) {
+        } else if(strncasecmp(dst,"stde",4)==0) {
           g_conf_logdst=LOGDST_STDERR;
-        } else if(strcasecmp(dst,"m")==0) {
+        } else if(strncasecmp(dst,"m",1)==0) {
           g_conf_logdst=LOGDST_MAESTRO;
         } else {
           fprintf(stderr, "Illegal output destination (or none) specified: %s\n", dst);
@@ -204,6 +219,119 @@ parse_arguments(int argc, char **argv)
   return MSTRO_OK;
 }
 
+/** write one formatted telemetry record to the stdio stream @p fp */
+static void
+store_telemetry_file(FILE* fp, mstro_pool_event e, const char *message, va_list args)
+{
+  (void)e; /* event itself is not used here; avoid unused arg warning */
+  vfprintf(fp, message, args);
+}
+
+/** send one formatted telemetry record to syslog (priority LOG_ERR) */
+static void
+store_telemetry_syslog(mstro_pool_event e, const char *message, va_list args)
+{
+  (void)e; /* event itself is not used here; avoid unused arg warning */
+  vsyslog(LOG_ERR, message, args);
+}
+
+/** maestro/logging.h redefines the LOG_* names undefined here (they come from syslog.h), so we cannot include it at the top */
+#undef LOG_ERR
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_DEBUG
+#include "maestro/logging.h"
+
+/** emit one formatted telemetry record via maestro logging (INFO level, TELEMETRY module) */
+static void
+store_telemetry_maestro(mstro_pool_event e, const char *message, va_list args)
+{
+  (void)e; /* event itself is not used here; avoid unused arg warning */
+
+  mstro_vlocation_aware_log(MSTRO_LOG_INFO, MSTRO_LOG_MODULE_TELEMETRY,
+                            __func__, __FILE__, __LINE__, message, args);
+}
+
+/** route a printf-style telemetry record to the destination selected by g_conf_logdst */
+static void
+store_telemetry_dispatch(mstro_pool_event e, const char *message, ...)
+{
+  va_list ap;
+  va_start(ap, message);
+
+  switch(g_conf_logdst) {
+    case LOGDST_STDOUT:
+      store_telemetry_file(stdout, e, message, ap);
+      break;
+    case LOGDST_STDERR:
+      store_telemetry_file(stderr, e, message, ap);
+      break;
+    case LOGDST_MAESTRO:
+      store_telemetry_maestro(e, message, ap);
+      break;
+    case LOGDST_SYSLOG:
+      store_telemetry_syslog(e, message, ap);
+      break;
+    default:
+      fprintf(stderr, "Illegal telemetry log destination %d\n", g_conf_logdst);
+  }
+  va_end(ap);
+}
+
+/** log one pool event as a nested-CSV telemetry record */
+static void
+store_telemetry(mstro_pool_event e)
+{
+  /* basic event type */
+  const char *kind = mstro_pool_event_description(e->kind);
+
+  /* As long as that description is not detailed enough, generate extra info here for some events */
+
+  /* format: nested CSV "timestamp, event name, serial, { per-kind-info }" */
+  switch(e->kind) {
+    case MSTRO_POOL_EVENT_APP_JOIN:
+      /* { assigned appid, "component name" } */
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\" }\n",
+                               mstro_clock(),
+                               kind, e->serial,
+                               e->join.appid, e->join.component_name);
+      break;
+    case MSTRO_POOL_EVENT_APP_WELCOME:
+      /* { assigned appid, "component name" } */
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\"}\n",
+                               mstro_clock(),
+                               kind, e->serial, e->welcome.appid, e->welcome.component_name);
+      break;
+
+    case MSTRO_POOL_EVENT_APP_LEAVE:
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid " }\n",
+                               mstro_clock(),
+                               kind, e->serial, e->leave.appid);
+      break;
+    case MSTRO_POOL_EVENT_APP_BYE:
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid " }\n",
+                               mstro_clock(),
+                               kind, e->serial, e->bye.appid);
+      break;
+    case MSTRO_POOL_EVENT_DECLARE:
+      /* { appid, "cdo name" } */
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\"}\n",
+                               mstro_clock(),
+                               kind, e->serial, e->declare.appid, e->declare.cdo_name);
+      break;
+    case MSTRO_POOL_EVENT_SEAL:
+      /* { appid, "cdo name" } */
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\"}\n",
+                               mstro_clock(),
+                               kind, e->serial, e->seal.appid, e->seal.cdo_name);
+      break;
+    default:
+      store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { }\n",
+                               mstro_clock(),
+                               kind, e->serial);
+  }
+}
+
 /** main loop: handle events and log them */
 mstro_status
 event_loop(void)
@@ -229,18 +357,16 @@ event_loop(void)
 
   s=mstro_cdo_selector_dispose(selector);
 
-  /* Subscribe to JOIN and LEAVE.
+  /* Subscribe to JOIN and LEAVE ('after' events, after handling on the PM).
    *
    * That allows us to log component names and implement a termination
    * criterion like "if component X has left, terminate telemetry
    * listener" */
   mstro_subscription join_leave_subscription=NULL;
   s=mstro_subscribe(
-      NULL,
-      ( MSTRO_POOL_EVENT_APP_JOIN | MSTRO_POOL_EVENT_APP_LEAVE),
+      NULL, MSTRO_POOL_EVENT_APP_JOIN|MSTRO_POOL_EVENT_APP_LEAVE,
       MSTRO_SUBSCRIPTION_OPTS_DEFAULT,
       &join_leave_subscription);
-
 
   bool done = false;
 
@@ -254,49 +380,89 @@ event_loop(void)
 
     s=mstro_subscription_poll(join_leave_subscription, &e);
 
     if(e!=NULL) {
-      fprintf(stdout, "JOIN/LEAVE event(s)\n");
+      /* reset start time, since we saw some event */
+      starttime = mstro_clock();
+
+      if(g_verbose)
+        fprintf(stdout, "JOIN/LEAVE event(s)\n");
+
       mstro_pool_event tmp=e;
       /* handle all */
       while(tmp) {
+        store_telemetry(tmp);
+
         switch(tmp->kind) {
           case MSTRO_POOL_EVENT_APP_JOIN:
-            fprintf(stdout, "Noticed JOIN event of app %s\n",
-                    tmp->join.component_name);
+            if(g_verbose)
+              fprintf(stdout, "Noticed JOIN event of app %s\n",
+                      tmp->join.component_name);
+            if(g_conf_terminate_after) {
+              if(strcmp(g_conf_terminate_after, tmp->join.component_name)==0) {
+                if(g_verbose) {
+                  fprintf(stdout, "App recognized as the one that will trigger termination of this listener\n");
+                }
+                g_conf_terminate_after_appid = tmp->join.appid;
+              }
+            }
             break;
+
           case MSTRO_POOL_EVENT_APP_LEAVE:
-            fprintf(stdout, "Noticed LEAVE event of app %" PRIappid "\n",
-                    tmp->leave.appid);
+            if(g_verbose) {
+              fprintf(stdout, "Noticed LEAVE event of app %" PRIappid "\n",
+                      tmp->leave.appid);
+            }
+            if(g_conf_terminate_after) {
+              if(g_conf_terminate_after_appid==tmp->leave.appid) {
+                if(g_verbose) {
+                  fprintf(stdout, "LEAVE of %" PRIappid " triggers termination of telemetry listener\n",
+                          tmp->leave.appid);
+                }
+                done=true;
+              }
+            }
             break;
+
           default:
             fprintf(stderr, "Unexpected event %d\n", tmp->kind);
         }
-        fflush(stdout);
+
+        if(g_verbose)
+          fflush(stdout);
+
         tmp=tmp->next;
       }
       /* acknowledge all */
-      s=mstro_subscription_ack(join_leave_subscription, e);
+      /* no need to ack these events */
       /* dispose all */
       s=mstro_pool_event_dispose(e);
     } else {
       s=mstro_subscription_poll(cdo_subscription, &e);
       if(e!=NULL) {
+        /* reset start time, since we saw some event */
+        starttime = mstro_clock();
+
         mstro_pool_event tmp=e;
         /* handle all */
         while(tmp) {
+          store_telemetry(tmp);
+
           switch(tmp->kind) {
             case MSTRO_POOL_EVENT_DECLARE:
             case MSTRO_POOL_EVENT_SEAL:
-              fprintf(stdout, "CDO event %s\n",
-                      mstro_pool_event_description(tmp->kind));
+              if(g_verbose) {
+                fprintf(stdout, "CDO event %s\n",
+                        mstro_pool_event_description(tmp->kind));
+              }
               break;
             default:
               fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind);
           }
+
           tmp=tmp->next;
         }
         /* acknowledge all */
-        s=mstro_subscription_ack(cdo_subscription, e);
+        /* no need to ack these events */
         /* dispose all */
         s=mstro_pool_event_dispose(e);
       } else {
@@ -305,9 +471,9 @@ event_loop(void)
       }
     }
 
-    if(mstro_clock()>starttime+MAX_WAIT) {
+    if(mstro_clock()>starttime+g_conf_max_wait) {
       fprintf(stderr, "Waited %" PRIu64 "s and still not done\n",
-              MAX_WAIT/(1000*1000*1000));
+              g_conf_max_wait/NSEC_PER_SEC);
       done=true;
     }
     if(done)
@@ -333,6 +499,9 @@ main(int argc, char ** argv)
     fprintf(stderr, "Failed to parse arguments\n");
     exit(EXIT_FAILURE);
   }
+  if(g_conf_logdst==LOGDST_SYSLOG) {
+    openlog(g_conf_component_name, LOG_CONS|LOG_PID, LOG_USER);
+  }
 
   /** ensure pool info is set (by user or args) */
   if(getenv(MSTRO_ENV_POOL_INFO)==NULL ) {