diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6e95f226bb2fcdf08afbd5c52c96f13c3d917e06..9c10568dcd88111fb51b72b64e3065d72c200e43 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -76,3 +76,17 @@ deploy-test: - while [ "`openstack server show $INSTANCE_ID -c addresses -f value`" = "{}" ]; do sleep 5; done # wait until an address is available to attach the floating ip - openstack server add floating ip $INSTANCE_ID $FLOATING_IP - echo "Done" + +publishgit-do: + stage: publish + only: + - tags + tags: [stable] + script: + - apt-get update + - apt-get install -y git + - (git remote rm gith) || echo "Not found" + - (git remote -v | grep gith) || git remote add gith "https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com/eflows4hpc/data-logistics-service.git" + - git remote -v + - git fetch --unshallow origin + - git push gith $CI_COMMIT_REF_NAME diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..7623188996d969af74a826e53105b4c0a070d25f --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +BSD 2-Clause License + +Copyright (c) 2021, Forschungszentrum Juelich GmbH +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/dags/conn_deco.py b/dags/conn_deco.py new file mode 100644 index 0000000000000000000000000000000000000000..b773671c6e2ad75437e8b9257378d099832e8b9e --- /dev/null +++ b/dags/conn_deco.py @@ -0,0 +1,78 @@ +from datetime import timedelta + +from airflow import settings +from airflow.decorators import dag, task +from airflow.providers.ssh.hooks.ssh import SSHHook +from airflow.models.connection import Connection +from airflow.utils.dates import days_ago + +def_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) + +} + +@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) +def conn_decorator(): + @task + def setup(**kwargs): + print(f"Setting up the connection") + + params = kwargs['params'] + rrid = kwargs['run_id'] + host = params.get('host') + port = params.get('port', 2222) + user = params.get('login', 'eflows') + key = params.get('key') + + conn_id = f"tmp_connection_{rrid}" + extra = {"private_key": key} + conn = Connection( + conn_id=conn_id, + conn_type='ssh', + description='Automatically generated Connection', + host=host, + login=user, + port=port, + extra=extra + ) + + session = settings.Session() + session.add(conn) + session.commit() + print(f"Connection {conn_id} created") + return conn_id + + @task() + def doing_nothing(conn_id, **kwargs): + print(f"Just doing nothing with {conn_id}") + params = kwargs['params'] + print(f"This task recieved following kwargs: {params}") + + ssh_hook = SSHHook(ssh_conn_id=conn_id) + with ssh_hook.get_conn() as ssh_client: + sftp_client = ssh_client.open_sftp() + print("Connected") + + return conn_id + + @task() + def remove(conn_id): + print(f"Removing conneciton {conn_id}") + session = settings.Session() + for con in session.query(Connection).all(): + print(con) + + session.query(Connection).filter(Connection.conn_id == conn_id).delete() + session.commit() + + conn_id = setup() + conn_id = doing_nothing(conn_id=conn_id) + remove(conn_id) + + +dag = conn_decorator() diff --git a/dags/taskflow.py b/dags/taskflow.py index c86066f98a352dd536dc1e92fbb1be129f1b29d6..a410957376484a5efbdb8b73b6c47e51ce14a6d7 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -1,4 +1,5 @@ +from airflow import settings from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook @@ -16,8 +17,38 @@ default_args = { @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def taskflow_example(): + + @task + def setup(**kwargs): + print(f"Setting up the connection") + + params = kwargs['params'] + rrid = kwargs['run_id'] + host = params.get('host') + port = params.get('port', 2222) + user = params.get('login', 'eflows') + key = params.get('key') + + conn_id = f"tmp_connection_{rrid}" + extra = {"private_key": key} + conn = Connection( + conn_id=conn_id, + conn_type='ssh', + description='Automatically generated Connection', + host=host, + login=user, + port=port, + extra=extra + ) + + session = settings.Session() + session.add(conn) + session.commit() + print(f"Connection {conn_id} created") + return conn_id + @task(multiple_outputs=True) - def extract(**kwargs): + def extract(conn_id, **kwargs): connection = Connection.get_connection_from_secrets('default_b2share') server = connection.get_uri() print(f"Rereiving data from {server}") @@ -50,11 +81,12 @@ def taskflow_example(): return name_mappings @task() - def load(files: dict, **kwargs): + def load(connection_id, files: dict, **kwargs): print(f"Total files downloaded: {len(files)}") params = kwargs['params'] target = params.get('target', '/tmp/') - connection_id = params.get('connection', 'default_ssh') + + print(f"Using {connection_id} connection") ssh_hook = SSHHook(ssh_conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: @@ -65,9 +97,22 @@ def taskflow_example(): # or separate cleanup task? os.unlink(local) - data = extract() - files = transform(data) - load(files) + return connection_id + + @task() + def remove(conn_id): + print(f"Removing conneciton {conn_id}") + session = settings.Session() + for con in session.query(Connection).all(): + print(con) + + session.query(Connection).filter(Connection.conn_id == conn_id).delete() + session.commit() + conn_id = setup() + data = extract(conn_id) + files = transform(data) + ucid = load(connection_id = conn_id, files=files) + remove(conn_id=ucid) dag = taskflow_example() diff --git a/dags/uploadflow.py b/dags/uploadflow.py index 5f99de95e233c6ec0a486037ca44625fb6f70863..6df478109069c94f1958eddae7aee89821742bcb 100644 --- a/dags/uploadflow.py +++ b/dags/uploadflow.py @@ -2,15 +2,16 @@ import os import tempfile +from airflow import settings from airflow.decorators import dag, task from airflow.models.connection import Connection +from airflow.providers.http.hooks.http import HttpHook from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago from b2shareoperator import (add_file, create_draft_record, get_record_template, submit_draft) - default_args = { 'owner': 'airflow', } @@ -18,12 +19,40 @@ default_args = { @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def upload_example(): + @task + def setup(**kwargs): + print(f"Setting up the connection") + + params = kwargs['params'] + rrid = kwargs['run_id'] + host = params.get('host') + port = params.get('port', 2222) + user = params.get('login', 'eflows') + key = params.get('key') + + conn_id = f"tmp_connection_{rrid}" + extra = {"private_key": key} + conn = Connection( + conn_id=conn_id, + conn_type='ssh', + description='Automatically generated Connection', + host=host, + login=user, + port=port, + extra=extra + ) + + session = settings.Session() + session.add(conn) + session.commit() + print(f"Connection {conn_id} created") + return conn_id + @task() - def load(**kwargs): + def load(connection_id, **kwargs): params = kwargs['params'] target = params.get('target', '/tmp/') source = params.get('source', '/tmp/') - connection_id = params.get('connection', 'default_ssh') ssh_hook = SSHHook(ssh_conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: @@ -51,9 +80,29 @@ def upload_example(): # hate such hacks: server = "https://" + connection.host token = connection.extra_dejson['access_token'] - print(f"Server: {server} + {token}") + - template = get_record_template() + params = kwargs['params'] + mid = params['mid'] + + hook = HttpHook(http_conn_id='datacat', method='GET') + hrespo = hook.run(endpoint=f"storage_target/{mid}").json()['metadata'] + print(hrespo) + + template = { + "titles" : [{"title":hrespo['title']}], + "creators" : [{"creator_name": hrespo['creator_name']}], + "descriptions" :[ + { + "description": hrespo['description'], + "description_type": "Abstract" + } + ], + "community": "a9217684-945b-4436-8632-cac271f894ed", + 'community_specific': + {'91ae5d2a-3848-4693-9f7d-cbd141172ef0': {'helmholtz centre': ['Forschungszentrum Jülich']}}, + "open_access": hrespo['open_access']=="True" + } r = create_draft_record(server=server, token=token, record=template) print(r) print(f"Draft record created {r['id']} --> {r['links']['self']}") @@ -65,9 +114,22 @@ def upload_example(): print("Submitting record for pubication") submitted = submit_draft(record=r, token=token) print(f"Record created {submitted['id']}") + return submitted['id'] + + @task() + def remove(conn_id, uid): + print(f"Upload {uid} completed. Removing conneciton {conn_id}") + session = settings.Session() + for con in session.query(Connection).all(): + print(con) + + session.query(Connection).filter(Connection.conn_id == conn_id).delete() + session.commit() - files = load() - upload(files) + conn_id = setup() + files = load(connection_id=conn_id) + uid = upload(files) + remove(conn_id=conn_id, uid=uid) dag = upload_example() diff --git a/templates/NOTICE b/templates/NOTICE new file mode 100644 index 0000000000000000000000000000000000000000..fd9d4f0e10f48f142389028975d2c2764e0de55b --- /dev/null +++ b/templates/NOTICE @@ -0,0 +1,38 @@ +This product is licensed to you under the Apache License, Version 2.0 +(the "License"). You may not use this product except in compliance with +the License. + +This product may include a number of subcomponents with separate +copyright notices and license terms. Your use of the source code for +these subcomponents is subject to the terms and conditions of the +subcomponent's license, as noted in the license.txt file. + +The files in this folder have been modified by Forschungszentrum Juelich GmbH. + +======================================================================= + +BSD 2-Clause License + +Copyright (c) 2021, Forschungszentrum Juelich GmbH +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/templates/main.html b/templates/main.html index 153b1bff2c19eac774b7ba700bc2d202c44d08d9..66297ddaef85744f91a03518498b5a0775ba1319 100644 --- a/templates/main.html +++ b/templates/main.html @@ -86,12 +86,12 @@ {% block footer %} {% if not current_user.is_anonymous %} {% set version_label = 'Version' %} - <!-- Use a wrapper div to detach the footer from the content. --> - <div style="padding-bottom: 3rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important; + <!-- Use a wrapper div to detach the footer from the content -> just nice to have --> + <div style="padding-bottom: 1rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important; flex-direction: column !important;"> . </div> - <footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: 2rem;"> + <footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: auto !important;"> <div class="container p-0" style="padding: 0px !important"> <div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat !important; background-size: cover !important; padding: 0px; !important"></div> <div class="row mt-2 px-3" style="margin-top: 0.5rem; padding-right: 1rem;"> @@ -143,6 +143,24 @@ var csrfToken = '{{ csrf_token() }}'; $('time[title]').tooltip(); </script> + <!-- Calculate the size of the dynamic footer to make sure that it doesn't cover the content of the page. Helps also on mobile devices. --> + <script> + function footerAlign() { + $('footer').css('display', 'block'); + $('footer').css('height', 'auto'); + var footerHeight = $('footer').outerHeight(); + $('body').css('padding-bottom', footerHeight); + $('footer').css('height', footerHeight); + } + + $(document).ready(function(){ + footerAlign(); + }); + + $( window ).resize(function() { + footerAlign(); + }); + </script> <!--[if IE ]> <script src="{{ url_for_asset('ie.js') }}"></script> <![endif]--> diff --git a/tests/test_dag.py b/tests/test_dag.py index 2689dfb9ceedd1fac32ce5ea222580658f6470b3..3a91270f2cc59f8e66c12577c60b767b90ed3c7b 100644 --- a/tests/test_dag.py +++ b/tests/test_dag.py @@ -18,4 +18,4 @@ class TestADag(unittest.TestCase): dag = self.dagbag.get_dag(dag_id='taskflow_example') assert self.dagbag.import_errors == {} assert dag is not None - self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}") + self.assertEqual(len(dag.tasks), 5, f"Actually: {len(dag.tasks)}")