From ff21daec28e362d4082d186a8c193d26908f396b Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Wed, 19 Jan 2022 13:13:15 +0100 Subject: [PATCH] test working again --- tests/test_b2shareoperator.py | 35 ++++++++++++++++++++--------------- tests/test_dag.py | 3 ++- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py index f873cc0..3a28ea3 100644 --- a/tests/test_b2shareoperator.py +++ b/tests/test_b2shareoperator.py @@ -3,6 +3,8 @@ from unittest.mock import Mock, patch import tempfile import os +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType from airflow import DAG from airflow.models.taskinstance import TaskInstance from airflow.utils.dates import days_ago @@ -14,18 +16,18 @@ from dags.b2shareoperator import (B2ShareOperator, download_file, DEFAULT_DATE = '2019-10-03' TEST_DAG_ID = 'test_my_custom_operator' +TEST_TASK_ID = 'test' class B2ShareOperatorTest(unittest.TestCase): def setUp(self): - self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', - default_args={'start_date': days_ago(2)}, params={"oid": "111"}) - self.op = B2ShareOperator( - dag=self.dag, - task_id='test', - name='test_name' - ) - self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) + with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag: + B2ShareOperator( + task_id=TEST_TASK_ID, + name='test_name' + ) + self.dag = dag + #self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) @patch('dags.b2shareoperator.HttpHook') @patch('dags.b2shareoperator.get_file_list') @@ -34,18 +36,21 @@ class B2ShareOperatorTest(unittest.TestCase): gfl.return_value = {'ooo.txt': 'htt://file/to/download'} down.return_value = 'tmp_name' - self.ti.run(ignore_ti_state=True, test_mode=True) - print(self.ti.state) + dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL) + ti = dagrun.get_task_instance(task_id=TEST_TASK_ID) + ti.task = self.dag.get_task(task_id=TEST_TASK_ID) + ti.run(ignore_ti_state=True) + print(ti.state) - self.assertEqual(State.SUCCESS, self.ti.state) + self.assertEqual(State.SUCCESS, ti.state) # return value - ret = self.ti.xcom_pull() + ret = ti.xcom_pull() self.assertEqual(ret, 1, f"{ret}") - lcl = self.ti.xcom_pull(key='local') - rmt = self.ti.xcom_pull(key='remote') - mps = self.ti.xcom_pull(key='mappings') + lcl = ti.xcom_pull(key='local') + rmt = ti.xcom_pull(key='remote') + mps = ti.xcom_pull(key='mappings') self.assertEqual(len(mps), 1, f"{mps}") self.assertDictEqual( mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}") diff --git a/tests/test_dag.py b/tests/test_dag.py index 3a91270..360a698 100644 --- a/tests/test_dag.py +++ b/tests/test_dag.py @@ -10,7 +10,8 @@ class TestADag(unittest.TestCase): def test_dag_loaded(self): dag = self.dagbag.get_dag(dag_id='firsto') - assert self.dagbag.import_errors == {} + print(self.dagbag.import_errors) + self.assertDictEqual(self.dagbag.import_errors, {}, "not equal") assert dag is not None self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}") -- GitLab