diff --git a/powerdnsadmin/models/setting.py b/powerdnsadmin/models/setting.py index a46cfb6..a713d76 100644 --- a/powerdnsadmin/models/setting.py +++ b/powerdnsadmin/models/setting.py @@ -110,6 +110,10 @@ class Setting(db.Model): 'oidc_oauth_email': 'email', 'oidc_oauth_account_name_property': '', 'oidc_oauth_account_description_property': '', + 'saml_autoprovisioning': False, + 'saml_urn_value': '', + 'saml_autoprovisioning_attribute': '', + 'saml_purge': False, 'forward_records_allow_edit': { 'A': True, 'AAAA': True, diff --git a/powerdnsadmin/models/user.py b/powerdnsadmin/models/user.py index f5e3556..9021e4b 100644 --- a/powerdnsadmin/models/user.py +++ b/powerdnsadmin/models/user.py @@ -659,11 +659,11 @@ class User(db.Model): current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e)) return entitlements - def updateUser(self, Entitlements): + def updateUser(self, Entitlements, urn_value): """ Update user associations based on ldap attribute """ - entitlements= getCorrectEntitlements(Entitlements) + entitlements= getCorrectEntitlements(Entitlements, urn_value) if len(entitlements)!=0: self.revoke_privilege(True) for entitlement in entitlements: @@ -702,12 +702,11 @@ class User(db.Model): if account!=None: account.add_user(user) -def getCorrectEntitlements(Entitlements): +def getCorrectEntitlements(Entitlements, urn_value): """ Gather a list of valid records from the ldap attribute given """ from ..models.role import Role - urn_value=Setting().get('urn_value') urnArgs=[x.lower() for x in urn_value.split(':')] entitlements=[] for Entitlement in Entitlements: diff --git a/powerdnsadmin/routes/index.py b/powerdnsadmin/routes/index.py index ccdcd6b..2fb3b4b 100644 --- a/powerdnsadmin/routes/index.py +++ b/powerdnsadmin/routes/index.py @@ -504,7 +504,7 @@ def login(): elif len(Entitlements)!=0: if checkForPDAEntries(Entitlements, urn_value): - user.updateUser(Entitlements) + user.updateUser(Entitlements, urn_value) else: current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix') if Setting().get('purge'): @@ -924,7 +924,6 @@ def saml_metadata(): resp = make_response(errors.join(', '), 500) return resp - @index_bp.route('/saml/authorized', methods=['GET', 'POST']) def saml_authorized(): errors = [] @@ -989,51 +988,77 @@ def saml_authorized(): user.firstname = name[0] user.lastname = ' '.join(name[1:]) - if group_attribute_name: - user_groups = session['samlUserdata'].get(group_attribute_name, []) - else: - user_groups = [] - if admin_attribute_name or group_attribute_name: - user_accounts = set(user.get_accounts()) - saml_accounts = [] - for group_mapping in group_to_account_mapping: - mapping = group_mapping.split('=') - group = mapping[0] - account_name = mapping[1] + if not Setting().get('saml_autoprovisioning'): + if group_attribute_name: + user_groups = session['samlUserdata'].get(group_attribute_name, []) + else: + user_groups = [] + if admin_attribute_name or group_attribute_name: + user_accounts = set(user.get_accounts()) + saml_accounts = [] + for group_mapping in group_to_account_mapping: + mapping = group_mapping.split('=') + group = mapping[0] + account_name = mapping[1] - if group in user_groups: + if group in user_groups: + account = handle_account(account_name) + saml_accounts.append(account) + + for account_name in session['samlUserdata'].get( + account_attribute_name, []): account = handle_account(account_name) saml_accounts.append(account) + saml_accounts = set(saml_accounts) + for account in saml_accounts - user_accounts: + account.add_user(user) + history = History(msg='Adding {0} to account {1}'.format( + user.username, account.name), + created_by='SAML Assertion') + history.add() + for account in user_accounts - saml_accounts: + account.remove_user(user) + history = History(msg='Removing {0} from account {1}'.format( + user.username, account.name), + created_by='SAML Assertion') + history.add() + if admin_attribute_name and 'true' in session['samlUserdata'].get( + admin_attribute_name, []): + uplift_to_admin(user) + elif admin_group_name in user_groups: + uplift_to_admin(user) + elif admin_attribute_name or group_attribute_name: + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Assertion') + history.add() + elif Setting().get('saml_autoprovisioning'): + urn_value = Setting().get('saml_urn_value') # urn_value for + key = Setting().get('saml_autoprovisioning_attribute') + Entitlements = read_saml_entitlements(urn_value, session['samlUserdata']) + if len(Entitlements)==0 and Setting().get('saml_purge'): + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Autoprovision') + history.add() + elif len(Entitlements)!=0: + if checkForPDAEntries(Entitlements, urn_value): + user.updateUser(Entitlements, urn_value) + else: + current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix') + if Setting().get('saml_purge'): + current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' ) + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Autoprovision') + history.add() - for account_name in session['samlUserdata'].get( - account_attribute_name, []): - account = handle_account(account_name) - saml_accounts.append(account) - saml_accounts = set(saml_accounts) - for account in saml_accounts - user_accounts: - account.add_user(user) - history = History(msg='Adding {0} to account {1}'.format( - user.username, account.name), - created_by='SAML Assertion') - history.add() - for account in user_accounts - saml_accounts: - account.remove_user(user) - history = History(msg='Removing {0} from account {1}'.format( - user.username, account.name), - created_by='SAML Assertion') - history.add() - if admin_attribute_name and 'true' in session['samlUserdata'].get( - admin_attribute_name, []): - uplift_to_admin(user) - elif admin_group_name in user_groups: - uplift_to_admin(user) - elif admin_attribute_name or group_attribute_name: - if user.role.name != 'User': - user.role_id = Role.query.filter_by(name='User').first().id - history = History(msg='Demoting {0} to user'.format( - user.username), - created_by='SAML Assertion') - history.add() user.plain_text_password = None user.update_profile() session['authentication_type'] = 'SAML' @@ -1043,6 +1068,12 @@ def saml_authorized(): else: return render_template('errors/SAML.html', errors=errors) +def read_saml_entitlements(urn_value, saml_userdata): + Entitlements = [] + if urn_value in saml_userdata: + for k in saml_userdata[urn_value]: + Entitlements.append(k) + return Entitlements def create_group_to_account_mapping(): group_to_account_mapping_string = current_app.config.get(