diff --git a/configs/development.py b/configs/development.py index 2c2e63d..40fcc62 100644 --- a/configs/development.py +++ b/configs/development.py @@ -147,6 +147,37 @@ SAML_ENABLED = False # #SAML_LOGOUT_URL = 'https://google.com' # #SAML_ASSERTION_ENCRYPTED = True +# SAML_WANT_MESSAGE_SIGNED + +# SAML Autoprovisioning +# If toggled on, the PDA Role and the associations of users found in the local db +# will be directly updated from the SAML IDP every time they log in. +# NOTE: This feature and the assertion of "Admin / Account" attributes are mutually exclusive. +# If used, the values for Admin/Account given above will be ignored. +SAML_AUTOPROVISIONING = True +# The urn value of the attribute in the SAML Authn Response where PDA will look +# for a new Role and/or new associations to domains/accounts. +# Example: urn:oid:1.3.6.1.4.1.5923.1.1.1.7 +# The record syntax for this attribute inside the SAML Response must look like: +# prefix:powerdns-admin:PDA-Role, to provision an Administrator or Operator, or +# prefix:powerdns-admin:User::, provision a User +# who has access to one or more Domains and belongs to one or more Accounts. +# the "prefix" is given in the next attribute +SAML_AUTOPROVISIONING_ATTRIBUTE = 'urn:oid:1.3.6.1.4.1.5923.1.1.1.7' +# The prefix used before the static keyword "powerdns-admin" for your entitlements +# in the SAML Response. Must be a valid URN. +# Example: urn:mace:example.com +SAML_URN_PREFIX = 'urn:mace:example.com' +# If toggled on, SAML logins that have no valid "powerdns-admin" records +# to their autoprovisioning field, will lose all their associations +# with any domain or account, also reverting to a User in the process, +# despite their current role in the local db. +# If toggled off, in the same scenario they get to keep +# their existing associations and their current Role. +### CAUTION: Enabling this feature will revoke existing users' access to their +# associated domains unless they have their autoprovisioning field prepopulated. +SAML_PURGE = False + # Remote authentication settings diff --git a/powerdnsadmin/models/setting.py b/powerdnsadmin/models/setting.py index 33b8db5..7a43a06 100644 --- a/powerdnsadmin/models/setting.py +++ b/powerdnsadmin/models/setting.py @@ -110,6 +110,10 @@ class Setting(db.Model): 'oidc_oauth_email': 'email', 'oidc_oauth_account_name_property': '', 'oidc_oauth_account_description_property': '', + 'saml_autoprovisioning': False, + 'saml_urn_prefix': '', + 'saml_autoprovisioning_attribute': '', + 'saml_purge': False, 'forward_records_allow_edit': { 'A': True, 'AAAA': True, diff --git a/powerdnsadmin/models/user.py b/powerdnsadmin/models/user.py index ca0561b..65bc5b2 100644 --- a/powerdnsadmin/models/user.py +++ b/powerdnsadmin/models/user.py @@ -605,7 +605,7 @@ class User(db.Model): return False def set_role(self, role_name): - role = Role.query.filter(Role.name == role_name).first() + role = Role.query.filter(Role.name == role_name.capitalize()).first() if role: user = User.query.filter(User.username == self.username).first() user.role_id = role.id @@ -669,19 +669,19 @@ class User(db.Model): current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e)) return entitlements - def updateUser(self, Entitlements): + def updateUser(self, Entitlements, urn_value): """ Update user associations based on ldap attribute """ - entitlements= getCorrectEntitlements(Entitlements) + entitlements= getCorrectEntitlements(Entitlements, urn_value) if len(entitlements)!=0: self.revoke_privilege(True) + role="user" for entitlement in entitlements: arguments=entitlement.split(':') entArgs=arguments[arguments.index('powerdns-admin')+1:] - role= entArgs[0] - self.set_role(role) - if (role=="User") and len(entArgs)>1: + role= self.get_role(role,entArgs[0].lower()) + if (role=="user") and len(entArgs)>1: current_domains=getUserInfo(self.get_user_domains()) current_accounts=getUserInfo(self.get_accounts()) domain=entArgs[1] @@ -689,6 +689,14 @@ class User(db.Model): if len(entArgs)>2: account=entArgs[2] self.addMissingAccount(account, current_accounts) + self.set_role(role) + + def get_role(self, previousRole, newRole): + dict = { "user": 1, "operator" : 2, "administrator" : 3} + if (dict[newRole] > dict[previousRole]): + return newRole + else: + return previousRole def addMissingDomain(self, autoprovision_domain, current_domains): """ @@ -712,12 +720,11 @@ class User(db.Model): if account!=None: account.add_user(user) -def getCorrectEntitlements(Entitlements): +def getCorrectEntitlements(Entitlements, urn_value): """ Gather a list of valid records from the ldap attribute given """ from ..models.role import Role - urn_value=Setting().get('urn_value') urnArgs=[x.lower() for x in urn_value.split(':')] entitlements=[] for Entitlement in Entitlements: @@ -742,7 +749,7 @@ def getCorrectEntitlements(Entitlements): continue entArgs=arguments[arguments.index('powerdns-admin')+1:] - role=entArgs[0] + role=entArgs[0].lower() roles= Role.query.all() role_names=get_role_names(roles) @@ -752,7 +759,7 @@ def getCorrectEntitlements(Entitlements): continue if len(entArgs)>1: - if (role!="User"): + if (role!="user"): e="Too many arguments for Admin or Operator" current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e)) continue @@ -797,7 +804,7 @@ def get_role_names(roles): """ roles_list=[] for role in roles: - roles_list.append(role.name) + roles_list.append(role.name.lower()) return roles_list def getUserInfo(DomainsOrAccounts): diff --git a/powerdnsadmin/routes/index.py b/powerdnsadmin/routes/index.py index 316db2d..f2ec0a7 100644 --- a/powerdnsadmin/routes/index.py +++ b/powerdnsadmin/routes/index.py @@ -496,7 +496,7 @@ def login(): elif len(Entitlements)!=0: if checkForPDAEntries(Entitlements, urn_value): - user.updateUser(Entitlements) + user.updateUser(Entitlements, urn_value) else: current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix') if Setting().get('purge'): @@ -958,7 +958,6 @@ def saml_metadata(): resp = make_response(errors.join(', '), 500) return resp - @index_bp.route('/saml/authorized', methods=['GET', 'POST']) def saml_authorized(): errors = [] @@ -1023,51 +1022,83 @@ def saml_authorized(): user.firstname = name[0] user.lastname = ' '.join(name[1:]) - if group_attribute_name: - user_groups = session['samlUserdata'].get(group_attribute_name, []) - else: - user_groups = [] - if admin_attribute_name or group_attribute_name: - user_accounts = set(user.get_accounts()) - saml_accounts = [] - for group_mapping in group_to_account_mapping: - mapping = group_mapping.split('=') - group = mapping[0] - account_name = mapping[1] + if not Setting().get('saml_autoprovisioning'): + if group_attribute_name: + user_groups = session['samlUserdata'].get(group_attribute_name, []) + else: + user_groups = [] + if admin_attribute_name or group_attribute_name: + user_accounts = set(user.get_accounts()) + saml_accounts = [] + for group_mapping in group_to_account_mapping: + mapping = group_mapping.split('=') + group = mapping[0] + account_name = mapping[1] - if group in user_groups: + if group in user_groups: + account = handle_account(account_name) + saml_accounts.append(account) + + for account_name in session['samlUserdata'].get( + account_attribute_name, []): account = handle_account(account_name) saml_accounts.append(account) + saml_accounts = set(saml_accounts) + for account in saml_accounts - user_accounts: + account.add_user(user) + history = History(msg='Adding {0} to account {1}'.format( + user.username, account.name), + created_by='SAML Assertion') + history.add() + for account in user_accounts - saml_accounts: + account.remove_user(user) + history = History(msg='Removing {0} from account {1}'.format( + user.username, account.name), + created_by='SAML Assertion') + history.add() + if admin_attribute_name and 'true' in session['samlUserdata'].get( + admin_attribute_name, []): + uplift_to_admin(user) + elif admin_group_name in user_groups: + uplift_to_admin(user) + elif admin_attribute_name or group_attribute_name: + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Assertion') + history.add() + elif Setting().get('saml_autoprovisioning'): + urn_prefix = Setting().get('saml_urn_prefix') + autoprovisioning_attribute = Setting().get('saml_autoprovisioning_attribute') + Entitlements = [] + if autoprovisioning_attribute in session['samlUserdata']: + for k in session['samlUserdata'][autoprovisioning_attribute]: + Entitlements.append(k) + + if len(Entitlements)==0 and Setting().get('saml_purge'): + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Autoprovision') + history.add() + user.revoke_privilege(True) + elif len(Entitlements)!=0: + if checkForPDAEntries(Entitlements, urn_prefix): + user.updateUser(Entitlements, urn_prefix) + else: + current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix') + if Setting().get('saml_purge'): + current_app.logger.warning('Procceding to revoke every privilege from ' + user.username + '.' ) + if user.role.name != 'User': + user.role_id = Role.query.filter_by(name='User').first().id + history = History(msg='Demoting {0} to user'.format( + user.username), + created_by='SAML Autoprovision') + history.add() + user.revoke_privilege(True) - for account_name in session['samlUserdata'].get( - account_attribute_name, []): - account = handle_account(account_name) - saml_accounts.append(account) - saml_accounts = set(saml_accounts) - for account in saml_accounts - user_accounts: - account.add_user(user) - history = History(msg='Adding {0} to account {1}'.format( - user.username, account.name), - created_by='SAML Assertion') - history.add() - for account in user_accounts - saml_accounts: - account.remove_user(user) - history = History(msg='Removing {0} from account {1}'.format( - user.username, account.name), - created_by='SAML Assertion') - history.add() - if admin_attribute_name and 'true' in session['samlUserdata'].get( - admin_attribute_name, []): - uplift_to_admin(user) - elif admin_group_name in user_groups: - uplift_to_admin(user) - elif admin_attribute_name or group_attribute_name: - if user.role.name != 'User': - user.role_id = Role.query.filter_by(name='User').first().id - history = History(msg='Demoting {0} to user'.format( - user.username), - created_by='SAML Assertion') - history.add() user.plain_text_password = None user.update_profile() session['authentication_type'] = 'SAML'