diff --git a/include/maestro/env.h b/include/maestro/env.h index 0d95b73a21a19b4fbae924559b4183bcd904d61d..f14e36c450c50656b813d70ed3c5ccca5fb55953 100644 --- a/include/maestro/env.h +++ b/include/maestro/env.h @@ -191,6 +191,17 @@ #define MSTRO_ENV_TRANSPORT_DEFAULT "MSTRO_TRANSPORT_DEFAULT" +/** + ** @brief Directory for GFS transport + ** + ** Must be visible on all workflow components that want to use GFS + ** transport. Should be a high-performance global file system location. + ** + ** Default is "./", the directory where the application is started + ** (which will often work, but not always). + **/ +#define MSTRO_ENV_TRANSPORT_GFS_DIR "MSTRO_TRANSPORT_GFS_DIR" + /** ** @brief Path to the MIO transport config file ** diff --git a/include/maestro/i_globals.h b/include/maestro/i_globals.h index 60c432eb04abf1e235611fad00ba8ad2776796d0..7ad8b8cfcb7f615cf21d7b0f61f5bea2ff7e1689 100644 --- a/include/maestro/i_globals.h +++ b/include/maestro/i_globals.h @@ -159,4 +159,12 @@ extern union mstro_component_descriptor g_component_descriptor; | (MSTRO_POOL_PROTOCOL_VERSION_MINOR << 8) \ | (MSTRO_POOL_PROTOCOL_VERSION_PATCH << 0))) + + +/** the precomputed default path for GFS transport (incl. trailing '/') */ +extern char *g_mstro_transport_gfs_dir; +/** string length of @ref g_mstro_transport_gfs_dir */ +extern size_t g_mstro_transport_gfs_dir_len; + + #endif /* MAESTRO_I_GLOBALS_H_ */ diff --git a/maestro/core.c b/maestro/core.c index 83c1fdfb718fb12a33c1711e5a80cf50812c52fc..bb9c04a7e4825c767ecdb838c304b30bfb27f34f 100644 --- a/maestro/core.c +++ b/maestro/core.c @@ -196,6 +196,53 @@ mstro_core_init(const char *workflow_name, strncpy(g_component_descriptor.component_name, data->component_name, MSTRO_WORKFLOW_NAME_MAX-1); g_component_descriptor.component_name[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; strcpy(g_component_descriptor.version, mstro_version()); + + + /* check GFS path setting */ + { + char *gfs_dir = getenv(MSTRO_ENV_TRANSPORT_GFS_DIR); + if(gfs_dir!=NULL) { + bool need_sep = false; + size_t l1 = strlen(gfs_dir); + assert(l1>0); + if(gfs_dir[l1-1]!='/') { + need_sep=true; + } + size_t l2 = strlen(data->workflow_name); + size_t l3 = strlen(data->component_name); + + g_mstro_transport_gfs_dir = malloc(l1 + (need_sep? 1 : 0) + + l2 + 1 + + l3 + 1 +1); + if(g_mstro_transport_gfs_dir==NULL) { + ERR("Failed to allocate GFS transport dir pathname\n"); + return MSTRO_NOMEM; + } + size_t pos = 0; + strcpy(g_mstro_transport_gfs_dir, gfs_dir); + pos += l1; + if(need_sep) { + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + } + strcpy(g_mstro_transport_gfs_dir+pos, data->workflow_name); + pos += l2; + + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + + strcpy(g_mstro_transport_gfs_dir+pos, data->component_name); + pos += l3; + g_mstro_transport_gfs_dir[pos] = '/'; + pos++; + g_mstro_transport_gfs_dir[pos] = '\0'; + g_mstro_transport_gfs_dir_len = pos; + assert(pos==strlen(g_mstro_transport_gfs_dir)); + + DEBUG("Set GFS transport directory to %s\n", + g_mstro_transport_gfs_dir); + } + } status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE, MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN, diff --git a/maestro/globals.c b/maestro/globals.c index 6bb1045630c41ba5f26cc102a81dcc186e80fa33..7aafb62eef4ddc33d01d211f4f8a4c6cdd923ff0 100644 --- a/maestro/globals.c +++ b/maestro/globals.c @@ -85,3 +85,12 @@ mstro_schema g_mstro_core_schema_instance = NULL; bool g_have_pool_manager=false; union mstro_component_descriptor g_component_descriptor; + + +/** the precomputed default path for GFS transport (incl. trailing '/') */ +char *g_mstro_transport_gfs_dir = "./"; /* possibly overwritten at init + * time from env*/ + +/** string length of @ref g_mstro_transport_gfs_dir */ +size_t g_mstro_transport_gfs_dir_len = 2; /* possibly overwritten at + * init time */ diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 774d091190795c977ad40f313e881e400fa4d5de..4a7538bed1852aa6722863339fb8d2564f0f4789 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -269,6 +269,31 @@ mstro_pc__select_transfer_method(mstro_cdo cdo, return MSTRO_OK; } +static inline +mstro_status +mstro_pc__construct_gfs_path_for_cdo(const mstro_cdo src_cdo, + char **path) +{ + mstro_status s=MSTRO_UNIMPL; + assert(path!=NULL); + assert(src_cdo!=NULL); + + size_t l = g_mstro_transport_gfs_dir_len + strlen(src_cdo->name) + 1; + *path = malloc(l); + if(*path==NULL) { + return MSTRO_NOMEM; + } + strcpy(*path, g_mstro_transport_gfs_dir); + + /* FIXME: possibly add a (configurable?) number of subdirectories + * based on first segment of CDO name */ + + strcpy((*path)+g_mstro_transport_gfs_dir_len, src_cdo->name); + DEBUG("Constructed GFS transport path %s\n", *path); + return MSTRO_OK; +} + + static inline mstro_status mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) @@ -398,7 +423,12 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) switch(ticket.ticket_case) { case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: INFO("TICKET CASE GFS\n"); - gfs.path = src_cdo->name; + status= mstro_pc__construct_gfs_path_for_cdo(src_cdo, &gfs.path); + if(status!=MSTRO_OK) { + ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n", + status, mstro_status_description(status)); + return status; + } gfs.keep_file = 0; // Arbitrarily rm the transport file on dst break; case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: diff --git a/transport/transport.c b/transport/transport.c index 230260565a21e2c389e83f271cb4e4a8e69cd571..4c3f6d73c038e50792a953809daa591e908582ca 100644 --- a/transport/transport.c +++ b/transport/transport.c @@ -43,6 +43,8 @@ #include "maestro/i_cdo.h" #include <inttypes.h> +#include <sys/stat.h> + #include "transport/transport_gfs.h" #ifdef HAVE_MIO @@ -170,11 +172,63 @@ mstro_transport__src_datalen_get(mstro_cdo src, return MSTRO_OK; } +/** find next separator in PATH. Return the start of that segment */ +static inline +const char * next_sep (const char *path) +{ + while (*path) + if (*path == '/' || *path == '\\') + return path; + else + path++; + return NULL; +} + +/** ensure all directories in DIRNAME exist. If not, try creating them */ +static inline +mstro_status +mkdirhier(char *dirname) +{ + char buf[PATH_MAX]; + const char *prev = dirname; + const char *next; + struct stat sb; + + while(NULL!=(next = next_sep(prev))) { + strncpy(buf, dirname, next - dirname) ; + buf[next - dirname] = '\0'; + if(stat(buf,&sb)) { + mkdir(buf, 0777); /* umask taken into account by system */ + /* we ignore errors, as we'll see an error on the last part */ + } + prev=next+1; + } + /* handle last part */ + if(stat(dirname, &sb)) { + int s = mkdir(dirname, 0777); + if(s!=0) { + ERR("Failed to create GFS transport directory %s: %d (%s)\n", + dirname, errno, strerror(errno)); + return MSTRO_FAIL; + } + } + return MSTRO_OK; +} + mstro_status mstro_transport_init() { srand(time(NULL)); /* We'll need that to generate random IDs */ + + /* ensure GFS dir is set up */ + mstro_status s = mkdirhier(g_mstro_transport_gfs_dir); + if(s!=MSTRO_OK) { + ERR("Failed to initialize GFS transport. Check your setting of %s\n", + MSTRO_ENV_TRANSPORT_GFS_DIR); + return s; + } + #ifdef HAVE_MIO /* MIO */ if(g_mio_available) {