Skip to content
Snippets Groups Projects
Commit f93d1dc3 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

fixing tests

parent ad0eb633
Branches
No related tags found
No related merge requests found
Pipeline #78291 failed
...@@ -40,18 +40,17 @@ class B2ShareOperator(BaseOperator): ...@@ -40,18 +40,17 @@ class B2ShareOperator(BaseOperator):
self.name = name self.name = name
self.conn_id = conn_id self.conn_id = conn_id
self.target_dir = target_dir self.target_dir = target_dir
print(self.target_dir)
def execute(self, **kwargs): def execute(self, **kwargs):
hook = HttpHook(http_conn_id=self.conn_id, method='GET') hook = HttpHook(http_conn_id=self.conn_id, method='GET')
print(kwargs)
params = kwargs['context']['params'] params = kwargs['context']['params']
oid = params['oid'] oid = params['oid']
hrespo = hook.run(endpoint=f"/api/records/{oid}") hrespo = hook.run(endpoint=f"/api/records/{oid}")
print(hrespo) print(hrespo)
flist = get_file_list(hrespo.json()) flist = get_file_list(hrespo.json())
print(flist)
ti = kwargs['context']['ti'] ti = kwargs['context']['ti']
name_mappings = {} name_mappings = {}
for fname, url in flist.items(): for fname, url in flist.items():
......
import unittest import unittest
from airflow.utils.state import State from unittest.mock import patch, Mock
from airflow.utils.dates import days_ago
from dags.b2shareoperator import B2ShareOperator
from airflow import DAG from airflow import DAG
from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstance import TaskInstance
from airflow.utils.dates import days_ago
from airflow.utils.state import State
from dags.b2shareoperator import B2ShareOperator, get_file_list
DEFAULT_DATE = '2019-10-03' DEFAULT_DATE = '2019-10-03'
TEST_DAG_ID = 'test_my_custom_operator' TEST_DAG_ID = 'test_my_custom_operator'
class B2ShareOperatorTest(unittest.TestCase): class B2ShareOperatorTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : days_ago(2)}, params={"oid": "111"}) self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily',
default_args={'start_date': days_ago(2)}, params={"oid": "111"})
self.op = B2ShareOperator( self.op = B2ShareOperator(
dag=self.dag, dag=self.dag,
task_id='test', task_id='test',
...@@ -19,9 +23,24 @@ class B2ShareOperatorTest(unittest.TestCase): ...@@ -19,9 +23,24 @@ class B2ShareOperatorTest(unittest.TestCase):
) )
self.ti = TaskInstance(task=self.op, execution_date=days_ago(1)) self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
def test_execute_no_trigger(self):
#with conf_vars({('params': {'oid': 1111})}): @patch('dags.b2shareoperator.HttpHook')
@patch('dags.b2shareoperator.get_file_list')
@patch('dags.b2shareoperator.download_file')
def test_alt_execute_no_trigger(self, down, gfl, ht):
gfl.return_value = {'ooo.txt': 'htt://file/to/download'}
down.return_value = 'tmp_name'
self.ti.run(ignore_ti_state=False) self.ti.run(ignore_ti_state=False)
print(self.ti.state) print(self.ti.state)
self.assertEqual(State.SUCCESS, self.ti.state) self.assertEqual(State.SUCCESS, self.ti.state)
# Assert something related to tasks results # Assert something related to tasks results
def test_get_files(self):
with patch('dags.b2shareoperator.requests.get') as get:
m = Mock()
m.json.return_value = {'contents': [{'key': 'veryimportant.txt', 'links':{'self': 'http://foo.bar'}}]}
get.return_value = m
ret = get_file_list(obj={'links': {'files': ['bla']}})
self.assertEqual(len(ret), 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment