diff --git a/configure.ac b/configure.ac index 0aeaff0a5494b2e520482ea93a67e58e2d2b4d97..eb8d475f9ee86c8908f0ff2dead5454677328ea1 100644 --- a/configure.ac +++ b/configure.ac @@ -672,7 +672,10 @@ m4_define([TESTSCRIPTS], tests/check_ecmwf_events.sh \ tests/check_ecmwf_handle.sh \ tests/check_ecmwf_attr.sh \ - tests/check_decode_pminfo.sh + tests/check_decode_pminfo.sh \ + tests/py_check_local_pm \ + tests/py_check_pm \ + tests/py_check_threaded_interlock ]) m4_foreach_w([TESTSCRIPT], diff --git a/scripting/maestro-py.i b/scripting/maestro-py.i index 8b042f23bac4e84d20dc844dd2b9f0d6debcf709..f470b7674818e8f6cd030771a9a1c59880079444 100644 --- a/scripting/maestro-py.i +++ b/scripting/maestro-py.i @@ -50,6 +50,20 @@ } } +/* char ** result args */ +%typemap(in, numinputs=0) char **result_p (char *tmp) { + $1 = &tmp; +} +%typemap(argout) char **result_p (PyObject* obj) { + /* wrap char** putput to python unicode object */ + obj = PyUnicode_FromString(*$1); + $result = SWIG_Python_AppendOutput($result,obj); +} +/* unicode object is a copy and owned by python, so temporary object returned by maestro can be freed */ +%typemap(freearg) char** result_p { + free(*$1); +} + %{ #define SWIG_FILE_WITH_INIT #include "maestro.h" diff --git a/tests/.gitignore b/tests/.gitignore index 7ba085d48c0e7ea63fdc8f810c67fb1f1874d511..16ce8f9a872525a71680161be000f0c59f95e4c6 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -43,3 +43,10 @@ check_pm_redundant_interlock.sh decode_pminfo check_decode_pminfo.sh check_fifo +pminfo +py_check_local_pm +py_check_pm +py_check_threaded_interlock +redundant_interlock_client_1 +redundant_interlock_client_2 +check_dispose_reuse diff --git a/tests/Makefile.am b/tests/Makefile.am index 2ec4d4bd722b913dd8b34a6ff1c3a0a4c2ef5411..08fa337f9338ab6e3e2cb61a149a553a2f674cac 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -161,6 +161,13 @@ check_PROGRAMS += check_transport_mio TESTS += check_transport_mio endif +if WITH_SWIG +# for the moment that means: with python +TESTS += py_check_pm \ + py_check_local_pm \ + py_check_threaded_interlock +endif + CLIENT1_OPS="DECLARE cdo1 1025\ SEAL cdo1 -1 \ DECLARE cdo2 20480\ diff --git a/tests/py_check_local_pm.in b/tests/py_check_local_pm.in new file mode 100644 index 0000000000000000000000000000000000000000..8be7bdf5b08243b9811db76e078d18d17c886a1a --- /dev/null +++ b/tests/py_check_local_pm.in @@ -0,0 +1,30 @@ +#!@top_srcdir@/scripting/mpython +# test with local pool manager only +import maestro_core as M + +def test_local_pm(): + M.mstro_init("test_workflow", "component1", 0) + cdo1 = M.mstro_cdo_declare("CDO1", None) + cdo2 = M.mstro_cdo_declare("CDO2", None) + + M.mstro_cdo_offer(cdo1) + M.mstro_cdo_offer(cdo2) + + cdo2_copy = M.mstro_cdo_declare("CDO2", None) + M.mstro_cdo_require(cdo2_copy) + M.mstro_cdo_demand(cdo2_copy) + + M.mstro_cdo_withdraw(cdo1) + M.mstro_cdo_withdraw(cdo2) + + M.mstro_cdo_dispose(cdo1) + M.mstro_cdo_dispose(cdo2) + M.mstro_cdo_dispose(cdo2_copy) + + M.mstro_finalize() + + +test_local_pm() +test_local_pm() + + diff --git a/tests/py_check_pm.in b/tests/py_check_pm.in new file mode 100644 index 0000000000000000000000000000000000000000..afaf8c17b9ac16ec555c23faa39df3d833ad4ff4 --- /dev/null +++ b/tests/py_check_pm.in @@ -0,0 +1,15 @@ +#!@top_srcdir@/scripting/mpython +import maestro_core as M +import time + +M.mstro_init("test_pm_wf", "pm", 0) + +M.mstro_pm_start() + +print(M.mstro_pm_getinfo()) + +time.sleep(3) + +M.mstro_pm_terminate() + +M.mstro_finalize() diff --git a/tests/py_check_threaded_interlock.in b/tests/py_check_threaded_interlock.in new file mode 100644 index 0000000000000000000000000000000000000000..2538c349bcfc81b02b88b204b1ae2569cae189a8 --- /dev/null +++ b/tests/py_check_threaded_interlock.in @@ -0,0 +1,99 @@ +#!@top_srcdir@/scripting/mpython +# simple cross-thread and cross-process maestro check +# +import maestro_core as M +from multiprocessing import Process, Pipe + +def pm(parent_conn, producer_conn, consumer_conn): + M.mstro_init("check_threaded_pm", "pm", 0) + M.mstro_pm_start() + print("PM started") + # tell the others about us: + consumer_conn.send(M.mstro_pm_getinfo()) + + producer_conn.send(M.mstro_pm_getinfo()) + + print("PM sent pminfo") + + # wait for termination from runner script: + termination_dummy = parent_conn.recv() + print("PM received shutdown") + + M.mstro_pm_terminate() + M.mstro_finalize(); + +def producer(pm_conn): + print("producer started") + M.mstro_init("check_threaded_pm", "producer", 0) + print("producer initialized") + pm_info = pm_conn.recv() + print("producer received PMINFO") + pm_conn.close() + + M.mstro_pm_attach(pm_info) + print("Attached to PM") + + sem = M.mstro_cdo_declare("consumer_ready", None) + M.mstro_cdo_require(sem) + M.mstro_cdo_demand(sem) + print("obtained consumer ready info") + M.mstro_cdo_dispose(sem) + + cdo = M.mstro_cdo_declare("CDO1",None) + M.mstro_cdo_offer(cdo) + M.mstro_cdo_withdraw(cdo) + M.mstro_cdo_dispose(cdo) + + + M.mstro_finalize(); + +def consumer(pm_conn): + print("consumer started") + M.mstro_init("check_threaded_pm", "consumer", 0) + print("consumer initialized") + pm_info = pm_conn.recv() + print("consumer received PMINFO") + pm_conn.close() + + M.mstro_pm_attach(pm_info) + print("Attached to PM") + + sem = M.mstro_cdo_declare("consumer_ready", None) + M.mstro_cdo_offer(sem) + print("signaled consumer ready") + + cdo = M.mstro_cdo_declare("CDO1",None) + M.mstro_cdo_require(cdo) + M.mstro_cdo_demand(cdo) + M.mstro_cdo_dispose(cdo) + + M.mstro_cdo_withdraw(sem) + M.mstro_cdo_dispose(sem) + + M.mstro_finalize(); + +# Connector pipes: +pm_parent_conn, pm_child_conn = Pipe() +producer_parent_conn, producer_child_conn = Pipe() +consumer_parent_conn, consumer_child_conn = Pipe() + +# start producer +producer_proc = Process(target=producer, args=(producer_child_conn,)) + +# start consumer +consumer_proc = Process(target=consumer, args=(consumer_child_conn,)) +producer_proc.start() +consumer_proc.start() + +# Start PM process +pm_proc = Process(target=pm, args=(pm_child_conn, producer_parent_conn, consumer_parent_conn,)) +pm_proc.start() + +# wait for workers to do their work and terminate +producer_proc.join() +consumer_proc.join() + +# tell PM to stop +pm_parent_conn.send("Please terminate") +pm_proc.join() +