Skip to content
Snippets Groups Projects
Select Git revision
  • 4ad5612edf367df15f925a48bc5002dc5fc10f0f
  • devel default
  • 107-compilation-error-when-building-maestro-core-on-m1-apple-processors
  • 108-implement-cpu-id-query-for-apple-m1-hardware
  • 58-scripting-interface-to-maestro-core
  • 101-need-ci-test-using-installed-maestro
  • 57-sphinx-documentation
  • 105-memory-leak-in-pm-message-envelope-handling
  • 104-permit-disabling-memory-pool
  • 103-liberl-installation-issue-on-devel
  • 94-maestro-rdma-transport-ignores-max_msg_size-2
  • main protected
  • 102-possible-race-in-check_pm_redundant_interlock-test
  • 97-check-if-shm-provider-can-be-enabled-after-libfabric-1-14-is-in-our-tree-2
  • 100-include-maestro-attributes-h-cannot-include-mamba-header-from-deps-path
  • 97-check-if-shm-provider-can-be-enabled-after-libfabric-1-14-is-in-our-tree
  • 17-job-failed-282354-needs-update-of-mio-interface-and-build-rules
  • 96-test-libfabric-update-to-1-13-or-1-14
  • feature/stop-telemetry-after-all-left
  • 94-maestro-rdma-transport-ignores-max_msg_size
  • 93-improve-performance-of-mstro_attribute_val_cmp_str
  • v0.3_rc1
  • maestro_d65
  • d65_experiments_20211113
  • v0.2
  • v0.2_rc1
  • d3.3
  • d3.3-review
  • d5.5
  • d5.5-review
  • v0.1
  • d3.2
  • d3.2-draft
  • v0.0
34 results

omp_consumer.c

Blame
  • omp_consumer.c 6.18 KiB
    /* Consume/demand CDOs declared in various scenarios:
     * many2one (sink all), one2one, ten2one, all2all */
    /*
     * Copyright (C) 2018-2020 Cray Computer GmbH
     * Copyright (C) 2021 HPE Switzerland GmbH
     *
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions are
     * met:
     *
     * 1. Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     * 2. Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the distribution.
     *
     * 3. Neither the name of the copyright holder nor the names of its
     *    contributors may be used to endorse or promote products derived from
     *    this software without specific prior written permission.
     *
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
     * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
     * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    
    #include "omp_consumer.h"
    #include "omp_injector.h"
    
    /*
     * Translate a consumer-mode name into its numeric code.
     *
     * consumer_mode_env: mode name such as "MSTRO_CONSUMER_SINK_ALL"; may be NULL.
     *
     * Returns 0 for SINK_ALL, 1 for ONE2ONE, 2 for ONE2TEN, 3 for ALL2ALL,
     * and -1 when the name is NULL or not one of the four known names.
     *
     * NOTE(review): the numeric codes presumably mirror the MSTRO_CONSUMER_*
     * constants that get_num_producers()/get_producers() switch on -- confirm
     * against the header that defines them.
     */
    int convert_consumer_mode(const char * consumer_mode_env)
    {
      /* The position of a name in this table is the code returned for it. */
      static const char *const mode_names[] = {
        "MSTRO_CONSUMER_SINK_ALL",
        "MSTRO_CONSUMER_ONE2ONE",
        "MSTRO_CONSUMER_ONE2TEN",
        "MSTRO_CONSUMER_ALL2ALL"
      };
      const size_t num_modes = sizeof mode_names / sizeof mode_names[0];

      /* strcmp() on a NULL pointer is undefined behaviour; treat it as unknown. */
      if (consumer_mode_env == NULL) {
        return -1;
      }

      for (size_t code = 0; code < num_modes; code++) {
        if (strcmp(consumer_mode_env, mode_names[code]) == 0) {
          return (int) code;
        }
      }

      return -1;
    }
    
    /*
     * Determine the consumer mode from the MSTRO_CONSUMER_MODE environment
     * variable.
     *
     * Returns 0 when the variable is not set; otherwise returns whatever
     * convert_consumer_mode() yields for the variable's value.
     */
    int get_consumer_mode() {
      char *mode_name = getenv("MSTRO_CONSUMER_MODE");

      /* Unset variable: fall back to mode 0. */
      if (mode_name == NULL) {
        return 0;
      }

      return convert_consumer_mode(mode_name);
    }
    
    /*
     * Number of producers a consumer sinks data from in the given mode.
     *
     * size:          total rank count used for the "all producers" modes
     * nConsumers:    number of consumer ranks
     * consumer_mode: one of the MSTRO_CONSUMER_* constants
     *
     * Returns size - 1 - nConsumers for SINK_ALL and ALL2ALL (all ranks minus
     * the PM and the consumers), 1 for ONE2ONE, 10 for ONE2TEN. For any other
     * mode an error is logged and 0 is returned.
     */
    int get_num_producers(int size, int nConsumers, int consumer_mode) {

      /* Every producer rank feeds this consumer. */
      if (consumer_mode == MSTRO_CONSUMER_SINK_ALL ||
          consumer_mode == MSTRO_CONSUMER_ALL2ALL) {
        return size - 1 - nConsumers;
      }

      /* Exactly one producer feeds this consumer. */
      if (consumer_mode == MSTRO_CONSUMER_ONE2ONE) {
        return 1;
      }

      /* Ten producer ranks feed this consumer. */
      if (consumer_mode == MSTRO_CONSUMER_ONE2TEN) {
        return 10;
      }

      ERR("Incorrect MSTRO_CONSUMER_MODE \n");
      return 0;

    }
    
    
    /*
     * Fill producers_ids with the ranks of the producers this consumer reads
     * from. producers_ids is the only output; every other argument is an input.
     *
     * rank:          rank of the calling consumer
     * nConsumers:    number of consumer ranks
     * producers_ids: array receiving the producer ranks; must hold at least
     *                num_producers entries (at least one in ONE2ONE mode)
     * num_producers: number of entries to write in the SINK_ALL, ALL2ALL and
     *                ONE2TEN modes (ONE2ONE always writes exactly one)
     * consumer_mode: one of the MSTRO_CONSUMER_* constants; for any other value
     *                an error is logged and producers_ids is left untouched
     *
     * NOTE(review): the formulas presumably assume rank 0 is the PM, ranks
     * 1..nConsumers are consumers and producers follow -- confirm with the
     * launcher.
     */
    void get_producers(int rank, int nConsumers, int *producers_ids, int num_producers, int consumer_mode) {

      int first_id; /* rank written to producers_ids[0] */
      int count;    /* how many consecutive ranks to write */

      switch(consumer_mode) {
        case MSTRO_CONSUMER_SINK_ALL:
        case MSTRO_CONSUMER_ALL2ALL:
          /* every producer: the ranks that follow the PM and the consumers */
          first_id = nConsumers + 1;
          count = num_producers;
          break;

        case MSTRO_CONSUMER_ONE2ONE:
          /* a single producer; assuming nProducers == nConsumers, the
           * assignment is shifted by this consumer's rank */
          first_id = nConsumers + rank;
          count = 1;
          break;

        case MSTRO_CONSUMER_ONE2TEN:
          /* a contiguous group of num_producers ranks selected by rank-1 */
          first_id = ((rank-1) * num_producers) + 1 + nConsumers;
          count = num_producers;
          break;

        default:
          ERR("Incorrect MSTRO_CONSUMER_MODE \n");
          return;
      }

      for(int k = 0; k < count; k++){
        producers_ids[k] = first_id + k;
      }

    }
    
    
    /*
     * Declare, configure, seal and require num_CDOs CDOs in an OpenMP parallel
     * loop.
     *
     * cdos:          array of num_CDOs handles; entry g receives the CDO
     *                declared for global index g
     * num_CDOs:      total number of CDOs to require
     * injector_ids:  ids of the injectors whose CDOs are consumed
     * num_injectors: number of injectors the CDOs are split across
     * CDO_data:      buffers indexed by OpenMP thread number; all CDOs handled
     *                by one thread are given the same buffer as raw pointer
     * cdo_data_size: value set as the local size attribute of every CDO
     *
     * Returns the bitwise OR of the statuses of all Maestro calls made, so the
     * result equals MSTRO_OK only if every call returned MSTRO_OK.
     *
     * NOTE(review): num_injectors == 0, or num_CDOs < num_injectors (which makes
     * CDOs_per_inj zero), causes a division by zero below. If num_CDOs is not a
     * multiple of num_injectors, the injector slot computed for the trailing
     * CDOs reaches num_injectors, i.e. one past the end of injector_ids if that
     * array has num_injectors entries. Callers presumably guarantee an exact
     * multiple -- confirm.
     */
    mstro_status require_CDOs(mstro_cdo *cdos, size_t num_CDOs, int *injector_ids, int num_injectors, char **CDO_data, int64_t cdo_data_size) {

      mstro_status s = MSTRO_OK; /* OR-combined status over all iterations */
      int CDOs_per_inj = num_CDOs / num_injectors ;

      /* All per-iteration state is declared inside the loop body and is
       * therefore private to each thread; only s is shared, via the reduction. */
      #pragma omp parallel for reduction(| :s)
      for(size_t gid = 0; gid < num_CDOs; gid++) {

        size_t inj_slot = gid / CDOs_per_inj;  /* position in injector_ids */
        size_t local_idx = gid % CDOs_per_inj; /* CDO index within that injector */
        int tid = omp_get_thread_num();

        DEBUG("consuming injector %d \n", injector_ids[inj_slot]);

        /* Index used to build the CDO name: injector id * CDOs per injector
         * + index within the injector. */
        size_t name_idx = injector_ids[inj_slot] *CDOs_per_inj + local_idx;

        char name[CDO_NAME_MAX];
        create_name(name, name_idx);

        mstro_status declare_st = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, & (cdos[gid]));

        /* NOTE(review): the calls below run even when the declare failed;
         * whether the handle is usable in that case is not visible here. */
        mstro_status setup_st = MSTRO_OK;
        setup_st |= mstro_cdo_attribute_set( cdos[gid], MSTRO_ATTR_CORE_CDO_RAW_PTR, CDO_data[tid], false);
        setup_st |= mstro_cdo_attribute_set(cdos[gid],
                                            MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE,
                                            &cdo_data_size, true);
        setup_st |= mstro_cdo_declaration_seal(cdos[gid]);

        mstro_status require_st = mstro_cdo_require(cdos[gid]);
        DEBUG("[consumer] requiring %s \n", mstro_cdo_name(cdos[gid]));

        s |= declare_st | require_st | setup_st;

      }

      return s;
    }
    
    
    /*
     * Demand and then dispose each of the num_CDOs CDOs in cdos, in an OpenMP
     * parallel loop.
     *
     * Every CDO is disposed regardless of the status its demand returned.
     * Returns the bitwise OR of all demand and dispose statuses, so the result
     * equals MSTRO_OK only if every call returned MSTRO_OK.
     */
    mstro_status demand_CDOs(mstro_cdo *cdos, size_t num_CDOs){

      mstro_status s = MSTRO_OK; /* OR-combined status over all iterations */

      #pragma omp parallel for reduction(| :s)
      for(size_t idx = 0; idx < num_CDOs; idx++){
        mstro_status demand_st = mstro_cdo_demand(cdos[idx]);
        DEBUG("Hey, I recieved %s \n", mstro_cdo_name(cdos[idx]));

        mstro_status dispose_st = mstro_cdo_dispose(cdos[idx]);

        s |= demand_st | dispose_st;
      }

      return s;

    }