From c012e06a6ed1e627095d07718b4f5e00dd180207 Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Thu, 11 Nov 2021 12:15:28 +0100
Subject: [PATCH] example for GA meeting

---
Reviewer note: line breaks and Python indentation were restored from a copy
of this patch that had been collapsed onto a single line. The two blank
context lines at the top of the README.md hunk are inferred from the hunk
header (-4,7 +4,7), which needs two more context lines than were visible;
confirm their position against README.md at blob 5c7f1f8 before applying.

 README.md      |  2 +-
 dags/GAdemo.py | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
 create mode 100644 dags/GAdemo.py

diff --git a/README.md b/README.md
index 5c7f1f8..362aa3a 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ eFlows4HPC Data Logistics Service
 
 
 ```
-mkdir ./logs
+mkdir ./logs ./tmp
 echo -e "AIRFLOW_UID=$(id -u)" > .env
 reqs=`cat requirements.txt | tr '\n' ' '`
 echo "_PIP_ADDITIONAL_REQUIREMENTS=$reqs" >> .env
diff --git a/dags/GAdemo.py b/dags/GAdemo.py
new file mode 100644
index 0000000..9cdfda5
--- /dev/null
+++ b/dags/GAdemo.py
@@ -0,0 +1,32 @@
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+from airflow.sensors.filesystem import FileSensor
+from airflow.operators.python import PythonOperator
+from airflow.operators.dummy import DummyOperator
+
+def_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5)
+
+}
+
+def train_model():
+    print('Will start model training')
+
+with DAG('GAtest', default_args=def_args, description='testing GA', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
+    s1 = FileSensor(task_id='file_sensor', filepath='/work/afile.txt')
+    t1 = BashOperator(task_id='move_data', bash_command='date')
+    t2 = PythonOperator(task_id='train_model', python_callable=train_model)
+    t3 = BashOperator(task_id='eval_model', bash_command='echo "evaluating"')
+    t4 = DummyOperator(task_id='upload_model_to_repo')
+    t5 = DummyOperator(task_id='publish_results')
+
+    s1 >> t1 >> t2 >> t4
+    t2 >> t3 >> t5
-- 
GitLab