This commit is contained in:
Kostas Mparmparousis 2022-01-05 11:54:37 +01:00 committed by GitHub
commit dd39d1ebbe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 54 deletions

View file

@ -147,6 +147,37 @@ SAML_ENABLED = False
# #SAML_LOGOUT_URL = 'https://google.com'
# #SAML_ASSERTION_ENCRYPTED = True
# SAML_WANT_MESSAGE_SIGNED
# SAML Autoprovisioning
# If toggled on, the PDA Role and the associations of users found in the local db
# will be directly updated from the SAML IDP every time they log in.
# NOTE: This feature and the assertion of "Admin / Account" attributes are mutually exclusive.
# If used, the values for Admin/Account given above will be ignored.
SAML_AUTOPROVISIONING = True
# The urn value of the attribute in the SAML Authn Response where PDA will look
# for a new Role and/or new associations to domains/accounts.
# Example: urn:oid:1.3.6.1.4.1.5923.1.1.1.7
# The record syntax for this attribute inside the SAML Response must look like:
# prefix:powerdns-admin:PDA-Role, to provision an Administrator or Operator, or
# prefix:powerdns-admin:User:<domain>:<account>, provision a User
# who has access to one or more Domains and belongs to one or more Accounts.
# the "prefix" is given in the next attribute
SAML_AUTOPROVISIONING_ATTRIBUTE = 'urn:oid:1.3.6.1.4.1.5923.1.1.1.7'
# The prefix used before the static keyword "powerdns-admin" for your entitlements
# in the SAML Response. Must be a valid URN.
# Example: urn:mace:example.com
SAML_URN_PREFIX = 'urn:mace:example.com'
# If toggled on, SAML logins that have no valid "powerdns-admin" records
# to their autoprovisioning field, will lose all their associations
# with any domain or account, also reverting to a User in the process,
# despite their current role in the local db.
# If toggled off, in the same scenario they get to keep
# their existing associations and their current Role.
### CAUTION: Enabling this feature will revoke existing users' access to their
# associated domains unless they have their autoprovisioning field prepopulated.
SAML_PURGE = False
# Remote authentication settings

View file

@ -110,6 +110,10 @@ class Setting(db.Model):
'oidc_oauth_email': 'email',
'oidc_oauth_account_name_property': '',
'oidc_oauth_account_description_property': '',
'saml_autoprovisioning': False,
'saml_urn_prefix': '',
'saml_autoprovisioning_attribute': '',
'saml_purge': False,
'forward_records_allow_edit': {
'A': True,
'AAAA': True,

View file

@ -605,7 +605,7 @@ class User(db.Model):
return False
def set_role(self, role_name):
role = Role.query.filter(Role.name == role_name).first()
role = Role.query.filter(Role.name == role_name.capitalize()).first()
if role:
user = User.query.filter(User.username == self.username).first()
user.role_id = role.id
@ -669,19 +669,19 @@ class User(db.Model):
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
return entitlements
def updateUser(self, Entitlements):
def updateUser(self, Entitlements, urn_value):
"""
Update user associations based on ldap attribute
"""
entitlements= getCorrectEntitlements(Entitlements)
entitlements= getCorrectEntitlements(Entitlements, urn_value)
if len(entitlements)!=0:
self.revoke_privilege(True)
role="user"
for entitlement in entitlements:
arguments=entitlement.split(':')
entArgs=arguments[arguments.index('powerdns-admin')+1:]
role= entArgs[0]
self.set_role(role)
if (role=="User") and len(entArgs)>1:
role= self.get_role(role,entArgs[0].lower())
if (role=="user") and len(entArgs)>1:
current_domains=getUserInfo(self.get_user_domains())
current_accounts=getUserInfo(self.get_accounts())
domain=entArgs[1]
@ -689,6 +689,14 @@ class User(db.Model):
if len(entArgs)>2:
account=entArgs[2]
self.addMissingAccount(account, current_accounts)
self.set_role(role)
def get_role(self, previousRole, newRole):
dict = { "user": 1, "operator" : 2, "administrator" : 3}
if (dict[newRole] > dict[previousRole]):
return newRole
else:
return previousRole
def addMissingDomain(self, autoprovision_domain, current_domains):
"""
@ -712,12 +720,11 @@ class User(db.Model):
if account!=None:
account.add_user(user)
def getCorrectEntitlements(Entitlements):
def getCorrectEntitlements(Entitlements, urn_value):
"""
Gather a list of valid records from the ldap attribute given
"""
from ..models.role import Role
urn_value=Setting().get('urn_value')
urnArgs=[x.lower() for x in urn_value.split(':')]
entitlements=[]
for Entitlement in Entitlements:
@ -742,7 +749,7 @@ def getCorrectEntitlements(Entitlements):
continue
entArgs=arguments[arguments.index('powerdns-admin')+1:]
role=entArgs[0]
role=entArgs[0].lower()
roles= Role.query.all()
role_names=get_role_names(roles)
@ -752,7 +759,7 @@ def getCorrectEntitlements(Entitlements):
continue
if len(entArgs)>1:
if (role!="User"):
if (role!="user"):
e="Too many arguments for Admin or Operator"
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
continue
@ -797,7 +804,7 @@ def get_role_names(roles):
"""
roles_list=[]
for role in roles:
roles_list.append(role.name)
roles_list.append(role.name.lower())
return roles_list
def getUserInfo(DomainsOrAccounts):

View file

@ -496,7 +496,7 @@ def login():
elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_value):
user.updateUser(Entitlements)
user.updateUser(Entitlements, urn_value)
else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('purge'):
@ -958,7 +958,6 @@ def saml_metadata():
resp = make_response(errors.join(', '), 500)
return resp
@index_bp.route('/saml/authorized', methods=['GET', 'POST'])
def saml_authorized():
errors = []
@ -1023,51 +1022,83 @@ def saml_authorized():
user.firstname = name[0]
user.lastname = ' '.join(name[1:])
if group_attribute_name:
user_groups = session['samlUserdata'].get(group_attribute_name, [])
else:
user_groups = []
if admin_attribute_name or group_attribute_name:
user_accounts = set(user.get_accounts())
saml_accounts = []
for group_mapping in group_to_account_mapping:
mapping = group_mapping.split('=')
group = mapping[0]
account_name = mapping[1]
if not Setting().get('saml_autoprovisioning'):
if group_attribute_name:
user_groups = session['samlUserdata'].get(group_attribute_name, [])
else:
user_groups = []
if admin_attribute_name or group_attribute_name:
user_accounts = set(user.get_accounts())
saml_accounts = []
for group_mapping in group_to_account_mapping:
mapping = group_mapping.split('=')
group = mapping[0]
account_name = mapping[1]
if group in user_groups:
if group in user_groups:
account = handle_account(account_name)
saml_accounts.append(account)
for account_name in session['samlUserdata'].get(
account_attribute_name, []):
account = handle_account(account_name)
saml_accounts.append(account)
saml_accounts = set(saml_accounts)
for account in saml_accounts - user_accounts:
account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
for account in user_accounts - saml_accounts:
account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []):
uplift_to_admin(user)
elif admin_group_name in user_groups:
uplift_to_admin(user)
elif admin_attribute_name or group_attribute_name:
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Assertion')
history.add()
elif Setting().get('saml_autoprovisioning'):
urn_prefix = Setting().get('saml_urn_prefix')
autoprovisioning_attribute = Setting().get('saml_autoprovisioning_attribute')
Entitlements = []
if autoprovisioning_attribute in session['samlUserdata']:
for k in session['samlUserdata'][autoprovisioning_attribute]:
Entitlements.append(k)
if len(Entitlements)==0 and Setting().get('saml_purge'):
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Autoprovision')
history.add()
user.revoke_privilege(True)
elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_prefix):
user.updateUser(Entitlements, urn_prefix)
else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('saml_purge'):
current_app.logger.warning('Procceding to revoke every privilege from ' + user.username + '.' )
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Autoprovision')
history.add()
user.revoke_privilege(True)
for account_name in session['samlUserdata'].get(
account_attribute_name, []):
account = handle_account(account_name)
saml_accounts.append(account)
saml_accounts = set(saml_accounts)
for account in saml_accounts - user_accounts:
account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
for account in user_accounts - saml_accounts:
account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []):
uplift_to_admin(user)
elif admin_group_name in user_groups:
uplift_to_admin(user)
elif admin_attribute_name or group_attribute_name:
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Assertion')
history.add()
user.plain_text_password = None
user.update_profile()
session['authentication_type'] = 'SAML'