diff --git a/dags/conn_deco.py b/dags/conn_deco.py index 4e8258c9ba56bba78191751ed58ab378a4725f4a..1aebe1cd8c267367a47658bc05075a45a91da047 100644 --- a/dags/conn_deco.py +++ b/dags/conn_deco.py @@ -20,10 +20,10 @@ def_args = { def conn_decorator(): @task() - def doing_nothing(conn_id): + def doing_nothing(conn_id, **kwargs): print(f"Using connection {conn_id}") - ssh_hook = get_connection(conn_id=conn_id, default_host='amdlogin.bsc.es') + ssh_hook = get_connection(conn_id=conn_id, **kwargs) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() print("Connected") diff --git a/dags/decors.py b/dags/decors.py index f41fb8db680e3815858a44eea998a24f9da5fe9b..2f9fd133aef585975fc871f4f9c70fb8d712f297 100644 --- a/dags/decors.py +++ b/dags/decors.py @@ -29,7 +29,7 @@ def create_temp_connection(rrid, params): return conn_id -def get_connection(conn_id, default_host='amdlogin.bsc.es'): +def get_connection(conn_id, **kwargs): if conn_id.startswith('vault'): vault_hook = VaultHook(vault_conn_id='my_vault') con = vault_hook.get_secret( @@ -37,9 +37,11 @@ def get_connection(conn_id, default_host='amdlogin.bsc.es'): print(f"Got some values from vault {list(con.keys())}") # for now SSH is hardcoded - host = con.get('host', default_host) - port = int(con.get('port', 22)) - hook = SSHHook(remote_host=host, port=port, username=con['userName']) + params = kwargs['params'] + host = params.get('host') + port = int(params.get('port', 2222)) + user = params.get('login', 'eflows') + hook = SSHHook(remote_host=host, port=port, username=user) # key in vault should be in form of formated string: # -----BEGIN OPENSSH PRIVATE KEY----- # b3BlbnNzaC1rZXktdjEAAAAA diff --git a/dags/taskflow.py b/dags/taskflow.py index b8980148b191b6de9d40963c9b91e3e72a102df3..cd9b05c96e8475db9fea6e861061daff43753fcf 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -58,7 +58,7 @@ def taskflow_example(): target = params.get('target', '/tmp/') print(f"Using {connection_id} connection") - ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es') + ssh_hook = get_connection(conn_id=connection_id, **kwargs) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() diff --git a/dags/uploadflow.py b/dags/uploadflow.py index ebaad4b9b87aa1f94cbca9b0eb8b1e2b5888907c..7feb46d4219e45c96dc3ed37ecab47966e1ec84d 100644 --- a/dags/uploadflow.py +++ b/dags/uploadflow.py @@ -6,7 +6,6 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.operators.python import PythonOperator from airflow.providers.http.hooks.http import HttpHook -from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago from b2shareoperator import (add_file, create_draft_record, get_community, @@ -45,7 +44,7 @@ def upload_example(): target = params.get('target', '/tmp/') source = params.get('source', '/tmp/') - ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es') + ssh_hook = get_connection(conn_id=connection_id, **kwargs) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() lst = sftp_client.listdir(path=source) @@ -82,7 +81,7 @@ def upload_example(): server=server, community_id=template['community']) if not community: print("Not existing community") - return + return -1 cid, required = community missing = [r for r in required if r not in template] if any(missing):