merged autoprovisioning, and tested it

This commit is contained in:
kkmanos 2021-12-08 16:35:40 +02:00
commit 210e51e47c
3 changed files with 79 additions and 47 deletions

View file

@ -148,6 +148,10 @@ class Setting(db.Model):
'saml_want_message_signed': False,
'saml_metadata_cache_duration': 'PT5M',
'saml_metadata_valid_until': '999999999999999999',
'saml_autoprovisioning': True,
'saml_urn_prefix': 'urn:mace:uoa.gr',
'saml_autoprovisioning_attribute': 'urn:oid:1.3.6.1.4.1.5923.1.1.1.7',
'saml_purge': False,
'forward_records_allow_edit': {
'A': True,
'AAAA': True,

View file

@ -659,11 +659,11 @@ class User(db.Model):
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
return entitlements
def updateUser(self, Entitlements):
def updateUser(self, Entitlements, urn_value):
"""
Update user associations based on ldap attribute
"""
entitlements= getCorrectEntitlements(Entitlements)
entitlements= getCorrectEntitlements(Entitlements, urn_value)
if len(entitlements)!=0:
self.revoke_privilege(True)
for entitlement in entitlements:
@ -702,12 +702,11 @@ class User(db.Model):
if account!=None:
account.add_user(user)
def getCorrectEntitlements(Entitlements):
def getCorrectEntitlements(Entitlements, urn_value):
"""
Gather a list of valid records from the ldap attribute given
"""
from ..models.role import Role
urn_value=Setting().get('urn_value')
urnArgs=[x.lower() for x in urn_value.split(':')]
entitlements=[]
for Entitlement in Entitlements:

View file

@ -504,7 +504,7 @@ def login():
elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_value):
user.updateUser(Entitlements)
user.updateUser(Entitlements, urn_value)
else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('purge'):
@ -939,7 +939,6 @@ def saml_metadata():
resp = make_response(errors.join(', '), 500)
return resp
@index_bp.route('/saml/authorized', methods=['GET', 'POST'])
def saml_authorized():
errors = []
@ -1020,51 +1019,81 @@ def saml_authorized():
user.firstname = name[0]
user.lastname = ' '.join(name[1:])
if group_attribute_name:
user_groups = session['samlUserdata'].get(group_attribute_name, [])
else:
user_groups = []
if admin_attribute_name or group_attribute_name:
user_accounts = set(user.get_accounts())
saml_accounts = []
for group_mapping in group_to_account_mapping:
mapping = group_mapping.split('=')
group = mapping[0]
account_name = mapping[1]
if not Setting().get('saml_autoprovisioning'):
if group_attribute_name:
user_groups = session['samlUserdata'].get(group_attribute_name, [])
else:
user_groups = []
if admin_attribute_name or group_attribute_name:
user_accounts = set(user.get_accounts())
saml_accounts = []
for group_mapping in group_to_account_mapping:
mapping = group_mapping.split('=')
group = mapping[0]
account_name = mapping[1]
if group in user_groups:
if group in user_groups:
account = handle_account(account_name)
saml_accounts.append(account)
for account_name in session['samlUserdata'].get(
account_attribute_name, []):
account = handle_account(account_name)
saml_accounts.append(account)
saml_accounts = set(saml_accounts)
for account in saml_accounts - user_accounts:
account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
for account in user_accounts - saml_accounts:
account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []):
uplift_to_admin(user)
elif admin_group_name in user_groups:
uplift_to_admin(user)
elif admin_attribute_name or group_attribute_name:
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Assertion')
history.add()
elif Setting().get('saml_autoprovisioning'):
urn_prefix = Setting().get('saml_urn_prefix')
autoprovisioning_attribute = Setting().get('saml_autoprovisioning_attribute')
Entitlements = []
if autoprovisioning_attribute in session['samlUserdata']:
for k in session['samlUserdata'][autoprovisioning_attribute]:
Entitlements.append(k)
if len(Entitlements)==0 and Setting().get('saml_purge'):
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Autoprovision')
history.add()
elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_prefix):
user.updateUser(Entitlements, urn_prefix)
else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('saml_purge'):
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Autoprovision')
history.add()
for account_name in session['samlUserdata'].get(
account_attribute_name, []):
account = handle_account(account_name)
saml_accounts.append(account)
saml_accounts = set(saml_accounts)
for account in saml_accounts - user_accounts:
account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
for account in user_accounts - saml_accounts:
account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []):
uplift_to_admin(user)
elif admin_group_name in user_groups:
uplift_to_admin(user)
elif admin_attribute_name or group_attribute_name:
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Assertion')
history.add()
user.plain_text_password = None
user.update_profile()
session['authentication_type'] = 'SAML'