Skip to content
Snippets Groups Projects
Commit 644a0b1c authored by Utz-Uwe Haus's avatar Utz-Uwe Haus
Browse files

implement explicit GFS transport dir support

parent be127393
Branches
Tags
1 merge request!6Resolve "Make GFS CDO directory configurable"
...@@ -191,6 +191,17 @@ ...@@ -191,6 +191,17 @@
#define MSTRO_ENV_TRANSPORT_DEFAULT "MSTRO_TRANSPORT_DEFAULT" #define MSTRO_ENV_TRANSPORT_DEFAULT "MSTRO_TRANSPORT_DEFAULT"
/**
** @brief Directory for GFS transport
**
** Must be visible on all workflow components that want to use GFS
** transport. Should be a high-performance global file system location.
**
** Default is "./", the directory where the application is started
** (which will often work, but not always).
**/
#define MSTRO_ENV_TRANSPORT_GFS_DIR "MSTRO_TRANSPORT_GFS_DIR"
/** /**
** @brief Path to the MIO transport config file ** @brief Path to the MIO transport config file
** **
......
...@@ -159,4 +159,12 @@ extern union mstro_component_descriptor g_component_descriptor; ...@@ -159,4 +159,12 @@ extern union mstro_component_descriptor g_component_descriptor;
| (MSTRO_POOL_PROTOCOL_VERSION_MINOR << 8) \ | (MSTRO_POOL_PROTOCOL_VERSION_MINOR << 8) \
| (MSTRO_POOL_PROTOCOL_VERSION_PATCH << 0))) | (MSTRO_POOL_PROTOCOL_VERSION_PATCH << 0)))
/** the precomputed default path for GFS transport (incl. trailing '/') */
extern char *g_mstro_transport_gfs_dir;
/** string length of @ref g_mstro_transport_gfs_dir */
extern size_t g_mstro_transport_gfs_dir_len;
#endif /* MAESTRO_I_GLOBALS_H_ */ #endif /* MAESTRO_I_GLOBALS_H_ */
...@@ -197,6 +197,53 @@ mstro_core_init(const char *workflow_name, ...@@ -197,6 +197,53 @@ mstro_core_init(const char *workflow_name,
g_component_descriptor.component_name[MSTRO_WORKFLOW_NAME_MAX-1] = '\0'; g_component_descriptor.component_name[MSTRO_WORKFLOW_NAME_MAX-1] = '\0';
strcpy(g_component_descriptor.version, mstro_version()); strcpy(g_component_descriptor.version, mstro_version());
/* check GFS path setting */
{
char *gfs_dir = getenv(MSTRO_ENV_TRANSPORT_GFS_DIR);
if(gfs_dir!=NULL) {
bool need_sep = false;
size_t l1 = strlen(gfs_dir);
assert(l1>0);
if(gfs_dir[l1-1]!='/') {
need_sep=true;
}
size_t l2 = strlen(data->workflow_name);
size_t l3 = strlen(data->component_name);
g_mstro_transport_gfs_dir = malloc(l1 + (need_sep? 1 : 0)
+ l2 + 1
+ l3 + 1 +1);
if(g_mstro_transport_gfs_dir==NULL) {
ERR("Failed to allocate GFS transport dir pathname\n");
return MSTRO_NOMEM;
}
size_t pos = 0;
strcpy(g_mstro_transport_gfs_dir, gfs_dir);
pos += l1;
if(need_sep) {
g_mstro_transport_gfs_dir[pos] = '/';
pos++;
}
strcpy(g_mstro_transport_gfs_dir+pos, data->workflow_name);
pos += l2;
g_mstro_transport_gfs_dir[pos] = '/';
pos++;
strcpy(g_mstro_transport_gfs_dir+pos, data->component_name);
pos += l3;
g_mstro_transport_gfs_dir[pos] = '/';
pos++;
g_mstro_transport_gfs_dir[pos] = '\0';
g_mstro_transport_gfs_dir_len = pos;
assert(pos==strlen(g_mstro_transport_gfs_dir));
DEBUG("Set GFS transport directory to %s\n",
g_mstro_transport_gfs_dir);
}
}
status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE, status=mstro_schema_parse(MSTRO_SCHEMA_BUILTIN_YAML_CORE,
MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN, MSTRO_SCHEMA_BUILTIN_YAML_CORE_LEN,
&g_mstro_core_schema_instance); &g_mstro_core_schema_instance);
......
...@@ -85,3 +85,12 @@ mstro_schema g_mstro_core_schema_instance = NULL; ...@@ -85,3 +85,12 @@ mstro_schema g_mstro_core_schema_instance = NULL;
bool g_have_pool_manager=false; bool g_have_pool_manager=false;
union mstro_component_descriptor g_component_descriptor; union mstro_component_descriptor g_component_descriptor;
/** the precomputed default path for GFS transport (incl. trailing '/') */
char *g_mstro_transport_gfs_dir = "./"; /* possibly overwritten at init
* time from env*/
/** string length of @ref g_mstro_transport_gfs_dir */
size_t g_mstro_transport_gfs_dir_len = 2; /* possibly overwritten at
* init time */
...@@ -269,6 +269,31 @@ mstro_pc__select_transfer_method(mstro_cdo cdo, ...@@ -269,6 +269,31 @@ mstro_pc__select_transfer_method(mstro_cdo cdo,
return MSTRO_OK; return MSTRO_OK;
} }
static inline
mstro_status
mstro_pc__construct_gfs_path_for_cdo(const mstro_cdo src_cdo,
char **path)
{
mstro_status s=MSTRO_UNIMPL;
assert(path!=NULL);
assert(src_cdo!=NULL);
size_t l = g_mstro_transport_gfs_dir_len + strlen(src_cdo->name) + 1;
*path = malloc(l);
if(*path==NULL) {
return MSTRO_NOMEM;
}
strcpy(*path, g_mstro_transport_gfs_dir);
/* FIXME: possibly add a (configurable?) number of subdirectories
* based on first segment of CDO name */
strcpy((*path)+g_mstro_transport_gfs_dir_len, src_cdo->name);
DEBUG("Constructed GFS transport path %s\n", *path);
return MSTRO_OK;
}
static inline static inline
mstro_status mstro_status
mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init)
...@@ -398,7 +423,12 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) ...@@ -398,7 +423,12 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init)
switch(ticket.ticket_case) { switch(ticket.ticket_case) {
case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS:
INFO("TICKET CASE GFS\n"); INFO("TICKET CASE GFS\n");
gfs.path = src_cdo->name; status= mstro_pc__construct_gfs_path_for_cdo(src_cdo, &gfs.path);
if(status!=MSTRO_OK) {
ERR("Failed to construct GFS path for SRC-CDO: %d (%s)\n",
status, mstro_status_description(status));
return status;
}
gfs.keep_file = 0; // Arbitrarily rm the transport file on dst gfs.keep_file = 0; // Arbitrarily rm the transport file on dst
break; break;
case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO:
......
...@@ -43,6 +43,8 @@ ...@@ -43,6 +43,8 @@
#include "maestro/i_cdo.h" #include "maestro/i_cdo.h"
#include <inttypes.h> #include <inttypes.h>
#include <sys/stat.h>
#include "transport/transport_gfs.h" #include "transport/transport_gfs.h"
#ifdef HAVE_MIO #ifdef HAVE_MIO
...@@ -170,11 +172,63 @@ mstro_transport__src_datalen_get(mstro_cdo src, ...@@ -170,11 +172,63 @@ mstro_transport__src_datalen_get(mstro_cdo src,
return MSTRO_OK; return MSTRO_OK;
} }
/** find next separator in PATH. Return the start of that segment */
static inline
const char * next_sep (const char *path)
{
while (*path)
if (*path == '/' || *path == '\\')
return path;
else
path++;
return NULL;
}
/** ensure all directories in DIRNAME exist. If not, try creating them */
static inline
mstro_status
mkdirhier(char *dirname)
{
char buf[PATH_MAX];
const char *prev = dirname;
const char *next;
struct stat sb;
while(NULL!=(next = next_sep(prev))) {
strncpy(buf, dirname, next - dirname) ;
buf[next - dirname] = '\0';
if(stat(buf,&sb)) {
mkdir(buf, 0777); /* umask taken into account by system */
/* we ignore errors, as we'll see an error on the last part */
}
prev=next+1;
}
/* handle last part */
if(stat(dirname, &sb)) {
int s = mkdir(dirname, 0777);
if(s!=0) {
ERR("Failed to create GFS transport directory %s: %d (%s)\n",
dirname, errno, strerror(errno));
return MSTRO_FAIL;
}
}
return MSTRO_OK;
}
mstro_status mstro_status
mstro_transport_init() mstro_transport_init()
{ {
srand(time(NULL)); /* We'll need that to generate random IDs */ srand(time(NULL)); /* We'll need that to generate random IDs */
/* ensure GFS dir is set up */
mstro_status s = mkdirhier(g_mstro_transport_gfs_dir);
if(s!=MSTRO_OK) {
ERR("Failed to initialize GFS transport. Check your setting of %s\n",
MSTRO_ENV_TRANSPORT_GFS_DIR);
return s;
}
#ifdef HAVE_MIO #ifdef HAVE_MIO
/* MIO */ /* MIO */
if(g_mio_available) { if(g_mio_available) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment