Commit d9cdacc0 authored by Christian Boettcher

Merge branch 'main' into datacatalog-integration

parents ed21253b 45a443bb
Pipeline #84244 failed
@@ -2,10 +2,14 @@
eFlows4HPC Data Logistics Service
This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558. The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway.
## Install and run
```
-mkdir ./logs ./plugins
-echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
git pull ...
mkdir ./logs ./tmp
echo -e "AIRFLOW_UID=$(id -u)" > .env
reqs=`cat requirements.txt | tr '\n' ' '`
echo "_PIP_ADDITIONAL_REQUIREMENTS=$reqs" >> .env
...
@@ -21,21 +21,22 @@ default_timezone = utc
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
-executor = SequentialExecutor
executor = CeleryExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
-# sql_alchemy_conn = sqlite:////opt/airflow/airflow.db
sql_alchemy_conn = sqlite:////opt/airflow/airflow.db
# The encoding for the databases
sql_engine_encoding = utf-8
# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding.
-# This is particularly useful in case of mysql with utf8mb4 encoding because
-# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should
-# be set to ``utf8mb3_general_ci``.
# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb``
# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed
# the maximum size of allowed index when collation is set to ``utf8mb4`` variant
# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618).
# sql_engine_collation_for_ids =
# If SqlAlchemy should pool database connections.
@@ -85,9 +86,12 @@ parallelism = 32
# The maximum number of task instances allowed to run concurrently in each DAG. To calculate
# the number of tasks that is running concurrently for a DAG, add up the number of running
-# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``,
-# which is defaulted as ``dag_concurrency``.
-dag_concurrency = 16
# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
# which is defaulted as ``max_active_tasks_per_dag``.
#
# An example scenario when this would be useful is when you want to stop a new dag with an early
# start date from stealing all the executor slots in a cluster.
max_active_tasks_per_dag = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
@@ -100,7 +104,7 @@ max_active_runs_per_dag = 16
# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
-load_examples = True
load_examples = False
# Whether to load the default connections that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
@@ -169,6 +173,9 @@ dag_discovery_safe_mode = True
# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0
# The weighting method used for the effective total priority weight of the task
default_task_weight_rule = downstream
# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30
@@ -176,13 +183,6 @@ min_serialized_dag_update_interval = 30
# read rate. This config controls when your DAGs are updated in the Webserver
min_serialized_dag_fetch_interval = 10
-# Whether to persist DAG files code in DB.
-# If set to True, Webserver reads file contents from DB instead of
-# trying to access files in a DAG folder.
-# (Default is ``True``)
-# Example: store_dag_code = True
-# store_dag_code =
# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
# in the Database.
# All the template_fields for each of Task Instance are stored in the Database.
@@ -220,6 +220,11 @@ hide_sensitive_var_conn_fields = True
# extra JSON.
sensitive_var_conn_names =
# Task Slot counts for ``default_pool``. This setting would not have any effect in an existing
# deployment where the ``default_pool`` is already created. For existing deployments, users can
# change the number of slots using Webserver, API or the CLI
default_pool_task_slot_count = 128
[logging]
# The folder where airflow should store its log files
# This path must be absolute
@@ -258,7 +263,7 @@ logging_level = INFO
# Logging level for Flask-appbuilder UI.
#
# Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-fab_logging_level = WARN
fab_logging_level = WARNING
# Logging class
# Specify the class that will specify the logging configuration
@@ -297,8 +302,15 @@ task_log_reader = task
# A comma\-separated list of third-party logger names that will be configured to print messages to
# consoles\.
-# Example: extra_loggers = connexion,sqlalchemy
-extra_loggers =
# Example: extra_logger_names = connexion,sqlalchemy
extra_logger_names =
# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793
[metrics]
@@ -406,8 +418,9 @@ access_control_allow_headers =
# Specifies the method or methods allowed when accessing the resource.
access_control_allow_methods =
-# Indicates whether the response can be shared with requesting code from the given origin.
-access_control_allow_origin =
# Indicates whether the response can be shared with requesting code from the given origins.
# Separate URLs with space.
access_control_allow_origins =
[lineage]
# what lineage backend to use
@@ -491,7 +504,7 @@ reload_on_plugin_change = False
# Secret key used to run your flask app. It should be as random as possible. However, when running
# more than 1 instances of webserver, make sure all of them use the same ``secret_key`` otherwise
# one of them will error with "CSRF session token is missing".
-secret_key = 8kUFwlRKUhs6i8NBAvUmWg==
secret_key = Jvww64wGcBs22UNHJjToNw==
# Number of workers to run the Gunicorn web server
workers = 4
@@ -529,7 +542,7 @@ dag_orientation = LR
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
-log_fetch_timeout_sec = 5
log_fetch_timeout_sec = 15
# Time interval (in secs) to wait before next log fetching.
log_fetch_delay_sec = 2
@@ -605,6 +618,10 @@ session_lifetime_minutes = 43200
# Sets a custom page title for the DAGs overview page and site title for all pages
instance_name = eFlows4HPC
# How frequently, in seconds, the DAG data will auto-refresh in graph or tree view
# when auto-refresh is turned on
auto_refresh_interval = 3
[email]
# Configuration email backend and whether to
@@ -654,11 +671,14 @@ smtp_retry_limit = 5
# additional configuration options based on the Python platform. See:
# https://docs.sentry.io/error-reporting/configuration/?platform=python.
# Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``,
-# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
# ``ignore_errors``, ``before_breadcrumb``, ``transport``.
# Enable error reporting to Sentry
sentry_on = false
sentry_dsn =
# Dotted path to a before_send function that the sentry SDK should be configured to use.
# before_send =
[celery_kubernetes_executor]
# This section only applies if you are using the ``CeleryKubernetesExecutor`` in
@@ -701,13 +721,6 @@ worker_concurrency = 16
# Example: worker_prefetch_multiplier = 1
# worker_prefetch_multiplier =
-# When you start an airflow worker, airflow starts a tiny web server
-# subprocess to serve the workers local log files to the airflow main
-# web server, who then builds pages and sends them to users. This defines
-# the port on which the logs are served. It needs to be unused, and open
-# visible from the main web server to connect into the workers.
-worker_log_server_port = 8793
# Umask that will be used when starting workers with the ``airflow celery worker``
# in daemon mode. This control the file-creation mode mask which determines the initial
# value of file permission bits for newly created files.
@@ -812,10 +825,6 @@ tls_key =
# listen (in seconds).
job_heartbeat_sec = 5
-# How often (in seconds) to check and tidy up 'running' TaskInstancess
-# that no longer have a matching DagRun
-clean_tis_without_dagrun_interval = 15.0
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
@@ -825,8 +834,10 @@ scheduler_heartbeat_sec = 5
# -1 indicates unlimited number
num_runs = -1
-# The number of seconds to wait between consecutive DAG file processing
-processor_poll_interval = 1
# Controls how long the scheduler will sleep between loops, but if there was nothing to do
# in the loop. i.e. if it scheduled something then it will start the next loop
# iteration straight away.
scheduler_idle_sleep_time = 1
# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
@@ -865,11 +876,8 @@ scheduler_zombie_task_threshold = 300
catchup_by_default = True
# This changes the batch size of queries in the scheduling main loop.
-# If this is too high, SQL query performance may be impacted by one
-# or more of the following:
-# - reversion to full table scan
-# - complexity of query predicate
-# - excessive locking
# If this is too high, SQL query performance may be impacted by
# complexity of query predicate, and/or excessive locking.
# Additionally, you may hit the maximum allowable query length for your db.
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512
@@ -917,6 +925,13 @@ allow_trigger_in_future = False
# DAG dependency detector class to use
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
# How often to check for expired trigger requests that have not run yet.
trigger_timeout_check_interval = 15
[triggerer]
# How many triggers a single Triggerer will run at once, by default.
default_capacity = 1000
[kerberos]
ccache = /tmp/airflow_krb5_ccache
@@ -926,6 +941,12 @@ reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
# Allow to disable ticket forwardability.
forwardable = True
# Allow to remove source IP from token, useful when using token behind NATted Docker host.
include_ip = True
[github_enterprise]
api_rev = v3
@@ -941,7 +962,8 @@ end_of_log_mark = end_of_log
# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
# Code will construct log_id using the log_id template from the argument above.
-# NOTE: The code will prefix the https:// automatically, don't include that here.
# NOTE: scheme will default to https if one is not provided
# Example: frontend = http://localhost:5601/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id: "{log_id}"'),sort:!(log.offset,asc))
frontend =
# Write the task logs to the stdout of the worker, rather than the default files
@@ -964,7 +986,7 @@ use_ssl = False
verify_certs = True
[kubernetes]
-# Path to the YAML pod file. If set, all other kubernetes-related fields are ignored.
# Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
pod_template_file =
# The repository of the Kubernetes Image for the Worker to Run
@@ -1049,6 +1071,9 @@ worker_pods_pending_timeout = 300
# How often in seconds to check if Pending workers have exceeded their timeouts
worker_pods_pending_timeout_check_interval = 120
# How often in seconds to check for task instances stuck in "queued" status without a pod
worker_pods_queued_check_interval = 60
# How many pending pods to check for timeout violations in each check interval.
# You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
worker_pods_pending_timeout_batch_size = 100
@@ -1068,5 +1093,3 @@ shards = 5
# comma separated sensor classes support in smart_sensor.
sensors_enabled = NamedHivePartitionSensor
-rbac = True
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def train_model():
    print('Will start model training')

with DAG('GAtest', default_args=def_args, description='testing GA', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag:
    s1 = FileSensor(task_id='file_sensor', filepath='/work/afile.txt')
    t1 = BashOperator(task_id='move_data', bash_command='date')
    t2 = PythonOperator(task_id='train_model', python_callable=train_model)
    t3 = BashOperator(task_id='eval_model', bash_command='echo "evaluating"')
    t4 = DummyOperator(task_id='upload_model_to_repo')
    t5 = DummyOperator(task_id='publish_results')
    s1 >> t1 >> t2 >> t4
    t2 >> t3 >> t5
@@ -2,6 +2,7 @@
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.models import Variable
from airflow.utils.dates import days_ago
import os
@@ -39,9 +40,12 @@ def taskflow_example():
    @task(multiple_outputs=True)
    def transform(flist: dict):
        name_mappings = {}
        tmp_dir = Variable.get("working_dir", default_var='/tmp/')
        print(f"Local working dir is: {tmp_dir}")
        for fname, url in flist.items():
            print(f"Processing: {fname} --> {url}")
-           tmpname = download_file(url=url, target_dir='/tmp/')
            tmpname = download_file(url=url, target_dir=tmp_dir)
            name_mappings[fname] = tmpname
        return name_mappings
...
# Maintenance for customizations
### Footer
The DLS service has a custom footer that contributes to the consortium branding of the eFlows4HPC project. The design of the custom footer is part of templates/main.html. This file is injected as a volume in docker-compose.yaml, thereby overriding the existing template from the public Airflow image. For testing reasons, the path has been hard-coded in docker-compose.yaml.
### Updates
Because of the hard-coded path, every update of the official Airflow image requires pulling the current main.html anew from the official container.
For example:
```docker exec airflow_airflow-webserver_1 find /home/airflow/ | grep main.html ```
Copy this file into the local repository and substitute the ```<footer>``` section with the custom DLS ```%footer%``` block. If the official Airflow image (and container) ships a new Python version, you will also need to update the path in the volumes section of docker-compose.yaml.
\ No newline at end of file
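A minimal sketch of the copy step (the container name is taken from the example above and the site-packages path from the current docker-compose.yaml; both may differ after an image or Python upgrade):
```
# Locate the packaged template inside the running webserver container
docker exec airflow_airflow-webserver_1 find /home/airflow/ -name main.html
# Copy it into the local repository, then re-apply the DLS %footer% block before committing
docker cp airflow_airflow-webserver_1:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html ./templates/main.html
```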
{
"default_b2share": {
"conn_type": "https",
"description": null,
"host": "b2share-testing.fz-juelich.de",
"login": null,
"password": null,
"schema": "",
"port": null,
"extra": null
},
"default_ssh": {
"conn_type": "ssh",
"description": null,
"host": "openssh-server",
"login": "eflows",
"password": "rand",
"schema": null,
"port": 2222,
"extra": null
}
}
\ No newline at end of file
@@ -24,12 +24,9 @@
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
-# Default: apache/airflow:master-python3.8
# Default: apache/airflow:2.2.1
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
-# AIRFLOW_GID - Group ID in Airflow containers
-# Default: 50000
-#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
@@ -44,11 +41,13 @@
version: '3'
x-airflow-common:
&airflow-common
-image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.1}
# build: .
environment:
&airflow-common-env
-AIRFLOW_HOME: /opt/airflow
-AIRFLOW__CORE_dags_folder: /opt/airflow/dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
@@ -60,11 +59,13 @@ x-airflow-common:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./config/airflow.cfg:/opt/airflow/airflow.cfg
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
-- ./config/airflow.cfg:/opt/airflow/airflow.cfg
- ./templates/main.html:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html
-user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
@@ -78,7 +79,7 @@ services:
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
-- /var/lib/postgresql/data
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
@@ -87,8 +88,8 @@ services:
redis:
image: redis:latest
-ports:
-- 6379:6379
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
@@ -103,23 +104,28 @@ services:
- 7001:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
-interval: 10s
interval: 60s
timeout: 10s
retries: 5
-volumes:
-- ./templates/footer.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/appbuilder/footer.html
-- ./templates/main.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/airflow/main.html
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
-interval: 10s
interval: 60s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
@@ -128,32 +134,151 @@ services:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
-interval: 10s
interval: 30s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
volumes:
- ./dags:/opt/airflow/dags
- ./config/airflow.cfg:/opt/airflow/airflow.cfg
- ./logs:/opt/airflow/logs
- ./tmp/:/work/
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 60s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
-command: version
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:0"
volumes:
- .:/sources
-flower:
-<<: *airflow-common
-command: celery flower
-ports:
-- 5555:5555
-healthcheck:
-test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
-interval: 10s
-timeout: 10s
-retries: 5
-restart: always
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
airflow-setup:
<<: *airflow-common
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
entrypoint: /bin/bash
command:
- -c
- |
exec /entrypoint airflow variables import /opt/airflow/variables.json
echo "Variables added"
volumes:
- ./dockers/variables.json:/opt/airflow/variables.json
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
postgres-db-volume:
{
"working_dir": "/work/"
}
\ No newline at end of file
@@ -40,14 +40,14 @@ includes one 100MB file. The target parameter is optional with default value +/t...
----
curl -X POST -u USER:PASS -H "Content-Type: application/json" \
-  --data '{"conf": {"oid": ID}, "target": PATH}' \
   --data '{"conf": {"oid": ID, "target": PATH}}' \
   $DLS/dags/taskflow_example/dagRuns
----
If you want to use your own connection ('myown_con'):
----
curl -X POST -u USER:PASS -H "Content-Type: application/json" \
-  --data '{"conf": {"oid": ID}, "target": PATH, "connection": "myown_con"}' \
   --data '{"conf": {"oid": ID, "target": PATH, "connection": "myown_con"}}' \
   $DLS/dags/taskflow_example/dagRuns
----
...
@@ -12,3 +12,11 @@ class AirflowEFlowsPlugin(AirflowPlugin):
    hooks = []
    admin_views = []
    appbuilder_menu_items = [appbuilder_eFlows]

class AirflowDataCatPlugin(AirflowPlugin):
    name = "Data Catalogue"
    operators = []
    flask_blueprints = []
    hooks = []
    admin_views = []
    appbuilder_menu_items = [{"name": "Data Catalogue", "href": "https://datacatalog.fz-juelich.de/index.html"}]
@@ -49,16 +49,15 @@ users:
runcmd:
- 'git clone https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service.git /home/maria/data-logistics-service'
- cd /home/maria
-- mkdir airflow-testing
-- cd airflow-testing
- mkdir airflow
- cd airflow
- mkdir -p ./dags ./logs ./plugins ./config ./templates
- cp ../data-logistics-service/dags/* ./dags
- cp -r ../data-logistics-service/plugins/* ./plugins
- cp ../data-logistics-service/config/* ./config
- cp ../data-logistics-service/templates/* ./templates
-# - echo "AIRFLOW_UID=0\nAIRFLOW_GID=0" > ../data-logistics-service/dockers/.env #for root
- echo -e "AIRFLOW_UID=$(id -u)" > ../data-logistics-service/dockers/.env #for root
- export AIRFLOW_UID=$(id -u)
-- export AIRFLOW_GID=0
-- docker-compose -f ../data-logistics-service/dockers/docker-compose.yaml --project-directory . --verbose up airflow-init
-- docker-compose -f ../data-logistics-service/dockers/docker-compose.yaml --project-directory . up
- docker-compose -f ~/data-logistics-service/dockers/docker-compose.yaml --project-directory ~/airflow --verbose up airflow-init
- docker-compose -f ~/data-logistics-service/dockers/docker-compose.yaml --project-directory ~/airflow . up
# - /bin/bash ../data-logistics-service/scripts/deployment.sh . # - /bin/bash ../data-logistics-service/scripts/deployment.sh .
{% block footer %}
<footer class="footer d-inlign-flex" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important;">
<div class="container p-0">
<div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat; background-size: cover"></div>
<div class="row mt-2 px-3">
<div class="col-lg-6 col-12 d-inlign-flex">
<p class="m-3 text-center align-self-center">
<a href="https://www.eFlows4HPC.eu">
<img src="https://eflows4hpc.eu/wp-content/uploads/2021/02/logo-blanc_1-1.svg" alt="eFlows4HPC Logo" title="eFlows4HPC" style="height: auto; max-height: 70px;" class="m-4 align-self-center"/>
</a>
<a href="https://twitter.com/eFlows4HPC"><i class="fa fa-twitter-square m-4 fa-2x" style="color: white"></i></a>
<a href="https://www.linkedin.com/company/eflows4hpc/"><i class="fa fa-linkedin-square mr-4 fa-2x" style="color: white"></i></a>
<a href="https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/"><i class="fa fa-github-square mr-4 fa-2x" style="color: white"></i></a>
</p>
</div>
<div class="col-lg-6 col-12 d-inlign-flex">
<p class="m-2 align-self-center" style="color: white">
<span class="mr-3 mt-1 float-left">
<img loading="lazy" src="https://eflows4hpc.eu/wp-content/uploads/2021/01/bandera-8.png" alt="" style="max-width:52px; max-height:34px">
</span>
<small style="display: flow-root">
This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558.
<br>
The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway.
<strong>
<a style="color: #f39200" href="https://www.fz-juelich.de/portal/EN/Service/LegalNotice/_node.html">Impressum</a>
</strong>
</small>
<div class="row mt-4 pl-5">
<p style="border-top: 1px solid darkgray;"><small>This service is based on Apache Airflow {{ version_label }}: {% if airflow_version %}<a href="https://pypi.python.org/pypi/apache-airflow/{{ airflow_version }}" target="_blank">v{{ airflow_version }}</a>{% else %} N/A{% endif %}</small></p>
</div>
</p>
</div>
</div>
</div>
</footer>
{% endblock %}
@@ -18,6 +18,7 @@
#}
{% extends 'appbuilder/baselayout.html' %}
{% from 'airflow/_messages.html' import message %}
{% block page_title -%}
{% if title is defined -%}
@@ -51,7 +52,7 @@
{% block messages %}
{% include 'appbuilder/flash.html' %}
{% if scheduler_job is defined and (not scheduler_job or not scheduler_job.is_alive()) %}
-<div class="alert alert-warning">
{% call message(category='warning', dismissable=false) %}
<p>The scheduler does not appear to be running.
{% if scheduler_job %}
Last heartbeat was received
@@ -63,14 +64,68 @@
{% endif %}
</p>
<p>The DAGs list may not update, and new tasks will not be scheduled.</p>
-</div>
{% endcall %}
{% endif %}
{% if triggerer_job is defined and (not triggerer_job or not triggerer_job.is_alive()) %}
{% call message(category='warning', dismissable=false) %}
<p>The triggerer does not appear to be running.
{% if triggerer_job %}
Last heartbeat was received
<time class="scheduler-last-heartbeat"
title="{{ triggerer_job.latest_heartbeat.isoformat() }}"
datetime="{{ triggerer_job.latest_heartbeat.isoformat() }}"
data-datetime-convert="false"
>{{ macros.datetime_diff_for_humans(triggerer_job.latest_heartbeat) }}</time>.
{% endif %}
</p>
<p>Triggers will not run, and any deferred operator will remain deferred until it times out and fails.</p>
{% endcall %}
{% endif %}
{% endblock %}
{% block footer %}
{% if not current_user.is_anonymous %}
{% set version_label = 'Version' %}
-{% include 'appbuilder/footer.html' %}
<!-- Use a wrapper div to detach the footer from the content. -->
<div style="padding-bottom: 3rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important;
flex-direction: column !important;">
.
</div>
<footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: 2rem;">
<div class="container p-0" style="padding: 0px !important">
<div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat !important; background-size: cover !important; padding: 0px; !important"></div>
<div class="row mt-2 px-3" style="margin-top: 0.5rem; padding-right: 1rem;">
<div class="col-lg-6 col-12 d-inlign-flex">
<p class="m-3 text-center align-self-center" style="-ms-flex-item-align: center !important; align-self: center !important; margin: 1rem !important">
<a href="https://www.eFlows4HPC.eu">
<img src="https://eflows4hpc.eu/wp-content/uploads/2021/02/logo-blanc_1-1.svg" alt="eFlows4HPC Logo" title="eFlows4HPC" style="height: auto; max-height: 70px;" class="m-4 align-self-center"/>
</a>
<a href="https://twitter.com/eFlows4HPC"><i class="fa fa-twitter-square m-4 fa-2x" style="color: white"></i></a>
<a href="https://www.linkedin.com/company/eflows4hpc/"><i class="fa fa-linkedin-square mr-4 fa-2x" style="color: white"></i></a>
<a href="https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/"><i class="fa fa-github-square mr-4 fa-2x" style="color: white"></i></a>
</p>
</div>
<div class="col-lg-6 col-12 d-inlign-flex">
<p class="m-2 align-self-center" style="color: white; -ms-flex-item-align: center !important; align-self: center !important; margin: 0.5rem;">
<span class="mr-3 mt-1 float-left" style="float: left !important; margin-right: 1rem; margin-top: 0.25rem ">
<img loading="lazy" src="https://eflows4hpc.eu/wp-content/uploads/2021/01/bandera-8.png" alt="" style="max-width:52px; max-height:34px;">
</span>
<small style="display: flow-root">
This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558.
<br>
The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway.
<strong>
<a style="color: #f39200" href="https://www.fz-juelich.de/portal/EN/Service/LegalNotice/_node.html">Impressum</a>
</strong>
</small>
<div class="row mt-4 pl-5" style="margin-top: 1.5rem; padding-left: 3rem !important; ">
<p style="border-top: 1px solid darkgray;"><small>This service is based on Apache Airflow {{ version_label }}: {% if airflow_version %}<a href="https://pypi.python.org/pypi/apache-airflow/{{ airflow_version }}" target="_blank">v{{ airflow_version }}</a>{% else %} N/A{% endif %}</small></p>
</div>
</p>
</div>
</div>
</div>
</footer>
{% endif %}
{% endblock %}
...