Skip to content
Snippets Groups Projects
Commit 444353aa authored by Maria Petrova-El Sayed's avatar Maria Petrova-El Sayed
Browse files

Merge new updates from branch 'main' into mptest

parents ac29be0a 16254e9f
No related branches found
No related tags found
No related merge requests found
Pipeline #78158 passed
...@@ -17,7 +17,19 @@ docker-compose -f dockers/docker-compose.yaml --project-directory . up -d ...@@ -17,7 +17,19 @@ docker-compose -f dockers/docker-compose.yaml --project-directory . up -d
``` ```
## Setup connection ## Setup connection
### B2Share connection
Here we use testing instance (check hostname)
``` ```
curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' airflow:7001/api/v1/connections
``` ```
### SSH
Copy to target goes through scp (example with username/pass)
```
curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_ssh", "conn_type": "ssh", "host": "ssh", "login": "user", "port": 2222, "password": "pass"}' airflow:7001/api/v1/connections
```
...@@ -20,33 +20,51 @@ def get_object_md(server, oid): ...@@ -20,33 +20,51 @@ def get_object_md(server, oid):
return obj return obj
def download_file(url: str, target_dir: str): def download_file(url: str, target_dir: str):
fname = tempfile.mktemp(dir=target_dir) fname = tempfile.mktemp(dir=target_dir)
urllib.request.urlretrieve(url=url, filename=fname) urllib.request.urlretrieve(url=url, filename=fname)
return fname return fname
server='https://b2share-testing.fz-juelich.de/'
class B2ShareOperator(BaseOperator): class B2ShareOperator(BaseOperator):
template_fields = ('target_dir',)
def __init__( def __init__(
self, self,
name: str, name: str,
conn_id: str = 'default_b2share', conn_id: str = 'default_b2share', # 'https://b2share-testing.fz-juelich.de/',
target_dir: str = '/tmp/',
**kwargs) -> None: **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.name = name self.name = name
self.conn_id = conn_id self.conn_id = conn_id
self.target_dir = target_dir
print(self.target_dir)
def execute(self, **kwargs):
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
print('Kwargs')
print(kwargs) print(kwargs)
def execute(self, context): params = kwargs['context']['params']
oid = params['oid']
obj = get_object_md(server=server, oid=oid)
print(f"Retrieved object {oid}: {obj}")
flist = get_file_list(obj)
ti = kwargs['context']['ti']
name_mappings = {}
for fname, url in flist.items():
tmpname = download_file(url=url, target_dir=self.target_dir)
print(f"Processing: {fname} --> {url} --> {tmpname}")
name_mappings[fname]=tmpname
ti.xcom_push(key='local', value=tmpname)
ti.xcom_push(key='remote', value=fname)
break # for now only one file
connection = Connection.get_connection_from_secrets(self.conn_id)
print(f"Rereiving data from {connection.get_uri()}")
lst = get_objects(server=connection.get_uri()) return len(name_mappings)
flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
print(f"GOT: {flist}")
print(self.params)
return len(flist)
...@@ -5,7 +5,7 @@ from airflow import DAG ...@@ -5,7 +5,7 @@ from airflow import DAG
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from b2shareoperator import B2ShareOperator from b2shareoperator import B2ShareOperator
def_args = { def_args = {
...@@ -19,10 +19,20 @@ def_args = { ...@@ -19,10 +19,20 @@ def_args = {
} }
with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
t1 = BashOperator(task_id='print_date', bash_command='date')
t2 = BashOperator(task_id='do_noting', bash_command='sleep 5')
t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share')
t1 >> t2 >> t3 get_b2obj = B2ShareOperator(task_id='task_b2sh',
dag=dag,
name='B2Share',
target_dir="{{ var.value.source_path}}")
put_file = SFTPOperator(
task_id="upload_scp",
ssh_conn_id="default_ssh",
local_filepath="{{ti.xcom_pull(task_ids='task_b2sh', key='local')}}",
remote_filepath="{{ti.xcom_pull(task_ids='task_b2sh',key='remote')}}",
operation="put",
create_intermediate_dirs=True,
dag=dag)
get_b2obj >> put_file
...@@ -23,7 +23,7 @@ def taskflow_example(**kwargs): ...@@ -23,7 +23,7 @@ def taskflow_example(**kwargs):
print(f"Rereiving data from {server}") print(f"Rereiving data from {server}")
params = kwargs['params'] params = kwargs['params']
if 'oid' not in params: if 'oid' not in params: #{"oid":"b38609df2b334ea296ea1857e568dbea"}
print(f"Missing object id in pipeline parameters") print(f"Missing object id in pipeline parameters")
lst = get_objects(server=server) lst = get_objects(server=server)
flist = {o['id']: [f['key'] for f in o['files']] for o in lst} flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
...@@ -52,8 +52,8 @@ def taskflow_example(**kwargs): ...@@ -52,8 +52,8 @@ def taskflow_example(**kwargs):
ssh_hook = SSHHook(ssh_conn_id='default_ssh') ssh_hook = SSHHook(ssh_conn_id='default_ssh')
with ssh_hook.get_conn() as ssh_client: with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp() sftp_client = ssh_client.open_sftp()
for [local, remote] in files.items(): for [truename, local] in files.items():
sftp_client.put(local, f"/tmp/{remote}") sftp_client.put(local, f"/tmp/{truename}")
......
...@@ -10,4 +10,4 @@ class TestADag(unittest.TestCase): ...@@ -10,4 +10,4 @@ class TestADag(unittest.TestCase):
dag = self.dagbag.get_dag(dag_id='firsto') dag = self.dagbag.get_dag(dag_id='firsto')
assert self.dagbag.import_errors == {} assert self.dagbag.import_errors == {}
assert dag is not None assert dag is not None
self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}") self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}")
\ No newline at end of file \ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment