diff --git a/dags/conn_deco.py b/dags/conn_deco.py new file mode 100644 index 0000000000000000000000000000000000000000..b773671c6e2ad75437e8b9257378d099832e8b9e --- /dev/null +++ b/dags/conn_deco.py @@ -0,0 +1,78 @@ +from datetime import timedelta + +from airflow import settings +from airflow.decorators import dag, task +from airflow.providers.ssh.hooks.ssh import SSHHook +from airflow.models.connection import Connection +from airflow.utils.dates import days_ago + +def_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) + +} + +@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) +def conn_decorator(): + @task + def setup(**kwargs): + print(f"Setting up the connection") + + params = kwargs['params'] + rrid = kwargs['run_id'] + host = params.get('host') + port = params.get('port', 2222) + user = params.get('login', 'eflows') + key = params.get('key') + + conn_id = f"tmp_connection_{rrid}" + extra = {"private_key": key} + conn = Connection( + conn_id=conn_id, + conn_type='ssh', + description='Automatically generated Connection', + host=host, + login=user, + port=port, + extra=extra + ) + + session = settings.Session() + session.add(conn) + session.commit() + print(f"Connection {conn_id} created") + return conn_id + + @task() + def doing_nothing(conn_id, **kwargs): + print(f"Just doing nothing with {conn_id}") + params = kwargs['params'] + print(f"This task recieved following kwargs: {params}") + + ssh_hook = SSHHook(ssh_conn_id=conn_id) + with ssh_hook.get_conn() as ssh_client: + sftp_client = ssh_client.open_sftp() + print("Connected") + + return conn_id + + @task() + def remove(conn_id): + print(f"Removing conneciton {conn_id}") + session = settings.Session() + for con in session.query(Connection).all(): + print(con) + + session.query(Connection).filter(Connection.conn_id == conn_id).delete() + session.commit() + + conn_id = setup() + conn_id = doing_nothing(conn_id=conn_id) + remove(conn_id) + + +dag = conn_decorator() diff --git a/dags/taskflow.py b/dags/taskflow.py index c86066f98a352dd536dc1e92fbb1be129f1b29d6..a410957376484a5efbdb8b73b6c47e51ce14a6d7 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -1,4 +1,5 @@ +from airflow import settings from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook @@ -16,8 +17,38 @@ default_args = { @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def taskflow_example(): + + @task + def setup(**kwargs): + print(f"Setting up the connection") + + params = kwargs['params'] + rrid = kwargs['run_id'] + host = params.get('host') + port = params.get('port', 2222) + user = params.get('login', 'eflows') + key = params.get('key') + + conn_id = f"tmp_connection_{rrid}" + extra = {"private_key": key} + conn = Connection( + conn_id=conn_id, + conn_type='ssh', + description='Automatically generated Connection', + host=host, + login=user, + port=port, + extra=extra + ) + + session = settings.Session() + session.add(conn) + session.commit() + print(f"Connection {conn_id} created") + return conn_id + @task(multiple_outputs=True) - def extract(**kwargs): + def extract(conn_id, **kwargs): connection = Connection.get_connection_from_secrets('default_b2share') server = connection.get_uri() print(f"Rereiving data from {server}") @@ -50,11 +81,12 @@ def taskflow_example(): return name_mappings @task() - def load(files: dict, **kwargs): + def load(connection_id, files: dict, **kwargs): print(f"Total files downloaded: {len(files)}") params = kwargs['params'] target = params.get('target', '/tmp/') - connection_id = params.get('connection', 'default_ssh') + + print(f"Using {connection_id} connection") ssh_hook = SSHHook(ssh_conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: @@ -65,9 +97,22 @@ def taskflow_example(): # or separate cleanup task? os.unlink(local) - data = extract() - files = transform(data) - load(files) + return connection_id + + @task() + def remove(conn_id): + print(f"Removing conneciton {conn_id}") + session = settings.Session() + for con in session.query(Connection).all(): + print(con) + + session.query(Connection).filter(Connection.conn_id == conn_id).delete() + session.commit() + conn_id = setup() + data = extract(conn_id) + files = transform(data) + ucid = load(connection_id = conn_id, files=files) + remove(conn_id=ucid) dag = taskflow_example() diff --git a/templates/main.html b/templates/main.html index 153b1bff2c19eac774b7ba700bc2d202c44d08d9..66297ddaef85744f91a03518498b5a0775ba1319 100644 --- a/templates/main.html +++ b/templates/main.html @@ -86,12 +86,12 @@ {% block footer %} {% if not current_user.is_anonymous %} {% set version_label = 'Version' %} - <!-- Use a wrapper div to detach the footer from the content. --> - <div style="padding-bottom: 3rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important; + <!-- Use a wrapper div to detach the footer from the content -> just nice to have --> + <div style="padding-bottom: 1rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important; flex-direction: column !important;"> . </div> - <footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: 2rem;"> + <footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: auto !important;"> <div class="container p-0" style="padding: 0px !important"> <div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat !important; background-size: cover !important; padding: 0px; !important"></div> <div class="row mt-2 px-3" style="margin-top: 0.5rem; padding-right: 1rem;"> @@ -143,6 +143,24 @@ var csrfToken = '{{ csrf_token() }}'; $('time[title]').tooltip(); </script> + <!-- Calculate the size of the dynamic footer to make sure that it doesn't cover the content of the page. Helps also on mobile devices. --> + <script> + function footerAlign() { + $('footer').css('display', 'block'); + $('footer').css('height', 'auto'); + var footerHeight = $('footer').outerHeight(); + $('body').css('padding-bottom', footerHeight); + $('footer').css('height', footerHeight); + } + + $(document).ready(function(){ + footerAlign(); + }); + + $( window ).resize(function() { + footerAlign(); + }); + </script> <!--[if IE ]> <script src="{{ url_for_asset('ie.js') }}"></script> <![endif]--> diff --git a/tests/test_dag.py b/tests/test_dag.py index 2689dfb9ceedd1fac32ce5ea222580658f6470b3..3a91270f2cc59f8e66c12577c60b767b90ed3c7b 100644 --- a/tests/test_dag.py +++ b/tests/test_dag.py @@ -18,4 +18,4 @@ class TestADag(unittest.TestCase): dag = self.dagbag.get_dag(dag_id='taskflow_example') assert self.dagbag.import_errors == {} assert dag is not None - self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}") + self.assertEqual(len(dag.tasks), 5, f"Actually: {len(dag.tasks)}")