From 19312a2e074598d45f24ecbcab115f7b59569dac Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Mon, 8 Nov 2021 11:58:45 +0100
Subject: [PATCH] seems to work

---
 config/airflow.cfg          | 119 ++++++++++++++++------------
 dockers/docker-compose.yaml | 149 ++++++++++++++++++++++++++++++++----
 2 files changed, 204 insertions(+), 64 deletions(-)

diff --git a/config/airflow.cfg b/config/airflow.cfg
index daf7de7..a2b2eaf 100644
--- a/config/airflow.cfg
+++ b/config/airflow.cfg
@@ -21,21 +21,22 @@ default_timezone = utc
 # ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
 # ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
 # full import path to the class when using a custom executor.
-executor = SequentialExecutor
+executor = CeleryExecutor

 # The SqlAlchemy connection string to the metadata database.
 # SqlAlchemy supports many different database engines.
 # More information here:
 # http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
-# sql_alchemy_conn = sqlite:////opt/airflow/airflow.db
+sql_alchemy_conn = sqlite:////opt/airflow/airflow.db

 # The encoding for the databases
 sql_engine_encoding = utf-8

 # Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding.
-# This is particularly useful in case of mysql with utf8mb4 encoding because
-# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should
-# be set to ``utf8mb3_general_ci``.
+# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb``
+# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed
+# the maximum size of allowed index when collation is set to ``utf8mb4`` variant
+# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618).
 # sql_engine_collation_for_ids =

 # If SqlAlchemy should pool database connections.
@@ -85,9 +86,12 @@ parallelism = 32

 # The maximum number of task instances allowed to run concurrently in each DAG. To calculate
 # the number of tasks that is running concurrently for a DAG, add up the number of running
-# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``,
-# which is defaulted as ``dag_concurrency``.
-dag_concurrency = 16
+# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+# which is defaulted as ``max_active_tasks_per_dag``.
+#
+# An example scenario when this would be useful is when you want to stop a new dag with an early
+# start date from stealing all the executor slots in a cluster.
+max_active_tasks_per_dag = 16

 # Are DAGs paused by default at creation
 dags_are_paused_at_creation = True
@@ -100,7 +104,7 @@ max_active_runs_per_dag = 16
 # Whether to load the DAG examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
 # environment
-load_examples = True
+load_examples = False

 # Whether to load the default connections that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
@@ -169,6 +173,9 @@ dag_discovery_safe_mode = True

 # The number of retries each task is going to have by default. Can be overridden at dag or task level.
 default_task_retries = 0

+# The weighting method used for the effective total priority weight of the task
+default_task_weight_rule = downstream
+
 # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
 min_serialized_dag_update_interval = 30

@@ -176,13 +183,6 @@ min_serialized_dag_update_interval = 30
 # read rate. This config controls when your DAGs are updated in the Webserver
 min_serialized_dag_fetch_interval = 10

-# Whether to persist DAG files code in DB.
-# If set to True, Webserver reads file contents from DB instead of
-# trying to access files in a DAG folder.
-# (Default is ``True``)
-# Example: store_dag_code = True
-# store_dag_code =
-
 # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
 # in the Database.
 # All the template_fields for each of Task Instance are stored in the Database.
@@ -220,6 +220,11 @@ hide_sensitive_var_conn_fields = True
 # extra JSON.
 sensitive_var_conn_names =

+# Task Slot counts for ``default_pool``. This setting would not have any effect in an existing
+# deployment where the ``default_pool`` is already created. For existing deployments, users can
+# change the number of slots using Webserver, API or the CLI
+default_pool_task_slot_count = 128
+
 [logging]
 # The folder where airflow should store its log files
 # This path must be absolute
@@ -258,7 +263,7 @@ logging_level = INFO
 # Logging level for Flask-appbuilder UI.
 #
 # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-fab_logging_level = WARN
+fab_logging_level = WARNING

 # Logging class
 # Specify the class that will specify the logging configuration
@@ -297,8 +302,15 @@ task_log_reader = task

 # A comma\-separated list of third-party logger names that will be configured to print messages to
 # consoles\.
-# Example: extra_loggers = connexion,sqlalchemy
-extra_loggers =
+# Example: extra_logger_names = connexion,sqlalchemy
+extra_logger_names =
+
+# When you start an airflow worker, airflow starts a tiny web server
+# subprocess to serve the workers local log files to the airflow main
+# web server, who then builds pages and sends them to users. This defines
+# the port on which the logs are served. It needs to be unused, and open
+# visible from the main web server to connect into the workers.
+worker_log_server_port = 8793

 [metrics]

@@ -406,8 +418,9 @@ access_control_allow_headers =
 # Specifies the method or methods allowed when accessing the resource.
 access_control_allow_methods =

-# Indicates whether the response can be shared with requesting code from the given origin.
-access_control_allow_origin =
+# Indicates whether the response can be shared with requesting code from the given origins.
+# Separate URLs with space.
+access_control_allow_origins =

 [lineage]
 # what lineage backend to use
@@ -491,7 +504,7 @@ reload_on_plugin_change = False
 # Secret key used to run your flask app. It should be as random as possible. However, when running
 # more than 1 instances of webserver, make sure all of them use the same ``secret_key`` otherwise
 # one of them will error with "CSRF session token is missing".
-secret_key = 8kUFwlRKUhs6i8NBAvUmWg==
+secret_key = Jvww64wGcBs22UNHJjToNw==

 # Number of workers to run the Gunicorn web server
 workers = 4
@@ -529,7 +542,7 @@ dag_orientation = LR

 # The amount of time (in secs) webserver will wait for initial handshake
 # while fetching logs from other worker machine
-log_fetch_timeout_sec = 5
+log_fetch_timeout_sec = 15

 # Time interval (in secs) to wait before next log fetching.
 log_fetch_delay_sec = 2
@@ -603,7 +616,11 @@ update_fab_perms = True
 session_lifetime_minutes = 43200

 # Sets a custom page title for the DAGs overview page and site title for all pages
-instance_name =eFlows4HPC
+# instance_name =
+
+# How frequently, in seconds, the DAG data will auto-refresh in graph or tree view
+# when auto-refresh is turned on
+auto_refresh_interval = 3

 [email]

@@ -654,11 +671,14 @@ smtp_retry_limit = 5
 # additional configuration options based on the Python platform. See:
 # https://docs.sentry.io/error-reporting/configuration/?platform=python.
 # Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``,
-# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
+# ``ignore_errors``, ``before_breadcrumb``, ``transport``.

 # Enable error reporting to Sentry
 sentry_on = false
 sentry_dsn =

+# Dotted path to a before_send function that the sentry SDK should be configured to use.
+# before_send =
+
 [celery_kubernetes_executor]

 # This section only applies if you are using the ``CeleryKubernetesExecutor`` in
@@ -701,13 +721,6 @@ worker_concurrency = 16
 # Example: worker_prefetch_multiplier = 1
 # worker_prefetch_multiplier =

-# When you start an airflow worker, airflow starts a tiny web server
-# subprocess to serve the workers local log files to the airflow main
-# web server, who then builds pages and sends them to users. This defines
-# the port on which the logs are served. It needs to be unused, and open
-# visible from the main web server to connect into the workers.
-worker_log_server_port = 8793
-
 # Umask that will be used when starting workers with the ``airflow celery worker``
 # in daemon mode. This control the file-creation mode mask which determines the initial
 # value of file permission bits for newly created files.
@@ -812,10 +825,6 @@ tls_key =
 # listen (in seconds).
 job_heartbeat_sec = 5

-# How often (in seconds) to check and tidy up 'running' TaskInstancess
-# that no longer have a matching DagRun
-clean_tis_without_dagrun_interval = 15.0
-
 # The scheduler constantly tries to trigger new tasks (look at the
 # scheduler section in the docs for more information). This defines
 # how often the scheduler should run (in seconds).
@@ -825,8 +834,10 @@ scheduler_heartbeat_sec = 5
 # -1 indicates unlimited number
 num_runs = -1

-# The number of seconds to wait between consecutive DAG file processing
-processor_poll_interval = 1
+# Controls how long the scheduler will sleep between loops, but if there was nothing to do
+# in the loop. i.e. if it scheduled something then it will start the next loop
+# iteration straight away.
+scheduler_idle_sleep_time = 1

 # Number of seconds after which a DAG file is parsed. The DAG file is parsed every
 # ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
@@ -865,11 +876,8 @@ scheduler_zombie_task_threshold = 300
 catchup_by_default = True

 # This changes the batch size of queries in the scheduling main loop.
-# If this is too high, SQL query performance may be impacted by one
-# or more of the following:
-# - reversion to full table scan
-# - complexity of query predicate
-# - excessive locking
+# If this is too high, SQL query performance may be impacted by
+# complexity of query predicate, and/or excessive locking.
 # Additionally, you may hit the maximum allowable query length for your db.
 # Set this to 0 for no limit (not advised)
 max_tis_per_query = 512
@@ -917,6 +925,13 @@ allow_trigger_in_future = False
 # DAG dependency detector class to use
 dependency_detector = airflow.serialization.serialized_objects.DependencyDetector

+# How often to check for expired trigger requests that have not run yet.
+trigger_timeout_check_interval = 15
+
+[triggerer]
+# How many triggers a single Triggerer will run at once, by default.
+default_capacity = 1000
+
 [kerberos]
 ccache = /tmp/airflow_krb5_ccache
@@ -926,6 +941,12 @@ reinit_frequency = 3600
 kinit_path = kinit
 keytab = airflow.keytab

+# Allow to disable ticket forwardability.
+forwardable = True
+
+# Allow to remove source IP from token, useful when using token behind NATted Docker host.
+include_ip = True
+
 [github_enterprise]
 api_rev = v3
@@ -941,7 +962,8 @@ end_of_log_mark = end_of_log

 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
-# NOTE: The code will prefix the https:// automatically, don't include that here.
+# NOTE: scheme will default to https if one is not provided
+# Example: frontend = http://localhost:5601/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id: "{log_id}"'),sort:!(log.offset,asc))
 frontend =

 # Write the task logs to the stdout of the worker, rather than the default files
@@ -964,7 +986,7 @@ use_ssl = False
 verify_certs = True

 [kubernetes]
-# Path to the YAML pod file. If set, all other kubernetes-related fields are ignored.
+# Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
 pod_template_file =

 # The repository of the Kubernetes Image for the Worker to Run
@@ -1049,6 +1071,9 @@ worker_pods_pending_timeout = 300
 # How often in seconds to check if Pending workers have exceeded their timeouts
 worker_pods_pending_timeout_check_interval = 120

+# How often in seconds to check for task instances stuck in "queued" status without a pod
+worker_pods_queued_check_interval = 60
+
 # How many pending pods to check for timeout violations in each check interval.
 # You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
 worker_pods_pending_timeout_batch_size = 100
@@ -1066,6 +1091,4 @@ shard_code_upper_limit = 10000
 shards = 5

 # comma separated sensor classes support in smart_sensor.
-sensors_enabled = NamedHivePartitionSensor
-
-rbac = True
\ No newline at end of file
+sensors_enabled = NamedHivePartitionSensor
\ No newline at end of file
diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml
index 28261cd..b3c5e7b 100644
--- a/dockers/docker-compose.yaml
+++ b/dockers/docker-compose.yaml
@@ -24,12 +24,9 @@
 # The following variables are supported:
 #
 # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
-#                      Default: apache/airflow:master-python3.8
+#                      Default: apache/airflow:2.2.1
 # AIRFLOW_UID - User ID in Airflow containers
 #               Default: 50000
-# AIRFLOW_GID - Group ID in Airflow containers
-#               Default: 50000
-#
 # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
 #
 # _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
@@ -44,11 +41,13 @@
 version: '3'
 x-airflow-common:
   &airflow-common
-  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
+  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
+  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
+  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
+  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.1}
+  # build: .
   environment:
     &airflow-common-env
-    AIRFLOW_HOME: /opt/airflow
-    AIRFLOW__CORE_dags_folder: /opt/airflow/dags
     AIRFLOW__CORE__EXECUTOR: CeleryExecutor
     AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
     AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
@@ -57,14 +56,16 @@ x-airflow-common:
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
+    _AIRFLOW_WWW_USER_PASSWORD: 'somepass'
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
   volumes:
     - ./dags:/opt/airflow/dags
     - ./logs:/opt/airflow/logs
     - ./plugins:/opt/airflow/plugins
     - ./config/airflow.cfg:/opt/airflow/airflow.cfg
-  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
+  user: "${AIRFLOW_UID:-50000}:0"
   depends_on:
+    &airflow-common-depends-on
     redis:
       condition: service_healthy
     postgres:
@@ -78,17 +79,17 @@ services:
       POSTGRES_PASSWORD: airflow
       POSTGRES_DB: airflow
     volumes:
-      - /var/lib/postgresql/data
+      - postgres-db-volume:/var/lib/postgresql/data
     healthcheck:
       test: ["CMD", "pg_isready", "-U", "airflow"]
       interval: 5s
       retries: 5
-    restart: "always"
+    restart: always

   redis:
     image: redis:latest
-    ports:
-      - 6379:6379
+    expose:
+      - 6379
     healthcheck:
       test: ["CMD", "redis-cli", "ping"]
       interval: 5s
@@ -101,15 +102,20 @@ services:
     command: webserver
     ports:
       - 7001:8080
+    volumes:
+      - ./templates/footer.html:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/appbuilder/footer.html
+      - ./templates/main.html:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html
+      - ./config/airflow.cfg:/opt/airflow/airflow.cfg
     healthcheck:
       test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
       interval: 10s
       timeout: 10s
       retries: 5
-    volumes:
-      - ./templates/footer.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/appbuilder/footer.html
-      - ./templates/main.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/airflow/main.html
     restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully

   airflow-scheduler:
     <<: *airflow-common
@@ -120,6 +126,10 @@ services:
       timeout: 10s
       retries: 5
     restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully

   airflow-worker:
     <<: *airflow-common
@@ -131,17 +141,120 @@ services:
       interval: 10s
       timeout: 10s
       retries: 5
+    environment:
+      <<: *airflow-common-env
+      # Required to handle warm shutdown of the celery workers properly
+      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+      DUMB_INIT_SETSID: "0"
     restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-triggerer:
+    <<: *airflow-common
+    command: triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully

   airflow-init:
     <<: *airflow-common
-    command: version
+    entrypoint: /bin/bash
+    # yamllint disable rule:line-length
+    command:
+      - -c
+      - |
+        function ver() {
+          printf "%04d%04d%04d%04d" $${1//./ }
+        }
+        airflow_version=$$(gosu airflow airflow version)
+        airflow_version_comparable=$$(ver $${airflow_version})
+        min_airflow_version=2.2.0
+        min_airflow_version_comparable=$$(ver $${min_airflow_version})
+        if (( airflow_version_comparable < min_airflow_version_comparable )); then
+          echo
+          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
+          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
+          echo
+          exit 1
+        fi
+        if [[ -z "${AIRFLOW_UID}" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+          echo "If you are on Linux, you SHOULD follow the instructions below to set "
+          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
+          echo "For other operating systems you can get rid of the warning with manually created .env file:"
+          echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
+          echo
+        fi
+        one_meg=1048576
+        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
+        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+        disk_available=$$(df / | tail -1 | awk '{print $$4}')
+        warning_resources="false"
+        if (( mem_available < 4000 )) ; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
+          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
+          echo
+          warning_resources="true"
+        fi
+        if (( cpus_available < 2 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
+          echo "At least 2 CPUs recommended. You have $${cpus_available}"
+          echo
+          warning_resources="true"
+        fi
+        if (( disk_available < one_meg * 10 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
+          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources available:"
+          echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
+          echo
+        fi
+        mkdir -p /sources/logs /sources/dags /sources/plugins
+        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
     environment:
       <<: *airflow-common-env
       _AIRFLOW_DB_UPGRADE: 'true'
       _AIRFLOW_WWW_USER_CREATE: 'true'
       _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
       _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+    user: "0:0"
+    volumes:
+      - .:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
+    command:
+      - bash
+      - -c
+      - airflow

   flower:
     <<: *airflow-common
@@ -154,6 +267,10 @@ services:
       timeout: 10s
       retries: 5
     restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully

 volumes:
   postgres-db-volume:
-- 
GitLab
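
Usage notes, not part of the patch itself. The compose stack above expects the one-shot airflow-init service to finish before the long-running services start (they wait on service_completed_successfully). A minimal bring-up sketch, assuming the file is used from the dockers/ directory with docker-compose 1.29+ on a Linux host; the exact directory and version are assumptions, not something the patch pins down:

    # Record the host UID so the mounted dags/, logs/ and plugins/ directories are not
    # owned by root (this is what the AIRFLOW_UID warning in airflow-init is about).
    cd dockers
    echo "AIRFLOW_UID=$(id -u)" > .env

    # Run the init service first: it upgrades the metadata DB and creates the admin user.
    docker-compose up airflow-init

    # Start postgres, redis, webserver, scheduler, worker and triggerer in the background.
    docker-compose up -d

    # The webserver is published on host port 7001 (container port 8080).
    curl --fail http://localhost:7001/health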
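
Because the AIRFLOW__CORE__* variables in docker-compose.yaml take precedence over the mounted airflow.cfg, the sqlite sql_alchemy_conn left uncommented in the config file is not what the containers actually use; the Postgres URL from the environment is. A rough way to confirm the effective settings and to run the same liveness probe the triggerer healthcheck uses (commands are from the Airflow 2.2 CLI, but treat the exact invocations as a sketch):

    # Effective executor and metadata DB as seen by a worker container.
    docker-compose exec airflow-worker airflow config get-value core executor
    docker-compose exec airflow-worker airflow config get-value core sql_alchemy_conn

    # Same check the airflow-triggerer healthcheck runs, expanded inside the container.
    docker-compose exec airflow-triggerer bash -c \
      'airflow jobs check --job-type TriggererJob --hostname "$HOSTNAME"'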
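
The new airflow-cli service sits behind the "debug" compose profile, so a plain docker-compose up does not start it. A possible way to use it for ad-hoc commands (profiles need docker-compose 1.28+; the specific commands shown are only examples):

    # One-off CLI container; CONNECTION_CHECK_MAX_COUNT=0 skips the DB wait in the entrypoint.
    docker-compose --profile debug run --rm airflow-cli airflow info
    docker-compose --profile debug run --rm airflow-cli airflow dags list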
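
The airflow-init script warns when Docker has less than 4 GB of RAM, 2 CPUs or 10 GB of free disk. The same checks, lifted from that script, can be run on a Linux host before starting the stack:

    # Host-side preflight mirroring the resource warnings in airflow-init.
    one_meg=1048576
    mem_available=$(( $(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE) / one_meg ))
    cpus_available=$(grep -cE 'cpu[0-9]+' /proc/stat)
    disk_available=$(df / | tail -1 | awk '{print $4}')
    (( mem_available >= 4000 ))          || echo "WARNING: only ${mem_available} MB RAM available"
    (( cpus_available >= 2 ))            || echo "WARNING: only ${cpus_available} CPUs available"
    (( disk_available >= one_meg * 10 )) || echo "WARNING: less than 10 GB free on /"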
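
The footer.html and main.html template mounts hard-code the image's site-packages path (python3.7 for the apache/airflow:2.2.1 tag used here). If AIRFLOW_IMAGE_NAME is changed, that path may shift; one way to double-check where the installed webserver actually keeps its templates (a sketch, not part of the patch):

    # Print the templates directory of the installed airflow.www package.
    docker-compose exec airflow-webserver python -c \
      "import os, airflow.www; print(os.path.join(os.path.dirname(airflow.www.__file__), 'templates'))"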