Skip to content
Snippets Groups Projects
Commit abbf29b9 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

output as DAG parameter and new version of airflow to properly work with ssh keys

parent 0ecdeb08
Branches
Tags
No related merge requests found
...@@ -3,6 +3,7 @@ from airflow.decorators import dag, task ...@@ -3,6 +3,7 @@ from airflow.decorators import dag, task
from airflow.models.connection import Connection from airflow.models.connection import Connection
from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
import os
from b2shareoperator import (download_file, get_file_list, get_object_md, from b2shareoperator import (download_file, get_file_list, get_object_md,
get_objects) get_objects)
...@@ -45,13 +46,20 @@ def taskflow_example(): ...@@ -45,13 +46,20 @@ def taskflow_example():
return name_mappings return name_mappings
@task() @task()
def load(files: dict): def load(files: dict, **kwargs):
print(f"Total files downloaded: {len(files)}") print(f"Total files downloaded: {len(files)}")
params = kwargs['params']
if 'target' not in params:
target = '/tmp/'
print(f"Using default target {target}")
else:
target = params['target']
ssh_hook = SSHHook(ssh_conn_id='default_ssh') ssh_hook = SSHHook(ssh_conn_id='default_ssh')
with ssh_hook.get_conn() as ssh_client: with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp() sftp_client = ssh_client.open_sftp()
for [truename, local] in files.items(): for [truename, local] in files.items():
sftp_client.put(local, f"/tmp/{truename}") sftp_client.put(local, os.path.join(target, truename))
data = extract() data = extract()
files = transform(data) files = transform(data)
......
...@@ -44,7 +44,7 @@ ...@@ -44,7 +44,7 @@
version: '2.4' version: '2.4'
x-airflow-common: x-airflow-common:
&airflow-common &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
environment: environment:
&airflow-common-env &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__EXECUTOR: CeleryExecutor
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment