Skip to content
Snippets Groups Projects
Commit 43e7acaa authored by Maria Petrova-El Sayed's avatar Maria Petrova-El Sayed
Browse files

Merge branch 'main' into mptest

parents 551a3414 e29aee17
No related branches found
No related tags found
No related merge requests found
from datetime import timedelta
from airflow import settings
from airflow.decorators import dag, task
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.models.connection import Connection
from airflow.utils.dates import days_ago
def_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def conn_decorator():
@task
def setup(**kwargs):
print(f"Setting up the connection")
params = kwargs['params']
rrid = kwargs['run_id']
host = params.get('host')
port = params.get('port', 2222)
user = params.get('login', 'eflows')
key = params.get('key')
conn_id = f"tmp_connection_{rrid}"
extra = {"private_key": key}
conn = Connection(
conn_id=conn_id,
conn_type='ssh',
description='Automatically generated Connection',
host=host,
login=user,
port=port,
extra=extra
)
session = settings.Session()
session.add(conn)
session.commit()
print(f"Connection {conn_id} created")
return conn_id
@task()
def doing_nothing(conn_id, **kwargs):
print(f"Just doing nothing with {conn_id}")
params = kwargs['params']
print(f"This task recieved following kwargs: {params}")
ssh_hook = SSHHook(ssh_conn_id=conn_id)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
print("Connected")
return conn_id
@task()
def remove(conn_id):
print(f"Removing conneciton {conn_id}")
session = settings.Session()
for con in session.query(Connection).all():
print(con)
session.query(Connection).filter(Connection.conn_id == conn_id).delete()
session.commit()
conn_id = setup()
conn_id = doing_nothing(conn_id=conn_id)
remove(conn_id)
dag = conn_decorator()
from airflow import settings
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.providers.ssh.hooks.ssh import SSHHook
......@@ -16,8 +17,38 @@ default_args = {
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_example():
@task
def setup(**kwargs):
print(f"Setting up the connection")
params = kwargs['params']
rrid = kwargs['run_id']
host = params.get('host')
port = params.get('port', 2222)
user = params.get('login', 'eflows')
key = params.get('key')
conn_id = f"tmp_connection_{rrid}"
extra = {"private_key": key}
conn = Connection(
conn_id=conn_id,
conn_type='ssh',
description='Automatically generated Connection',
host=host,
login=user,
port=port,
extra=extra
)
session = settings.Session()
session.add(conn)
session.commit()
print(f"Connection {conn_id} created")
return conn_id
@task(multiple_outputs=True)
def extract(**kwargs):
def extract(conn_id, **kwargs):
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
......@@ -50,11 +81,12 @@ def taskflow_example():
return name_mappings
@task()
def load(files: dict, **kwargs):
def load(connection_id, files: dict, **kwargs):
print(f"Total files downloaded: {len(files)}")
params = kwargs['params']
target = params.get('target', '/tmp/')
connection_id = params.get('connection', 'default_ssh')
print(f"Using {connection_id} connection")
ssh_hook = SSHHook(ssh_conn_id=connection_id)
with ssh_hook.get_conn() as ssh_client:
......@@ -65,9 +97,22 @@ def taskflow_example():
# or separate cleanup task?
os.unlink(local)
data = extract()
files = transform(data)
load(files)
return connection_id
@task()
def remove(conn_id):
print(f"Removing conneciton {conn_id}")
session = settings.Session()
for con in session.query(Connection).all():
print(con)
session.query(Connection).filter(Connection.conn_id == conn_id).delete()
session.commit()
conn_id = setup()
data = extract(conn_id)
files = transform(data)
ucid = load(connection_id = conn_id, files=files)
remove(conn_id=ucid)
dag = taskflow_example()
......@@ -86,12 +86,12 @@
{% block footer %}
{% if not current_user.is_anonymous %}
{% set version_label = 'Version' %}
<!-- Use a wrapper div to detach the footer from the content. -->
<div style="padding-bottom: 3rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important;
<!-- Use a wrapper div to detach the footer from the content -> just nice to have -->
<div style="padding-bottom: 1rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important;
flex-direction: column !important;">
.
</div>
<footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: 2rem;">
<footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: auto !important;">
<div class="container p-0" style="padding: 0px !important">
<div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat !important; background-size: cover !important; padding: 0px; !important"></div>
<div class="row mt-2 px-3" style="margin-top: 0.5rem; padding-right: 1rem;">
......@@ -143,6 +143,24 @@
var csrfToken = '{{ csrf_token() }}';
$('time[title]').tooltip();
</script>
<!-- Calculate the size of the dynamic footer to make sure that it doesn't cover the content of the page. Helps also on mobile devices. -->
<script>
function footerAlign() {
$('footer').css('display', 'block');
$('footer').css('height', 'auto');
var footerHeight = $('footer').outerHeight();
$('body').css('padding-bottom', footerHeight);
$('footer').css('height', footerHeight);
}
$(document).ready(function(){
footerAlign();
});
$( window ).resize(function() {
footerAlign();
});
</script>
<!--[if IE ]>
<script src="{{ url_for_asset('ie.js') }}"></script>
<![endif]-->
......
......@@ -18,4 +18,4 @@ class TestADag(unittest.TestCase):
dag = self.dagbag.get_dag(dag_id='taskflow_example')
assert self.dagbag.import_errors == {}
assert dag is not None
self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}")
self.assertEqual(len(dag.tasks), 5, f"Actually: {len(dag.tasks)}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment