Select Git revision
sna_model.py
omp_consumer.c 6.10 KiB
/* Connect to pool and subscribe to many events, archiving all CDOs */
/*
* Copyright (C) 2021 Hewlett Packard Enterprise
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "omp_consumer.h"
#include "omp_injector.h"
/**
 * Translate the textual value of MSTRO_CONSUMER_MODE into its numeric code.
 *
 * @param consumer_mode_env  Mode name such as "MSTRO_CONSUMER_SINK_ALL";
 *                           may be NULL.
 * @return 0 for "MSTRO_CONSUMER_SINK_ALL", 1 for "MSTRO_CONSUMER_ONE2ONE",
 *         2 for "MSTRO_CONSUMER_ONE2TEN", 3 for "MSTRO_CONSUMER_ALL2ALL",
 *         and -1 for NULL or any other string.
 *
 * NOTE(review): the codes are presumably meant to equal the MSTRO_CONSUMER_*
 * constants consumed by get_num_producers()/get_producers() -- confirm
 * against the header that defines them.
 */
int convert_consumer_mode(const char * consumer_mode_env)
{
  /* The position of a name in this table is the code that gets returned. */
  static const char *const mode_names[] = {
    "MSTRO_CONSUMER_SINK_ALL",
    "MSTRO_CONSUMER_ONE2ONE",
    "MSTRO_CONSUMER_ONE2TEN",
    "MSTRO_CONSUMER_ALL2ALL",
  };
  const int num_modes = (int)(sizeof mode_names / sizeof mode_names[0]);

  /* strcmp() on a NULL pointer is undefined behaviour, so reject it up front. */
  if (consumer_mode_env == NULL)
  {
    return -1;
  }

  for (int code = 0; code < num_modes; code++)
  {
    if (strcmp(consumer_mode_env, mode_names[code]) == 0)
    {
      return code;
    }
  }

  return -1;
}
/*
 * Read MSTRO_CONSUMER_MODE from the environment and return its numeric code.
 *
 * Returns 0 when the variable is not set; otherwise returns whatever
 * convert_consumer_mode() yields for the variable's value (which is -1 for
 * an unrecognised name).
 */
int get_consumer_mode()
{
  char *mode_name = getenv("MSTRO_CONSUMER_MODE");

  if (mode_name == NULL)
  {
    /* variable absent: fall back to mode 0 */
    return 0;
  }

  return convert_consumer_mode(mode_name);
}
/*
 * Number of producers a consumer has to read from in the given mode.
 *
 *   size          - total rank count supplied by the caller
 *   nConsumers    - number of consumer ranks
 *   consumer_mode - one of the MSTRO_CONSUMER_* modes
 *
 * Returns size - 1 - nConsumers for SINK_ALL and ALL2ALL (all ranks minus
 * the "PM" rank -- presumably the pool manager -- and the consumers),
 * 1 for ONE2ONE, 10 for ONE2TEN. Any other mode is reported through ERR()
 * and yields 0.
 */
int get_num_producers(int size, int nConsumers, int consumer_mode)
{
  if (consumer_mode == MSTRO_CONSUMER_SINK_ALL
      || consumer_mode == MSTRO_CONSUMER_ALL2ALL)
  {
    /* every rank that is neither the PM nor a consumer */
    return size - 1 - nConsumers;
  }

  if (consumer_mode == MSTRO_CONSUMER_ONE2ONE)
  {
    /* exactly one producer per consumer */
    return 1;
  }

  if (consumer_mode == MSTRO_CONSUMER_ONE2TEN)
  {
    /* a fixed group of ten producers per consumer */
    return 10;
  }

  ERR("Incorrect MSTRO_CONSUMER_MODE \n");
  return 0;
}
/*
 * Fill producers_ids with the IDs of the producers this consumer reads from.
 *
 *   rank          - rank of the calling consumer
 *   size          - total rank count; not used by this function
 *   nConsumers    - number of consumer ranks
 *   producers_ids - OUTPUT array. SINK_ALL, ALL2ALL and ONE2TEN write
 *                   num_producers entries; ONE2ONE writes entry 0 only.
 *   num_producers - INPUT: how many entries to fill (passed by value)
 *   consumer_mode - one of the MSTRO_CONSUMER_* modes; any other value is
 *                   reported through ERR() and producers_ids is left untouched
 *
 * NOTE(review): ONE2TEN is the only mode whose IDs are not offset by
 * nConsumers, so for rank 1 it yields IDs 1..num_producers -- confirm that
 * this is the intended numbering.
 */
void get_producers(int rank, int size, int nConsumers, int *producers_ids, int num_producers, int consumer_mode)
{
  if (consumer_mode == MSTRO_CONSUMER_SINK_ALL
      || consumer_mode == MSTRO_CONSUMER_ALL2ALL)
  {
    /* consecutive IDs starting right after the consumers: nConsumers + 1, ... */
    for (int slot = 0; slot < num_producers; slot++)
    {
      producers_ids[slot] = nConsumers + 1 + slot;
    }
  }
  else if (consumer_mode == MSTRO_CONSUMER_ONE2ONE)
  {
    /* assuming nProducers == nConsumers, the assignment is shifted by my rank */
    producers_ids[0] = nConsumers + rank;
  }
  else if (consumer_mode == MSTRO_CONSUMER_ONE2TEN)
  {
    /* consecutive group of num_producers IDs selected by (rank - 1) */
    for (int slot = 0; slot < num_producers; slot++)
    {
      producers_ids[slot] = ((rank - 1) * num_producers) + 1 + slot;
    }
  }
  else
  {
    ERR("Incorrect MSTRO_CONSUMER_MODE \n");
  }
}
/**
 * Declare, seal and require the CDOs announced by a set of injectors.
 *
 * Each injector accounts for num_CDOs / num_injectors CDOs (integer
 * division). The j-th CDO of the i-th injector is stored in
 * cdos[i * CDOs_per_inj + j] and named via
 * create_name(injector_ids[i] * CDOs_per_inj + j). When num_CDOs is not a
 * multiple of num_injectors, the trailing num_CDOs % num_injectors entries of
 * cdos are not touched.
 *
 * @param cdos             Output array with room for num_CDOs handles.
 * @param num_CDOs         Total number of CDOs to require.
 * @param injector_ids     IDs of the injectors to consume from.
 * @param num_injectors    Number of entries in injector_ids.
 * @param num_attributes   Number of ".maestro.benchmark.attrib_<k>" attributes
 *                         (k = 1..num_attributes) set on every CDO.
 * @param size_attributes  Size in bytes of the buffer allocated per attribute.
 * @return Bitwise OR of every status returned by the Maestro calls, i.e.
 *         MSTRO_OK only if all of them returned MSTRO_OK. Also MSTRO_OK,
 *         without doing any work, when num_CDOs or num_injectors is <= 0.
 */
mstro_status require_CDOs(mstro_cdo *cdos, int num_CDOs, int *injector_ids, int num_injectors, size_t num_attributes, size_t size_attributes)
{
  mstro_status s = MSTRO_OK; // global status

  // Nothing to require. This also avoids dividing by zero below and keeps a
  // negative count from being converted into a huge unsigned loop bound.
  if (num_injectors <= 0 || num_CDOs <= 0)
  {
    return s;
  }

  const size_t n_injectors = (size_t)num_injectors;
  const size_t CDOs_per_inj = (size_t)num_CDOs / n_injectors;

  // declare CDOs loop; per-iteration variables are declared inside the loop
  // body, so they are private to each thread without a private() clause
  #pragma omp parallel for reduction(| :s)
  for (size_t i = 0; i < n_injectors; i++)
  {
    DEBUG("consuming injector %d \n", injector_ids[i]);
    for (size_t j = 0; j < CDOs_per_inj; j++)
    {
      // index used to build the CDO name: injector_id * CDOs_per_inj + j
      const size_t cdoidx = (size_t)injector_ids[i] * CDOs_per_inj + j;
      // slot of this CDO in the caller's cdos array
      const size_t cdo_gid = i * CDOs_per_inj + j;

      char name[CDO_NAME_MAX];
      create_name(name, cdoidx);

      mstro_status s1 = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &(cdos[cdo_gid]));

      //FIXME We do not want extra attributes now --need to merge with different branch
      // create attributes ...numbers and sizes
      mstro_status s3 = MSTRO_OK;
      for (size_t a = 0; a < num_attributes; a++)
      {
        char attrib_name[ATTRIB_NAME_MAX];
        // NOTE(review): the allocation is neither checked nor freed here, and
        // its contents are uninitialised; whether the library keeps the
        // pointer is not visible from this file -- confirm ownership.
        void *data = malloc(size_attributes * sizeof(char));
        snprintf(attrib_name, ATTRIB_NAME_MAX, ".maestro.benchmark.attrib_%zu", a + 1);
        // the attribute belongs to the CDO just declared (cdo_gid), not to
        // the entry indexed by the injector loop counter
        s3 |= mstro_cdo_attribute_set(cdos[cdo_gid], attrib_name, data);
        DEBUG("%d: %s \n", s3, mstro_status_description(s3));
      }

      s3 |= mstro_cdo_declaration_seal(cdos[cdo_gid]);
      mstro_status s2 = mstro_cdo_require(cdos[cdo_gid]);
      DEBUG("[consumer] requiring %s \n", mstro_cdo_name(cdos[cdo_gid]));

      s = s | s1 | s2 | s3;
    }
  }

  return s;
}
/**
 * Demand and then dispose every CDO in cdos, in parallel.
 *
 * @param cdos      Array of CDO handles (e.g. as filled in by require_CDOs()).
 * @param num_CDOs  Number of entries in cdos; a value <= 0 results in no work.
 * @return Bitwise OR of all mstro_cdo_demand()/mstro_cdo_dispose() statuses,
 *         i.e. MSTRO_OK only if every call returned MSTRO_OK.
 */
mstro_status demand_CDOs(mstro_cdo *cdos, int num_CDOs)
{
  mstro_status s = MSTRO_OK; // global status

  // The index is a signed int like num_CDOs: comparing an unsigned index with
  // a negative count would convert the count to a huge bound and run off the
  // end of cdos.
  #pragma omp parallel for reduction(| :s)
  for (int i = 0; i < num_CDOs; i++)
  {
    mstro_status demand_status = mstro_cdo_demand(cdos[i]);
    DEBUG("Hey, I recieved %s \n", mstro_cdo_name(cdos[i]));
    mstro_status dispose_status = mstro_cdo_dispose(cdos[i]);

    s = s | demand_status | dispose_status;
  }

  return s;
}