/*
* nam_ext_interface.c
*
* Created on: Jun 2, 2016
* Author: galonska
*/
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>
#include <limits.h>
#include <inttypes.h>
#include <dirent.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <regex.h>
#include <mpi.h>
#include "nam_memory.h"
#include "nam_util.h"
#include "nam_interface.h"
#include "nam_ext_interface.h"
#include "nam_extoll.h"
#define GET_COMM 11
#define GET_B_COMM 10
#define PUT_COMM 3
#define PUT_B_COMM 2
int init_extended = 0;
//ToDo: function to derive the challenge as color for MPI_Comm_split (according to rank)
//Benchmark for XOR checkpointing
MPI_Datatype nam_mpi_allocation;
typedef struct nam_mpi_allocation_info
{
int status;
int persistant;
int nam_index;
uint64_t nla;
uint64_t challenge;
size_t size;
}nam_mpi_allocation_info_t;
typedef struct nam_cp_args
{
size_t offset;
void *buf;
size_t bytes;
int root;
int data_avail;
nam_ext_allocation_t *ext_alloc;
} nam_cp_args_t;
static inline
unsigned long getusec(void)
{
struct timeval tv;
gettimeofday(&tv,NULL);
return (tv.tv_usec+tv.tv_sec*1000000);
}
int dummy_allgather (void *s_buf, int count, MPI_Datatype type, void *r_buf, MPI_Comm comm)
{
//MPI_AllGather(&my_rank_global, 1, MPI_INT, ext_alloc->global_ranks, 1, MPI_INT, comm_local);
int root = 0;
int ranks;
MPI_Comm_size(comm, &ranks);
MPI_Gather(s_buf, count, type, r_buf, count, type, root, comm);
MPI_Bcast(r_buf, ranks*count, type, root, comm);
return 0;
}
int nam_create_nam_mpi_allocation_type()
{
MPI_Datatype oldtypes[3];
int blockcounts[3];
MPI_Aint offsets[3];
/* use offsetof() so the displacements match the compiler's actual struct
layout (including the alignment padding between the int and uint64_t
members), instead of summing extents by hand, which misses that gap */
offsets[0] = offsetof(nam_mpi_allocation_info_t, status);
oldtypes[0] = MPI_INT;
blockcounts[0] = 3; /* status, persistant, nam_index */
offsets[1] = offsetof(nam_mpi_allocation_info_t, nla);
oldtypes[1] = MPI_UINT64_T;
blockcounts[1] = 2; /* nla, challenge */
offsets[2] = offsetof(nam_mpi_allocation_info_t, size);
oldtypes[2] = my_MPI_SIZE_T;
blockcounts[2] = 1; /* size */
/* MPI_Type_struct/MPI_Type_extent were removed in MPI-3.0; use the MPI-2 replacements */
if(MPI_Type_create_struct(3, blockcounts, offsets, oldtypes, &nam_mpi_allocation) == MPI_SUCCESS)
{
nam_print(10, "MPI Datatype for allocations defined!");
}
if(MPI_Type_commit(&nam_mpi_allocation) == MPI_SUCCESS)
{
nam_print(10, "MPI Datatype for allocations committed!");
}
return 0;
}
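/*
 * Illustrative use of the committed type (a sketch, not new API): a complete
 * allocation descriptor can travel in a single collective, exactly as
 * nam_sync_allocation() below does:
 *
 *   nam_mpi_allocation_info_t info;   // filled in at the root rank
 *   MPI_Bcast(&info, 1, nam_mpi_allocation, root, comm);
 */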
nam_ext_allocation_t *read_ext_allocation(char *path)
{
nam_ext_allocation_t *ext_alloc;
nam_allocation_t *alloc;
FILE *f = NULL;
char line[64]; /* key lines such as "challenge = <20-digit uint64>" can exceed 30 bytes, so leave headroom */
int i;
f = fopen(path, "r");
if(!f)
{
nam_print(0, "Could not open file %s for reading", path);
goto err_open;
}
ext_alloc = (nam_ext_allocation_t *) malloc(sizeof(nam_ext_allocation_t));
ext_alloc->alloc = (nam_allocation_t *) malloc(sizeof(nam_allocation_t));
alloc = ext_alloc->alloc;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_NAM_INDEX" = %d",&alloc->nam_index);
}
else
goto err_read;
// if(fgets(line, sizeof(line), f) != NULL)
// {
// sscanf(line,STR_NODEID" = %"SCNu16"",&alloc->nam.rma2_nodeid);
//
// }
// else
// goto err_read;
//
// if(fgets(line, sizeof(line), f) != NULL)
// {
// sscanf(line,STR_VPID" = %"SCNu16"",&alloc->nam.rma2_vpid);
//
// }
// else
// goto err_read;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_CHALLENGE" = %"SCNu64"",&alloc->challenge);
}
else
goto err_read;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_SIZE" = %"SCNu64"",&alloc->size);
}
else
goto err_read;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_NLA" = %"SCNu64"",&alloc->nla);
}
else
goto err_read;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_MAX_BYTES" = %"SCNu64"",&ext_alloc->max_bytes);
}
else
goto err_read;
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,STR_N_RANKS" = %d",&ext_alloc->local_size);
}
else
goto err_read;
if(ext_alloc->local_size)
{
ext_alloc->global_ranks = malloc(ext_alloc->local_size * sizeof(int));
for (i = 0; i < ext_alloc->local_size; ++i)
{
if(fgets(line, sizeof(line), f) != NULL)
{
sscanf(line,"%d",&ext_alloc->global_ranks[i]);
}
else
goto err_read;
}
}
alloc->persistant = 1;
fclose(f);
return ext_alloc;
err_open:
return NULL;
err_read:
nam_print(0, "Could not read from file %s", path);
fclose(f);
free(ext_alloc->alloc);
free(ext_alloc);
return NULL;
}
int write_ext_allocation(char *path, nam_ext_allocation_t *ext_alloc)
{
char file_path[256];
FILE *f = NULL;
int i;
nam_allocation_t *alloc = ext_alloc->alloc;
mode_t mode = S_IRUSR | S_IWUSR;
//create path $HOME/.libNAM/<challenge>.alloc_ext
sprintf(file_path, "%s/%"PRIu64".alloc_ext",nam_home, alloc->challenge);
f = fopen(file_path, "w");
if(!f)
{
nam_print(0, "Could not open file %s for writing", file_path);
goto err_open;
}
if(fprintf(f, STR_NAM_INDEX" = %d\n", alloc->nam_index) < 0)
{
goto err_write;
}
// if(fprintf(f, STR_NODEID" = %"PRIu16"\n", alloc->nam.rma2_nodeid) < 0)
// {
// goto err_write;
// }
// if(fprintf(f, STR_VPID" = %"PRIu16"\n", alloc->nam.rma2_vpid) < 0)
// {
// goto err_write;
// }
if(fprintf(f, STR_CHALLENGE" = %"PRIu64"\n", alloc->challenge) < 0)
{
goto err_write;
}
if(fprintf(f, STR_SIZE" = %"PRIu64"\n", alloc->size) < 0)
{
goto err_write;
}
if(fprintf(f, STR_NLA" = %"PRIu64"\n", alloc->nla) < 0)
{
goto err_write;
}
if(fprintf(f, STR_MAX_BYTES" = %"PRIu64"\n", ext_alloc->max_bytes) < 0)
{
goto err_write;
}
// if there is rank information, save it, too!
if(fprintf(f, STR_N_RANKS" = %d\n", ext_alloc->local_size) < 0)
{
goto err_write;
}
for (i = 0; i < ext_alloc->local_size; ++i)
{
if(fprintf(f,"%d\n", ext_alloc->global_ranks[i]) < 0)
{
goto err_write;
}
}
fclose(f);
chmod(file_path, mode);
return 0;
err_open:
return 1;
err_write:
nam_print(0, "Could not write to file %s", file_path);
fclose(f);
return 1;
}
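/*
 * On-disk format written above and parsed by read_ext_allocation(): one
 * "key = value" pair per line, followed by one global rank id per line.
 * The key strings are the STR_* macros from the headers; the names and
 * values below are illustrative placeholders only:
 *
 *   nam_index = 0
 *   challenge = 1234567890
 *   size = 1048576
 *   nla = 140737488355328
 *   max_bytes = 65536
 *   n_ranks = 2
 *   0
 *   1
 */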
int nam_load_ext_allocations()
{
int rc = 0;
int reti = 0;
regex_t regex;
DIR *dp;
struct dirent *ep;
char f_str[256];
nam_ext_allocation_t *current;
//regular expression for allocation files
char *regexp = "^([0-9]+)\\.alloc_ext$"; /* anchor and escape the dot so e.g. "123Xalloc_ext" cannot match */
reti = regcomp(&regex, regexp, REG_EXTENDED);
if(reti)
{
nam_print(0, "Could not compile regular expression %s", regexp);
return 1;
}
dp = opendir (nam_home);
if (dp != NULL)
{
while ((ep = readdir (dp)))
{
reti = regexec(&regex, ep->d_name, 0, NULL, 0);
if(!reti)
{
sprintf(f_str,"%s/%s", nam_home, ep->d_name);
nam_print( 1, "%s matches expression! Loading...", f_str);
current = read_ext_allocation(f_str);
if(current)
LIST_INSERT_HEAD(&pers_ext_alloc_head, current, ext_allocations);
}
}
(void) closedir (dp);
}
else
nam_print(0, "Could not open directory %s", nam_home);
regfree(&regex);
return rc;
}
int nam_delete_all_ext_allocations()
{
int rc = 0;
int reti = 0;
regex_t regex;
DIR *dp;
struct dirent *ep;
char f_str[256];
//regular expression for allocation files
char *regexp = "^([0-9]+)\\.alloc_ext$";
reti = regcomp(&regex, regexp, REG_EXTENDED);
if(reti)
{
nam_print(0, "Could not compile regular expression %s", regexp);
return 1;
}
dp = opendir (nam_home);
if (dp != NULL)
{
while ((ep = readdir (dp)))
{
reti = regexec(&regex, ep->d_name, 0, NULL, 0);
if(!reti)
{
sprintf(f_str,"%s/%s", nam_home, ep->d_name);
nam_print( 1, "%s matches expression! deleting...", f_str);
unlink(f_str);
}
}
(void) closedir (dp);
}
else
nam_print(0, "Could not open directory %s", nam_home);
regfree(&regex);
return rc;
}
ssize_t nam_get_max_bytes(nam_ext_allocation_t *ext_alloc)
{
ssize_t rc;
rc = ext_alloc->max_bytes;
return rc;
}
int nam_init_extended()
{
if(init_extended)
return 0;
nam_init();
LIST_INIT(&pers_ext_alloc_head);
nam_load_ext_allocations();
nam_create_nam_mpi_allocation_type();
init_extended = 1;
return 0;
}
int nam_gather(size_t offset, void *sendbuf, size_t send_bytes, void *recvbuf, size_t recv_bytes, int root, nam_ext_allocation_t *ext_alloc)
{
int rank;
int size;
nam_allocation_t *alloc = ext_alloc->alloc;
nam_init_extended();
MPI_Comm_size(ext_alloc->comm_global, &size);
MPI_Comm_rank(ext_alloc->comm_global, &rank);
// gather all bytes of sendbuf at root in recvbuf
//put the data on the nam if not root
if(rank != root)
{
if(nam_put_sync(sendbuf, offset + rank*send_bytes, send_bytes, alloc) != send_bytes)
{
nam_print(0,"Error writing data to NAM");
}
MPI_Barrier(ext_alloc->comm_global);
}
else // fetch the bytes of all ranks from NAM
{
// the barrier guarantees that all puts have completed before root reads
MPI_Barrier(ext_alloc->comm_global);
// recv_bytes is the per-rank chunk size (as in MPI_Gather), so root reads size*recv_bytes
if(nam_get_sync(recvbuf, offset, (size_t)size*recv_bytes, alloc) != (size_t)size*recv_bytes)
{
nam_print(0,"Error reading data from NAM");
}
// copy own bytes to own offset (root never staged them on the NAM)
memcpy((char*)recvbuf + rank*recv_bytes, sendbuf, send_bytes);
}
return 0;
}
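/*
 * Usage sketch (hypothetical chunk size N; assumes the allocation holds at
 * least offset + size*N bytes and that send/recv chunk sizes match, as with
 * MPI_Gather):
 *
 *   char send[N];
 *   char *recv = (rank == root) ? malloc((size_t)size * N) : NULL;
 *   nam_gather(0, send, N, recv, N, root, ext_alloc);
 *
 * Each non-root rank stages its chunk at offset rank*N on the NAM; the
 * barrier orders all puts before root's single nam_get_sync().
 */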
int nam_scatter(size_t offset, void *sendbuf, size_t send_bytes, void *recvbuf, size_t recv_bytes, int root, nam_ext_allocation_t *ext_alloc)
{
int rank;
int size;
nam_allocation_t *alloc = ext_alloc->alloc;
nam_init_extended();
MPI_Comm_size(ext_alloc->comm_global, &size);
MPI_Comm_rank(ext_alloc->comm_global, &rank);
// scatter send_bytes-sized chunks of sendbuf at root to all ranks
//put the data on the nam if root
if(rank == root)
{
// stage all ranks' chunks in one put (each rank later fetches recv_bytes of them)
if(nam_put_sync(sendbuf, offset, (size_t)size*send_bytes, alloc) != (size_t)size*send_bytes)
{
nam_print(0, "Error writing data to NAM");
}
// copy own bytes from own offset (no NAM round trip needed)
memcpy((char*)recvbuf, (char*)sendbuf + rank*recv_bytes, recv_bytes);
MPI_Barrier(ext_alloc->comm_global);
}
else // all others fetch the bytes from NAM
{
// the barrier guarantees that root's put has completed before the gets
MPI_Barrier(ext_alloc->comm_global);
if(nam_get_sync(recvbuf, offset + rank*recv_bytes, recv_bytes, alloc) != recv_bytes)
{
nam_print(0, "Error reading data from NAM");
}
}
return 0;
}
int nam_bcast(size_t offset, void *buf, size_t bytes, int root, nam_ext_allocation_t *ext_alloc)
{
int rank;
int size;
nam_allocation_t *alloc = ext_alloc->alloc;
nam_init_extended();
MPI_Comm_size(ext_alloc->comm_global, &size);
MPI_Comm_rank(ext_alloc->comm_global, &rank);
//put the data on the nam if root
if(rank == root)
{
if(nam_put_sync(buf, offset,bytes,alloc)!=bytes)
{
nam_print(0, "Error writing data to NAM");
}
MPI_Barrier(ext_alloc->comm_global);
}
else // all others fetch the bytes from NAM
{
MPI_Barrier(ext_alloc->comm_global);
if(nam_get_sync(buf, offset, bytes, alloc) != bytes)
{
nam_print(0,"Error reading data from NAM");
}
}
return 0;
}
int nam_restart(nam_cp_args_t *args)
{
int my_data_avail;
int *all_data_avail;
int rank_to_rebuild = -1;
int n_avail = 0;
int size;
int my_rank;
int master = 0; /* the gathers below use args->root as the MPI root, so this code assumes root == master == 0 */
int i;
int rc = 0;
mem_info_t *current;
RMA2_VPID *all_vpids;
RMA2_Nodeid *all_nodeids;
uint64_t *all_bytes;
RMA2_NLA *all_nlas;
RMA2_NLA my_nla;
extoll_con_info_t *config_con;
void *buf = args->buf;
uint64_t my_bytes = (uint64_t) args->bytes;
int root = args->root;
nam_allocation_t *alloc = args->ext_alloc->alloc;
MPI_Comm comm = args->ext_alloc->comm_local;
my_data_avail = args->data_avail;
MPI_Comm_size(comm, &size);
MPI_Comm_rank(comm, &my_rank);
//gather data from all processes
all_vpids = malloc(sizeof(RMA2_VPID)*size);
all_nodeids = malloc(sizeof(RMA2_Nodeid)*size);
config_con = alloc->config_con;
//register the buffer with rma
mem_info_t *mem = extoll_register(buf, my_bytes);
my_nla = mem->nla;
//start rebuild process
if(my_rank == master)
{
all_nlas = malloc(sizeof(RMA2_NLA)*size);
all_bytes = malloc(sizeof(uint64_t)*size);
all_data_avail = malloc(sizeof(int)*size);
}
//distribute all the data
MPI_Gather(&my_vpid,1,MPI_UINT16_T,all_vpids,1,MPI_UINT16_T,root,comm);
MPI_Gather(&my_nodeid,1,MPI_UINT16_T,all_nodeids,1,MPI_UINT16_T,root,comm);
MPI_Gather(&my_nla,1,MPI_UINT64_T,all_nlas,1,MPI_UINT64_T,root,comm);
MPI_Gather(&my_bytes,1,MPI_UINT64_T,all_bytes,1,MPI_UINT64_T,root,comm);
MPI_Gather(&my_data_avail,1,MPI_INT,all_data_avail,1,MPI_INT,root,comm);
if(my_rank == master)
{
for (i = 0; i < size; ++i)
{
if(!all_data_avail[i])
{
rank_to_rebuild = i;
}
else
{
n_avail++;
}
}
if(rank_to_rebuild < 0)
{
nam_print(0, "Error: no rank found that is missing its data. Aborting restart process!");
rc = 1; /* was 0, which would have sent every rank into the notification wait below */
goto error;
}
if(n_avail < (size-1))
{
nam_print(0, "Error: exactly one rank per communicator may be missing its data, and #ranks for checkpoint and restart must match!");
nam_print(0, "available: %d, #ranks: %d", n_avail, size);
rc = 1;
goto error;
}
//1 . clear all:
nam_reset = 0; //must be set in order to restart
extoll_reset_checkpointing(config_con);
//1. Write the number of Ranks to CR-C0
if(extoll_configure_num_ranks(size, config_con))
{
nam_print(0, "Error configuring #Ranks!");
rc = 1;
goto error;
}
//configure all other processes
for (i = 0; i < size; ++i)
{
if(i != rank_to_rebuild)
{
//2.configure start address at CR-C1
if(extoll_configure_start_address(all_nlas[i], config_con))
{
rc = 1;
goto error;
}
//3. Write the NodeID + VPID + Bytecount to CR-C2
if(extoll_configure_process(all_bytes[i], all_vpids[i], all_nodeids[i], 0, 0, config_con))
{
rc = 1;
goto error;
}
}
}
//configure failed process
//4.configure start address at CR-C1
if(extoll_configure_start_address(all_nlas[rank_to_rebuild], config_con))
{
rc = 1;
goto error;
}
//5. Write the NodeID + VPID + Bytecount to CR-C2
if(extoll_configure_process(all_bytes[rank_to_rebuild], all_vpids[rank_to_rebuild], all_nodeids[rank_to_rebuild], 0, 1, config_con))
{
rc = 1;
goto error;
}
}
error:
MPI_Bcast(&rc, 1, MPI_INT, root, comm);
if(!rc)
{
extoll_check_notifications_block(config_con);
//when the new rank gets its notification, reconstruction is finished!
if(!my_data_avail)
{
nam_get_sync(buf, 0, my_bytes, alloc);
}
}
MPI_Barrier(comm);
//set all memory to inactive
extoll_reset_memory();
nam_reset = 0;
//checkpoint all stuff again
nam_checkpoint(args);
return rc;
}
int nam_checkpoint(nam_cp_args_t *args)
{
int size = 0;
int my_rank;
int my_rank_global;
int i;
int rc = 0;
int cprs_active = 0;
size_t max_bytes = 0;
int max_ranks = max_ranks_cprs;
unsigned long max_time_conf;
unsigned long max_time_cp;
unsigned long max_time_overall;
unsigned long max_time_sync;
unsigned long t_overall = 0, t_cp = 0, t_conf = 0, t_sync = 0;
nam_init_extended();
extoll_con_info_t *config_con;
RMA2_VPID *all_vpids;
RMA2_Nodeid *all_nodeids;
uint64_t *all_bytes;
RMA2_NLA *all_nlas;
RMA2_NLA my_nla;
void *buf = args->buf;
uint64_t my_bytes = (uint64_t) args->bytes;
int root = args->root;
nam_ext_allocation_t *ext_alloc = args->ext_alloc;
nam_allocation_t *alloc = ext_alloc->alloc;
MPI_Comm comm = args->ext_alloc->comm_local;
config_con = alloc->config_con;
assert(config_con);
t_overall = getusec();
//register the buffer with rma
mem_info_t *mem = extoll_register(buf, my_bytes);
my_nla = mem->nla;
MPI_Comm_rank(ext_alloc->comm_global, &my_rank_global);
nam_print(1,"Rank %d checkpointing %"PRIu64" bytes on NAM %d", my_rank_global, my_bytes, alloc->nam_index);
//check if its already configured.
MPI_Allreduce(&mem->cprs_active, &cprs_active, 1, MPI_INT, MPI_LAND, comm);
if(!cprs_active)
{
t_conf = getusec();
MPI_Comm_size(comm, &size);
MPI_Comm_rank(comm, &my_rank);
if(size > max_ranks)
{
nam_print(0, "Error: communicator size %d exceeds the allowed number of ranks per NAM (%d)!", size, max_ranks);
rc = 1;
goto skip;
}
//gather data from all processes
if(my_rank == root)
{
all_nlas = malloc(sizeof(RMA2_NLA)*size);
all_bytes = malloc(sizeof(uint64_t)*size);
all_vpids = malloc(sizeof(RMA2_VPID)*size);
all_nodeids = malloc(sizeof(RMA2_Nodeid)*size);
}
MPI_Gather(&my_vpid,1,MPI_UINT16_T,all_vpids,1,MPI_UINT16_T,root,comm);
MPI_Gather(&my_nodeid,1,MPI_UINT16_T,all_nodeids,1,MPI_UINT16_T,root,comm);
MPI_Gather(&my_nla,1,MPI_UINT64_T,all_nlas,1,MPI_UINT64_T,root,comm);
MPI_Gather(&my_bytes,1,MPI_UINT64_T,all_bytes,1,MPI_UINT64_T,root,comm);
if(my_rank == root)
{
//0. Reset everything
if(extoll_reset_checkpointing(config_con))
{
nam_print(0, "Error resetting checkpointing!");
rc = 1;
goto skip;
}
//1. Write the number of Ranks to CR-C0
if(extoll_configure_num_ranks(size, config_con))
{
nam_print(0, "Error configuring #Ranks!");
rc = 1;
goto skip;
}
//configure for all processes
for (i = 0; i < size; ++i)
{
// printf("Nodeid: %"PRIu16" VPID: %"PRIu16"\n", all_nodeids[i], all_vpids[i] );
//2.configure start address at CR-C1
if(extoll_configure_start_address(all_nlas[i], config_con))
{
nam_print(0, "Error configuring start address! Rank %d", i);
rc = 1;
goto skip;
}
//3. Write the NodeID + VPID + Bytecount to CR-C2
if(extoll_configure_process(all_bytes[i], all_vpids[i], all_nodeids[i], 0, 0, config_con))
{
nam_print(0, "Error configuring node! Rank %d", i);
rc = 1;
goto skip;
}
}
// check status
if(extoll_check_status(config_con))
{
nam_print(0, "CP Status not valid!");
rc = 1;
goto skip;
}
//check progress
if(extoll_check_progress(config_con))
{
nam_print(0, "Error checking progress!");
rc = 1;
goto skip;
}
}
t_conf = getusec() - t_conf;
skip:
MPI_Bcast(&rc, 1, MPI_INT, root, comm);
if(rc)
{
nam_print(0, "Configuration not successful!");
goto error;
}
mem->cprs_active = 1;
MPI_Allreduce(&my_bytes, &max_bytes, 1, my_MPI_SIZE_T, MPI_MAX, comm);
ext_alloc->max_bytes = max_bytes;
if(my_rank == root)
write_ext_allocation(nam_home, ext_alloc);
}
MPI_Barrier(comm);
t_sync = getusec() - t_overall;
t_cp = getusec();
//Start the checkpointing
if(extoll_start_checkpointing(my_bytes, my_vpid, my_nodeid, config_con))
{
nam_print(0, "ERROR in starting the Checkpoint Operation!");
goto error;
}
extoll_check_notifications_block(config_con);
t_cp = getusec() - t_cp;
t_overall = getusec() - t_overall;
if(nam_debug > 0)
{
MPI_Reduce(&t_conf, &max_time_conf, 1, MPI_UNSIGNED_LONG, MPI_MAX, root, comm);
MPI_Reduce(&t_cp, &max_time_cp, 1, MPI_UNSIGNED_LONG, MPI_MAX, root, comm);
MPI_Reduce(&t_overall, &max_time_overall, 1, MPI_UNSIGNED_LONG, MPI_MAX, root, comm);
MPI_Reduce(&t_sync, &max_time_sync, 1, MPI_UNSIGNED_LONG, MPI_MAX, root, comm);
if(my_rank == root)
{
nam_print(3, "Bandwidth: %f MB/s Overall time: %f sec\t Config time: %f usec\t CP Time: %f sec \t Sync time: %f usec, Ratio Config/Overall: %f %%",
(double) my_bytes * size / max_time_overall,
(double) max_time_overall / 1e6, /* times are in usec; the former 10e6 (= 1e7) was off by 10x */
(double) max_time_conf,
(double) max_time_cp / 1e6,
(double) max_time_sync,
(double) max_time_conf / max_time_overall * 100);
}
}
return 0;
error:
return 1;
}
int nam_checkpoint_async(void *buf, size_t bytes, int root, MPI_Comm comm, nam_ext_allocation_t *ext_alloc, nam_async_request_t *req)
{
nam_cp_args_t *args = malloc(sizeof(nam_cp_args_t));
nam_init_extended();
args->buf = buf;
args->bytes = bytes;
args->root = 0; /* note: the root and comm parameters are currently unused; rank 0 of ext_alloc->comm_local acts as root */
args->ext_alloc = ext_alloc;
pthread_t *worker_thread;
worker_thread = malloc(sizeof(pthread_t));
req->thread = worker_thread;
nam_print(3, "Creating checkpointing thread\n");
/* pthread_create() returns 0 on success and a positive errno value on
failure, so test != 0 rather than < 0 */
if (pthread_create(worker_thread, NULL, (void *(*)(void *))nam_checkpoint, (void*) args) != 0)
{
nam_print(0,"could not create thread");
return 1;
}
return 0;
}
int nam_restart_async(void *buf, size_t bytes, int root, int data_avail, nam_ext_allocation_t *ext_alloc, nam_async_request_t *req)
{
nam_cp_args_t *args = malloc(sizeof(nam_cp_args_t));
nam_init_extended();
args->buf = buf;
args->bytes = bytes;
args->root = root;
args->ext_alloc = ext_alloc;
args->data_avail = data_avail;
pthread_t *worker_thread;
worker_thread = malloc(sizeof(pthread_t));
req->thread = worker_thread;
nam_print(3, "Creating restart thread\n");
if (pthread_create(worker_thread, NULL, (void *(*)(void *))nam_restart, (void*) args) != 0)
{
nam_print(0,"could not create thread");
return 1;
}
return 0;
}
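/*
 * Usage sketch for the async variants. Hedged: this file exposes no
 * wait/test call for nam_async_request_t, so joining the worker thread
 * directly is an assumption based on req->thread being a plain pthread_t*:
 *
 *   nam_async_request_t req;
 *   nam_checkpoint_async(buf, bytes, 0, comm, ext_alloc, &req);
 *   // ... overlap computation with the running checkpoint ...
 *   pthread_join(*req.thread, NULL);   // assumed completion point
 */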
int nam_restart_sync(void *buf, size_t bytes, int data_avail, nam_ext_allocation_t *ext_alloc)
{
nam_cp_args_t *args = malloc(sizeof(nam_cp_args_t));
nam_init_extended();
args->buf = buf;
args->bytes = bytes;
args->root = 0;
args->ext_alloc = ext_alloc;
args->data_avail = data_avail;
return nam_restart(args);
}
int nam_checkpoint_sync(void *buf, size_t bytes, nam_ext_allocation_t *ext_alloc)
{
nam_cp_args_t *args = malloc(sizeof(nam_cp_args_t));
nam_init_extended();
args->buf = buf;
args->bytes = bytes;
args->ext_alloc = ext_alloc;
args->root = 0;
return nam_checkpoint(args);
}
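/*
 * End-to-end sketch of the synchronous checkpoint/restart cycle (buffer,
 * size and the data_avail flag are illustrative; ext_alloc comes from
 * nam_alloc_cprs() below):
 *
 *   nam_checkpoint_sync(buf, bytes, ext_alloc);   // collective over comm_local
 *   // ... after a failure: survivors pass data_avail = 1, the respawned
 *   // rank passes data_avail = 0 and has buf rebuilt from the NAM
 *   nam_restart_sync(buf, bytes, data_avail, ext_alloc);
 */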
nam_allocation_t *nam_sync_allocation(nam_allocation_t *alloc, int root, MPI_Comm comm)
{
nam_mpi_allocation_info_t alloc_info;
int my_rank;
int ranks;
int created = 1;
int connected = 1;
int i;
int *all_connected;
nam_init_extended();
MPI_Comm_rank(comm, &my_rank);
MPI_Comm_size(comm, &ranks);
all_connected = malloc(sizeof(int)*ranks);
if(my_rank == root)
{
if(!alloc)
{
created = 0;
connected = 0;
MPI_Bcast(&created, 1, MPI_INT, root, comm);
nam_print(0, "Error in creating the allocation on rank %d", my_rank);
goto err;
}
// nam_print_allocation(alloc);
MPI_Bcast(&created, 1, MPI_INT, root, comm);
alloc_info.status = alloc->status;
alloc_info.persistant = alloc->persistant;
alloc_info.nam_index = alloc->nam_index;
alloc_info.nla = alloc->nla;
alloc_info.size = alloc->size;
alloc_info.challenge = alloc->challenge;
MPI_Bcast(&alloc_info, 1, nam_mpi_allocation, root, comm);
//workaround
// MPI_Bcast(&alloc_info.status, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.persistant, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.nam_index, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.nla, 1, my_MPI_SIZE_T, root, comm);
// MPI_Bcast(&alloc_info.size, 1, my_MPI_SIZE_T, root, comm);
// MPI_Bcast(&alloc_info.challenge, 1, MPI_UINT64_T, root, comm);
#ifndef ATEST
MPI_Allgather(&connected, 1, MPI_INT, all_connected, 1, MPI_INT, comm);
#else
dummy_allgather(&connected, 1, MPI_INT, all_connected, comm);
#endif
}
else
{
MPI_Bcast(&created, 1, MPI_INT, root, comm);
if(!created)
{
goto err;
}
alloc = malloc(sizeof(nam_allocation_t));
MPI_Bcast(&alloc_info, 1, nam_mpi_allocation, root, comm);
//workaround
// MPI_Bcast(&alloc_info.status, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.persistant, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.nam_index, 1, MPI_INT, root, comm);
// MPI_Bcast(&alloc_info.nla, 1, my_MPI_SIZE_T, root, comm);
// MPI_Bcast(&alloc_info.size, 1, my_MPI_SIZE_T, root, comm);
// MPI_Bcast(&alloc_info.challenge, 1, MPI_UINT64_T, root, comm);
alloc->status = alloc_info.status;
alloc->persistant = alloc_info.persistant;
alloc->nam_index = alloc_info.nam_index;
alloc->nla = alloc_info.nla;
alloc->size = alloc_info.size;
alloc->challenge = alloc_info.challenge;
// alloc->nla = alloc->start;
// nam_print_allocation(alloc);
if(nam_connect_allocation(alloc))
{
connected = 0;
}
#ifndef ATEST
MPI_Allgather(&connected, 1, MPI_INT, all_connected, 1, MPI_INT, comm);
#else
dummy_allgather(&connected, 1, MPI_INT, all_connected, comm);
#endif
}
for (i = 0; i < ranks; ++i)
{
if(!all_connected[i])
goto err;
}
free(all_connected);
return alloc;
err:
free(all_connected);
return NULL;
}
nam_ext_allocation_t *nam_alloc_cprs(int root, MPI_Comm comm)
{
nam_allocation_t *alloc = NULL;
nam_ext_allocation_t *ext_alloc = NULL; /* returned as-is on failure, so it must not stay uninitialized */
nam_init_extended();
int my_rank_global, my_rank_local;
int local_root = 0;
int i_nam;
MPI_Comm comm_local;
MPI_Comm_rank(comm, &my_rank_global);
//here all the magic occurs! ToDo: Reflect network!
//split comm in local groups. one for each nam
i_nam = my_rank_global % n_nams;
nam_print(1, "Splitting in %d communicators", n_nams);
nam_print(1, "My Color: %d", i_nam);
MPI_Comm_split(comm, i_nam, my_rank_global, &comm_local);
//local root requests allocation
MPI_Comm_rank(comm_local, &my_rank_local);
nam_print(1, "My rank global: %d, my rank local: %d, my nam: %d", my_rank_global, my_rank_local, i_nam);
if(my_rank_local == local_root)
{
alloc = nam_alloc(0, NM_CPRS, i_nam, 1);
}
// sync allocation over all processes of local comm
alloc = nam_sync_allocation(alloc, local_root, comm_local);
if(alloc)
{
nam_print(1, "My rank global: %d, my rank local: %d, got allocation", my_rank_global, my_rank_local);
ext_alloc = malloc(sizeof(nam_ext_allocation_t));
ext_alloc->alloc = alloc;
ext_alloc->comm_global = comm;
ext_alloc->comm_local = comm_local;
ext_alloc->max_bytes = 0;
//gather the global rank ids of the local communicator
MPI_Comm_size(comm_local, &ext_alloc->local_size);
ext_alloc->global_ranks = malloc(ext_alloc->local_size* sizeof(int));
#ifndef ATEST
MPI_Allgather(&my_rank_global, 1, MPI_INT, ext_alloc->global_ranks, 1, MPI_INT, comm_local);
#else
dummy_allgather(&my_rank_global, 1, MPI_INT, ext_alloc->global_ranks, comm_local);
#endif
LIST_INSERT_HEAD(&pers_ext_alloc_head,ext_alloc,ext_allocations);
if(my_rank_local == local_root)
{
write_ext_allocation(nam_home, ext_alloc);
}
}
return ext_alloc;
}
nam_ext_allocation_t *nam_reuse_cprs(int root, MPI_Comm comm)
{
nam_allocation_t alloc; //dummy allocation
nam_ext_allocation_t *current = NULL;
nam_ext_allocation_t *found = NULL;
nm_request_t req;
int i_found = 0;
int *all_found;
int my_rank;
int global_size;
int i;
nam_init_extended();
MPI_Comm_rank(comm, &my_rank);
MPI_Comm_size(comm, &global_size);
//search for allocation used by my rank
LIST_FOREACH(current, &pers_ext_alloc_head, ext_allocations)
{
for (i = 0; i < current->local_size; ++i)
{
//found my rank within allocation
if(current->global_ranks[i] == my_rank)
{
found = current;
i_found = 1;
break;
}
}
if(i_found)
break;
}
all_found = malloc(global_size * sizeof(int));
//check if all found their allocation
#ifndef ATEST
MPI_Allgather(&i_found, 1, MPI_INT, all_found, 1, MPI_INT, comm);
#else
dummy_allgather(&i_found, 1, MPI_INT, all_found, comm);
#endif
for (i = 0; i < global_size; ++i) {
if(!all_found[i])
{
nam_print(0, "Error, not all processes found their allocation to reuse! (rank %d)", i);
goto err_alloc;
}
}
req.challenge = found->alloc->challenge;
req.bytes = found->alloc->size;
req.flag = NM_CPRS;
req.persistant = 1;
if(tcp_writeall(nm_connection, &req,sizeof(nm_request_t))!= sizeof(nm_request_t))
{
nam_print(0, "Error sending allocation request to NM");
goto err_alloc;
}
if(tcp_readall(nm_connection,&alloc,sizeof(nam_allocation_t))!= sizeof(nam_allocation_t))
{
nam_print(0, "Error reading allocation information from NM");
goto err_alloc;
}
if(alloc.status == ALLOC_FAILED)
{
nam_print(0, "Error challenging NM with existing allocation!");
goto err_alloc;
}
//split the communicator with nam index
found->comm_global = comm;
MPI_Comm_split(found->comm_global, found->alloc->nam_index, my_rank, &found->comm_local);
//connect the allocation
nam_connect_allocation(found->alloc);
if(all_found)
free(all_found);
nam_print(0, "Re-use of allocation with challenge %"PRIu64" successful!", req.challenge);
return found;
err_alloc:
if(all_found)
free(all_found);
return NULL;
}
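/*
 * Usage sketch: picking up a persistent CPRS allocation after an application
 * restart. Hedged: assumes the surviving .alloc_ext files were loaded by
 * nam_init_extended() via nam_load_ext_allocations():
 *
 *   nam_ext_allocation_t *ea = nam_reuse_cprs(0, MPI_COMM_WORLD);
 *   if (ea)
 *       nam_restart_sync(buf, bytes, data_avail, ea);
 */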
nam_ext_allocation_t *nam_malloc_all(size_t size, int root, MPI_Comm comm)
{
nam_allocation_t *alloc = NULL;
nam_ext_allocation_t *ext_alloc= NULL;
nam_init_extended();
int my_rank;
MPI_Comm_rank(comm, &my_rank);
if(my_rank == root)
{
alloc = nam_malloc(size);
}
// sync allocation over all processes
alloc = nam_sync_allocation(alloc, root, comm);
if(alloc)
{
ext_alloc = malloc(sizeof(nam_ext_allocation_t));
ext_alloc->alloc = alloc;
ext_alloc->comm_global = comm;
ext_alloc->comm_local = comm;
}
return ext_alloc;
}
nam_ext_allocation_t *nam_malloc_all_persistant(size_t size, int root, MPI_Comm comm)
{
nam_allocation_t *alloc = NULL;
nam_ext_allocation_t *ext_alloc= NULL;
nam_init_extended();
int my_rank;
MPI_Comm_rank(comm, &my_rank);
if(my_rank == root)
{
alloc = nam_malloc_persistant(size);
}
// sync allocation over all processes
alloc = nam_sync_allocation(alloc, root, comm);
if(alloc)
{
ext_alloc = malloc(sizeof(nam_ext_allocation_t));
ext_alloc->alloc = alloc;
ext_alloc->comm_global = comm;
ext_alloc->comm_local = comm;
}
return ext_alloc;
}
nam_ext_allocation_t *nam_calloc_all(size_t nmemb, size_t size, int root, MPI_Comm comm)
{
nam_allocation_t *alloc = NULL;
nam_ext_allocation_t *ext_alloc= NULL;
nam_init_extended();
int my_rank;
MPI_Comm_rank(comm, &my_rank);
if(my_rank == root)
{
alloc = nam_calloc(nmemb, size);
}
// sync allocation over all processes
alloc = nam_sync_allocation(alloc, root, comm);
if(alloc)
{
ext_alloc = malloc(sizeof(nam_ext_allocation_t));
ext_alloc->alloc = alloc;
ext_alloc->comm_global = comm;
ext_alloc->comm_local = comm;
}
return ext_alloc;
}
int nam_free_all(nam_ext_allocation_t *ext_alloc)
{
nam_init_extended();
char f_str[300];
int my_rank;
int root = 0;
MPI_Comm_rank(ext_alloc->comm_local, &my_rank);
if(my_rank == root)
{
nam_free(ext_alloc->alloc);
sprintf(f_str, "%s/%"PRIu64".alloc_ext",nam_home, ext_alloc->alloc->challenge);
nam_print(1,"Deleting persistent allocation in free_all %s", f_str);
unlink(f_str);
}
LIST_REMOVE(ext_alloc, ext_allocations);
nam_free_ext_allocation(ext_alloc);
return 0;
}