Newer
Older
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from airflow import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.utils.dates import days_ago
from airflow.utils.state import State
from dags.b2shareoperator import (B2ShareOperator, download_file,
get_file_list, get_object_md, get_objects,
get_record_template, create_draft_record, add_file, submit_draft)
DEFAULT_DATE = '2019-10-03'
TEST_DAG_ID = 'test_my_custom_operator'
with DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': days_ago(2)}, params={"oid": "111"}) as dag:
B2ShareOperator(
task_id=TEST_TASK_ID,
name='test_name'
)
self.dag = dag
#self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
@patch('dags.b2shareoperator.HttpHook')
@patch('dags.b2shareoperator.get_file_list')
@patch('dags.b2shareoperator.download_file')
def test_alt_execute_no_trigger(self, down, gfl, ht):
gfl.return_value = {'ooo.txt': 'htt://file/to/download'}
down.return_value = 'tmp_name'
dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING, run_id=TEST_DAG_ID, run_type=DagRunType.MANUAL)
ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
print(ti.state)
lcl = ti.xcom_pull(key='local')
rmt = ti.xcom_pull(key='remote')
mps = ti.xcom_pull(key='mappings')
self.assertDictEqual(
mps, {'ooo.txt': 'tmp_name'}, f"unexpecting mappings: {mps}")
self.assertEqual(lcl, 'tmp_name', f"unexpecting local name: {lcl}")
self.assertEqual(rmt, 'ooo.txt', f"unexpected remote name: {rmt}")
def test_get_files(self):
with patch('dags.b2shareoperator.requests.get') as get:
m = Mock()
m.json.return_value = {'contents': [
{'key': 'veryimportant.txt', 'links': {'self': 'http://foo.bar'}}]}
get.return_value = m
ret = get_file_list(obj={'links': {'files': ['bla']}})
self.assertEqual(len(ret), 1)
def test_download_file(self):
with patch('dags.b2shareoperator.urllib.request.urlretrieve') as rr:
fname = download_file(
url='http://foo.bar', target_dir='/no/tmp/')
def test_get_md(self):
with patch('dags.b2shareoperator.requests.get') as get:
m = Mock()
rval = {'links': {'files': ['a', 'b']}}
m.json.return_value = rval
r = get_object_md(server='foo', oid='bar')
self.assertDictEqual(rval, r)
def test_get_objects(self):
with patch('dags.b2shareoperator.requests.get') as get:
m = Mock()
rval = {'hits': {'hits': ['a', 'b']}}
m.json.return_value = rval
get.return_value = m
r = get_objects(server='foo')
self.assertListEqual(['a', 'b'], r)
def test_upload(self):
template = get_record_template()
server='https://b2share-testing.fz-juelich.de/'
token = ''
with patch('dags.b2shareoperator.requests.post') as post:
r = create_draft_record(server=server, token=token, record=template)
r = dict()
r['links']={'files':server, 'self': server}
with patch('dags.b2shareoperator.requests.post') as put:
a = tempfile.NamedTemporaryFile()
a.write(b"some content")
up = add_file(record=r, fname=a.name, token=token, remote='/tmp/somefile.txt')
with patch('dags.b2shareoperator.requests.patch') as p:
submitted = submit_draft(record=r, token=token)