diff --git a/dags/conn_deco.py b/dags/conn_deco.py index 4d1fe66990b4359b85882144c1c0fe7914fdfcea..4e8258c9ba56bba78191751ed58ab378a4725f4a 100644 --- a/dags/conn_deco.py +++ b/dags/conn_deco.py @@ -23,7 +23,7 @@ def conn_decorator(): def doing_nothing(conn_id): print(f"Using connection {conn_id}") - ssh_hook = get_connection(conn_id=conn_id) + ssh_hook = get_connection(conn_id=conn_id, default_host='amdlogin.bsc.es') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() print("Connected") diff --git a/dags/decors.py b/dags/decors.py index b0453afb780d36b943481bce203c1a1ea8fe5238..f41fb8db680e3815858a44eea998a24f9da5fe9b 100644 --- a/dags/decors.py +++ b/dags/decors.py @@ -29,7 +29,7 @@ def create_temp_connection(rrid, params): return conn_id -def get_connection(conn_id): +def get_connection(conn_id, default_host='amdlogin.bsc.es'): if conn_id.startswith('vault'): vault_hook = VaultHook(vault_conn_id='my_vault') con = vault_hook.get_secret( @@ -37,7 +37,7 @@ def get_connection(conn_id): print(f"Got some values from vault {list(con.keys())}") # for now SSH is hardcoded - host = con.get('host', 'bsc') + host = con.get('host', default_host) port = int(con.get('port', 22)) hook = SSHHook(remote_host=host, port=port, username=con['userName']) # key in vault should be in form of formated string: diff --git a/dags/taskflow.py b/dags/taskflow.py index 5d9ee1569262ebe6c5011be28962dfee85fcb653..b8980148b191b6de9d40963c9b91e3e72a102df3 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -58,7 +58,7 @@ def taskflow_example(): target = params.get('target', '/tmp/') print(f"Using {connection_id} connection") - ssh_hook = get_connection(conn_id=connection_id) + ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() diff --git a/dags/uploadflow.py b/dags/uploadflow.py index 454309e48e60c3441cca266f1c9e3c7bd48b1862..ebaad4b9b87aa1f94cbca9b0eb8b1e2b5888907c 100644 --- a/dags/uploadflow.py +++ b/dags/uploadflow.py @@ -11,7 +11,7 @@ from airflow.utils.dates import days_ago from b2shareoperator import (add_file, create_draft_record, get_community, submit_draft) -from decors import remove, setup +from decors import remove, setup, get_connection default_args = { 'owner': 'airflow', @@ -45,7 +45,7 @@ def upload_example(): target = params.get('target', '/tmp/') source = params.get('source', '/tmp/') - ssh_hook = SSHHook(ssh_conn_id=connection_id) + ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() lst = sftp_client.listdir(path=source)