diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 1460b12b8b2d9e5a102f4b2808d193186bd40900..687feb7a73ae122b50faafde8f4de28e6e457ce1 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -34,34 +34,12 @@ variables:
   - chmod 700 ~/.ssh
 
 stages:
-  - test
   - build
   - publish
   - deploy
   - test-deployment
   - cleanup
 
-test:
-  stage: test
-  image:
-    name: $CI_REGISTRY_IMAGE/eflows-airflow:latest
-    entrypoint: [""]
-  before_script:
-    - echo "DEBUG:"
-    - pip --version
-    - airflow db init
-    - pip install -r requirements.txt
-    - pip install nose==1.3.7
-    - airflow connections add --conn-uri https://b2share-testing.fz-juelich.de/ default_b2share
-  script:
-    - ls
-    - pwd
-    - cp dags/* /opt/airflow/dags/
-    - airflow dags list
-    - airflow connections list
-    - airflow dags test testdag 2021-08-18
-    - nosetests
-
 build-custom-image:
   stage: build
@@ -143,8 +121,9 @@ full-deploy-production:
     # do the mount /dev/vdb1 stuff
     - openstack server add volume $INSTANCE_ID $VOLUME_ID
     - sleep 20 # apparently it may take some time until the volume is available to the OS
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo mkdir -p /persistent_data && sudo mount /dev/vdb1 /persistent_data"
+    - while [ "`ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP ls `" ]; do sleep 5; done
     - until ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP ls /finished_cloudinit >/dev/null 2>&1; do sleep 30; done # wait until cloudinit script is complete
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo mkdir -p /persistent_data && sudo mount /dev/vdb1 /persistent_data"
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo service docker restart" # to use the configured docker data path
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"
     - echo "Done"
@@ -159,7 +138,7 @@ light-deploy-production:
   <<: *ssh_setup
   environment: Production
   script:
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "cd /home/airflow/data-logistics-service && git stash && git stash clear && git checkout main && git checkout -f $CI_COMMIT_TAG && git pull --all"
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "cd /home/airflow/data-logistics-service && git stash && git stash clear && git checkout main && git checkout -f $CI_COMMIT_TAG && git pull --all && rm -rf dags && git clone https://github.com/eflows4hpc/dls-dags.git dags"
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"
 
 test-production-webserver:
diff --git a/README.md b/README.md
index 4a2904989e84e8aa2c8d61c9a93c379459c5cf33..afb609b3938f6120a00746fe25544b573016f1f1 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,5 @@
+[](https://zenodo.org/badge/latestdoi/447551112)
+
 # Data Logistics Service
 
 eFlows4HPC Data Logistics Service
diff --git a/dags/.empty b/dags/.empty
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/dags/GAdemo.py b/dags/GAdemo.py
deleted file mode 100644
index 9cdfda5f95b4154ade482d9cf56ab5bcf7ba2f56..0000000000000000000000000000000000000000
--- a/dags/GAdemo.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from datetime import timedelta
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
-from airflow.sensors.filesystem import FileSensor
-from airflow.operators.python import PythonOperator
-from airflow.operators.dummy import DummyOperator
-
-def_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-
-}
-
-def train_model():
-    print('Will start model training')
-
-with DAG('GAtest', default_args=def_args, description='testing GA', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
-    s1 = FileSensor(task_id='file_sensor', filepath='/work/afile.txt')
-    t1 = BashOperator(task_id='move_data', bash_command='date')
-    t2 = PythonOperator(task_id='train_model', python_callable=train_model)
-    t3 = BashOperator(task_id='eval_model', bash_command='echo "evaluating"')
-    t4 = DummyOperator(task_id='upload_model_to_repo')
-    t5 = DummyOperator(task_id='publish_results')
-
-    s1 >> t1 >> t2 >> t4
-    t2 >> t3 >> t5
diff --git a/dags/another-testdag.py b/dags/another-testdag.py
deleted file mode 100644
index c00489fc207af25223e505e7d643c22d95b5acaa..0000000000000000000000000000000000000000
--- a/dags/another-testdag.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from datetime import timedelta
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
-
-def_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-
-}
-
-with DAG('another-testdag', default_args=def_args, description='simple testing dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
-    t1 = BashOperator(task_id='print_date', bash_command='date')
-    t2 = BashOperator(task_id='do_noting', bash_command='sleep 5')
-
-    t1 >> t2
diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py
deleted file mode 100644
index 948027509818883a93d4ef699fe6b1a3818439af..0000000000000000000000000000000000000000
--- a/dags/b2shareoperator.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import json
-import os
-import tempfile
-import urllib
-from urllib.parse import urljoin
-
-import requests
-from airflow.models.baseoperator import BaseOperator
-from airflow.providers.http.hooks.http import HttpHook
-
-
-def get_objects(server):
-    lst = requests.get(urljoin(server, 'api/records')).json()
-    return lst['hits']['hits']
-
-
-def get_file_list(obj):
-    file_url = obj['links']['files']
-    fls = requests.get(file_url).json()
-
-    return {it['key']: it['links']['self'] for it in fls['contents']}
-
-
-def get_object_md(server, oid):
-    obj = requests.get(urljoin(server, f"api/records/{oid}")).json()
-    return obj
-
-
-def download_file(url: str, target_dir: str):
-    fname = tempfile.mktemp(dir=target_dir)
-    urllib.request.urlretrieve(url=url, filename=fname)
-    return fname
-
-def get_record_template():
-    return {"titles":[{"title":"DLS dataset record"}],
-            "creators":[{"creator_name": "eflows4HPC"}],
-            "descriptions":
-            [{"description": "Output of eflows4HPC DLS", "description_type": "Abstract"}],
-            "community" : "2d58eb08-af65-4cad-bd25-92f1a17d325b",
-            "community_specific" :{
-                "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]}
-            },
-            "open_access": True}
-
-def get_schema(url):
-    r = requests.get(url)
-    return r.json()
-
-def get_community(server, community_id):
-    response = requests.get( url=urljoin(server, f"api/communities/{community_id}"),
-                             headers={'Content-Type':'application/json'}).json()
-    if 'status' in response:
-        return None
-    schema = get_schema(url=response['links']['schema'])
-    return response['id'], schema['json_schema']['allOf'][0]['required']
-
-
-def create_draft_record(server: str, token: str, record):
-    response = requests.post( url=urljoin(server, 'api/records/'),
-                              headers={'Content-Type':'application/json'},
-                              data=json.dumps(record), params={'access_token': token})
-    return response.json()
-
-# the simplest version, target should be chunked
-def add_file(record, fname: str, token: str, remote:str):
-    jf = os.path.split(remote)[-1]
-    return requests.put(url=f"{record['links']['files']}/{jf}",
-                        params={'access_token': token},
-                        headers={"Content-Type":"application/octet-stream"},
-                        data=open(fname, 'rb'))
-
-def submit_draft(record, token):
-    pub = [{"op": "add", "path":"/publication_state", "value": "submitted"}]
-    response = requests.patch(record['links']['self'],
-                              headers={"Content-Type":"application/json-patch+json"},
-                              data=json.dumps(pub), params={'access_token': token})
-    return response.json()
-
-
-class B2ShareOperator(BaseOperator):
-    template_fields = ('target_dir',)
-
-    def __init__(
-            self,
-            name: str,
-            conn_id: str = 'default_b2share',  # 'https://b2share-testing.fz-juelich.de/',
-            target_dir: str = '/tmp/',
-            **kwargs) -> None:
-        super().__init__(**kwargs)
-        self.name = name
-        self.conn_id = conn_id
-        self.target_dir = target_dir
-
-    def execute(self, **kwargs):
-        hook = HttpHook(http_conn_id=self.conn_id, method='GET')
-        params = kwargs['context']['params']
-        oid = params['oid']
-
-        hrespo = hook.run(endpoint=f"/api/records/{oid}")
-        print(hrespo)
-
-        flist = get_file_list(hrespo.json())
-
-        ti = kwargs['context']['ti']
-        name_mappings = {}
-        for fname, url in flist.items():
-            tmpname = download_file(url=url, target_dir=self.target_dir)
-            print(f"Processing: {fname} --> {url} --> {tmpname}")
-
-            name_mappings[fname] = tmpname
-            ti.xcom_push(key='local', value=tmpname)
-            ti.xcom_push(key='remote', value=fname)
-            break  # for now only one file
-
-        ti.xcom_push(key='mappings', value=name_mappings)
-        return len(name_mappings)
diff --git a/dags/conn_deco.py b/dags/conn_deco.py
deleted file mode 100644
index 1aebe1cd8c267367a47658bc05075a45a91da047..0000000000000000000000000000000000000000
--- a/dags/conn_deco.py
+++ /dev/null
@@ -1,46 +0,0 @@
-from datetime import timedelta
-
-from airflow.decorators import dag, task
-from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
-
-from decors import get_connection, remove, setup
-
-def_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-}
-
-
-@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def conn_decorator():
-
-    @task()
-    def doing_nothing(conn_id, **kwargs):
-        print(f"Using connection {conn_id}")
-
-        ssh_hook = get_connection(conn_id=conn_id, **kwargs)
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            print("Connected")
-            lst = sftp_client.listdir(path='/tmp/')
-            for f in lst:
-                print(f)
-
-        return conn_id
-
-    conn_id = PythonOperator(python_callable=setup, task_id='setup_connection')
-    # another way of mixing taskflow and classical api:
-    a_id = conn_id.output['return_value']
-    dno = doing_nothing(conn_id=a_id)
-    en = PythonOperator(python_callable=remove, op_kwargs={
-                        'conn_id': dno}, task_id='cleanup')
-
-    conn_id >> dno >> en
-
-
-dag = conn_decorator()
diff --git a/dags/datacat_connection_sync.py b/dags/datacat_connection_sync.py
deleted file mode 100644
index 8a8f096dc29d94d333aef39b9c61432698f7fe44..0000000000000000000000000000000000000000
--- a/dags/datacat_connection_sync.py
+++ /dev/null
@@ -1,119 +0,0 @@
-
-from typing import Dict
-from airflow.decorators import dag, task
-from airflow.models.connection import Connection
-from airflow.utils.dates import days_ago
-from airflow import settings
-import logging
-from sqlalchemy.orm.session import Session as SASession
-from datacat_integration.secrets import DataCatConnectionWithSecrets
-
-from datacat_integration.hooks import DataCatalogHook
-
-default_args = {
-    'owner': 'airflow',
-}
-
-connections_type = "airflow_connections"
-substring_start = len(connections_type) + 1
-substring_end = substring_start + 36 # length of a UUID4
-
-log = logging.getLogger(__name__)
-
-def get_conn_name(datacat_type: str, oid: str):
-    return "{}/{}-connection".format(datacat_type, oid)
-
-def get_normal_or_secret_property(key: str, props: Dict[str,str], secrets: Dict[str, str], default_value = None):
-    return props.get(key, secrets.get(key, default_value))
-
-
-def get_connection(hook: DataCatalogHook, datacat_type: str, oid: str):
-    conn_id = get_conn_name(datacat_type, oid)
-    secrets_connection = DataCatConnectionWithSecrets(hook.connection.url, hook.connection.user, hook.connection._password)
-    datacat_entry: Dict[str,str] = secrets_connection.get_object(datacat_type, oid)['metadata']
-    datacat_entry_secrets = secrets_connection.get_all_secret_key_value(datacat_type, oid)
-    extra={}
-    predefined_keys = ['conn_type', 'description', 'host', 'login', 'password', 'schema', 'port']
-    # build extra from non-predefined keys
-    for key in datacat_entry:
-        if key not in predefined_keys:
-            extra[key] = datacat_entry[key]
-
-    for key in datacat_entry_secrets:
-        if key not in predefined_keys:
-            extra[key] = datacat_entry_secrets[key]
-
-
-    return Connection(
-        conn_id=conn_id,
-        conn_type=get_normal_or_secret_property('conn_type', datacat_entry, datacat_entry_secrets),
-        description=get_normal_or_secret_property('description', datacat_entry, datacat_entry_secrets, 'Automatically generated Connection from the datacatalog object {}/{}'.format(connections_type, oid)),
-        host=get_normal_or_secret_property('host', datacat_entry, datacat_entry_secrets),
-        login=get_normal_or_secret_property('login', datacat_entry, datacat_entry_secrets),
-        password=get_normal_or_secret_property('password', datacat_entry, datacat_entry_secrets),
-        schema=get_normal_or_secret_property('schema', datacat_entry, datacat_entry_secrets),
-        port=int(get_normal_or_secret_property('port', datacat_entry, datacat_entry_secrets)),
-        extra=extra
-    )
-
-
-@dag(default_args=default_args, schedule_interval='@hourly', start_date=days_ago(1), tags=['dls-service-dag'])
-def sync_connections():
-
-    @task
-    def list_catalog_connections(**kwargs):
-        hook = DataCatalogHook("datacatalog")
-        objects = hook.list_type(connections_type)
-        oid_list = [element[1] for element in objects]
-        return oid_list
-
-    @task
-    def list_airflow_connections(**kwargs):
-        session : SASession = settings.Session()
-        conns = session.query(Connection).filter(Connection.conn_id.like("{}/%-connection".format(connections_type)))
-        oid_list = [conn.conn_id[substring_start:substring_end] for conn in conns]
-        return oid_list
-
-    @task
-    def get_add_list(catalog_connections, airflow_connections, **kwargs):
-        return list(set(catalog_connections).difference(airflow_connections))
-
-    @task
-    def get_remove_list(catalog_connections, airflow_connections, **kwargs):
-        return list(set(airflow_connections).difference(catalog_connections))
-
-    @task
-    def remove_connections(oid_list, **kwargs):
-        log.info("Going to remove from conections: " + ','.join(oid_list))
-        session : SASession = settings.Session()
-        for oid in oid_list:
-            session.query(Connection).filter(Connection.conn_id == get_conn_name(connections_type, oid)).delete()
-        session.commit()
-
-    @task
-    def add_connections(oid_list, **kwargs):
-        log.info("Going to add to conections: " + ','.join(oid_list))
-        hook = DataCatalogHook("datacatalog")
-        connections = []
-        for oid in oid_list:
-            connections.append(get_connection(hook, connections_type, oid))
-
-        session = settings.Session()
-        # no check for existsnce necessary, since it is handled by get_add_list()
-        for conn in connections:
-            session.add(conn)
-
-        session.commit()
-
-    cat_conn = list_catalog_connections()
-    air_conn = list_airflow_connections()
-
-    add_list = get_add_list(cat_conn, air_conn)
-    remove_list = get_remove_list(cat_conn, air_conn)
-
-    add_connections(add_list)
-
-    remove_connections(remove_list)
-
-
-dag = sync_connections()
diff --git a/dags/decors.py b/dags/decors.py
deleted file mode 100644
index 034e6a30be3367fb56624d67542c3e3237c6848a..0000000000000000000000000000000000000000
--- a/dags/decors.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from airflow import settings
-from airflow.providers.ssh.hooks.ssh import SSHHook
-from airflow.models.connection import Connection
-from airflow.providers.hashicorp.hooks.vault import VaultHook
-
-
-def create_temp_connection(rrid, params):
-    host = params.get('host')
-    port = params.get('port', 2222)
-    user = params.get('login', 'eflows')
-    key = params.get('key')
-
-    conn_id = f"tmp_connection_{rrid}"
-    extra = {"private_key": key}
-    conn = Connection(
-        conn_id=conn_id,
-        conn_type='ssh',
-        description='Automatically generated Connection',
-        host=host,
-        login=user,
-        port=port,
-        extra=extra
-    )
-
-    session = settings.Session()
-    session.add(conn)
-    session.commit()
-    print(f"Connection {conn_id} created")
-    return conn_id
-
-
-def get_connection(conn_id, **kwargs):
-    if conn_id.startswith('vault'):
-        vault_hook = VaultHook(vault_conn_id='my_vault')
-        con = vault_hook.get_secret(
-            secret_path=f"/ssh-credentials/{conn_id[6:]}")
-        print(f"Got some values from vault {list(con.keys())}")
-
-        # for now SSH is hardcoded
-        params = kwargs['params']
-        host = params.get('host')
-        port = int(params.get('port', 22))
-        user = params.get('login', 'eflows')
-        hook = SSHHook(remote_host=host, port=port, username=user)
-        # key in vault should be in form of formated string:
-        # -----BEGIN OPENSSH PRIVATE KEY-----
-        # b3BlbnNzaC1rZXktdjEAAAAA
-        # ....
-        hook.pkey = hook._pkey_from_private_key(private_key=con['privateKey'])
-        return hook
-
-    # otherwise use previously created temp connection
-    return SSHHook(ssh_conn_id=conn_id)
-
-
-def setup(**kwargs):
-    params = kwargs['params']
-    print("Setting up the connection")
-
-    if 'vault_id' in params:
-        print('Retrieving connection details from vault')
-        return f"vault_{params['vault_id']}"
-
-    # otherwise use creds provided in request
-    return create_temp_connection(rrid=kwargs['run_id'], params=params)
-
-
-def remove(conn_id):
-    if conn_id.startswith('vault'):
-        return
-
-    print(f"Removing conneciton {conn_id}")
-    session = settings.Session()
-    for con in session.query(Connection).all():
-        print(con)
-
-    session.query(Connection).filter(Connection.conn_id == conn_id).delete()
-    session.commit()
-
-
-def get_conn_id(**kwargs):
-    ti = kwargs['ti']
-    conn_id = ti.xcom_pull(key='return_value', task_ids='setup_connection')
-    return conn_id
diff --git a/dags/docker_cmd.py b/dags/docker_cmd.py
deleted file mode 100644
index a00e215b8813f119245e87822c7be81da774b7ad..0000000000000000000000000000000000000000
--- a/dags/docker_cmd.py
+++ /dev/null
@@ -1,36 +0,0 @@
-def get_dockercmd(params:dict, location):
-    """A task which runs in the docker worker and spins up a docker container with the an image and giver parameters.
-
-    Args:
-        image(str): contianer image
-        stagein_args (list): a list of files necesarry for the executeion
-        stageout_args (list): a list of files which are results from the execution
-        string_args (str): a string of further arguments which might be needed for the task execution
-        entrypoint (str): specify or overwrite the docker entrypoint
-        command(str): you can specify or override the command to be executed
-        args_to_dockerrun(str): docker options
-
-    """
-
-    image = params.get('image') # {"image": 'ghcr.io/helmholtz-analytics/heat:1.1.1-alpha'}
-    stagein_args = params.get('stagein_args', []) # {"stagein_args": ["demo_knn.py", "iris.h5"]}
-    stageout_args = params.get('stageout_args', []) # {"stageout_args": ["result.out"]}
-    string_args = params.get('string_args', '')
-    entrypoint = params.get('entrypoint', '') # {"entrypoint": "/bin/bash"}
-    command = params.get('command', '') # {"command": "python"}
-    args_to_dockerrun = params.get('args_to_docker', '')
-
-    user_id = "id -u"
-    entrypoint_part = f"--entrypoint={entrypoint}" if entrypoint else ''
-
-    working_dir = "/data"
-    file_args = stagein_args + stageout_args
-    args = " "
-    args = args.join(file_args)
-    args = args + string_args
-    cmd_part = f"-c \"{command} {args}\"" if command else args
-    volumes = f"-v {location}:{working_dir} -w={working_dir}" if file_args else ''
-
-    cmd = f'userid=$({user_id}) ; docker run {args_to_dockerrun} --user=$userid --rm --name="test" {volumes} {entrypoint_part} {image} {cmd_part}'
-
-    return cmd
\ No newline at end of file
diff --git a/dags/docker_in_ssh_op.py b/dags/docker_in_ssh_op.py
deleted file mode 100644
index 7b7871dd6357ae8d3128b3e7bb24706c40c8aa5a..0000000000000000000000000000000000000000
--- a/dags/docker_in_ssh_op.py
+++ /dev/null
@@ -1,163 +0,0 @@
-from airflow.decorators import dag, task
-from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.utils.dates import days_ago
-from airflow.models.connection import Connection
-from airflow.models import Variable
-from airflow.operators.python import get_current_context
-from b2shareoperator import (download_file, get_file_list, get_object_md,
-                             get_objects)
-from decors import get_connection, remove, setup
-import docker_cmd as doc
-import os
-
-"""This piplines is a test case for starting a clusterting algorithm with HeAT, running in a Docker environment.
-A test set of parameters with a HeAT example:
-{"oid": "b143bf73efd24d149bba4c081964b459", "image": "ghcr.io/helmholtz-analytics/heat:1.1.1-alpha", "stagein_args": ["demo_knn.py", "iris.h5"], "stageout_args": ["result.out"], "entrypoint": "/bin/bash", "command": "python"}
-Params:
-    oid (str): oid of the data
-    image (str): a docker contianer image
-    stagein_args (list): a list of stage in files necesarry for the executeion
-    stageout_args (list): a list of stage out files which are results from the execution
-    string_args (str): a string of further arguments which might be needed for the task execution
-    entrypoint (str): you can specify or overwrite the docker entrypoint
-    command (str): you can specify or override the command to be executed
-    args_to_dockerrun (str): docker run additional options
-"""
-
-default_args = {
-    'owner': 'airflow',
-}
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example', 'docker'])
-def docker_with_ssh():
-    DW_CONNECTION_ID = "docker_worker"
-    DATA_LOCATION = '/wf_pipeline_data/userdata'
-
-    @task(multiple_outputs=True)
-    def extract(**kwargs):
-        """
-        #### Extract task
-        A simple Extract task to get data ready for the rest of the data
-        pipeline. In this case, getting data is simulated by reading from a
-        b2share connection.
-        :param oid: ID of the file to be extracted
-        """
-        connection = Connection.get_connection_from_secrets('default_b2share')
-        server = connection.get_uri()
-        print(f"Rereiving data from {server}")
-
-        params = kwargs['params']
-        if 'oid' not in params:  # {"oid": "b143bf73efd24d149bba4c081964b459"}
-            print("Missing object id in pipeline parameters")
-            lst = get_objects(server=server)
-            flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
-            print(f"Objects on server: {flist}")
-            return -1  # non zero exit code is a task failure
-
-        oid = params['oid']
-
-        obj = get_object_md(server=server, oid=oid)
-        print(f"Retrieved object {oid}: {obj}")
-        flist = get_file_list(obj)
-
-        return flist
-
-    @task(multiple_outputs=True)
-    def transform(flist: dict):
-        """
-        #### Transform task
-        A Transform task which takes in the collection of data, retrieved from the connection, downloads the files
-        and returns a map of the filename with the corresponding filepath.
-        """
-        name_mappings = {}
-        tmp_dir = Variable.get("working_dir", default_var='/tmp/')
-        print(f"Local working dir is: {tmp_dir}")
-
-        for fname, url in flist.items():
-            print(f"Processing: {fname} --> {url}")
-            tmpname = download_file(url=url, target_dir=tmp_dir)
-            name_mappings[fname] = tmpname
-
-        return name_mappings
-
-    @task()
-    def load(files: dict, **kwargs):
-        """This task copies the data to a location,
-        which will enable the following tasks an access to the data
-
-        Args:
-            files (dict): the files that will be stored on another system
-        Returns:
-            list: the locations of the newly loaded files
-        """
-        print(f"Using {DW_CONNECTION_ID} connection")
-        ssh_hook = get_connection(conn_id=DW_CONNECTION_ID)
-
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            for [truename, local] in files.items():
-                print(
-                    f"Copying {local} --> {DW_CONNECTION_ID}:{os.path.join(DATA_LOCATION, truename)}")
-                sftp_client.put(local, os.path.join(DATA_LOCATION, truename))
-                # or separate cleanup task?
-                os.unlink(local)
-
-        # loaded_files = []
-        # for [truename, local_path] in files.items():
-
-        #     destination = shutil.copy(local_path, os.path.join(DATA_LOCATION, truename))
-        #     print(f"Copying {local_path} --> copying to: {destination};")
-        #     loaded_files.append(destination)
-        #     os.unlink(local_path)
-
-        # return loaded_files
-
-    @task
-    def run_container(**kwargs):
-
-        params = kwargs['params']
-        stageout_args = params.get('stageout_args', [])
-
-        cmd = doc.get_dockercmd(params, DATA_LOCATION)
-        print(f"Executing docker command {cmd}")
-
-        print(f"Using {DW_CONNECTION_ID} connection")
-        hook = get_connection(conn_id=DW_CONNECTION_ID)
-
-        task_calculate = SSHOperator(
-            task_id="calculate",
-            ssh_hook=hook,
-            command=cmd
-        )
-
-        context = get_current_context()
-        task_calculate.execute(context)
-
-        return stageout_args
-
-    @task
-    def postprocess_results(output_files: list):
-        if not output_files:
-            return "No output to stage out. Nothing more to do."
-        hook = get_connection(conn_id=DW_CONNECTION_ID)
-        sp = " "
-        cmd = f"cd {DATA_LOCATION}; cat {sp.join(output_files)}"
-        process = SSHOperator(
-            task_id="print_results",
-            ssh_hook=hook,
-            command=cmd
-        )
-        context = get_current_context()
-        process.execute(context)
-
-    #TODO a cleanup job
-
-    data = extract()
-    files = transform(data)
-    data_locations = load(files)
-    output_files = run_container()
-
-    data >> files >> data_locations >> output_files >> postprocess_results(output_files)
-
-dag = docker_with_ssh()
-
diff --git a/dags/firsto.py b/dags/firsto.py
deleted file mode 100644
index 3b3e672f1216761dcf7527fbe7140c0a47c3c0b6..0000000000000000000000000000000000000000
--- a/dags/firsto.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from datetime import timedelta
-
-from airflow import DAG
-from airflow.providers.sftp.operators.sftp import SFTPOperator
-from airflow.utils.dates import days_ago
-
-from b2shareoperator import B2ShareOperator
-
-def_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-
-}
-
-with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
-
-    get_b2obj = B2ShareOperator(task_id='task_b2sh',
-                                dag=dag,
-                                name='B2Share',
-                                target_dir="{{ var.value.source_path}}")
-
-    put_file = SFTPOperator(
-        task_id="upload_scp",
-        ssh_conn_id="default_ssh",
-        local_filepath="{{ti.xcom_pull(task_ids='task_b2sh', key='local')}}",
-        remote_filepath="{{ti.xcom_pull(task_ids='task_b2sh',key='remote')}}",
-        operation="put",
-        create_intermediate_dirs=True,
-        dag=dag)
-
-    get_b2obj >> put_file
diff --git a/dags/image_transfer.py b/dags/image_transfer.py
deleted file mode 100644
index ba2da0642cbc4d2a517af7152e06943316ca7079..0000000000000000000000000000000000000000
--- a/dags/image_transfer.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import os
-import requests
-
-from airflow.decorators import dag, task
-from airflow.utils.dates import days_ago
-from airflow.operators.python import PythonOperator
-
-from decors import setup, get_connection, remove
-
-default_args = {
-    'owner': 'airflow',
-}
-
-def file_exist(sftp, name):
-    try:
-        r = sftp.stat(name)
-        return r.st_size
-    except:
-        return -1
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def transfer_image():
-
-    @task
-    def stream_upload(connection_id, **kwargs):
-        params = kwargs['params']
-        target = params.get('target', '/tmp/')
-        image_id = params.get('image_id', 'wordcount_skylake.sif')
-        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
-
-        print(f"Putting {url} --> {target} connection")
-        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
-
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            remote_name = os.path.join(target, image_id)
-            size = file_exist(sftp=sftp_client, name=remote_name)
-            if size>0:
-                print(f"File {remote_name} exists and has {size} bytes")
-                force = params.get('force', True)
-                if force!= True:
-                    return 0
-                print("Forcing overwrite")
-
-            ssh_client.exec_command(command=f"mkdir -p {target}")
-
-            with requests.get(url, stream=True, verify=False) as r:
-                with sftp_client.open(remote_name, 'wb') as f:
-                    f.set_pipelined(pipelined=True)
-                    while True:
-                        chunk=r.raw.read(1024 * 1000)
-                        if not chunk:
-                            break
-                        content_to_write = memoryview(chunk)
-                        f.write(content_to_write)
-
-    setup_task = PythonOperator(
-        python_callable=setup, task_id='setup_connection')
-    a_id = setup_task.output['return_value']
-    cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
-                                  'conn_id': a_id}, task_id='cleanup')
-
-    setup_task >> stream_upload(connection_id=a_id) >> cleanup_task
-
-
-dag = transfer_image()
diff --git a/dags/image_transfer_alt.py b/dags/image_transfer_alt.py
deleted file mode 100644
index 49772e5ca50bd794fc4f5d2cc09e096646e935c4..0000000000000000000000000000000000000000
--- a/dags/image_transfer_alt.py
+++ /dev/null
@@ -1,67 +0,0 @@
-import os
-import shutil
-import requests
-
-from airflow.decorators import dag, task
-from airflow.utils.dates import days_ago
-from airflow.operators.python import PythonOperator
-from airflow.models import Variable
-from justreg import get_parameter
-from decors import setup, get_connection, remove
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def transfer_image_alt():
-
-    @task
-    def im_download(connection_id, **kwargs):
-
-        work_dir = Variable.get("working_dir", default_var='/tmp/')
-
-        image_id = get_parameter(
-            'image_id', default='wordcount_skylake.sif', **kwargs)
-        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
-
-        print(f"Putting {url} --> {work_dir} connection")
-        with requests.get(url, stream=True, verify=False) as r:
-            with open(os.path.join(work_dir, image_id), 'wb') as f:
-                shutil.copyfileobj(r.raw, f)
-
-    @task
-    def im_upload(connection_id, **kwargs):
-        if not get_parameter('upload', False, **kwargs):
-            print('Skipping upload')
-            return 0
-        work_dir = Variable.get("working_dir", default_var='/tmp/')
-        target = get_parameter('target', default='/tmp/', **kwargs)
-        image_id = get_parameter(
-            'image_id', default='wordcount_skylake.sif', **kwargs)
-        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
-        print(
-            f"Copying local {os.path.join(work_dir, image_id)} -> {connection_id}:{target}")
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            ssh_client.exec_command(command=f"mkdir -p {target}")
-            with open(os.path.join(work_dir, image_id), 'rb') as r:
-                with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
-                    shutil.copyfileobj(r.raw, f)
-
-        print('Removing local copy')
-        os.unlink(os.path.join(work_dir, image_id))
-
-    setup_task = PythonOperator(
-        python_callable=setup, task_id='setup_connection')
-    a_id = setup_task.output['return_value']
-
-    cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
-                                  'conn_id': a_id}, task_id='cleanup')
-
-    setup_task >> im_download(connection_id=a_id) >> im_upload(
-        connection_id=a_id) >> cleanup_task
-
-
-dag = transfer_image_alt()
diff --git a/dags/justreg.py b/dags/justreg.py
deleted file mode 100644
index 160829f3a989e4efd3c4ed7fc45d348d22381f6e..0000000000000000000000000000000000000000
--- a/dags/justreg.py
+++ /dev/null
@@ -1,95 +0,0 @@
-
-from airflow.decorators import dag, task
-from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
-from datacat_integration.hooks import DataCatalogHook
-from datacat_integration.connection import DataCatalogEntry
-import json
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-def get_record(name, url):
-    return {
-        "name": name,
-        "url": url,
-        "metadata": {
-            "author": "DLS on behalf of eFlows",
-        }
-    }
-
-def create_template(hrespo):
-    return {
-        "titles": [{"title": hrespo['title']}],
-        "creators": [{"creator_name": hrespo['creator_name']}],
-        "descriptions": [
-            {
-                "description": hrespo['description'],
-                "description_type": "Abstract"
-            }
-        ],
-        "community": "2d58eb08-af65-4cad-bd25-92f1a17d325b",
-        "community_specific": {
-            "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]}
-        },
-        "open_access": hrespo['open_access'] == "True"
-    }
-
-def get_parameter(parameter, default=False, **kwargs):
-    params = kwargs['params']
-    return params.get(parameter, default)
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def datacat_registration_example():
-
-    @task()
-    def register(object_url, **kwargs):
-        reg = get_parameter(parameter='register', default=False, **kwargs)
-        if not reg:
-            print("Skipping registration as 'register' parameter is not set")
-            return 0
-
-        hook = DataCatalogHook()
-        print("Connected to datacat via hook", hook.list_type('dataset'))
-
-        entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}",
-                                 url=object_url,
-                                 metadata= {
-                                     "author": "DLS on behalf of eFlows",
-                                     "access": "hook-based"}
-                                 )
-        try:
-            r = hook.create_entry(datacat_type='dataset', entry=entry)
-            print("Hook registration returned: ", r)
-            return f"{hook.base_url}/dataset/{r}"
-        except ConnectionError as e:
-            print('Registration failed', e)
-            return -1
-
-    @task
-    def get_template():
-        hook = DataCatalogHook()
-        print("Connected to datacat via hook", hook.list_type('dataset'))
-
-        mid = '71e863ac-aee6-4680-a57c-de318530b71e'
-        entry = json.loads(hook.get_entry(datacat_type='storage_target', oid=mid))
-        print(entry)
-        print(entry['metadata'])
-        print('---')
-        print(create_template(entry['metadata']))
-
-
-
-
-    step1 = BashOperator(task_id='jj', bash_command='ls') #BashOperator(bash_command='curl -X GET -k https://bscgrid20.bsc.es/image_creation/images/download/wordcount_skylake.sif -o /work/ww', task_id='nothing')
-    step2 = register(
-        object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7')
-
-    step3 = get_template()
-    stepr = BashOperator(bash_command='rm /work/ww', task_id='remov')
-    step1 >> step2 >> step3 >> stepr
-
-
-dag = datacat_registration_example()
diff --git a/dags/taskflow datacat_integration.py b/dags/taskflow datacat_integration.py
deleted file mode 100644
index ca67402e4ef7aa242c70a42c43e22756f21d547c..0000000000000000000000000000000000000000
--- a/dags/taskflow datacat_integration.py
+++ /dev/null
@@ -1,77 +0,0 @@
-
-from airflow import settings
-from airflow.decorators import dag, task
-from airflow.models.connection import Connection
-from airflow.providers.ssh.hooks.ssh import SSHHook
-from airflow.models import Variable
-from airflow.utils.dates import days_ago
-import os
-
-from b2shareoperator import (download_file, get_file_list, get_object_md,
-                             get_objects)
-
-default_args = {
-    'owner': 'airflow'
-}
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example', 'datacat_integration'])
-def taskflow_datacat_integration():
-
-    @task(multiple_outputs=True)
-    def extract(conn_id, **kwargs):
-        connection = Connection.get_connection_from_secrets(conn_id)
-        server = connection.get_uri()
-        print(f"Retreiving data from {server}")
-
-        params = kwargs['params']
-        if 'oid' not in params:  # {"oid":"b38609df2b334ea296ea1857e568dbea"}
-            print("Missing object id in pipeline parameters")
-            lst = get_objects(server=server)
-            flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
-            print(f"Objects on server: {flist}")
-            return -1  # non zero exit code is a task failure
-
-        oid = params['oid']
-
-        obj = get_object_md(server=server, oid=oid)
-        print(f"Retrieved object {oid}: {obj}")
-        flist = get_file_list(obj)
-        return flist
-
-    @task(multiple_outputs=True)
-    def transform(flist: dict):
-        name_mappings = {}
-        tmp_dir = Variable.get("working_dir", default_var='/tmp/')
-        print(f"Local working dir is: {tmp_dir}")
-
-        for fname, url in flist.items():
-            print(f"Processing: {fname} --> {url}")
-            tmpname = download_file(url=url, target_dir=tmp_dir)
-            name_mappings[fname] = tmpname
-        return name_mappings
-
-    @task()
-    def load(connection_id, files: dict, **kwargs):
-        print(f"Total files downloaded: {len(files)}")
-        params = kwargs['params']
-        target = params.get('target', '/tmp/')
-
-        print(f"Using {connection_id} connection")
-
-        ssh_hook = SSHHook(ssh_conn_id=connection_id)
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            for [truename, local] in files.items():
-                print(f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}")
-                sftp_client.put(local, os.path.join(target, truename))
-                # or separate cleanup task?
-                os.unlink(local)
-
-
-    conn_id = "e3710075-9f8f-4ae0-a1c3-7d92c0182d19"  # created as copy of default_b2share
-    data = extract(conn_id)
-    files = transform(data)
-    load(connection_id = conn_id, files=files)
-
-dag = taskflow_datacat_integration()
diff --git a/dags/taskflow.py b/dags/taskflow.py
deleted file mode 100644
index 223d3dde9e7550121ce4bfd1e08722830667d257..0000000000000000000000000000000000000000
--- a/dags/taskflow.py
+++ /dev/null
@@ -1,105 +0,0 @@
-
-import json
-import os
-
-from airflow.decorators import dag, task
-from airflow.models import Variable
-from airflow.models.connection import Connection
-from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
-from datacat_integration.hooks import DataCatalogHook
-
-from b2shareoperator import download_file, get_file_list, get_object_md
-from decors import get_connection, remove, setup
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def taskflow_example():
-
-    @task(multiple_outputs=True)
-    def extract(conn_id, **kwargs):
-        params = kwargs['params']
-        if 'oid' not in params:
-            print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id")
-            return -1
-        oid = params['oid']
-
-        hook = DataCatalogHook()
-        try:
-            entry = json.loads(hook.get_entry('dataset', oid))
-            if entry and 'b2share' in entry['url']:
-                print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}")
-                oid = entry['url'].split('/')[-1]
-                print(f"Extracted oid {oid}")
-            else:
-                print('No entry in data cat or not a b2share entry')
-
-        except:
-            # falling back to b2share
-            print("No entry found. Probably a b2share object")
-
-
-        connection = Connection.get_connection_from_secrets('default_b2share')
-        server = connection.get_uri()
-        print(f"Rereiving data from {server}")
-
-        obj = get_object_md(server=server, oid=oid)
-        print(f"Retrieved object {oid}: {obj}")
-        # check status?
-        flist = get_file_list(obj)
-        return flist
-
-    @task(multiple_outputs=True)
-    def transform(flist: dict):
-        name_mappings = {}
-        tmp_dir = Variable.get("working_dir", default_var='/tmp/')
-        print(f"Local working dir is: {tmp_dir}")
-
-        for fname, url in flist.items():
-            print(f"Processing: {fname} --> {url}")
-            tmpname = download_file(url=url, target_dir=tmp_dir)
-            name_mappings[fname] = tmpname
-        return name_mappings
-
-    @task()
-    def load(connection_id, files: dict, **kwargs):
-        print(f"Total files downloaded: {len(files)}")
-        params = kwargs['params']
-        target = params.get('target', '/tmp/')
-
-        print(f"Using {connection_id} connection")
-        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
-
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            # check dir?
-            ssh_client.exec_command(command=f"mkdir -p {target}")
-            for [truename, local] in files.items():
-                print(
-                    f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}")
-                sftp_client.put(local, os.path.join(target, truename))
-                # or separate cleanup task?
-                os.unlink(local)
-
-        return connection_id
-
-    conn_id = PythonOperator(python_callable=setup, task_id='setup_connection')
-    # another way of mixing taskflow and classical api:
-    a_id = conn_id.output['return_value']
-
-    data = extract(conn_id=a_id)
-    files = transform(flist=data)
-    ucid = load(connection_id=a_id, files=files)
-
-    #b_id = ucid.output['return_value']
-    en = PythonOperator(python_callable=remove, op_kwargs={
-                        'conn_id': ucid}, task_id='cleanup')
-
-    conn_id >> data >> files >> ucid >> en
-
-
-dag = taskflow_example()
diff --git a/dags/taskflow_stream.py b/dags/taskflow_stream.py
deleted file mode 100644
index 4b24fbe8d1ebd14ff54edd4aa5107a8070610358..0000000000000000000000000000000000000000
--- a/dags/taskflow_stream.py
+++ /dev/null
@@ -1,78 +0,0 @@
-
-import os
-import shutil
-import requests
-
-from airflow.decorators import dag, task
-from airflow.models.connection import Connection
-from airflow.providers.ssh.hooks.ssh import SSHHook
-from airflow.utils.dates import days_ago
-from datacat_integration.hooks import DataCatalogHook
-
-import json
-
-
-from b2shareoperator import (get_file_list, get_object_md)
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def taskflow_stream():
-    @task(multiple_outputs=True)
-    def get_flist(**kwargs):
-        params = kwargs['params']
-        if 'oid' not in params:
-            print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id")
-            return -1
-        oid = params['oid']
-
-        hook = DataCatalogHook()
-        try:
-            entry = json.loads(hook.get_entry('dataset', oid))
-            if entry and 'b2share' in entry['url']:
-                print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}")
-                oid = entry['url'].split('/')[-1]
-                print(f"Extracted oid {oid}")
-            else:
-                print('No entry in data cat or not a b2share entry')
-
-        except:
-            # falling back to b2share
-            print("No entry found. Probably a b2share object")
-
-
-        connection = Connection.get_connection_from_secrets('default_b2share')
-        server = connection.get_uri()
-        print(f"Rereiving data from {server}")
-
-
-        obj = get_object_md(server=server, oid=oid)
-        print(f"Retrieved object {oid}: {obj}")
-        flist = get_file_list(obj)
-        return flist
-
-    @task(multiple_outputs=True)
-    def stream_upload(flist: dict, **kwargs):
-        params = kwargs['params']
-        target = params.get('target', '/tmp/')
-        connection_id = params.get('connection', 'default_ssh')
-        ssh_hook = SSHHook(ssh_conn_id=connection_id)
-        mappings = dict()
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-
-            for fname, url in flist.items():
-                print(f"Processing: {url} --> {fname}")
-                with requests.get(url, stream=True) as r:
-                    with sftp_client.open(os.path.join(target, fname), 'wb') as f:
-                        shutil.copyfileobj(r.raw, f)
-                mappings[url] = os.path.join(target, fname)
-        return mappings
-
-    flist = get_flist()
-    stats = stream_upload(flist)
-
-dag = taskflow_stream()
diff --git a/dags/test_secrets_backend.py b/dags/test_secrets_backend.py
deleted file mode 100644
index 1cfe57412d002634d058e7bf8ec46d90cc33c3fd..0000000000000000000000000000000000000000
--- a/dags/test_secrets_backend.py
+++ /dev/null
@@ -1,25 +0,0 @@
-
-from airflow.decorators import dag, task
-from airflow.utils.dates import days_ago
-from airflow.hooks.base import BaseHook
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def test_secrets_backend():
-    @task()
-    def get_print_and_return_conenction(**kwargs):
-        oid = '860355e9-975f-4253-9421-1815e20c879b'
-        params = kwargs['params']
-        if 'oid' in params:
-            oid = params['oid']
-        conn = BaseHook.get_connection(oid)
-        print(conn.get_extra())
-
-    get_print_and_return_conenction()
-
-
-dag = test_secrets_backend()
diff --git a/dags/testdag.py b/dags/testdag.py
deleted file mode 100644
index f974f232cb4a474c6978f1e0cbe8e00fb7f1bad8..0000000000000000000000000000000000000000
--- a/dags/testdag.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from datetime import timedelta
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
-
-def_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-
-}
-
-with DAG('testdag', default_args=def_args, description='simple testing dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
-    t1 = BashOperator(task_id='print_date', bash_command='date')
-    t2 = BashOperator(task_id='do_noting', bash_command='sleep 5')
-
-    t1 >> t2
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
deleted file mode 100644
index 72c4d7e0d1f0f1737c27bd94d8543c4cf94c5c5c..0000000000000000000000000000000000000000
--- a/dags/uploadflow.py
+++ /dev/null
@@ -1,153 +0,0 @@
-
-import os
-import tempfile
-import json
-
-from airflow.decorators import dag, task
-from airflow.models import Variable
-from airflow.models.connection import Connection
-from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
-
-from b2shareoperator import (add_file, create_draft_record, get_community,
-                             submit_draft)
-from decors import get_connection, remove, setup
-from justreg import get_parameter
-from datacat_integration.hooks import DataCatalogHook
-from datacat_integration.connection import DataCatalogEntry
-
-default_args = {
-    'owner': 'airflow',
-}
-
-
-def create_template(hrespo):
-    return {
-        "titles": [{"title": hrespo['title']}],
-        "creators": [{"creator_name": hrespo['creator_name']}],
-        "descriptions": [
-            {
-                "description": hrespo['description'],
-                "description_type": "Abstract"
-            }
-        ],
-        "community": "2d58eb08-af65-4cad-bd25-92f1a17d325b",
-        "community_specific": {
-            "90942261-4637-4ac0-97b8-12e1edb38739": {"helmholtz centre": ["Forschungszentrum Jülich"]}
-        },
-        "open_access": hrespo['open_access'] == "True"
-    }
-
-
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def upload_example():
-
-    @task()
-    def load(connection_id, **kwargs):
-        params = kwargs['params']
-        target = Variable.get("working_dir", default_var='/tmp/')
-        source = params.get('source', '/tmp/')
-
-        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            lst = sftp_client.listdir(path=source)
-            mappings = dict()
-            for fname in lst:
-                local = tempfile.mktemp(prefix='dls', dir=target)
-                full_name = os.path.join(source, fname)
-                sts = sftp_client.stat(full_name)
-                if str(sts).startswith('d'):
-                    print(f"{full_name} is a directory. Skipping")
-                    continue
-
-                print(f"Copying {connection_id}:{full_name} --> {local}")
-                sftp_client.get(os.path.join(source, fname), local)
-                mappings[local] = fname
-
-        return mappings
-
-    @task()
-    def upload(files: dict, **kwargs):
-        connection = Connection.get_connection_from_secrets('default_b2share')
-        # hate such hacks:
-        server = "https://" + connection.host
-        token = connection.extra_dejson['access_token']
-
-        params = kwargs['params']
-        mid = params['mid']
-
-
-        hook = DataCatalogHook()
-        print("Connected to datacat via hook")
-        entry = json.loads(hook.get_entry(datacat_type='storage_target', oid=mid))
-        print('Got following metadata', entry)
-
-        template = create_template(hrespo=entry['metadata'])
-        community = get_community(server=server, community_id=template['community'])
-        if not community:
-            print("Not existing community")
-            return -1
-        cid, required = community
-        missing = [r for r in required if r not in template]
-        if any(missing):
-            print(f"Community {cid} required field {missing} are missing. This could pose some problems")
-
-        r = create_draft_record(server=server, token=token, record=template)
-        if 'id' in r:
-            print(f"Draft record created {r['id']} --> {r['links']['self']}")
-        else:
-            print('Something went wrong with registration', r, r.text)
-            return -1
-
-        for [local, true_name] in files.items():
-            print(f"Uploading {local} --> {true_name}")
-            _ = add_file(record=r, fname=local, token=token, remote=true_name)
-            # delete local
-            os.unlink(local)
-
-        print("Submitting record for pubication")
-        submitted = submit_draft(record=r, token=token)
-        print(f"Record created {submitted}")
-
-        return submitted['links']['publication']
-
-    @task()
-    def register(object_url, **kwargs):
-        reg = get_parameter(parameter='register', default=False, **kwargs)
-        if not reg:
-            print("Skipping registration as 'register' parameter is not set")
-            return 0
-
-        hook = DataCatalogHook()
-        print("Connected to datacat via hook")
-
-        entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}",
-                                 url=object_url,
-                                 metadata= {
-                                     "author": "DLS on behalf of eFlows",
-                                     "access": "hook-based"}
-                                 )
-        try:
-            r = hook.create_entry(datacat_type='dataset', entry=entry)
-            print("Hook registration returned: ", r)
-            return f"{hook.base_url}/dataset/{r}"
-        except ConnectionError as e:
-            print('Registration failed', e)
-            return -1
-
-    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
-    a_id = setup_task.output['return_value']
-
-    files = load(connection_id=a_id)
-    uid = upload(files)
-
-    en = PythonOperator(python_callable=remove, op_kwargs={
-                        'conn_id': a_id}, task_id='cleanup')
-
-    reg = register(object_url=uid)
-
-    setup_task >> files >> uid >> reg >> en
-
-
-dag = upload_example()
diff --git a/requirements.txt b/requirements.txt
index d03021ad89ad44d4898718669ad5e7bde56f9dcf..cb40ab776ed5a9aa04d12e6d3851f0e5dfbf682d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 requests
 urllib3
 plyvel
+webdavclient3
 apache-airflow-providers-ssh
 apache-airflow-providers-http
 apache-airflow-providers-sftp
diff --git a/scripts/cloudinit.yml b/scripts/cloudinit.yml
index 010991b03b61d40cbfd8a9a402d76b146d914e37..3d6931de3127f323145e4fff0d866312a6e48aa2 100644
--- a/scripts/cloudinit.yml
+++ b/scripts/cloudinit.yml
@@ -2,6 +2,11 @@
 # This is a cloud config that install most basic packages, and clones and prepares the git repo for the datacatalog
 # This should prepare everything that is possible, so that (after assigning the ip address and generating the static files) only docker-compose needs to be run
 
+#cloud-config
+apt:
+  primary:
+    - arches: [default]
+      uri: http://de.archive.ubuntu.com/ubuntu/
 
 # upgrade packages
 package_update: true
@@ -28,15 +33,14 @@ users:
       - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDrgXm/3kbHrgPuHrru2LCLxKBPNnZkwTSbaadkYm6N+EzE7GwVPcXorPReC+2SHT2e8YnczcjHMcazmf7VWmHAQVV3fGrZiQtk+xTjXt3tC+Rm2zuqB4vvJcR5DXXomMMRJwG3sk/PcozvFfKFv6P7bbHxKOR090o4krM3mE2Vo43EnsBaPUS8cWI2EkhcR4gAJHhreFHbIS+nrFaJydfmzfwHNE1WjjtfIBA0U8ld2tk8eelMUjvkWrYXK+qqdaUKL0n/wVMo8D/Kl1lNGKym8LE6ZiojjEX0Aq0ajSHyyEWGscJunv/tJkrrOX2C4jd9pGEP6d0YyAunimsT1glv cboet@Desktop-CB
       - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCRsldcJ7kiksXTn2hivYfZ+Y9gziBWaMPpfVPNVlPi5XizbMurXAPQ3gUbBTDRp+Plf5LiXAfFNBdPTACb5ymFhIUKj/3sJhxc92uvJktLyjObAZ74ImBzDhVwGzs/cKhWc2otFgyMwrfPuIxdarCiLTjmG+dZ0a+IZbWta241kc3qBPjuqKK/LSZOK/Jx9Dl4rURs780GdcoA7Q2r6I6Bq8m0Cpfl2Otwi5Vr4d6hxWrl8D100ssLctn4FlL4SzVHPyZJVNeFJYQv1boJwldHBST8tJ0r0KC1V5CboB+Rdh1b/Qy1y6l/y9fPX+axFSGIIxSb6egRSwcE89f3kCC1 cboettcher@zam024
-
-  - name: mpetrova
-    gecos: Maria Petrova-El Sayed
+  - name: jj
+    gecos: Jedrzej Rybicki
     groups: sudo, docker
     shell: /bin/bash
     sudo: ALL=(ALL) NOPASSWD:ALL
     lock_passwd: true
     ssh_authorized_keys:
-      - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDUNFmYnaZ1raXQm04/mfdoBfn4i6xYknic2nhGDOrkhp5r6kv4F1m7wgtuL/pddRKuEoQpiXjRWciEMljFmxvVc7+9VitsAn5zBsnzY9+Sq9+si5aKe93RK8JGLX/WsfZGnPMdKPkK2GO9LFJN4TyL9hTpFdFQfxtO82NIa3WikG4RI+WQuKeQ4qr8FHNymr+gHTw/+YaM9331xnM5YqkmOC27CvVtiQx96MNMAyMQ8RJcHy1GL8donTBL+knVZdIwGt4SUy9dIF8iwTXGFkLe8V7/DIEB7RW9gvk2sG3YPo2eq56HsQKAB3yre+5QFhmH/uqUnTKVFgZLqlDUC0duFOwALCRmlEgtOeZqOzRBa6a0RveTIfccMb48ac4FpeeJdo4KId1QO1JaEZ8fYKgRVw3xRuOjDMpxCFuxELpSvx/hd1jgrK9lRizH9DXNf5/5Go2O16hj8LPufBbhX2EiChjWJEJkoRWBhQ3UHmstbqRiuNU/MsHq0FPSHMHV6BU= maria@jsc-strela
+      - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCxPi4EYQLBxFOECG3m8/yv4Qq0iByEirxoVBM/BkR3nbVrCLCB7L+gKKgLL36jOnkZKHyXRcRA94or8sUHrIRKH3o4ubnCKK/j2MEojGOj90QJiV5VaZ0jaHpo8IxKuYBZQR95B+l5Jpfn02nIEq0C7pdDmuV73Igl07eakmn07EFezWKzMDFcTfU5KLx/OyTYgNfVBTUqwc0nUEKqSe/b0WmY34nHnIXDPjg+eXNDMQrAl6j8cVUeJs57lZFdDNTacoZCune3z1UZ4N3X+rQvdvZ04GjAJPAlYaaJ21eeIrAyt65A1H3bT8OFfU5vK9Fi+2uA0yqFgCooDrUQFsdF
 
   - name: airflow
     gecos: Common user for running the airflow services
@@ -72,4 +76,4 @@ runcmd:
   - crontab -l | { cat ; echo '*/5 * * * * cd /home/airflow/eflows-airflow/dags && sudo git pull >/dev/null 2>&1'; } | crontab - # setup dag crontab
   - touch /finished_cloudinit
 
