diff --git a/README.md b/README.md index fe0d6923d7b0c9dc2cdc42cff5c23dfa6559d778..59b2f655572adf2a02ba05ba878666d89ef9c793 100644 --- a/README.md +++ b/README.md @@ -32,4 +32,8 @@ Copy to target goes through scp (example with username/pass) curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_ssh", "conn_type": "ssh", "host": "ssh", "login": "user", "port": 2222, "password": "pass"}' airflow:7001/api/v1/connections ``` +Connections can also be added through env variables, like +``` +AIRFLOW_CONN_MY_PROD_DATABASE=my-conn-type://login:password@host:port/schema?param1=val1¶m2=val2 +``` \ No newline at end of file diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index ac76449db38991b3679f3c9cf7cf7e74505dcfab..a9dae0da5755b15428cb0da188f5be2c8d9a381b 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -62,5 +62,5 @@ class B2ShareOperator(BaseOperator): ti.xcom_push(key='remote', value=fname) break # for now only one file - ti.xcom_push(key='mappins', value=name_mappings) + ti.xcom_push(key='mappings', value=name_mappings) return len(name_mappings) diff --git a/dags/taskflow.py b/dags/taskflow.py index 9b7b4deedb2e2134bf49c1c2c8f6bf25125b8188..5e7ebbdb912024a8ee2a0f3fca3fbdda3c106ab1 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -28,7 +28,7 @@ def taskflow_example(**kwargs): lst = get_objects(server=server) flist = {o['id']: [f['key'] for f in o['files']] for o in lst} print(f"Objects on server: {flist}") - return {} + return -1 # non zero exit code is a task failure else: oid = params['oid'] diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py index b7b3bc16d9f2dfdedc33240762e0a60c94b03a3f..22c1371b39293e4cca14e08796e64b7c69012ed4 100644 --- a/tests/test_b2shareoperator.py +++ b/tests/test_b2shareoperator.py @@ -6,7 +6,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.utils.dates import days_ago from airflow.utils.state import State -from dags.b2shareoperator import B2ShareOperator, get_file_list +from dags.b2shareoperator import B2ShareOperator, get_file_list, download_file, get_object_md, get_objects DEFAULT_DATE = '2019-10-03' TEST_DAG_ID = 'test_my_custom_operator' @@ -33,8 +33,21 @@ class B2ShareOperatorTest(unittest.TestCase): self.ti.run(ignore_ti_state=True, test_mode=True) print(self.ti.state) + self.assertEqual(State.SUCCESS, self.ti.state) - # Assert something related to tasks results + + # return value + ret = self.ti.xcom_pull() + self.assertEqual(ret,1,f"{ret}") + + lcl = self.ti.xcom_pull(key='local') + rmt = self.ti.xcom_pull(key='remote') + mps = self.ti.xcom_pull(key='mappings') + self.assertEqual(len(mps), 1, f"{mps}") + self.assertDictEqual(mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}") + self.assertEqual(lcl, 'tmp_name', f"unexpecting local name: {lcl}") + self.assertEqual(rmt, 'ooo.txt', f"unexpected remote name: {rmt}" ) + def test_get_files(self): with patch('dags.b2shareoperator.requests.get') as get: @@ -43,4 +56,31 @@ class B2ShareOperatorTest(unittest.TestCase): get.return_value = m ret = get_file_list(obj={'links': {'files': ['bla']}}) self.assertEqual(len(ret), 1) + + def test_download_file(self): + with patch('dags.b2shareoperator.urllib.request.urlretrieve') as rr: + with patch('dags.b2shareoperator.tempfile.mktemp') as mt: + mt.return_value = '/tmp/val' + fname = download_file(url='http://foo.bar', target_dir='/no/tmp/') + self.assertEqual(fname, '/tmp/val') + + def test_get_md(self): + with patch('dags.b2shareoperator.requests.get') as get: + m = Mock() + rval = {'links': {'files': ['a', 'b']}} + m.json.return_value = rval + get.return_value = m + r = get_object_md(server='foo', oid='bar') + self.assertDictEqual(rval, r) + + def test_get_objects(self): + with patch('dags.b2shareoperator.requests.get') as get: + m = Mock() + rval = {'hits': {'hits': ['a', 'b']}} + m.json.return_value = rval + get.return_value = m + r = get_objects(server='foo') + self.assertListEqual(['a', 'b'], r) + +