Skip to content
Snippets Groups Projects
Commit 1f914082 authored by Christian Boettcher's avatar Christian Boettcher
Browse files

Merge branch 'main' of gitlab.jsc.fz-juelich.de:eflows4hpc-wp2/data-logistics-service into main

parents 95ba1606 8bfabd2c
No related branches found
No related tags found
No related merge requests found
......@@ -20,10 +20,10 @@ def_args = {
def conn_decorator():
@task()
def doing_nothing(conn_id):
def doing_nothing(conn_id, **kwargs):
print(f"Using connection {conn_id}")
ssh_hook = get_connection(conn_id=conn_id, default_host='amdlogin.bsc.es')
ssh_hook = get_connection(conn_id=conn_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
print("Connected")
......
......@@ -29,7 +29,7 @@ def create_temp_connection(rrid, params):
return conn_id
def get_connection(conn_id, default_host='amdlogin.bsc.es'):
def get_connection(conn_id, **kwargs):
if conn_id.startswith('vault'):
vault_hook = VaultHook(vault_conn_id='my_vault')
con = vault_hook.get_secret(
......@@ -37,9 +37,11 @@ def get_connection(conn_id, default_host='amdlogin.bsc.es'):
print(f"Got some values from vault {list(con.keys())}")
# for now SSH is hardcoded
host = con.get('host', default_host)
port = int(con.get('port', 22))
hook = SSHHook(remote_host=host, port=port, username=con['userName'])
params = kwargs['params']
host = params.get('host')
port = int(params.get('port', 2222))
user = params.get('login', 'eflows')
hook = SSHHook(remote_host=host, port=port, username=user)
# key in vault should be in form of formated string:
# -----BEGIN OPENSSH PRIVATE KEY-----
# b3BlbnNzaC1rZXktdjEAAAAA
......
......@@ -58,10 +58,12 @@ def taskflow_example():
target = params.get('target', '/tmp/')
print(f"Using {connection_id} connection")
ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es')
ssh_hook = get_connection(conn_id=connection_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
# check dir?
ssh_client.exec_command(command=f"mkdir -p {target}")
for [truename, local] in files.items():
print(
f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}")
......
......@@ -6,8 +6,8 @@ from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.dates import days_ago
from airflow.models import Variable
from b2shareoperator import (add_file, create_draft_record, get_community,
submit_draft)
......@@ -42,10 +42,10 @@ def upload_example():
@task()
def load(connection_id, **kwargs):
params = kwargs['params']
target = params.get('target', '/tmp/')
target = Variable.get("working_dir", default_var='/tmp/')
source = params.get('source', '/tmp/')
ssh_hook = get_connection(conn_id=connection_id, default_host='amdlogin.bsc.es')
ssh_hook = get_connection(conn_id=connection_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
lst = sftp_client.listdir(path=source)
......@@ -82,7 +82,7 @@ def upload_example():
server=server, community_id=template['community'])
if not community:
print("Not existing community")
return
return -1
cid, required = community
missing = [r for r in required if r not in template]
if any(missing):
......@@ -94,10 +94,13 @@ def upload_example():
for [local, true_name] in files.items():
print(f"Uploading {local} --> {true_name}")
_ = add_file(record=r, fname=local, token=token, remote=true_name)
# delete local
os.unlink(local)
print("Submitting record for pubication")
submitted = submit_draft(record=r, token=token)
print(f"Record created {submitted['id']}")
return submitted['id']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment