Skip to content
Snippets Groups Projects
Commit b5df0971 authored by Maria Petrova-El Sayed's avatar Maria Petrova-El Sayed
Browse files

Merge the updated tests from branch 'main' into mptest

parents ca6501f0 ff21daec
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,8 @@ from unittest.mock import Mock, patch ...@@ -3,6 +3,8 @@ from unittest.mock import Mock, patch
import tempfile import tempfile
import os import os
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from airflow import DAG from airflow import DAG
from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstance import TaskInstance
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
...@@ -14,18 +16,18 @@ from dags.b2shareoperator import (B2ShareOperator, download_file, ...@@ -14,18 +16,18 @@ from dags.b2shareoperator import (B2ShareOperator, download_file,
DEFAULT_DATE = '2019-10-03' DEFAULT_DATE = '2019-10-03'
TEST_DAG_ID = 'test_my_custom_operator' TEST_DAG_ID = 'test_my_custom_operator'
TEST_TASK_ID = 'test'
class B2ShareOperatorTest(unittest.TestCase): class B2ShareOperatorTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag:
default_args={'start_date': days_ago(2)}, params={"oid": "111"}) B2ShareOperator(
self.op = B2ShareOperator( task_id=TEST_TASK_ID,
dag=self.dag, name='test_name'
task_id='test', )
name='test_name' self.dag = dag
) #self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
@patch('dags.b2shareoperator.HttpHook') @patch('dags.b2shareoperator.HttpHook')
@patch('dags.b2shareoperator.get_file_list') @patch('dags.b2shareoperator.get_file_list')
...@@ -34,18 +36,21 @@ class B2ShareOperatorTest(unittest.TestCase): ...@@ -34,18 +36,21 @@ class B2ShareOperatorTest(unittest.TestCase):
gfl.return_value = {'ooo.txt': 'htt://file/to/download'} gfl.return_value = {'ooo.txt': 'htt://file/to/download'}
down.return_value = 'tmp_name' down.return_value = 'tmp_name'
self.ti.run(ignore_ti_state=True, test_mode=True) dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL)
print(self.ti.state) ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
print(ti.state)
self.assertEqual(State.SUCCESS, self.ti.state) self.assertEqual(State.SUCCESS, ti.state)
# return value # return value
ret = self.ti.xcom_pull() ret = ti.xcom_pull()
self.assertEqual(ret, 1, f"{ret}") self.assertEqual(ret, 1, f"{ret}")
lcl = self.ti.xcom_pull(key='local') lcl = ti.xcom_pull(key='local')
rmt = self.ti.xcom_pull(key='remote') rmt = ti.xcom_pull(key='remote')
mps = self.ti.xcom_pull(key='mappings') mps = ti.xcom_pull(key='mappings')
self.assertEqual(len(mps), 1, f"{mps}") self.assertEqual(len(mps), 1, f"{mps}")
self.assertDictEqual( self.assertDictEqual(
mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}") mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}")
......
...@@ -10,7 +10,8 @@ class TestADag(unittest.TestCase): ...@@ -10,7 +10,8 @@ class TestADag(unittest.TestCase):
def test_dag_loaded(self): def test_dag_loaded(self):
dag = self.dagbag.get_dag(dag_id='firsto') dag = self.dagbag.get_dag(dag_id='firsto')
assert self.dagbag.import_errors == {} print(self.dagbag.import_errors)
self.assertDictEqual(self.dagbag.import_errors, {}, "not equal")
assert dag is not None assert dag is not None
self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}") self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment