-
Christian Boettcher authoredChristian Boettcher authored
unity_integration.py 3.56 KiB
import os, random, string, logging
from authlib.integrations.flask_client import OAuth
from flask import url_for, redirect, current_app as app, Blueprint, abort
from flask_login import login_user
from flask_appbuilder import BaseView as AppBuilderBaseView
from airflow.plugins_manager import AirflowPlugin
log = logging.getLogger(__name__)
log.setLevel("DEBUG")
FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public" # The "Public" role is given no permissions
oauth = OAuth(app)
oauth.register(
name='unity',
client_id=os.getenv("OAUTH_CLIENT_ID"),
server_metadata_url=os.getenv("OAUTH_METADATA_URL"),
client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
client_kwargs={'scope' : 'openid profile email eflows'}
)
unity = Blueprint('unity', __name__, url_prefix="/unity")
class UnityIntegrationView(AppBuilderBaseView):
@unity.route('/')
@unity.route('/login')
def login():
redirect_uri = url_for('.authorize', _external=True)
log.debug("Redirect uri is '" + str(redirect_uri) + "'")
log.debug("Metadata uri is '" + str(os.getenv("OAUTH_METADATA_URL")) + "'")
return oauth.unity.authorize_redirect(redirect_uri)
@unity.route('/authorize')
def authorize():
try:
token = oauth.unity.authorize_access_token()
except:
abort(403)
user = oauth.unity.userinfo(token=token)
# get relevant data from token
log.debug(str(user))
email = user['email']
persistent_identifier = user["sub"]
first_name = user["firstname"]
last_name = user["surname"]
admin_access = user.get('eflows:dlsAccess', 'False')
log.debug("SSO user logging in...")
log.debug("sub : " + persistent_identifier)
log.debug("first name : " + first_name)
log.debug("last name : " + last_name)
log.debug("email : " + email)
log.debug("admin : " + admin_access)
log.debug("......................")
role = FAB_VIEWER_ROLE
if admin_access:
role = FAB_ADMIN_ROLE
# check airflow user backend
# check if user already exists, if not create it (with long random password)
sec_manager = app.appbuilder.sm
fab_user = sec_manager.find_user(username=persistent_identifier)
log.debug("Searching for user by name gave '" + (str(fab_user)) + "'")
if fab_user is None:
fab_user = sec_manager.find_user(email=email)
log.debug("Searching for user by email gave '" + (str(fab_user)) + "'")
if fab_user is None:
log.debug("Trying to create non-existing user")
characters = string.ascii_letters + string.digits + string.punctuation
if sec_manager.add_user(
username=persistent_identifier,
first_name=first_name,
last_name=last_name,
email=email,
role=sec_manager.find_role(role),
password=''.join(random.choice(characters) for i in range(20))
):
fab_user = sec_manager.find_user(username=persistent_identifier)
log.info("Successfully created user " + str(fab_user))
else:
log.error("User creation unsuccessful.")
abort(500)
# login as that user
login_user(fab_user, remember=False)
return redirect(url_for("Airflow.index"))
@unity.route('/logout')
def logout():
pass
class UnityIntegrationPlugin(AirflowPlugin):
name = "Unity Integration"
flask_blueprints = [unity]