From 853880180db64cdc2f24a3fc2db84110c429fade Mon Sep 17 00:00:00 2001 From: Utz-Uwe Haus <uhaus@cray.com> Date: Wed, 9 Nov 2022 21:22:52 +0100 Subject: [PATCH] Add 3 python-based unit tests non-PM ops, PM start/stop and python.mp based PM+producer+consumer examples --- configure.ac | 5 +- scripting/maestro-py.i | 14 ++++ tests/.gitignore | 7 ++ tests/Makefile.am | 7 ++ tests/py_check_local_pm.in | 30 +++++++++ tests/py_check_pm.in | 15 +++++ tests/py_check_threaded_interlock.in | 99 ++++++++++++++++++++++++++++ 7 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 tests/py_check_local_pm.in create mode 100644 tests/py_check_pm.in create mode 100644 tests/py_check_threaded_interlock.in diff --git a/configure.ac b/configure.ac index 0aeaff0a..eb8d475f 100644 --- a/configure.ac +++ b/configure.ac @@ -672,7 +672,10 @@ m4_define([TESTSCRIPTS], tests/check_ecmwf_events.sh \ tests/check_ecmwf_handle.sh \ tests/check_ecmwf_attr.sh \ - tests/check_decode_pminfo.sh + tests/check_decode_pminfo.sh \ + tests/py_check_local_pm \ + tests/py_check_pm \ + tests/py_check_threaded_interlock ]) m4_foreach_w([TESTSCRIPT], diff --git a/scripting/maestro-py.i b/scripting/maestro-py.i index 8b042f23..f470b767 100644 --- a/scripting/maestro-py.i +++ b/scripting/maestro-py.i @@ -50,6 +50,20 @@ } } +/* char ** result args */ +%typemap(in, numinputs=0) char **result_p (char *tmp) { + $1 = &tmp; +} +%typemap(argout) char **result_p (PyObject* obj) { + /* wrap char** putput to python unicode object */ + obj = PyUnicode_FromString(*$1); + $result = SWIG_Python_AppendOutput($result,obj); +} +/* unicode object is a copy and owned by python, so temporary object returned by maestro can be freed */ +%typemap(freearg) char** result_p { + free(*$1); +} + %{ #define SWIG_FILE_WITH_INIT #include "maestro.h" diff --git a/tests/.gitignore b/tests/.gitignore index 7ba085d4..16ce8f9a 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -43,3 +43,10 @@ check_pm_redundant_interlock.sh decode_pminfo check_decode_pminfo.sh check_fifo +pminfo +py_check_local_pm +py_check_pm +py_check_threaded_interlock +redundant_interlock_client_1 +redundant_interlock_client_2 +check_dispose_reuse diff --git a/tests/Makefile.am b/tests/Makefile.am index 2ec4d4bd..08fa337f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -161,6 +161,13 @@ check_PROGRAMS += check_transport_mio TESTS += check_transport_mio endif +if WITH_SWIG +# for the moment that means: with python +TESTS += py_check_pm \ + py_check_local_pm \ + py_check_threaded_interlock +endif + CLIENT1_OPS="DECLARE cdo1 1025\ SEAL cdo1 -1 \ DECLARE cdo2 20480\ diff --git a/tests/py_check_local_pm.in b/tests/py_check_local_pm.in new file mode 100644 index 00000000..8be7bdf5 --- /dev/null +++ b/tests/py_check_local_pm.in @@ -0,0 +1,30 @@ +#!@top_srcdir@/scripting/mpython +# test with local pool manager only +import maestro_core as M + +def test_local_pm(): + M.mstro_init("test_workflow", "component1", 0) + cdo1 = M.mstro_cdo_declare("CDO1", None) + cdo2 = M.mstro_cdo_declare("CDO2", None) + + M.mstro_cdo_offer(cdo1) + M.mstro_cdo_offer(cdo2) + + cdo2_copy = M.mstro_cdo_declare("CDO2", None) + M.mstro_cdo_require(cdo2_copy) + M.mstro_cdo_demand(cdo2_copy) + + M.mstro_cdo_withdraw(cdo1) + M.mstro_cdo_withdraw(cdo2) + + M.mstro_cdo_dispose(cdo1) + M.mstro_cdo_dispose(cdo2) + M.mstro_cdo_dispose(cdo2_copy) + + M.mstro_finalize() + + +test_local_pm() +test_local_pm() + + diff --git a/tests/py_check_pm.in b/tests/py_check_pm.in new file mode 100644 index 00000000..afaf8c17 --- /dev/null +++ b/tests/py_check_pm.in @@ -0,0 +1,15 @@ +#!@top_srcdir@/scripting/mpython +import maestro_core as M +import time + +M.mstro_init("test_pm_wf", "pm", 0) + +M.mstro_pm_start() + +print(M.mstro_pm_getinfo()) + +time.sleep(3) + +M.mstro_pm_terminate() + +M.mstro_finalize() diff --git a/tests/py_check_threaded_interlock.in b/tests/py_check_threaded_interlock.in new file mode 100644 index 00000000..2538c349 --- /dev/null +++ b/tests/py_check_threaded_interlock.in @@ -0,0 +1,99 @@ +#!@top_srcdir@/scripting/mpython +# simple cross-thread and cross-process maestro check +# +import maestro_core as M +from multiprocessing import Process, Pipe + +def pm(parent_conn, producer_conn, consumer_conn): + M.mstro_init("check_threaded_pm", "pm", 0) + M.mstro_pm_start() + print("PM started") + # tell the others about us: + consumer_conn.send(M.mstro_pm_getinfo()) + + producer_conn.send(M.mstro_pm_getinfo()) + + print("PM sent pminfo") + + # wait for termination from runner script: + termination_dummy = parent_conn.recv() + print("PM received shutdown") + + M.mstro_pm_terminate() + M.mstro_finalize(); + +def producer(pm_conn): + print("producer started") + M.mstro_init("check_threaded_pm", "producer", 0) + print("producer initialized") + pm_info = pm_conn.recv() + print("producer received PMINFO") + pm_conn.close() + + M.mstro_pm_attach(pm_info) + print("Attached to PM") + + sem = M.mstro_cdo_declare("consumer_ready", None) + M.mstro_cdo_require(sem) + M.mstro_cdo_demand(sem) + print("obtained consumer ready info") + M.mstro_cdo_dispose(sem) + + cdo = M.mstro_cdo_declare("CDO1",None) + M.mstro_cdo_offer(cdo) + M.mstro_cdo_withdraw(cdo) + M.mstro_cdo_dispose(cdo) + + + M.mstro_finalize(); + +def consumer(pm_conn): + print("consumer started") + M.mstro_init("check_threaded_pm", "consumer", 0) + print("consumer initialized") + pm_info = pm_conn.recv() + print("consumer received PMINFO") + pm_conn.close() + + M.mstro_pm_attach(pm_info) + print("Attached to PM") + + sem = M.mstro_cdo_declare("consumer_ready", None) + M.mstro_cdo_offer(sem) + print("signaled consumer ready") + + cdo = M.mstro_cdo_declare("CDO1",None) + M.mstro_cdo_require(cdo) + M.mstro_cdo_demand(cdo) + M.mstro_cdo_dispose(cdo) + + M.mstro_cdo_withdraw(sem) + M.mstro_cdo_dispose(sem) + + M.mstro_finalize(); + +# Connector pipes: +pm_parent_conn, pm_child_conn = Pipe() +producer_parent_conn, producer_child_conn = Pipe() +consumer_parent_conn, consumer_child_conn = Pipe() + +# start producer +producer_proc = Process(target=producer, args=(producer_child_conn,)) + +# start consumer +consumer_proc = Process(target=consumer, args=(consumer_child_conn,)) +producer_proc.start() +consumer_proc.start() + +# Start PM process +pm_proc = Process(target=pm, args=(pm_child_conn, producer_parent_conn, consumer_parent_conn,)) +pm_proc.start() + +# wait for workers to do their work and terminate +producer_proc.join() +consumer_proc.join() + +# tell PM to stop +pm_parent_conn.send("Please terminate") +pm_proc.join() + -- GitLab