diff --git a/dags/docker_in_ssh_op.py b/dags/docker_in_ssh_op.py
index 7b7871dd6357ae8d3128b3e7bb24706c40c8aa5a..e8cbbdb37ef2f5bbac44d7ca3c55433a9255e37f 100644
--- a/dags/docker_in_ssh_op.py
+++ b/dags/docker_in_ssh_op.py
@@ -5,8 +5,8 @@ from airflow.models.connection import Connection
 from airflow.models import Variable
 from airflow.operators.python import get_current_context
 from b2shareoperator import (download_file, get_file_list, get_object_md,
-                             get_objects)
-from decors import get_connection, remove, setup
+                             get_objects, get_record_template, create_draft_record, add_file, submit_draft)
+from decors import get_connection
 import docker_cmd as doc
 import os
 
@@ -113,10 +113,10 @@ def docker_with_ssh():
     #    return loaded_files
 
     @task
-    def run_container(**kwargs):
+    def run_container(data_locations, **kwargs):
         params = kwargs['params']
 
-        stageout_args = params.get('stageout_args', [])
+        stageout_fnames = params.get('stageout_args', [])
 
         cmd = doc.get_dockercmd(params, DATA_LOCATION)
         print(f"Executing docker command {cmd}")
@@ -133,15 +133,15 @@ def docker_with_ssh():
         context = get_current_context()
         task_calculate.execute(context)
 
-        return stageout_args
+        return stageout_fnames
 
     @task
-    def postprocess_results(output_files: list):
+    def ls_results(output_files: list):
         if not output_files:
             return "No output to stage out. Nothing more to do."
         hook = get_connection(conn_id=DW_CONNECTION_ID)
         sp = " "
-        cmd = f"cd {DATA_LOCATION}; cat {sp.join(output_files)}"
+        cmd = f"cd {DATA_LOCATION}; ls -al {sp.join(output_files)}"
         process = SSHOperator(
             task_id="print_results",
             ssh_hook=hook,
@@ -150,14 +150,99 @@ def docker_with_ssh():
         context = get_current_context()
         process.execute(context)
 
+    @task()
+    def retrieve_res(output_fnames: list, **kwargs):
+        """Copy the result files from the remote location back to the host.
+
+        Args:
+            output_fnames (list): names of the files, relative to
+                DATA_LOCATION on the remote system, to retrieve
+        Returns:
+            files (list): the local paths of the retrieved result files
+        """
+        local_tmp_dir = Variable.get("working_dir", default_var='/tmp/')
+        files = []
+        print(f"Using {DW_CONNECTION_ID} connection")
+        ssh_hook = get_connection(conn_id=DW_CONNECTION_ID)
+
+        with ssh_hook.get_conn() as ssh_client:
+            # Close the SFTP channel as soon as the transfers are done.
+            # NOTE(review): assumes a paramiko SFTP client -- confirm.
+            with ssh_client.open_sftp() as sftp_client:
+                for fname in output_fnames:
+                    remote = os.path.join(DATA_LOCATION, fname)
+                    local = os.path.join(local_tmp_dir, fname)
+                    print(f"Copying {remote} to {local}")
+                    sftp_client.get(remote, local)
+                    files.append(local)
+
+        return files
+
+    @task
+    def stageout_results(output_files: list):
+        """Upload the retrieved result files as a new record and submit it.
+
+        Args:
+            output_files (list): local paths of the files to upload
+        Returns:
+            the publication link of the submitted record, or -1 when there
+            is nothing to upload or the draft record could not be created
+        """
+        if not output_files:
+            print("No output to stage out. Nothing more to do.")
+            return -1
+        connection = Connection.get_connection_from_secrets('default_b2share')
+
+        server = "https://" + connection.host
+        token = connection.extra_dejson.get('access_token', '')
+        print(f"Registering data to {server}")
+        template = get_record_template()
+
+        r = create_draft_record(server=server, token=token, record=template)
+        print(f"record {r}")
+        if 'id' not in r:
+            # r is used as a mapping here, so there is no r.text to print
+            print('Something went wrong with registration', r)
+            return -1
+        print(f"Draft record created {r['id']} --> {r['links']['self']}")
+
+        for f in output_files:
+            print(f"Uploading {f}")
+            # f is a plain path string produced by retrieve_res, so it has
+            # no .name attribute.
+            # NOTE(review): assumes add_file reads the local file given as
+            # fname and names the upload after remote -- confirm.
+            _ = add_file(record=r, fname=f, token=token, remote=f)
+            # TODO: delete the local copy once the upload has succeeded
+
+        print("Submitting record for publication")
+        submitted = submit_draft(record=r, token=token)
+        print(f"Record created {submitted}")
+
+        return submitted['links']['publication']
+
+    # TODO a cleanup job
+    @task
+    def cleanup(errorcode):
+        """Placeholder for the cleanup step; it only prints a message so far.
+
+        Args:
+            errorcode: the value returned by stageout_results
+        """
+        print("TODO: Clean up")
 
 
     data = extract()
     files = transform(data)
     data_locations = load(files)
-    output_files = run_container()
+    output_fnames = run_container(data_locations)
+    ls_results(output_fnames)
+    files = retrieve_res(output_fnames)
+    errcode = stageout_results(files)
+    cleanup(errcode)
 
-    data >> files >> data_locations >> output_files >> postprocess_results(output_files)
+    # Task order follows from the values passed between the tasks above,
+    # so no explicit ">>" chaining is needed.
 
 
 dag = docker_with_ssh()