diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3b978f3a3ad6b56f3fe322ae500325b755e145ff..2cf2b747a1bf62b6822a473e048889c3380ec326 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -36,27 +36,6 @@ stages: - test-deployment - cleanup -test: - stage: test - image: - name: $CI_REGISTRY_IMAGE/eflows-airflow:latest - entrypoint: [""] - before_script: - - echo "DEBUG:" - - pip --version - - airflow db init - - pip install -r requirements.txt - - pip install nose==1.3.7 - - airflow connections add --conn-uri https://b2share-testing.fz-juelich.de/ default_b2share - script: - - ls - - pwd - - cp dags/* /opt/airflow/dags/ - - airflow dags list - - airflow connections list - - airflow dags test testdag 2021-08-18 - - nosetests - build-custom-image: stage: build diff --git a/dags/GAdemo.py b/dags/GAdemo.py deleted file mode 100644 index 9cdfda5f95b4154ade482d9cf56ab5bcf7ba2f56..0000000000000000000000000000000000000000 --- a/dags/GAdemo.py +++ /dev/null @@ -1,32 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.utils.dates import days_ago -from airflow.sensors.filesystem import FileSensor -from airflow.operators.python import PythonOperator -from airflow.operators.dummy import DummyOperator - -def_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) - -} - -def train_model(): - print('Will start model training') - -with DAG('GAtest', default_args=def_args, description='testing GA', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: - s1 = FileSensor(task_id='file_sensor', filepath='/work/afile.txt') - t1 = BashOperator(task_id='move_data', bash_command='date') - t2 = PythonOperator(task_id='train_model', python_callable=train_model) - t3 = BashOperator(task_id='eval_model', bash_command='echo "evaluating"') - t4 = DummyOperator(task_id='upload_model_to_repo') - t5 = DummyOperator(task_id='publish_results') - - s1 >> t1 >> t2 >> t4 - t2 >> t3 >> t5 diff --git a/dags/another-testdag.py b/dags/another-testdag.py deleted file mode 100644 index c00489fc207af25223e505e7d643c22d95b5acaa..0000000000000000000000000000000000000000 --- a/dags/another-testdag.py +++ /dev/null @@ -1,21 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.utils.dates import days_ago - -def_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) - -} - -with DAG('another-testdag', default_args=def_args, description='simple testing dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: - t1 = BashOperator(task_id='print_date', bash_command='date') - t2 = BashOperator(task_id='do_noting', bash_command='sleep 5') - - t1 >> t2 diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py deleted file mode 100644 index 948027509818883a93d4ef699fe6b1a3818439af..0000000000000000000000000000000000000000 --- a/dags/b2shareoperator.py +++ /dev/null @@ -1,116 +0,0 @@ -import json -import os -import tempfile -import urllib -from urllib.parse import urljoin - -import requests -from airflow.models.baseoperator import BaseOperator -from airflow.providers.http.hooks.http import HttpHook - - -def get_objects(server): - lst = requests.get(urljoin(server, 'api/records')).json() - return lst['hits']['hits'] - - -def get_file_list(obj): - file_url = obj['links']['files'] - fls = requests.get(file_url).json() - - return {it['key']: it['links']['self'] for it in fls['contents']} - - -def get_object_md(server, oid): - obj = requests.get(urljoin(server, f"api/records/{oid}")).json() - return obj - - -def download_file(url: str, target_dir: str): - fname = tempfile.mktemp(dir=target_dir) - urllib.request.urlretrieve(url=url, filename=fname) - return fname - -def get_record_template(): - return {"titles":[{"title":"DLS dataset record"}], - "creators":[{"creator_name": "eflows4HPC"}], - "descriptions": - [{"description": "Output of eflows4HPC DLS", "description_type": "Abstract"}], - "community" : "2d58eb08-af65-4cad-bd25-92f1a17d325b", - "community_specific" :{ - "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]} - }, - "open_access": True} - -def get_schema(url): - r = requests.get(url) - return r.json() - -def get_community(server, community_id): - response = requests.get( url=urljoin(server, f"api/communities/{community_id}"), - headers={'Content-Type':'application/json'}).json() - if 'status' in response: - return None - schema = get_schema(url=response['links']['schema']) - return response['id'], schema['json_schema']['allOf'][0]['required'] - - -def create_draft_record(server: str, token: str, record): - response = requests.post( url=urljoin(server, 'api/records/'), - headers={'Content-Type':'application/json'}, - data=json.dumps(record), params={'access_token': token}) - return response.json() - -# the simplest version, target should be chunked -def add_file(record, fname: str, token: str, remote:str): - jf = os.path.split(remote)[-1] - return requests.put(url=f"{record['links']['files']}/{jf}", - params={'access_token': token}, - headers={"Content-Type":"application/octet-stream"}, - data=open(fname, 'rb')) - -def submit_draft(record, token): - pub = [{"op": "add", "path":"/publication_state", "value": "submitted"}] - response = requests.patch(record['links']['self'], - headers={"Content-Type":"application/json-patch+json"}, - data=json.dumps(pub), params={'access_token': token}) - return response.json() - - -class B2ShareOperator(BaseOperator): - template_fields = ('target_dir',) - - def __init__( - self, - name: str, - conn_id: str = 'default_b2share', # 'https://b2share-testing.fz-juelich.de/', - target_dir: str = '/tmp/', - **kwargs) -> None: - super().__init__(**kwargs) - self.name = name - self.conn_id = conn_id - self.target_dir = target_dir - - def execute(self, **kwargs): - hook = HttpHook(http_conn_id=self.conn_id, method='GET') - params = kwargs['context']['params'] - oid = params['oid'] - - hrespo = hook.run(endpoint=f"/api/records/{oid}") - print(hrespo) - - flist = get_file_list(hrespo.json()) - - ti = kwargs['context']['ti'] - name_mappings = {} - for fname, url in flist.items(): - tmpname = download_file(url=url, target_dir=self.target_dir) - print(f"Processing: {fname} --> {url} --> {tmpname}") - - name_mappings[fname] = tmpname - ti.xcom_push(key='local', value=tmpname) - ti.xcom_push(key='remote', value=fname) - break # for now only one file - - ti.xcom_push(key='mappings', value=name_mappings) - return len(name_mappings) diff --git a/dags/conn_deco.py b/dags/conn_deco.py deleted file mode 100644 index 1aebe1cd8c267367a47658bc05075a45a91da047..0000000000000000000000000000000000000000 --- a/dags/conn_deco.py +++ /dev/null @@ -1,46 +0,0 @@ -from datetime import timedelta - -from airflow.decorators import dag, task -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago - -from decors import get_connection, remove, setup - -def_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) -} - - -@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def conn_decorator(): - - @task() - def doing_nothing(conn_id, **kwargs): - print(f"Using connection {conn_id}") - - ssh_hook = get_connection(conn_id=conn_id, **kwargs) - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - print("Connected") - lst = sftp_client.listdir(path='/tmp/') - for f in lst: - print(f) - - return conn_id - - conn_id = PythonOperator(python_callable=setup, task_id='setup_connection') - # another way of mixing taskflow and classical api: - a_id = conn_id.output['return_value'] - dno = doing_nothing(conn_id=a_id) - en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': dno}, task_id='cleanup') - - conn_id >> dno >> en - - -dag = conn_decorator() diff --git a/dags/datacat_connection_sync.py b/dags/datacat_connection_sync.py deleted file mode 100644 index 8a8f096dc29d94d333aef39b9c61432698f7fe44..0000000000000000000000000000000000000000 --- a/dags/datacat_connection_sync.py +++ /dev/null @@ -1,119 +0,0 @@ - -from typing import Dict -from airflow.decorators import dag, task -from airflow.models.connection import Connection -from airflow.utils.dates import days_ago -from airflow import settings -import logging -from sqlalchemy.orm.session import Session as SASession -from datacat_integration.secrets import DataCatConnectionWithSecrets - -from datacat_integration.hooks import DataCatalogHook - -default_args = { - 'owner': 'airflow', -} - -connections_type = "airflow_connections" -substring_start = len(connections_type) + 1 -substring_end = substring_start + 36 # length of a UUID4 - -log = logging.getLogger(__name__) - -def get_conn_name(datacat_type: str, oid: str): - return "{}/{}-connection".format(datacat_type, oid) - -def get_normal_or_secret_property(key: str, props: Dict[str,str], secrets: Dict[str, str], default_value = None): - return props.get(key, secrets.get(key, default_value)) - - -def get_connection(hook: DataCatalogHook, datacat_type: str, oid: str): - conn_id = get_conn_name(datacat_type, oid) - secrets_connection = DataCatConnectionWithSecrets(hook.connection.url, hook.connection.user, hook.connection._password) - datacat_entry: Dict[str,str] = secrets_connection.get_object(datacat_type, oid)['metadata'] - datacat_entry_secrets = secrets_connection.get_all_secret_key_value(datacat_type, oid) - extra={} - predefined_keys = ['conn_type', 'description', 'host', 'login', 'password', 'schema', 'port'] - # build extra from non-predefined keys - for key in datacat_entry: - if key not in predefined_keys: - extra[key] = datacat_entry[key] - - for key in datacat_entry_secrets: - if key not in predefined_keys: - extra[key] = datacat_entry_secrets[key] - - - return Connection( - conn_id=conn_id, - conn_type=get_normal_or_secret_property('conn_type', datacat_entry, datacat_entry_secrets), - description=get_normal_or_secret_property('description', datacat_entry, datacat_entry_secrets, 'Automatically generated Connection from the datacatalog object {}/{}'.format(connections_type, oid)), - host=get_normal_or_secret_property('host', datacat_entry, datacat_entry_secrets), - login=get_normal_or_secret_property('login', datacat_entry, datacat_entry_secrets), - password=get_normal_or_secret_property('password', datacat_entry, datacat_entry_secrets), - schema=get_normal_or_secret_property('schema', datacat_entry, datacat_entry_secrets), - port=int(get_normal_or_secret_property('port', datacat_entry, datacat_entry_secrets)), - extra=extra - ) - - -@dag(default_args=default_args, schedule_interval='@hourly', start_date=days_ago(1), tags=['dls-service-dag']) -def sync_connections(): - - @task - def list_catalog_connections(**kwargs): - hook = DataCatalogHook("datacatalog") - objects = hook.list_type(connections_type) - oid_list = [element[1] for element in objects] - return oid_list - - @task - def list_airflow_connections(**kwargs): - session : SASession = settings.Session() - conns = session.query(Connection).filter(Connection.conn_id.like("{}/%-connection".format(connections_type))) - oid_list = [conn.conn_id[substring_start:substring_end] for conn in conns] - return oid_list - - @task - def get_add_list(catalog_connections, airflow_connections, **kwargs): - return list(set(catalog_connections).difference(airflow_connections)) - - @task - def get_remove_list(catalog_connections, airflow_connections, **kwargs): - return list(set(airflow_connections).difference(catalog_connections)) - - @task - def remove_connections(oid_list, **kwargs): - log.info("Going to remove from conections: " + ','.join(oid_list)) - session : SASession = settings.Session() - for oid in oid_list: - session.query(Connection).filter(Connection.conn_id == get_conn_name(connections_type, oid)).delete() - session.commit() - - @task - def add_connections(oid_list, **kwargs): - log.info("Going to add to conections: " + ','.join(oid_list)) - hook = DataCatalogHook("datacatalog") - connections = [] - for oid in oid_list: - connections.append(get_connection(hook, connections_type, oid)) - - session = settings.Session() - # no check for existsnce necessary, since it is handled by get_add_list() - for conn in connections: - session.add(conn) - - session.commit() - - cat_conn = list_catalog_connections() - air_conn = list_airflow_connections() - - add_list = get_add_list(cat_conn, air_conn) - remove_list = get_remove_list(cat_conn, air_conn) - - add_connections(add_list) - - remove_connections(remove_list) - - -dag = sync_connections() diff --git a/dags/decors.py b/dags/decors.py deleted file mode 100644 index 034e6a30be3367fb56624d67542c3e3237c6848a..0000000000000000000000000000000000000000 --- a/dags/decors.py +++ /dev/null @@ -1,84 +0,0 @@ -from airflow import settings -from airflow.providers.ssh.hooks.ssh import SSHHook -from airflow.models.connection import Connection -from airflow.providers.hashicorp.hooks.vault import VaultHook - - -def create_temp_connection(rrid, params): - host = params.get('host') - port = params.get('port', 2222) - user = params.get('login', 'eflows') - key = params.get('key') - - conn_id = f"tmp_connection_{rrid}" - extra = {"private_key": key} - conn = Connection( - conn_id=conn_id, - conn_type='ssh', - description='Automatically generated Connection', - host=host, - login=user, - port=port, - extra=extra - ) - - session = settings.Session() - session.add(conn) - session.commit() - print(f"Connection {conn_id} created") - return conn_id - - -def get_connection(conn_id, **kwargs): - if conn_id.startswith('vault'): - vault_hook = VaultHook(vault_conn_id='my_vault') - con = vault_hook.get_secret( - secret_path=f"/ssh-credentials/{conn_id[6:]}") - print(f"Got some values from vault {list(con.keys())}") - - # for now SSH is hardcoded - params = kwargs['params'] - host = params.get('host') - port = int(params.get('port', 22)) - user = params.get('login', 'eflows') - hook = SSHHook(remote_host=host, port=port, username=user) - # key in vault should be in form of formated string: - # -----BEGIN OPENSSH PRIVATE KEY----- - # b3BlbnNzaC1rZXktdjEAAAAA - # .... - hook.pkey = hook._pkey_from_private_key(private_key=con['privateKey']) - return hook - - # otherwise use previously created temp connection - return SSHHook(ssh_conn_id=conn_id) - - -def setup(**kwargs): - params = kwargs['params'] - print("Setting up the connection") - - if 'vault_id' in params: - print('Retrieving connection details from vault') - return f"vault_{params['vault_id']}" - - # otherwise use creds provided in request - return create_temp_connection(rrid=kwargs['run_id'], params=params) - - -def remove(conn_id): - if conn_id.startswith('vault'): - return - - print(f"Removing conneciton {conn_id}") - session = settings.Session() - for con in session.query(Connection).all(): - print(con) - - session.query(Connection).filter(Connection.conn_id == conn_id).delete() - session.commit() - - -def get_conn_id(**kwargs): - ti = kwargs['ti'] - conn_id = ti.xcom_pull(key='return_value', task_ids='setup_connection') - return conn_id diff --git a/dags/docker_cmd.py b/dags/docker_cmd.py deleted file mode 100644 index d83b74235e361c14cb93e2132e8cd3883363343b..0000000000000000000000000000000000000000 --- a/dags/docker_cmd.py +++ /dev/null @@ -1,35 +0,0 @@ -import os - -WORKER_DATA_LOCATION = '/wf_pipeline_data/userdata' - -def get_dockercmd(params:dict, location): - """A task which runs in the docker worker and spins up a docker container with the an image and giver parameters. - - Args: - image(str): contianer image - stageout_args (list): a list of files which are results from the execution - job_args (str): a string of further arguments which might be needed for the task execution - entrypoint (str): specify or overwrite the docker entrypoint - command(str): you can specify or override the command to be executed - args_to_dockerrun(str): docker options - - """ - - image = params.get('image') # {"image": 'ghcr.io/helmholtz-analytics/heat:1.1.1-alpha'} - - job_args = params.get('job_args', '') - entrypoint = params.get('entrypoint', '') # {"entrypoint": "/bin/bash"} - command = params.get('command', '') # {"command": "python"} - args_to_dockerrun = params.get('args_to_docker', '') - - user_id = "id -u" - entrypoint_part = f"--entrypoint={entrypoint}" if entrypoint else '' - - working_dir = "/data" - - cmd_part = f"-c \"{command}\"" if command else '' - volumes = f"-v {location}:{working_dir} -w={working_dir}" - - cmd = f'userid=$({user_id}) ; docker run {args_to_dockerrun} --user=$userid --rm --name="test" {volumes} {entrypoint_part} {image} {cmd_part} {job_args} > {location}/stdout.txt' - - return cmd \ No newline at end of file diff --git a/dags/docker_in_worker.py b/dags/docker_in_worker.py deleted file mode 100644 index a7b998bf587c7e7d5db2def180491cef3d821b74..0000000000000000000000000000000000000000 --- a/dags/docker_in_worker.py +++ /dev/null @@ -1,318 +0,0 @@ -from airflow.decorators import dag, task -from airflow.providers.ssh.operators.ssh import SSHOperator -from airflow.utils.dates import days_ago -from airflow.models.connection import Connection -from airflow.models import Variable -from airflow.operators.python import get_current_context - -from datacat_integration.hooks import DataCatalogHook -from datacat_integration.connection import DataCatalogEntry - -from b2shareoperator import (download_file, get_file_list, get_object_md, - get_record_template, create_draft_record, add_file, submit_draft) -from decors import get_connection -import docker_cmd as doc -from docker_cmd import WORKER_DATA_LOCATION -import os -import uuid -import tempfile - -"""This piplines is a test case for starting a clusterting algorithm with HeAT, running in a Docker environment. -A test set of parameters with a HeAT example: -Data Catalog Integration example: {"oid": "e13bcab6-3664-4090-bebb-defdb58483e0", "image": "ghcr.io/helmholtz-analytics/heat:1.1.1-alpha", "entrypoint": "/bin/bash", "command": "python demo_knn.py iris.h5 calc_res.txt", "register":"True"} -Data Catalog Integration example: {"oid": "e13bcab6-3664-4090-bebb-defdb58483e0", "image":"hello-world", "register":"True"} -Params: - oid (str): oid of the data (e.g, from data catalog) - image (str): a docker contianer image - job_args (str): - Optional: a string of further arguments which might be needed for the task execution - entrypoint (str): - Optional: you can specify or overwrite the docker entrypoint - command (str): - Optional: you can specify or override the command to be executed - args_to_dockerrun (str): - Optional: docker run additional arguments - register (True, False): - Optional, default is False: register the resulsts in the data catalog -""" - -default_args = { - 'owner': 'airflow', -} - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example', 'docker', 'datacat']) -def docker_in_worker(): - DW_CONNECTION_ID = "docker_worker" - - - @task() - def stagein(**kwargs): - """ stage in task - This task gets the 'datacat_oid' or 'oid' from the DAG params to retreive a connection from it (b2share for now). - It then downloads all data from the b2share entry to the local disk, and returns a mapping of these files to the local download location, - which can be used by the following tasks. - """ - params = kwargs['params'] - datacat_hook = DataCatalogHook() - - if 'oid' not in params: # {"oid": "b143bf73efd24d149bba4c081964b459"} - if 'datacat_oid' not in params: - print("Missing object id in pipeline parameters") - return -1 # non zero exit code is a task failure - else: - params['oid'] = params['datacat_oid'] - oid_split = params['oid'].split("/") - type = 'dataset' - oid = 'placeholder_text' - if len(oid_split) is 2: - type = oid_split[0] - oid = oid_split[1] - elif len(oid_split) is 1: - oid = oid_split[0] - else: - print("Malformed oid passed as parameter.") - return -1 - - entry = DataCatalogEntry.from_json(datacat_hook.get_entry(type, oid)) - - print(f"using entry: {entry}") - b2share_server_uri = entry.url - # TODO general stage in based on type metadata - # using only b2share for now - b2share_oid = entry.metadata['b2share_oid'] - - obj = get_object_md(server=b2share_server_uri, oid=b2share_oid) - print(f"Retrieved object {oid}: {obj}") - flist = get_file_list(obj) - - name_mappings = {} - tmp_dir = Variable.get("working_dir", default_var='/tmp/') - print(f"Local working dir is: {tmp_dir}") - - for fname, url in flist.items(): - print(f"Processing: {fname} --> {url}") - tmpname = download_file(url=url, target_dir=tmp_dir) - name_mappings[fname] = tmpname - - return name_mappings - - @task() - def move_to_docker_host(files: dict, **kwargs): - """This task copies the data onto the remote docker worker, - which will enable the following tasks an access to the data - - Args: - files (dict): the files that will be stored on the docker worker - Returns: - target_dir: the location of the files on the docker worker - """ - print(f"Using {DW_CONNECTION_ID} connection") - ssh_hook = get_connection(conn_id=DW_CONNECTION_ID) - user_dir_name = str(uuid.uuid4()) - target_dir = os.path.join(WORKER_DATA_LOCATION, user_dir_name) - - with ssh_hook.get_conn() as ssh_client: - - sftp_client = ssh_client.open_sftp() - - sftp_client.mkdir(target_dir, mode=0o755) - for [truename, local] in files.items(): - print( - f"Copying {local} --> {DW_CONNECTION_ID}:{os.path.join(target_dir, truename)}") - sftp_client.put(local, os.path.join(target_dir, truename)) - # or separate cleanup task? - os.unlink(local) - - return target_dir - - @task - def run_container(data_location, **kwargs): - """A task which runs in the docker worker and spins up a docker container with the an image and giver parameters. - - Args: - image (str): a docker contianer image - job_args (str): - Optional: a string of further arguments which might be needed for the task execution - entrypoint (str): - Optional: you can specify or overwrite the docker entrypoint - command (str): - Optional: you can specify or override the command to be executed - args_to_dockerrun (str): - Optional: docker run additional arguments - """ - params = kwargs['params'] - - cmd = doc.get_dockercmd(params, data_location) - print(f"Executing docker command {cmd}") - - print(f"Using {DW_CONNECTION_ID} connection") - hook = get_connection(conn_id=DW_CONNECTION_ID) - - task_calculate = SSHOperator( - task_id="calculate", - ssh_hook=hook, - command=cmd - ) - - context = get_current_context() - task_calculate.execute(context) - - return data_location - - @task - def ls_results(output_dir): - if not output_dir: - return "No output to stage out. Nothing more to do." - hook = get_connection(conn_id=DW_CONNECTION_ID) - - cmd = f"ls -al {output_dir}" - process = SSHOperator( - task_id="print_results", - ssh_hook=hook, - command=cmd - ) - context = get_current_context() - process.execute(context) - - @task() - def retrieve_res(output_dir: str, input_files: dict, **kwargs): - """This task copies the data from the remote docker worker back to airflow workspace - - Args: - output_dir (str): the folder containing all the user files for the executed task, located on the docker worker - Returns: - local_fpath (list): the path of the files copied back to the airflow host - """ - working_dir = Variable.get("working_dir", default_var='/tmp/') - name_mappings = {} - print(f"Using {DW_CONNECTION_ID} connection") - ssh_hook = get_connection(conn_id=DW_CONNECTION_ID) - - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - - for fname in sftp_client.listdir(output_dir): - if fname not in input_files.keys(): - - tmpname = tempfile.mktemp(dir=working_dir) - local = os.path.join(working_dir, tmpname) - print(f"Copying {os.path.join(output_dir, fname)} to {local}") - sftp_client.get(os.path.join(output_dir, fname), local) - name_mappings[fname] = local - - return name_mappings - - @task() - def cleanup_doc_worker(res_fpaths_local, data_on_worker, **kwargs): - """This task deletes all the files from the docker worker - - Args: - res_fpaths_local: used only to define the order of tasks within the DAG, i.e. wait for previos task to complete before cleaning the worker space - data_on_worker (str): delete the folder with the user data from the docker worker - """ - - print(f"Using {DW_CONNECTION_ID} connection") - ssh_hook = get_connection(conn_id=DW_CONNECTION_ID) - - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - d = os.path.join(WORKER_DATA_LOCATION, data_on_worker) - - for f in sftp_client.listdir(d): - print(f"Deleting file {f}") - sftp_client.remove(os.path.join(d, f)) - print(f"Deleting directory {DW_CONNECTION_ID}:{d}") - sftp_client.rmdir(d) - - - @task - def stageout_results(output_mappings: dict): - """This task transfers the output files to b2share - - Args: - output_mappings (dict): {true_filename, local_path} a dictionary of the output files to be submitted to the remote storage, e.g., b2share - Returns: - a b2share record - """ - if not output_mappings: - print("No output to stage out. Nothing more to do.") - return -1 - connection = Connection.get_connection_from_secrets('default_b2share') - - server = "https://" + connection.host - token = '' - if 'access_token' in connection.extra_dejson.keys(): - token = connection.extra_dejson['access_token'] - print(f"Registering data to {server}") - template = get_record_template() - - r = create_draft_record(server=server, token=token, record=template) - print(f"record {r}") - if 'id' in r: - print(f"Draft record created {r['id']} --> {r['links']['self']}") - else: - print('Something went wrong with registration', r, r.text) - return -1 - - for [truename, local] in output_mappings.items(): - print(f"Uploading {truename}") - _ = add_file(record=r, fname=local, token=token, remote=truename) - # delete local - os.unlink(local) - - print("Submitting record for pubication") - submitted = submit_draft(record=r, token=token) - print(f"Record created {submitted}") - - return submitted['links']['publication'] - - - - @task() - def register(object_url, additional_metadata = {}, **kwargs): - """This task registers the b2share record into the data catalog - - Args: - object_url: from b2share - additional_metadata - """ - params = kwargs['params'] - reg = params.get('register', False) - if not reg: - print("Skipping registration as 'register' parameter is not set") - return 0 - - hook = DataCatalogHook() - print("Connected to datacat via hook") - - if not additional_metadata.get('author', False): - additional_metadata['author'] = "DLS on behalft of eFlows" - - if not additional_metadata.get('access', False): - additional_metadata['access'] = "hook-based" - - entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}", - url=object_url, - metadata=additional_metadata - ) - try: - r = hook.create_entry(datacat_type='dataset', entry=entry) - print("Hook registration returned: ", r) - return f"{hook.base_url}/dataset/{r}" - except ConnectionError as e: - print('Registration failed', e) - return -1 - - input_files = stagein() - data_location = move_to_docker_host(input_files) - data_on_worker = run_container(data_location) - ls_results(data_on_worker) - res_fpaths = retrieve_res(data_on_worker, input_files) - cleanup_doc_worker(res_fpaths, data_on_worker) - url_or_errcode = stageout_results(res_fpaths) - register(url_or_errcode) - - # files >> data_locations >> output_fnames >> ls_results(output_fnames) >> files >> stageout_results(files) >> cleanup() - -dag = docker_in_worker() - diff --git a/dags/firsto.py b/dags/firsto.py deleted file mode 100644 index 3b3e672f1216761dcf7527fbe7140c0a47c3c0b6..0000000000000000000000000000000000000000 --- a/dags/firsto.py +++ /dev/null @@ -1,35 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.providers.sftp.operators.sftp import SFTPOperator -from airflow.utils.dates import days_ago - -from b2shareoperator import B2ShareOperator - -def_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) - -} - -with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: - - get_b2obj = B2ShareOperator(task_id='task_b2sh', - dag=dag, - name='B2Share', - target_dir="{{ var.value.source_path}}") - - put_file = SFTPOperator( - task_id="upload_scp", - ssh_conn_id="default_ssh", - local_filepath="{{ti.xcom_pull(task_ids='task_b2sh', key='local')}}", - remote_filepath="{{ti.xcom_pull(task_ids='task_b2sh',key='remote')}}", - operation="put", - create_intermediate_dirs=True, - dag=dag) - - get_b2obj >> put_file diff --git a/dags/image_transfer.py b/dags/image_transfer.py deleted file mode 100644 index a517bb2bc29baa020898c1d6fe3942406aa3429f..0000000000000000000000000000000000000000 --- a/dags/image_transfer.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -import requests - -from airflow.decorators import dag, task -from airflow.utils.dates import days_ago -from airflow.operators.python import PythonOperator -from dags.uploadflow import copy_streams - -from decors import setup, get_connection, remove - -default_args = { - 'owner': 'airflow', -} - -def file_exist(sftp, name): - try: - r = sftp.stat(name) - return r.st_size - except: - return -1 - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def transfer_image(): - - - @task - def stream_upload(connection_id, **kwargs): - params = kwargs['params'] - target = params.get('target', '/tmp/') - image_id = params.get('image_id', 'wordcount_skylake.sif') - url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}" - - print(f"Putting {url} --> {target} connection") - ssh_hook = get_connection(conn_id=connection_id, **kwargs) - - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - remote_name = os.path.join(target, image_id) - size = file_exist(sftp=sftp_client, name=remote_name) - if size>0: - print(f"File {remote_name} exists and has {size} bytes") - force = params.get('force', True) - if force!= True: - return 0 - print("Forcing overwrite") - - ssh_client.exec_command(command=f"mkdir -p {target}") - - with requests.get(url, stream=True, verify=False) as r: - with sftp_client.open(remote_name, 'wb') as f: - f.set_pipelined(pipelined=True) - copy_streams(input=r, output=f) - - setup_task = PythonOperator( - python_callable=setup, task_id='setup_connection') - a_id = setup_task.output['return_value'] - cleanup_task = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') - - setup_task >> stream_upload(connection_id=a_id) >> cleanup_task - - -dag = transfer_image() diff --git a/dags/image_transfer_alt.py b/dags/image_transfer_alt.py deleted file mode 100644 index 49772e5ca50bd794fc4f5d2cc09e096646e935c4..0000000000000000000000000000000000000000 --- a/dags/image_transfer_alt.py +++ /dev/null @@ -1,67 +0,0 @@ -import os -import shutil -import requests - -from airflow.decorators import dag, task -from airflow.utils.dates import days_ago -from airflow.operators.python import PythonOperator -from airflow.models import Variable -from justreg import get_parameter -from decors import setup, get_connection, remove - -default_args = { - 'owner': 'airflow', -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def transfer_image_alt(): - - @task - def im_download(connection_id, **kwargs): - - work_dir = Variable.get("working_dir", default_var='/tmp/') - - image_id = get_parameter( - 'image_id', default='wordcount_skylake.sif', **kwargs) - url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}" - - print(f"Putting {url} --> {work_dir} connection") - with requests.get(url, stream=True, verify=False) as r: - with open(os.path.join(work_dir, image_id), 'wb') as f: - shutil.copyfileobj(r.raw, f) - - @task - def im_upload(connection_id, **kwargs): - if not get_parameter('upload', False, **kwargs): - print('Skipping upload') - return 0 - work_dir = Variable.get("working_dir", default_var='/tmp/') - target = get_parameter('target', default='/tmp/', **kwargs) - image_id = get_parameter( - 'image_id', default='wordcount_skylake.sif', **kwargs) - ssh_hook = get_connection(conn_id=connection_id, **kwargs) - print( - f"Copying local {os.path.join(work_dir, image_id)} -> {connection_id}:{target}") - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - ssh_client.exec_command(command=f"mkdir -p {target}") - with open(os.path.join(work_dir, image_id), 'rb') as r: - with sftp_client.open(os.path.join(target, image_id), 'wb') as f: - shutil.copyfileobj(r.raw, f) - - print('Removing local copy') - os.unlink(os.path.join(work_dir, image_id)) - - setup_task = PythonOperator( - python_callable=setup, task_id='setup_connection') - a_id = setup_task.output['return_value'] - - cleanup_task = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') - - setup_task >> im_download(connection_id=a_id) >> im_upload( - connection_id=a_id) >> cleanup_task - - -dag = transfer_image_alt() diff --git a/dags/justreg.py b/dags/justreg.py deleted file mode 100644 index 160829f3a989e4efd3c4ed7fc45d348d22381f6e..0000000000000000000000000000000000000000 --- a/dags/justreg.py +++ /dev/null @@ -1,95 +0,0 @@ - -from airflow.decorators import dag, task -from airflow.operators.bash import BashOperator -from airflow.utils.dates import days_ago -from datacat_integration.hooks import DataCatalogHook -from datacat_integration.connection import DataCatalogEntry -import json - -default_args = { - 'owner': 'airflow', -} - - -def get_record(name, url): - return { - "name": name, - "url": url, - "metadata": { - "author": "DLS on behalf of eFlows", - } - } - -def create_template(hrespo): - return { - "titles": [{"title": hrespo['title']}], - "creators": [{"creator_name": hrespo['creator_name']}], - "descriptions": [ - { - "description": hrespo['description'], - "description_type": "Abstract" - } - ], - "community": "2d58eb08-af65-4cad-bd25-92f1a17d325b", - "community_specific": { - "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]} - }, - "open_access": hrespo['open_access'] == "True" - } - -def get_parameter(parameter, default=False, **kwargs): - params = kwargs['params'] - return params.get(parameter, default) - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def datacat_registration_example(): - - @task() - def register(object_url, **kwargs): - reg = get_parameter(parameter='register', default=False, **kwargs) - if not reg: - print("Skipping registration as 'register' parameter is not set") - return 0 - - hook = DataCatalogHook() - print("Connected to datacat via hook", hook.list_type('dataset')) - - entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}", - url=object_url, - metadata= { - "author": "DLS on behalf of eFlows", - "access": "hook-based"} - ) - try: - r = hook.create_entry(datacat_type='dataset', entry=entry) - print("Hook registration returned: ", r) - return f"{hook.base_url}/dataset/{r}" - except ConnectionError as e: - print('Registration failed', e) - return -1 - - @task - def get_template(): - hook = DataCatalogHook() - print("Connected to datacat via hook", hook.list_type('dataset')) - - mid = '71e863ac-aee6-4680-a57c-de318530b71e' - entry = json.loads(hook.get_entry(datacat_type='storage_target', oid=mid)) - print(entry) - print(entry['metadata']) - print('---') - print(create_template(entry['metadata'])) - - - - - step1 = BashOperator(task_id='jj', bash_command='ls') #BashOperator(bash_command='curl -X GET -k https://bscgrid20.bsc.es/image_creation/images/download/wordcount_skylake.sif -o /work/ww', task_id='nothing') - step2 = register( - object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7') - - step3 = get_template() - stepr = BashOperator(bash_command='rm /work/ww', task_id='remov') - step1 >> step2 >> step3 >> stepr - - -dag = datacat_registration_example() diff --git a/dags/taskflow datacat_integration.py b/dags/taskflow datacat_integration.py deleted file mode 100644 index ca67402e4ef7aa242c70a42c43e22756f21d547c..0000000000000000000000000000000000000000 --- a/dags/taskflow datacat_integration.py +++ /dev/null @@ -1,77 +0,0 @@ - -from airflow import settings -from airflow.decorators import dag, task -from airflow.models.connection import Connection -from airflow.providers.ssh.hooks.ssh import SSHHook -from airflow.models import Variable -from airflow.utils.dates import days_ago -import os - -from b2shareoperator import (download_file, get_file_list, get_object_md, - get_objects) - -default_args = { - 'owner': 'airflow' -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example', 'datacat_integration']) -def taskflow_datacat_integration(): - - @task(multiple_outputs=True) - def extract(conn_id, **kwargs): - connection = Connection.get_connection_from_secrets(conn_id) - server = connection.get_uri() - print(f"Retreiving data from {server}") - - params = kwargs['params'] - if 'oid' not in params: # {"oid":"b38609df2b334ea296ea1857e568dbea"} - print("Missing object id in pipeline parameters") - lst = get_objects(server=server) - flist = {o['id']: [f['key'] for f in o['files']] for o in lst} - print(f"Objects on server: {flist}") - return -1 # non zero exit code is a task failure - - oid = params['oid'] - - obj = get_object_md(server=server, oid=oid) - print(f"Retrieved object {oid}: {obj}") - flist = get_file_list(obj) - return flist - - @task(multiple_outputs=True) - def transform(flist: dict): - name_mappings = {} - tmp_dir = Variable.get("working_dir", default_var='/tmp/') - print(f"Local working dir is: {tmp_dir}") - - for fname, url in flist.items(): - print(f"Processing: {fname} --> {url}") - tmpname = download_file(url=url, target_dir=tmp_dir) - name_mappings[fname] = tmpname - return name_mappings - - @task() - def load(connection_id, files: dict, **kwargs): - print(f"Total files downloaded: {len(files)}") - params = kwargs['params'] - target = params.get('target', '/tmp/') - - print(f"Using {connection_id} connection") - - ssh_hook = SSHHook(ssh_conn_id=connection_id) - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - for [truename, local] in files.items(): - print(f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}") - sftp_client.put(local, os.path.join(target, truename)) - # or separate cleanup task? - os.unlink(local) - - - conn_id = "e3710075-9f8f-4ae0-a1c3-7d92c0182d19" # created as copy of default_b2share - data = extract(conn_id) - files = transform(data) - load(connection_id = conn_id, files=files) - -dag = taskflow_datacat_integration() diff --git a/dags/taskflow.py b/dags/taskflow.py deleted file mode 100644 index 223d3dde9e7550121ce4bfd1e08722830667d257..0000000000000000000000000000000000000000 --- a/dags/taskflow.py +++ /dev/null @@ -1,105 +0,0 @@ - -import json -import os - -from airflow.decorators import dag, task -from airflow.models import Variable -from airflow.models.connection import Connection -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago -from datacat_integration.hooks import DataCatalogHook - -from b2shareoperator import download_file, get_file_list, get_object_md -from decors import get_connection, remove, setup - -default_args = { - 'owner': 'airflow', -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def taskflow_example(): - - @task(multiple_outputs=True) - def extract(conn_id, **kwargs): - params = kwargs['params'] - if 'oid' not in params: - print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id") - return -1 - oid = params['oid'] - - hook = DataCatalogHook() - try: - entry = json.loads(hook.get_entry('dataset', oid)) - if entry and 'b2share' in entry['url']: - print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}") - oid = entry['url'].split('/')[-1] - print(f"Extracted oid {oid}") - else: - print('No entry in data cat or not a b2share entry') - - except: - # falling back to b2share - print("No entry found. Probably a b2share object") - - - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - - obj = get_object_md(server=server, oid=oid) - print(f"Retrieved object {oid}: {obj}") - # check status? - flist = get_file_list(obj) - return flist - - @task(multiple_outputs=True) - def transform(flist: dict): - name_mappings = {} - tmp_dir = Variable.get("working_dir", default_var='/tmp/') - print(f"Local working dir is: {tmp_dir}") - - for fname, url in flist.items(): - print(f"Processing: {fname} --> {url}") - tmpname = download_file(url=url, target_dir=tmp_dir) - name_mappings[fname] = tmpname - return name_mappings - - @task() - def load(connection_id, files: dict, **kwargs): - print(f"Total files downloaded: {len(files)}") - params = kwargs['params'] - target = params.get('target', '/tmp/') - - print(f"Using {connection_id} connection") - ssh_hook = get_connection(conn_id=connection_id, **kwargs) - - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - # check dir? - ssh_client.exec_command(command=f"mkdir -p {target}") - for [truename, local] in files.items(): - print( - f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}") - sftp_client.put(local, os.path.join(target, truename)) - # or separate cleanup task? - os.unlink(local) - - return connection_id - - conn_id = PythonOperator(python_callable=setup, task_id='setup_connection') - # another way of mixing taskflow and classical api: - a_id = conn_id.output['return_value'] - - data = extract(conn_id=a_id) - files = transform(flist=data) - ucid = load(connection_id=a_id, files=files) - - #b_id = ucid.output['return_value'] - en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': ucid}, task_id='cleanup') - - conn_id >> data >> files >> ucid >> en - - -dag = taskflow_example() diff --git a/dags/taskflow_stream.py b/dags/taskflow_stream.py deleted file mode 100644 index 4b24fbe8d1ebd14ff54edd4aa5107a8070610358..0000000000000000000000000000000000000000 --- a/dags/taskflow_stream.py +++ /dev/null @@ -1,78 +0,0 @@ - -import os -import shutil -import requests - -from airflow.decorators import dag, task -from airflow.models.connection import Connection -from airflow.providers.ssh.hooks.ssh import SSHHook -from airflow.utils.dates import days_ago -from datacat_integration.hooks import DataCatalogHook - -import json - - -from b2shareoperator import (get_file_list, get_object_md) - -default_args = { - 'owner': 'airflow', -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def taskflow_stream(): - @task(multiple_outputs=True) - def get_flist(**kwargs): - params = kwargs['params'] - if 'oid' not in params: - print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id") - return -1 - oid = params['oid'] - - hook = DataCatalogHook() - try: - entry = json.loads(hook.get_entry('dataset', oid)) - if entry and 'b2share' in entry['url']: - print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}") - oid = entry['url'].split('/')[-1] - print(f"Extracted oid {oid}") - else: - print('No entry in data cat or not a b2share entry') - - except: - # falling back to b2share - print("No entry found. Probably a b2share object") - - - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - - - obj = get_object_md(server=server, oid=oid) - print(f"Retrieved object {oid}: {obj}") - flist = get_file_list(obj) - return flist - - @task(multiple_outputs=True) - def stream_upload(flist: dict, **kwargs): - params = kwargs['params'] - target = params.get('target', '/tmp/') - connection_id = params.get('connection', 'default_ssh') - ssh_hook = SSHHook(ssh_conn_id=connection_id) - mappings = dict() - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - - for fname, url in flist.items(): - print(f"Processing: {url} --> {fname}") - with requests.get(url, stream=True) as r: - with sftp_client.open(os.path.join(target, fname), 'wb') as f: - shutil.copyfileobj(r.raw, f) - mappings[url] = os.path.join(target, fname) - return mappings - - flist = get_flist() - stats = stream_upload(flist) - -dag = taskflow_stream() diff --git a/dags/test_secrets_backend.py b/dags/test_secrets_backend.py deleted file mode 100644 index 1cfe57412d002634d058e7bf8ec46d90cc33c3fd..0000000000000000000000000000000000000000 --- a/dags/test_secrets_backend.py +++ /dev/null @@ -1,25 +0,0 @@ - -from airflow.decorators import dag, task -from airflow.utils.dates import days_ago -from airflow.hooks.base import BaseHook - -default_args = { - 'owner': 'airflow', -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def test_secrets_backend(): - @task() - def get_print_and_return_conenction(**kwargs): - oid = '860355e9-975f-4253-9421-1815e20c879b' - params = kwargs['params'] - if 'oid' in params: - oid = params['oid'] - conn = BaseHook.get_connection(oid) - print(conn.get_extra()) - - get_print_and_return_conenction() - - -dag = test_secrets_backend() diff --git a/dags/testdag.py b/dags/testdag.py deleted file mode 100644 index f974f232cb4a474c6978f1e0cbe8e00fb7f1bad8..0000000000000000000000000000000000000000 --- a/dags/testdag.py +++ /dev/null @@ -1,21 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.utils.dates import days_ago - -def_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) - -} - -with DAG('testdag', default_args=def_args, description='simple testing dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: - t1 = BashOperator(task_id='print_date', bash_command='date') - t2 = BashOperator(task_id='do_noting', bash_command='sleep 5') - - t1 >> t2 diff --git a/dags/uploadflow.py b/dags/uploadflow.py deleted file mode 100644 index e2b04a2563ee9d5e909ae47788eabde6271b4f0c..0000000000000000000000000000000000000000 --- a/dags/uploadflow.py +++ /dev/null @@ -1,175 +0,0 @@ - -import os -import tempfile -import json - -from airflow.decorators import dag, task -from airflow.models import Variable -from airflow.models.connection import Connection -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago - -from b2shareoperator import (add_file, create_draft_record, get_community, - submit_draft) -from decors import get_connection, remove, setup -from justreg import get_parameter -from datacat_integration.hooks import DataCatalogHook -from datacat_integration.connection import DataCatalogEntry - -default_args = { - 'owner': 'airflow', -} - - -def create_template(hrespo): - return { - "titles": [{"title": hrespo['title']}], - "creators": [{"creator_name": hrespo['creator_name']}], - "descriptions": [ - { - "description": hrespo['description'], - "description_type": "Abstract" - } - ], - "community": "2d58eb08-af65-4cad-bd25-92f1a17d325b", - "community_specific": { - "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]} - }, - "open_access": hrespo['open_access'] == "True" - } - -def copy_streams(input, output, chunk_size = 1024 * 1000): - while True: - chunk=input.raw.read(chunk_size) - if not chunk: - break - content_to_write = memoryview(chunk) - output.write(content_to_write) - - -def ssh_download(sftp_client, remote, local): - #sftp_client.get(remote, local) - with sftp_client.open(remote, 'rb') as input: - with open(local, 'wb') as output: - input.set_pipelined(pipelined=True) - copy_streams(input=input, output=output) - - -def ssh2local_copy(ssh_hook, source: str, target: str): - with ssh_hook.get_conn() as ssh_client: - sftp_client = ssh_client.open_sftp() - lst = sftp_client.listdir(path=source) - - print(f"{len(lst)} objects in {source}") - mappings = dict() - for fname in lst: - local = tempfile.mktemp(prefix='dls', dir=target) - full_name = os.path.join(source, fname) - sts = sftp_client.stat(full_name) - if str(sts).startswith('d'): - print(f"{full_name} is a directory. Skipping") - continue - - print(f"Copying {full_name} --> {local}") - ssh_download(sftp_client=sftp_client, remote=full_name, local=local) - mappings[local] = fname - - return mappings - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def upload_example(): - - @task() - def load(connection_id, **kwargs): - params = kwargs['params'] - target = Variable.get("working_dir", default_var='/tmp/') - source = params.get('source', '/tmp/') - ssh_hook = get_connection(conn_id=connection_id, **kwargs) - - mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target) - return mappings - - @task() - def upload(files: dict, **kwargs): - connection = Connection.get_connection_from_secrets('default_b2share') - # hate such hacks: - server = "https://" + connection.host - token = connection.extra_dejson['access_token'] - - params = kwargs['params'] - mid = params['mid'] - - - hook = DataCatalogHook() - print("Connected to datacat via hook") - entry = json.loads(hook.get_entry(datacat_type='storage_target', oid=mid)) - print('Got following metadata', entry) - - template = create_template(hrespo=entry['metadata']) - community = get_community(server=server, community_id=template['community']) - if not community: - print("Not existing community") - return -1 - cid, required = community - missing = [r for r in required if r not in template] - if any(missing): - print(f"Community {cid} required field {missing} are missing. This could pose some problems") - - r = create_draft_record(server=server, token=token, record=template) - if 'id' in r: - print(f"Draft record created {r['id']} --> {r['links']['self']}") - else: - print('Something went wrong with registration', r, r.text) - return -1 - - for [local, true_name] in files.items(): - print(f"Uploading {local} --> {true_name}") - _ = add_file(record=r, fname=local, token=token, remote=true_name) - # delete local - os.unlink(local) - - print("Submitting record for pubication") - submitted = submit_draft(record=r, token=token) - print(f"Record created {submitted}") - - return submitted['links']['publication'] - - @task() - def register(object_url, **kwargs): - reg = get_parameter(parameter='register', default=False, **kwargs) - if not reg: - print("Skipping registration as 'register' parameter is not set") - return 0 - - hook = DataCatalogHook() - print("Connected to datacat via hook") - - entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}", - url=object_url, - metadata= { - "author": "DLS on behalf of eFlows", - "access": "hook-based"} - ) - try: - r = hook.create_entry(datacat_type='dataset', entry=entry) - print("Hook registration returned: ", r) - return f"{hook.base_url}/dataset/{r}" - except ConnectionError as e: - print('Registration failed', e) - return -1 - - setup_task = PythonOperator(python_callable=setup, task_id='setup_connection') - a_id = setup_task.output['return_value'] - - files = load(connection_id=a_id) - uid = upload(files) - - en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') - - reg = register(object_url=uid) - - setup_task >> files >> uid >> reg >> en - - -dag = upload_example() diff --git a/dags/webdav_example.py b/dags/webdav_example.py deleted file mode 100644 index 9829bfda9a33f7fbd2533763d3a934afe2310313..0000000000000000000000000000000000000000 --- a/dags/webdav_example.py +++ /dev/null @@ -1,74 +0,0 @@ -import os - -from airflow.decorators import dag, task -from airflow.models import Variable -from airflow.models.connection import Connection -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago -from webdav3.client import Client - -from uploadflow import ssh2local_copy -from decors import get_connection, remove, setup - -default_args = { - 'owner': 'airflow', -} - - -@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def webdav_upload(): - - @task() - def download(connection_id, **kwargs): - - params = kwargs['params'] - target = Variable.get("working_dir", default_var='/tmp/') - source = params.get('source', '/tmp/') - ssh_hook = get_connection(conn_id=connection_id, **kwargs) - - mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target) - - return mappings - - @task() - def load(mappings, **kwargs): - params = kwargs['params'] - target = params.get('target', '/airflow-test') - connection = Connection.get_connection_from_secrets('b2drop_webdav') - options = {'webdav_hostname': f"https://{connection.host}{connection.schema}", - 'webdav_login': connection.login, - 'webdav_password': connection.get_password() - } - print(f"Translated http to webdav: {options}") - client = Client(options) - res = client.mkdir(target) - print(f"Creating {target}: {'ok' if res else 'failed'}") - - print(f"Starting upload -> {target}") - for [local, true_name] in mappings.items(): - full_name = full_name = os.path.join(target, true_name) - print(f"Processing {local} --> {full_name}") - client.upload_sync(remote_path=full_name, local_path=local) - - # delete local - os.unlink(local) - - return True - - @task - def print_stats(res): - print('Finished') - - setup_task = PythonOperator( - python_callable=setup, task_id='setup_connection') - a_id = setup_task.output['return_value'] - - mappings = download(connection_id=a_id) - res = load(mappings=mappings) - - en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') - res >> en - - -dag = webdav_upload() diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py deleted file mode 100644 index 3a28ea31f5ddf465217aac9244fbc9492bcc23a0..0000000000000000000000000000000000000000 --- a/tests/test_b2shareoperator.py +++ /dev/null @@ -1,112 +0,0 @@ -import unittest -from unittest.mock import Mock, patch -import tempfile -import os - -from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunType -from airflow import DAG -from airflow.models.taskinstance import TaskInstance -from airflow.utils.dates import days_ago -from airflow.utils.state import State - -from dags.b2shareoperator import (B2ShareOperator, download_file, - get_file_list, get_object_md, get_objects, - get_record_template, create_draft_record, add_file, submit_draft) - -DEFAULT_DATE = '2019-10-03' -TEST_DAG_ID = 'test_my_custom_operator' -TEST_TASK_ID = 'test' - - -class B2ShareOperatorTest(unittest.TestCase): - def setUp(self): - with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag: - B2ShareOperator( - task_id=TEST_TASK_ID, - name='test_name' - ) - self.dag = dag - #self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) - - @patch('dags.b2shareoperator.HttpHook') - @patch('dags.b2shareoperator.get_file_list') - @patch('dags.b2shareoperator.download_file') - def test_alt_execute_no_trigger(self, down, gfl, ht): - gfl.return_value = {'ooo.txt': 'htt://file/to/download'} - down.return_value = 'tmp_name' - - dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL) - ti = dagrun.get_task_instance(task_id=TEST_TASK_ID) - ti.task = self.dag.get_task(task_id=TEST_TASK_ID) - ti.run(ignore_ti_state=True) - print(ti.state) - - self.assertEqual(State.SUCCESS, ti.state) - - # return value - ret = ti.xcom_pull() - self.assertEqual(ret, 1, f"{ret}") - - lcl = ti.xcom_pull(key='local') - rmt = ti.xcom_pull(key='remote') - mps = ti.xcom_pull(key='mappings') - self.assertEqual(len(mps), 1, f"{mps}") - self.assertDictEqual( - mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}") - self.assertEqual(lcl, 'tmp_name', f"unexpecting local name: {lcl}") - self.assertEqual(rmt, 'ooo.txt', f"unexpected remote name: {rmt}") - - def test_get_files(self): - with patch('dags.b2shareoperator.requests.get') as get: - m = Mock() - m.json.return_value = {'contents': [ - {'key': 'veryimportant.txt', 'links': {'self': 'http://foo.bar'}}]} - get.return_value = m - ret = get_file_list(obj={'links': {'files': ['bla']}}) - self.assertEqual(len(ret), 1) - - def test_download_file(self): - with patch('dags.b2shareoperator.urllib.request.urlretrieve') as rr: - with patch('dags.b2shareoperator.tempfile.mktemp') as mt: - mt.return_value = '/tmp/val' - fname = download_file( - url='http://foo.bar', target_dir='/no/tmp/') - self.assertEqual(fname, '/tmp/val') - - def test_get_md(self): - with patch('dags.b2shareoperator.requests.get') as get: - m = Mock() - rval = {'links': {'files': ['a', 'b']}} - m.json.return_value = rval - get.return_value = m - r = get_object_md(server='foo', oid='bar') - self.assertDictEqual(rval, r) - - def test_get_objects(self): - with patch('dags.b2shareoperator.requests.get') as get: - m = Mock() - rval = {'hits': {'hits': ['a', 'b']}} - m.json.return_value = rval - get.return_value = m - r = get_objects(server='foo') - self.assertListEqual(['a', 'b'], r) - - def test_upload(self): - template = get_record_template() - server='https://b2share-testing.fz-juelich.de/' - token = '' - with patch('dags.b2shareoperator.requests.post') as post: - r = create_draft_record(server=server, token=token, record=template) - - r = dict() - r['links']={'files':server, 'self': server} - with patch('dags.b2shareoperator.requests.post') as put: - a = tempfile.NamedTemporaryFile() - a.write(b"some content") - up = add_file(record=r, fname=a.name, token=token, remote='/tmp/somefile.txt') - - - with patch('dags.b2shareoperator.requests.patch') as p: - submitted = submit_draft(record=r, token=token) - diff --git a/tests/test_dag.py b/tests/test_dag.py deleted file mode 100644 index 360a6987757d05da99b8448ca6303b215a18bb6b..0000000000000000000000000000000000000000 --- a/tests/test_dag.py +++ /dev/null @@ -1,22 +0,0 @@ -import unittest - -from airflow.models import DagBag - - -class TestADag(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.dagbag = DagBag() - - def test_dag_loaded(self): - dag = self.dagbag.get_dag(dag_id='firsto') - print(self.dagbag.import_errors) - self.assertDictEqual(self.dagbag.import_errors, {}, "not equal") - assert dag is not None - self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}") - - def test_tf_loaded(self): - dag = self.dagbag.get_dag(dag_id='taskflow_example') - assert self.dagbag.import_errors == {} - assert dag is not None - self.assertEqual(len(dag.tasks), 5, f"Actually: {len(dag.tasks)}") diff --git a/tests/test_ssh.py b/tests/test_ssh.py deleted file mode 100644 index ac2ad3664e9b128c8ee8d4bf963c909b189b28a3..0000000000000000000000000000000000000000 --- a/tests/test_ssh.py +++ /dev/null @@ -1,74 +0,0 @@ -import tempfile -import unittest -from unittest.mock import MagicMock, patch -import os - -from dags.uploadflow import ssh2local_copy, copy_streams - -class TestSSH(unittest.TestCase): - - @patch('dags.uploadflow.tempfile.mktemp') - def test_copy_files(self, tmp): - tmp.side_effect = ['tmpA', 'tmpB'] - - my_hook = MagicMock() - a = MagicMock() - a.return_value = ['a', 'c'] - stat = MagicMock(side_effect=['elo', 'elo']) - cpy = MagicMock(return_value=False) - my_hook.get_conn().__enter__().open_sftp().listdir = a - my_hook.get_conn().__enter__().open_sftp().stat = stat - my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy - - mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg') - my_hook.get_conn.assert_any_call() - a.assert_called_once_with(path='srcZ') - cpy.assert_called() - - print(mapps) - self.assertEqual(len(mapps), 2) - - - @patch('dags.uploadflow.tempfile.mktemp') - def test_skipdir_files(self, tmp): - tmp.side_effect = ['tmpA', 'tmpB'] - - my_hook = MagicMock() - a = MagicMock() - a.return_value = ['a', 'c'] - stat = MagicMock(side_effect=['elo', 'd elo']) - cpy = MagicMock(return_value=False) - my_hook.get_conn().__enter__().open_sftp().listdir = a - my_hook.get_conn().__enter__().open_sftp().stat = stat - my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy - - mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg') - my_hook.get_conn.assert_any_call() - a.assert_called_once_with(path='srcZ') - cpy.assert_called() - - print(mapps) - - self.assertEqual(len(mapps), 1) - - - def test_copy_streams(self): - """ - def copy_streams(input, output): - """ - with tempfile.TemporaryDirectory() as dir: - text = 'Some input text' - input_name = os.path.join(dir,'input.txt') - output_name = os.path.join(dir, 'output') - with open(input_name, 'w') as fln: - fln.write(text) - - with open(input_name, 'rb') as input: - with open(output_name, 'wb') as output: - copy_streams(input=input, output=output) - - with open(output_name, 'r') as f: - txt = f.read() - print("Read following: ", txt) - - self.assertEqual(text, txt)