diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 058bc2352273e23f9d3e2ac60783bc6815067a46..5e39baf6ac3a510c0096858942c230efb4736ee4 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -16,6 +16,7 @@ test:
   before_script:
     - airflow db init
     - pip install -r requirements.txt
+    - pip install nose==1.3.7
     - airflow connections add --conn-uri https://b2share-testing.fz-juelich.de/ default_b2share
   script:
     - ls
@@ -24,6 +25,7 @@ test:
     - airflow dags list
     - airflow connections list
     - airflow dags test firsto 2021-08-18
+    - nosetests
 
 deploy-test:
   stage: deploy
diff --git a/README.md b/README.md
index 666a1061c23346f3a287a2036b9803e11266a4da..323709ec3608f63bfbb93fdaef1312529e9450ad 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,8 @@ eFlows4HPC Data Logistics Service
 ```
 mkdir ./logs ./plugins
 echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
-echo "_PIP_ADDITIONAL_REQUIREMENTS=urllib3==1.26.6" >> .env
+reqs=`cat requirements.txt | tr '\n' ' '`
+echo "_PIP_ADDITIONAL_REQUIREMENTS=$reqs" >> .env
 docker-compose -f dockers/docker-compose.yaml --project-directory . up airflow-init
 ```
 
@@ -16,5 +17,7 @@ docker-compose -f dockers/docker-compose.yaml --project-directory . up -d
 ```
 
 ## Setup connection
+```
 curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections
+```
 
diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py
index 628fd9bf5f7de2afbbce45bf59211f22f1ba1708..f9a4cb3fd36031a3d6313fb875c8373756c6719f 100644
--- a/dags/b2shareoperator.py
+++ b/dags/b2shareoperator.py
@@ -21,7 +21,7 @@ def get_object_md(server, oid):
 
 
 def download_file(url: str, target_dir: str):
-    fname = tempfile.mkstemp(dir=target_dir)
+    fname = tempfile.mktemp(dir=target_dir)
     urllib.request.urlretrieve(url=url, filename=fname)
     return fname
 
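Reviewer note (not part of the patch): switching to `tempfile.mktemp` fixes the immediate bug (`mkstemp` returns a `(fd, path)` tuple, which `urlretrieve` cannot use as a filename), but `mktemp` is deprecated and race-prone, since the name is handed out before any file exists. A minimal sketch of a safer variant with the same signature, suggested here rather than taken from the commit:

```python
import os
import tempfile
import urllib.request


def download_file(url: str, target_dir: str):
    # mkstemp creates the file atomically and returns (fd, path);
    # closing the descriptor keeps the reserved path free of the
    # mktemp race, and urlretrieve then overwrites it in place.
    fd, fname = tempfile.mkstemp(dir=target_dir)
    os.close(fd)
    urllib.request.urlretrieve(url=url, filename=fname)
    return fname
```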
{len(files)}") + ssh_hook = SSHHook(ssh_conn_id='default_ssh') + with ssh_hook.get_conn() as ssh_client: + sftp_client = ssh_client.open_sftp() + for [local, remote] in files.items(): + sftp_client.put(local, f"/tmp/{remote}") + + - data = extract(oid = 'b38609df2b334ea296ea1857e568dbea') + data = extract() files = transform(data) load(files) - + dag = taskflow_example() + diff --git a/requirements.txt b/requirements.txt index 05b86e9419e234ac71546a0db1edaf198c2af835..1f88981a3dcd892637e066ee46b51f044b32017f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ requests urllib3==1.26.6 +apache-airflow-providers-ssh \ No newline at end of file diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py new file mode 100644 index 0000000000000000000000000000000000000000..94ee6543db33bdf7b5d378739b5aa69d0c724f69 --- /dev/null +++ b/tests/test_b2shareoperator.py @@ -0,0 +1,26 @@ +import unittest +from airflow.utils.state import State +from airflow.utils.dates import days_ago +from dags.b2shareoperator import B2ShareOperator +from airflow import DAG +from airflow.models.taskinstance import TaskInstance + +DEFAULT_DATE = '2019-10-03' +TEST_DAG_ID = 'test_my_custom_operator' + +class B2ShareOperatorTest(unittest.TestCase): + def setUp(self): + self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : days_ago(2)}) + self.op = B2ShareOperator( + dag=self.dag, + task_id='test', + name='test_name' + ) + self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) + + def test_execute_no_trigger(self): + ... + #self.ti.run(ignore_ti_state=False) + #print(self.ti.state) + #self.assertEqual(State.SUCCESS, self.ti.state) + # Assert something related to tasks results \ No newline at end of file diff --git a/tests/test_dag.py b/tests/test_dag.py new file mode 100644 index 0000000000000000000000000000000000000000..8a1faa1b7b3deb63adcb7bb56a2342abd72adcdd --- /dev/null +++ b/tests/test_dag.py @@ -0,0 +1,13 @@ +from airflow.models import DagBag +import unittest + +class TestADag(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.dagbag = DagBag() + + def test_dag_loaded(self): + dag = self.dagbag.get_dag(dag_id='firsto') + assert self.dagbag.import_errors == {} + assert dag is not None + self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}") \ No newline at end of file