Skip to content
Snippets Groups Projects
Commit b441a438 authored by Christian Boettcher's avatar Christian Boettcher
Browse files

try to add unity integration with a plugin

parent 5e011493
No related branches found
No related tags found
No related merge requests found
Pipeline #130862 passed
import os, random, string
from authlib.integrations.flask_client import OAuth
from flask import url_for, redirect
from flask_login import login_user
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.plugins_manager import AirflowPlugin
import logging
import os
log = logging.getLogger(__name__)
log.setLevel(os.getenv("AIRFLOW__LOGGING__FAB_LOGGING_LEVEL", "INFO"))
FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public" # The "Public" role is given no permissions
app= get_airflow_app()
oauth = OAuth(app)
oauth.register(
name='unity',
client_id=os.getenv("OAUTH_CLIENT_ID"),
server_metadata_url=os.getenv("OAUTH_METADATA_URL"),
client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
client_kwargs={
'scope' : 'openid email profile eflows'
}
)
class UnityIntegrationLoginView(AppBuilderBaseView):
#@expose("/unity_login")
@app.route('/unity_login')
def unity_login():
redirect_uri = url_for('unity_authorize', _external=True)
return oauth.unity.authorize_redirect(redirect_uri)
class UnityIntegrationAuthView(AppBuilderBaseView):
#@expose("/unity_authorize")
@app.route('/unity_authorize')
async def authorize():
token = await oauth.unity.authorize_access_token()
user = await oauth.unity.userinfo(token=token)
# get relevant data from token
email = user['email']
persistent_identifier = user["sub"]
first_name = user["given_name"]
last_name = user["family_name"]
admin_access = user.get('eflows:dlsAccess', False)
role = FAB_VIEWER_ROLE
if admin_access:
role = FAB_ADMIN_ROLE
# check airflow user backend
# check if user already exists, if not create it (with long random password)
sec_manager = app.appbuilder.sm
fab_user = sec_manager.find_user('username')
if fab_user is None: # TODO check if None is the rioght thing to compare to
characters = string.ascii_letters + string.digits + string.punctuation
fab_user = sec_manager.add_user(
username=persistent_identifier,
first_name=first_name,
last_name=last_name,
email=email,
role=role,
password=''.join(random.choice(characters) for i in range(20))
)
# login as that user
login_user(fab_user, remember=False)
return redirect('/')
v_unity_login_view = UnityIntegrationLoginView()
v_unity_login_package = {
"name": "Unity Login View",
"category": "Unity Integration",
"view": v_unity_login_view,
}
v_unity_auth_view = UnityIntegrationAuthView()
v_unity_auth_package = {
"name": "Unity Auth View",
"category": "Unity Integration",
"view": v_unity_auth_view,
}
class UnityIntegrationPlugin(AirflowPlugin):
name = "unity_integration"
appbuilder_views = [v_unity_auth_package, v_unity_login_package]
\ No newline at end of file
......@@ -9,3 +9,4 @@ apache-airflow-providers-sftp
airflow-datacat-integration>=0.1.4
flask-oidc
aiohttp
authlib
import os, logging, json, posixpath
from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager
from flask import abort, make_response, redirect
from flask_appbuilder.security.manager import AUTH_OID
from flask_appbuilder.security.views import AuthOIDView
from flask_appbuilder.views import ModelView, SimpleFormView, expose
from flask_login import login_user
from flask_oidc import OpenIDConnect
logger = logging.getLogger(__name__)
# Set the OIDC field that should be used
NICKNAME_OIDC_FIELD = 'nickname'
FULL_NAME_OIDC_FIELD = 'name'
GROUPS_OIDC_FIELD = 'groups'
EMAIL_FIELD = 'email'
SUB_FIELD = 'sub' # User ID
# Convert groups from comma separated string to list
ALLOWED_GROUPS = os.environ.get('ALLOWED_GROUPS')
if ALLOWED_GROUPS:
ALLOWED_GROUPS = [g.strip() for g in ALLOWED_GROUPS.split(',')]
else: ALLOWED_GROUPS = []
if ALLOWED_GROUPS:
logger.debug('AirFlow access requires membership to one of the following groups: %s'
% ', '.join(ALLOWED_GROUPS))
# Extending AuthOIDView
class AuthOIDCView(AuthOIDView):
@expose('/login/', methods=['GET', 'POST'])
def login(self, flag=True):
sm = self.appbuilder.sm
oidc = sm.oid
@self.appbuilder.sm.oid.require_login
def handle_login():
user = sm.auth_user_oid(oidc.user_getfield(EMAIL_FIELD))
# Group membership required
if ALLOWED_GROUPS:
# Fetch group membership information from GitLab
groups = oidc.user_getinfo([GROUPS_OIDC_FIELD]).get(GROUPS_OIDC_FIELD, [])
intersection = set(ALLOWED_GROUPS) & set(groups)
logger.debug('AirFlow user member of groups in ACL list: %s' % ', '.join(intersection))
# Unable to find common groups, prevent login
if not intersection:
return abort(403)
# Create user (if it doesn't already exist)
if user is None:
info = oidc.user_getinfo([
NICKNAME_OIDC_FIELD,
FULL_NAME_OIDC_FIELD,
GROUPS_OIDC_FIELD,
SUB_FIELD,
EMAIL_FIELD,
"profile"
])
full_name = info.get(FULL_NAME_OIDC_FIELD)
if " " in full_name:
full_name = full_name.split(" ")
first_name = full_name[0]
last_name = full_name[1]
else:
first_name = full_name
last_name = ""
user = sm.add_user(
username=info.get(NICKNAME_OIDC_FIELD),
first_name=first_name,
last_name=last_name,
email=info.get(EMAIL_FIELD),
role=sm.find_role(sm.auth_user_registration_role)
)
login_user(user, remember=False)
return redirect(self.appbuilder.get_url_for_index)
return handle_login()
@expose('/logout/', methods=['GET', 'POST'])
def logout(self):
oidc = self.appbuilder.sm.oid
if not oidc.credentials_store:
return redirect('/login/')
self.revoke_token()
oidc.logout()
super(AuthOIDCView, self).logout()
response = make_response("You have been signed out")
return response
def revoke_token(self):
""" Revokes the provided access token. Sends a POST request to the token revocation endpoint
"""
import aiohttp
import asyncio
import json
oidc = self.appbuilder.sm.oid
sub = oidc.user_getfield(SUB_FIELD)
config = oidc.credentials_store
config = config.get(str(sub))
config = json.loads(config)
payload = {
"token": config['access_token'],
"token_type_hint": "refresh_token"
}
auth = aiohttp.BasicAuth(config['client_id'], config['client_secret'])
# Sends an asynchronous POST request to revoke the token
async def revoke():
async with aiohttp.ClientSession() as session:
async with session.post(self.appbuilder.app.config.get('OIDC_LOGOUT_URI'), data=payload, auth=auth) as response:
logging.info(f"Revoke response {response.status}")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(revoke())
class OIDCSecurityManager(AirflowSecurityManager):
"""
Custom security manager class that allows using the OpenID Connection authentication method.
"""
def __init__(self, appbuilder):
super(OIDCSecurityManager, self).__init__(appbuilder)
if self.auth_type == AUTH_OID:
self.oid = OpenIDConnect(self.appbuilder.get_app)
self.authoidview = AuthOIDCView
basedir = os.path.abspath(os.path.dirname(__file__))
SECURITY_MANAGER_CLASS = OIDCSecurityManager
# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')
# Flask-WTF flag for CSRF
CSRF_ENABLED = True
AUTH_TYPE = AUTH_OID
OIDC_CLIENT_SECRETS = 'client_secret.json' # Configuration file for OIDC
OIDC_COOKIE_SECURE= False
OIDC_ID_TOKEN_COOKIE_SECURE = False
OIDC_REQUIRE_VERIFIED_EMAIL = False
OIDC_USER_INFO_ENABLED = True
CUSTOM_SECURITY_MANAGER = OIDCSecurityManager
# Ensure that the secrets file exists
if not os.path.exists(OIDC_CLIENT_SECRETS):
ValueError('Unable to load OIDC client configuration. %s does not exist.' % OIDC_CLIENT_SECRETS)
# Parse client_secret.json for scopes and logout URL
with open(OIDC_CLIENT_SECRETS) as f:
OIDC_APPCONFIG = json.loads(f.read())
# Ensure that the logout/revoke URL is specified in the client secrets file
UNITY_OIDC_URL = OIDC_APPCONFIG.get('web', {}).get('issuer')
if not UNITY_OIDC_URL:
raise ValueError('Invalid OIDC client configuration, GitLab OIDC URI not specified.')
OIDC_SCOPES = OIDC_APPCONFIG.get('OIDC_SCOPES', ['openid', 'email', 'profile']) # Scopes that should be requested.
OIDC_LOGOUT_URI = posixpath.join(UNITY_OIDC_URL, 'oauth/revoke') # OIDC logout URL
# Allow user self registration
AUTH_USER_REGISTRATION = False
# Default role to provide to new users
AUTH_USER_REGISTRATION_ROLE = os.environ.get('AUTH_USER_REGISTRATION_ROLE', 'Public')
AUTH_ROLE_ADMIN = 'Admin'
AUTH_ROLE_PUBLIC = "Public"
OPENID_PROVIDERS = [
{'name': 'Unity', 'url': posixpath.join(UNITY_OIDC_URL, 'oauth/authorize')}
]
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment