From 84c7c2b3443c5f74d2fe794a1401527af4beb33a Mon Sep 17 00:00:00 2001
From: Utz-Uwe Haus <uhaus@cray.com>
Date: Thu, 22 Jul 2021 18:14:42 +0200
Subject: [PATCH] Implement Heartbeat statistics output

Default is disabled.

To enable, set environment variable MSTRO_TELEMTRY_HEARTBEAT to
a (decimal, possibly fractional) value of seconds between reports.

Currently will log a CSV line of statistics under INFO priority and 'HB'
thread identifier at every wakeup; once MIO Telemetry is integrated,
the configured telemetry output channels and methods will be used
instead of the logging infrastructure.
---
 include/maestro/env.h |  15 +++++
 maestro/Makefile.am   |   1 +
 maestro/core.c        |  12 ++++
 maestro/heartbeat.c   | 152 ++++++++++++++++++++++++++++++++++++++++++
 maestro/i_heartbeat.h |  53 +++++++++++++++
 5 files changed, 233 insertions(+)
 create mode 100644 maestro/heartbeat.c
 create mode 100644 maestro/i_heartbeat.h

diff --git a/include/maestro/env.h b/include/maestro/env.h
index 8129cba1..4a21202d 100644
--- a/include/maestro/env.h
+++ b/include/maestro/env.h
@@ -199,6 +199,21 @@
  **/
 #define MSTRO_ENV_START_POOL_MANAGER "MSTRO_START_POOL_MANAGER"
 
+/**
+ ** @brief Enable hearbeat timer
+ **
+ ** Enable a heartbeat timer that generates telemetry events at
+ ** regular intervals.
+ **
+ ** The time specified is in seconds, but (decimal) fractional values
+ ** are supported, subject to the timer resolution of the underlying
+ ** OS (and the posix timers API).
+ **
+ ** A (mostly idle) thread is used for this purpose, so jitter may be
+ ** introduced by this, which is why it is disabled by default.
+ **/
+#define MSTRO_ENV_TELEMETRY_HEARTBEAT "MSTRO_TELEMETRY_HEARTBEAT"
+
 /**
  ** @brief Which transport method to choose by default
  **/
diff --git a/maestro/Makefile.am b/maestro/Makefile.am
index fb27d7f2..7add4337 100644
--- a/maestro/Makefile.am
+++ b/maestro/Makefile.am
@@ -71,6 +71,7 @@ libmaestro_core_la_SOURCES = \
 	i_maestro_numa.h \
 	memlock.c \
 	memory.c \
+	i_heartbeat.h heartbeat.c \
         cdo-attributes-default.txt 
 
 #	mempool.c 
diff --git a/maestro/core.c b/maestro/core.c
index 60308871..55dfc227 100644
--- a/maestro/core.c
+++ b/maestro/core.c
@@ -42,6 +42,7 @@
 #include "mamba.h"
 
 #include "i_subscription_registry.h"
+#include "i_heartbeat.h"
 
 #include <pthread.h>
 #include <stdio.h>
@@ -428,6 +429,11 @@ mstro_core_init(const char *workflow_name,
     }
   }
 
+  mstro_status stat = mstro_core_heartbeat_init();
+  if(stat!=MSTRO_OK) {
+    return MSTRO_FAIL;
+  }
+
   /* sanity check if someone changes the NAME_MAX or adds new component sizes */
   assert(sizeof(g_component_descriptor)==2U<<14);
 
@@ -623,6 +629,12 @@ mstro_core_finalize(void)
     goto BAILOUT;
   }
 
+  status = mstro_core_heartbeat_finalize();
+  if(status!=MSTRO_OK) {
+    ERR("Heartbeat finalization failed: %d\n", status);
+    goto BAILOUT;
+  }
+
 
   pthread_mutex_lock(&g_initdata_mtx);
   assert(g_initdata!=NULL);
