Skip to content
Snippets Groups Projects
unity_integration.py 4.47 KiB
Newer Older
import os, random, string, logging
from authlib.integrations.flask_client import OAuth
from flask import url_for, redirect, current_app as app, Blueprint, abort
from flask_login import login_user
from flask_appbuilder import BaseView as AppBuilderBaseView
from airflow.plugins_manager import AirflowPlugin

log = logging.getLogger(__name__)
log.setLevel("DEBUG")

FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public"  # The "Public" role is given no permissions

oauth = OAuth(app)
oauth.register(
    name='unity',
    client_id=os.getenv("OAUTH_CLIENT_ID"),
    server_metadata_url=os.getenv("OAUTH_METADATA_URL"),
    client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
Christian Boettcher's avatar
Christian Boettcher committed
    client_kwargs={'scope' : 'openid profile email eflows'}
unity = Blueprint('unity', __name__, url_prefix="/unity")

class UnityIntegrationView(AppBuilderBaseView):
    '''
    This adds the neccessary routes for a oauth2 login support in addition to the basic_auth provided by airflow.
    This is achieved by handling the oauth2 flow fully via the added routes nad using the basic_auth via internal functions.
    Once a user has been authenticated, a matching user (via username or email) is either found or created in the airflow basic_auth database.

    This means that the Oauth2 provider needs to be trusted to the same extent that the local user db is trusted.
    This can also break on airflow updates without notice, as airflow-internal security functions are used, that are not well-documented and appear to not be intended for this use.
    '''
    @unity.route('/login')
    def login():
        redirect_uri = url_for('.authorize', _external=True)
        log.debug("Redirect uri is '" + str(redirect_uri) + "'")
Christian Boettcher's avatar
Christian Boettcher committed
        log.debug("Metadata uri is '" + str(os.getenv("OAUTH_METADATA_URL")) + "'")
        return oauth.unity.authorize_redirect(redirect_uri)
    
    @unity.route('/authorize')
    def authorize():
        try:
            token = oauth.unity.authorize_access_token()
        except:
            abort(403)
        user = oauth.unity.userinfo(token=token)
        # get relevant data from token
Christian Boettcher's avatar
Christian Boettcher committed
        log.debug(str(user))
        email = user['email']
        persistent_identifier = user["sub"]
Christian Boettcher's avatar
Christian Boettcher committed
        first_name = user["firstname"]
        last_name = user["surname"]
        admin_access = user.get('eflows:dlsAccess', 'false')

        log.debug("SSO user logging in...")
        log.debug("sub : " + persistent_identifier)
        log.debug("first name : " + first_name)
        log.debug("last name : " + last_name)
        log.debug("email : " + email)
        log.debug("admin : " + admin_access)
        log.debug("......................")

        role = FAB_VIEWER_ROLE
        if admin_access == 'true':
            role = FAB_ADMIN_ROLE

        # check airflow user backend
        # check if user already exists, if not create it (with long random password)
        sec_manager = app.appbuilder.sm
        fab_user = sec_manager.find_user(username=persistent_identifier)
Christian Boettcher's avatar
Christian Boettcher committed
        log.debug("Searching for user by name gave '" + (str(fab_user)) + "'")

        if fab_user is None:
            fab_user = sec_manager.find_user(email=email)
            log.debug("Searching for user by email gave '" + (str(fab_user)) + "'")

        if fab_user is None:
Christian Boettcher's avatar
Christian Boettcher committed
            log.debug("Trying to create non-existing user")
            characters = string.ascii_letters + string.digits + string.punctuation
            if sec_manager.add_user(
                username=persistent_identifier,
                first_name=first_name,
                last_name=last_name,
                email=email,
                role=sec_manager.find_role(role),
                password=''.join(random.choice(characters) for i in range(20))
                fab_user = sec_manager.find_user(username=persistent_identifier)
                log.info("Successfully created user " + str(fab_user))
            else:
                log.error("User creation unsuccessful.")
                abort(500)
        else:
            # set role permissions of that user, if it already exists (just take the role from SSO and assign it)
            fab_user.role = sec_manager.find_role(role)
            sec_manager.update_user(fab_user)
        # login as that user
        login_user(fab_user, remember=False)
        return redirect(url_for("Airflow.index"))
    
    @unity.route('/logout')
    def logout():
        pass

class UnityIntegrationPlugin(AirflowPlugin):
    name = "Unity Integration"
    flask_blueprints = [unity]