diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index f56f8492bc21ed1b8f177b41c11787fc401fcb3c..ac76449db38991b3679f3c9cf7cf7e74505dcfab 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -1,5 +1,6 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection +from airflow.providers.http.hooks.http import HttpHook import requests from urllib.parse import urljoin import tempfile @@ -39,22 +40,17 @@ class B2ShareOperator(BaseOperator): self.name = name self.conn_id = conn_id self.target_dir = target_dir - print(self.target_dir) - + def execute(self, **kwargs): - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - - print('Kwargs') - print(kwargs) - + hook = HttpHook(http_conn_id=self.conn_id, method='GET') params = kwargs['context']['params'] oid = params['oid'] - obj = get_object_md(server=server, oid=oid) - print(f"Retrieved object {oid}: {obj}") - flist = get_file_list(obj) + + hrespo = hook.run(endpoint=f"/api/records/{oid}") + print(hrespo) + flist = get_file_list(hrespo.json()) + ti = kwargs['context']['ti'] name_mappings = {} for fname, url in flist.items(): @@ -66,5 +62,5 @@ class B2ShareOperator(BaseOperator): ti.xcom_push(key='remote', value=fname) break # for now only one file - + ti.xcom_push(key='mappins', value=name_mappings) return len(name_mappings) diff --git a/requirements.txt b/requirements.txt index 1f88981a3dcd892637e066ee46b51f044b32017f..240d00f906211bb0f8dad8dc7426a58b99fea37f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ requests urllib3==1.26.6 -apache-airflow-providers-ssh \ No newline at end of file +apache-airflow-providers-ssh +apache-airflow-providers-http +apache-airflow-providers-sftp diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py index 94ee6543db33bdf7b5d378739b5aa69d0c724f69..b7b3bc16d9f2dfdedc33240762e0a60c94b03a3f 100644 --- a/tests/test_b2shareoperator.py +++ b/tests/test_b2shareoperator.py @@ -1,16 +1,21 @@ import unittest -from airflow.utils.state import State -from airflow.utils.dates import days_ago -from dags.b2shareoperator import B2ShareOperator +from unittest.mock import patch, Mock + from airflow import DAG from airflow.models.taskinstance import TaskInstance +from airflow.utils.dates import days_ago +from airflow.utils.state import State + +from dags.b2shareoperator import B2ShareOperator, get_file_list DEFAULT_DATE = '2019-10-03' TEST_DAG_ID = 'test_my_custom_operator' + class B2ShareOperatorTest(unittest.TestCase): - def setUp(self): - self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : days_ago(2)}) + def setUp(self): + self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', + default_args={'start_date': days_ago(2)}, params={"oid": "111"}) self.op = B2ShareOperator( dag=self.dag, task_id='test', @@ -18,9 +23,24 @@ class B2ShareOperatorTest(unittest.TestCase): ) self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) - def test_execute_no_trigger(self): - ... - #self.ti.run(ignore_ti_state=False) - #print(self.ti.state) - #self.assertEqual(State.SUCCESS, self.ti.state) - # Assert something related to tasks results \ No newline at end of file + + @patch('dags.b2shareoperator.HttpHook') + @patch('dags.b2shareoperator.get_file_list') + @patch('dags.b2shareoperator.download_file') + def test_alt_execute_no_trigger(self, down, gfl, ht): + gfl.return_value = {'ooo.txt': 'htt://file/to/download'} + down.return_value = 'tmp_name' + + self.ti.run(ignore_ti_state=True, test_mode=True) + print(self.ti.state) + self.assertEqual(State.SUCCESS, self.ti.state) + # Assert something related to tasks results + + def test_get_files(self): + with patch('dags.b2shareoperator.requests.get') as get: + m = Mock() + m.json.return_value = {'contents': [{'key': 'veryimportant.txt', 'links':{'self': 'http://foo.bar'}}]} + get.return_value = m + ret = get_file_list(obj={'links': {'files': ['bla']}}) + self.assertEqual(len(ret), 1) +