Skip to content
Snippets Groups Projects
Commit 2bfd066b authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

same workflow now without taskflow

parent 77631454
No related branches found
No related tags found
No related merge requests found
Pipeline #77881 failed
...@@ -20,33 +20,51 @@ def get_object_md(server, oid): ...@@ -20,33 +20,51 @@ def get_object_md(server, oid):
return obj return obj
def download_file(url: str, target_dir: str): def download_file(url: str, target_dir: str):
fname = tempfile.mktemp(dir=target_dir) fname = tempfile.mktemp(dir=target_dir)
urllib.request.urlretrieve(url=url, filename=fname) urllib.request.urlretrieve(url=url, filename=fname)
return fname return fname
server='https://b2share-testing.fz-juelich.de/'
class B2ShareOperator(BaseOperator): class B2ShareOperator(BaseOperator):
template_fields = ('target_dir',)
def __init__( def __init__(
self, self,
name: str, name: str,
conn_id: str = 'default_b2share', conn_id: str = 'default_b2share', # 'https://b2share-testing.fz-juelich.de/',
target_dir: str = '/tmp/',
**kwargs) -> None: **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.name = name self.name = name
self.conn_id = conn_id self.conn_id = conn_id
self.target_dir = target_dir
print(self.target_dir)
def execute(self, **kwargs):
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
print('Kwargs')
print(kwargs) print(kwargs)
def execute(self, context): params = kwargs['context']['params']
oid = params['oid']
obj = get_object_md(server=server, oid=oid)
print(f"Retrieved object {oid}: {obj}")
flist = get_file_list(obj)
ti = kwargs['context']['ti']
name_mappings = {}
for fname, url in flist.items():
tmpname = download_file(url=url, target_dir=self.target_dir)
print(f"Processing: {fname} --> {url} --> {tmpname}")
name_mappings[fname]=tmpname
ti.xcom_push(key='local', value=tmpname)
ti.xcom_push(key='remote', value=fname)
break # for now only one file
connection = Connection.get_connection_from_secrets(self.conn_id)
print(f"Rereiving data from {connection.get_uri()}")
lst = get_objects(server=connection.get_uri()) return len(name_mappings)
flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
print(f"GOT: {flist}")
print(self.params)
return len(flist)
...@@ -5,7 +5,7 @@ from airflow import DAG ...@@ -5,7 +5,7 @@ from airflow import DAG
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from b2shareoperator import B2ShareOperator from b2shareoperator import B2ShareOperator
def_args = { def_args = {
...@@ -19,10 +19,20 @@ def_args = { ...@@ -19,10 +19,20 @@ def_args = {
} }
with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
t1 = BashOperator(task_id='print_date', bash_command='date')
t2 = BashOperator(task_id='do_noting', bash_command='sleep 5')
t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share')
t1 >> t2 >> t3 get_b2obj = B2ShareOperator(task_id='task_b2sh',
dag=dag,
name='B2Share',
target_dir="{{ var.value.source_path}}")
put_file = SFTPOperator(
task_id="upload_scp",
ssh_conn_id="default_ssh",
local_filepath="{{ti.xcom_pull(task_ids='task_b2sh', key='local')}}",
remote_filepath="{{ti.xcom_pull(task_ids='task_b2sh',key='remote')}}",
operation="put",
create_intermediate_dirs=True,
dag=dag)
get_b2obj >> put_file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment