Skip to content
Snippets Groups Projects
Commit 5ccb37b5 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

directory making for upload, and removal temp files for registration

parent e82e0fdc
No related branches found
No related tags found
No related merge requests found
Pipeline #95951 passed
...@@ -62,6 +62,8 @@ def taskflow_example(): ...@@ -62,6 +62,8 @@ def taskflow_example():
with ssh_hook.get_conn() as ssh_client: with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp() sftp_client = ssh_client.open_sftp()
# check dir?
sftp_client.mkdir(target, ignore_existing=True)
for [truename, local] in files.items(): for [truename, local] in files.items():
print( print(
f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}") f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}")
......
...@@ -7,6 +7,7 @@ from airflow.models.connection import Connection ...@@ -7,6 +7,7 @@ from airflow.models.connection import Connection
from airflow.operators.python import PythonOperator from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.hooks.http import HttpHook
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
from airflow.models import Variable
from b2shareoperator import (add_file, create_draft_record, get_community, from b2shareoperator import (add_file, create_draft_record, get_community,
submit_draft) submit_draft)
...@@ -41,7 +42,7 @@ def upload_example(): ...@@ -41,7 +42,7 @@ def upload_example():
@task() @task()
def load(connection_id, **kwargs): def load(connection_id, **kwargs):
params = kwargs['params'] params = kwargs['params']
target = params.get('target', '/tmp/') target = Variable.get("working_dir", default_var='/tmp/')
source = params.get('source', '/tmp/') source = params.get('source', '/tmp/')
ssh_hook = get_connection(conn_id=connection_id, **kwargs) ssh_hook = get_connection(conn_id=connection_id, **kwargs)
...@@ -93,10 +94,13 @@ def upload_example(): ...@@ -93,10 +94,13 @@ def upload_example():
for [local, true_name] in files.items(): for [local, true_name] in files.items():
print(f"Uploading {local} --> {true_name}") print(f"Uploading {local} --> {true_name}")
_ = add_file(record=r, fname=local, token=token, remote=true_name) _ = add_file(record=r, fname=local, token=token, remote=true_name)
# delete local
os.unlink(local)
print("Submitting record for pubication") print("Submitting record for pubication")
submitted = submit_draft(record=r, token=token) submitted = submit_draft(record=r, token=token)
print(f"Record created {submitted['id']}") print(f"Record created {submitted['id']}")
return submitted['id'] return submitted['id']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment