Skip to content
Snippets Groups Projects
Commit a2fc51ff authored by Utz-Uwe Haus's avatar Utz-Uwe Haus
Browse files

implement some telemetry destinations in telemetry listener

parent 4ea54bc9
Branches
Tags
2 merge requests!3Jsc ci update,!2update JSC-CI branch to devel
...@@ -97,6 +97,7 @@ struct mstro_log_module_descriptor { ...@@ -97,6 +97,7 @@ struct mstro_log_module_descriptor {
#define MSTRO_LOG_MODULE_TRANSP 1<<9 #define MSTRO_LOG_MODULE_TRANSP 1<<9
#define MSTRO_LOG_MODULE_STATS 1<<10 #define MSTRO_LOG_MODULE_STATS 1<<10
#define MSTRO_LOG_MODULE_USER 1<<11 #define MSTRO_LOG_MODULE_USER 1<<11
#define MSTRO_LOG_MODULE_TELEMETRY 1<<12
#define MSTRO_LOG_MODULE_ALL ((uint64_t)0xffffffffffffffff) #define MSTRO_LOG_MODULE_ALL ((uint64_t)0xffffffffffffffff)
...@@ -116,6 +117,7 @@ struct mstro_log_module_descriptor { ...@@ -116,6 +117,7 @@ struct mstro_log_module_descriptor {
{MSTRO_LOG_MODULE_TRANSP, "transp", "CDO Transport"}, \ {MSTRO_LOG_MODULE_TRANSP, "transp", "CDO Transport"}, \
{MSTRO_LOG_MODULE_STATS, "stats", "Statistics and telemetry"}, \ {MSTRO_LOG_MODULE_STATS, "stats", "Statistics and telemetry"}, \
{MSTRO_LOG_MODULE_USER, "user", "User code using maestro logging"}, \ {MSTRO_LOG_MODULE_USER, "user", "User code using maestro logging"}, \
{MSTRO_LOG_MODULE_USER, "telemetry", "Telemetry data"}, \
} }
...@@ -136,6 +138,12 @@ mstro_location_aware_log(int level, ...@@ -136,6 +138,12 @@ mstro_location_aware_log(int level,
} while(0) } while(0)
/** @brief core logging worker function - varargs version */
void
mstro_vlocation_aware_log(int level,
uint64_t module,
const char *func, const char* file, int line,
const char *fmtstring, va_list ap);
/** @brief mamba-compatible logging interface /** @brief mamba-compatible logging interface
* *
......
...@@ -43,6 +43,9 @@ export MSTRO_WORKFLOW_NAME ...@@ -43,6 +43,9 @@ export MSTRO_WORKFLOW_NAME
PM_CMD="@top_builddir@/tests/simple_pool_manager" PM_CMD="@top_builddir@/tests/simple_pool_manager"
CLIENT_CMD="@top_builddir@/tests/simple_client" CLIENT_CMD="@top_builddir@/tests/simple_client"
# telemetry component
TELEMETRY_CMD="@top_builddir@/tests/simple_telemetry_listener"
# start pool manager, connect its output to fd 3: # start pool manager, connect its output to fd 3:
# (we need to run in a subshell to start a new process group) # (we need to run in a subshell to start a new process group)
exec 3< <(env MSTRO_LOG_LEVEL=${MSTRO_LOG_LEVEL:-2} MSTRO_MIO_CONFIG=mio-config-PM1.yaml ${PM_CMD}) exec 3< <(env MSTRO_LOG_LEVEL=${MSTRO_LOG_LEVEL:-2} MSTRO_MIO_CONFIG=mio-config-PM1.yaml ${PM_CMD})
...@@ -82,6 +85,14 @@ fi ...@@ -82,6 +85,14 @@ fi
MSTRO_POOL_MANAGER_INFO="$pm_info" MSTRO_POOL_MANAGER_INFO="$pm_info"
export MSTRO_POOL_MANAGER_INFO export MSTRO_POOL_MANAGER_INFO
# start listener
(env MSTRO_LOG_LEVEL=0 ${TELEMETRY_CMD} \
--workflow ${MSTRO_WORKFLOW_NAME} \
--component Telemetry-Vampire \
--max-idle 4 --destination stderr --verbose ) || exit 99 &
sleep 1
# start client 1 # start client 1
(env MSTRO_COMPONENT_NAME="C1" MSTRO_MIO_CONFIG=mio-config-C1.yaml ${CLIENT_CMD}) || exit 99 & (env MSTRO_COMPONENT_NAME="C1" MSTRO_MIO_CONFIG=mio-config-C1.yaml ${CLIENT_CMD}) || exit 99 &
......
...@@ -42,20 +42,34 @@ ...@@ -42,20 +42,34 @@
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <syslog.h>
#include <stdarg.h>
#define NSEC_PER_SEC ((mstro_nanosec_t)1000*1000*1000) #define NSEC_PER_SEC ((mstro_nanosec_t)1000*1000*1000)
const mstro_nanosec_t MAX_WAIT = ((mstro_nanosec_t)15)*NSEC_PER_SEC; /* 15s */ const mstro_nanosec_t DEFAULT_MAX_WAIT = ((mstro_nanosec_t)15)*NSEC_PER_SEC; /* 15s */
/** Configurable settings */ /** Configurable settings */
/** workflow name to attach to (default: from env)*/
char *g_conf_workflow_name = NULL; char *g_conf_workflow_name = NULL;
/** component name to use (default: argv[0])*/
char *g_conf_component_name = NULL; char *g_conf_component_name = NULL;
/** component name after whose BYE event we should terminate (default: none) */
char *g_conf_terminate_after = NULL; char *g_conf_terminate_after = NULL;
mstro_nanosec_t g_conf_max_wait = MAX_WAIT; /** the appid for this component, as observed during its WELCOME */
mstro_app_id g_conf_terminate_after_appid = 0;
/** timeout after which to terminate, if no events are observed anymore (default: DEFAULT_MAX_WAIT) */
mstro_nanosec_t g_conf_max_wait = DEFAULT_MAX_WAIT;
/** pool manager info (default: from env) */
char *g_conf_pminfo = NULL; char *g_conf_pminfo = NULL;
#define LOGDST_STDOUT 1 #define LOGDST_STDOUT 1
#define LOGDST_STDERR 2 #define LOGDST_STDERR 2
#define LOGDST_MAESTRO 3 #define LOGDST_MAESTRO 3
#define LOGDST_SYSLOG 4 #define LOGDST_SYSLOG 4
/** logging destination */
int g_conf_logdst = LOGDST_STDOUT; int g_conf_logdst = LOGDST_STDOUT;
/** verbose operation */
bool g_verbose = true; bool g_verbose = true;
...@@ -77,8 +91,9 @@ print_usage(const char *argv0) ...@@ -77,8 +91,9 @@ print_usage(const char *argv0)
" --help, -h This help\n" " --help, -h This help\n"
" --verbose, -v Verbose output\n" " --verbose, -v Verbose output\n"
" --version, -V Version information\n" " --version, -V Version information\n"
,((double)MAX_WAIT)/NSEC_PER_SEC); ,((double)DEFAULT_MAX_WAIT)/NSEC_PER_SEC);
} }
/** argument parsing and setting configuration */ /** argument parsing and setting configuration */
mstro_status mstro_status
parse_arguments(int argc, char **argv) parse_arguments(int argc, char **argv)
...@@ -138,13 +153,13 @@ parse_arguments(int argc, char **argv) ...@@ -138,13 +153,13 @@ parse_arguments(int argc, char **argv)
break; break;
case 'd': { case 'd': {
const char *dst = opt.arg; const char *dst = opt.arg;
if(strcasecmp(dst,"sy")==0) { if(strncasecmp(dst,"sy",2)==0) {
g_conf_logdst=LOGDST_SYSLOG; g_conf_logdst=LOGDST_SYSLOG;
} else if(strcasecmp(dst,"stdo")==0) { } else if(strncasecmp(dst,"stdo",4)==0) {
g_conf_logdst=LOGDST_STDOUT; g_conf_logdst=LOGDST_STDOUT;
} else if(strcasecmp(dst,"stde")==0) { } else if(strncasecmp(dst,"stde",4)==0) {
g_conf_logdst=LOGDST_STDERR; g_conf_logdst=LOGDST_STDERR;
} else if(strcasecmp(dst,"m")==0) { } else if(strncasecmp(dst,"m",1)==0) {
g_conf_logdst=LOGDST_MAESTRO; g_conf_logdst=LOGDST_MAESTRO;
} else { } else {
fprintf(stderr, "Illegal output destination (or none) specified: %s\n", dst); fprintf(stderr, "Illegal output destination (or none) specified: %s\n", dst);
...@@ -204,6 +219,114 @@ parse_arguments(int argc, char **argv) ...@@ -204,6 +219,114 @@ parse_arguments(int argc, char **argv)
return MSTRO_OK; return MSTRO_OK;
} }
static void
store_telemetry_file(FILE* fp, mstro_pool_event e, const char *message, va_list args)
{
e=e; /* avoid unused arg */
vfprintf(fp, message, args);
}
static void
store_telemetry_syslog(mstro_pool_event e, const char *message, va_list args)
{
e=e; /* avoid unused arg */
vsyslog(LOG_ERR, message, args);
}
/** maestro/logging.h redefines LOG_ERROR from syslog.h, so we cannot include it at the top */
#undef LOG_ERR
#undef LOG_WARNING
#undef LOG_INFO
#undef LOG_DEBUG
#include "maestro/logging.h"
static void
store_telemetry_maestro(mstro_pool_event e, const char *message, va_list args)
{
e=e; /* avoid unused arg */
mstro_vlocation_aware_log(MSTRO_LOG_INFO, MSTRO_LOG_MODULE_TELEMETRY,
__FUNCTION__, __FILE__, __LINE__, message, args);
}
static void
store_telemetry_dispatch(mstro_pool_event e, const char *message, ...)
{
va_list ap;
va_start(ap, message);
switch(g_conf_logdst) {
case LOGDST_STDOUT:
store_telemetry_file(stdout, e, message, ap);
break;
case LOGDST_STDERR:
store_telemetry_file(stderr, e, message, ap);
break;
case LOGDST_MAESTRO:
store_telemetry_maestro(e, message, ap);
break;
case LOGDST_SYSLOG:
store_telemetry_syslog(e, message, ap);
break;
default:
fprintf(stderr, "Illegal telemetry log destination %d\n", g_conf_logdst);
}
va_end(ap);
}
static void
store_telemetry(mstro_pool_event e)
{
/* basic event type */
const char *kind = mstro_pool_event_description(e->kind);
/* Until that description is not detailed enough, generate extra info here for some events */
/* format: nested CSV "timestamp, event name, serial, { per-kind-info }" */
switch(e->kind) {
case MSTRO_POOL_EVENT_APP_JOIN:
/* { assigned appid, "component name" } */
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIappid ",\"%s\" }\n",
mstro_clock(),
kind, e->serial,
e->join.appid, e->join.component_name);
break;
case MSTRO_POOL_EVENT_APP_WELCOME:
/* { assigned appid, "component name" } */
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n",
mstro_clock(),
kind, e->serial, e->welcome.appid, e->welcome.component_name);
break;
case MSTRO_POOL_EVENT_APP_LEAVE:
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 " }\n",
mstro_clock(),
kind, e->serial, e->leave.appid);
break;
case MSTRO_POOL_EVENT_APP_BYE:
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 " }\n",
mstro_clock(),
kind, e->serial, e->bye.appid);
break;
case MSTRO_POOL_EVENT_DECLARE:
/* { appid, "cdo name" } */
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n",
mstro_clock(),
kind, e->serial, e->declare.appid, e->declare.cdo_name);
break;
case MSTRO_POOL_EVENT_SEAL:
/* { appid, "cdo name" } */
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { %" PRIu64 ",\"%s\"}\n",
mstro_clock(),
kind, e->serial, e->seal.appid, e->seal.cdo_name);
break;
default:
store_telemetry_dispatch(e, "%" PRIu64 ",%s,%" PRIu64 ", { }\n",
mstro_clock(),
kind, e->serial);
}
}
/** main loop: handle events and log them */ /** main loop: handle events and log them */
mstro_status mstro_status
event_loop(void) event_loop(void)
...@@ -229,19 +352,17 @@ event_loop(void) ...@@ -229,19 +352,17 @@ event_loop(void)
s=mstro_cdo_selector_dispose(selector); s=mstro_cdo_selector_dispose(selector);
/* Subscribe to JOIN and LEAVE. /* Subscribe to JOIN and LEAVE ('after' events, after handling on the PM).
* *
* That allows us to log component names and implement a termination * That allows us to log component names and implement a termination
* criterion like "if component X has left, terminate telemetry * criterion like "if component X has left, terminate telemetry
* listener" */ * listener" */
mstro_subscription join_leave_subscription=NULL; mstro_subscription join_leave_subscription=NULL;
s=mstro_subscribe( s=mstro_subscribe(
NULL, NULL, MSTRO_POOL_EVENT_APP_JOIN|MSTRO_POOL_EVENT_APP_LEAVE,
( MSTRO_POOL_EVENT_APP_JOIN | MSTRO_POOL_EVENT_APP_LEAVE),
MSTRO_SUBSCRIPTION_OPTS_DEFAULT, MSTRO_SUBSCRIPTION_OPTS_DEFAULT,
&join_leave_subscription); &join_leave_subscription);
bool done = false; bool done = false;
mstro_nanosec_t starttime = mstro_clock(); mstro_nanosec_t starttime = mstro_clock();
...@@ -254,49 +375,89 @@ event_loop(void) ...@@ -254,49 +375,89 @@ event_loop(void)
s=mstro_subscription_poll(join_leave_subscription, &e); s=mstro_subscription_poll(join_leave_subscription, &e);
if(e!=NULL) { if(e!=NULL) {
/* reset start time, since we saw some event */
starttime = mstro_clock();
if(g_verbose)
fprintf(stdout, "JOIN/LEAVE event(s)\n"); fprintf(stdout, "JOIN/LEAVE event(s)\n");
mstro_pool_event tmp=e; mstro_pool_event tmp=e;
/* handle all */ /* handle all */
while(tmp) { while(tmp) {
store_telemetry(tmp);
switch(tmp->kind) { switch(tmp->kind) {
case MSTRO_POOL_EVENT_APP_JOIN: case MSTRO_POOL_EVENT_APP_JOIN:
if(g_verbose)
fprintf(stdout, "Noticed JOIN event of app %s\n", fprintf(stdout, "Noticed JOIN event of app %s\n",
tmp->join.component_name); tmp->join.component_name);
if(g_conf_terminate_after) {
if(strcmp(g_conf_terminate_after, tmp->join.component_name)==0) {
if(g_verbose) {
fprintf(stdout, "App recognized as the one that will trigger termination of this listener\n");
}
g_conf_terminate_after_appid = tmp->join.appid;
}
}
break; break;
case MSTRO_POOL_EVENT_APP_LEAVE: case MSTRO_POOL_EVENT_APP_LEAVE:
if(g_verbose) {
fprintf(stdout, "Noticed LEAVE event of app %" PRIappid "\n", fprintf(stdout, "Noticed LEAVE event of app %" PRIappid "\n",
tmp->leave.appid); tmp->leave.appid);
}
if(g_conf_terminate_after) {
if(g_conf_terminate_after_appid==tmp->leave.appid) {
if(g_verbose) {
fprintf(stdout, "LEAVE of %" PRIappid " triggers termination of telemetry listener\n",
tmp->leave.appid);
}
done=true;
}
}
break; break;
default: default:
fprintf(stderr, "Unexpected event %d\n", tmp->kind); fprintf(stderr, "Unexpected event %d\n", tmp->kind);
} }
if(g_verbose)
fflush(stdout); fflush(stdout);
tmp=tmp->next; tmp=tmp->next;
} }
/* acknowledge all */ /* acknowledge all */
s=mstro_subscription_ack(join_leave_subscription, e); /* no need to ack these events s=mstro_subscription_ack(join_leave_subscription, e); */
/* dispose all */ /* dispose all */
s=mstro_pool_event_dispose(e); s=mstro_pool_event_dispose(e);
} else { } else {
s=mstro_subscription_poll(cdo_subscription, &e); s=mstro_subscription_poll(cdo_subscription, &e);
if(e!=NULL) { if(e!=NULL) {
/* reset start time, since we saw some event */
starttime = mstro_clock();
mstro_pool_event tmp=e; mstro_pool_event tmp=e;
/* handle all */ /* handle all */
while(tmp) { while(tmp) {
store_telemetry(tmp);
switch(tmp->kind) { switch(tmp->kind) {
case MSTRO_POOL_EVENT_DECLARE: case MSTRO_POOL_EVENT_DECLARE:
case MSTRO_POOL_EVENT_SEAL: case MSTRO_POOL_EVENT_SEAL:
if(g_verbose) {
fprintf(stdout, "CDO event %s\n", fprintf(stdout, "CDO event %s\n",
mstro_pool_event_description(tmp->kind)); mstro_pool_event_description(tmp->kind));
}
break; break;
default: default:
fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind); fprintf(stderr, "Unexpected CDO event %d\n", tmp->kind);
} }
tmp=tmp->next; tmp=tmp->next;
} }
/* acknowledge all */ /* acknowledge all */
s=mstro_subscription_ack(cdo_subscription, e); /* no need to ack these events s=mstro_subscription_ack(cdo_subscription, e); */
/* dispose all */ /* dispose all */
s=mstro_pool_event_dispose(e); s=mstro_pool_event_dispose(e);
} else { } else {
...@@ -305,9 +466,9 @@ event_loop(void) ...@@ -305,9 +466,9 @@ event_loop(void)
} }
} }
if(mstro_clock()>starttime+MAX_WAIT) { if(mstro_clock()>starttime+DEFAULT_MAX_WAIT) {
fprintf(stderr, "Waited %" PRIu64 "s and still not done\n", fprintf(stderr, "Waited %" PRIu64 "s and still not done\n",
MAX_WAIT/(1000*1000*1000)); DEFAULT_MAX_WAIT/(1000*1000*1000));
done=true; done=true;
} }
if(done) if(done)
...@@ -333,6 +494,9 @@ main(int argc, char ** argv) ...@@ -333,6 +494,9 @@ main(int argc, char ** argv)
fprintf(stderr, "Failed to parse arguments\n"); fprintf(stderr, "Failed to parse arguments\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if(g_conf_logdst==LOGDST_SYSLOG) {
openlog(g_conf_component_name, LOG_CONS|LOG_PID, LOG_USER);
}
/** ensure pool info is set (by user or args) */ /** ensure pool info is set (by user or args) */
if(getenv(MSTRO_ENV_POOL_INFO)==NULL ) { if(getenv(MSTRO_ENV_POOL_INFO)==NULL ) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment