Commit d588d77b authored by Maria Petrova-El Sayed

Merge branch 'main' into mptest

parents 553a0be8 d7b49887
@@ -23,7 +23,7 @@ test:
     - cp dags/* /opt/airflow/dags/
     - airflow dags list
     - airflow connections list
-    - airflow dags test firsto 2021-08-18
+    - airflow dags test testdag 2021-08-18
     - nosetests
 deploy-test:
@@ -32,4 +32,8 @@ Copy to target goes through scp (example with username/pass)
 curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_ssh", "conn_type": "ssh", "host": "ssh", "login": "user", "port": 2222, "password": "pass"}' airflow:7001/api/v1/connections
 ```
+Connections can also be added through env variables, like
+```
+AIRFLOW_CONN_MY_PROD_DATABASE=my-conn-type://login:password@host:port/schema?param1=val1&param2=val2
+```
\ No newline at end of file
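
Both snippets register the same kind of connection; the env-variable form packs conn type, login, password, host, and port into a single URI named `AIRFLOW_CONN_<CONN_ID>`. A minimal sketch, assuming the same placeholder `default_ssh` values as the curl example above:
```
import os

from airflow.models.connection import Connection

# Same placeholder connection as the curl example (conn_type ssh, host "ssh",
# port 2222, login "user", password "pass"), packed into one connection URI
os.environ["AIRFLOW_CONN_DEFAULT_SSH"] = "ssh://user:pass@ssh:2222"

# the secrets lookup resolves AIRFLOW_CONN_<CONN_ID> environment variables
conn = Connection.get_connection_from_secrets("default_ssh")
print(conn.conn_type, conn.host, conn.port)  # ssh ssh 2222
```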
-from airflow.models.baseoperator import BaseOperator
-from airflow.models.connection import Connection
-from airflow.providers.http.hooks.http import HttpHook
-import requests
-from urllib.parse import urljoin
 import tempfile
 import urllib.request
+from urllib.parse import urljoin
+
+import requests
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.http.hooks.http import HttpHook
 
 
 def get_objects(server):
     lst = requests.get(urljoin(server, 'api/records')).json()
     return lst['hits']['hits']
 
 
 def get_file_list(obj):
     file_url = obj['links']['files']
     fls = requests.get(file_url).json()
     return {it['key']: it['links']['self'] for it in fls['contents']}
 
 
 def get_object_md(server, oid):
     obj = requests.get(urljoin(server, f"api/records/{oid}")).json()
     return obj
 
 
 def download_file(url: str, target_dir: str):
     fname = tempfile.mktemp(dir=target_dir)
     urllib.request.urlretrieve(url=url, filename=fname)
     return fname
 
 
 class B2ShareOperator(BaseOperator):
     template_fields = ('target_dir',)
@@ -62,5 +65,5 @@ class B2ShareOperator(BaseOperator):
             ti.xcom_push(key='remote', value=fname)
             break  # for now only one file
 
-        ti.xcom_push(key='mappins', value=name_mappings)
+        ti.xcom_push(key='mappings', value=name_mappings)
         return len(name_mappings)
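
The helpers above walk the B2SHARE REST API: `get_objects` lists records, `get_object_md` fetches one record's metadata, `get_file_list` maps its file names to download links, and `download_file` pulls a file into a temporary name. A minimal sketch of how they chain together, with a placeholder server URL and the same stop-after-one-file behaviour as `B2ShareOperator.execute`:
```
from b2shareoperator import (download_file, get_file_list, get_object_md,
                             get_objects)

server = 'https://b2share.example.org/'  # placeholder B2SHARE instance

for obj in get_objects(server=server):
    md = get_object_md(server=server, oid=obj['id'])
    name_mappings = {}
    for name, url in get_file_list(md).items():
        name_mappings[name] = download_file(url=url, target_dir='/tmp')
        break  # for now only one file, mirroring the operator
    print(name_mappings)
    break  # only the first record, for the sketch
```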
 from datetime import timedelta
 
 from airflow import DAG
-from airflow.providers.sftp.operators.sftp import SFTPOperator
-from airflow.utils.dates import days_ago
-from airflow.operators.bash import BashOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+from airflow.utils.dates import days_ago
 
 from b2shareoperator import B2ShareOperator
 
 def_args = {
@@ -35,4 +33,3 @@ with DAG('firsto', default_args=def_args, description='first dag', schedule_inte
         dag=dag)
 
     get_b2obj >> put_file
-
 from airflow.decorators import dag, task
-from airflow.utils.dates import days_ago
 from airflow.models.connection import Connection
 from airflow.models.dagrun import DagRun
 from airflow.providers.ssh.hooks.ssh import SSHHook
-import requests
-import urllib.request
-import tempfile
-from b2shareoperator import get_file_list, download_file, get_object_md, get_objects
+from airflow.utils.dates import days_ago
+
+from b2shareoperator import (download_file, get_file_list, get_object_md,
+                             get_objects)
 
 default_args = {
     'owner': 'airflow',
 }
 
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def taskflow_example(**kwargs):
+def taskflow_example():
     @task(multiple_outputs=True)
     def extract(**kwargs):
         connection = Connection.get_connection_from_secrets('default_b2share')
@@ -24,12 +22,12 @@ def taskflow_example(**kwargs):
         params = kwargs['params']
         if 'oid' not in params:  # {"oid":"b38609df2b334ea296ea1857e568dbea"}
-            print(f"Missing object id in pipeline parameters")
+            print("Missing object id in pipeline parameters")
             lst = get_objects(server=server)
             flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
             print(f"Objects on server: {flist}")
-            return {}
-        else:
+            return -1  # non zero exit code is a task failure
 
         oid = params['oid']
         obj = get_object_md(server=server, oid=oid)
@@ -55,11 +53,9 @@ def taskflow_example(**kwargs):
         for [truename, local] in files.items():
             sftp_client.put(local, f"/tmp/{truename}")
 
     data = extract()
     files = transform(data)
     load(files)
 
-dag = taskflow_example()
+dag = taskflow_example()
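
`extract` reads the record id from the run's `params`, so `taskflow_example` only does real work when a run is triggered with an `oid` in its configuration. A sketch of such a trigger through Airflow's stable REST API, assuming the host and basic-auth placeholders from the README's curl example; the id string is the sample from the comment in the code:
```
import requests

# host and credentials are placeholders in the style of the README example;
# the oid is the sample record id from the comment in extract()
requests.post(
    'http://airflow:7001/api/v1/dags/taskflow_example/dagRuns',
    auth=('user', 'pass'),
    json={'conf': {'oid': 'b38609df2b334ea296ea1857e568dbea'}},
)
```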
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+def_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5)
+}
+
+with DAG('testdag', default_args=def_args, description='simple testing dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
+    t1 = BashOperator(task_id='print_date', bash_command='date')
+    t2 = BashOperator(task_id='do_noting', bash_command='sleep 5')
+
+    t1 >> t2
 import unittest
-from unittest.mock import patch, Mock
+from unittest.mock import Mock, patch
 
 from airflow import DAG
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils.dates import days_ago
 from airflow.utils.state import State
 
-from dags.b2shareoperator import B2ShareOperator, get_file_list
+from dags.b2shareoperator import (B2ShareOperator, download_file,
+                                  get_file_list, get_object_md, get_objects)
 
 DEFAULT_DATE = '2019-10-03'
 TEST_DAG_ID = 'test_my_custom_operator'
 
@@ -23,7 +24,6 @@ class B2ShareOperatorTest(unittest.TestCase):
         )
         self.ti = TaskInstance(task=self.op, execution_date=days_ago(1))
 
-    @patch('dags.b2shareoperator.HttpHook')
     @patch('dags.b2shareoperator.get_file_list')
     @patch('dags.b2shareoperator.download_file')
@@ -33,14 +33,53 @@ class B2ShareOperatorTest(unittest.TestCase):
         self.ti.run(ignore_ti_state=True, test_mode=True)
         print(self.ti.state)
         self.assertEqual(State.SUCCESS, self.ti.state)
-        # Assert something related to tasks results
+
+        # return value
         ret = self.ti.xcom_pull()
         self.assertEqual(ret, 1, f"{ret}")
+
+        lcl = self.ti.xcom_pull(key='local')
+        rmt = self.ti.xcom_pull(key='remote')
+        mps = self.ti.xcom_pull(key='mappings')
+        self.assertEqual(len(mps), 1, f"{mps}")
+        self.assertDictEqual(
+            mps, {'ooo.txt': 'tmp_name'}, f"unexpected mappings: {mps}")
+        self.assertEqual(lcl, 'tmp_name', f"unexpected local name: {lcl}")
+        self.assertEqual(rmt, 'ooo.txt', f"unexpected remote name: {rmt}")
 
     def test_get_files(self):
         with patch('dags.b2shareoperator.requests.get') as get:
             m = Mock()
-            m.json.return_value = {'contents': [{'key': 'veryimportant.txt', 'links':{'self': 'http://foo.bar'}}]}
+            m.json.return_value = {'contents': [
+                {'key': 'veryimportant.txt', 'links': {'self': 'http://foo.bar'}}]}
             get.return_value = m
             ret = get_file_list(obj={'links': {'files': ['bla']}})
             self.assertEqual(len(ret), 1)
+
+    def test_download_file(self):
+        with patch('dags.b2shareoperator.urllib.request.urlretrieve') as rr:
+            with patch('dags.b2shareoperator.tempfile.mktemp') as mt:
+                mt.return_value = '/tmp/val'
+                fname = download_file(
+                    url='http://foo.bar', target_dir='/no/tmp/')
+                self.assertEqual(fname, '/tmp/val')
+
+    def test_get_md(self):
+        with patch('dags.b2shareoperator.requests.get') as get:
+            m = Mock()
+            rval = {'links': {'files': ['a', 'b']}}
+            m.json.return_value = rval
+            get.return_value = m
+            r = get_object_md(server='foo', oid='bar')
+            self.assertDictEqual(rval, r)
+
+    def test_get_objects(self):
+        with patch('dags.b2shareoperator.requests.get') as get:
+            m = Mock()
+            rval = {'hits': {'hits': ['a', 'b']}}
+            m.json.return_value = rval
+            get.return_value = m
+            r = get_objects(server='foo')
+            self.assertListEqual(['a', 'b'], r)
-from airflow.models import DagBag
 import unittest
+
+from airflow.models import DagBag
 
 
 class TestADag(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
@@ -11,3 +13,9 @@ class TestADag(unittest.TestCase):
         assert self.dagbag.import_errors == {}
         assert dag is not None
         self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}")
+
+    def test_tf_loaded(self):
+        dag = self.dagbag.get_dag(dag_id='taskflow_example')
+        assert self.dagbag.import_errors == {}
+        assert dag is not None
+        self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}")