diff --git a/INSTALL.md b/INSTALL.md index 579de21e9a30d2aad1a365f219743c42f8812433..b0bee5d6746b3ea9f848c93742ba33540d75f2e9 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,60 +1,124 @@ -Building: ---------- +# Prerequisites +You will need a `C compiler` capable of understanding `C11`, as well as `posix threads`. -Prerequisites: You will need a C compiler capable of understanding C11, as well -as posix threads. +Also, you will need `autoreconf` and `libtool` for configuring Maestro build system. -We are including libyaml, libcyaml and libfabric as subtrees in our tree -and build these versions automatically with our make rules. -By default, the OFI conductor will be built. To enable the experimental other variants -`--enable-mpi-conductor` or `--enable-smp-conductor` at configure time (INCOMPLETE). +deps/c-timestamp +deps/libcyaml +deps/libfabric +deps/libyaml +deps/mamba +deps/mio +deps/protobuf +deps/protobuf-c + + + +#### Subtree projects and updates + +We are including a number of external dependency libraries as subtrees in our tree under `./deps/` +and build these versions automatically with our make rules. +Maestro uses the libraries it built from the subtrees and not the versions of the libraries on your system. + + +| Subtree | Remote branch | +| :------------|:----------------------------------------| +| c-timestamp | git@github.com:chansen/c-timestamp.git | +| libcyaml | git@github.com:tlsa/libcyaml.git master | +| libfabric | git@github.com:ofiwg/libfabric.git v1.11.x | +| libyaml | git@github.com:yaml/libyaml.git master | +| mamba | git@gitlab.com:cerl/mamba.git master | +| mio | git@gitlab.version.fz-juelich.de:10022/maestro/mio.git master| +| protobuf | git@github.com:protocolbuffers/protobuf.git 3.10.x | +| protobuf-c | git@github.com:protobuf-c/protobuf-c.git next | + + + +To update a dependency project (e.g. mamba) subtree please do + +``` +git subtree pull --prefix=deps/mamba --squash git@gitlab.com:cerl/mamba.git master +``` + + +# Building + +## For CRAY XC systems +Please load the following modules before configuring/building, to ensure `libfabric` will be built properly. -For CRAY XC systems you should do module load rdma-credentials module load gni-headers -before configuring/building, to ensure libfabric will be built properly. -We are also including Version 0.1.0 of the Mamba library in deps/mamba, and a snapshot of MIO in deps/mio. +#### jupiter system +Do not use one of the `craype-network-ofi\*` modules at this time. +`libfabric` builds nicely with CCE 9.0. -To use MIO you need to add the `--with-mio` flag at configure time, and have MERO installed on the system. +### sage prototype +Please load the following modules before configuring/building. + +``` +module use $CLIENT_MOD_PATH +module swap gnu GCC/9.3.0 +module load Autotools git binutils pkg-config libreadline +``` + +## Calling autoreconf -Then ``` autoreconf -ivf -./configure # CPPFLAGS="-I/path/to/ofi/include" LDFLAGS="-L/path/to/ofi/lib" -make ``` -Depending on which branch you build, the default configure argumenst may include options to enable the -gcc/llvm address sanitizer. To disable, please configure with +## Configuring + ``` ---disable-asan +./configure ``` -If you have the address sanitizer enabled, that may (depending on the compiler) also turn on the leak checker, and since -we are not fully memory leak-free at this point may trigger exit-time failures. To disable it, set the environment variable + +or specify the install directory, e.g. + ``` -LSAN_OPTIONS=detect_leaks=0 +./configure --prefix=$HOME/maestro ``` +#### For CRAY systems +Be sure to use `CC=cc` on the configure line. + +### Configuration options -Cray-specific (jupiter) ------------------------ -Do not use one of the craype-network-ofi\* modules at this time. -Be sure to use CC=cc on the configure line. -libfabric builds nicely with CCE 9.0. +By default, the OFI conductor will be built. To enable the experimental other variants +`--enable-mpi-pool-manager` or `--enable-smp-pool-manager` at configure time (INCOMPLETE). + +We are also including Version 0.1.0 of the Mamba library in deps/mamba, and a snapshot of MIO in deps/mio. + +To use MIO you need to add the `--with-mio` flag at configure time, and have MERO installed on the system. -Subtree project updates ------------------------ -To update a dependency project (like mamba) subtree do +Depending on which branch you build, the default configure arguments may include options to enable the +gcc/llvm address sanitizer. To disable, please configure with `--disable-asan` + +## Build + +Please use + +``` +make ``` -git subtree pull --prefix=deps/mamba --squash git@gitlab.com:cerl/mamba.git master + +If you have the address sanitizer enabled, that may (depending on the compiler) also turn on the leak checker. Since +we are not fully memory leak-free at this point, it may trigger exit-time failures. To disable it, set the environment variable +`LSAN_OPTIONS=detect_leaks=0` + +## Install + +Please use + +``` +make install ``` -CI --- +# Continuous integration We are running gitlab-CI on the master, devel, and mvp branches. If you notice issues you can run the gitlab CI locally using docker and the gitlab-runner tool. diff --git a/Makefile.am b/Makefile.am index 14faf1158dcd005478ecb50df51853ce54e6854d..5f3bba5ee1f2c1dea634bf460ca1a597f5172617 100644 --- a/Makefile.am +++ b/Makefile.am @@ -33,13 +33,13 @@ ACLOCAL_AMFLAGS=-I m4 # automatic version number generation from git version # If you want to force updating the version string you need to run -# autoconf -f +# autoconf -f # in the toplevel dir. We don't do this all the time automatically to # save development time. In a fresh git checkout the initial autoreconf does # the right thing. Of course, building a release should thus be done in a clean # checkout. BUILT_SOURCES = $(top_srcdir)/.version -EXTRA_DIST = $(top_srcdir)/.version +EXTRA_DIST = $(top_srcdir)/.version examples $(top_srcdir)/.version: echo $(VERSION) > $@-t && mv $@-t $@ dist-hook: @@ -54,7 +54,7 @@ maestro: deps tests: maestro lib_LTLIBRARIES = libmaestro.la -libmaestro_la_SOURCES = +libmaestro_la_SOURCES = libmaestro_la_LIBADD = \ maestro/libmaestro_core.la \ protocols/libmaestro_proto.la \ @@ -65,7 +65,7 @@ libmaestro_la_LIBADD = \ # README dist_doc_DATA = README.md -# CMake package +# CMake package cmakepackagedir = $(libdir)/cmake/maestro/ cmakepackage_DATA = MaestroConfig.cmake diff --git a/README.md b/README.md index 78063c06c2f1e2ae97a7b82b04cd32f88ed179b4..9802421e1b0f041c8e0c7785108487831afcc086 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,61 @@ -Maestro Core ------------- +# Description -This repository contains the Maestro Core Library, as developed for D3.2. -It features the Maestro Core API, used by example code and a MVP demonstrator. +Maestro is a data- and memory-aware middleware framework that addresses the ubiquitous problems of data movement in complex memory hierarchies that exist at multiple levels of the HPC software stack. -Installation ------------- +<img alt="Maestro architecture overview image" src="docs/maestro-arch-overview.png"> -Please refer to INSTALL.md -Examples --------- +This repository contains the Maestro Core Library, as developed for `D3.2`. +It features the Maestro Core API, used by example codes and a MVP demonstrator. + +# Installation + +Please refer to [INSTALL.md](INSTALL.md) + +# Usage +Maestro can be executed on various sizes and types of machines from a simple laptop to large HPC clusters. +On Cray systems, please build all binaries on the service nodes (login nodes) and execute on compute nodes. + +## Access an installed Maestro version + +Please include the main `Maestro header` file in your code +``` +#include "maestro.h" +``` + +Please add the `include path` and `library path` of Maestro to the compilation/linking command +``` +-I$(MAESTRO_PATH)/include/maestro -L$(MAESTRO_PATH)/lib -lmaestro +``` +Please `export` the path to Maestro library before running -Please use +``` +export LD_LIBRARY_PATH=$(MAESTRO_PATH)/lib:$LD_LIBRARY_PATH +``` + +where `$(MAESTRO_PATH)` is Maestro install path specified during configuration with `./configure --prefix=$(MAESTRO_PATH)` + +## Unit tests + +Build the unit tests only +``` +make check TESTS= +``` + +Run the unit tests only +``` +make check TESTS +``` + +Build and run the unit tests. This may take some time ``` make check ``` -to build and run the test examples. -This may take some time. -Limits ------- -Maestro-core needs quite a few file descriptors and also wants to lock pages +## Limits + +Maestro requires quite a few file descriptors and also locks pages into memory for RDMA purposes. We try to give a diagnostic message if errors are triggered that may be due to resource constraints. Still, we recommend @@ -33,72 +66,63 @@ ulimit -l 256 to set at least 1024 file descriptors and 256k of RDMA space. +When using a system that uses `PBSPro` or `ALPS` workload manager, please export -Local multithreaded demo (MVP1) -------------------------------- - -MVP1 consists in a local multithreaded demo application. More reading (d3.2) here : -`https://bscw.zam.kfa-juelich.de/bscw/bscw.cgi/2995531` - -Reference version is tagged d3.2-draft, on master branch. - -`make check` also builds the demo executable `demo_mvp_d3_2` in addition to examples, and runs it. -`./run_demo.sh` permits to run the demo alone. - - -Adaptive Transport demo ------------------------ +``` +export APRUN_XFER_LIMITS=1 +``` +before submiting your job to ensure enough limits for Maestro on the compute nodes. -Pool manager interlock demo uses a three application setup, comprising one pool -manager process, and showing GFS and MIO transport. More reading (d5.5 to appear on BSCW) and -information on how to setup a VM to run Mero here: -`https://gitlab.version.fz-juelich.de/maestro/maestro-mero-vm` +## Fabric provider choice/ High-Performance interconnect usage -Reference version is tagged d5.5-review, on master branch. +Maestro isolates the user from the multitude of network provider choices by using +`libfabric`, and transparently choosing 'the best' connectivity between components. Unfortunately this +functionality is not fully working, due to issues in the upstream `libfabric` code, and in incomplete testing +of our usage of it. -The pool manager interlock demonstration `./tests/check_pm_interlock.sh` is -automatically launched with make check. +Please use +``` +export FI_PROVIDER=provider +``` +to force a specific fabric provider. -fabric provider choice/ High-Performance Interconnect usage ------------------------------------------------------------ +### List of supported providers -Maestro-core is trying hard to isolate the user from the multitude of network provider choices by using -libfabric, and transparently choosing 'the best' connectivity between components. Unfortunately this -functionality is not fully working, due to issues in the upstream libfabric code, and in incomplete testing -of our usage of it. -The safest (and lowest performance) connectivity is provided by the `sockets` provider. You can force usage of -that by setting +| Provider | Support | +| :------------ |:----------------------------------| +| sockets | TCP/IP networking | +| verbs | Infiniband networks and slingshot | +| gni | Cray Areis networks | +| psm2 | Intel Omni-Path networks | + + +You can execute ``` -FI_PROVIDER=sockets +./Maestro-source-dir/deps/libfabric/util/fi_info ``` +to see all the discovered providers by libfabrics, where `Maestro-source-dir` is the directory containing Maestro source. + +The safest (and lowest performance) connectivity is provided by the `sockets` provider. It should work on almost any network that can support TCP/IP networking, including ethernet, IB, and GNI (Aries). -in your environment. It should work on most any network that can support TCP/IP -networking, including ethernet, IB, and GNI (Aries). Usage of the `tcp` and `tcp;ofi_rxm` provider is currently broken, an upstream issue is open. On Cray XC systems the GNI (Aries) provider is supported. If you compile with -the `rdma-credentials` and `gni-headers` modules loaded the GNI provider should +the `rdma-credentials` and `gni-headers` modules loaded, the GNI provider should be autoselected if a GNI NIC is found at runtime. NOTE that GNI NICs on login nodes typically do not work, due to a limitation of -the libfabric/gni driver, so you will have to run your application exclusively +the `libfabric/gni` driver, so you will have to run your application exclusively on compute nodes, or manually switch the components running on login nodes to the sockets provider. -The GNI driver can be forced by setting -``` -FI_PROVIDER=gni -``` - - -If you are using GNI you will implicitly be using Cray libdrc, a mechanism to obtain network -authentication tokens. Maestro-core is requesting workflow-level tokens that even +If you are using GNI you will implicitly be using Cray `libdrc`, a mechanism to obtain network +authentication tokens. Maestro core is requesting workflow-level tokens that even support running multiple components of a workflow from different user IDs. In some cases the system may run out of tokens, and there is no user-level token inquiry tool available. If you see failure of GNI startup, try running your application with @@ -115,20 +139,86 @@ LIBDRC:CORE:DEBUG rdmacred.c:658 - finished acquire request, rc=-28 If you see this, contact your system admin to clear cached DRC credentials. +## Examples + +Examples can be found in `/examples` directory. It includes currently one simple example, `single_node_pool_op.c`. It is a multi-threaded application (pthread) consists of a producer thread, and two consumer threads. `single_node_pool_op.c` is based on d3.2 of Maestro project, more reading (d3.2) [here](https://bscw.zam.kfa-juelich.de/bscw/bscw.cgi/2995531). + +To build the example, please use + +``` +make MAESTRO=$(MAESTRO_PATH) +``` + +Example paramters, such as `num_producers`, `num_consumers`, `num_archivers`, `cdo_size`, and `cdo_count`, can be configured using `single_node_pool_op_config.yaml` file. + +Before executing the binary, please `export LD_LIBRARY_PATH=$(MAESTRO_PATH)/lib:$LD_LIBRARY_PATH` and then + +``` +./single_node_pool_op.o +``` +## Demos + +### Local multithreaded demo (MVP1) + +MVP1 consists in a local multithreaded demo application. More reading (d3.2) [here](https://bscw.zam.kfa-juelich.de/bscw/bscw.cgi/2995531) + +Reference version is tagged `d3.2-draft`, on master branch. + +`make check` also builds the demo executable `demo_mvp_d3_2` in addition to examples, and runs it. +`./run_demo.sh` permits to run the demo alone. + + +### Adaptive Transport demo + + +Pool manager interlock demo uses a three application setup, comprising one pool +manager process, and showing GFS and MIO transport. More reading (`d5.5` to appear on BSCW) and +information on how to setup a VM to run Mero [here](https://gitlab.version.fz-juelich.de/maestro/maestro-mero-vm) + +Reference version is tagged `d5.5-review`, on master branch. + +The pool manager interlock demonstration [check_pm_interlock.sh](./tests/check_pm_interlock.sh) is +automatically launched with make check. -Documentation -------------- -Doxygen documentation is available and compiled in `./docs` folder. +# Documentation -Common issues/FAQs ------------------- +Doxygen documentation is available and compiled in [docs](./docs) folder. + +# Common issues/FAQs * If you have many network interfaces/many addresses assigned to an interface - (may happen with IPv6 rather suddenly) the libfabric setup of the pool - manager may hit 'too many open files/errno=-24' issues. Check `ulimit -n`, + (may happen with `IPv6` rather suddenly) the `libfabric` setup of the pool + manager may hit `too many open files/errno=-24` issues. Check `ulimit -n`, and increase the limit. -* If you see clients stuck at JOIN time while everything else looks good, there +* If you see clients stuck at `JOIN` time while everything else looks good, there is a chance that your firewalling intercepts the packages. + +* ##### Is it safe to invalidate the pointer to data wrapped by an offered cdo, given that ownership has passed to maestro core? (We keep hold onto the cdo handle itself of course.) + No, the allocation that was captured in the cdo handle must not be touched until after DISPOSE. Of course you can forget the pointer you have, but you must not re-use the allocation or free it. + +* ##### Does a producer have to check whether an offered cdo has been consumed (DEMANDed), and wait until it is, before calling (withdraw followed by) dispose? + A producer cannot directly figure that out (unless you do complicated event ops). The idea is: The consumer must submit the REQUIRE before the WITHDRAW occurs. This can be accomplished by + 1. pre-posting the REQUIRE, or + 2. by posting it after observing an OFFER:before or OFFER:after event (for safety with a 'require-ack' flag or any earlier event, like DECLARE or SEAL), + 3. by posting it in a WITHDRAW:before with require-ack set. + In all these cases Maestro will ensure that the REQUIRE can be satisfied, by taking a copy (more or less eagerly, this is to be tuned], or by blocking WITHDRAW. + +* ##### Should withdraw be needed at all if an offered cdo is consumed by another application? + Every OFFER must be followed by WITHDRAW (and DISPOSE); every REQUIRE must be followed by RETRACT or DEMAND (and DISPOSE). Remember that one OFFER can satisfy many REQUIRES for the same CDO; WITHDRAW indicates that you're no longer ready to do so (and maestro needs to ensure outstanding REQUIRES can still be satisfied if their DEMAND comes in) + +* ##### Should dispose block until an offered cdo is consumed by another? Or will it only block if there has already been a require posted? + WITHDRAW may block if maestro decides that it cannot or does not want to take a copy and there is at least one outstanding REQUIRE for the CDO, or a DEMAND is still in progress. DISPOSE should never block (but may take some time -- but not related to the pool protocol). + + +# Authors and acknowledgment + +[Data Orchestration in High Performance Computing](https://www.maestro-data.eu/) project has received funding from the European Union’s Horizon 2020 research and innovation program through grant agreement 801101. + +# License + +[BSD 3-clause License](https://gitlab.com/cerl/maestro/maestro-core/-/blob/devel/LICENSE) + + diff --git a/docs/maestro-arch-overview.png b/docs/maestro-arch-overview.png new file mode 100644 index 0000000000000000000000000000000000000000..99e98dbd94ae2f85692a33ed2a198ad8c085cbc3 Binary files /dev/null and b/docs/maestro-arch-overview.png differ diff --git a/examples/Makefile b/examples/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..5094eed71ecd2d4c4e4c740137de410bf0abb504 --- /dev/null +++ b/examples/Makefile @@ -0,0 +1,30 @@ +MAESTRO ?=/usr/local/ + +CC ?= gcc +MPICC ?= mpicc + +INC=-I$(MAESTRO)/include/maestro +LDFLAGS=-L$(MAESTRO)/lib + +LIBS=-lmaestro +LIBS_OMP=-fopenmp -lmaestro + +CFLAGS=-O3 + +OBJs = core_bench local_pool_op + + +all: local_pool_op core_bench + +%: %.c + $(CC) $(CFLAGS) $(INC) $< $(LDFLAGS) $(LIBS) -o $@ + +%.o: %.c + $(MPICC) -c $(CFLAGS) $(INC) $< -o $@ + +core_bench: omp_injector.o omp_consumer.o core_benchmark.o + $(MPICC) $(LDFLAGS) $^ $(LIBS_OMP) -o $@ + + +clean: + rm -f $(OBJs) *.o *CDO* diff --git a/examples/bechmark_attributes.yaml b/examples/bechmark_attributes.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c2be382888ed3106e575fea7286edd4bf63cabfe --- /dev/null +++ b/examples/bechmark_attributes.yaml @@ -0,0 +1,220 @@ +# A user-defined schema. The minimum is name and version +schema-name: Benchmark Attributes +schema-version: 0 + +schema-namespace: ".maestro.benchmark." + +# attributes section is optional; if given it needs to have a sequence value +maestro-attributes: + +# Top-level attributes + + - key: "attrib_1" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_2" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_3" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_4" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_5" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_6" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_7" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_8" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_9" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_10" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_11" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_12" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_13" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_14" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_15" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_16" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_17" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_18" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_19" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_20" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_21" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_22" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_23" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_24" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_25" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_26" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_27" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_28" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_29" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_30" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_31" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_32" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_33" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_34" + type: str() + required: False + default: "" + documentation: Value length is the attribute size + + - key: "attrib_35" + type: str() + required: False + default: "" + documentation: Value length is the attribute size diff --git a/examples/core_benchmark.c b/examples/core_benchmark.c new file mode 100644 index 0000000000000000000000000000000000000000..dafbf70135772023c464be41cc30c427917bf8c3 --- /dev/null +++ b/examples/core_benchmark.c @@ -0,0 +1,356 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief benchmark core CDO operations, i.e., declare, offer, require, demand, withdraw, and dispose. Using multiple producers and consumers. This benchmark demonstrates also how to write an MPI+OpenMP application with Maestro. + **/ + +/* + * Copyright (C) 2021 Hewlett Packard Enterprise + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "maestro.h" +#include <assert.h> +#include "maestro/logging.h" +#include <string.h> +#include "omp.h" +#include "omp_injector.h" +#include "omp_consumer.h" +#include "mpi.h" + + +#define BENCHMARK_ATTRIBUTES_YAML "./bechmark_attributes.yaml" +#define MAX_NUM_ATTRIBUTES 35 + + + +/* simplify logging */ +#define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + +int main(int argc, char *argv[]) +{ + + int injector_id = 1; // could come from MPI_COMM_rank + size_t num_attributes = 0; + size_t size_attributes = 0; + size_t CDOs_per_thread = 10000; // 10k CDOs/thread by default + size_t nConsumers = 0; // number of consumers + size_t size_CDO = 0; // size of CDOs + mstro_status status = MSTRO_OK; // global status + + + int rank, size; + + // read arguments + if (argc > 1) + { + num_attributes =atoi(argv[1]); + if (num_attributes > MAX_NUM_ATTRIBUTES) + { + printf("Maximum number of supported attributes in this benchmark is %d \n", MAX_NUM_ATTRIBUTES); + num_attributes = MAX_NUM_ATTRIBUTES; + } + } + if (argc > 2) + { + size_attributes =atoi(argv[2]); + } + if (argc > 3) + { + CDOs_per_thread =atoi(argv[3]); + } + if (argc > 4) + { + nConsumers =atoi(argv[4]); + } + if (argc > 5) + { + size_CDO =atoi(argv[5]); + } + + /* save print time: */ + setenv("MSTRO_LOG_LEVEL","0",1); + + + // export the path to the custom scehma + setenv("MSTRO_SCHEMA_LIST",BENCHMARK_ATTRIBUTES_YAML,1); + + // read num of threads + size_t num_threads = 1; + // Read environment variables + if (getenv("OMP_NUM_THREADS") != NULL) + { + num_threads = atoi(getenv("OMP_NUM_THREADS")); + } + + /*start MPI ... if number of ranks is 1, then start only injector component + that will work with local pool manager*/ + MPI_Init(NULL,NULL); + MPI_Comm_rank (MPI_COMM_WORLD, &rank); + MPI_Comm_size (MPI_COMM_WORLD, &size); + + if (size == 1) + { + // start producer + status = start_injector(rank, num_attributes, size_attributes, CDOs_per_thread); + } + else + { + /*If number of ranks is greater than 1, + 1. start a pool manager component + 2. broadcast PM info + 3.look for the num of producers and consumers and start them + */ + if (rank == 0) + { + // start pool manager --- workflow, component, id + status = mstro_init("Tests", "Pool_Manager", 0); + if(status!=MSTRO_OK) { + ERR("Maestro Pool Manager: Failed to initialize Maestro: %d (%s)\n", + status, mstro_status_description(status)); + // panic and fail + MPI_Abort(MPI_COMM_WORLD, -1); + } + status = mstro_pm_start(); + if(status!=MSTRO_OK) { + ERR("Simple Maestro Pool Manager: Failed to start pool: %d (%s)\n", + status, mstro_status_description(status)); + + // panic and fail + MPI_Abort(MPI_COMM_WORLD, -1); + } + + char *info = NULL; + status = mstro_pm_getinfo(&info); + + if(status!=MSTRO_OK) { + ERR("Simple Maestro Pool Manager: Failed to obtain pool contact info: %d (%s)\n", + status, mstro_status_description(status)); + + // panic and fail + MPI_Abort(MPI_COMM_WORLD, -1); + } + + assert(status == MSTRO_OK); + + // get size of PM info and broadcast it + int info_size = strlen(info); + MPI_Bcast(&info_size,1,MPI_INT, 0, MPI_COMM_WORLD); + //broadcast the PM info + MPI_Bcast(info,info_size,MPI_CHAR, 0, MPI_COMM_WORLD); + + //sync producers and conumers --otherwise producers may widthraw CDOs before consumed + MPI_Barrier(MPI_COMM_WORLD); + + } + else if ((rank > 0) && (rank <= nConsumers)) + { + double require_time, demand_time, before, after; + // get PM info + int info_size; + MPI_Bcast(&info_size,1,MPI_INT, 0, MPI_COMM_WORLD); + //get the PM info + char *info = (char *) malloc(info_size*sizeof(char)); + MPI_Bcast(info,info_size,MPI_CHAR, 0, MPI_COMM_WORLD); + + // put it in the environment + setenv("MSTRO_POOL_MANAGER_INFO",info,1); + + + //get consumer mode + int consumer_mode = get_consumer_mode(); + printf("[Consumer %d] consumer mode = %d \n",rank, consumer_mode); + // get the number of producers assigned to me + int num_producers = get_num_producers(size, nConsumers, consumer_mode); + printf("[Consumer %d] num_producers = %d \n",rank, num_producers ); + // allocate producers list + int *producers_ids = (int *) malloc(sizeof(int)*num_producers); + // calculate my assigned producers + get_producers(rank, size, nConsumers, producers_ids, num_producers, consumer_mode); + + + for (size_t i = 0; i < num_producers; i++) { + printf("producer: %d \n", producers_ids[i]); + } + + // create CDOs array + size_t num_CDOs = CDOs_per_thread * num_threads*num_producers; // CDOs per producer * (num_thread) * num_producers + printf("[Consumer %d] num_CDOs = %zu \n",rank, num_CDOs ); + + mstro_cdo cdos[num_CDOs]; + + + + // start consumers + status = mstro_init("Tests","Consumer",rank); + assert(MSTRO_OK == status); + + + // Require CDOs + before = omp_get_wtime(); + // declare CDOs loop + + status = require_CDOs(cdos, num_CDOs, producers_ids, num_producers, num_attributes, size_attributes); + assert(MSTRO_OK == status); + after = omp_get_wtime(); + require_time = (after - before) * 1000.0*1000.0; //time in us seconds + + fprintf(stdout, "Throughput (declare/require): %.5f us\n", require_time/(double) num_CDOs); + + + //sync producers and consumers -- avoid CDOs been withdrawn before require + MPI_Barrier(MPI_COMM_WORLD); + + + // Demand CDOs + before = omp_get_wtime(); // restart timer here + status = demand_CDOs(cdos, num_CDOs); + assert(MSTRO_OK == status); + after = omp_get_wtime(); + demand_time = (after - before) * 1000.0*1000.0; //time in us seconds + fprintf(stdout, "Throughput (demand/dispose): %.5f us\n", demand_time/(double) num_CDOs); + + + // finalize Maestro + status = mstro_finalize(); + assert(status == MSTRO_OK); + + //clean up + if (producers_ids != NULL) + {free(producers_ids);} + + + + } + else + { + double declare_time,withdraw_time, before, after; + + // get PM info + int info_size; + MPI_Bcast(&info_size,1,MPI_INT, 0, MPI_COMM_WORLD); + //get the PM info + char *info = (char *) malloc(info_size*sizeof(char)); + MPI_Bcast(info,info_size,MPI_CHAR, 0, MPI_COMM_WORLD); + + // put it in the environment + setenv("MSTRO_POOL_MANAGER_INFO",info,1); + + size_t num_CDOs = CDOs_per_thread * num_threads; // 1K per producer * (num_thread) + + status = mstro_init("Tests","Injector",rank); + + assert(MSTRO_OK == status); + + // create CDOs array + mstro_cdo cdos[num_CDOs]; + + // create CDOs DATA + char *CDO_data[num_CDOs]; + if (size_CDO != 0) + { + for (size_t i = 0; i < num_CDOs; i++) { + //allocate data + posix_memalign((void**) &CDO_data[i], (size_t) sysconf(_SC_PAGESIZE), sizeof(char)*size_CDO); + //fill data + fill_char_array(CDO_data[i], size_CDO); + } + + } + + before = omp_get_wtime(); + // declare CDOs loop + status = inject_CDOs(rank, cdos, num_CDOs, num_attributes, size_attributes, size_CDO, CDO_data); + + assert(MSTRO_OK == status); + + after = omp_get_wtime(); + + declare_time = (after - before) * 1000.0*1000.0; //time in us seconds + + fprintf(stdout, "#CDOs: %zu, #Threads: %zu, #Attributes: %zu, Size of attributes: %zu \n", num_CDOs, num_threads, num_attributes, size_attributes); + fprintf(stdout, "Throughput (declare/offer): %.5f us\n", declare_time/(double) num_CDOs); + + //sync producers and consumers -- avoid CDOs been withdrawn before require + MPI_Barrier(MPI_COMM_WORLD); + + before = omp_get_wtime(); // restart timer here + // withdraw CDOs loop + status = withdraw_CDOs(cdos, num_CDOs); + + assert(MSTRO_OK == status); + + after = omp_get_wtime(); + + withdraw_time = (after - before) * 1000.0*1000.0; //time in us seconds + fprintf(stdout, "Throughput (withdraw/dispose): %.5f us\n", withdraw_time/(double) num_CDOs); + + status = mstro_finalize(); + + assert(status == MSTRO_OK); + + } + + + + } + + + // wait for termination otherwise PM will exist before producers and consumers + MPI_Barrier(MPI_COMM_WORLD); + + // terminate pool manager + if ((rank == 0) && (size > 1)) + { + status = mstro_pm_terminate(); + if(status!=MSTRO_OK) { + ERR("Simple Maestro Pool Manager: Failed to shut down pool: %d (%s)\n", + status, mstro_status_description(status)); + + // panic and fail + MPI_Abort(MPI_COMM_WORLD, -1); + } + + status = mstro_finalize(); + if(status!=MSTRO_OK) { + ERR("Simple Maestro Pool Maestro: failed to terminate: %d (%s)\n", + status, mstro_status_description(status)); + + // panic and fail + MPI_Abort(MPI_COMM_WORLD, -1); + } + } + + MPI_Finalize(); + + return 0; +} diff --git a/examples/local_pool_op.c b/examples/local_pool_op.c new file mode 100644 index 0000000000000000000000000000000000000000..bb3410620fcb81c51d7fb0e90ceb38826d1ceaaa --- /dev/null +++ b/examples/local_pool_op.c @@ -0,0 +1,1060 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief Single node pool operations example + **/ + +/* + Driver + + - A multi-threaded application (pthread) consisting of + + - A proxy for the workflow component (WP4) (work description) + + - A producer application + + - two consumer applications C1 and C2 + + - The producer creates CDOs (type -,1,2) + + - The consumer C1 (consumer) retrieves the CDOs and performs computations on it, then drops them + + - The consumer C2 (archiver) writes out the CDOs to permanent storage + + + Maestro Pool MVP + + - Single node pool operation + + - CDO type 0, 1 and 2 declarations + + - Basic Offer-Demand functionality + + - Task coupling / CDO resolution through the pool + + - DEMAND operations with user-allocated buffers + + - Basic interface to Mamba for type 1,2 CDOs + */ + +/** Implementation: + * main thread: + * - constructs work description + * - posts (announces) a work description CDO for all to see. + * - starts threads for consumer, archiver tasks + * - posts CONSUMER_READY REQUIRE + * -- if multiple consumers, multiple such CDOs + * - posts ARCHIVER_READY REQUIRE + * -- if multiple archivers, multiple such CDOs + * - starts producer + * - waits for threads to finish + * - withdraws work description CDO + * - quits + * + * archiver: + * - waits for work description + * - posts CDO REQUIRES for all CDOs in the list + * - demands them one by one + * - writes them to disk (file name = CDO name) using Mamba tiling interface + * - disposes them + * -- when multiple archive threads run, archiver i + * handles CDOs with index%num_archivers==i + * + * consumer: + * - waits for work description + * - posts CDO REQUIRES for all CDOS in the list + * - demands them one by one + * - computes a checksum on the data, prints it + * - disposes them + * -- when multiple consumer threads run, consumer i + * handles CDOs with index%num_consumers==i + * + * producer: + * - waits for work description + * - posts CDO OFFER for all CDOs + * - posts CDO WITHDRAW for all CDOs + * - disposes them + * -- when multiple producer threads run, producer i + * handles CDOs with index%num_consumers==i + * + */ +/* + * Copyright (C) 2019 Cray Computer GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "maestro.h" +#include "maestro/logging.h" + +#include "mamba.h" +#include "mmb_tile_iterator.h" + +#include <cyaml.h> + +#include <stdlib.h> +#include <pthread.h> +#include <assert.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + + +/* simplify logging */ +#define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + + + +/** name template of data CDOs sent around */ +#define CDO_NAME_TEMPLATE "CDO %zu" +/** UB on length of CDO_NAME_TEMPLATE */ +#define CDO_NAME_MAX (strlen(CDO_NAME_TEMPLATE)+10) + +/** maximum number of CDOs in one run */ +#define CDO_COUNT_MAX 100000 + +/** A structure holding the workflow configuration */ +struct cdo_announcement { + size_t num_producers; /** number of producer threads */ + size_t num_consumers; /** number of consumer threads */ + size_t num_archivers; /** number of archiver threads */ + int64_t cdo_size; /** size of each CDO */ + size_t num_entries; /** number of CDOs */ + char *cdo_names[CDO_COUNT_MAX]; /** string names of the CDOs */ +}; + +/** A suitable scope/layout/size string for the announcement */ +char *announcement_scope_string; + +/** A CDO name for the work announcement */ +#define ANNOUNCEMENT_TOKEN_NAME "CDO Announcement" + +/** A CDO name template for 'Consumer ready' */ +#define CONSUMER_READY_NAME_TEMPLATE "Consumer Ready %zu" + +/** A CDO name template for 'Archiver ready' */ +#define ARCHIVER_READY_NAME_TEMPLATE "Archiver Ready %zu" + +#define READY_NAME_MAX 32 + +/** A rigid version of the workflow configuration for cyaml to fill */ +struct mvp_config_ +{ + int num_producers; + int num_consumers; + int num_archivers; + int cdo_size; + int cdo_count; +}; + +typedef struct mvp_config_ * mvp_config; + +/** cyaml schema */ +static const cyaml_schema_field_t top_mapping_schema[] = { + CYAML_FIELD_INT("num_producers", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL, + struct mvp_config_, num_producers), + CYAML_FIELD_INT("num_consumers", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL, + struct mvp_config_, num_consumers), + CYAML_FIELD_INT("num_archivers", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL, + struct mvp_config_, num_archivers), + CYAML_FIELD_INT("cdo_size", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL, + struct mvp_config_, cdo_size), + CYAML_FIELD_INT("cdo_count", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL, + struct mvp_config_, cdo_count), + CYAML_FIELD_END +}; + +/* cyaml value schema for the top level mapping. */ +static const cyaml_schema_value_t top_schema = { + CYAML_VALUE_MAPPING(CYAML_FLAG_POINTER, + struct mvp_config_, top_mapping_schema), +}; + +/** Basic cyaml config */ +static const cyaml_config_t config = { + .log_level = CYAML_LOG_WARNING, /* Logging errors and warnings only. */ + .log_fn = cyaml_log, /* Use the default logging function. */ + .mem_fn = cyaml_mem, /* Use the default memory allocator. */ +}; + +/** Call cyaml to parse config file */ +int +mvp_config_parse(mvp_config* handle, const char* filename) +{ + if (handle == NULL) return MSTRO_INVARG; + + cyaml_err_t err; + err = cyaml_load_file(filename, &config, + &top_schema, (cyaml_data_t **)handle, NULL); + if (err != CYAML_OK) { + fprintf(stderr, "ERROR: %s (file %s)\n", cyaml_strerror(err), filename); + return -1; + } + + return 0; +} + +/** announcement message receive: */ +static void +wait_for_announcement(struct cdo_announcement *announcement) +{ + mstro_status s; + mstro_cdo announcement_cdo; + /* start by waiting for the announcement */ + s = mstro_cdo_declare(ANNOUNCEMENT_TOKEN_NAME, + MSTRO_ATTR_DEFAULT, + &announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to declare announcement CDO\n"); + abort(); + } + s = mstro_cdo_attribute_set(announcement_cdo, + MSTRO_ATTR_CORE_CDO_RAW_PTR, + announcement); + if(s!=MSTRO_OK) { + ERR("Failed to set raw-ptr attribute on announcement CDO\n"); + abort(); + } + + s = mstro_cdo_attribute_set_yaml(announcement_cdo, + announcement_scope_string); + if(s!=MSTRO_OK) { + ERR("Failed to set raw-ptr size attribute on announcement CDO\n"); + abort(); + } + + s = mstro_cdo_require(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to require announcement CDO\n"); + abort(); + } + + s = mstro_cdo_demand(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to withdraw announcement CDO\n"); + abort(); + } + + s = mstro_cdo_dispose(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to dispose announcement CDO\n"); + abort(); + } + + return; +} + +static void +do_announce(struct cdo_announcement *announcement, mstro_cdo *result) +{ + mstro_status s; + mstro_cdo announcement_cdo; + /* start by waiting for the announcement */ + s = mstro_cdo_declare(ANNOUNCEMENT_TOKEN_NAME, + MSTRO_ATTR_DEFAULT, + &announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to declare announcement CDO\n"); + abort(); + } + s = mstro_cdo_attribute_set(announcement_cdo, + MSTRO_ATTR_CORE_CDO_RAW_PTR, + announcement); + + if(s!=MSTRO_OK) { + ERR("Failed to set raw-ptr attribute on announcement CDO\n"); + abort(); + } + s = mstro_cdo_attribute_set_yaml(announcement_cdo, + announcement_scope_string); + if(s!=MSTRO_OK) { + ERR("Failed to set raw-ptr size attribute on announcement CDO\n"); + abort(); + } + + s = mstro_cdo_offer(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to offer announcement CDO\n"); + abort(); + } + + *result = announcement_cdo; +} + +static void +withdraw_announcement(mstro_cdo announcement_cdo) +{ + mstro_status s; + + s = mstro_cdo_withdraw(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to withdraw announcement CDO\n"); + abort(); + } + + s = mstro_cdo_dispose(announcement_cdo); + if(s!=MSTRO_OK) { + ERR("Failed to dispose announcement CDO\n"); + abort(); + } + + return; +} + +/* do the equivalent of a sem_post using the given CDO name */ +static void +cdo_sem_post(const char *name, mstro_cdo *sem) +{ + mstro_cdo cdo; + mstro_status s; + /* use type 0 CDO */ + s = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, + &cdo); + s |= mstro_cdo_offer(cdo); + if(s!=MSTRO_OK) { + ERR("CDO sem post failed\n"); + abort(); + } + *sem = cdo; +} + +static void +cdo_sem_post_cleanup(mstro_cdo sem) +{ + mstro_status s = mstro_cdo_withdraw(sem); + s |= mstro_cdo_dispose(sem); + + if(s!=MSTRO_OK) { + ERR("CDO sem post cleanup failed\n"); + abort(); + } +} + +/* do the equivalent of a sem_wait using the given CDO name */ +static void +cdo_sem_wait(const char *name) +{ + mstro_cdo cdo; + mstro_status s; + /* use type 0 CDO */ + s = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, + &cdo); + s |= mstro_cdo_require(cdo); + s |= mstro_cdo_demand(cdo); + s |= mstro_cdo_dispose(cdo); + + if(s!=MSTRO_OK) { + ERR("CDO sem wait failed\n"); + abort(); + } + +} +/* Quickly flash the "Hello" token */ +void +declare_archiver_ready(size_t index, mstro_cdo *cdo) +{ + char name[READY_NAME_MAX]; + size_t s = snprintf(name, READY_NAME_MAX, + ARCHIVER_READY_NAME_TEMPLATE, index); + if(s>=READY_NAME_MAX) { + ERR("Failed to construct name"); + abort(); + } + cdo_sem_post(name, cdo); +} + +/* Block on a "Hello" token */ +void +wait_archiver_ready(size_t index) +{ + char name[READY_NAME_MAX]; + size_t s = snprintf(name, READY_NAME_MAX, + ARCHIVER_READY_NAME_TEMPLATE, index); + if(s>=READY_NAME_MAX) { + ERR("Failed to construct name"); + abort(); + } + cdo_sem_wait(name); +} + +void +declare_consumer_ready(size_t index, mstro_cdo *cdo) +{ + char name[READY_NAME_MAX]; + size_t s = snprintf(name, READY_NAME_MAX, + CONSUMER_READY_NAME_TEMPLATE, index); + if(s>=READY_NAME_MAX) { + ERR("Failed to construct name"); + abort(); + } + cdo_sem_post(name,cdo); +} + +void +wait_consumer_ready(size_t index) +{ + char name[READY_NAME_MAX]; + size_t s = snprintf(name, READY_NAME_MAX, + CONSUMER_READY_NAME_TEMPLATE, index); + if(s>=READY_NAME_MAX) { + ERR("Failed to construct name"); + abort(); + } + cdo_sem_wait(name); +} + + +static void +archiver_flush_to_disk(const char *name, mmbArray *a) +{ + size_t chunkdims = 4096; + mmbDimensions chunks = {1, &chunkdims}; + mmbError stat = mmb_array_tile(a, &chunks); + if(stat != MMB_OK) { + ERR("Failed to tile mamba array (chunked tiles)\n"); + abort(); + } + + // Loop over tiles + mmbTileIterator* it; + stat = mmb_tile_iterator_create(a, &it); + if(stat != MMB_OK) { + ERR("Failed to get tile iterator\n"); + abort(); + } + + size_t nt; + stat = mmb_tile_iterator_count(it, &nt); + if(stat != MMB_OK) { + ERR("Failed to get tile iterator count\n"); + abort(); + } + stat = mmb_tile_iterator_first(it); + if(stat != MMB_OK) { + ERR("Failed to move tile iterator to first\n"); + abort(); + } + FILE *dst = fopen((const char*)name, "w"); + if(dst==NULL) { + ERR("Failed to open %s for writing\n", name); + abort(); + } + for(size_t i = 0; i < nt; i++){ + mmbArrayTile* tile = it->tile; + size_t count = fwrite(&MMB_IDX_1D(tile, tile->lower[0], char), + 1, + tile->upper[0]-tile->lower[0], + dst); + if(count!= tile->upper[0]-tile->lower[0]) { + ERR("Incomplete write on tile %d of %s\n", i, name); + abort(); + } + stat = mmb_tile_iterator_next(it); + if(stat != MMB_OK) { + ERR("Failed to increment tile iterator\n"); + abort(); + } + } + if(0!=fclose(dst)) { + ERR("Failed to close %s after writing\n", name); + abort(); + } + stat = mmb_tile_iterator_destroy(it); + if(stat != MMB_OK) { + ERR("Failed to free tile iterator\n"); + abort(); + } + + // Remove tiling + stat = mmb_array_untile(a); + if(stat != MMB_OK) { + ERR("Failed to untile mamba array\n"); + abort(); + } + +} + +/* + * archiver: + * - waits for work description + * - posts CDO REQUIRES for all CDOs in the list + * - demands them one by one + * - writes them to disk (file name = CDO name) using Mamba tiling interface + * - disposes them + * -- when multiple archive threads run, archiver i + * handles CDOs with index%num_archivers==i + */ +void* +archiver_thread_fun(void *closure) +{ + mstro_status s; + size_t my_idx = * (size_t*)closure; + INFO("Archiver %zu starting\n", my_idx); + + struct cdo_announcement *announcement + = malloc(sizeof(struct cdo_announcement)); + if(announcement==NULL) { + ERR("Failed to allocate for incoming announcement\n"); + abort(); + } + + wait_for_announcement(announcement); + + mstro_cdo incoming[announcement->num_entries]; + void *incoming_buffers[announcement->num_entries]; + + /* declare and require all that we are responsible for */ + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_archivers==my_idx) { + /* declare it */ + s = mstro_cdo_declare(announcement->cdo_names[i], + MSTRO_ATTR_DEFAULT, + &(incoming[i])); + if(s!=MSTRO_OK) { + ERR("Failed to declare CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + /* add allocation */ + incoming_buffers[i] = malloc(announcement->cdo_size); + if(incoming_buffers[i]==NULL) { + ERR("Cannot allocate CDO buffer for archiving\n"); + abort(); + } + s = mstro_cdo_attribute_set(incoming[i], + MSTRO_ATTR_CORE_CDO_RAW_PTR, + incoming_buffers[i]); + s |= mstro_cdo_attribute_set(incoming[i], + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &announcement->cdo_size); + // INFO("archiver cdo %d incoming buffer %p\n", i, incoming_buffers[i]); + + if(s!=MSTRO_OK) { + ERR("Failed to add buffer to CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + /* require it */ + s = mstro_cdo_require(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to require CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + } + } + /* send ACK that we're ready */ + INFO("Declaring Archiver %zu ready\n", my_idx); + mstro_cdo ready_cdo; + declare_archiver_ready(my_idx,&ready_cdo); + + /* process all in some order */ + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_archivers==my_idx) { + s = mstro_cdo_demand(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to demand CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + + /* extract mamba pointer */ + mmbArray *mamba_array; + s = mstro_cdo_access_mamba_array(incoming[i], &mamba_array); + if(s!=MSTRO_OK) { + ERR("Failed to extract mamba array from CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + + /* do write-out */ + archiver_flush_to_disk(announcement->cdo_names[i], mamba_array); + + s = mstro_cdo_dispose(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to dispose CDO %s after archiving\n", + announcement->cdo_names[i]); + abort(); + } + free(incoming_buffers[i]); + } + } + + cdo_sem_post_cleanup(ready_cdo); + free(announcement); + return NULL; +} + +void buf_fill_rand(const char* name, char* b, int size) +{ + size_t i; + for (i=0; i<size/sizeof(char); i++) + b[i] = (unsigned char)rand()%256; +} + +/* + * producer: + * - waits for work description + * - posts CDO OFFER for all CDOs + * - posts CDO WITHDRAW for all CDOs + * - disposes them + * -- when multiple producer threads run, producer i + * handles CDOs with index%num_consumers==i + */ +void* +producer_thread_fun(void *closure) +{ + mstro_status s; + size_t my_idx = * (size_t*)closure; + INFO("Producer %zu starting\n", my_idx); + + struct cdo_announcement *announcement + = malloc(sizeof(struct cdo_announcement)); + if(announcement==NULL) { + ERR("Failed to allocate for incoming announcement\n"); + abort(); + } + + wait_for_announcement(announcement); + + /* produce */ + mstro_cdo outgoing[announcement->num_entries]; + void *outgoing_buffers[announcement->num_entries]; + + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_producers==my_idx) { + /* declare it */ + s = mstro_cdo_declare(announcement->cdo_names[i], + MSTRO_ATTR_DEFAULT, + &(outgoing[i])); + if(s!=MSTRO_OK) { + ERR("Failed to declare outgoing CDO %s\n", + announcement->cdo_names[i]); + abort(); + } + /* add allocation */ + outgoing_buffers[i] = malloc(announcement->cdo_size); + if(outgoing_buffers[i]==NULL) { + ERR("Cannot allocate outgoing CDO buffer\n"); + abort(); + } + /* add some data */ + buf_fill_rand(announcement->cdo_names[i], + outgoing_buffers[i], announcement->cdo_size); + + s = mstro_cdo_attribute_set(outgoing[i], + MSTRO_ATTR_CORE_CDO_RAW_PTR, + outgoing_buffers[i]); + s |= mstro_cdo_attribute_set(outgoing[i], + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &announcement->cdo_size); + + if(s!=MSTRO_OK) { + ERR("Failed to add outgoing buffer to CDO %s\n", + announcement->cdo_names[i]); + abort(); + } + /* OFFER it */ + s = mstro_cdo_offer(outgoing[i]); + if(s!=MSTRO_OK) { + ERR("Failed to offer CDO %s\n", + announcement->cdo_names[i]); + abort(); + } + } + } + /* now claim them back */ + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_producers==my_idx) { + s = mstro_cdo_withdraw(outgoing[i]); + if(s!=MSTRO_OK) { + ERR("Failed to demand outgoing CDO %s\n", + announcement->cdo_names[i]); + abort(); + } + + s = mstro_cdo_dispose(outgoing[i]); + if(s!=MSTRO_OK) { + ERR("Failed to dipose outgoing CDO %s\n", + announcement->cdo_names[i]); + abort(); + } + free(outgoing_buffers[i]); + } + } + + free(announcement); + return NULL; +} + + +void +mvp_checksum(const char* name, void* rawptr, uint64_t size) +{ + unsigned char x; + uint64_t i; + + for (i=0,x=0;i<size; i++) + x ^= ((unsigned char*)rawptr)[i]; + + INFO("Checksum for cdo \"%s\": %d\n", name, x); +} + +void +consumer_flush_to_disk(const char *name, void *a, uint64_t size) +{ + char file_name [256]; + sprintf(file_name, "consumer_%s", name); + + FILE *dst = fopen((const char*)file_name, "w"); + if(dst==NULL) { + ERR("Failed to open %s for writing\n", name); + abort(); + } + + size_t count = fwrite(a, + sizeof(char), + size/sizeof(char), + dst); + if(count != size) { + ERR("Incomplete write of %s (%zu of %"PRIu64")\n", file_name, count, size); + abort(); + } + if(0!=fclose(dst)) { + ERR("Failed to close %s after writing\n", file_name); + abort(); + } +} + +/* + * consumer: + * - waits for work description + * - posts CDO REQUIRES for all CDOS in the list + * - demands them one by one + * - computes a checksum on the data, prints it + * - disposes them + * -- when multiple consumer threads run, consumer i + * handles CDOs with index%num_consumers==i + */ +void * +consumer_thread_fun(void *closure) +{ + mstro_status s; + size_t my_idx = * (size_t*)closure; + INFO("Consumer %zu starting\n", my_idx); + + struct cdo_announcement *announcement + = malloc(sizeof(struct cdo_announcement)); + if(announcement==NULL) { + ERR("Failed to allocate for incoming announcement\n"); + abort(); + } + + wait_for_announcement(announcement); + + /* post requests */ + mstro_cdo incoming[announcement->num_entries]; + void *incoming_buffers[announcement->num_entries]; + + /* declare and require all that we are responsible for */ + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_consumers==my_idx) { + /* declare it */ + s = mstro_cdo_declare(announcement->cdo_names[i], + MSTRO_ATTR_DEFAULT, + &incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to declare CDO %s for consumption\n", + announcement->cdo_names[i]); + abort(); + } + /* add allocation */ + incoming_buffers[i] = malloc(announcement->cdo_size); + if(incoming_buffers[i]==NULL) { + ERR("Cannot allocate CDO buffer for comsumption\n"); + abort(); + } + s = mstro_cdo_attribute_set(incoming[i], + MSTRO_ATTR_CORE_CDO_RAW_PTR, + incoming_buffers[i]); + s |= mstro_cdo_attribute_set(incoming[i], + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &announcement->cdo_size); + // INFO("consumer cdo %d incoming buffer %p\n", i, incoming_buffers[i]); + + if(s!=MSTRO_OK) { + ERR("Failed to add buffer to CDO %s for consumption\n", + announcement->cdo_names[i]); + abort(); + } + /* require it */ + s = mstro_cdo_require(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to require CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + } + } + + /* send ACK that we're ready */ + mstro_cdo ready_cdo; + + INFO("Declaring Consumer %zu ready\n", my_idx); + declare_consumer_ready(my_idx,&ready_cdo); + + /* process all in some order */ + for(size_t i=0; i<announcement->num_entries; i++) { + if(i%announcement->num_consumers==my_idx) { + s = mstro_cdo_demand(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to demand CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + + /* extract raw ptr */ + void* rawptr; + enum mstro_cdo_attr_value_type type; + const int64_t* size; + s = mstro_cdo_access_ptr(incoming[i], &rawptr, NULL); + if(s!=MSTRO_OK) { + ERR("Failed to extract raw pointer from CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + /* query the size */ + s = mstro_cdo_attribute_get(incoming[i], + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &type, (const void**)&size); + if(s!=MSTRO_OK) { + ERR("Failed to extract local size from CDO %s for archiving\n", + announcement->cdo_names[i]); + abort(); + } + + /* do some work */ + mvp_checksum(announcement->cdo_names[i], rawptr, *size); + + /* write-out */ + consumer_flush_to_disk(announcement->cdo_names[i], + rawptr, *size); + + s = mstro_cdo_dispose(incoming[i]); + if(s!=MSTRO_OK) { + ERR("Failed to dispose CDO %s after archiving\n", + announcement->cdo_names[i]); + abort(); + } + free(incoming_buffers[i]); + } + } + + /* demand data and process */ + + + cdo_sem_post_cleanup(ready_cdo); + free(announcement); + return NULL; +} + + + +#define DEFAULT_NUM_PRODUCERS 1 +#define DEFAULT_NUM_CONSUMERS 1 +#define DEFAULT_NUM_ARCHIVERS 1 +#define DEFAULT_CDO_SIZE 1024 +#define DEFAULT_CDO_COUNT 42 + +#define CONFIG_FILE_PATH "./local_pool_op_config.yaml" + +/* + * main thread: + * - constructs work description + * - posts (announces) a work description CDO for all to see. + * - starts threads for consumer, archiver tasks + * - posts CONSUMER_READY REQUIRE + * -- if multiple consumers, multiple such CDOs + * - posts ARCHIVER_READY REQUIRE + * -- if multiple archivers, multiple such CDOs + * - starts producer + * - waits for threads to finish + * - withdraws work description CDO + * - quits +*/ +int main(int argc, char **argv) +{ + /* verbosity */ + putenv("MSTRO_LOG_LEVEL=1"); + putenv("MSTRO_LOG_COLOR_ERRORS=1"); + putenv("MMB_LOG_LEVEL=0"); + + mstro_status s = mstro_init("Single Node Pool Operations", + "Main", 0); + if(s!=MSTRO_OK) { + ERR("Failed to initialize Maestro\n"); + return s; + } + + struct cdo_announcement announcement; + mvp_config config_handle=NULL; + int err; + + /* constructs work description */ + switch(argc) { + default: + WARN("Command line arguments ignored\n"); + /* fallthrough: */ + case 1: + + err = mvp_config_parse(&config_handle, CONFIG_FILE_PATH); + if (err != 0) { + WARN("Failed to parse a config file. Using default config.\n"); + announcement.num_producers = DEFAULT_NUM_PRODUCERS; + announcement.num_consumers = DEFAULT_NUM_CONSUMERS; + announcement.num_archivers = DEFAULT_NUM_ARCHIVERS; + announcement.cdo_size = DEFAULT_CDO_SIZE; + announcement.num_entries = DEFAULT_CDO_COUNT; + } + else { + announcement.num_producers = config_handle->num_producers; + announcement.num_consumers = config_handle->num_consumers; + announcement.num_archivers = config_handle->num_archivers; + announcement.cdo_size = config_handle->cdo_size; + announcement.num_entries = config_handle->cdo_count; + } + + break; + } + assert(announcement.num_entries<=CDO_COUNT_MAX); + + pthread_t producers[announcement.num_producers]; + pthread_t consumers[announcement.num_consumers]; + pthread_t archivers[announcement.num_archivers]; + size_t producer_idxs[announcement.num_producers]; + size_t consumer_idxs[announcement.num_consumers]; + size_t archiver_idxs[announcement.num_archivers]; + + for(size_t i=0; i<announcement.num_entries; i++) { + announcement.cdo_names[i] = malloc(CDO_NAME_MAX); + if(announcement.cdo_names[i]==NULL) { + ERR("Failed to allocate CDO name\n"); + goto BAILOUT; + } + size_t l = + snprintf(announcement.cdo_names[i], CDO_NAME_MAX, + CDO_NAME_TEMPLATE, i); + if(l>=CDO_NAME_MAX) { + ERR("CDO name does not fit:" CDO_NAME_TEMPLATE, i); + goto BAILOUT; + } + } + + announcement_scope_string = malloc(128); + if(announcement_scope_string==NULL) + abort(); + snprintf(announcement_scope_string, 128, + "scope:\n" + " local-size: %zu", sizeof(struct cdo_announcement)); + + /* posts (announces) a work description CDO for all to see */ + mstro_cdo announcement_cdo; + do_announce(&announcement, &announcement_cdo); + + + /* start consumers */ + INFO("Starting %zu consumers\n", announcement.num_consumers); + for(size_t i=0; i<announcement.num_consumers; i++) { + consumer_idxs[i] = i; + pthread_create(&consumers[i], NULL, + consumer_thread_fun, + &(consumer_idxs[i])); + } + /* start archivers */ + INFO("Starting %zu archivers\n", announcement.num_archivers); + for(size_t i=0; i<announcement.num_archivers; i++) { + archiver_idxs[i] = i; + pthread_create(&archivers[i], NULL, + archiver_thread_fun, + &(archiver_idxs[i])); + } + + for(size_t i=0; i<announcement.num_consumers; i++) { + INFO("Waiting for Consumer %zu to be ready\n"); + wait_consumer_ready(i); + } + for(size_t i=0; i<announcement.num_archivers; i++) { + INFO("Waiting for Archiver %zu to be ready\n"); + wait_archiver_ready(i); + } + + /* start producers */ + INFO("Starting %zu producers\n", announcement.num_producers); + for(size_t i=0; i<announcement.num_producers; i++) { + producer_idxs[i]=i; + pthread_create(&producers[i], NULL, + producer_thread_fun, + &(producer_idxs[i])); + } + + + /* wait for threads to complete */ + for(size_t i=0; i<announcement.num_producers; i++) { + pthread_join(producers[i], NULL); + } + for(size_t i=0; i<announcement.num_consumers; i++) { + pthread_join(consumers[i], NULL); + } + for(size_t i=0; i<announcement.num_archivers; i++) { + pthread_join(archivers[i], NULL); + } + + /* withdraw announcement */ + withdraw_announcement(announcement_cdo); + + + for(size_t i=0; i<announcement.num_entries; i++) { + free(announcement.cdo_names[i]); + } + + fprintf(stdout, "Sent %zu CDOs of %" PRIi64 " bytes each from %zu produce to %zu compute and %zu archiving threads\n", + announcement.num_entries, + announcement.cdo_size, + announcement.num_producers, + announcement.num_consumers, + announcement.num_archivers); + + + +BAILOUT: + if(config_handle) free(config_handle); + + return s; +} diff --git a/examples/local_pool_op_config.yaml b/examples/local_pool_op_config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..5b801519b7a027f98541937641468975c077bb06 --- /dev/null +++ b/examples/local_pool_op_config.yaml @@ -0,0 +1,5 @@ +num_producers: 1 +num_consumers: 1 +num_archivers: 1 +cdo_size: 1024 +cdo_count: 42 diff --git a/examples/omp_consumer.c b/examples/omp_consumer.c new file mode 100644 index 0000000000000000000000000000000000000000..8150eaa4f047fb9cd01dd0317642985f0377a5f0 --- /dev/null +++ b/examples/omp_consumer.c @@ -0,0 +1,217 @@ +/* Connect to pool and subscribe to many events, archiving all CDOs */ +/* + * Copyright (C) 2021 Hewlett Packard Enterprise + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "omp_consumer.h" +#include "omp_injector.h" + +int convert_consumer_mode(const char * consumer_mode_env) +{ + int result = -1; + + if (strcmp(consumer_mode_env, "MSTRO_CONSUMER_SINK_ALL") == 0) + { + result = 0; + } + else if (strcmp(consumer_mode_env, "MSTRO_CONSUMER_ONE2ONE") == 0) + { + result = 1; + } + else if (strcmp(consumer_mode_env, "MSTRO_CONSUMER_ONE2TEN") == 0) + { + result = 2; + } + else if (strcmp(consumer_mode_env, "MSTRO_CONSUMER_ALL2ALL") == 0) + { + result = 3; + } + return result; +} + +int get_consumer_mode() +{ + char * consumer_mode_env; + int consumer_mode = 0; + //get current consumer mode + if (getenv("MSTRO_CONSUMER_MODE") != NULL) + { + consumer_mode_env = getenv("MSTRO_CONSUMER_MODE"); + consumer_mode = convert_consumer_mode(consumer_mode_env); + } + + return consumer_mode; +} + +int get_num_producers(int size, int nConsumers, int consumer_mode) +{ + int num_producers = 0; + + //calculate number of producers + switch(consumer_mode) + { + case MSTRO_CONSUMER_SINK_ALL: + case MSTRO_CONSUMER_ALL2ALL: + //sink all data from all producer ranks, i.e. all ranks - PM and nconsumers + num_producers = size - 1 - nConsumers; + break; + case MSTRO_CONSUMER_ONE2ONE: + //sink all data from only one producer + num_producers = 1; + break; + + case MSTRO_CONSUMER_ONE2TEN: + //sink all data from 10 producer ranks + num_producers = 10; + break; + + default: + ERR("Incorrect MSTRO_CONSUMER_MODE \n"); + break; + } + + return num_producers; + +} + + +// producer_ids and num_producers are outputs +void get_producers(int rank, int size, int nConsumers, int *producers_ids, int num_producers, int consumer_mode) +{ + + + //calculate number of producers + switch(consumer_mode) + { + case MSTRO_CONSUMER_SINK_ALL: + case MSTRO_CONSUMER_ALL2ALL: + //sink all data from all producer ranks, i.e. all ranks - PM and nconsumers + //calculate producers + for(int i = 0; i< num_producers; i++) + { + producers_ids[i] = nConsumers + 1 + i; + } + break; + case MSTRO_CONSUMER_ONE2ONE: + //sink all data from only one producer + //asumming nProducers == nConsumers, my assignment is shifted by my rank + producers_ids[0] = nConsumers + rank; + break; + + case MSTRO_CONSUMER_ONE2TEN: + //sink all data from 10 producer ranks + //calculate producers + for(int i = 0; i< num_producers; i++) + { + producers_ids[i] = ((rank-1) * num_producers) + 1 + i; + } + break; + + default: + ERR("Incorrect MSTRO_CONSUMER_MODE \n"); + break; + } + +} + + +mstro_status require_CDOs(mstro_cdo *cdos, int num_CDOs, int *injector_ids, int num_injectors, size_t num_attributes, size_t size_attributes) +{ + + mstro_status s = MSTRO_OK; // global status + int CDOs_per_inj = num_CDOs / num_injectors ; + size_t cdoidx, cdo_gid; + + + // declare CDOs loop + #pragma omp parallel for private(cdoidx, cdo_gid) reduction(| :s) + for(size_t i=0; i < num_injectors; i++) + { + + DEBUG("consuming injector %d \n", injector_ids[i]); + + + for(size_t j=0; j < CDOs_per_inj; j++) + { + mstro_status s1,s2, s3; + cdoidx = injector_ids[i] *CDOs_per_inj + j; // CDO id = injector_id * thread_id * num_CDOs + cdo_gid = i*CDOs_per_inj + j; + char name[CDO_NAME_MAX]; + create_name(name, cdoidx); + + s3 = MSTRO_OK; // initialize its value + s1 = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, & (cdos[cdo_gid])); + + + //FIXME We do not want extra attributes now --need to merge with different branch + // create attributes ...numbers and sizes + for(size_t j=0; j < num_attributes ; j++) + { + char attrib_name[ATTRIB_NAME_MAX]; + void * data = malloc(size_attributes*sizeof(char)); + snprintf(attrib_name, ATTRIB_NAME_MAX, ".maestro.benchmark.attrib_%lu", j+1); + s3 |= mstro_cdo_attribute_set(cdos[i], attrib_name, data); + DEBUG("%d: %s \n", s3, mstro_status_description(s3)); + } + + + s3 |= mstro_cdo_declaration_seal(cdos[cdo_gid]); + + s2= mstro_cdo_require(cdos[cdo_gid]); + DEBUG("[consumer] requiring %s \n", mstro_cdo_name(cdos[cdo_gid])); + + s = s | s1 | s2 | s3; + } + + } + + return s; +} + + +mstro_status demand_CDOs(mstro_cdo *cdos, int num_CDOs) +{ + + mstro_status s = MSTRO_OK; // global status + + #pragma omp parallel for reduction(| :s) + for(size_t i=0; i < num_CDOs; i++) + { + mstro_status s3,s4; + + s3= mstro_cdo_demand(cdos[i]); + DEBUG("Hey, I recieved %s \n", mstro_cdo_name(cdos[i])); + s4= mstro_cdo_dispose(cdos[i]); + s = s | s3 | s4 ; + } + return s; + +} diff --git a/examples/omp_consumer.h b/examples/omp_consumer.h new file mode 100644 index 0000000000000000000000000000000000000000..ffe6a4348bf2397622dcb819f53039c3c4a756fc --- /dev/null +++ b/examples/omp_consumer.h @@ -0,0 +1,68 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief openMP injector component + **/ + +/* + * Copyright (C) 2021 Hewlett Packard Enterprise + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + #ifndef mstro_consumer + #define mstro_consumer 1 + + + #define MSTRO_CONSUMER_SINK_ALL 0 + #define MSTRO_CONSUMER_ONE2ONE 1 + #define MSTRO_CONSUMER_ONE2TEN 2 + #define MSTRO_CONSUMER_ALL2ALL 3 + + #include "maestro.h" + #include "maestro/logging.h" + #include <string.h> + #include <unistd.h> + #include <inttypes.h> + + /* simplify logging */ + #define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + #define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + #define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + #define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + + + +mstro_status require_CDOs(mstro_cdo *cdos, int num_CDOs, int *injector_ids, int num_injectors, size_t num_attributes, size_t size_attributes); +mstro_status demand_CDOs(mstro_cdo *cdos, int num_CDOs); +void get_producers(int rank, int size, int nConsumers, int *producers_ids, int num_producers, int consumer_mode); +int convert_consumer_mode(const char * consumer_mode_env); +int get_num_producers(int size, int nConsumers, int consumer_mode); +int get_consumer_mode(); + +#endif diff --git a/examples/omp_injector.c b/examples/omp_injector.c new file mode 100644 index 0000000000000000000000000000000000000000..c35d2323b8596ba398f4e255412cd5152889142a --- /dev/null +++ b/examples/omp_injector.c @@ -0,0 +1,179 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief openMP injector component + **/ + +/* + * Copyright (C) 2021 Hewlett Packard Enterprise + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#include "omp_injector.h" + + + +void create_name(char *dst, size_t idx) +{ + snprintf(dst, CDO_NAME_MAX, "Test CDO %zu\n", idx); +} + + +mstro_status inject_CDOs(int injector_id, mstro_cdo *cdos, int num_CDOs, size_t num_attributes, size_t size_attributes, size_t size_CDO, char **CDO_data) +{ + + mstro_status s = MSTRO_OK; // global status + + size_t cdoidx; + + // declare CDOs loop + #pragma omp parallel for private(cdoidx) reduction(| :s) + for(size_t i=0; i < num_CDOs; i++) + { + mstro_status s1,s2, s3; + cdoidx = injector_id *num_CDOs + i; // CDO id = injector_id * thread_id * num_CDOs + char name[CDO_NAME_MAX]; + create_name(name, cdoidx); + + s3 = MSTRO_OK; // initialize its value + s1 = mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, & (cdos[i])); + + + // create attributes ...numbers and sizes + for(size_t j=0; j < num_attributes ; j++) + { + char attrib_name[ATTRIB_NAME_MAX]; + void * data = malloc(size_attributes*sizeof(char)); + snprintf(attrib_name, ATTRIB_NAME_MAX, ".maestro.benchmark.attrib_%lu", j+1); + s3 |= mstro_cdo_attribute_set(cdos[i], attrib_name, data); + DEBUG("%d: %s \n", s3, mstro_status_description(s3)); + } + + + // add data + if (size_CDO != 0) + { + s3 |= mstro_cdo_attribute_set(cdos[i], + MSTRO_ATTR_CORE_CDO_RAW_PTR, + CDO_data[i]); + s3 |= mstro_cdo_attribute_set(cdos[i], + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &size_CDO); + } + + + s3 |= mstro_cdo_declaration_seal(cdos[i]); + + s2= mstro_cdo_offer(cdos[i]); + DEBUG("[injector] offering %s \n", mstro_cdo_name(cdos[i])); + + s = s | s1 | s2 | s3; + } + + return s; + +} + +void fill_char_array(char *array, size_t size) +{ + for (size_t i = 0; i < size; i++) { + array[i] = 'x'; + } +} + +mstro_status withdraw_CDOs(mstro_cdo *cdos, int num_CDOs) +{ + + mstro_status s = MSTRO_OK; // global status + + #pragma omp parallel for reduction(| :s) + for(size_t i=0; i < num_CDOs; i++) { + mstro_status s3,s4; + + s3= mstro_cdo_withdraw(cdos[i]); + s4= mstro_cdo_dispose(cdos[i]); + s = s | s3 | s4 ; + } + return s; + +} + + +mstro_status start_injector(int injector_id, size_t num_attributes, size_t size_attributes, size_t CDOs_per_thread) +{ + + double declare_time,withdraw_time, before, after; + size_t num_threads = 1; + + // Read environment variables + if (getenv("OMP_NUM_THREADS") != NULL) + { + num_threads = atoi(getenv("OMP_NUM_THREADS")); + } + + mstro_status s = MSTRO_OK; // global status + + + size_t num_CDOs = CDOs_per_thread * num_threads; // 1K per producer * (num_thread) + + s = mstro_init("Tests","Injector",injector_id); + + assert(MSTRO_OK == s); + + + + mstro_cdo cdos[num_CDOs]; + + before = omp_get_wtime(); + // declare CDOs loop + s = inject_CDOs(injector_id, cdos, num_CDOs, num_attributes, size_attributes, 0, NULL); + + assert(MSTRO_OK == s); + + after = omp_get_wtime(); + declare_time = (after - before) * 1000.0*1000.0; //time in us seconds + + before = after; // restart timer here + // withdraw CDOs loop + s = withdraw_CDOs(cdos, num_CDOs); + + assert(MSTRO_OK == s); + + after = omp_get_wtime(); + + withdraw_time = (after - before) * 1000.0*1000.0; //time in us seconds + fprintf(stdout, "#CDOs: %zu, #Threads: %zu, #Attributes: %zu, Size of attributes: %zu \n", num_CDOs, num_threads, num_attributes, size_attributes); + fprintf(stdout, "Throughput (declare/offer): %.5f us\n", declare_time/(double) num_CDOs); + fprintf(stdout, "Throughput (withdraw/dispose): %.5f us\n", withdraw_time/(double) num_CDOs); + + s |= mstro_finalize(); + + return s; +} diff --git a/examples/omp_injector.h b/examples/omp_injector.h new file mode 100644 index 0000000000000000000000000000000000000000..23104f276dd88381bd00f7ab16061d7be8010d8f --- /dev/null +++ b/examples/omp_injector.h @@ -0,0 +1,66 @@ +/* -*- mode:c -*- */ +/** @file + ** @brief openMP injector component + **/ + +/* + * Copyright (C) 2021 Hewlett Packard Enterprise + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + #ifndef omp_injector + #define omp_injector 1 + +#include "maestro.h" +#include <assert.h> +#include "maestro/logging.h" +#include <string.h> +#include "omp.h" + + +#define CDO_NAME_MAX 32 +#define ATTRIB_NAME_MAX 128 + +/* simplify logging */ +#define DEBUG(...) LOG_DEBUG(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define INFO(...) LOG_INFO(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_USER,__VA_ARGS__) +#define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_USER,__VA_ARGS__) + + + +void create_name(char *dst, size_t idx); + +mstro_status start_injector(int injector_id, size_t num_attributes, size_t size_attributes, size_t CDOs_per_thread); +mstro_status inject_CDOs(int injector_id, mstro_cdo *cdos, int num_CDOs, size_t num_attributes, size_t size_attributes, size_t size_CDO, char **CDO_data); +mstro_status withdraw_CDOs(mstro_cdo *cdos, int num_CDOs); +void fill_char_array(char *array, size_t size); + +#endif diff --git a/tests/check_declaration_seal.c b/tests/check_declaration_seal.c index 38cab7997f9ecc5437df53657df368ba99751da2..a7266c6c0da0f5a1aa04251d248fb7e5c0b75212 100644 --- a/tests/check_declaration_seal.c +++ b/tests/check_declaration_seal.c @@ -61,7 +61,7 @@ CHEAT_TEST(cdo_declaration_seal_works, mstro_cdo cdo=NULL; enum mstro_cdo_attr_value_type type; const void* val; - uint64_t size; + int64_t size; size = 16000; cheat_assert(MSTRO_OK == mstro_cdo_declare( @@ -102,7 +102,7 @@ CHEAT_TEST(cdo_declaration_seal_works, cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( cdo, ".maestro.core.cdo.scope.local-size", - &size, true)); + &size, true)); cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo)); diff --git a/tests/check_pool_mamba.c b/tests/check_pool_mamba.c index 76e959ecdfbc74d303c35739f44049e9d247ba1d..b7f9ec4c0e1ea630cd83fdc8cbe685fa334be572 100644 --- a/tests/check_pool_mamba.c +++ b/tests/check_pool_mamba.c @@ -1,6 +1,6 @@ /* -*- mode:c -*- */ /** @file - ** @brief simple mamba use through maestro check + ** @brief simple mamba use through maestro check **/ /* @@ -50,16 +50,16 @@ CHEAT_TEST(cdo_local_pool_mamba_works, char name[] = "Test CDO name üÜöÖäÄ"; mstro_cdo cdo=NULL; cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo)); - uint64_t size = 4096; + int64_t size = 4096; enum mstro_cdo_attr_value_type type; float* buf; void *rec; buf = malloc(sizeof(float)*size); fprintf(stderr, "buf: %p\n", buf); rec = NULL; - + // TODO declare/set a Mamba array - mmbArray* ma_ptr = NULL; + mmbArray* ma_ptr = NULL; // TODO associate (cdo, mmb_array_ptr) cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( @@ -67,14 +67,14 @@ CHEAT_TEST(cdo_local_pool_mamba_works, cheat_assert(MSTRO_OK == mstro_cdo_attribute_set( cdo, ".maestro.core.cdo.scope.local-size", &size, true)); - + mmbArray *mamba_ptr=NULL; /* mamba array not automatically available */ mstro_status s1 = mstro_cdo_attribute_get( cdo, ".maestro.core.cdo.mamba-array", &type, (const void**)&mamba_ptr); - + fprintf(stderr,"Mamba array fetch stat 1: %d, val %p\n", s1, mamba_ptr); - + cheat_assert(MSTRO_OK==s1); cheat_assert(mamba_ptr==NULL); @@ -82,10 +82,10 @@ CHEAT_TEST(cdo_local_pool_mamba_works, mmbArray *tmp=NULL; cheat_assert(MSTRO_OK==mstro_cdo_access_mamba_array(cdo,&tmp)); cheat_assert(tmp==NULL); - + /* now seal */ cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo)); - + cheat_assert(MSTRO_OK == mstro_cdo_access_ptr( @@ -93,7 +93,7 @@ CHEAT_TEST(cdo_local_pool_mamba_works, fprintf(stderr, "rec: %p\n", (float*)rec); cheat_assert(rec==buf); - /* afterwards we can also get mamba array as an attribute */ + /* afterwards we can also get mamba array as an attribute */ cheat_assert(MSTRO_OK == mstro_cdo_attribute_get( cdo, ".maestro.core.cdo.mamba-array", &type, (const void**)&mamba_ptr)); @@ -114,6 +114,3 @@ CHEAT_TEST(cdo_local_pool_mamba_works, cheat_assert(MSTRO_OK == mstro_finalize()); free(buf); ) - - - diff --git a/tests/check_transport_mio.c b/tests/check_transport_mio.c index f2eb2e065639efb06aea6d09bab708bfe5d6d08f..fd7116c8034e742b92be20b87a7f03cf0919879d 100644 --- a/tests/check_transport_mio.c +++ b/tests/check_transport_mio.c @@ -58,26 +58,26 @@ CHEAT_DECLARE ( { unsigned char x; int i; - - for (i=0,x=0;i<size; i++) - x ^= ((unsigned char*)rawptr)[i]; - + + for (i=0,x=0;i<size; i++) + x ^= ((unsigned char*)rawptr)[i]; + fprintf(stderr, "Checksum for cdo \"%s\":\t%d\n", name, x); return x; } ) - + /* this tests MIO transport by using this single thread as both Producer and Consumer */ CHEAT_TEST(transport_mio_works, size_t data_count = 1031; - size_t bytes = data_count*sizeof(double); + int64_t bytes = data_count*sizeof(double); size_t pad = 0; double src_data[data_count]; for(size_t i=0; i<data_count; i++) { src_data[i]=random(); } unsigned char sender_checksum = transport_checksum("pioneer departure", src_data, data_count); - + cheat_assert(MSTRO_OK == mstro_init("Tests","TRANSPORT",0)); char name[] = "transport_pioneer"; char filename_dst[] = "./CDOs/__CDO_mio_transport_pioneer"; @@ -85,7 +85,7 @@ CHEAT_DECLARE ( mstro_cdo cdo_dst=NULL; int s = mkdir("./CDOs", S_IRWXU); cheat_assert(s==0 || errno==EEXIST); - + cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_RAW_PTR, @@ -110,13 +110,13 @@ CHEAT_DECLARE ( NULL, (const void**)&tval)); cheat_assert(*tval==bytes); - + cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_dst)); - + /* Particular case where the offerer is also the src, so let us execute * the src transport before offering, otherwise the raw-ptr won't be * accessible anymore */ - + /* Have a ticket issued for src */ Mstro__Pool__UUID id = MSTRO__POOL__UUID__INIT; size_t i; @@ -147,10 +147,10 @@ CHEAT_DECLARE ( ticket.ticket_case = MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO; Mstro__Pool__TransferTicketMIO mio = MSTRO__POOL__TRANSFER_TICKET_MIO__INIT; ticket.mio = &mio; - struct mstro_cdo_id* semid; + struct mstro_cdo_id* semid; semid = malloc(sizeof(struct mio_obj_id)); cheat_assert(semid != NULL); - struct mstro_cdo_id* objid; + struct mstro_cdo_id* objid; objid = malloc(sizeof(struct mio_obj_id)); cheat_assert (objid != NULL); char* semname = NULL; @@ -170,53 +170,50 @@ CHEAT_DECLARE ( */ cheat_assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); cheat_assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); - + ticket.mio->semid.len = sizeof(struct mstro_cdo_id); ticket.mio->semid.data = (uint8_t*)semid; ticket.mio->objid.len = sizeof(struct mstro_cdo_id); objid->qw[0] = ticket.cdoid->qw0; objid->qw[1] = ticket.cdoid->qw1; ticket.mio->objid.data = (uint8_t*)objid; - + ticket.mio->keep_obj = 0; /* No clean issuing, have to duplicate code above from maestro/ofi.c*/ // cheat_assert(MSTRO_OK == mstro_transport_ticket_issue(cdo_src, &ticket)); - - cheat_assert(MSTRO_OK == mstro_transport_mio_src_execute(cdo_src, &ticket)); - + + cheat_assert(MSTRO_OK == mstro_transport_mio_src_execute(cdo_src, &ticket)); + cheat_assert(MSTRO_OK == mstro_cdo_offer(cdo_src)); /* ticket is supposed to be sent to dst (4-step protocol), so here let * us just pretend it was, and use ticket directly for dst transport * execute, while it is being implemented */ - + cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_dst)); - cheat_assert(MSTRO_OK == mstro_transport_mio_dst_execute(cdo_dst, &ticket)); + cheat_assert(MSTRO_OK == mstro_transport_mio_dst_execute(cdo_dst, &ticket)); cheat_assert(MSTRO_OK == mstro_cdo_require(cdo_dst)); cheat_assert(MSTRO_OK == mstro_cdo_demand(cdo_dst)); - - + + double* data; size_t len; enum mstro_cdo_attr_value_type type; const void* val; - + cheat_assert(MSTRO_OK == mstro_cdo_access_ptr(cdo_dst, (void**)&data, NULL)); - cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_dst, "scope.local-size", &type, &val)); + cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_dst, "scope.local-size", &type, &val)); len = *(size_t*)val; len /= sizeof(double); cheat_assert(len > 0 && data != NULL); unsigned char receiver_checksum = transport_checksum("pioneer arrival", data, len); cheat_assert(sender_checksum == receiver_checksum); - + cheat_assert(MSTRO_OK == mstro_cdo_dispose(cdo_dst)); cheat_assert(MSTRO_OK == mstro_cdo_withdraw(cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_dispose(cdo_src)); cheat_assert(MSTRO_OK == mstro_finalize()); - - ) - - + )