diff --git a/config/airflow.cfg b/config/airflow.cfg index e5dba33dc3655a56c33841e4935812ece961c238..299e2d79d2d9bee86bd1e3272268bcb13236e894 100644 --- a/config/airflow.cfg +++ b/config/airflow.cfg @@ -6,16 +6,16 @@ dags_folder = /opt/airflow/dags # Hostname by providing a path to a callable, which will resolve the hostname. # The format is "package.function". # -# For example, default value "socket.getfqdn" means that result from getfqdn() of "socket" -# package will be used as hostname. +# For example, default value "airflow.utils.net.getfqdn" means that result from patched +# version of socket.getfqdn() - see https://github.com/python/cpython/issues/49254. # # No argument should be required in the function specified. # If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address`` -hostname_callable = socket.getfqdn +hostname_callable = airflow.utils.net.getfqdn # Default timezone in case supplied date times are naive # can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) -default_timezone = utc +default_timezone = Europe/Amsterdam # The executor class that airflow should use. Choices include # ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``, @@ -23,65 +23,10 @@ default_timezone = utc # full import path to the class when using a custom executor. executor = CeleryExecutor -# The SqlAlchemy connection string to the metadata database. -# SqlAlchemy supports many different database engines. -# More information here: -# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri -sql_alchemy_conn = sqlite:////opt/airflow/airflow.db - -# The encoding for the databases -sql_engine_encoding = utf-8 - -# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. -# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` -# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed -# the maximum size of allowed index when collation is set to ``utf8mb4`` variant -# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). -# sql_engine_collation_for_ids = - -# If SqlAlchemy should pool database connections. -sql_alchemy_pool_enabled = True - -# The SqlAlchemy pool size is the maximum number of database connections -# in the pool. 0 indicates no limit. -sql_alchemy_pool_size = 5 - -# The maximum overflow size of the pool. -# When the number of checked-out connections reaches the size set in pool_size, -# additional connections will be returned up to this limit. -# When those additional connections are returned to the pool, they are disconnected and discarded. -# It follows then that the total number of simultaneous connections the pool will allow -# is pool_size + max_overflow, -# and the total number of "sleeping" connections the pool will allow is pool_size. -# max_overflow can be set to ``-1`` to indicate no overflow limit; -# no limit will be placed on the total number of concurrent connections. Defaults to ``10``. -sql_alchemy_max_overflow = 10 - -# The SqlAlchemy pool recycle is the number of seconds a connection -# can be idle in the pool before it is invalidated. This config does -# not apply to sqlite. If the number of DB connections is ever exceeded, -# a lower config value will allow the system to recover faster. -sql_alchemy_pool_recycle = 1800 - -# Check connection at the start of each connection pool checkout. 
-# Typically, this is a simple statement like "SELECT 1". -# More information here: -# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic -sql_alchemy_pool_pre_ping = True - -# The schema to use for the metadata database. -# SqlAlchemy supports databases with the concept of multiple schemas. -sql_alchemy_schema = - -# Import path for connect args in SqlAlchemy. Defaults to an empty dict. -# This is useful when you want to configure db engine args that SqlAlchemy won't parse -# in connection string. -# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args -# sql_alchemy_connect_args = - -# This defines the maximum number of task instances that can run concurrently in Airflow -# regardless of scheduler count and worker count. Generally, this value is reflective of -# the number of task instances with the running state in the metadata database. +# This defines the maximum number of task instances that can run concurrently per scheduler in +# Airflow, regardless of the worker count. Generally this value, multiplied by the number of +# schedulers in your cluster, is the maximum number of task instances with the running +# state in the metadata database. parallelism = 32 # The maximum number of task instances allowed to run concurrently in each DAG. To calculate @@ -101,16 +46,19 @@ dags_are_paused_at_creation = True # which is defaulted as ``max_active_runs_per_dag``. max_active_runs_per_dag = 16 +# The name of the method used in order to start Python processes via the multiprocessing module. +# This corresponds directly with the options available in the Python docs: +# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_start_method. +# Must be one of the values returned by: +# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.get_all_start_methods. +# Example: mp_start_method = fork +# mp_start_method = + # Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment load_examples = False -# Whether to load the default connections that ship with Airflow. It's good to -# get started, but you probably want to set this to ``False`` in a production -# environment -load_default_connections = True - # Path to the folder containing Airflow plugins plugins_folder = /opt/airflow/plugins @@ -158,6 +106,11 @@ unit_test_mode = False # RCE exploits). enable_xcom_pickling = False +# What classes can be imported during deserialization. This is a multi line value. +# The individual items will be parsed as regexp. Python built-in classes (like dict) +# are always allowed +allowed_deserialization_classes = airflow\..* + # When a task is killed forcefully, this is the amount of time in seconds that # it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED killed_task_cleanup_time = 60 @@ -170,15 +123,32 @@ dag_run_conf_overrides_params = True # When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``. dag_discovery_safe_mode = True +# The pattern syntax used in the ".airflowignore" files in the DAG directories. Valid values are +# ``regexp`` or ``glob``. +dag_ignore_file_syntax = regexp + # The number of retries each task is going to have by default. Can be overridden at dag or task level. default_task_retries = 0 +# The number of seconds each task is going to wait by default between retries. Can be overridden at +# dag or task level. 
+default_task_retry_delay = 300 + # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream +# The default task execution_timeout value for the operators. Expected an integer value to +# be passed into timedelta as seconds. If not specified, then the value is considered as None, +# meaning that the operators are never timed out by default. +default_task_execution_timeout = + # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. min_serialized_dag_update_interval = 30 +# If True, serialized DAGs are compressed before writing to DB. +# Note: this will disable the DAG dependencies view +compress_serialized_dags = False + # Fetching serialized DAG can not be faster than a minimum interval to reduce database # read rate. This config controls when your DAGs are updated in the Webserver min_serialized_dag_fetch_interval = 10 @@ -206,11 +176,6 @@ lazy_load_plugins = True # loaded from module. lazy_discover_providers = True -# Number of times the code should be retried in case of DB Operational Errors. -# Not all transactions will be retried as it can cause undesired state. -# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. -max_db_retries = 3 - # Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True # # (Connection passwords are always hidden in logs) @@ -225,9 +190,105 @@ sensitive_var_conn_names = # change the number of slots using Webserver, API or the CLI default_pool_task_slot_count = 128 +# The maximum list/dict length an XCom can push to trigger task mapping. If the pushed list/dict has a +# length exceeding this value, the task pushing the XCom will be failed automatically to prevent the +# mapped tasks from clogging the scheduler. +max_map_length = 1024 + +# The default umask to use for process when run in daemon mode (scheduler, worker, etc.) +# +# This controls the file-creation mode mask which determines the initial value of file permission bits +# for newly created files. +# +# This value is treated as an octal-integer. +daemon_umask = 0o077 + +# Class to use as dataset manager. +# Example: dataset_manager_class = airflow.datasets.manager.DatasetManager +# dataset_manager_class = + +# Kwargs to supply to dataset manager. +# Example: dataset_manager_kwargs = {"some_param": "some_value"} +# dataset_manager_kwargs = + +[database] +# The SqlAlchemy connection string to the metadata database. +# SqlAlchemy supports many different database engines. +# More information here: +# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri +sql_alchemy_conn = sqlite:////opt/airflow/airflow.db + +# Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value +# Example: sql_alchemy_engine_args = {"arg1": True} +# sql_alchemy_engine_args = + +# The encoding for the databases +sql_engine_encoding = utf-8 + +# Collation for ``dag_id``, ``task_id``, ``key``, ``external_executor_id`` columns +# in case they have different encoding. +# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` +# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed +# the maximum size of allowed index when collation is set to ``utf8mb4`` variant +# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). 
+# sql_engine_collation_for_ids = + +# If SqlAlchemy should pool database connections. +sql_alchemy_pool_enabled = True + +# The SqlAlchemy pool size is the maximum number of database connections +# in the pool. 0 indicates no limit. +sql_alchemy_pool_size = 5 + +# The maximum overflow size of the pool. +# When the number of checked-out connections reaches the size set in pool_size, +# additional connections will be returned up to this limit. +# When those additional connections are returned to the pool, they are disconnected and discarded. +# It follows then that the total number of simultaneous connections the pool will allow +# is pool_size + max_overflow, +# and the total number of "sleeping" connections the pool will allow is pool_size. +# max_overflow can be set to ``-1`` to indicate no overflow limit; +# no limit will be placed on the total number of concurrent connections. Defaults to ``10``. +sql_alchemy_max_overflow = 10 + +# The SqlAlchemy pool recycle is the number of seconds a connection +# can be idle in the pool before it is invalidated. This config does +# not apply to sqlite. If the number of DB connections is ever exceeded, +# a lower config value will allow the system to recover faster. +sql_alchemy_pool_recycle = 1800 + +# Check connection at the start of each connection pool checkout. +# Typically, this is a simple statement like "SELECT 1". +# More information here: +# https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic +sql_alchemy_pool_pre_ping = True + +# The schema to use for the metadata database. +# SqlAlchemy supports databases with the concept of multiple schemas. +sql_alchemy_schema = + +# Import path for connect args in SqlAlchemy. Defaults to an empty dict. +# This is useful when you want to configure db engine args that SqlAlchemy won't parse +# in connection string. +# See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.connect_args +# sql_alchemy_connect_args = + +# Whether to load the default connections that ship with Airflow. It's good to +# get started, but you probably want to set this to ``False`` in a production +# environment +load_default_connections = True + +# Number of times the code should be retried in case of DB Operational Errors. +# Not all transactions will be retried as it can cause undesired state. +# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. +max_db_retries = 3 + [logging] -# The folder where airflow should store its log files -# This path must be absolute +# The folder where airflow should store its log files. +# This path must be absolute. +# There are a few existing configurations that assume this is set to the default. +# If you choose to override this you may need to update the dag_processor_manager_log_location and +# dag_processor_manager_log_location settings as well. base_log_folder = /opt/airflow/logs # Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. @@ -235,7 +296,8 @@ base_log_folder = /opt/airflow/logs remote_logging = False # Users must supply an Airflow connection id that provides access to the storage -# location. +# location. Depending on your remote logging service, this may only be used for +# reading logs, not writing them. remote_log_conn_id = # Path to Google Credential JSON file. 
If omitted, authorization based on `the Application Default @@ -260,6 +322,11 @@ encrypt_s3_logs = False # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``. logging_level = INFO +# Logging level for celery. If not set, it uses the value of logging_level +# +# Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``. +celery_logging_level = + # Logging level for Flask-appbuilder UI. # # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``. @@ -283,17 +350,24 @@ colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatte log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s +# Where to send dag parser logs. If "file", logs are sent to log files defined by child_process_log_directory. +dag_processor_log_target = file + +# Format of Dag Processor Log line +dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s +log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware + # Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter # Example: task_log_prefix_template = {ti.dag_id}-{ti.task_id}-{execution_date}-{try_number} task_log_prefix_template = # Formatting for how airflow generates file names/paths for each task run. -log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log +log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log # Formatting for how airflow generates file names for log log_processor_filename_template = {{ filename }}.log -# full path of dag_processor_manager logfile +# Full path of dag_processor_manager logfile. dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log # Name of handler to read task instance logs. @@ -326,7 +400,7 @@ statsd_prefix = airflow # start with the elements of the list (e.g: "scheduler,executor,dagrun") statsd_allow_list = -# A function that validate the statsd stat name, apply changes to the stat name if necessary and return +# A function that validate the StatsD stat name, apply changes to the stat name if necessary and return # the transformed stat name. # # The function should have the following signature: @@ -339,7 +413,7 @@ statsd_datadog_enabled = False # List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2) statsd_datadog_tags = -# If you want to utilise your own custom Statsd client set the relevant +# If you want to utilise your own custom StatsD client set the relevant # module path below. # Note: The module path must exist on your PYTHONPATH for Airflow to pick it up # statsd_custom_client_path = @@ -353,7 +427,7 @@ backend = # See documentation for the secrets backend you are using. JSON is expected. # Example for AWS Systems Manager ParameterStore: # ``{"connections_prefix": "/airflow/connections", "profile_name": "default"}`` -backend_kwargs = +backend_kwargs = [cli] # In what way should the cli access the API. The LocalClient will use the @@ -381,13 +455,13 @@ fail_fast = False # deprecated since version 2.0. Please consider using # `the Stable REST API <https://airflow.readthedocs.io/en/latest/stable-rest-api-ref.html>`__. 
# For more information on migration, see -# `UPDATING.md <https://github.com/apache/airflow/blob/main/UPDATING.md>`_ +# `RELEASE_NOTES.rst <https://github.com/apache/airflow/blob/main/RELEASE_NOTES.rst>`_ enable_experimental_api = False -# How to authenticate users of the API. See -# https://airflow.apache.org/docs/apache-airflow/stable/security.html for possible values. +# Comma separated list of auth backends to authenticate users of the API. See +# https://airflow.apache.org/docs/apache-airflow/stable/security/api.html for possible values. # ("airflow.api.auth.backend.default" allows all requests for historic reasons) -auth_backend = airflow.api.auth.backend.deny_all +auth_backends = airflow.api.auth.backend.session # Used to set the maximum page limit for API requests maximum_page_limit = 100 @@ -483,6 +557,10 @@ web_server_ssl_cert = # provided SSL will be enabled. This does not change the web server port. web_server_ssl_key = +# The type of backend used to store web session data, can be 'database' or 'securecookie' +# Example: session_backend = securecookie +session_backend = database + # Number of seconds the webserver waits before killing gunicorn master that doesn't respond web_server_master_timeout = 120 @@ -504,13 +582,19 @@ reload_on_plugin_change = False # Secret key used to run your flask app. It should be as random as possible. However, when running # more than 1 instances of webserver, make sure all of them use the same ``secret_key`` otherwise # one of them will error with "CSRF session token is missing". -secret_key = Jvww64wGcBs22UNHJjToNw== +# The webserver key is also used to authorize requests to Celery workers when logs are retrieved. +# The token generated using the secret key has a short expiry time though - make sure that time on +# ALL the machines that you run airflow components on is synchronized (for example using ntpd) +# otherwise you might get "forbidden" errors when the logs are accessed. +secret_key = zz1IEpLvDgZ0Zair3EVO7A== # Number of workers to run the Gunicorn web server workers = 4 # The worker class gunicorn should use. Choices include -# sync (default), eventlet, gevent +# sync (default), eventlet, gevent. Note when using gevent you might also want to set the +# "_AIRFLOW_PATCH_GEVENT" environment variable to "1" to make sure gevent patching is done as +# early as possible. worker_class = sync # Log files for the gunicorn webserver. '-' means log to stderr. @@ -524,17 +608,19 @@ error_logfile = - # documentation - https://docs.gunicorn.org/en/stable/settings.html#access-log-format access_logformat = -# Expose the configuration file in the web server +# Expose the configuration file in the web server. Set to "non-sensitive-only" to show all values +# except those that have security implications. "True" shows all values. "False" hides the +# configuration completely. expose_config = False # Expose hostname in the web server expose_hostname = True # Expose stacktrace in the web server -expose_stacktrace = True +expose_stacktrace = False -# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times`` -dag_default_view = tree +# Default DAG view. Valid values are: ``grid``, ``graph``, ``duration``, ``gantt``, ``landing_times`` +dag_default_view = grid # Default DAG orientation. 
Valid values are: # ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top) @@ -618,10 +704,27 @@ session_lifetime_minutes = 43200 # Sets a custom page title for the DAGs overview page and site title for all pages instance_name = eFlows4HPC -# How frequently, in seconds, the DAG data will auto-refresh in graph or tree view +# Whether the custom page title for the DAGs overview page contains any Markup language +instance_name_has_markup = False + +# How frequently, in seconds, the DAG data will auto-refresh in graph or grid view # when auto-refresh is turned on auto_refresh_interval = 3 +# Boolean for displaying warning for publicly viewable deployment +warn_deployment_exposure = True + +# Comma separated string of view events to exclude from dag audit view. +# All other events will be added minus the ones passed here. +# The audit logs in the db will not be affected by this parameter. +audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data + +# Comma separated string of view events to include in dag audit view. +# If passed, only these events will populate the dag audit view. +# The audit logs in the db will not be affected by this parameter. +# Example: audit_view_included_events = dagrun_cleared,failed +# audit_view_included_events = + [email] # Configuration email backend and whether to @@ -648,6 +751,11 @@ default_email_on_failure = True # Example: html_content_template = /path/to/my_html_content_template_file # html_content_template = +# Email address that will be used as sender address. +# It can either be raw email or the complete address in a format ``Sender Name <sender@email.com>`` +# Example: from_email = Airflow <airflow@example.com> +# from_email = + [smtp] # If you want airflow to send emails on retries, failure, and you want to use @@ -679,6 +787,16 @@ sentry_dsn = # Dotted path to a before_send function that the sentry SDK should be configured to use. # before_send = +[local_kubernetes_executor] + +# This section only applies if you are using the ``LocalKubernetesExecutor`` in +# ``[core]`` section above +# Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``. +# When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``), +# the task is executed via ``KubernetesExecutor``, +# otherwise via ``LocalExecutor`` +kubernetes_queue = kubernetes + [celery_kubernetes_executor] # This section only applies if you are using the ``CeleryKubernetesExecutor`` in @@ -718,13 +836,12 @@ worker_concurrency = 16 # running tasks while another worker has unutilized processes that are unable to process the already # claimed blocked tasks. # https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits -# Example: worker_prefetch_multiplier = 1 -# worker_prefetch_multiplier = +worker_prefetch_multiplier = 1 -# Umask that will be used when starting workers with the ``airflow celery worker`` -# in daemon mode. This control the file-creation mode mask which determines the initial -# value of file permission bits for newly created files. -worker_umask = 0o077 +# Specify if remote control of the workers is enabled. +# When using Amazon SQS as the broker, Celery creates lots of ``.*reply-celery-pidbox`` queues. You can +# prevent this by setting this to false. However, with this disabled Flower won't work. +worker_enable_remote_control = true # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally # a sqlalchemy database. 
Refer to the Celery documentation for more information. @@ -735,8 +852,10 @@ broker_url = redis://redis:6379/0 # or insert it into a database (depending of the backend) # This status is used by the scheduler to update the state of the task # The use of a database is highly recommended +# When not specified, sql_alchemy_conn with a db+ scheme prefix will be used # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings -result_backend = db+postgresql://postgres:airflow@postgres/airflow +# Example: result_backend = db+postgresql://postgres:airflow@postgres/airflow +# result_backend = # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start # it ``airflow celery flower``. This defines the IP that Celery Flower runs on @@ -781,10 +900,17 @@ operation_timeout = 1.0 # or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob. task_track_started = True -# Time in seconds after which Adopted tasks are cleared by CeleryExecutor. This is helpful to clear -# stalled tasks. +# Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled, +# and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but +# applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting +# also applies to adopted tasks. task_adoption_timeout = 600 +# Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically +# rescheduled. Adopted tasks will instead use the ``task_adoption_timeout`` setting if specified. +# When set to 0, automatic clearing of stalled tasks is disabled. +stalled_task_timeout = 0 + # The Maximum number of retries for publishing task messages to the broker when failing # due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed. task_publish_max_retries = 3 @@ -844,13 +970,18 @@ scheduler_idle_sleep_time = 1 # this interval. Keeping this number low will increase CPU usage. min_file_process_interval = 30 +# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in +# the expected files) which should be deactivated, as well as datasets that are no longer +# referenced and should be marked as orphaned. +parsing_cleanup_interval = 60 + # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. dag_dir_list_interval = 300 # How often should stats be printed to the logs. 
Setting to 0 will disable printing stats print_stats_interval = 30 -# How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled) +# How often (in seconds) should pool usage stats be sent to StatsD (if statsd_on is enabled) pool_metrics_interval = 5.0 # If the last scheduler heartbeat happened more than scheduler_health_check_threshold @@ -858,6 +989,14 @@ pool_metrics_interval = 5.0 # This is used by the health check in the "/health" endpoint scheduler_health_check_threshold = 30 +# When you start a scheduler, airflow starts a tiny web server +# subprocess to serve a health check if this is set to True +enable_health_check = False + +# When you start a scheduler, airflow starts a tiny web server +# subprocess to serve a health check on this port +scheduler_health_check_server_port = 8974 + # How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs orphaned_tasks_check_interval = 300.0 child_process_log_directory = /opt/airflow/logs/scheduler @@ -867,6 +1006,9 @@ child_process_log_directory = /opt/airflow/logs/scheduler # associated task instance as failed and will re-schedule the task. scheduler_zombie_task_threshold = 300 +# How often (in seconds) should the scheduler check for zombie tasks. +zombie_detection_interval = 10.0 + # Turn off scheduler catchup by setting this to ``False``. # Default behavior is unchanged and # Command Line Backfills still work, but the scheduler @@ -875,6 +1017,13 @@ scheduler_zombie_task_threshold = 300 # DAG definition (catchup) catchup_by_default = True +# Setting this to True will make first task instance of a task +# ignore depends_on_past setting. A task instance will be considered +# as the first task instance of a task when there is no task instance +# in the DB with an execution_date earlier than it., i.e. no manual marking +# success will be needed for a newly added task to be scheduled. +ignore_first_depends_on_past_by_default = True + # This changes the batch size of queries in the scheduling main loop. # If this is too high, SQL query performance may be impacted by # complexity of query predicate, and/or excessive locking. @@ -914,6 +1063,18 @@ parsing_processes = 2 # * ``alphabetical``: Sort by filename file_parsing_sort_mode = modified_time +# Whether the dag processor is running as a standalone process or it is a subprocess of a scheduler +# job. +standalone_dag_processor = False + +# Only applicable if `[scheduler]standalone_dag_processor` is true and callbacks are stored +# in database. Contains maximum number of callbacks that are fetched during a single loop. +max_callbacks_per_loop = 20 + +# Only applicable if `[scheduler]standalone_dag_processor` is true. +# Time in seconds after which dags, which were not updated by Dag Processor are deactivated. +dag_stale_not_seen_duration = 600 + # Turn off scheduler use of cron intervals by setting this to False. # DAGs submitted manually in the web UI or with trigger_dag will still run. use_job_schedule = True @@ -922,9 +1083,6 @@ use_job_schedule = True # Only has effect if schedule_interval is set to None in DAG allow_trigger_in_future = False -# DAG dependency detector class to use -dependency_detector = airflow.serialization.serialized_objects.DependencyDetector - # How often to check for expired trigger requests that have not run yet. trigger_timeout_check_interval = 15 @@ -947,15 +1105,12 @@ forwardable = True # Allow to remove source IP from token, useful when using token behind NATted Docker host. 
include_ip = True -[github_enterprise] -api_rev = v3 - [elasticsearch] # Elasticsearch host host = # Format of the log_id, which is used to query for a given tasks logs -log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} +log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number} # Used to mark the end of a log stream for a task end_of_log_mark = end_of_log @@ -985,7 +1140,7 @@ offset_field = offset use_ssl = False verify_certs = True -[kubernetes] +[kubernetes_executor] # Path to the YAML pod file that forms the basis for KubernetesExecutor workers. pod_template_file = @@ -1078,18 +1233,6 @@ worker_pods_queued_check_interval = 60 # You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``. worker_pods_pending_timeout_batch_size = 100 -[smart_sensor] -# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to -# smart sensor task. -use_smart_sensor = False - -# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated -# by `hashcode % shard_code_upper_limit`. -shard_code_upper_limit = 10000 - -# The number of running smart sensor processes for each service. -shards = 5 - -# comma separated sensor classes support in smart_sensor. -sensors_enabled = NamedHivePartitionSensor - +[sensors] +# Sensor default timeout, 7 days by default (7 * 24 * 60 * 60). +default_timeout = 604800 diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml index 0204d0792e70c0c8cb5656e6d3dd619fe3ba51e4..5d4eff8db895c1d544e14247f41e8b40e8244272 100644 --- a/dockers/docker-compose.yaml +++ b/dockers/docker-compose.yaml @@ -24,9 +24,11 @@ # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. -# Default: apache/airflow:2.2.1 +# Default: apache/airflow:2.5.1 # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode # # _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). @@ -44,26 +46,27 @@ x-airflow-common: # In order to add custom dependencies or upgrade provider packages you can use your extended image. # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. - image: ${AIRFLOW_IMAGE_NAME:-registry.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/eflows-airflow} + image: ${AIRFLOW_IMAGE_NAME:-registry.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/eflows-airflow} # build: . 
environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY} AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'false' - AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW__SECRETS__BACKEND_KWARGS: ${AIRFLOW__SECRETS__BACKEND_KWARGS} AIRFLOW__SECRETS__BACKEND: ${AIRFLOW__SECRETS__BACKEND} -# _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - - ./dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ./config/airflow.cfg:/opt/airflow/airflow.cfg - - /persistent_data/logs:/opt/airflow/logs - - ./plugins:/opt/airflow/plugins + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on @@ -73,10 +76,11 @@ x-airflow-common: condition: service_healthy services: - reverse-proxy: image: "jwilder/nginx-proxy:alpine" container_name: "reverse-proxy" + profiles: + - full volumes: - "html:/usr/share/nginx/html" - "dhparam:/etc/nginx/dhparam" @@ -91,6 +95,8 @@ services: letsencrypt: image: "jrcs/letsencrypt-nginx-proxy-companion:latest" container_name: "letsencrypt-helper" + profiles: + - full volumes: - "html:/usr/share/nginx/html" - "dhparam:/etc/nginx/dhparam" @@ -114,7 +120,7 @@ services: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] - interval: 60s + interval: 10s retries: 5 restart: always @@ -124,23 +130,21 @@ services: - 6379 healthcheck: test: ["CMD", "redis-cli", "ping"] - interval: 60s + interval: 10s timeout: 30s retries: 50 restart: always airflow-webserver: <<: *airflow-common - command: webserver - ports: - - 7001:8080 - environment: <<: *airflow-common-env VIRTUAL_HOST: datalogistics.eflows4hpc.eu LETSENCRYPT_HOST: datalogistics.eflows4hpc.eu VIRTUAL_PORT: 8080 - + command: webserver + ports: + - 8080:8080 healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 60s @@ -173,7 +177,7 @@ services: test: - "CMD-SHELL" - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' - interval: 30s + interval: 60s timeout: 10s retries: 5 environment: @@ -183,10 +187,11 @@ services: DUMB_INIT_SETSID: "0" restart: always volumes: - - ./dags:/opt/airflow/dags - - ./config/airflow.cfg:/opt/airflow/airflow.cfg - - /persistent_data/logs:/opt/airflow/logs - - ./tmp/:/work/ + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ./config/airflow.cfg:/opt/airflow/airflow.cfg + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + - ./tmp/:/work/ depends_on: <<: *airflow-common-depends-on airflow-init: @@ -197,14 +202,9 @@ services: command: triggerer healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] - interval: 60s + interval: 10s timeout: 10s retries: 5 - environment: - <<: *airflow-common-env - # Required to handle warm shutdown 
of the celery workers properly - # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation - DUMB_INIT_SETSID: "0" restart: always depends_on: <<: *airflow-common-depends-on @@ -225,12 +225,13 @@ services: environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' - #_AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' user: "0:0" volumes: - - .:/sources + - ${AIRFLOW_PROJ_DIR:-.}:/sources airflow-cli: <<: *airflow-common @@ -245,24 +246,6 @@ services: - -c - airflow - airflow-setup: - <<: *airflow-common - environment: - <<: *airflow-common-env - CONNECTION_CHECK_MAX_COUNT: "0" - entrypoint: /bin/bash - command: - - -c - - | - exec /entrypoint airflow variables import /opt/airflow/variables.json - echo "Variables added" - volumes: - - ./dockers/variables.json:/opt/airflow/variables.json - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - volumes: postgres-db-volume: external: true @@ -273,4 +256,3 @@ volumes: html: vhost: dhparam: - diff --git a/dockers/eflows-airflow.docker b/dockers/eflows-airflow.docker index df0e2737d5b27c6c056c49c5c2e08dec3fac995a..3864f776c56638c2095467d16e0f97f53541c046 100644 --- a/dockers/eflows-airflow.docker +++ b/dockers/eflows-airflow.docker @@ -1,11 +1,11 @@ -FROM apache/airflow:2.2.5 - +FROM apache/airflow:slim-2.5.1-python3.8 USER root RUN apt update && apt install git -y && apt clean && rm -rf /var/lib/apt/lists/* -COPY ./templates/main.html /home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html -COPY ./templates/img/BMBF_gefoerdert_2017_en.jpg /home/airflow/.local/lib/python3.7/site-packages/airflow/www/static/BMBF_gefoerdert_2017_en.jpg -USER airflow +COPY ./templates/main.html /home/airflow/.local/lib/python3.8/site-packages/airflow/www/templates/airflow/main.html +COPY ./templates/img/BMBF_gefoerdert_2017_en.jpg /home/airflow/.local/lib/python3.8/site-packages/airflow/www/static/BMBF_gefoerdert_2017_en.jpg +USER airflow +ENV SQLALCHEMY_SILENCE_UBER_WARNING=1 RUN pip --version && python -m pip install --upgrade pip ADD requirements.txt /requirements.txt -RUN pip install -r /requirements.txt \ No newline at end of file +RUN pip install --no-cache-dir -r /requirements.txt
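The compose file above switches the REST API from the single `auth_backend` option to the comma-separated `auth_backends` list and enables `airflow.api.auth.backend.basic_auth` alongside the session backend. A minimal sketch of calling the stable REST API of this deployment follows; the host, port and credentials are assumptions taken from the compose defaults (port 8080 published by airflow-webserver, admin user airflow/airflow created by airflow-init) and may differ in a real installation.

# api_smoke_test.py - minimal sketch, not part of the deployment.
# Assumes the webserver from docker-compose.yaml is reachable on localhost:8080
# and that the default admin account (airflow/airflow) exists; basic auth works
# because AIRFLOW__API__AUTH_BACKENDS includes airflow.api.auth.backend.basic_auth.
import requests

BASE_URL = "http://localhost:8080/api/v1"   # placeholder host/port
AUTH = ("airflow", "airflow")               # _AIRFLOW_WWW_USER_USERNAME / _PASSWORD defaults

def list_dags():
    """List DAGs via the stable REST API using basic authentication."""
    resp = requests.get(f"{BASE_URL}/dags", auth=AUTH, timeout=10)
    resp.raise_for_status()
    return resp.json()["dags"]

if __name__ == "__main__":
    for dag in list_dags():
        print(dag["dag_id"], "paused:", dag["is_paused"])

The `[webserver] secret_key` committed in airflow.cfg and the `AIRFLOW__CORE__FERNET_KEY` referenced by the compose file should normally be generated per deployment rather than reused; a small sketch for producing fresh values (assuming the cryptography package, which ships with the Airflow image) is shown below.

# generate_keys.py - sketch for producing per-deployment secret values.
import base64
import os
from cryptography.fernet import Fernet

print("AIRFLOW__CORE__FERNET_KEY =", Fernet.generate_key().decode())             # consumed by docker-compose.yaml
print("[webserver] secret_key    =", base64.b64encode(os.urandom(16)).decode())  # same format as the value in airflow.cfg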