diff --git a/powerdnsadmin/__init__.py b/powerdnsadmin/__init__.py index 8281794..10695bf 100755 --- a/powerdnsadmin/__init__.py +++ b/powerdnsadmin/__init__.py @@ -41,6 +41,15 @@ def create_app(config=None): csrf.exempt(routes.api.api_zone_subpath_forward) csrf.exempt(routes.api.api_zone_forward) csrf.exempt(routes.api.api_create_zone) + csrf.exempt(routes.api.api_create_account) + csrf.exempt(routes.api.api_delete_account) + csrf.exempt(routes.api.api_update_account) + csrf.exempt(routes.api.api_create_user) + csrf.exempt(routes.api.api_delete_user) + csrf.exempt(routes.api.api_update_user) + csrf.exempt(routes.api.api_list_account_users) + csrf.exempt(routes.api.api_add_account_user) + csrf.exempt(routes.api.api_remove_account_user) # Load config from env variables if using docker if os.path.exists(os.path.join(app.root_path, 'docker_config.py')): @@ -96,4 +105,4 @@ def create_app(config=None): setting = Setting() return dict(SETTING=setting) - return app \ No newline at end of file + return app diff --git a/powerdnsadmin/decorators.py b/powerdnsadmin/decorators.py index 2e0918a..805deba 100644 --- a/powerdnsadmin/decorators.py +++ b/powerdnsadmin/decorators.py @@ -161,6 +161,41 @@ def is_json(f): return decorated_function +def api_role_can(action, roles=None, allow_self=False): + """ + Grant access if: + - user is in the permitted roles + - allow_self and kwargs['user_id'] = current_user.id + - allow_self and kwargs['username'] = current_user.username + """ + if roles is None: + roles = ['Administrator', 'Operator'] + + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + try: + user_id = int(kwargs.get('user_id')) + except: + user_id = None + try: + username = kwargs.get('username') + except: + username = None + if ( + (current_user.role.name in roles) or + (allow_self and user_id and current_user.id == user_id) or + (allow_self and username and current_user.username == username) + ): + return f(*args, **kwargs) + msg = ( + "User {} with role {} does not have enough privileges to {}" + ).format(current_user.username, current_user.role.name, action) + raise NotEnoughPrivileges(message=msg) + return decorated_function + return decorator + + def api_can_create_domain(f): """ Grant access if: diff --git a/powerdnsadmin/lib/errors.py b/powerdnsadmin/lib/errors.py index 5b0f36c..47a18c3 100644 --- a/powerdnsadmin/lib/errors.py +++ b/powerdnsadmin/lib/errors.py @@ -82,3 +82,57 @@ class RequestIsNotJSON(StructuredException): StructuredException.__init__(self) self.message = message self.name = name + + +class AccountCreateFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Creation of account failed"): + StructuredException.__init__(self) + self.message = message + self.name = name + + +class AccountUpdateFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Update of account failed"): + StructuredException.__init__(self) + self.message = message + self.name = name + + +class AccountDeleteFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Delete of account failed"): + StructuredException.__init__(self) + self.message = message + self.name = name + + +class UserCreateFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Creation of user failed"): + StructuredException.__init__(self) + self.message = message + self.name = name + + +class UserUpdateFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Update of user failed"): + StructuredException.__init__(self) + self.message = message + self.name = name + + +class UserDeleteFail(StructuredException): + status_code = 500 + + def __init__(self, name=None, message="Delete of user failed"): + StructuredException.__init__(self) + self.message = message + self.name = name diff --git a/powerdnsadmin/lib/schema.py b/powerdnsadmin/lib/schema.py index 05aac66..d9eeb9b 100644 --- a/powerdnsadmin/lib/schema.py +++ b/powerdnsadmin/lib/schema.py @@ -25,3 +25,21 @@ class ApiPlainKeySchema(Schema): domains = fields.Embed(schema=DomainSchema, many=True) description = fields.String() plain_key = fields.String() + + +class UserSchema(Schema): + id = fields.Integer() + username = fields.String() + firstname = fields.String() + lastname = fields.String() + email = fields.String() + role = fields.Embed(schema=RoleSchema) + + +class AccountSchema(Schema): + id = fields.Integer() + name = fields.String() + description = fields.String() + contact = fields.String() + mail = fields.String() + domains = fields.Embed(schema=DomainSchema, many=True) diff --git a/powerdnsadmin/routes/api.py b/powerdnsadmin/routes/api.py index 81ac886..60747a2 100644 --- a/powerdnsadmin/routes/api.py +++ b/powerdnsadmin/routes/api.py @@ -1,20 +1,40 @@ import json from urllib.parse import urljoin -from flask import Blueprint, g, request, abort, current_app, make_response, jsonify +from flask import ( + Blueprint, g, request, abort, current_app, make_response, jsonify, +) from flask_login import current_user from ..models.base import db -from ..models import User,Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey +from ..models import ( + User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, + Role, +) from ..lib import utils, helper -from ..lib.schema import ApiKeySchema, DomainSchema, ApiPlainKeySchema -from ..lib.errors import DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, RequestIsNotJSON, ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges -from ..decorators import api_basic_auth, api_can_create_domain, is_json, apikey_auth, apikey_is_admin, apikey_can_access_domain +from ..lib.schema import ( + ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema, +) +from ..lib.errors import ( + StructuredException, + DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, + RequestIsNotJSON, ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges, + AccountCreateFail, AccountUpdateFail, AccountDeleteFail, + UserCreateFail, UserUpdateFail, UserDeleteFail, +) +from ..decorators import ( + api_basic_auth, api_can_create_domain, is_json, apikey_auth, + apikey_is_admin, apikey_can_access_domain, api_role_can, +) +import random +import string api_bp = Blueprint('api', __name__, url_prefix='/api/v1') apikey_schema = ApiKeySchema(many=True) domain_schema = DomainSchema(many=True) apikey_plain_schema = ApiPlainKeySchema(many=True) +user_schema = UserSchema(many=True) +account_schema = AccountSchema(many=True) def get_user_domains(): @@ -53,6 +73,22 @@ def get_user_apikeys(domain_name=None): return info +def get_role_id(role_name, role_id=None): + if role_id: + if role_name: + role = Role.query.filter(Role.name == role_name).first() + if not role or role.id != role_id: + role_id = None + else: + role = Role.query.filter(Role.id == role_id).first() + if not role: + role_id = None + else: + role = Role.query.filter(Role.name == role_name).first() + role_id = role.id if role else None + return role_id + + @api_bp.errorhandler(400) def handle_400(err): return json.dumps({"msg": "Bad Request"}), 400 @@ -73,6 +109,11 @@ def handle_500(err): return json.dumps({"msg": "Internal Server Error"}), 500 +@api_bp.errorhandler(StructuredException) +def handle_StructuredException(err): + return json.dumps(err.to_dict()), err.status_code + + @api_bp.errorhandler(DomainNotExists) def handle_domain_not_exists(err): return json.dumps(err.to_dict()), err.status_code @@ -113,9 +154,12 @@ def handle_request_is_not_json(err): def before_request(): # Check site is in maintenance mode maintenance = Setting().get('maintenance') - if maintenance and current_user.is_authenticated and current_user.role.name not in [ + if ( + maintenance and current_user.is_authenticated and + current_user.role.name not in [ 'Administrator', 'Operator' - ]: + ] + ): return make_response( jsonify({ "status": False, @@ -467,6 +511,375 @@ def api_update_apikey(apikey_id): return '', 204 +@api_bp.route('/pdnsadmin/users', defaults={'username': None}) +@api_bp.route('/pdnsadmin/users/') +@api_basic_auth +@api_role_can('list users', allow_self=True) +def api_list_users(username=None): + if username is None: + user_list = [] or User.query.all() + else: + user_list = [] or User.query.filter(User.username == username).all() + if not user_list: + abort(404) + + return json.dumps(user_schema.dump(user_list)), 200 + + +@api_bp.route('/pdnsadmin/users', methods=['POST']) +@api_basic_auth +@api_role_can('create users', allow_self=True) +def api_create_user(): + """ + Create new user + """ + data = request.get_json() + username = data['username'] if 'username' in data else None + password = data['password'] if 'password' in data else None + plain_text_password = ( + data['plain_text_password'] + if 'plain_text_password' in data + else None + ) + firstname = data['firstname'] if 'firstname' in data else None + lastname = data['lastname'] if 'lastname' in data else None + email = data['email'] if 'email' in data else None + otp_secret = data['otp_secret'] if 'otp_secret' in data else None + confirmed = data['confirmed'] if 'confirmed' in data else None + role_name = data['role_name'] if 'role_name' in data else None + role_id = data['role_id'] if 'role_id' in data else None + + # Create user + if not username: + current_app.logger.debug('Invalid username {}'.format(username)) + abort(400) + if not confirmed: + confirmed = False + elif confirmed is not True: + current_app.logger.debug('Invalid confirmed {}'.format(confirmed)) + abort(400) + + if not plain_text_password and not password: + plain_text_password = ''.join( + random.choice(string.ascii_letters + string.digits) + for _ in range(15)) + if not role_name and not role_id: + role_name = 'User' + role_id = get_role_id(role_name, role_id) + if not role_id: + current_app.logger.debug( + 'Invalid role {}/{}'.format(role_name, role_id)) + abort(400) + + user = User( + username=username, + password=password, + plain_text_password=plain_text_password, + firstname=firstname, + lastname=lastname, + role_id=role_id, + email=email, + otp_secret=otp_secret, + confirmed=confirmed, + ) + try: + result = user.create_local_user() + except Exception as e: + current_app.logger.error('Create user ({}, {}) error: {}'.format( + username, email, e)) + raise UserCreateFail(message='User create failed') + if not result['status']: + current_app.logger.warning('Create user ({}, {}) error: {}'.format( + username, email, result['msg'])) + raise UserCreateFail(message=result['msg']) + + history = History(msg='Created user {0}'.format(user.username), + created_by=current_user.username) + history.add() + return json.dumps(user_schema.dump([user])), 201 + + +@api_bp.route('/pdnsadmin/users/', methods=['PUT']) +@api_basic_auth +@api_role_can('update users', allow_self=True) +def api_update_user(user_id): + """ + Update existing user + """ + data = request.get_json() + username = data['username'] if 'username' in data else None + password = data['password'] if 'password' in data else None + plain_text_password = ( + data['plain_text_password'] + if 'plain_text_password' in data + else None + ) + firstname = data['firstname'] if 'firstname' in data else None + lastname = data['lastname'] if 'lastname' in data else None + email = data['email'] if 'email' in data else None + otp_secret = data['otp_secret'] if 'otp_secret' in data else None + confirmed = data['confirmed'] if 'confirmed' in data else None + role_name = data['role_name'] if 'role_name' in data else None + role_id = data['role_id'] if 'role_id' in data else None + + user = User.query.get(user_id) + if not user: + current_app.logger.debug("User not found for id {}".format(user_id)) + abort(404) + if username: + if username != user.username: + current_app.logger.error( + 'Cannot change username for {}'.format(user.username) + ) + abort(400) + if password is not None: + user.password = password + user.plain_text_password = plain_text_password or '' + if firstname is not None: + user.firstname = firstname + if lastname is not None: + user.lastname = lastname + if email is not None: + user.email = email + if otp_secret is not None: + user.otp_secret = otp_secret + if confirmed is not None: + user.confirmed = confirmed + if role_name is not None: + user.role_id = get_role_id(role_name, role_id) + elif role_id is not None: + user.role_id = role_id + current_app.logger.debug( + "Updating user {} ({})".format(user_id, user.username)) + try: + result = user.update_local_user() + except Exception as e: + current_app.logger.error('Create user ({}, {}) error: {}'.format( + username, email, e)) + raise UserUpdateFail(message='User update failed') + if not result['status']: + current_app.logger.warning('Update user ({}, {}) error: {}'.format( + username, email, result['msg'])) + raise UserCreateFail(message=result['msg']) + + history = History(msg='Updated user {0}'.format(user.username), + created_by=current_user.username) + history.add() + return '', 204 + + +@api_bp.route('/pdnsadmin/users/', methods=['DELETE']) +@api_basic_auth +@api_role_can('delete users') +def api_delete_user(user_id): + user = User.query.get(user_id) + if not user: + current_app.logger.debug("User not found for id {}".format(user_id)) + abort(404) + if user.id == current_user.id: + current_app.logger.debug("Cannot delete self (id {})".format(user_id)) + msg = "Cannot delete self" + raise UserDeleteFail(message=msg) + + # Remove account associations first + user_accounts = Account.query.join(AccountUser).join( + User).filter(AccountUser.user_id == user.id, + AccountUser.account_id == Account.id).all() + for uc in user_accounts: + uc.revoke_privileges_by_id(user.id) + + # Then delete the user + result = user.delete() + if not result: + raise UserDeleteFail("Failed to delete user {}".format( + user.username)) + + history = History(msg='Delete user {0}'.format(user.username), + created_by=current_user.username) + history.add() + return '', 204 + + +@api_bp.route('/pdnsadmin/accounts', defaults={'account_name': None}) +@api_bp.route('/pdnsadmin/accounts/') +@api_basic_auth +@api_role_can('list accounts') +def api_list_accounts(account_name): + if current_user.role.name not in ['Administrator', 'Operator']: + msg = "{} role cannot list accounts".format(current_user.role.name) + raise NotEnoughPrivileges(message=msg) + else: + if account_name is None: + account_list = [] or Account.query.all() + else: + account_list = [] or Account.query.filter( + Account.name == account_name).all() + if not account_list: + abort(404) + return json.dumps(account_schema.dump(account_list)), 200 + + +@api_bp.route('/pdnsadmin/accounts', methods=['POST']) +@api_basic_auth +def api_create_account(): + if current_user.role.name not in ['Administrator', 'Operator']: + msg = "{} role cannot create accounts".format(current_user.role.name) + raise NotEnoughPrivileges(message=msg) + data = request.get_json() + name = data['name'] if 'name' in data else None + description = data['description'] if 'description' in data else None + contact = data['contact'] if 'contact' in data else None + mail = data['mail'] if 'mail' in data else None + if not name: + current_app.logger.debug("Account name missing") + abort(400) + + account = Account(name=name, + description=description, + contact=contact, + mail=mail) + + try: + result = account.create_account() + except Exception as e: + current_app.logger.error('Error: {0}'.format(e)) + raise AccountCreateFail(message='Account create failed') + if not result['status']: + raise AccountCreateFail(message=result['msg']) + + history = History(msg='Create account {0}'.format(account.name), + created_by=current_user.username) + history.add() + return json.dumps(account_schema.dump([account])), 201 + + +@api_bp.route('/pdnsadmin/accounts/', methods=['PUT']) +@api_basic_auth +@api_role_can('update accounts') +def api_update_account(account_id): + data = request.get_json() + name = data['name'] if 'name' in data else None + description = data['description'] if 'description' in data else None + contact = data['contact'] if 'contact' in data else None + mail = data['mail'] if 'mail' in data else None + + account = Account.query.get(account_id) + + if not account: + abort(404) + + if name and name != account.name: + abort(400) + + if current_user.role.name not in ['Administrator', 'Operator']: + msg = "User role update accounts" + raise NotEnoughPrivileges(message=msg) + + if description is not None: + account.description = description + if contact is not None: + account.contact = contact + if mail is not None: + account.mail = mail + + current_app.logger.debug( + "Updating account {} ({})".format(account_id, account.name)) + result = account.update_account() + if not result['status']: + raise AccountDeleteFail(message=result['msg']) + history = History(msg='Update account {0}'.format(account.name), + created_by=current_user.username) + history.add() + return '', 204 + + +@api_bp.route('/pdnsadmin/accounts/', methods=['DELETE']) +@api_basic_auth +@api_role_can('delete accounts') +def api_delete_account(account_id): + account_list = [] or Account.query.filter(Account.id == account_id).all() + if len(account_list) == 1: + account = account_list[0] + else: + abort(404) + current_app.logger.debug( + "Deleting account {} ({})".format(account_id, account.name)) + result = account.delete_account() + if not result: + raise AccountUpdateFail(message=result['msg']) + + history = History(msg='Delete account {0}'.format(account.name), + created_by=current_user.username) + history.add() + return '', 204 + + +@api_bp.route('/pdnsadmin/accounts/users/', methods=['GET']) +@api_basic_auth +@api_role_can('list account users') +def api_list_account_users(account_id): + account = Account.query.get(account_id) + if not account: + abort(404) + user_list = User.query.join(AccountUser).filter( + AccountUser.account_id == account_id).all() + return json.dumps(user_schema.dump(user_list)), 200 + + +@api_bp.route( + '/pdnsadmin/accounts/users//', + methods=['PUT']) +@api_basic_auth +@api_role_can('add user to account') +def api_add_account_user(account_id, user_id): + account = Account.query.get(account_id) + if not account: + abort(404) + user = User.query.get(user_id) + if not user: + abort(404) + if not account.add_user(user): + raise AccountUpdateFail("Cannot add user {} to {}".format( + user.username, account.name)) + + history = History( + msg='Revoke {} user privileges on {}'.format( + user.username, account.name), + created_by=current_user.username) + history.add() + return '', 204 + + +@api_bp.route( + '/pdnsadmin/accounts/users//', + methods=['DELETE']) +@api_basic_auth +@api_role_can('remove user from account') +def api_remove_account_user(account_id, user_id): + account = Account.query.get(account_id) + if not account: + abort(404) + user = User.query.get(user_id) + if not user: + abort(404) + user_list = User.query.join(AccountUser).filter( + AccountUser.account_id == account_id, + AccountUser.user_id == user_id, + ).all() + if not user_list: + abort(404) + if not account.remove_user(user): + raise AccountUpdateFail("Cannot remove user {} from {}".format( + user.username, account.name)) + + history = History( + msg='Revoke {} user privileges on {}'.format( + user.username, account.name), + created_by=current_user.username) + history.add() + return '', 204 + + @api_bp.route( '/servers//zones//', methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE']) diff --git a/powerdnsadmin/swagger-spec.yaml b/powerdnsadmin/swagger-spec.yaml index a2eaf81..061ce7a 100644 --- a/powerdnsadmin/swagger-spec.yaml +++ b/powerdnsadmin/swagger-spec.yaml @@ -963,6 +963,424 @@ paths: description: 'Internal Server Error. Contains error message' schema: $ref: '#/definitions/Error' + '/pdnsadmin/users': + get: + security: + - basicAuth: [] + summary: 'Get all User entries' + operationId: api_list_users + tags: + - user + responses: + '200': + description: List of User objects + schema: + type: array + items: + $ref: #/definitions/User + '500': + description: Internal Server Error, users could not be retrieved. Contains error message + schema: + $ref: #/definitions/Error + post: + security: + - basicAuth: [] + summary: Add a User + description: This methods adds a new User + operationId: api_create_user + tags: + - user + parameters: + - name: username + description: Login name for user (unique, immutable) + required: true + in: body + - name: password + description: Hashed password for authentication + required: false + in: body + - name: plain_text_password + description: Plain text password (will be hashed) for authentication + required: false + in: body + - name: firstname + description: Firstname of user + required: false + in: body + - name: lastname + description: Lastname of user + required: false + in: body + - name: email + description: Email address if user (must be unique) + required: true + in: body + - name: otp_secret + description: OTP secret + required: false + in: body + - name: confirmed + description: Confirmed status + required: false + in: body + - name: role_name + description: Name of role to be assigned to user (default 'User') + required: false + in: body + - name: role_id + description: Role ID of role to be assigned to user + required: false + in: body + responses: + '201': + description: Created + schema: + $ref: #/definitions/User + '400': + description: Unprocessable Entry, the User data provided has issues + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. There was a problem creating the user + schema: + $ref: #/definitions/Error + '/pdnsadmin/users/{username}': + parameters: + - name: username + type: string + in: path + required: true + description: The username of the user to retrieve + get: + security: + - basicAuth: [] + summary: Get a specific User on the server + operationId: api_list_users + tags: + - user + responses: + '200': + description: Retrieve a specific User + schema: + $ref: #/definitions/User + '404': + description: Not found. The User with the specified username does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error, user could not be retrieved. Contains error message + schema: + $ref: #/definitions/Error + '/pdnsadmin/users/{user_id}': + parameters: + - name: user_id + type: integer + in: path + required: true + description: The id of the user to modify or delete + put: + security: + - basicAuth: [] + summary: Modify a specific User on the server with supplied parameters + operationId: api_update_user + tags: + - user + parameters: + - name: username + description: Login name for user (unique, immutable) + required: false + in: body + - name: password + description: Hashed password for authentication + required: false + in: body + - name: plain_text_password + description: Plain text password (will be hashed) for authentication + required: false + in: body + - name: firstname + description: Firstname of user + required: false + in: body + - name: lastname + description: Lastname of user + required: false + in: body + - name: email + description: Email address if user (must be unique) + required: false + in: body + - name: otp_secret + description: OTP secret + required: false + in: body + - name: confirmed + description: Confirmed status + required: false + in: body + - name: role_name + description: Name of role to be assigned to user (default 'User') + required: false + in: body + - name: role_id + description: Role id of role to be assigned to user + required: false + in: body + responses: + '204': + description: OK. User is modified (empty response body) + '404': + description: Not found. The User with the specified user_id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + delete: + security: + - basicAuth: [] + summary: Delete a specific User + operationId: api_delete_user + tags: + - user + responses: + '204': + description: OK. User is deleted (empty response body) + '404': + description: Not found. The User with the specified user_id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + '/pdnsadmin/accounts': + get: + security: + - basicAuth: [] + summary: Get all Account entries + operationId: api_list_accounts + tags: + - account + responses: + '200': + description: List of Account objects + schema: + type: array + items: + $ref: #/definitions/Account + '500': + description: Internal Server Error, accounts could not be retrieved. Contains error message + schema: + $ref: #/definitions/Error + post: + security: + - basicAuth: [] + summary: Add an Account + description: This methods adds a new Account + operationId: api_create_account + tags: + - account + parameters: + - name: name + description: Name for account (unique, immutable) + required: true + in: body + - name: description + description: Description of account + required: false + in: body + - name: contact + description: Contact information + required: false + in: body + - name: mail + description: Email address for contact + required: false + in: body + responses: + '201': + description: Created + schema: + $ref: #/definitions/Account + '400': + description: Unprocessable Entry, the Account data provided has issues. + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. There was a problem creating the account + schema: + $ref: #/definitions/Error + '/pdnsadmin/accounts/{account_name}': + parameters: + - name: account_name + type: string + in: path + required: true + description: The name of the account to retrieve + get: + security: + - basicAuth: [] + summary: Get a specific Account on the server + operationId: api_list_accounts + tags: + - user + responses: + '200': + description: Retrieve a specific account + schema: + $ref: #/definitions/Account + '404': + description: Not found. The Account with the specified name does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error, account could not be retrieved. Contains error message + schema: + $ref: #/definitions/Error + '/pdnsadmin/accounts/{account_id}': + parameters: + - name: account_id + type: integer + in: path + required: true + description: The id of the account to modify or delete + put: + security: + - basicAuth: [] + summary: Modify a specific Account on the server with supplied parameters + operationId: api_update_account + tags: + - user + parameters: + - name: name + description: Name for account (unique, immutable) + required: true + in: body + - name: description + description: Description of account + required: false + in: body + - name: contact + description: Contact information + required: false + in: body + - name: mail + description: Email address for contact + required: false + in: body + responses: + '204': + description: OK. Account is modified (empty response body) + '404': + description: Not found. The Account with the specified account_id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + delete: + security: + - basicAuth: [] + summary: Delete a specific Account + operationId: api_delete_account + tags: + - user + responses: + '204': + description: OK. Account is deleted (empty response body) + '404': + description: Not found. The Account with the specified account_id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + '/pdnsadmin/accounts/users/{account_id}': + parameters: + - name: account_id + type: integer + in: path + required: true + description: The id of the account to list users linked to account + get: + security: + - basicAuth: [] + summary: List users linked to a specific account + operationId: api_list_account_users + tags: + - account + - user + responses: + '200': + description: List of User objects + schema: + type: array + items: + $ref: #/definitions/User + '404': + description: Not found. The Account with the specified account_id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error, accounts could not be retrieved. Contains error message + schema: + $ref: #/definitions/Error + '/pdnsadmin/accounts/users/{account_id}/{user_id}': + parameters: + - name: account_id + type: integer + in: path + required: true + description: The id of the account to link/unlink users to account + - name: user_id + type: integer + in: path + required: true + description: The id of the user to (un)link to/from account + put: + security: + - basicAuth: [] + summary: Link user to account + operationId: api_add_account_user + tags: + - account + - user + responses: + '204': + description: OK. User is linked (empty response body) + '404': + description: Not found. The Account or User with the specified id does not exist + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + delete: + security: + - basicAuth: [] + summary: Unlink user from account + operationId: api_remove_account_user + tags: + - account + - user + responses: + '204': + description: OK. User is unlinked (empty response body) + '404': + description: Not found. The Account or User with the specified id does not exist or user was not linked to account + schema: + $ref: #/definitions/Error + '500': + description: Internal Server Error. Contains error message + schema: + $ref: #/definitions/Error + + definitions: Server: title: Server @@ -1222,6 +1640,72 @@ definitions: type: string description: 'Some user defined description' + User: + title: User + description: User that can access the gui/api + properties: + id: + type: integer + description: The ID for this user (unique) + readOnly: true + username: + type: string + description: The username for this user (unique, immutable) + readOnly: false + password: + type: string + description: The hashed password for this user + readOnly: false + firstname: + type: string + description: The firstname of this user + readOnly: false + lastname: + type: string + description: The lastname of this user + readOnly: false + email: + type: string + description: Email addres for this user + readOnly: false + otp_secret: + type: string + description: OTP secret + readOnly: false + confirmed: + type: boolean + description: The confirmed status + readOnly: false + role_id: + type: integer + description: The ID of the role + readOnly: false + + Account: + title: Account + description: Account that 'owns' zones + properties: + id: + type: integer + description: The ID for this account (unique) + readOnly: true + name: + type: string + description: The name for this account (unique, immutable) + readOnly: false + description: + type: string + description: The description for this account + readOnly: false + contact: + type: string + description: The contact details for this account + readOnly: false + mail: + type: string + description: The email address of the contact for this account + readOnly: false + ConfigSetting: title: ConfigSetting properties: diff --git a/tests/fixtures.py b/tests/fixtures.py index 767b24b..39b6a70 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -38,6 +38,16 @@ def load_data(setting_name, *args, **kwargs): return True +@pytest.fixture +def test_admin_user(): + return app.config.get('TEST_ADMIN_USER') + + +@pytest.fixture +def test_user(): + return app.config.get('TEST_USER') + + @pytest.fixture def basic_auth_admin_headers(): test_admin_user = app.config.get('TEST_ADMIN_USER') @@ -284,3 +294,29 @@ def create_apikey_headers(passw): user_pass_base64 = b64encode(passw.encode('utf-8')) headers = {"X-API-KEY": "{0}".format(user_pass_base64.decode('utf-8'))} return headers + + +@pytest.fixture +def account_data(): + data = { + "name": "test1", + "description": "test1 account", + "contact": "test1 contact", + "mail": "test1@example.com", + } + return data + + +@pytest.fixture +def user1_data(): + data = { + "username": "testuser1", + "plain_text_password": "ChangeMePlease", + "firstname": "firstname1", + "lastname": "lastname1", + "email": "testuser1@example.com", + "otp_secret": "", + "confirmed": False, + "role_name": "User", + } + return data diff --git a/tests/integration/api/management/__init__.py b/tests/integration/api/management/__init__.py new file mode 100644 index 0000000..b546f46 --- /dev/null +++ b/tests/integration/api/management/__init__.py @@ -0,0 +1,54 @@ + + +class IntegrationApiManagement(object): + + def get_account(self, account_name, status_code=200): + res = self.client.get( + "/api/v1/pdnsadmin/accounts/{}".format(account_name), + headers=self.basic_auth_admin_headers, + content_type="application/json", + ) + if isinstance(status_code, (tuple, list)): + assert res.status_code in status_code + elif status_code: + assert res.status_code == status_code + if res.status_code == 200: + data = res.get_json(force=True) + assert len(data) == 1 + return data[0] + return None + + def check_account(self, cmpdata, data=None): + data = self.get_account(cmpdata["name"]) + for key, value in cmpdata.items(): + assert data[key] == value + return data + + def get_user(self, username, status_code=200): + res = self.client.get( + "/api/v1/pdnsadmin/users/{}".format(username), + headers=self.basic_auth_admin_headers, + content_type="application/json", + ) + if isinstance(status_code, (tuple, list)): + assert res.status_code in status_code + elif status_code: + assert res.status_code == status_code + assert res.status_code == status_code + if status_code == 200: + data = res.get_json(force=True) + assert len(data) == 1 + return data[0] + return None + + def check_user(self, cmpdata, data=None): + if data is None: + data = self.get_user(cmpdata["username"]) + for key, value in data.items(): + if key in ('username', 'firstname', 'lastname', 'email'): + assert cmpdata[key] == value + elif key == 'role': + assert data[key]['name'] == cmpdata['role_name'] + else: + assert key in ("id",) + return data diff --git a/tests/integration/api/management/test_admin_user.py b/tests/integration/api/management/test_admin_user.py new file mode 100644 index 0000000..3df0384 --- /dev/null +++ b/tests/integration/api/management/test_admin_user.py @@ -0,0 +1,367 @@ + +import json +from tests.fixtures import ( # noqa: F401 + client, initial_data, basic_auth_admin_headers, + test_admin_user, test_user, account_data, user1_data, +) +from . import IntegrationApiManagement + + +class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): + + def test_accounts_empty_get( + self, client, initial_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + res = client.get("/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers) + data = res.get_json(force=True) + assert res.status_code == 200 + assert data == [] + + def test_users_empty_get( + self, client, initial_data, # noqa: F811 + test_admin_user, test_user, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + res = client.get("/api/v1/pdnsadmin/users", + headers=basic_auth_admin_headers) + data = res.get_json(force=True) + assert res.status_code == 200 + # Initally contains 2 records + assert len(data) == 2 + for user in data: + assert user["username"] in (test_admin_user, test_user) + + def test_accounts( + self, client, initial_data, # noqa: F811 + account_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + account_name = account_data["name"] + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Create account + res = client.post( + "/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + + updated = account_data.copy() + # Update and check values + for upd_key in ["description", "contact", "mail"]: + upd_value = "upd-{}".format(account_data[upd_key]) + + # Update + data = {"name": account_name, upd_key: upd_value} + res = client.put( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + updated[upd_key] = upd_value + + # Check + data = self.check_account(updated) + + # Update to defaults + res = client.put( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Check account + res = client.get( + "/api/v1/pdnsadmin/accounts/{}".format(account_name), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 200 + assert len(data) == 1 + data = data[0] + account_id = data["id"] + for key, value in account_data.items(): + assert data[key] == value + + # Cleanup (delete account) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Get non-existing account (should fail) + data = self.get_account(account_name, status_code=404) + + # Update non-existing account (should fail) + res = client.put( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Delete non-existing account (should fail) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + def test_users( + self, client, initial_data, # noqa: F811 + user1_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + user1name = user1_data["username"] + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Create user (user1) + res = client.post( + "/api/v1/pdnsadmin/users", + headers=basic_auth_admin_headers, + data=json.dumps(user1_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + assert len(data) == 1 + + # Check user + user1 = self.check_user(user1_data, data[0]) + user1_id = user1["id"] + + updated = user1_data.copy() + # Update and check values + for upd_key in ["firstname", "lastname", "email"]: + upd_value = "upd-{}".format(user1_data[upd_key]) + + # Update + data = {"username": user1name, upd_key: upd_value} + res = client.put( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + updated[upd_key] = upd_value + + # Check + data = self.check_user(updated) + + # Update to defaults + res = client.put( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Check user + self.check_user(user1_data) + + # Cleanup (delete user) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Get non-existing user (should fail) + data = self.get_user(user1name, status_code=404) + + # Update non-existing user (should fail) + res = client.put( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Delete non-existing user (should fail) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + def test_account_users( + self, client, initial_data, # noqa: F811 + test_user, account_data, user1_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + test_user_id = self.get_user(test_user)["id"] + + # Create account + res = client.post( + "/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + + # Create user1 + res = client.post( + "/api/v1/pdnsadmin/users", + headers=basic_auth_admin_headers, + data=json.dumps(user1_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + assert len(data) == 1 + + # Check user + user1 = self.check_user(user1_data, data[0]) + user1_id = user1["id"] + + # Assert test account has no users + res = client.get( + "/api/v1/pdnsadmin/accounts/users/{}".format(account_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 200 + assert data == [] + + # Assert unlinking an unlinked account fails + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Link user to account + res = client.put( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Check user is linked to account + res = client.get( + "/api/v1/pdnsadmin/accounts/users/{}".format(account_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 200 + assert len(data) == 1 + self.check_user(user1_data, data[0]) + + # Unlink user from account + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Check user is unlinked from account + res = client.get( + "/api/v1/pdnsadmin/accounts/users/{}".format(account_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 200 + assert data == [] + + # Unlink unlinked user from account (should fail) + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Cleanup (delete user) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Link non-existing user to account (should fail) + res = client.put( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Unlink non-exiting user from account (should fail) + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Cleanup (delete account) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # List users in non-existing account (should fail) + res = client.get( + "/api/v1/pdnsadmin/accounts/users/{}".format(account_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 + + # Link existing user to non-existing account (should fail) + res = client.put( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, test_user_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 404 diff --git a/tests/integration/api/management/test_user.py b/tests/integration/api/management/test_user.py new file mode 100644 index 0000000..b56652e --- /dev/null +++ b/tests/integration/api/management/test_user.py @@ -0,0 +1,252 @@ + +import json + +from tests.fixtures import ( # noqa: F401 + client, initial_data, basic_auth_admin_headers, basic_auth_user_headers, + test_admin_user, test_user, account_data, user1_data, +) +from . import IntegrationApiManagement + + +class TestIntegrationApiManagementUser(IntegrationApiManagement): + + def test_accounts_empty_get( + self, client, initial_data, # noqa: F811 + basic_auth_user_headers): # noqa: F811 + res = client.get("/api/v1/pdnsadmin/accounts", + headers=basic_auth_user_headers) + assert res.status_code == 401 + + def test_users_empty_get( + self, client, initial_data, # noqa: F811 + test_admin_user, test_user, # noqa: F811 + basic_auth_user_headers): # noqa: F811 + res = client.get("/api/v1/pdnsadmin/users", + headers=basic_auth_user_headers) + assert res.status_code == 401 + + def test_self_get( + self, initial_data, client, test_user, # noqa: F811 + basic_auth_user_headers): # noqa: F811 + self.user = None + res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user), + headers=basic_auth_user_headers) + data = res.get_json(force=True) + assert res.status_code == 200 + assert len(data) == 1, data + self.user = data + + def test_accounts( + self, client, initial_data, # noqa: F811 + account_data, # noqa: F811 + basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Create account (should fail) + res = client.post( + "/api/v1/pdnsadmin/accounts", + headers=basic_auth_user_headers, + data=json.dumps(account_data), + content_type="application/json", + ) + assert res.status_code == 401 + + # Create account (as admin) + res = client.post( + "/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + + # Update to defaults (should fail) + res = client.put( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Delete account (should fail) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Cleanup (delete account as admin) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + def test_users( + self, client, initial_data, # noqa: F811 + user1_data, # noqa: F811 + basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Create user1 (should fail) + res = client.post( + "/api/v1/pdnsadmin/users", + headers=basic_auth_user_headers, + data=json.dumps(user1_data), + content_type="application/json", + ) + assert res.status_code == 401 + + # Create user1 (as admin) + res = client.post( + "/api/v1/pdnsadmin/users", + headers=basic_auth_admin_headers, + data=json.dumps(user1_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + assert len(data) == 1 + + # Check user + user1 = self.check_user(user1_data, data[0]) + user1_id = user1["id"] + + # Update to defaults (should fail) + res = client.put( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Delete user (should fail) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Cleanup (delete user as admin) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + def test_account_users( + self, client, initial_data, # noqa: F811 + account_data, user1_data, # noqa: F811 + basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Create account + res = client.post( + "/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + + # Create user1 + res = client.post( + "/api/v1/pdnsadmin/users", + headers=basic_auth_admin_headers, + data=json.dumps(user1_data), + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 201 + assert len(data) == 1 + + # Check user + user1 = self.check_user(user1_data, data[0]) + user1_id = user1["id"] + + # Assert test account has no users + res = client.get( + "/api/v1/pdnsadmin/accounts/users/{}".format(account_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + data = res.get_json(force=True) + assert res.status_code == 200 + assert data == [] + + # Link user to account (as user, should fail) + res = client.put( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Link user to account (as admin) + res = client.put( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Unlink user from account (as user, should fail) + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_user_headers, + content_type="application/json", + ) + assert res.status_code == 401 + + # Unlink user from account (as admin) + res = client.delete( + "/api/v1/pdnsadmin/accounts/users/{}/{}".format( + account_id, user1_id), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Cleanup (delete user) + res = client.delete( + "/api/v1/pdnsadmin/users/{}".format(user1_id), + data=json.dumps(user1_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204 + + # Cleanup (delete account) + res = client.delete( + "/api/v1/pdnsadmin/accounts/{}".format(account_id), + data=json.dumps(account_data), + headers=basic_auth_admin_headers, + content_type="application/json", + ) + assert res.status_code == 204