Skip to content
Snippets Groups Projects
Commit ff21daec authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

test working again

parent 9d966404
Branches
Tags
No related merge requests found
Pipeline #88756 passed
...@@ -3,6 +3,8 @@ from unittest.mock import Mock, patch ...@@ -3,6 +3,8 @@ from unittest.mock import Mock, patch
import tempfile import tempfile
import os import os
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from airflow import DAG from airflow import DAG
from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstance import TaskInstance
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
...@@ -14,18 +16,18 @@ from dags.b2shareoperator import (B2ShareOperator, download_file, ...@@ -14,18 +16,18 @@ from dags.b2shareoperator import (B2ShareOperator, download_file,
DEFAULT_DATE = '2019-10-03' DEFAULT_DATE = '2019-10-03'
TEST_DAG_ID = 'test_my_custom_operator' TEST_DAG_ID = 'test_my_custom_operator'
TEST_TASK_ID = 'test'
class B2ShareOperatorTest(unittest.TestCase): class B2ShareOperatorTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag:
default_args={'start_date': days_ago(2)}, params={"oid": "111"}) B2ShareOperator(
self.op = B2ShareOperator( task_id=TEST_TASK_ID,
dag=self.dag,
task_id='test',
name='test_name' name='test_name'
) )
self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) self.dag = dag
#self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
@patch('dags.b2shareoperator.HttpHook') @patch('dags.b2shareoperator.HttpHook')
@patch('dags.b2shareoperator.get_file_list') @patch('dags.b2shareoperator.get_file_list')
...@@ -34,18 +36,21 @@ class B2ShareOperatorTest(unittest.TestCase): ...@@ -34,18 +36,21 @@ class B2ShareOperatorTest(unittest.TestCase):
gfl.return_value = {'ooo.txt': 'htt://file/to/download'} gfl.return_value = {'ooo.txt': 'htt://file/to/download'}
down.return_value = 'tmp_name' down.return_value = 'tmp_name'
self.ti.run(ignore_ti_state=True, test_mode=True) dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL)
print(self.ti.state) ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
print(ti.state)
self.assertEqual(State.SUCCESS, self.ti.state) self.assertEqual(State.SUCCESS, ti.state)
# return value # return value
ret = self.ti.xcom_pull() ret = ti.xcom_pull()
self.assertEqual(ret, 1, f"{ret}") self.assertEqual(ret, 1, f"{ret}")
lcl = self.ti.xcom_pull(key='local') lcl = ti.xcom_pull(key='local')
rmt = self.ti.xcom_pull(key='remote') rmt = ti.xcom_pull(key='remote')
mps = self.ti.xcom_pull(key='mappings') mps = ti.xcom_pull(key='mappings')
self.assertEqual(len(mps), 1, f"{mps}") self.assertEqual(len(mps), 1, f"{mps}")
self.assertDictEqual( self.assertDictEqual(
mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}") mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}")
......
...@@ -10,7 +10,8 @@ class TestADag(unittest.TestCase): ...@@ -10,7 +10,8 @@ class TestADag(unittest.TestCase):
def test_dag_loaded(self): def test_dag_loaded(self):
dag = self.dagbag.get_dag(dag_id='firsto') dag = self.dagbag.get_dag(dag_id='firsto')
assert self.dagbag.import_errors == {} print(self.dagbag.import_errors)
self.assertDictEqual(self.dagbag.import_errors, {}, "not equal")
assert dag is not None assert dag is not None
self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}") self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment