Commit a9251696 authored by Maria Petrova-El Sayed

Add cleanup tasks to docker dag

parent 57f64cf8
Pipeline #109990 passed
+WORKER_DATA_LOCATION = '/wf_pipeline_data/userdata'
 def get_dockercmd(params:dict, location):
     """A task which runs in the docker worker and spins up a docker container with the given image and parameters.
...
@@ -8,6 +8,7 @@ from b2shareoperator import (download_file, get_file_list, get_object_md,
                              get_objects, get_record_template, create_draft_record, add_file, submit_draft)
 from decors import get_connection
 import docker_cmd as doc
+from docker_cmd import WORKER_DATA_LOCATION
 import os
 
 """This pipeline is a test case for starting a clustering algorithm with HeAT, running in a Docker environment.
@@ -29,9 +30,8 @@ default_args = {
 }
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example', 'docker'])
-def docker_with_ssh():
+def docker_in_worker():
     DW_CONNECTION_ID = "docker_worker"
-    DATA_LOCATION = '/wf_pipeline_data/userdata'
 
     @task(multiple_outputs=True)
     def extract(**kwargs):
@@ -97,8 +97,8 @@ def docker_with_ssh():
             sftp_client = ssh_client.open_sftp()
             for [truename, local] in files.items():
                 print(
-                    f"Copying {local} --> {DW_CONNECTION_ID}:{os.path.join(DATA_LOCATION, truename)}")
-                sftp_client.put(local, os.path.join(DATA_LOCATION, truename))
+                    f"Copying {local} --> {DW_CONNECTION_ID}:{os.path.join(WORKER_DATA_LOCATION, truename)}")
+                sftp_client.put(local, os.path.join(WORKER_DATA_LOCATION, truename))
                 # or separate cleanup task?
                 os.unlink(local)
@@ -118,7 +118,7 @@ def docker_with_ssh():
         params = kwargs['params']
         stageout_fnames = params.get('stageout_args', [])
 
-        cmd = doc.get_dockercmd(params, DATA_LOCATION)
+        cmd = doc.get_dockercmd(params, WORKER_DATA_LOCATION)
         print(f"Executing docker command {cmd}")
 
         print(f"Using {DW_CONNECTION_ID} connection")
@@ -141,7 +141,7 @@ def docker_with_ssh():
             return "No output to stage out. Nothing more to do."
         hook = get_connection(conn_id=DW_CONNECTION_ID)
         sp = " "
-        cmd = f"cd {DATA_LOCATION}; ls -al {sp.join(output_files)}"
+        cmd = f"cd {WORKER_DATA_LOCATION}; ls -al {sp.join(output_files)}"
         process = SSHOperator(
             task_id="print_results",
             ssh_hook=hook,
@@ -151,28 +151,50 @@ def docker_with_ssh():
         process.execute(context)
 
     @task()
-    def retrieve_res(output_fnames: list, **kwargs):
-        """This task copies the data from the remote location back to the host
+    def retrieve_res(fnames: list, **kwargs):
+        """This task copies the data from the remote docker worker back to the airflow workspace
 
         Args:
-            output_fnames (list): the files that will be stored on another system
+            fnames (list): the files to be retrieved from the docker worker
         Returns:
-            files (list): the locations of the newly retrieved result files
+            local_fpath (list): the paths of the files copied back to the airflow host
         """
         local_tmp_dir = Variable.get("working_dir", default_var='/tmp/')
-        files = []
+        local_fpath = []
         print(f"Using {DW_CONNECTION_ID} connection")
         ssh_hook = get_connection(conn_id=DW_CONNECTION_ID)
         with ssh_hook.get_conn() as ssh_client:
             sftp_client = ssh_client.open_sftp()
-            for fname in output_fnames:
-                local = os.path.join(local_tmp_dir,fname)
-                print(f"Copying {os.path.join(DATA_LOCATION, fname)} to {local}")
-                sftp_client.get(os.path.join(DATA_LOCATION, fname), local)
-                files.append(local)
+            for name in fnames:
+                l = os.path.join(local_tmp_dir, name)
+                print(f"Copying {os.path.join(WORKER_DATA_LOCATION, name)} to {l}")
+                sftp_client.get(os.path.join(WORKER_DATA_LOCATION, name), l)
+                local_fpath.append(l)
+        return local_fpath
+
+    @task()
+    def cleanup_doc_worker(files, **kwargs):
+        """This task deletes all the files from the docker worker
+        # Args:
+        #     fnames (list): the result files to be deleted on the docker worker
+        """
+        params = kwargs['params']
+        stagein_fnames = params.get('stagein_args', [])
+        stageout_fnames = params.get('stageout_args', [])
+        all_fnames = stagein_fnames + stageout_fnames
+        print(f"Using {DW_CONNECTION_ID} connection")
+        ssh_hook = get_connection(conn_id=DW_CONNECTION_ID)
+        with ssh_hook.get_conn() as ssh_client:
+            sftp_client = ssh_client.open_sftp()
+            for file in all_fnames:
+                print(
+                    f"Deleting file {DW_CONNECTION_ID}:{os.path.join(WORKER_DATA_LOCATION, file)}")
+                sftp_client.remove(os.path.join(WORKER_DATA_LOCATION, file))
+        return files
 
     @task
     def stageout_results(output_files: list):
@@ -198,7 +220,7 @@ def docker_with_ssh():
         for f in output_files:
             print(f"Uploading {f}")
-            _ = add_file(record=r, fname=f.name, token=token, remote=f)
+            _ = add_file(record=r, fname=f, token=token, remote=f)
             # delete local
             # os.unlink(local)
@@ -212,19 +234,28 @@ def docker_with_ssh():
     #TODO a cleanup job
     @task
-    def cleanup(errorcode):
-        print("TODO: Clean up")
+    def cleanup_local(errcode, res_fpaths):
+        if type(errcode) == int:
+            print("The data could not be staged out in the repository. Cleaning up")
+            for f in res_fpaths:
+                print(f"Deleting file: {f}")
+                os.remove(f)
+        #delete local copies of file
 
     data = extract()
     files = transform(data)
     data_locations = load(files)
     output_fnames = run_container(data_locations)
     ls_results(output_fnames)
-    files = retrieve_res(output_fnames)
-    errcode = stageout_results(files)
-    cleanup(errcode)
+    res_fpaths = retrieve_res(output_fnames)
+    cleanup_doc_worker(res_fpaths)
+    errcode = stageout_results(res_fpaths)
+    cleanup_local(errcode, res_fpaths)
 
     # data >> files >> data_locations >> output_fnames >> ls_results(output_fnames) >> files >> stageout_results(files) >> cleanup()
 
-dag = docker_with_ssh()
+dag = docker_in_worker()
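
For reference, the renamed DAG and the new cleanup tasks all read their file lists from the trigger configuration (`stagein_args` / `stageout_args`). Below is a minimal local-run sketch, assuming Airflow 2.5+ (for `DAG.test`) and that the `docker_worker` connection and `working_dir` variable are configured; the module path and file names are illustrative only, and `extract()` may expect additional params not visible in this diff.

# Minimal local-run sketch (assumptions: Airflow 2.5+, illustrative module path and file names).
# The conf keys mirror the ones read by the tasks in the diff above.
from dags.docker_in_worker import dag  # assumed module path for this DAG file

if __name__ == "__main__":
    # DAG.test() executes all tasks of a single DAG run in-process with the given conf.
    dag.test(run_conf={
        "stagein_args": ["input.csv"],    # files copied to the docker worker before the run
        "stageout_args": ["result.out"],  # files retrieved, staged out, then cleaned up
    })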