diff --git a/include/maestro/env.h b/include/maestro/env.h index 8129cba1f79033118804603a93a4708ec8065e14..4a21202d2338dffd16f79651ae171146197b1790 100644 --- a/include/maestro/env.h +++ b/include/maestro/env.h @@ -199,6 +199,21 @@ **/ #define MSTRO_ENV_START_POOL_MANAGER "MSTRO_START_POOL_MANAGER" +/** + ** @brief Enable hearbeat timer + ** + ** Enable a heartbeat timer that generates telemetry events at + ** regular intervals. + ** + ** The time specified is in seconds, but (decimal) fractional values + ** are supported, subject to the timer resolution of the underlying + ** OS (and the posix timers API). + ** + ** A (mostly idle) thread is used for this purpose, so jitter may be + ** introduced by this, which is why it is disabled by default. + **/ +#define MSTRO_ENV_TELEMETRY_HEARTBEAT "MSTRO_TELEMETRY_HEARTBEAT" + /** ** @brief Which transport method to choose by default **/ diff --git a/maestro/Makefile.am b/maestro/Makefile.am index fb27d7f259e80211b174b1f62f600ee157dc9a33..7add43375c055f38f1baa7d353cddf1763e2d90f 100644 --- a/maestro/Makefile.am +++ b/maestro/Makefile.am @@ -71,6 +71,7 @@ libmaestro_core_la_SOURCES = \ i_maestro_numa.h \ memlock.c \ memory.c \ + i_heartbeat.h heartbeat.c \ cdo-attributes-default.txt # mempool.c diff --git a/maestro/core.c b/maestro/core.c index 60308871e28ac66507f3f72dc63cb4199d3f5d72..55dfc227b9d19bf6f4178a4a92ad9e989bb42b15 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -42,6 +42,7 @@ #include "mamba.h" #include "i_subscription_registry.h" +#include "i_heartbeat.h" #include <pthread.h> #include <stdio.h> @@ -428,6 +429,11 @@ mstro_core_init(const char *workflow_name, } } + mstro_status stat = mstro_core_heartbeat_init(); + if(stat!=MSTRO_OK) { + return MSTRO_FAIL; + } + /* sanity check if someone changes the NAME_MAX or adds new component sizes */ assert(sizeof(g_component_descriptor)==2U<<14); @@ -623,6 +629,12 @@ mstro_core_finalize(void) goto BAILOUT; } + status = mstro_core_heartbeat_finalize(); + if(status!=MSTRO_OK) { + ERR("Heartbeat finalization failed: %d\n", status); + goto BAILOUT; + } + pthread_mutex_lock(&g_initdata_mtx); assert(g_initdata!=NULL); diff --git a/maestro/heartbeat.c b/maestro/heartbeat.c new file mode 100644 index 0000000000000000000000000000000000000000..08541575a75ba2d90cdc6774763532b6860259b4 --- /dev/null +++ b/maestro/heartbeat.c @@ -0,0 +1,152 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief Maestro Heartbeat infrastructure + **/ +/* + * Copyright (C) 2021 HPE Switzerland GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "maestro/core.h" +#include "maestro/env.h" +#include "maestro/logging.h" +#include "maestro/i_globals.h" +#include "maestro/i_statistics.h" + +#include "i_heartbeat.h" + +#include <pthread.h> +#include <string.h> +#include <assert.h> + + +/* simplify logging */ +#define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_CORE,__VA_ARGS__) +#define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_CORE,__VA_ARGS__) +#define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_CORE,__VA_ARGS__) +#define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_CORE,__VA_ARGS__) + + +#define NFREE(x) do { if (x!=NULL) free(x); } while(0) + +_Atomic(bool) g_heatbeat_initialized = false; + +/** handle to the heartbeat thread */ +static pthread_t g_heartbeat_thread; + +/** desired heartbeat interval */ +static struct timespec g_sleep_ts; + +#define NSEC_PER_SEC (1000000000) +static +void * +mstro_heartbeat_threadfun(void *closure) +{ + const struct timespec *ts = (const struct timespec*)closure; + struct timespec tsrem = { 0, 0 }; + char *tid = strdup("HB"); + + assert(tid!=NULL); + + int s = pthread_setspecific(g_thread_descriptor_key, tid); + if(s!=0) { + ERR("Failed to set thread identifier; likely mstro_init has not been called.\n"); + } + + DEBUG("Started Heartbeat thread with %.2fs sleep duration\n", + ts->tv_sec+(double)ts->tv_nsec/(double)NSEC_PER_SEC); + + struct timespec reqts = *ts; + while(1) { + s = nanosleep(&reqts, &tsrem); + if(s!=0) { + DEBUG("nanosleep returned errno %d (%s), resuming for %.2fs\n", + errno, strerror(errno), + tsrem.tv_sec+(double)tsrem.tv_nsec/(double)NSEC_PER_SEC); + reqts = tsrem; + continue; + } + + /* successful completion: run per-heartbeat operations */ + mstro_status s = mstro_stats_report_csv(NULL); + if(s!=MSTRO_OK) { + ERR("Failed to report statistics\n"); + } + + reqts = *ts; + } + return NULL; +} + +mstro_status +mstro_core_heartbeat_init(void) +{ + char *heartbeat = getenv(MSTRO_ENV_TELEMETRY_HEARTBEAT); + if(heartbeat==NULL) { + DEBUG("Heartbeat disabled\n"); + return MSTRO_OK; + } else { + double interval = strtod(heartbeat, NULL); + /* DEBUG("Found %s=%f\n", */ + /* MSTRO_ENV_TELEMETRY_HEARTBEAT, interval); */ + if(interval<0) { + return MSTRO_INVARG; + } + + g_sleep_ts.tv_sec = (time_t) floor(interval); + g_sleep_ts.tv_nsec = (long) ((interval-floor(interval))* + NSEC_PER_SEC); + + int s = pthread_create(&g_heartbeat_thread, NULL, + mstro_heartbeat_threadfun, &g_sleep_ts); + if(s!=0) { + ERR("Failed to create heartbeat thread: %d (%s)\n", s, strerror(s)); + return MSTRO_FAIL; + } + g_heatbeat_initialized = true; + } + return MSTRO_OK; +} + +mstro_status +mstro_core_heartbeat_finalize(void) +{ + if(g_heatbeat_initialized != true) { + /* nothing to stop here */ + return MSTRO_OK; + }; + + int s = pthread_cancel(g_heartbeat_thread); + if(s!=0) { + ERR("Failed to cancel heartbeat thread: %d (%s)\n", + s, strerror(s)); + return MSTRO_FAIL; + } else { + DEBUG("Terminated heartbeat thread\n"); + return MSTRO_OK; + } +} diff --git a/maestro/i_heartbeat.h b/maestro/i_heartbeat.h new file mode 100644 index 0000000000000000000000000000000000000000..ef45ebdfd6e7a5b07e6f9ce33b46640ed05d8a88 --- /dev/null +++ b/maestro/i_heartbeat.h @@ -0,0 +1,53 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief Heartbeat infrastructure for Maestro + ** + **/ +/* + * Copyright (C) 2021 HPE Switzerland GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef MSTRO_I_HEARTBEAT_H_ +#define MSTRO_I_HEARTBEAT_H_ 1 + + +/** possibly start heartbeat subsystem (depends on @ref + * MSTRO_ENV_TELEMETRY_HEARTBEAT being set) */ +mstro_status +mstro_core_heartbeat_init(void); + +/** stop heartbeat subsystem */ +mstro_status +mstro_core_heartbeat_finalize(void); + + +#endif + +