Skip to content
Snippets Groups Projects
Commit 25367313 authored by Maria Petrova-El Sayed's avatar Maria Petrova-El Sayed
Browse files

Stage out docker execution results into b2share

parent 67208e62
No related branches found
No related tags found
No related merge requests found
......@@ -5,8 +5,8 @@ from airflow.models.connection import Connection
from airflow.models import Variable
from airflow.operators.python import get_current_context
from b2shareoperator import (download_file, get_file_list, get_object_md,
get_objects)
from decors import get_connection, remove, setup
get_objects, get_record_template, create_draft_record, add_file, submit_draft)
from decors import get_connection
import docker_cmd as doc
import os
......@@ -113,10 +113,10 @@ def docker_with_ssh():
# return loaded_files
@task
def run_container(**kwargs):
def run_container(data_locations, **kwargs):
params = kwargs['params']
stageout_args = params.get('stageout_args', [])
stageout_fnames = params.get('stageout_args', [])
cmd = doc.get_dockercmd(params, DATA_LOCATION)
print(f"Executing docker command {cmd}")
......@@ -133,15 +133,15 @@ def docker_with_ssh():
context = get_current_context()
task_calculate.execute(context)
return stageout_args
return stageout_fnames
@task
def postprocess_results(output_files: list):
def ls_results(output_files: list):
if not output_files:
return "No output to stage out. Nothing more to do."
hook = get_connection(conn_id=DW_CONNECTION_ID)
sp = " "
cmd = f"cd {DATA_LOCATION}; cat {sp.join(output_files)}"
cmd = f"cd {DATA_LOCATION}; ls -al {sp.join(output_files)}"
process = SSHOperator(
task_id="print_results",
ssh_hook=hook,
......@@ -150,14 +150,81 @@ def docker_with_ssh():
context = get_current_context()
process.execute(context)
@task()
def retrieve_res(output_fnames: list, **kwargs):
    """Copy result files from the remote data location back to the local host.

    Each name is resolved against DATA_LOCATION on the remote side and against
    the Airflow variable "working_dir" (default '/tmp/') on the local side.

    Args:
        output_fnames (list): names of the files that will be stored on another system

    Returns:
        files (list): the local paths of the newly retrieved result files
    """
    local_tmp_dir = Variable.get("working_dir", default_var='/tmp/')
    files = []

    print(f"Using {DW_CONNECTION_ID} connection")
    ssh_hook = get_connection(conn_id=DW_CONNECTION_ID)

    with ssh_hook.get_conn() as ssh_client:
        sftp_client = ssh_client.open_sftp()
        try:
            for fname in output_fnames:
                # Build both paths once so the log line and the transfer cannot diverge.
                remote = os.path.join(DATA_LOCATION, fname)
                local = os.path.join(local_tmp_dir, fname)
                print(f"Copying {remote} to {local}")
                sftp_client.get(remote, local)
                files.append(local)
        finally:
            # Release the SFTP channel explicitly, even if a transfer fails.
            sftp_client.close()

    return files
@task
def stageout_results(output_files: list):
    """Upload the retrieved result files into a new B2SHARE record and submit it.

    Args:
        output_files (list): local paths of the files to upload

    Returns:
        The 'publication' link of the submitted record, or -1 when there is
        nothing to upload or the draft record could not be created.
    """
    if not output_files:
        print("No output to stage out. Nothing more to do.")
        return -1

    connection = Connection.get_connection_from_secrets('default_b2share')
    server = "https://" + connection.host
    # An absent token falls back to the empty string, as before.
    token = connection.extra_dejson.get('access_token', '')
    print(f"Registering data to {server}")

    template = get_record_template()
    r = create_draft_record(server=server, token=token, record=template)
    print(f"record {r}")
    if 'id' not in r:
        # `r` is used as a mapping everywhere else, so it may not carry a
        # `.text` attribute; read it defensively so the error path cannot crash.
        print('Something went wrong with registration', r, getattr(r, 'text', ''))
        return -1
    print(f"Draft record created {r['id']} --> {r['links']['self']}")

    for f in output_files:
        print(f"Uploading {f}")
        # The entries are plain path strings (see retrieve_res), which have no
        # `.name` attribute, so the path itself is passed as the local file name.
        # NOTE(review): assumes add_file reads the local file from `fname` -
        # confirm against b2shareoperator.add_file.
        _ = add_file(record=r, fname=f, token=token, remote=f)
        # delete local
        # os.unlink(local)

    print("Submitting record for pubication")
    submitted = submit_draft(record=r, token=token)
    print(f"Record created {submitted}")

    return submitted['links']['publication']
# context = get_current_context()
# process.execute(context)
# TODO: implement a cleanup job
@task
def cleanup(errorcode):
    """Placeholder for the final cleanup step; for now it only prints a reminder.

    Args:
        errorcode: value handed over from the stage-out task (currently unused)
    """
    reminder = "TODO: Clean up"
    print(reminder)
data = extract()
files = transform(data)
data_locations = load(files)
output_files = run_container()
output_fnames = run_container(data_locations)
ls_results(output_fnames)
files = retrieve_res(output_fnames)
errcode = stageout_results(files)
cleanup(errcode)
data >> files >> data_locations >> output_files >> postprocess_results(output_files)
# data >> files >> data_locations >> output_fnames >> ls_results(output_fnames) >> files >> stageout_results(files) >> cleanup()
dag = docker_with_ssh()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment