/* omp_consumer.c -- authored by Ali Mohammed */
/* Consume/demand CDOs declared in various scenarios
* many2one (sinkall) one2one ten2one all2all */
/*
* Copyright (C) 2018-2020 Cray Computer GmbH
* Copyright (C) 2021 HPE Switzerland GmbH
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "omp_consumer.h"
#include "omp_injector.h"
/**
 * Translate the textual consumer mode into its numeric code.
 *
 * Recognised names and the code returned for each:
 *   "MSTRO_CONSUMER_SINK_ALL" -> 0
 *   "MSTRO_CONSUMER_ONE2ONE"  -> 1
 *   "MSTRO_CONSUMER_ONE2TEN"  -> 2
 *   "MSTRO_CONSUMER_ALL2ALL"  -> 3
 *
 * @param consumer_mode_env  mode name to look up; may be NULL
 * @return the numeric code, or -1 when the name is NULL or not recognised
 */
int convert_consumer_mode(const char * consumer_mode_env)
{
  /* The position of a name in this table is the code returned for it. */
  static const char *const mode_names[] = {
    "MSTRO_CONSUMER_SINK_ALL",
    "MSTRO_CONSUMER_ONE2ONE",
    "MSTRO_CONSUMER_ONE2TEN",
    "MSTRO_CONSUMER_ALL2ALL",
  };

  /* strcmp() must not be handed a NULL pointer; treat it as "unknown". */
  if (consumer_mode_env == NULL) {
    return -1;
  }

  for (size_t code = 0; code < sizeof mode_names / sizeof mode_names[0]; code++) {
    if (strcmp(consumer_mode_env, mode_names[code]) == 0) {
      return (int)code;
    }
  }
  return -1;
}
/**
 * Determine the consumer mode from the MSTRO_CONSUMER_MODE environment
 * variable.
 *
 * @return 0 when the variable is not set; otherwise the value
 *         convert_consumer_mode() returns for the variable's content.
 */
int get_consumer_mode() {
  const char *mode_name = getenv("MSTRO_CONSUMER_MODE");

  /* Variable not set: fall back to mode 0. */
  if (mode_name == NULL) {
    return 0;
  }
  return convert_consumer_mode(mode_name);
}
/**
 * Compute how many producer ranks a consumer sinks data from.
 *
 * @param size           total number of ranks
 * @param nConsumers     number of consumer ranks
 * @param consumer_mode  one of the MSTRO_CONSUMER_* modes
 * @return the number of producers for the mode; 0 (after logging an error)
 *         when the mode is not one of the handled values
 */
int get_num_producers(int size, int nConsumers, int consumer_mode) {
  switch (consumer_mode) {
  case MSTRO_CONSUMER_ONE2ONE:
    /* exactly one producer feeds this consumer */
    return 1;

  case MSTRO_CONSUMER_ONE2TEN:
    /* ten producer ranks feed this consumer */
    return 10;

  case MSTRO_CONSUMER_SINK_ALL:
  case MSTRO_CONSUMER_ALL2ALL:
    /* all producer ranks: every rank except the PM and the consumers */
    return size - 1 - nConsumers;

  default:
    ERR("Incorrect MSTRO_CONSUMER_MODE \n");
    return 0;
  }
}
/**
 * Fill producers_ids with the ranks of the producers this consumer reads from.
 *
 * @param rank           rank of the calling consumer
 * @param nConsumers     number of consumer ranks
 * @param producers_ids  output: receives the producer ranks; needs room for
 *                       num_producers entries (a single entry in ONE2ONE mode)
 * @param num_producers  input: number of entries to fill (ignored in ONE2ONE mode)
 * @param consumer_mode  one of the MSTRO_CONSUMER_* modes; any other value only
 *                       logs an error and leaves producers_ids untouched
 */
