Newer
Older
from authlib.integrations.flask_client import OAuth
from flask import url_for, redirect, current_app as app, Blueprint, abort
from flask_login import login_user
from flask_appbuilder import BaseView as AppBuilderBaseView
from airflow.plugins_manager import AirflowPlugin
log = logging.getLogger(__name__)
FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public" # The "Public" role is given no permissions
oauth = OAuth(app)
oauth.register(
name='unity',
client_id=os.getenv("OAUTH_CLIENT_ID"),
server_metadata_url=os.getenv("OAUTH_METADATA_URL"),
client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
client_kwargs={'scope' : 'openid profile email eflows'}
unity = Blueprint('unity', __name__, url_prefix="/unity")
class UnityIntegrationView(AppBuilderBaseView):
'''
This adds the neccessary routes for a oauth2 login support in addition to the basic_auth provided by airflow.
This is achieved by handling the oauth2 flow fully via the added routes nad using the basic_auth via internal functions.
Once a user has been authenticated, a matching user (via username or email) is either found or created in the airflow basic_auth database.
This means that the Oauth2 provider needs to be trusted to the same extent that the local user db is trusted.
This can also break on airflow updates without notice, as airflow-internal security functions are used, that are not well-documented and appear to not be intended for this use.
'''
@unity.route('/login')
def login():
redirect_uri = url_for('.authorize', _external=True)
log.debug("Redirect uri is '" + str(redirect_uri) + "'")
log.debug("Metadata uri is '" + str(os.getenv("OAUTH_METADATA_URL")) + "'")
return oauth.unity.authorize_redirect(redirect_uri)
@unity.route('/authorize')
def authorize():
try:
token = oauth.unity.authorize_access_token()
except:
abort(403)
user = oauth.unity.userinfo(token=token)
# get relevant data from token
email = user['email']
persistent_identifier = user["sub"]
first_name = user["firstname"]
last_name = user["surname"]
admin_access = user.get('eflows:dlsAccess', 'false')
log.debug("SSO user logging in...")
log.debug("sub : " + persistent_identifier)
log.debug("first name : " + first_name)
log.debug("last name : " + last_name)
log.debug("email : " + email)
log.debug("admin : " + admin_access)
log.debug("......................")
role = FAB_ADMIN_ROLE
# check airflow user backend
# check if user already exists, if not create it (with long random password)
sec_manager = app.appbuilder.sm
fab_user = sec_manager.find_user(username=persistent_identifier)
log.debug("Searching for user by name gave '" + (str(fab_user)) + "'")
if fab_user is None:
fab_user = sec_manager.find_user(email=email)
log.debug("Searching for user by email gave '" + (str(fab_user)) + "'")
if fab_user is None:
characters = string.ascii_letters + string.digits + string.punctuation
if sec_manager.add_user(
username=persistent_identifier,
first_name=first_name,
last_name=last_name,
email=email,
password=''.join(random.choice(characters) for i in range(20))
fab_user = sec_manager.find_user(username=persistent_identifier)
log.info("Successfully created user " + str(fab_user))
else:
log.error("User creation unsuccessful.")
abort(500)
else:
# set role permissions of that user, if it already exists (just take the role from SSO and assign it)
fab_user.role = sec_manager.find_role(role)
sec_manager.update_user(fab_user)
# login as that user
login_user(fab_user, remember=False)
return redirect(url_for("Airflow.index"))
@unity.route('/logout')
def logout():
pass
class UnityIntegrationPlugin(AirflowPlugin):
flask_blueprints = [unity]