Skip to content
Snippets Groups Projects
Select Git revision
  • 7deb615613643b7f19cd974d358bc1fc99384fc9
  • master default
  • bing_issues#190_tf2
  • bing_tf2_convert
  • bing_issue#189_train_modular
  • simon_#172_integrate_weatherbench
  • develop
  • bing_issue#188_restructure_ambs
  • yan_issue#100_extract_prcp_data
  • bing_issue#170_data_preprocess_training_tf1
  • Gong2022_temperature_forecasts
  • bing_issue#186_clean_GMD1_tag
  • yan_issue#179_integrate_GZAWS_data_onfly
  • bing_issue#178_runscript_bug_postprocess
  • michael_issue#187_bugfix_setup_runscript_template
  • bing_issue#180_bugs_postprpocess_meta_postprocess
  • yan_issue#177_repo_for_CLGAN_gmd
  • bing_issue#176_integrate_weather_bench
  • michael_issue#181_eval_era5_forecasts
  • michael_issue#182_eval_subdomain
  • michael_issue#119_warmup_Horovod
  • bing_issue#160_test_zam347
  • ambs_v1
  • ambs_gmd_nowcasting_v1.0
  • GMD1
  • modular_booster_20210203
  • new_structure_20201004_v1.0
  • old_structure_20200930
28 results

sna_model.py

Blame
  • omp_consumer.c 6.10 KiB
    /* Connect to pool and subscribe to many events, archiving all CDOs */
    /*
     * Copyright (C) 2021 Hewlett Packard Enterprise
     *
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions are
     * met:
     *
     * 1. Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     * 2. Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the distribution.
     *
     * 3. Neither the name of the copyright holder nor the names of its
     *    contributors may be used to endorse or promote products derived from
     *    this software without specific prior written permission.
     *
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
     * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
     * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    
    #include "omp_consumer.h"
    #include "omp_injector.h"
    
    /**
     * Translate the textual consumer mode into its numeric code.
     *
     * @param consumer_mode_env  mode name such as "MSTRO_CONSUMER_ONE2ONE";
     *                           may be NULL.
     * @return 0 for "MSTRO_CONSUMER_SINK_ALL", 1 for "MSTRO_CONSUMER_ONE2ONE",
     *         2 for "MSTRO_CONSUMER_ONE2TEN", 3 for "MSTRO_CONSUMER_ALL2ALL",
     *         and -1 for NULL or any other string.
     *
     * NOTE(review): the numeric codes are presumably the values of the
     * MSTRO_CONSUMER_* constants declared in the header -- confirm.
     */
    int convert_consumer_mode(const char * consumer_mode_env)
    {
      /* The index of a name in this table is the code returned for it. */
      static const char *const mode_names[] = {
        "MSTRO_CONSUMER_SINK_ALL",
        "MSTRO_CONSUMER_ONE2ONE",
        "MSTRO_CONSUMER_ONE2TEN",
        "MSTRO_CONSUMER_ALL2ALL",
      };
      const int num_modes = (int)(sizeof mode_names / sizeof mode_names[0]);

      /* strcmp() on a NULL pointer is undefined; report it as an unknown mode. */
      if (consumer_mode_env == NULL)
      {
        return -1;
      }

      for (int code = 0; code < num_modes; code++)
      {
        if (strcmp(consumer_mode_env, mode_names[code]) == 0)
        {
          return code;
        }
      }

      return -1;
    }
    
    /**
     * Read the consumer mode from the MSTRO_CONSUMER_MODE environment variable.
     *
     * @return 0 when the variable is not set; otherwise the value
     *         convert_consumer_mode() yields for the variable's content.
     */
    int get_consumer_mode()
    {
      const char *mode_text = getenv("MSTRO_CONSUMER_MODE");

      /* An unset variable selects mode 0. */
      if (mode_text == NULL)
      {
        return 0;
      }

      return convert_consumer_mode(mode_text);
    }
    
    /**
     * Number of producer ranks a consumer sinks data from in the given mode.
     *
     * @param size           total number of ranks
     * @param nConsumers     number of consumer ranks
     * @param consumer_mode  one of the MSTRO_CONSUMER_* mode constants
     * @return size - 1 - nConsumers for SINK_ALL and ALL2ALL, 1 for ONE2ONE,
     *         10 for ONE2TEN; for any other mode an error is logged and 0 is
     *         returned.
     */
    int get_num_producers(int size, int nConsumers, int consumer_mode)
    {
      if (consumer_mode == MSTRO_CONSUMER_SINK_ALL
          || consumer_mode == MSTRO_CONSUMER_ALL2ALL)
      {
        /* every rank except the PM and the consumers is a producer */
        return size - 1 - nConsumers;
      }

      if (consumer_mode == MSTRO_CONSUMER_ONE2ONE)
      {
        /* exactly one producer per consumer */
        return 1;
      }

      if (consumer_mode == MSTRO_CONSUMER_ONE2TEN)
      {
        /* ten producers per consumer */
        return 10;
      }

      ERR("Incorrect MSTRO_CONSUMER_MODE \n");
      return 0;
    }
    
    
    /**
     * Fill producers_ids with the producer ids assigned to this consumer.
     *
     * producers_ids is the only output; num_producers is an input giving the
     * number of entries written in every mode except ONE2ONE, which always
     * writes exactly producers_ids[0].
     *
     * @param rank           rank of the calling consumer
     * @param size           unused
     * @param nConsumers     number of consumer ranks
     * @param producers_ids  output array, filled as described above
     * @param num_producers  number of producers to assign
     * @param consumer_mode  one of the MSTRO_CONSUMER_* mode constants; any
     *                       other value logs an error and writes nothing
     */
    void get_producers(int rank, int size, int nConsumers, int *producers_ids, int num_producers, int consumer_mode)
    {
      int first_id;

      switch(consumer_mode)
      {
        case MSTRO_CONSUMER_ONE2ONE:
          // assuming nProducers == nConsumers, the assignment is shifted by rank
          producers_ids[0] = nConsumers + rank;
          return;

        case MSTRO_CONSUMER_SINK_ALL:
        case MSTRO_CONSUMER_ALL2ALL:
          // all producers: consecutive ids starting right after the consumers
          first_id = nConsumers + 1;
          break;

        case MSTRO_CONSUMER_ONE2TEN:
          // consecutive slice of num_producers ids selected by rank
          // NOTE(review): unlike the other modes this start id is not offset
          // by nConsumers -- confirm that this is intended.
          first_id = ((rank-1) * num_producers) + 1;
          break;

        default:
          ERR("Incorrect MSTRO_CONSUMER_MODE \n");
          return;
      }

      for(int k = 0; k < num_producers; k++)
      {
        producers_ids[k] = first_id + k;
      }
    }
    
    
    /**
     * Declare, seal and require the CDOs of a set of injectors.
     *
     * Every injector contributes num_CDOs / num_injectors CDOs (integer
     * division). For injector i and local index j the CDO name is built by
     * create_name() from injector_ids[i] * CDOs_per_inj + j, and the handle is
     * stored in cdos[i * CDOs_per_inj + j]. If num_CDOs is not a multiple of
     * num_injectors, the trailing entries of cdos are left untouched.
     * The outer loop over injectors is an OpenMP parallel for.
     *
     * @param cdos             output array with room for at least num_CDOs handles
     * @param num_CDOs         total number of CDOs to require
     * @param injector_ids     ids of the injectors to consume from
     * @param num_injectors    number of entries in injector_ids
     * @param num_attributes   number of extra attributes set on every CDO
     * @param size_attributes  size in bytes of each attribute buffer
     * @return bitwise OR of the status codes of all declare / attribute-set /
     *         seal / require calls; MSTRO_OK if num_injectors is not positive.
     */
    mstro_status require_CDOs(mstro_cdo *cdos, int num_CDOs, int *injector_ids, int num_injectors, size_t num_attributes, size_t size_attributes)
    {

      mstro_status s = MSTRO_OK; // global status

      // Guard the division below: without injectors there is nothing to require.
      if (num_injectors <= 0)
      {
        return s;
      }

      const size_t n_inj = (size_t)num_injectors;
      // A non-positive num_CDOs yields zero CDOs per injector.
      const size_t CDOs_per_inj = (num_CDOs > 0) ? (size_t)(num_CDOs / num_injectors) : 0;


      // declare CDOs loop
      #pragma omp parallel for reduction(| :s)
      for(size_t i=0; i < n_inj; i++)
      {

        DEBUG("consuming injector %d \n", injector_ids[i]);


        for(size_t j=0; j < CDOs_per_inj; j++)
        {
          // cdoidx feeds create_name(); cdo_gid is this CDO's slot in cdos
          const size_t cdoidx = (size_t)injector_ids[i] * CDOs_per_inj + j;
          const size_t cdo_gid = i*CDOs_per_inj + j;
          char name[CDO_NAME_MAX];
          create_name(name, cdoidx);

          mstro_status s3 = MSTRO_OK; // accumulates attribute-set and seal status
          mstro_status s1 = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, & (cdos[cdo_gid]));


          //FIXME We do not want extra attributes now --need to merge with different branch
          // create attributes ...numbers and sizes
          for(size_t a=0; a < num_attributes ; a++)
          {
            char attrib_name[ATTRIB_NAME_MAX];
            // NOTE(review): the buffer is uninitialized, not checked for NULL
            // and never freed here; whether the library takes ownership
            // cannot be told from this file -- confirm.
            void * data = malloc(size_attributes*sizeof(char));
            snprintf(attrib_name, ATTRIB_NAME_MAX, ".maestro.benchmark.attrib_%zu", a+1);
            // attributes belong to the CDO just declared, i.e. slot cdo_gid
            s3 |= mstro_cdo_attribute_set(cdos[cdo_gid], attrib_name, data);
            DEBUG("%d: %s \n", s3, mstro_status_description(s3));
          }


          s3 |= mstro_cdo_declaration_seal(cdos[cdo_gid]);

          mstro_status s2 = mstro_cdo_require(cdos[cdo_gid]);
          DEBUG("[consumer] requiring %s \n", mstro_cdo_name(cdos[cdo_gid]));

          s = s | s1 | s2 | s3;
        }

      }

      return s;
    }
    
    
    /**
     * Demand every CDO in cdos and dispose of it right afterwards.
     *
     * The loop over the CDOs is an OpenMP parallel for.
     *
     * @param cdos      array of CDO handles
     * @param num_CDOs  number of entries in cdos
     * @return bitwise OR of the status codes of all demand and dispose calls.
     */
    mstro_status demand_CDOs(mstro_cdo *cdos, int num_CDOs)
    {
      mstro_status overall = MSTRO_OK; // combined status of all CDOs

      #pragma omp parallel for reduction(| :overall)
      for(size_t idx=0; idx < num_CDOs; idx++)
      {
        const mstro_status demand_rc = mstro_cdo_demand(cdos[idx]);
        DEBUG("Hey, I recieved %s \n", mstro_cdo_name(cdos[idx]));
        const mstro_status dispose_rc = mstro_cdo_dispose(cdos[idx]);

        overall = overall | demand_rc | dispose_rc;
      }

      return overall;
    }