void get_producers(int rank, int nConsumers, int *producers_ids, int num_producers, int consumer_mode) {
  if (consumer_mode == MSTRO_CONSUMER_SINK_ALL || consumer_mode == MSTRO_CONSUMER_ALL2ALL) {
    /* all producer ranks: consecutive ranks starting right after the consumers */
    const int first_producer = nConsumers + 1;
    for (int k = 0; k < num_producers; k++) {
      producers_ids[k] = first_producer + k;
    }
  } else if (consumer_mode == MSTRO_CONSUMER_ONE2ONE) {
    /* a single producer; assuming nProducers == nConsumers, the assignment
     * is shifted by this consumer's rank */
    producers_ids[0] = nConsumers + rank;
  } else if (consumer_mode == MSTRO_CONSUMER_ONE2TEN) {
    /* a block of num_producers consecutive ranks, selected by (rank - 1) */
    const int first_producer = ((rank - 1) * num_producers) + 1 + nConsumers;
    for (int k = 0; k < num_producers; k++) {
      producers_ids[k] = first_producer + k;
    }
  } else {
    ERR("Incorrect MSTRO_CONSUMER_MODE \n");
  }
}
/**
 * Declare, seal and require num_CDOs CDOs, split evenly across the injectors.
 *
 * The CDOs are processed as num_injectors consecutive groups of
 * num_CDOs / num_injectors entries. Entry j of group i is named by
 * create_name() from the index
 *   injector_ids[i] * (num_CDOs / num_injectors) + j.
 * Every CDO gets CDO_data[omp_get_thread_num()] as MSTRO_ATTR_CORE_CDO_RAW_PTR
 * and cdo_data_size as MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE before it is
 * sealed and required. The loop runs as an OpenMP parallel for.
 *
 * @param cdos           output: array with room for num_CDOs CDO handles
 * @param num_CDOs       number of CDOs to declare and require
 * @param injector_ids   injector ids, indexed by group number
 * @param num_injectors  number of groups the CDOs are divided into
 * @param CDO_data       buffers indexed by OpenMP thread number
 * @param cdo_data_size  size value set on every CDO
 * @return bitwise OR of every status returned by the mstro_* calls;
 *         MSTRO_OK when num_CDOs is 0
 *
 * NOTE(review): for num_CDOs > 0 this still relies on num_injectors > 0 and on
 * num_CDOs being a multiple of num_injectors; otherwise the group size is 0
 * or the group index runs past injector_ids. Callers must guarantee this.
 */
mstro_status require_CDOs(mstro_cdo *cdos, size_t num_CDOs, int *injector_ids, int num_injectors, char **CDO_data, int64_t cdo_data_size) {
  mstro_status s = MSTRO_OK; /* global status */

  /* Nothing to do: return before dividing, because an empty request may
   * come with num_injectors == 0 as well. */
  if (num_CDOs == 0) {
    return s;
  }

  const int CDOs_per_inj = num_CDOs / num_injectors;

  /* declare CDOs loop; everything declared inside the body is per-iteration */
  #pragma omp parallel for reduction(| :s)
  for (size_t cdo_gid = 0; cdo_gid < num_CDOs; cdo_gid++) {
    const size_t i = cdo_gid / CDOs_per_inj; /* injector (group) index */
    const size_t j = cdo_gid % CDOs_per_inj; /* cdo id within this injector */
    const int thread_id = omp_get_thread_num();
    DEBUG("consuming injector %d \n", injector_ids[i]);

    /* CDO id = injector_id * CDOs_per_inj + j */
    const size_t cdoidx = injector_ids[i] * CDOs_per_inj + j;
    char name[CDO_NAME_MAX];
    create_name(name, cdoidx);

    mstro_status s1 = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &(cdos[cdo_gid]));

    /* s3 collects the attribute and seal statuses */
    mstro_status s3 = MSTRO_OK;
    s3 |= mstro_cdo_attribute_set(cdos[cdo_gid], MSTRO_ATTR_CORE_CDO_RAW_PTR, CDO_data[thread_id], false);
    s3 |= mstro_cdo_attribute_set(cdos[cdo_gid],
                                  MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE,
                                  &cdo_data_size, true);
    s3 |= mstro_cdo_declaration_seal(cdos[cdo_gid]);

    mstro_status s2 = mstro_cdo_require(cdos[cdo_gid]);
    DEBUG("[consumer] requiring %s \n", mstro_cdo_name(cdos[cdo_gid]));

    s = s | s1 | s2 | s3;
  }
  return s;
}
/**
 * Demand each of the num_CDOs CDOs in cdos and dispose of it afterwards.
 *
 * The CDOs are handled in an OpenMP parallel loop. A CDO is disposed
 * regardless of the status its demand returned.
 *
 * @param cdos      array of CDO handles
 * @param num_CDOs  number of entries in cdos
 * @return bitwise OR of every demand and dispose status
 */
mstro_status demand_CDOs(mstro_cdo *cdos, size_t num_CDOs){
  mstro_status overall = MSTRO_OK; /* combined status of all iterations */

  #pragma omp parallel for reduction(| :overall)
  for (size_t idx = 0; idx < num_CDOs; idx++) {
    const mstro_status demand_status = mstro_cdo_demand(cdos[idx]);
    DEBUG("Hey, I recieved %s \n", mstro_cdo_name(cdos[idx]));
    const mstro_status dispose_status = mstro_cdo_dispose(cdos[idx]);

    overall |= demand_status | dispose_status;
  }
  return overall;
}