-final_message: "The system is finally up, after $UPTIME seconds"
\ No newline at end of file
+final_message: "The system is finally up, after $UPTIME seconds"
diff --git a/scripts/deployment.sh b/scripts/deployment.sh
index 78bf11ac89e4b9f79522cc5f408c731a48cb0c39..678910533c879ac1fe26fd545bcf298197f53f89 100755
--- a/scripts/deployment.sh
+++ b/scripts/deployment.sh
@@ -40,7 +40,7 @@ echo "Proceeding as user $(whoami)"
 # Make the necessary folders for the airflow artefacts and copy the corresponging content
 mkdir -p ./dags ./logs ./plugins ./config ./templates
 cd $GIT_REPO
-rm -rf $AIRFLOW_DIR/dags/* && rm -rf $AIRFLOW_DIR/dags/.git && git clone $DAG_GIT_URL $AIRFLOW_DIR/dags
+rm -rf $AIRFLOW_DIR/dags && mkdir $AIRFLOW_DIR/dags && git clone $DAG_GIT_URL $AIRFLOW_DIR/dags
 cp -r plugins/* $AIRFLOW_DIR/plugins
 cp config/* $AIRFLOW_DIR/config/
 cp -r templates/* $AIRFLOW_DIR/templates
diff --git a/templates/main.html b/templates/main.html
index aa31b5f7a0425ed7503946b306510cc172e094b2..09191f13525f51f403f3f68fb72eab8142de14a1 100644
--- a/templates/main.html
+++ b/templates/main.html
@@ -108,7 +108,7 @@
           </div>
           <div class="col-lg-2 col-6 d-inlign-flex">
             <p class="text-center align-self-center">
-              <img loading="lazy" src="./../../static/BMBF_gefoerdert_2017_en.jpg" alt="Funded by BMBF" style="max-height:100px">
+              <img loading="lazy" src="{{ url_for('static', filename='BMBF_gefoerdert_2017_en.jpg') }}" alt="Funded by BMBF" style="max-height:100px">
             </p>
           </div>
           <div class="col-lg-5 col-12 d-inlign-flex">
diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py
deleted file mode 100644
index 3a28ea31f5ddf465217aac9244fbc9492bcc23a0..0000000000000000000000000000000000000000
--- a/tests/test_b2shareoperator.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import unittest
-from unittest.mock import Mock, patch
-import tempfile
-import os
-
-from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunType
-from airflow import DAG
-from airflow.models.taskinstance import TaskInstance
-from airflow.utils.dates import days_ago
-from airflow.utils.state import State
-
-from dags.b2shareoperator import (B2ShareOperator, download_file,
-                                  get_file_list, get_object_md, get_objects,
-                                  get_record_template, create_draft_record, add_file, submit_draft)
-
-DEFAULT_DATE = '2019-10-03'
-TEST_DAG_ID = 'test_my_custom_operator'
-TEST_TASK_ID = 'test'
-
-
-class B2ShareOperatorTest(unittest.TestCase):
-    def setUp(self):
-        with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag:
-            B2ShareOperator(
-                task_id=TEST_TASK_ID,
-                name='test_name'
-            )
-        self.dag = dag
-        #self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
-
-    @patch('dags.b2shareoperator.HttpHook')
-    @patch('dags.b2shareoperator.get_file_list')
-    @patch('dags.b2shareoperator.download_file')
-    def test_alt_execute_no_trigger(self, down, gfl, ht):
-        gfl.return_value = {'ooo.txt': 'htt://file/to/download'}
-        down.return_value = 'tmp_name'
-
-        dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL)
-        ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
-        ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
-        ti.run(ignore_ti_state=True)
-        print(ti.state)
-
-        self.assertEqual(State.SUCCESS, ti.state)
-
-        # return value
-        ret = ti.xcom_pull()
-        self.assertEqual(ret, 1, f"{ret}")
-
-        lcl = ti.xcom_pull(key='local')
-        rmt = ti.xcom_pull(key='remote')
-        mps = ti.xcom_pull(key='mappings')
-        self.assertEqual(len(mps), 1, f"{mps}")
-        self.assertDictEqual(
-            mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}")
-        self.assertEqual(lcl, 'tmp_name', f"unexpecting local name: {lcl}")
-        self.assertEqual(rmt, 'ooo.txt', f"unexpected remote name: {rmt}")
-
-    def test_get_files(self):
-        with patch('dags.b2shareoperator.requests.get') as get:
-            m = Mock()
-            m.json.return_value = {'contents': [
-                {'key': 'veryimportant.txt', 'links': {'self': 'http://foo.bar'}}]}
-            get.return_value = m
-            ret = get_file_list(obj={'links': {'files': ['bla']}})
-            self.assertEqual(len(ret), 1)
-
-    def test_download_file(self):
-        with patch('dags.b2shareoperator.urllib.request.urlretrieve') as rr:
-            with patch('dags.b2shareoperator.tempfile.mktemp') as mt:
-                mt.return_value = '/tmp/val'
-                fname = download_file(
-                    url='http://foo.bar', target_dir='/no/tmp/')
-                self.assertEqual(fname, '/tmp/val')
-
-    def test_get_md(self):
-        with patch('dags.b2shareoperator.requests.get') as get:
-            m = Mock()
-            rval = {'links': {'files': ['a', 'b']}}
-            m.json.return_value = rval
-            get.return_value = m
-            r = get_object_md(server='foo', oid='bar')
-            self.assertDictEqual(rval, r)
-
-    def test_get_objects(self):
-        with patch('dags.b2shareoperator.requests.get') as get:
-            m = Mock()
-            rval = {'hits': {'hits': ['a', 'b']}}
-            m.json.return_value = rval
-            get.return_value = m
-            r = get_objects(server='foo')
-            self.assertListEqual(['a', 'b'], r)
-
-    def test_upload(self):
-        template = get_record_template()
-        server='https://b2share-testing.fz-juelich.de/'
-        token = ''
-        with patch('dags.b2shareoperator.requests.post') as post:
-            r = create_draft_record(server=server, token=token, record=template)
-
-        r = dict()
-        r['links']={'files':server, 'self': server}
-        with patch('dags.b2shareoperator.requests.post') as put:
-            a = tempfile.NamedTemporaryFile()
-            a.write(b"some content")
-            up = add_file(record=r, fname=a.name, token=token, remote='/tmp/somefile.txt')
-
-
-        with patch('dags.b2shareoperator.requests.patch') as p:
-            submitted = submit_draft(record=r, token=token)
-
diff --git a/tests/test_dag.py b/tests/test_dag.py
deleted file mode 100644
index 360a6987757d05da99b8448ca6303b215a18bb6b..0000000000000000000000000000000000000000
--- a/tests/test_dag.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import unittest
-
-from airflow.models import DagBag
-
-
-class TestADag(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag()
-
-    def test_dag_loaded(self):
-        dag = self.dagbag.get_dag(dag_id='firsto')
-        print(self.dagbag.import_errors)
-        self.assertDictEqual(self.dagbag.import_errors, {}, "not equal")
-        assert dag is not None
-        self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}")
-
-    def test_tf_loaded(self):
-        dag = self.dagbag.get_dag(dag_id='taskflow_example')
-        assert self.dagbag.import_errors == {}
-        assert dag is not None
-        self.assertEqual(len(dag.tasks), 5, f"Actually: {len(dag.tasks)}")