feat: Move the account parse calls to a method

This commit is contained in:
Jérôme BECOT 2021-12-31 09:51:12 +01:00
parent 30a2925fae
commit 07919ea210
4 changed files with 56 additions and 36 deletions

View file

@ -129,6 +129,13 @@ class AccountNotExists(StructuredException):
self.message = message self.message = message
self.name = name self.name = name
class InvalidAccountNameException(StructuredException):
status_code = 400
def __init__(self, name=None, message="The account name is invalid"):
StructuredException.__init__(self)
self.message = message
self.name = name
class UserCreateFail(StructuredException): class UserCreateFail(StructuredException):
status_code = 500 status_code = 500
@ -138,7 +145,6 @@ class UserCreateFail(StructuredException):
self.message = message self.message = message
self.name = name self.name = name
class UserCreateDuplicate(StructuredException): class UserCreateDuplicate(StructuredException):
status_code = 409 status_code = 409

View file

@ -3,6 +3,7 @@ from flask import current_app
from urllib.parse import urljoin from urllib.parse import urljoin
from ..lib import utils from ..lib import utils
from ..lib.errors import InvalidAccountNameException
from .base import db from .base import db
from .setting import Setting from .setting import Setting
from .user import User from .user import User
@ -22,7 +23,7 @@ class Account(db.Model):
back_populates="accounts") back_populates="accounts")
def __init__(self, name=None, description=None, contact=None, mail=None): def __init__(self, name=None, description=None, contact=None, mail=None):
self.name = name self.name = Account.sanitize_name(name) if name is not None else name
self.description = description self.description = description
self.contact = contact self.contact = contact
self.mail = mail self.mail = mail
@ -33,13 +34,30 @@ class Account(db.Model):
self.PDNS_VERSION = Setting().get('pdns_version') self.PDNS_VERSION = Setting().get('pdns_version')
self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION) self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION)
if self.name is not None:
if Setting().get('account_name_extra_chars'): @staticmethod
char_list = "abcdefghijklmnopqrstuvwxyz0123456789_-." def sanitize_name(name):
else: """
char_list = "abcdefghijklmnopqrstuvwxyz0123456789" Formats the provided name to fit into the constraint
self.name = ''.join(c for c in self.name.lower() """
if c in char_list) if not isinstance(name, str):
raise InvalidAccountNameException("Account name must be a string")
allowed_characters = "abcdefghijklmnopqrstuvwxyz0123456789"
if Setting().get('account_name_extra_chars'):
allowed_characters += "_-."
sanitized_name = ''.join(c for c in name.lower() if c in allowed_characters)
if len(sanitized_name) > Account.name.type.length:
current_app.logger.error("Account name {0} too long. Truncated to: {1}".format(
sanitized_name, sanitized_name[:Account.name.type.length]))
if not sanitized_name:
raise InvalidAccountNameException("Empty string is not a valid account name")
return sanitized_name[:Account.name.type.length]
def __repr__(self): def __repr__(self):
return '<Account {0}r>'.format(self.name) return '<Account {0}r>'.format(self.name)
@ -72,11 +90,9 @@ class Account(db.Model):
""" """
Create a new account Create a new account
""" """
# Sanity check - account name self.name = Account.sanitize_name(self.name)
if self.name == "":
return {'status': False, 'msg': 'No account name specified'}
# check that account name is not already used # Check that account name is not already used
account = Account.query.filter(Account.name == self.name).first() account = Account.query.filter(Account.name == self.name).first()
if account: if account:
return {'status': False, 'msg': 'Account already exists'} return {'status': False, 'msg': 'Account already exists'}

View file

@ -23,7 +23,7 @@ from ..lib.errors import (
AccountCreateFail, AccountUpdateFail, AccountDeleteFail, AccountCreateFail, AccountUpdateFail, AccountDeleteFail,
AccountCreateDuplicate, AccountNotExists, AccountCreateDuplicate, AccountNotExists,
UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail, UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail,
UserUpdateFailEmail, UserUpdateFailEmail, InvalidAccountNameException
) )
from ..decorators import ( from ..decorators import (
api_basic_auth, api_can_create_domain, is_json, apikey_auth, api_basic_auth, api_can_create_domain, is_json, apikey_auth,
@ -863,12 +863,15 @@ def api_create_account():
contact = data['contact'] if 'contact' in data else None contact = data['contact'] if 'contact' in data else None
mail = data['mail'] if 'mail' in data else None mail = data['mail'] if 'mail' in data else None
if not name: if not name:
current_app.logger.debug("Account name missing") current_app.logger.debug("Account creation failed: name missing")
abort(400) raise InvalidAccountNameException(message="Account name missing")
sanitized_name = Account.sanitize_name(name)
account_exists = Account.query.filter(Account.name == sanitized_name).all()
account_exists = [] or Account.query.filter(Account.name == name).all()
if len(account_exists) > 0: if len(account_exists) > 0:
msg = "Account {} already exists".format(name) msg = ("Requested Account {} would be translated to {}"
" which already exists").format(name, sanitized_name)
current_app.logger.debug(msg) current_app.logger.debug(msg)
raise AccountCreateDuplicate(message=msg) raise AccountCreateDuplicate(message=msg)
@ -906,8 +909,9 @@ def api_update_account(account_id):
if not account: if not account:
abort(404) abort(404)
if name and name != account.name: if name and Account.sanitize_name(name) != account.name:
abort(400) msg = "Account name is immutable"
raise AccountUpdateFail(msg=msg)
if current_user.role.name not in ['Administrator', 'Operator']: if current_user.role.name not in ['Administrator', 'Operator']:
msg = "User role update accounts" msg = "User role update accounts"

View file

@ -325,8 +325,8 @@ def login():
# Regexp didn't match, continue to next iteration # Regexp didn't match, continue to next iteration
continue continue
account = Account() sanitized_group_name = Account.sanitize_name(group_name)
account_id = account.get_id_by_name(account_name=group_name) account_id = account.get_id_by_name(account_name=sanitized_group_name)
if account_id: if account_id:
account = Account.query.get(account_id) account = Account.query.get(account_id)
@ -347,10 +347,12 @@ def login():
current_app.logger.info('User {} added to Account {}'.format( current_app.logger.info('User {} added to Account {}'.format(
user.username, account.name)) user.username, account.name))
else: else:
account.name = group_name account = Account(
account.description = group_description name=sanitized_group_name,
account.contact = '' description=group_description,
account.mail = '' contact='',
mail=''
)
account.create_account() account.create_account()
history = History(msg='Create account {0}'.format( history = History(msg='Create account {0}'.format(
account.name), account.name),
@ -1088,18 +1090,10 @@ def create_group_to_account_mapping():
def handle_account(account_name, account_description=""): def handle_account(account_name, account_description=""):
if Setting().get('account_name_extra_chars'): clean_name = Account.sanitize_name(account_name)
char_list = "abcdefghijklmnopqrstuvwxyz0123456789_-."
else:
char_list = "abcdefghijklmnopqrstuvwxyz0123456789"
clean_name = ''.join(c for c in account_name.lower()
if c in char_list)
if len(clean_name) > Account.name.type.length:
current_app.logger.error(
"Account name {0} too long. Truncated.".format(clean_name))
account = Account.query.filter_by(name=clean_name).first() account = Account.query.filter_by(name=clean_name).first()
if not account: if not account:
account = Account(name=clean_name.lower(), account = Account(name=clean_name,
description=account_description, description=account_description,
contact='', contact='',
mail='') mail='')