diff --git a/maestro/heartbeat.c b/maestro/heartbeat.c
new file mode 100644
index 00000000..08541575
--- /dev/null
+++ b/maestro/heartbeat.c
@@ -0,0 +1,152 @@
+/* -*- mode:c -*- */
+/** @file
+ ** @brief Maestro Heartbeat infrastructure
+ **/
+/*
+ * Copyright (C) 2021 HPE Switzerland GmbH
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "maestro/core.h"
+#include "maestro/env.h"
+#include "maestro/logging.h"
+#include "maestro/i_globals.h"
+#include "maestro/i_statistics.h"
+
+#include "i_heartbeat.h"
+
+#include <pthread.h>
+#include <string.h>
+#include <assert.h>
+
+
+/* simplify logging */
+#define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_CORE,__VA_ARGS__)
+#define INFO(...)  LOG_INFO(MSTRO_LOG_MODULE_CORE,__VA_ARGS__)
+#define WARN(...)  LOG_WARN(MSTRO_LOG_MODULE_CORE,__VA_ARGS__)
+#define ERR(...)   LOG_ERR(MSTRO_LOG_MODULE_CORE,__VA_ARGS__)
+
+
+#define NFREE(x) do { if (x!=NULL) free(x); } while(0)
+
+_Atomic(bool) g_heatbeat_initialized = false;
+
+/** handle to the heartbeat thread */
+static pthread_t g_heartbeat_thread;
+
+/** desired heartbeat interval */
+static struct timespec g_sleep_ts;
+
+#define NSEC_PER_SEC (1000000000)
+static
+void *
+mstro_heartbeat_threadfun(void *closure)
+{
+  const struct timespec *ts = (const struct timespec*)closure;
+  struct timespec tsrem = { 0, 0 };
+  char *tid = strdup("HB");
+
+  assert(tid!=NULL);
+
+  int s = pthread_setspecific(g_thread_descriptor_key, tid);
+  if(s!=0) {
+    ERR("Failed to set thread identifier; likely mstro_init has not been called.\n");
+  }
+
+  DEBUG("Started Heartbeat thread with %.2fs sleep duration\n",
+        ts->tv_sec+(double)ts->tv_nsec/(double)NSEC_PER_SEC);
+  
+  struct timespec reqts = *ts;
+  while(1) {
+    s = nanosleep(&reqts, &tsrem);
+    if(s!=0) {
+      DEBUG("nanosleep returned errno %d (%s), resuming for %.2fs\n",
+            errno, strerror(errno),
+            tsrem.tv_sec+(double)tsrem.tv_nsec/(double)NSEC_PER_SEC);
+      reqts = tsrem;
+      continue;
+    }
+    
+    /* successful completion: run per-heartbeat operations */
+    mstro_status s = mstro_stats_report_csv(NULL);
+    if(s!=MSTRO_OK) {
+      ERR("Failed to report statistics\n");
+    }    
+    
+    reqts = *ts;
+  }
+  return NULL;
+}
+
+mstro_status
+mstro_core_heartbeat_init(void)
+{
+  char *heartbeat = getenv(MSTRO_ENV_TELEMETRY_HEARTBEAT);
+  if(heartbeat==NULL) {
+    DEBUG("Heartbeat disabled\n");
+    return MSTRO_OK;
+  } else {
+    double interval = strtod(heartbeat, NULL);
+    /* DEBUG("Found %s=%f\n", */
+    /*       MSTRO_ENV_TELEMETRY_HEARTBEAT, interval); */
+    if(interval<0) {
+      return MSTRO_INVARG;
+    }
+    
+    g_sleep_ts.tv_sec = (time_t) floor(interval);
+    g_sleep_ts.tv_nsec = (long) ((interval-floor(interval))*
+                                 NSEC_PER_SEC);
+    
+    int s = pthread_create(&g_heartbeat_thread, NULL,
+                           mstro_heartbeat_threadfun, &g_sleep_ts);
+    if(s!=0) {
+      ERR("Failed to create heartbeat thread: %d (%s)\n", s, strerror(s));
+      return MSTRO_FAIL;
+    }
+    g_heatbeat_initialized = true;
+  }
+  return MSTRO_OK;
+}
+
+mstro_status
+mstro_core_heartbeat_finalize(void)
+{
+  if(g_heatbeat_initialized != true) {
+    /* nothing to stop here */
+    return MSTRO_OK;
+  };
+  
+  int s = pthread_cancel(g_heartbeat_thread);
+  if(s!=0) {
+    ERR("Failed to cancel heartbeat thread: %d (%s)\n",
+        s, strerror(s));
+    return MSTRO_FAIL;
+  } else {
+    DEBUG("Terminated heartbeat thread\n");
+    return MSTRO_OK;
+  }
+}
diff --git a/maestro/i_heartbeat.h b/maestro/i_heartbeat.h
new file mode 100644
index 00000000..ef45ebdf
--- /dev/null
+++ b/maestro/i_heartbeat.h
@@ -0,0 +1,53 @@
+/* -*- mode:c -*- */
+/** @file
+ ** @brief Heartbeat infrastructure for Maestro
+ **
+ **/
+/*
+ * Copyright (C) 2021 HPE Switzerland GmbH
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of the copyright holder nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef MSTRO_I_HEARTBEAT_H_
+#define MSTRO_I_HEARTBEAT_H_ 1
+
+
+/** possibly start heartbeat subsystem (depends on @ref
+ * MSTRO_ENV_TELEMETRY_HEARTBEAT being set) */
+mstro_status
+mstro_core_heartbeat_init(void);
+
+/** stop heartbeat subsystem */
+mstro_status
+mstro_core_heartbeat_finalize(void);
+
+
+#endif
+  
+  
-- 
GitLab