diff --git a/README.md b/README.md index 59b2f655572adf2a02ba05ba878666d89ef9c793..22cdd8cda34113eab3aa1982ec83b30f9d44ac15 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,14 @@ eFlows4HPC Data Logistics Service +This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558. The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway. + +## Install and run ``` -mkdir ./logs ./plugins -echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env +git pull ... +mkdir ./logs ./tmp +echo -e "AIRFLOW_UID=$(id -u)" > .env reqs=`cat requirements.txt | tr '\n' ' '` echo "_PIP_ADDITIONAL_REQUIREMENTS=$reqs" >> .env @@ -36,4 +40,4 @@ Connections can also be added through env variables, like ``` AIRFLOW_CONN_MY_PROD_DATABASE=my-conn-type://login:password@host:port/schema?param1=val1¶m2=val2 -``` \ No newline at end of file +``` diff --git a/config/airflow.cfg b/config/airflow.cfg index ada6ce91ce005785f6ad985983496e63e0bc84d9..b304e72a91e130ec0318b88dbad24cd096d8d6b2 100644 --- a/config/airflow.cfg +++ b/config/airflow.cfg @@ -21,21 +21,22 @@ default_timezone = utc # ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``, # ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the # full import path to the class when using a custom executor. -executor = SequentialExecutor +executor = CeleryExecutor # The SqlAlchemy connection string to the metadata database. # SqlAlchemy supports many different database engines. # More information here: # http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri -# sql_alchemy_conn = sqlite:////opt/airflow/airflow.db +sql_alchemy_conn = sqlite:////opt/airflow/airflow.db # The encoding for the databases sql_engine_encoding = utf-8 # Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. -# This is particularly useful in case of mysql with utf8mb4 encoding because -# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should -# be set to ``utf8mb3_general_ci``. +# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` +# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed +# the maximum size of allowed index when collation is set to ``utf8mb4`` variant +# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). # sql_engine_collation_for_ids = # If SqlAlchemy should pool database connections. @@ -85,9 +86,12 @@ parallelism = 32 # The maximum number of task instances allowed to run concurrently in each DAG. To calculate # the number of tasks that is running concurrently for a DAG, add up the number of running -# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``, -# which is defaulted as ``dag_concurrency``. -dag_concurrency = 16 +# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``, +# which is defaulted as ``max_active_tasks_per_dag``. +# +# An example scenario when this would be useful is when you want to stop a new dag with an early +# start date from stealing all the executor slots in a cluster. 
+max_active_tasks_per_dag = 16 # Are DAGs paused by default at creation dags_are_paused_at_creation = True @@ -100,7 +104,7 @@ max_active_runs_per_dag = 16 # Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment -load_examples = True +load_examples = False # Whether to load the default connections that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production @@ -169,6 +173,9 @@ dag_discovery_safe_mode = True # The number of retries each task is going to have by default. Can be overridden at dag or task level. default_task_retries = 0 +# The weighting method used for the effective total priority weight of the task +default_task_weight_rule = downstream + # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. min_serialized_dag_update_interval = 30 @@ -176,13 +183,6 @@ min_serialized_dag_update_interval = 30 # read rate. This config controls when your DAGs are updated in the Webserver min_serialized_dag_fetch_interval = 10 -# Whether to persist DAG files code in DB. -# If set to True, Webserver reads file contents from DB instead of -# trying to access files in a DAG folder. -# (Default is ``True``) -# Example: store_dag_code = True -# store_dag_code = - # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store # in the Database. # All the template_fields for each of Task Instance are stored in the Database. @@ -220,6 +220,11 @@ hide_sensitive_var_conn_fields = True # extra JSON. sensitive_var_conn_names = +# Task Slot counts for ``default_pool``. This setting would not have any effect in an existing +# deployment where the ``default_pool`` is already created. For existing deployments, users can +# change the number of slots using Webserver, API or the CLI +default_pool_task_slot_count = 128 + [logging] # The folder where airflow should store its log files # This path must be absolute @@ -258,7 +263,7 @@ logging_level = INFO # Logging level for Flask-appbuilder UI. # # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``. -fab_logging_level = WARN +fab_logging_level = WARNING # Logging class # Specify the class that will specify the logging configuration @@ -297,8 +302,15 @@ task_log_reader = task # A comma\-separated list of third-party logger names that will be configured to print messages to # consoles\. -# Example: extra_loggers = connexion,sqlalchemy -extra_loggers = +# Example: extra_logger_names = connexion,sqlalchemy +extra_logger_names = + +# When you start an airflow worker, airflow starts a tiny web server +# subprocess to serve the workers local log files to the airflow main +# web server, who then builds pages and sends them to users. This defines +# the port on which the logs are served. It needs to be unused, and open +# visible from the main web server to connect into the workers. +worker_log_server_port = 8793 [metrics] @@ -406,8 +418,9 @@ access_control_allow_headers = # Specifies the method or methods allowed when accessing the resource. access_control_allow_methods = -# Indicates whether the response can be shared with requesting code from the given origin. -access_control_allow_origin = +# Indicates whether the response can be shared with requesting code from the given origins. +# Separate URLs with space. 
+access_control_allow_origins = [lineage] # what lineage backend to use @@ -491,7 +504,7 @@ reload_on_plugin_change = False # Secret key used to run your flask app. It should be as random as possible. However, when running # more than 1 instances of webserver, make sure all of them use the same ``secret_key`` otherwise # one of them will error with "CSRF session token is missing". -secret_key = 8kUFwlRKUhs6i8NBAvUmWg== +secret_key = Jvww64wGcBs22UNHJjToNw== # Number of workers to run the Gunicorn web server workers = 4 @@ -529,7 +542,7 @@ dag_orientation = LR # The amount of time (in secs) webserver will wait for initial handshake # while fetching logs from other worker machine -log_fetch_timeout_sec = 5 +log_fetch_timeout_sec = 15 # Time interval (in secs) to wait before next log fetching. log_fetch_delay_sec = 2 @@ -603,7 +616,11 @@ update_fab_perms = True session_lifetime_minutes = 43200 # Sets a custom page title for the DAGs overview page and site title for all pages -instance_name =eFlows4HPC +instance_name = eFlows4HPC + +# How frequently, in seconds, the DAG data will auto-refresh in graph or tree view +# when auto-refresh is turned on +auto_refresh_interval = 3 [email] @@ -654,11 +671,14 @@ smtp_retry_limit = 5 # additional configuration options based on the Python platform. See: # https://docs.sentry.io/error-reporting/configuration/?platform=python. # Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, -# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``. +# ``ignore_errors``, ``before_breadcrumb``, ``transport``. # Enable error reporting to Sentry sentry_on = false sentry_dsn = +# Dotted path to a before_send function that the sentry SDK should be configured to use. +# before_send = + [celery_kubernetes_executor] # This section only applies if you are using the ``CeleryKubernetesExecutor`` in @@ -701,13 +721,6 @@ worker_concurrency = 16 # Example: worker_prefetch_multiplier = 1 # worker_prefetch_multiplier = -# When you start an airflow worker, airflow starts a tiny web server -# subprocess to serve the workers local log files to the airflow main -# web server, who then builds pages and sends them to users. This defines -# the port on which the logs are served. It needs to be unused, and open -# visible from the main web server to connect into the workers. -worker_log_server_port = 8793 - # Umask that will be used when starting workers with the ``airflow celery worker`` # in daemon mode. This control the file-creation mode mask which determines the initial # value of file permission bits for newly created files. @@ -812,10 +825,6 @@ tls_key = # listen (in seconds). job_heartbeat_sec = 5 -# How often (in seconds) to check and tidy up 'running' TaskInstancess -# that no longer have a matching DagRun -clean_tis_without_dagrun_interval = 15.0 - # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). This defines # how often the scheduler should run (in seconds). @@ -825,8 +834,10 @@ scheduler_heartbeat_sec = 5 # -1 indicates unlimited number num_runs = -1 -# The number of seconds to wait between consecutive DAG file processing -processor_poll_interval = 1 +# Controls how long the scheduler will sleep between loops, but if there was nothing to do +# in the loop. i.e. if it scheduled something then it will start the next loop +# iteration straight away. +scheduler_idle_sleep_time = 1 # Number of seconds after which a DAG file is parsed. 
The DAG file is parsed every # ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after @@ -865,11 +876,8 @@ scheduler_zombie_task_threshold = 300 catchup_by_default = True # This changes the batch size of queries in the scheduling main loop. -# If this is too high, SQL query performance may be impacted by one -# or more of the following: -# - reversion to full table scan -# - complexity of query predicate -# - excessive locking +# If this is too high, SQL query performance may be impacted by +# complexity of query predicate, and/or excessive locking. # Additionally, you may hit the maximum allowable query length for your db. # Set this to 0 for no limit (not advised) max_tis_per_query = 512 @@ -917,6 +925,13 @@ allow_trigger_in_future = False # DAG dependency detector class to use dependency_detector = airflow.serialization.serialized_objects.DependencyDetector +# How often to check for expired trigger requests that have not run yet. +trigger_timeout_check_interval = 15 + +[triggerer] +# How many triggers a single Triggerer will run at once, by default. +default_capacity = 1000 + [kerberos] ccache = /tmp/airflow_krb5_ccache @@ -926,6 +941,12 @@ reinit_frequency = 3600 kinit_path = kinit keytab = airflow.keytab +# Allow to disable ticket forwardability. +forwardable = True + +# Allow to remove source IP from token, useful when using token behind NATted Docker host. +include_ip = True + [github_enterprise] api_rev = v3 @@ -941,7 +962,8 @@ end_of_log_mark = end_of_log # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id # Code will construct log_id using the log_id template from the argument above. -# NOTE: The code will prefix the https:// automatically, don't include that here. +# NOTE: scheme will default to https if one is not provided +# Example: frontend = http://localhost:5601/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id: "{log_id}"'),sort:!(log.offset,asc)) frontend = # Write the task logs to the stdout of the worker, rather than the default files @@ -964,7 +986,7 @@ use_ssl = False verify_certs = True [kubernetes] -# Path to the YAML pod file. If set, all other kubernetes-related fields are ignored. +# Path to the YAML pod file that forms the basis for KubernetesExecutor workers. pod_template_file = # The repository of the Kubernetes Image for the Worker to Run @@ -1049,6 +1071,9 @@ worker_pods_pending_timeout = 300 # How often in seconds to check if Pending workers have exceeded their timeouts worker_pods_pending_timeout_check_interval = 120 +# How often in seconds to check for task instances stuck in "queued" status without a pod +worker_pods_queued_check_interval = 60 + # How many pending pods to check for timeout violations in each check interval. # You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``. worker_pods_pending_timeout_batch_size = 100 @@ -1068,5 +1093,3 @@ shards = 5 # comma separated sensor classes support in smart_sensor. 
sensors_enabled = NamedHivePartitionSensor -rbac = True - diff --git a/dags/GAdemo.py b/dags/GAdemo.py new file mode 100644 index 0000000000000000000000000000000000000000..9cdfda5f95b4154ade482d9cf56ab5bcf7ba2f56 --- /dev/null +++ b/dags/GAdemo.py @@ -0,0 +1,32 @@ +from datetime import timedelta + +from airflow import DAG +from airflow.operators.bash import BashOperator +from airflow.utils.dates import days_ago +from airflow.sensors.filesystem import FileSensor +from airflow.operators.python import PythonOperator +from airflow.operators.dummy import DummyOperator + +def_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) + +} + +def train_model(): + print('Will start model training') + +with DAG('GAtest', default_args=def_args, description='testing GA', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: + s1 = FileSensor(task_id='file_sensor', filepath='/work/afile.txt') + t1 = BashOperator(task_id='move_data', bash_command='date') + t2 = PythonOperator(task_id='train_model', python_callable=train_model) + t3 = BashOperator(task_id='eval_model', bash_command='echo "evaluating"') + t4 = DummyOperator(task_id='upload_model_to_repo') + t5 = DummyOperator(task_id='publish_results') + + s1 >> t1 >> t2 >> t4 + t2 >> t3 >> t5 diff --git a/dags/taskflow.py b/dags/taskflow.py index c0153ed3d415f23bdf2adb6e9128132694919f84..c86066f98a352dd536dc1e92fbb1be129f1b29d6 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -2,6 +2,7 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook +from airflow.models import Variable from airflow.utils.dates import days_ago import os @@ -39,9 +40,12 @@ def taskflow_example(): @task(multiple_outputs=True) def transform(flist: dict): name_mappings = {} + tmp_dir = Variable.get("working_dir", default_var='/tmp/') + print(f"Local working dir is: {tmp_dir}") + for fname, url in flist.items(): print(f"Processing: {fname} --> {url}") - tmpname = download_file(url=url, target_dir='/tmp/') + tmpname = download_file(url=url, target_dir=tmp_dir) name_mappings[fname] = tmpname return name_mappings diff --git a/dockers/README.md b/dockers/README.md new file mode 100644 index 0000000000000000000000000000000000000000..1cb752d4e3d406c64804060e72512cd31d2d0656 --- /dev/null +++ b/dockers/README.md @@ -0,0 +1,13 @@ +# Maintenance for customizations + +### Footer +The DLS Service has a custom footer to contribute to the consortium image of the eFlows Project. The design of the custom footer is part of templates/main.html. This file is being injected as a volume in docker-compose.yaml, thus overriding the existing template from the public airflow image. For testing reasons, the path has been hard-coded in the docker-compose.yaml. + +### Updates + Taking a hard-coded path approach means that with every update of the official airflow image, the correct main.html file has to be pulled anew from the official container. + + For example: + + ```docker exec airflow_airflow-webserver_1 find /home/airflow/ | grep main.html ``` + + Copy this file into the local repository and substitute the ```<footer>``` section with the custom DLS ```%footer%``` block. In case of a new python version in the official airflow image (and container), you will need to adjust the path in the volume section of the docker-compose.yaml.
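Once the `find` command above reports the exact location, the template can be copied out of the running webserver container, e.g. with `docker cp`. A minimal sketch, assuming the container name from the example above and the Python 3.7 site-packages path mounted in docker-compose.yaml (both may differ in your deployment):

```
# Copy the stock template from the webserver container into the local repository;
# adjust the container name and the python3.x path to what `find` reported.
docker cp airflow_airflow-webserver_1:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html ./templates/main.html
```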
\ No newline at end of file diff --git a/dockers/connections.json b/dockers/connections.json new file mode 100644 index 0000000000000000000000000000000000000000..232a3bfa48c3b0353d3aca016d0db544637523d9 --- /dev/null +++ b/dockers/connections.json @@ -0,0 +1,22 @@ +{ + "default_b2share": { + "conn_type": "https", + "description": null, + "host": "b2share-testing.fz-juelich.de", + "login": null, + "password": null, + "schema": "", + "port": null, + "extra": null + }, + "default_ssh": { + "conn_type": "ssh", + "description": null, + "host": "openssh-server", + "login": "eflows", + "password": "rand", + "schema": null, + "port": 2222, + "extra": null + } +} \ No newline at end of file diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml index ac93c50afa2d5460eda18d14b667da533a6e16ee..24c78d4b44dc381c2382b39ca5ffea88c6c44a14 100644 --- a/dockers/docker-compose.yaml +++ b/dockers/docker-compose.yaml @@ -24,12 +24,9 @@ # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. -# Default: apache/airflow:master-python3.8 +# Default: apache/airflow:2.2.1 # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 -# AIRFLOW_GID - Group ID in Airflow containers -# Default: 50000 -# # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode # # _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). @@ -44,11 +41,13 @@ version: '3' x-airflow-common: &airflow-common - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3} + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.1} + # build: . 
environment: &airflow-common-env - AIRFLOW_HOME: /opt/airflow - AIRFLOW__CORE_dags_folder: /opt/airflow/dags AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow @@ -60,11 +59,13 @@ x-airflow-common: _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - ./dags:/opt/airflow/dags + - ./config/airflow.cfg:/opt/airflow/airflow.cfg - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - - ./config/airflow.cfg:/opt/airflow/airflow.cfg - user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" + - ./templates/main.html:/home/airflow/.local/lib/python3.7/site-packages/airflow/www/templates/airflow/main.html + user: "${AIRFLOW_UID:-50000}:0" depends_on: + &airflow-common-depends-on redis: condition: service_healthy postgres: @@ -78,7 +79,7 @@ services: POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - - /var/lib/postgresql/data + - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 5s @@ -87,8 +88,8 @@ services: redis: image: redis:latest - ports: - - 6379:6379 + expose: + - 6379 healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s @@ -103,23 +104,28 @@ services: - 7001:8080 healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] - interval: 10s + interval: 60s timeout: 10s retries: 5 - volumes: - - ./templates/footer.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/appbuilder/footer.html - - ./templates/main.html:/home/airflow/.local/lib/python3.6/site-packages/airflow/www/templates/airflow/main.html restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully airflow-scheduler: <<: *airflow-common command: scheduler healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] - interval: 10s + interval: 60s timeout: 10s retries: 5 restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully airflow-worker: <<: *airflow-common @@ -128,32 +134,151 @@ services: test: - "CMD-SHELL" - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' - interval: 10s + interval: 30s + timeout: 10s + retries: 5 + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + volumes: + - ./dags:/opt/airflow/dags + - ./config/airflow.cfg:/opt/airflow/airflow.cfg + - ./logs:/opt/airflow/logs + - ./tmp/:/work/ + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 60s timeout: 10s retries: 5 + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully airflow-init: <<: *airflow-common - command: version + entrypoint: /bin/bash + # 
yamllint disable rule:line-length + command: + - -c + - | + function ver() { + printf "%04d%04d%04d%04d" $${1//./ } + } + airflow_version=$$(gosu airflow airflow version) + airflow_version_comparable=$$(ver $${airflow_version}) + min_airflow_version=2.2.0 + min_airflow_version_comparable=$$(ver $${min_airflow_version}) + if (( airflow_version_comparable < min_airflow_version_comparable )); then + echo + echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" + echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" + echo + exit 1 + fi + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin" + echo + fi + mkdir -p /sources/logs /sources/dags /sources/plugins + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} + exec /entrypoint airflow version + # yamllint enable rule:line-length environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + user: "0:0" + volumes: + - .:/sources - flower: + airflow-cli: <<: *airflow-common - command: celery flower - ports: - - 5555:5555 - healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:5555/"] - interval: 10s - timeout: 10s - retries: 5 - restart: always + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. 
See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + airflow-setup: + <<: *airflow-common + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + entrypoint: /bin/bash + command: + - -c + - | + exec /entrypoint airflow variables import /opt/airflow/variables.json + echo "Variables added" + volumes: + - ./dockers/variables.json:/opt/airflow/variables.json + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully volumes: postgres-db-volume: diff --git a/dockers/variables.json b/dockers/variables.json new file mode 100644 index 0000000000000000000000000000000000000000..8ec7d2d7c05e213e9cfb4b32b1739ca2f2c1beff --- /dev/null +++ b/dockers/variables.json @@ -0,0 +1,3 @@ +{ + "working_dir": "/work/" +} \ No newline at end of file diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc index e2f3905b6626aec916f421f6aefc0a55ebfad7f6..e6cc42f26c3f4d4df30c99e0169242eba838ab90 100644 --- a/docs/apirequests.adoc +++ b/docs/apirequests.adoc @@ -40,14 +40,14 @@ includes one 100MB file. The target parameter is optional with default value +/t ---- curl -X POST -u USER:PASS -H "Content-Type: application/json" \ - --data '{"conf": {"oid": ID}, "target": PATH}' \ + --data '{"conf": {"oid": ID, "target": PATH}}' \ $DLS/dags/taskflow_example/dagRuns ---- If you want to use your own connection ('myown_con'): ---- curl -X POST -u USER:PASS -H "Content-Type: application/json" \ - --data '{"conf": {"oid": ID}, "target": PATH, "connection": "myown_con"}' \ + --data '{"conf": {"oid": ID, "target": PATH, "connection": "myown_con"}}' \ $DLS/dags/taskflow_example/dagRuns ---- diff --git a/plugins/eFlows_menu_link.py b/plugins/eFlows_menu_link.py index f8a24680a605f8e75fd6298a6b4564707d0b64d0..da5b6603b96752fcd2f24773242b5fce75d891f0 100644 --- a/plugins/eFlows_menu_link.py +++ b/plugins/eFlows_menu_link.py @@ -12,3 +12,11 @@ class AirflowEFlowsPlugin(AirflowPlugin): hooks = [] admin_views = [] appbuilder_menu_items = [appbuilder_eFlows] + +class AirflowDataCatPlugin(AirflowPlugin): + name = "Data Catalogue" + operators = [] + flask_blueprints = [] + hooks = [] + admin_views = [] + appbuilder_menu_items = [{"name": "Data Catalogue", "href": "https://datacatalog.fz-juelich.de/index.html"}] diff --git a/requirements.txt b/requirements.txt index 240d00f906211bb0f8dad8dc7426a58b99fea37f..aeecd2f8a68c7beca6f3259e5c3642f6349f8984 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ requests urllib3==1.26.6 +plyvel apache-airflow-providers-ssh apache-airflow-providers-http apache-airflow-providers-sftp diff --git a/scripts/cloudinit.yml b/scripts/cloudinit.yml index 73137c7cb172bd9c3ca04f5e34e6998882f1ef0c..3a5e9dae1d3154a0729925aa48c00cbf47177193 100644 --- a/scripts/cloudinit.yml +++ b/scripts/cloudinit.yml @@ -49,16 +49,15 @@ users: runcmd: - 'git clone https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service.git /home/maria/data-logistics-service' - cd /home/maria - - mkdir airflow-testing - - cd airflow-testing + - mkdir airflow + - cd airflow - mkdir -p ./dags ./logs ./plugins ./config ./templates - cp ../data-logistics-service/dags/* ./dags - cp -r ../data-logistics-service/plugins/* ./plugins - cp ../data-logistics-service/config/* ./config - cp ../data-logistics-service/templates/* ./templates - # - echo "AIRFLOW_UID=0\nAIRFLOW_GID=0" > ../data-logistics-service/dockers/.env #for root + - echo -e "AIRFLOW_UID=$(id -u)" > ../data-logistics-service/dockers/.env #for root - 
export AIRFLOW_UID=$(id -u) - - export AIRFLOW_GID=0 - - docker-compose -f ../data-logistics-service/dockers/docker-compose.yaml --project-directory . --verbose up airflow-init - - docker-compose -f ../data-logistics-service/dockers/docker-compose.yaml --project-directory . up + - docker-compose -f ~/data-logistics-service/dockers/docker-compose.yaml --project-directory ~/airflow --verbose up airflow-init + - docker-compose -f ~/data-logistics-service/dockers/docker-compose.yaml --project-directory ~/airflow up # - /bin/bash ../data-logistics-service/scripts/deployment.sh . diff --git a/templates/footer.html b/templates/footer.html deleted file mode 100644 index 442715dc62e970cc309627fa838c814440cc9a17..0000000000000000000000000000000000000000 --- a/templates/footer.html +++ /dev/null @@ -1,37 +0,0 @@ -{% block footer %} -<footer class="footer d-inlign-flex" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important;"> - <div class="container p-0"> - <div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat; background-size: cover"></div> - <div class="row mt-2 px-3"> - <div class="col-lg-6 col-12 d-inlign-flex"> - <p class="m-3 text-center align-self-center"> - <a href="https://www.eFlows4HPC.eu"> - <img src="https://eflows4hpc.eu/wp-content/uploads/2021/02/logo-blanc_1-1.svg" alt="eFlows4HPC Logo" title="eFlows4HPC" style="height: auto; max-height: 70px;" class="m-4 align-self-center"/> - </a> - <a href="https://twitter.com/eFlows4HPC"><i class="fa fa-twitter-square m-4 fa-2x" style="color: white"></i></a> - <a href="https://www.linkedin.com/company/eflows4hpc/"><i class="fa fa-linkedin-square mr-4 fa-2x" style="color: white"></i></a> - <a href="https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/"><i class="fa fa-github-square mr-4 fa-2x" style="color: white"></i></a> - </p> - </div> - <div class="col-lg-6 col-12 d-inlign-flex"> - <p class="m-2 align-self-center" style="color: white"> - <span class="mr-3 mt-1 float-left"> - <img loading="lazy" src="https://eflows4hpc.eu/wp-content/uploads/2021/01/bandera-8.png" alt="" style="max-width:52px; max-height:34px"> - </span> - <small style="display: flow-root"> - This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558. - <br> - The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway.
- <strong> - <a style="color: #f39200" href="https://www.fz-juelich.de/portal/EN/Service/LegalNotice/_node.html">Impressum</a> - </strong> - </small> - <div class="row mt-4 pl-5"> - <p style="border-top: 1px solid darkgray;"><small>This service is based on Apache Airflow {{ version_label }}: {% if airflow_version %}<a href="https://pypi.python.org/pypi/apache-airflow/{{ airflow_version }}" target="_blank">v{{ airflow_version }}</a>{% else %} N/A{% endif %}</small></p> - </div> - </p> - </div> - </div> - </div> -</footer> -{% endblock %} diff --git a/templates/main.html b/templates/main.html index 187275c5a77b89463ed2fb39807e074138c7c7f8..153b1bff2c19eac774b7ba700bc2d202c44d08d9 100644 --- a/templates/main.html +++ b/templates/main.html @@ -18,6 +18,7 @@ #} {% extends 'appbuilder/baselayout.html' %} +{% from 'airflow/_messages.html' import message %} {% block page_title -%} {% if title is defined -%} @@ -51,7 +52,7 @@ {% block messages %} {% include 'appbuilder/flash.html' %} {% if scheduler_job is defined and (not scheduler_job or not scheduler_job.is_alive()) %} - <div class="alert alert-warning"> + {% call message(category='warning', dismissable=false) %} <p>The scheduler does not appear to be running. {% if scheduler_job %} Last heartbeat was received @@ -63,14 +64,68 @@ {% endif %} </p> <p>The DAGs list may not update, and new tasks will not be scheduled.</p> - </div> + {% endcall %} + {% endif %} + {% if triggerer_job is defined and (not triggerer_job or not triggerer_job.is_alive()) %} + {% call message(category='warning', dismissable=false) %} + <p>The triggerer does not appear to be running. + {% if triggerer_job %} + Last heartbeat was received + <time class="scheduler-last-heartbeat" + title="{{ triggerer_job.latest_heartbeat.isoformat() }}" + datetime="{{ triggerer_job.latest_heartbeat.isoformat() }}" + data-datetime-convert="false" + >{{ macros.datetime_diff_for_humans(triggerer_job.latest_heartbeat) }}</time>. + {% endif %} + </p> + <p>Triggers will not run, and any deferred operator will remain deferred until it times out and fails.</p> + {% endcall %} {% endif %} {% endblock %} {% block footer %} {% if not current_user.is_anonymous %} {% set version_label = 'Version' %} - {% include 'appbuilder/footer.html' %} + <!-- Use a wrapper div to detach the footer from the content. --> + <div style="padding-bottom: 3rem !important; visibility: hidden !important; flex-grow: 1 !important; -ms-flex-direction: column !important; display: flex !important; + flex-direction: column !important;"> + . 
+ </div> + <footer class="footer" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/02/Barra-gris-footer.png) !important; height: auto; color: #575756 !important; background-repeat: no-repeat !important; background-size: cover !important; margin-top: 2rem;"> + <div class="container p-0" style="padding: 0px !important"> + <div class="p-0 w-100" style="background-image: url(https://eflows4hpc.eu/wp-content/uploads/2021/01/barra-3-color-8.png) !important; width: 100%; height: 15px; background-repeat: no-repeat !important; background-size: cover !important; padding: 0px; !important"></div> + <div class="row mt-2 px-3" style="margin-top: 0.5rem; padding-right: 1rem;"> + <div class="col-lg-6 col-12 d-inlign-flex"> + <p class="m-3 text-center align-self-center" style="-ms-flex-item-align: center !important; align-self: center !important; margin: 1rem !important"> + <a href="https://www.eFlows4HPC.eu"> + <img src="https://eflows4hpc.eu/wp-content/uploads/2021/02/logo-blanc_1-1.svg" alt="eFlows4HPC Logo" title="eFlows4HPC" style="height: auto; max-height: 70px;" class="m-4 align-self-center"/> + </a> + <a href="https://twitter.com/eFlows4HPC"><i class="fa fa-twitter-square m-4 fa-2x" style="color: white"></i></a> + <a href="https://www.linkedin.com/company/eflows4hpc/"><i class="fa fa-linkedin-square mr-4 fa-2x" style="color: white"></i></a> + <a href="https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service/"><i class="fa fa-github-square mr-4 fa-2x" style="color: white"></i></a> + </p> + </div> + <div class="col-lg-6 col-12 d-inlign-flex"> + <p class="m-2 align-self-center" style="color: white; -ms-flex-item-align: center !important; align-self: center !important; margin: 0.5rem;"> + <span class="mr-3 mt-1 float-left" style="float: left !important; margin-right: 1rem; margin-top: 0.25rem "> + <img loading="lazy" src="https://eflows4hpc.eu/wp-content/uploads/2021/01/bandera-8.png" alt="" style="max-width:52px; max-height:34px;"> + </span> + <small style="display: flow-root"> + This work has been supported by the eFlows4HPC project, contract #955558. This project has received funding from the European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955558. + <br> + The JU receives support from the European Union’s Horizon 2020 research and innovation programme and Spain, Germany, France, Italy, Poland, Switzerland, Norway. + <strong> + <a style="color: #f39200" href="https://www.fz-juelich.de/portal/EN/Service/LegalNotice/_node.html">Impressum</a> + </strong> + </small> + <div class="row mt-4 pl-5" style="margin-top: 1.5rem; padding-left: 3rem !important; "> + <p style="border-top: 1px solid darkgray;"><small>This service is based on Apache Airflow {{ version_label }}: {% if airflow_version %}<a href="https://pypi.python.org/pypi/apache-airflow/{{ airflow_version }}" target="_blank">v{{ airflow_version }}</a>{% else %} N/A{% endif %}</small></p> + </div> + </p> + </div> + </div> + </div> + </footer> {% endif %} {% endblock %}
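For reference, a possible end-to-end bring-up sequence consistent with the README, the cloud-init script and the compose file above. It assumes the repository root is used as the compose project directory, so that ./dags, ./config, ./templates, ./logs, ./tmp and ./dockers/variables.json resolve to the paths mounted in docker-compose.yaml; adjust to your own layout.

```
# Sketch only: the repository root doubles as the docker-compose project directory.
git clone https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service.git
cd data-logistics-service
mkdir -p ./logs ./tmp
echo -e "AIRFLOW_UID=$(id -u)" > .env
echo "_PIP_ADDITIONAL_REQUIREMENTS=$(tr '\n' ' ' < requirements.txt)" >> .env
docker-compose -f dockers/docker-compose.yaml --project-directory . up airflow-init
docker-compose -f dockers/docker-compose.yaml --project-directory . up -d
```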