diff --git a/powerdnsadmin/models/setting.py b/powerdnsadmin/models/setting.py
index f1b9651..3264570 100644
--- a/powerdnsadmin/models/setting.py
+++ b/powerdnsadmin/models/setting.py
@@ -148,6 +148,10 @@ class Setting(db.Model):
 'saml_want_message_signed': False,
 'saml_metadata_cache_duration': 'PT5M',
 'saml_metadata_valid_until': '999999999999999999',
+ 'saml_autoprovisioning': True,
+ 'saml_urn_prefix': 'urn:mace:uoa.gr',
+ 'saml_autoprovisioning_attribute': 'urn:oid:1.3.6.1.4.1.5923.1.1.1.7',
+ 'saml_purge': False,
 'forward_records_allow_edit': {
 'A': True,
 'AAAA': True,
diff --git a/powerdnsadmin/models/user.py b/powerdnsadmin/models/user.py
index f5e3556..9021e4b 100644
--- a/powerdnsadmin/models/user.py
+++ b/powerdnsadmin/models/user.py
@@ -659,11 +659,11 @@ class User(db.Model):
 current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
 return entitlements
 
- def updateUser(self, Entitlements):
+ def updateUser(self, Entitlements, urn_value):
 """
 Update user associations based on ldap attribute
 """
- entitlements= getCorrectEntitlements(Entitlements)
+ entitlements= getCorrectEntitlements(Entitlements, urn_value)
 if len(entitlements)!=0:
 self.revoke_privilege(True)
 for entitlement in entitlements:
@@ -702,12 +702,11 @@ class User(db.Model):
 if account!=None:
 account.add_user(user)
 
-def getCorrectEntitlements(Entitlements):
+def getCorrectEntitlements(Entitlements, urn_value):
 """
 Gather a list of valid records from the ldap attribute given
 """
 from ..models.role import Role
- urn_value=Setting().get('urn_value')
 urnArgs=[x.lower() for x in urn_value.split(':')]
 entitlements=[]
 for Entitlement in Entitlements:
diff --git a/powerdnsadmin/routes/index.py b/powerdnsadmin/routes/index.py
index 18e18d1..bc128d2 100644
--- a/powerdnsadmin/routes/index.py
+++ b/powerdnsadmin/routes/index.py
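Review bot: `'saml_autoprovisioning': True` switches the new mode on for every existing deployment, and in the saml_authorized hunk of routes/index.py the group-to-account mapping and admin-attribute handling only run when this setting is false, so an upgrade silently stops applying those mappings for installs that never configured a URN prefix. `'saml_urn_prefix': 'urn:mace:uoa.gr'` is one institution's namespace; as a default it matches nothing elsewhere and produces the "Not a single powerdns-admin record was found" warning on every SAML login. A conservative default keeps current behaviour until an operator opts in: `'saml_autoprovisioning': False,` and `'saml_urn_prefix': '',`. The new keys also get no help text here; if settings are documented elsewhere in the project, the purge semantics (role demotion when no entitlement is present) deserve a sentence.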
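Review bot: The new `urn_value` parameter of `updateUser` is undocumented, and the docstring still says the associations come from an "ldap attribute" although the saml_authorized hunk now calls it with values taken from the SAML assertion; the same applies to `getCorrectEntitlements` in the next hunk. Both functions gain a required positional parameter, so any caller outside this diff that still passes one argument will fail with TypeError; a default of `urn_value=None` that falls back to the removed `Setting().get('urn_value')` lookup would keep such callers working. Suggested docstring for `updateUser`: `"""Update user associations from the given entitlement values. urn_value is the ':'-separated prefix that entitlements must carry (see getCorrectEntitlements)."""` — the comparison itself happens in the part of getCorrectEntitlements outside the hunk, so confirm the wording against it. The body of `updateUser` continues past the hunk.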
@@ -504,7 +504,7 @@ def login():
 elif len(Entitlements)!=0:
 if checkForPDAEntries(Entitlements, urn_value):
- user.updateUser(Entitlements)
+ user.updateUser(Entitlements, urn_value)
 else:
 current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
 if Setting().get('purge'):
@@ -939,7 +939,6 @@ def saml_metadata():
 resp = make_response(errors.join(', '), 500)
 return resp
 
-
 @index_bp.route('/saml/authorized', methods=['GET', 'POST'])
 def saml_authorized():
 errors = []
@@ -1020,51 +1019,81 @@ def saml_authorized():
 user.firstname = name[0]
 user.lastname = ' '.join(name[1:])
- if group_attribute_name:
- user_groups = session['samlUserdata'].get(group_attribute_name, [])
- else:
- user_groups = []
- if admin_attribute_name or group_attribute_name:
- user_accounts = set(user.get_accounts())
- saml_accounts = []
- for group_mapping in group_to_account_mapping:
- mapping = group_mapping.split('=')
- group = mapping[0]
- account_name = mapping[1]
+ if not Setting().get('saml_autoprovisioning'):
+ if group_attribute_name:
+ user_groups = session['samlUserdata'].get(group_attribute_name, [])
+ else:
+ user_groups = []
+ if admin_attribute_name or group_attribute_name:
+ user_accounts = set(user.get_accounts())
+ saml_accounts = []
+ for group_mapping in group_to_account_mapping:
+ mapping = group_mapping.split('=')
+ group = mapping[0]
+ account_name = mapping[1]
- if group in user_groups:
+ if group in user_groups:
+ account = handle_account(account_name)
+ saml_accounts.append(account)
+
+ for account_name in session['samlUserdata'].get(
+ account_attribute_name, []):
 account = handle_account(account_name)
 saml_accounts.append(account)
+ saml_accounts = set(saml_accounts)
+ for account in saml_accounts - user_accounts:
+ account.add_user(user)
+ history = History(msg='Adding {0} to account {1}'.format(
+ user.username, account.name),
+ created_by='SAML Assertion')
+ history.add()
+ for account in user_accounts - saml_accounts:
+ account.remove_user(user)
+ history = History(msg='Removing {0} from account {1}'.format(
+ user.username, account.name),
+ created_by='SAML Assertion')
+ history.add()
+ if admin_attribute_name and 'true' in session['samlUserdata'].get(
+ admin_attribute_name, []):
+ uplift_to_admin(user)
+ elif admin_group_name in user_groups:
+ uplift_to_admin(user)
+ elif admin_attribute_name or group_attribute_name:
+ if user.role.name != 'User':
+ user.role_id = Role.query.filter_by(name='User').first().id
+ history = History(msg='Demoting {0} to user'.format(
+ user.username),
+ created_by='SAML Assertion')
+ history.add()
+ elif Setting().get('saml_autoprovisioning'):
+ urn_prefix = Setting().get('saml_urn_prefix')
+ autoprovisioning_attribute = Setting().get('saml_autoprovisioning_attribute')
+ Entitlements = []
+ if autoprovisioning_attribute in session['samlUserdata']:
+ for k in session['samlUserdata'][autoprovisioning_attribute]:
+ Entitlements.append(k)
+
+ if len(Entitlements)==0 and Setting().get('saml_purge'):
+ if user.role.name != 'User':
+ user.role_id = Role.query.filter_by(name='User').first().id
+ history = History(msg='Demoting {0} to user'.format(
+ user.username),
+ created_by='SAML Autoprovision')
+ history.add()
+ elif len(Entitlements)!=0:
+ if checkForPDAEntries(Entitlements, urn_prefix):
+ user.updateUser(Entitlements, urn_prefix)
+ else:
+ current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
+ if Setting().get('saml_purge'):
+ current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
+ if user.role.name != 'User':
+ user.role_id = Role.query.filter_by(name='User').first().id
+ history = History(msg='Demoting {0} to user'.format(
+ user.username),
+ created_by='SAML Autoprovision')
+ history.add()
- for account_name in session['samlUserdata'].get(
- account_attribute_name, []):
- account = handle_account(account_name)
- saml_accounts.append(account)
- saml_accounts = set(saml_accounts)
- for account in saml_accounts - user_accounts:
- account.add_user(user)
- history = History(msg='Adding {0} to account {1}'.format(
- user.username, account.name),
- created_by='SAML Assertion')
- history.add()
- for account in user_accounts - saml_accounts:
- account.remove_user(user)
- history = History(msg='Removing {0} from account {1}'.format(
- user.username, account.name),
- created_by='SAML Assertion')
- history.add()
- if admin_attribute_name and 'true' in session['samlUserdata'].get(
- admin_attribute_name, []):
- uplift_to_admin(user)
- elif admin_group_name in user_groups:
- uplift_to_admin(user)
- elif admin_attribute_name or group_attribute_name:
- if user.role.name != 'User':
- user.role_id = Role.query.filter_by(name='User').first().id
- history = History(msg='Demoting {0} to user'.format(
- user.username),
- created_by='SAML Assertion')
- history.add()
 user.plain_text_password = None
 user.update_profile()
 session['authentication_type'] = 'SAML'
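Review bot: `elif Setting().get('saml_autoprovisioning'):` re-reads the setting that `if not Setting().get('saml_autoprovisioning'):` just tested, so it is an `else` written as a second settings lookup; read it once with `autoprovisioning = Setting().get('saml_autoprovisioning')` and use `if not autoprovisioning:` / `else:`. The demotion block (`if user.role.name != 'User':` through `history.add()`) now appears three times in this hunk differing only in the `created_by` text and would be clearer as one small helper beside `uplift_to_admin`. In the purge path the log line says every privilege is revoked, yet the code only resets `user.role_id`, whereas `updateUser` (user.py hunk) calls `self.revoke_privilege(True)` before applying entitlements — if that call also clears account memberships, the purge path should use it too so the message matches what happens; the same line also misspells "Proceeding" and "privilege". With the default `saml_purge` of False, an assertion carrying no entitlement leaves every previously granted role and account untouched, which is worth a comment on the `if len(Entitlements)==0 and Setting().get('saml_purge'):` line so operators do not read autoprovisioning as synchronising privileges. saml_authorized starts before this hunk and continues after it.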