From 644a0b1c1db14b564c771fffe0ed41c6444fec48 Mon Sep 17 00:00:00 2001 From: Utz-Uwe Haus <uhaus@cray.com> Date: Wed, 17 Feb 2021 16:37:00 +0100 Subject: [PATCH] implement explicit GFS transport dir support --- include/maestro/env.h | 11 ++++++++ include/maestro/i_globals.h | 8 ++++++ maestro/core.c | 47 ++++++++++++++++++++++++++++++++ maestro/globals.c | 9 +++++++ maestro/pool_client.c | 32 +++++++++++++++++++++- transport/transport.c | 54 +++++++++++++++++++++++++++++++++++++ 6 files changed, 160 insertions(+), 1 deletion(-) diff --git a/include/maestro/env.h b/include/maestro/env.h index 0d95b73a..f14e36c4 100644 --- a/include/maestro/env.h +++ b/include/maestro/env.h @@ -191,6 +191,17 @@ #define MSTRO_ENV_TRANSPORT_DEFAULT "MSTRO_TRANSPORT_DEFAULT" +/** + ** @brief Directory for GFS transport + ** + ** Must be visible on all workflow components that want to use GFS + ** transport. Should be a high-performance global file system location. + ** + ** Default is "./", the directory where the application is started + ** (which will often work, but not always). + **/ +#define MSTRO_ENV_TRANSPORT_GFS_DIR "MSTRO_TRANSPORT_GFS_DIR" + /** ** @brief Path to the MIO transport config file ** diff --git a/include/maestro/i_globals.h b/include/maestro/i_globals.h index 60c432eb..7ad8b8cf 100644 --- a/include/maestro/i_globals.h +++ b/include/maestro/i_globals.h @@ -159,4 +159,12 @@ extern union mstro_component_descriptor g_component_descriptor; | (MSTRO_POOL_PROTOCOL_VERSION_MINOR << 8) \ | (MSTRO_POOL_PROTOCOL_VERSION_PATCH << 0))) + + +/** the precomputed default path for GFS transport (incl. trailing '/') */ +extern char *g_mstro_transport_gfs_dir; +/** string length of @ref g_mstro_transport_gfs_dir */ +extern size_t g_mstro_transport_gfs_dir_len; + + #endif /* MAESTRO_I_GLOBALS_H_ */ diff --git a/maestro/core.c b/maestro/core.c index 83c1fdfb..bb9c04a7 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -196,6 +196,53 @@ mstro_core_init(const char *workflow_name, strncpy(g_component_descriptor.component_name, data->component_name, MSTRO_WORKFLOW_NAME_MAX-1); g_component_descriptor.component_name[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; strcpy(g_component_descriptor.version, mstro_version()); + + + /* check GFS path setting */ + { + char *gfs_dir = getenv(MSTRO_ENV_TRANSPORT_GFS_DIR); + if(gfs_dir!=NULL) { + bool need_sep = false; + size_t l1 = strlen(gfs_dir); + assert(l1>0); + if(gfs_dir[l1-1]!='/') { + need_sep=true; + } + size_t l2 = strlen(data->workflow_name); + size_t l3 = strlen(data->component_name); + + g_mstro_transport_gfs_dir = malloc(l1 + (need_sep? 1 : 0) + + l2 + 1 + + l3 + 1 +1); + if(g_mstro_transport_gfs_dir==NULL) { + ERR("Failed to allocate GFS transport dir pathname\n"); + return MSTRO_NOMEM; + } + size_t pos = 0; + strcpy(g_mstro_transport_gfs_dir, gfs_dir); + pos += l1; + if(need_sep) { + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + } + strcpy(g_mstro_transport_gfs_dir+pos, data->workflow_name); + pos += l2; + + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + + strcpy(g_mstro_transport_gfs_dir+pos, data->component_name); + pos += l3; + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + g_mstro_transport_gfs_dir[pos] = '\0'; + g_mstro_transport_gfs_dir_len = pos; + assert(pos==strlen(g_mstro_transport_gfs_dir)); + + DEBUG("Set GFS transport directory to %s\n", + g_mstro_transport_gfs_dir); + } + } status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE, MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN, diff --git a/maestro/globals.c b/maestro/globals.c index 6bb10456..7aafb62e 100644 --- a/maestro/globals.c +++ b/maestro/globals.c @@ -85,3 +85,12 @@ mstro_schema g_mstro_core_schema_instance = NULL; bool g_have_pool_manager=false; union mstro_component_descriptor g_component_descriptor; + + +/** the precomputed default path for GFS transport (incl. trailing '/') */ +char *g_mstro_transport_gfs_dir = "./"; /* possibly overwritten at init + * time from env*/ + +/** string length of @ref g_mstro_transport_gfs_dir */ +size_t g_mstro_transport_gfs_dir_len = 2; /* possibly overwritten at + * init time */ diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 774d0911..4a7538be 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -269,6 +269,31 @@ mstro_pc__select_transfer_method(mstro_cdo cdo, return MSTRO_OK; } +static inline +mstro_status +mstro_pc__construct_gfs_path_for_cdo(const mstro_cdo src_cdo, + char **path) +{ + mstro_status s=MSTRO_UNIMPL; + assert(path!=NULL); + assert(src_cdo!=NULL); + + size_t l = g_mstro_transport_gfs_dir_len + strlen(src_cdo->name) + 1; + *path = malloc(l); + if(*path==NULL) { + return MSTRO_NOMEM; + } + strcpy(*path, g_mstro_transport_gfs_dir); + + /* FIXME: possibly add a (configurable?) number of subdirectories + * based on first segment of CDO name */ + + strcpy((*path)+g_mstro_transport_gfs_dir_len, src_cdo->name); + DEBUG("Constructed GFS transport path %s\n", *path); + return MSTRO_OK; +} + + static inline mstro_status mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) @@ -398,7 +423,12 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) switch(ticket.ticket_case) { case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: INFO("TICKET CASE GFS\n"); - gfs.path = src_cdo->name; + status= mstro_pc__construct_gfs_path_for_cdo(src_cdo, &gfs.path); + if(status!=MSTRO_OK) { + ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n", + status, mstro_status_description(status)); + return status; + } gfs.keep_file = 0; // Arbitrarily rm the transport file on dst break; case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: diff --git a/transport/transport.c b/transport/transport.c index 23026056..4c3f6d73 100644 --- a/transport/transport.c +++ b/transport/transport.c @@ -43,6 +43,8 @@ #include "maestro/i_cdo.h" #include <inttypes.h> +#include <sys/stat.h> + #include "transport/transport_gfs.h" #ifdef HAVE_MIO @@ -170,11 +172,63 @@ mstro_transport__src_datalen_get(mstro_cdo src, return MSTRO_OK; } +/** find next separator in PATH. Return the start of that segment */ +static inline +const char * next_sep (const char *path) +{ + while (*path) + if (*path == '/' || *path == '\\') + return path; + else + path++; + return NULL; +} + +/** ensure all directories in DIRNAME exist. If not, try creating them */ +static inline +mstro_status +mkdirhier(char *dirname) +{ + char buf[PATH_MAX]; + const char *prev = dirname; + const char *next; + struct stat sb; + + while(NULL!=(next = next_sep(prev))) { + strncpy(buf, dirname, next - dirname) ; + buf[next - dirname] = '\0'; + if(stat(buf,&sb)) { + mkdir(buf, 0777); /* umask taken into account by system */ + /* we ignore errors, as we'll see an error on the last part */ + } + prev=next+1; + } + /* handle last part */ + if(stat(dirname, &sb)) { + int s = mkdir(dirname, 0777); + if(s!=0) { + ERR("Failed to create GFS transport directory %s: %d (%s)\n", + dirname, errno, strerror(errno)); + return MSTRO_FAIL; + } + } + return MSTRO_OK; +} + mstro_status mstro_transport_init() { srand(time(NULL)); /* We'll need that to generate random IDs */ + + /* ensure GFS dir is set up */ + mstro_status s = mkdirhier(g_mstro_transport_gfs_dir); + if(s!=MSTRO_OK) { + ERR("Failed to initialize GFS transport. Check your setting of %s\n", + MSTRO_ENV_TRANSPORT_GFS_DIR); + return s; + } + #ifdef HAVE_MIO /* MIO */ if(g_mio_available) { -- GitLab