Fix the tests

Fix the tests

Fix the tests
This commit is contained in:
Khanh Ngo 2019-12-06 10:32:57 +07:00
parent 840e2a4750
commit 8de6df4d3b
23 changed files with 672 additions and 984 deletions

View file

@ -20,17 +20,17 @@ A PowerDNS web interface with advanced features.
- limited API for manipulating zones and records - limited API for manipulating zones and records
### Running PowerDNS-Admin ### Running PowerDNS-Admin
There are several ways to run PowerDNS-Admin. Following is a simple way to start PowerDNS-Admin with docker in development environment which has PowerDNS-Admin, PowerDNS server and MySQL Back-End Database. There are several ways to run PowerDNS-Admin. Following is a simple way to start PowerDNS-Admin using Docker
Step 1: Changing configuration Step 1: Build docker image
The configuration file for development environment is located at `configs/development.py`, you can override some configs by editing the `.env` file.
Step 2: Build docker images
```$ docker-compose build``` ```$ docker-compose build```
Step 3: Start docker containers Step 2: Change the configuration
Edit the `docker-compose.yml` file to update the database connection string in `SQLALCHEMY_DATABASE_URI`.
Step 3: Start docker container
```$ docker-compose up``` ```$ docker-compose up```
@ -38,11 +38,5 @@ You can now access PowerDNS-Admin at url http://localhost:9191
**NOTE:** For other methods to run PowerDNS-Admin, please take look at WIKI pages. **NOTE:** For other methods to run PowerDNS-Admin, please take look at WIKI pages.
## Build production docker container image
```
$ docker build -t powerdns-admin:latest -f docker/Production/Dockerfile .
```
### Screenshots ### Screenshots
![dashboard](https://user-images.githubusercontent.com/6447444/44068603-0d2d81f6-9fa5-11e8-83af-14e2ad79e370.png) ![dashboard](https://user-images.githubusercontent.com/6447444/44068603-0d2d81f6-9fa5-11e8-83af-14e2ad79e370.png)

View file

@ -14,10 +14,10 @@ SQLA_DB_HOST = '127.0.0.1'
SQLA_DB_NAME = 'pda' SQLA_DB_NAME = 'pda'
SQLALCHEMY_TRACK_MODIFICATIONS = True SQLALCHEMY_TRACK_MODIFICATIONS = True
### DATBASE - MySQL ### DATABASE - MySQL
# SQLALCHEMY_DATABASE_URI = 'mysql://' + SQLA_DB_USER + ':' + SQLA_DB_PASSWORD + '@' + SQLA_DB_HOST + '/' + SQLA_DB_NAME # SQLALCHEMY_DATABASE_URI = 'mysql://' + SQLA_DB_USER + ':' + SQLA_DB_PASSWORD + '@' + SQLA_DB_HOST + '/' + SQLA_DB_NAME
### DATABSE - SQLite ### DATABASE - SQLite
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'pdns.db') SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'pdns.db')
# SAML Authnetication # SAML Authnetication

25
configs/test.py Normal file
View file

@ -0,0 +1,25 @@
import os
basedir = os.path.abspath(os.path.abspath(os.path.dirname(__file__)))
### BASIC APP CONFIG
SALT = '$2b$12$yLUMTIfl21FKJQpTkRQXCu'
SECRET_KEY = 'e951e5a1f4b94151b360f47edf596dd2'
BIND_ADDRESS = '0.0.0.0'
PORT = 9191
HSTS_ENABLED = False
### DATABASE - SQLite
TEST_DB_LOCATION = '/tmp/testing.sqlite'
SQLALCHEMY_DATABASE_URI = 'sqlite:///{0}'.format(TEST_DB_LOCATION)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# SAML Authnetication
SAML_ENABLED = False
# TEST SAMPLE DATA
TEST_USER = 'test'
TEST_USER_PASSWORD = 'test'
TEST_ADMIN_USER = 'admin'
TEST_ADMIN_PASSWORD = 'admin'
TEST_USER_APIKEY = 'wewdsfewrfsfsdf'
TEST_ADMIN_APIKEY = 'nghnbnhtghrtert'

View file

@ -5,28 +5,14 @@ services:
build: build:
context: . context: .
dockerfile: docker-test/Dockerfile dockerfile: docker-test/Dockerfile
args:
- ENVIRONMENT=test
image: powerdns-admin-test image: powerdns-admin-test
env_file:
- ./docker-test/env
container_name: powerdns-admin-test container_name: powerdns-admin-test
mem_limit: 256M
memswap_limit: 256M
ports: ports:
- "9191:9191" - "9191:80"
volumes:
# Code
- .:/powerdns-admin/
- "./configs/test.py:/powerdns-admin/config.py"
- powerdns-admin-assets3:/powerdns-admin/logs
- ./app/static/custom:/powerdns-admin/app/static/custom
logging:
driver: json-file
options:
max-size: 50m
networks: networks:
- default - default
env_file:
- ./docker-test/env
depends_on: depends_on:
- pdns-server - pdns-server
@ -38,6 +24,7 @@ services:
ports: ports:
- "5053:53" - "5053:53"
- "5053:53/udp" - "5053:53/udp"
- "8081:8081"
networks: networks:
- default - default
env_file: env_file:
@ -45,6 +32,3 @@ services:
networks: networks:
default: default:
volumes:
powerdns-admin-assets3:

View file

@ -1,8 +1,3 @@
PDNS_DB_HOST=pdns-mysql
PDNS_DB_NAME=pdns
PDNS_DB_USER=pdns
PDNS_DB_PASSWORD=changeme
PDNS_PROTO=http PDNS_PROTO=http
PDNS_PORT=8081 PDNS_PORT=8081
PDNS_HOST=pdns-server PDNS_HOST=pdns-server

24
docs/running_tests.md Normal file
View file

@ -0,0 +1,24 @@
### Running tests
**NOTE:** Tests will create `__pycache__` folders which will be owned by root, which might be issue during rebuild
thus (e.g. invalid tar headers message) when such situation occurs, you need to remove those folders as root
1. Build images
```
docker-compose -f docker-compose-test.yml build
```
2. Run tests
```
docker-compose -f docker-compose-test.yml up
```
3. To teardown the test environment
```
docker-compose -f docker-compose-test.yml down
docker-compose -f docker-compose-test.yml rm
```

View file

@ -1,20 +0,0 @@
#!/usr/bin/env python3
from app import db
from app.models import Role, DomainTemplate
admin_role = Role(name='Administrator', description='Administrator')
user_role = Role(name='User', description='User')
template_1 = DomainTemplate(name='basic_template_1', description='Basic Template #1')
template_2 = DomainTemplate(name='basic_template_2', description='Basic Template #2')
template_3 = DomainTemplate(name='basic_template_3', description='Basic Template #3')
db.session.add(admin_role)
db.session.add(user_role)
db.session.add(template_1)
db.session.add(template_2)
db.session.add(template_3)
db.session.commit()

View file

@ -1,5 +1,5 @@
import os import os
from werkzeug.contrib.fixers import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
from flask import Flask from flask import Flask
from flask_seasurf import SeaSurf from flask_seasurf import SeaSurf

View file

@ -521,6 +521,7 @@ class Domain(db.Model):
current_app.logger.error( current_app.logger.error(
'Cannot revoke user privileges on domain {0}. DETAIL: {1}'. 'Cannot revoke user privileges on domain {0}. DETAIL: {1}'.
format(self.name, e)) format(self.name, e))
current_app.logger.debug(print(traceback.format_exc()))
try: try:
for uid in added_ids: for uid in added_ids:
@ -529,10 +530,10 @@ class Domain(db.Model):
db.session.commit() db.session.commit()
except Exception as e: except Exception as e:
db.session.rollback() db.session.rollback()
print(traceback.format_exc())
current_app.logger.error( current_app.logger.error(
'Cannot grant user privileges to domain {0}. DETAIL: {1}'. 'Cannot grant user privileges to domain {0}. DETAIL: {1}'.
format(self.name, e)) format(self.name, e))
current_app.logger.debug(print(traceback.format_exc()))
def update_from_master(self, domain_name): def update_from_master(self, domain_name):
""" """

View file

@ -132,7 +132,7 @@ def api_login_create_zone():
"User is ordinary user, assigning created domain") "User is ordinary user, assigning created domain")
domain = Domain(name=data['name'].rstrip('.')) domain = Domain(name=data['name'].rstrip('.'))
domain.update() domain.update()
domain.grant_privileges([current_user.username]) domain.grant_privileges([current_user.id])
domain = Domain() domain = Domain()
domain.update() domain.update()
@ -281,7 +281,7 @@ def api_get_apikeys(domain_name):
if current_user.role.name not in ['Administrator', 'Operator']: if current_user.role.name not in ['Administrator', 'Operator']:
if domain_name: if domain_name:
msg = "Check if domain {0} exists and \ msg = "Check if domain {0} exists and \
is allowed for user." .format(domain_name) is allowed for user." .format(domain_name)
current_app.logger.debug(msg) current_app.logger.debug(msg)
apikeys = current_user.get_apikeys(domain_name) apikeys = current_user.get_apikeys(domain_name)
@ -502,7 +502,7 @@ def api_create_zone(server_id):
@api_bp.route('/servers/<string:server_id>/zones', methods=['GET']) @api_bp.route('/servers/<string:server_id>/zones', methods=['GET'])
@apikey_auth @apikey_auth
def api_get_zones(server_id): def api_get_zones(server_id):
if server_id == 'powerdns-admin': if server_id == 'pdnsadmin':
if g.apikey.role.name not in ['Administrator', 'Operator']: if g.apikey.role.name not in ['Administrator', 'Operator']:
domain_obj_list = g.apikey.domains domain_obj_list = g.apikey.domains
else: else:

View file

@ -1,9 +1,9 @@
Flask==1.0.2 Flask==1.1.1
Flask-Assets==0.12 Flask-Assets==0.12
Flask-Login==0.4.1 Flask-Login==0.4.1
Flask-SQLAlchemy==2.3.2 Flask-SQLAlchemy==2.4.1
Flask-Migrate==2.2.1 Flask-Migrate==2.5.2
SQLAlchemy==1.3.5 SQLAlchemy==1.3.11
mysqlclient==1.3.12 mysqlclient==1.3.12
configobj==5.0.6 configobj==5.0.6
bcrypt==3.1.4 bcrypt==3.1.4

View file

@ -1,23 +0,0 @@
[unix_http_server]
file=/tmp/supervisor.sock ; (the path to the socket file)
chown=nobody:nogroup ; socket file uid:gid owner
[supervisord]
logfile=/tmp/supervisord.log ; (main log file;default $CWD/supervisord.log)
logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB)
logfile_backups=10 ; (num of main logfile rotation backups;default 10)
loglevel=info ; (log level;default info; others: debug,warn,trace)
pidfile=/tmp/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
nodaemon=true ; (start in foreground if true;default false)
minfds=1024 ; (min. avail startup file descriptors;default 1024)
minprocs=200 ; (min. avail process descriptors;default 200)
[program:powerdns-admin]
command=/usr/local/bin/gunicorn -t 120 --workers 4 --bind '0.0.0.0:9191' --log-level info app:app
directory=/powerdns-admin
autostart=true
priority=999
user=www-data
redirect_stderr=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0

View file

@ -1,24 +1,26 @@
import pytest
import sys
import flask_migrate
import os import os
import pytest
import flask_migrate
from base64 import b64encode from base64 import b64encode
from unittest import mock
sys.path.append(os.getcwd())
from app.models import Role, User, Setting, ApiKey, Domain from powerdnsadmin import create_app
from app import app, db from powerdnsadmin.models.base import db
from app.blueprints.api import api_blueprint from powerdnsadmin.models.user import User
from app.lib.log import logging from powerdnsadmin.models.setting import Setting
from powerdnsadmin.models.api_key import ApiKey
app = create_app('../configs/test.py')
ctx = app.app_context()
ctx.push()
@pytest.fixture @pytest.fixture
def client(): def client():
app.config['TESTING'] = True app.config['TESTING'] = True
client = app.test_client() client = app.test_client()
yield client yield client
def load_data(setting_name, *args, **kwargs): def load_data(setting_name, *args, **kwargs):
if setting_name == 'maintenance': if setting_name == 'maintenance':
return 0 return 0
@ -35,6 +37,7 @@ def load_data(setting_name, *args, **kwargs):
if setting_name == 'allow_user_create_domain': if setting_name == 'allow_user_create_domain':
return True return True
@pytest.fixture @pytest.fixture
def basic_auth_admin_headers(): def basic_auth_admin_headers():
test_admin_user = app.config.get('TEST_ADMIN_USER') test_admin_user = app.config.get('TEST_ADMIN_USER')
@ -71,40 +74,35 @@ def initial_data():
allow_create_domain_setting = Setting('allow_user_create_domain', True) allow_create_domain_setting = Setting('allow_user_create_domain', True)
try: try:
with app.app_context(): flask_migrate.upgrade()
flask_migrate.upgrade()
db.session.add(api_url_setting) db.session.add(api_url_setting)
db.session.add(api_key_setting) db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting) db.session.add(allow_create_domain_setting)
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_user = app.config.get('TEST_USER') test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_admin_user = app.config.get('TEST_ADMIN_USER') test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
admin_user = User( admin_user = User(username=test_admin_user,
username=test_admin_user, plain_text_password=test_admin_pass,
plain_text_password=test_admin_pass, email="admin@admin.com")
email="admin@admin.com"
)
msg = admin_user.create_local_user() msg = admin_user.create_local_user()
if not msg: if not msg:
raise Exception("Error occurred creating user {0}".format(msg)) raise Exception("Error occurred creating user {0}".format(msg))
ordinary_user = User( ordinary_user = User(username=test_user,
username=test_user, plain_text_password=test_user_pass,
plain_text_password=test_user_pass, email="test@test.com")
email="test@test.com"
)
msg = ordinary_user.create_local_user() msg = ordinary_user.create_local_user()
if not msg: if not msg:
raise Exception("Error occurred creating user {0}".format(msg)) raise Exception("Error occurred creating user {0}".format(msg))
except Exception as e: except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e)) print("Unexpected ERROR: {0}".format(e))
raise e raise e
yield yield
@ -125,8 +123,7 @@ def initial_apikey_data():
allow_create_domain_setting = Setting('allow_user_create_domain', True) allow_create_domain_setting = Setting('allow_user_create_domain', True)
try: try:
with app.app_context(): flask_migrate.upgrade()
flask_migrate.upgrade()
db.session.add(api_url_setting) db.session.add(api_url_setting)
db.session.add(api_key_setting) db.session.add(api_key_setting)
@ -135,35 +132,26 @@ def initial_apikey_data():
test_user_apikey = app.config.get('TEST_USER_APIKEY') test_user_apikey = app.config.get('TEST_USER_APIKEY')
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY') test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
dummy_apikey = ApiKey( dummy_apikey = ApiKey(desc="dummy", role_name="Administrator")
desc="dummy",
role_name="Administrator"
)
admin_key = dummy_apikey.get_hashed_password( admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey plain_text_password=test_admin_apikey).decode('utf-8')
).decode('utf-8')
admin_apikey = ApiKey( admin_apikey = ApiKey(key=admin_key,
key=admin_key, desc="test admin apikey",
desc="test admin apikey", role_name="Administrator")
role_name="Administrator"
)
admin_apikey.create() admin_apikey.create()
user_key = dummy_apikey.get_hashed_password( user_key = dummy_apikey.get_hashed_password(
plain_text_password=test_user_apikey plain_text_password=test_user_apikey).decode('utf-8')
).decode('utf-8')
user_apikey = ApiKey( user_apikey = ApiKey(key=user_key,
key=user_key, desc="test user apikey",
desc="test user apikey", role_name="User")
role_name="User"
)
user_apikey.create() user_apikey.create()
except Exception as e: except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e)) print("Unexpected ERROR: {0}".format(e))
raise e raise e
yield yield
@ -185,47 +173,61 @@ def zone_data():
@pytest.fixture @pytest.fixture
def created_zone_data(): def created_zone_data():
data = { data = {
'url': '/api/v1/servers/localhost/zones/example.org.', 'url':
'soa_edit_api': 'DEFAULT', '/api/v1/servers/localhost/zones/example.org.',
'last_check': 0, 'soa_edit_api':
'masters': [], 'DEFAULT',
'dnssec': False, 'last_check':
'notified_serial': 0, 0,
'nsec3narrow': False, 'masters': [],
'serial': 2019013101, 'dnssec':
'nsec3param': '', False,
'soa_edit': '', 'notified_serial':
'api_rectify': False, 0,
'kind': 'Native', 'nsec3narrow':
'rrsets': [ False,
{ 'serial':
'comments': [], 2019013101,
'type': 'SOA', 'nsec3param':
'name': 'example.org.', '',
'ttl': 3600, 'soa_edit':
'records': [ '',
{ 'api_rectify':
'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600', False,
'disabled': False 'kind':
} 'Native',
] 'rrsets': [{
}, 'comments': [],
{ 'type':
'comments': [], 'SOA',
'type': 'NS', 'name':
'name': 'example.org.', 'example.org.',
'ttl': 3600, 'ttl':
'records': [ 3600,
{ 'records': [{
'content': 'ns1.example.org.', 'content':
'disabled': False 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
} 'disabled': False
] }]
} }, {
], 'comments': [],
'name': 'example.org.', 'type':
'account': '', 'NS',
'id': 'example.org.' 'name':
'example.org.',
'ttl':
3600,
'records': [{
'content': 'ns1.example.org.',
'disabled': False
}]
}],
'name':
'example.org.',
'account':
'',
'id':
'example.org.'
} }
return data return data
@ -233,20 +235,14 @@ def created_zone_data():
def user_apikey_data(): def user_apikey_data():
data = { data = {
"description": "userkey", "description": "userkey",
"domains": [ "domains": ["example.org"],
"example.org"
],
"role": "User" "role": "User"
} }
return data return data
def admin_apikey_data(): def admin_apikey_data():
data = { data = {"description": "masterkey", "domains": [], "role": "Administrator"}
"description": "masterkey",
"domains": [],
"role": "Administrator"
}
return data return data
@ -267,11 +263,9 @@ def admin_apikey_integration():
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def user_apikey(): def user_apikey():
data = user_apikey_data() data = user_apikey_data()
api_key = ApiKey( api_key = ApiKey(desc=data['description'],
desc=data['description'], role_name=data['role'],
role_name=data['role'], domains=[])
domains=[]
)
headers = create_apikey_headers(api_key.plain_key) headers = create_apikey_headers(api_key.plain_key)
return headers return headers
@ -279,18 +273,14 @@ def user_apikey():
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def admin_apikey(): def admin_apikey():
data = admin_apikey_data() data = admin_apikey_data()
api_key = ApiKey( api_key = ApiKey(desc=data['description'],
desc=data['description'], role_name=data['role'],
role_name=data['role'], domains=[])
domains=[]
)
headers = create_apikey_headers(api_key.plain_key) headers = create_apikey_headers(api_key.plain_key)
return headers return headers
def create_apikey_headers(passw): def create_apikey_headers(passw):
user_pass_base64 = b64encode(passw.encode('utf-8')) user_pass_base64 = b64encode(passw.encode('utf-8'))
headers = { headers = {"X-API-KEY": "{0}".format(user_pass_base64.decode('utf-8'))}
"X-API-KEY": "{0}".format(user_pass_base64.decode('utf-8'))
}
return headers return headers

View file

@ -1,61 +1,38 @@
import os
import pytest import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) from powerdnsadmin.lib.validators import validate_apikey
import app from powerdnsadmin.lib.schema import ApiKeySchema
from app.validators import validate_apikey
from app.models import Setting
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data
class TestIntegrationApiApiKeyAdminUser(object): class TestIntegrationApiApiKeyAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers): def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get( res = client.get("/api/v1/pdnsadmin/apikeys",
"/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[ [user_apikey_data(), admin_apikey_data()])
user_apikey_data(), def test_create_apikey(self, client, initial_data, apikey_data, zone_data,
admin_apikey_data() basic_auth_admin_headers):
] res = client.post("/api/v1/pdnsadmin/zones",
) headers=basic_auth_admin_headers,
def test_create_apikey( data=json.dumps(zone_data),
self, content_type="application/json")
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
res = client.post( res = client.post("/api/v1/pdnsadmin/apikeys",
"/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers,
headers=basic_auth_admin_headers, data=json.dumps(apikey_data),
data=json.dumps(apikey_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
validate_apikey(data) validate_apikey(data)
@ -64,69 +41,44 @@ class TestIntegrationApiApiKeyAdminUser(object):
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id']) apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete( res = client.delete(apikey_url, headers=basic_auth_admin_headers)
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[ [user_apikey_data(), admin_apikey_data()])
user_apikey_data(), def test_get_multiple_apikey(self, client, initial_data, apikey_data,
admin_apikey_data() zone_data, basic_auth_admin_headers):
] res = client.post("/api/v1/pdnsadmin/zones",
) headers=basic_auth_admin_headers,
def test_get_multiple_apikey( data=json.dumps(zone_data),
self, content_type="application/json")
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
res = client.post( res = client.post("/api/v1/pdnsadmin/apikeys",
"/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers,
headers=basic_auth_admin_headers, data=json.dumps(apikey_data),
data=json.dumps(apikey_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
validate_apikey(data) validate_apikey(data)
assert res.status_code == 201 assert res.status_code == 201
res = client.get( res = client.get("/api/v1/pdnsadmin/apikeys",
"/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_role = namedtuple( fake_role = namedtuple(
"Role", "Role", data[0]['role'].keys())(*data[0]['role'].values())
data[0]['role'].keys()
)(*data[0]['role'].values())
data[0]['domains'] = [] data[0]['domains'] = []
data[0]['role'] = fake_role data[0]['role'] = fake_role
@ -138,54 +90,33 @@ class TestIntegrationApiApiKeyAdminUser(object):
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(fake_apikey.id) apikey_url = apikey_url_format.format(fake_apikey.id)
res = client.delete( res = client.delete(apikey_url, headers=basic_auth_admin_headers)
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[ [user_apikey_data(), admin_apikey_data()])
user_apikey_data(), def test_delete_apikey(self, client, initial_data, apikey_data, zone_data,
admin_apikey_data() basic_auth_admin_headers):
] res = client.post("/api/v1/pdnsadmin/zones",
) headers=basic_auth_admin_headers,
def test_delete_apikey( data=json.dumps(zone_data),
self, content_type="application/json")
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
res = client.post( res = client.post("/api/v1/pdnsadmin/apikeys",
"/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers,
headers=basic_auth_admin_headers, data=json.dumps(apikey_data),
data=json.dumps(apikey_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
validate_apikey(data) validate_apikey(data)
@ -193,18 +124,12 @@ class TestIntegrationApiApiKeyAdminUser(object):
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id']) apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete( res = client.delete(apikey_url, headers=basic_auth_admin_headers)
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,42 +1,27 @@
import os
import pytest import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
sys.path.append(os.getcwd())
import app from powerdnsadmin.lib.validators import validate_zone
from app.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object): class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers): def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers)
headers=basic_auth_user_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -45,36 +30,24 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_get_multiple_zones( def test_get_multiple_zones(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers)
headers=basic_auth_user_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
@ -84,26 +57,16 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_delete_zone( def test_delete_zone(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -112,9 +75,6 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,43 +1,27 @@
import os
import pytest import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) from powerdnsadmin.lib.validators import validate_zone
import app from powerdnsadmin.lib.schema import DomainSchema
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import zone_data from tests.fixtures import zone_data
class TestIntegrationApiZoneAdminUser(object): class TestIntegrationApiZoneAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers): def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, client, initial_data, zone_data,
self, basic_auth_admin_headers):
client, res = client.post("/api/v1/pdnsadmin/zones",
initial_data, headers=basic_auth_admin_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_admin_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -46,36 +30,24 @@ class TestIntegrationApiZoneAdminUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_get_multiple_zones( def test_get_multiple_zones(self, client, initial_data, zone_data,
self, basic_auth_admin_headers):
client, res = client.post("/api/v1/pdnsadmin/zones",
initial_data, headers=basic_auth_admin_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_admin_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
@ -85,26 +57,16 @@ class TestIntegrationApiZoneAdminUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_delete_zone( def test_delete_zone(self, client, initial_data, zone_data,
self, basic_auth_admin_headers):
client, res = client.post("/api/v1/pdnsadmin/zones",
initial_data, headers=basic_auth_admin_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_admin_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -113,9 +75,6 @@ class TestIntegrationApiZoneAdminUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,44 +1,28 @@
import os
import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) from powerdnsadmin.lib.validators import validate_zone
import app from powerdnsadmin.lib.schema import DomainSchema
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import admin_apikey_integration from tests.fixtures import admin_apikey_integration
class TestIntegrationApiZoneAdminApiKey(object): class TestIntegrationApiZoneAdminApiKey(object):
def test_empty_get(self, client, initial_apikey_data,
def test_empty_get(self, client, initial_apikey_data, admin_apikey_integration): admin_apikey_integration):
res = client.get( res = client.get("/api/v1/servers/localhost/zones",
"/api/v1/servers/localhost/zones", headers=admin_apikey_integration)
headers=admin_apikey_integration
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, client, initial_apikey_data, zone_data,
self, admin_apikey_integration):
client, res = client.post("/api/v1/servers/localhost/zones",
initial_apikey_data, headers=admin_apikey_integration,
zone_data, data=json.dumps(zone_data),
admin_apikey_integration content_type="application/json")
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -47,36 +31,24 @@ class TestIntegrationApiZoneAdminApiKey(object):
zone_url_format = "/api/v1/servers/localhost/zones/{0}" zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=admin_apikey_integration)
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204 assert res.status_code == 204
def test_get_multiple_zones( def test_get_multiple_zones(self, client, initial_apikey_data, zone_data,
self, admin_apikey_integration):
client, res = client.post("/api/v1/servers/localhost/zones",
initial_apikey_data, headers=admin_apikey_integration,
zone_data, data=json.dumps(zone_data),
admin_apikey_integration content_type="application/json")
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
res = client.get( res = client.get("/api/v1/servers/localhost/zones",
"/api/v1/servers/localhost/zones", headers=admin_apikey_integration)
headers=admin_apikey_integration
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
@ -86,26 +58,16 @@ class TestIntegrationApiZoneAdminApiKey(object):
zone_url_format = "/api/v1/servers/localhost/zones/{0}" zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=admin_apikey_integration)
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204 assert res.status_code == 204
def test_delete_zone( def test_delete_zone(self, client, initial_apikey_data, zone_data,
self, admin_apikey_integration):
client, res = client.post("/api/v1/servers/localhost/zones",
initial_apikey_data, headers=admin_apikey_integration,
zone_data, data=json.dumps(zone_data),
admin_apikey_integration content_type="application/json")
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -114,9 +76,6 @@ class TestIntegrationApiZoneAdminApiKey(object):
zone_url_format = "/api/v1/servers/localhost/zones/{0}" zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=admin_apikey_integration)
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,14 +1,8 @@
import os
import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
sys.path.append(os.getcwd())
import app from powerdnsadmin.lib.validators import validate_zone
from app.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import user_apikey_integration from tests.fixtures import user_apikey_integration

View file

@ -1,42 +1,26 @@
import os
import pytest
import sys
import json import json
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
sys.path.append(os.getcwd())
import app from powerdnsadmin.lib.validators import validate_zone
from app.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object): class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers): def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers)
headers=basic_auth_user_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -45,36 +29,24 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_get_multiple_zones( def test_get_multiple_zones(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers)
headers=basic_auth_user_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
@ -84,26 +56,16 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204
def test_delete_zone( def test_delete_zone(self, initial_data, client, zone_data,
self, basic_auth_user_headers):
initial_data, res = client.post("/api/v1/pdnsadmin/zones",
client, headers=basic_auth_user_headers,
zone_data, data=json.dumps(zone_data),
basic_auth_user_headers content_type="application/json")
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
@ -112,9 +74,6 @@ class TestIntegrationApiZoneUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,164 +1,148 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json import json
from base64 import b64encode import pytest
from unittest.mock import patch
from collections import namedtuple from collections import namedtuple
import logging as logger import sys
import os
sys.path.append(os.getcwd()) sys.path.append(os.getcwd())
import app
from app.validators import validate_zone import powerdnsadmin
from app.models import Setting, Domain, ApiKey, Role from powerdnsadmin.models.setting import Setting
from app.schema import DomainSchema, ApiKeySchema from powerdnsadmin.models.domain import Domain
from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, admin_apikey, zone_data from tests.fixtures import user_apikey, admin_apikey, zone_data
from tests.fixtures import admin_apikey_data, load_data from tests.fixtures import admin_apikey_data, load_data
class TestUnitApiZoneAdminApiKey(object): class TestUnitApiZoneAdminApiKey(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self):
self.oauth_setting_patcher = patch( self.google_setting_patcher = patch(
'app.oauth.Setting', 'powerdnsadmin.services.google.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.github_setting_patcher = patch(
self.views_setting_patcher = patch( 'powerdnsadmin.services.github.Setting',
'app.views.Setting', spec=powerdnsadmin.models.setting.Setting)
spec=app.models.Setting self.oidc_setting_patcher = patch(
) 'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'app.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'app.models.Setting', 'powerdnsadmin.models.setting.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch( self.mock_apikey_patcher = patch(
'app.decorators.ApiKey', 'powerdnsadmin.decorators.ApiKey',
spec=app.models.ApiKey spec=powerdnsadmin.models.api_key.ApiKey)
)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'app.blueprints.api.History', 'powerdnsadmin.routes.api.History',
spec=app.models.History spec=powerdnsadmin.models.history.History)
)
data = admin_apikey_data() data = admin_apikey_data()
api_key = ApiKey( api_key = ApiKey(desc=data['description'],
desc=data['description'], role_name=data['role'],
role_name=data['role'], domains=[])
domains=[]
)
api_key.role = Role(name=data['role']) api_key.role = Role(name=data['role'])
self.mock_oauth_setting = self.oauth_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start() self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key self.mock_apikey.return_value.is_validate.return_value = api_key
def test_empty_get( def test_empty_get(self, client, common_data_mock, admin_apikey):
self, with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
client, patch('powerdnsadmin.lib.utils.requests.get') as mock_get:
common_data_mock,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain, \
patch('app.lib.utils.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = [] mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = [] mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = [] mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200 mock_get.return_value.status_code = 200
res = client.get( res = client.get("/api/v1/servers/pdnsadmin/zones",
"/api/v1/servers/localhost/zones", headers=admin_apikey)
headers=admin_apikey
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, client, common_data_mock, zone_data,
self, admin_apikey, created_zone_data):
client, with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
admin_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201 mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {} mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
res = client.post( res = client.post("/api/v1/servers/localhost/zones",
"/api/v1/servers/localhost/zones", headers=admin_apikey,
headers=admin_apikey, data=json.dumps(zone_data),
data=json.dumps(zone_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
def test_get_multiple_zones( def test_get_multiple_zones(self, client, common_data_mock, zone_data,
self, admin_apikey):
client, with patch('powerdnsadmin.routes.api.Domain') as mock_domain:
common_data_mock,
zone_data,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain] mock_domain.query.all.return_value = [test_domain]
res = client.get( res = client.get("/api/v1/servers/pdnsadmin/zones",
"/api/v1/servers/localhost/zones", headers=admin_apikey)
headers=admin_apikey
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple( fake_domain = namedtuple("Domain",
"Domain", data[0].keys())(*data[0].values())
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain])) json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200 assert res.status_code == 200
def test_delete_zone( def test_delete_zone(self, client, common_data_mock, zone_data,
self, admin_apikey):
client, with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
admin_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204 mock_delete.return_value.status_code = 204
mock_delete.return_value.content = '' mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}" zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=admin_apikey)
zone_url,
headers=admin_apikey
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,157 +1,151 @@
import os import json
import pytest import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) import powerdnsadmin
import app from powerdnsadmin.models.user import User
from app.validators import validate_zone from powerdnsadmin.models.role import Role
from app.models import User, Domain, Role from powerdnsadmin.models.domain import Domain
from app.schema import DomainSchema from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_admin_headers from tests.fixtures import client, basic_auth_admin_headers
from tests.fixtures import zone_data, created_zone_data, load_data from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneAdminUser(object): class TestUnitApiZoneAdminUser(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self):
self.oauth_setting_patcher = patch( self.google_setting_patcher = patch(
'app.oauth.Setting', 'powerdnsadmin.services.google.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch( self.api_setting_patcher = patch(
'app.blueprints.api.Setting', 'powerdnsadmin.routes.api.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.base_route_user_patcher = patch(
self.views_setting_patcher = patch( 'powerdnsadmin.routes.base.User',
'app.views.Setting', spec=powerdnsadmin.models.user.User)
spec=app.models.Setting
)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'app.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'app.models.Setting', 'powerdnsadmin.models.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch( self.decorators_setting_patcher = patch(
'app.decorators.Setting' 'powerdnsadmin.decorators.Setting',
) spec=powerdnsadmin.models.setting.Setting)
self.mock_user_patcher = patch( self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
'app.decorators.User' spec=powerdnsadmin.models.user.User)
)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'app.blueprints.api.History', 'powerdnsadmin.routes.api.History',
spec=app.models.History spec=powerdnsadmin.models.history.History)
)
self.mock_oauth_setting = self.oauth_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.decorators_setting = self.decorators_setting_patcher.start() self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start() self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start() self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data self.api_setting.return_value.get.side_effect = load_data
mockk = MagicMock()
mockk.role.name = "Administrator"
self.mock_user.query.filter.return_value.first.return_value = mockk
self.mock_user.return_value.is_validate.return_value = True
def test_empty_get( self.mockk = MagicMock()
self, self.mockk.role.name = "Administrator"
client,
common_data_mock, self.mock_user.query.filter.return_value.first.return_value = self.mockk
basic_auth_admin_headers self.mock_user.return_value.is_validate.return_value = True
): self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
with patch('app.blueprints.api.Domain') as mock_domain, \ self.mock_base_route_user.return_value.is_validate.return_value = True
patch('app.lib.utils.requests.get') as mock_get:
def test_empty_get(self, client, common_data_mock,
basic_auth_admin_headers):
with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
patch('powerdnsadmin.lib.helper.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = [] mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = [] mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = [] mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200 mock_get.return_value.status_code = 200
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert data == [] assert data == []
def test_create_zone( def test_create_zone(self, client, common_data_mock, zone_data,
self, basic_auth_admin_headers, created_zone_data):
client, with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
basic_auth_admin_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201 mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {} mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
res = client.post( res = client.post("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers,
headers=basic_auth_admin_headers, data=json.dumps(zone_data),
data=json.dumps(zone_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
def test_get_multiple_zones( def test_get_multiple_zones(self, client, common_data_mock, zone_data,
self, basic_auth_admin_headers):
client, with patch('powerdnsadmin.routes.api.Domain') as mock_domain:
common_data_mock,
zone_data,
basic_auth_admin_headers
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain] mock_domain.query.all.return_value = [test_domain]
res = client.get( res = client.get("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers)
headers=basic_auth_admin_headers
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple( fake_domain = namedtuple("Domain",
"Domain", data[0].keys())(*data[0].values())
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain])) json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200 assert res.status_code == 200
def test_delete_zone( def test_delete_zone(self, client, common_data_mock, zone_data,
self, basic_auth_admin_headers):
client, with patch('powerdnsadmin.lib.helper.requests.request') as mock_delete, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
basic_auth_admin_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
mock_domain.query.filter.return_value = True mock_domain.query.filter.return_value = True
mock_delete.return_value.status_code = 204 mock_delete.return_value.status_code = 204
@ -159,9 +153,6 @@ class TestUnitApiZoneAdminUser(object):
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_admin_headers)
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,146 +1,144 @@
import os import json
import pytest import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) import powerdnsadmin
import app from powerdnsadmin.models.user import User
from app.validators import validate_zone from powerdnsadmin.models.role import Role
from app.models import User, Domain, Role from powerdnsadmin.models.domain import Domain
from app.schema import DomainSchema from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_user_headers from tests.fixtures import client, basic_auth_user_headers
from tests.fixtures import zone_data, created_zone_data, load_data from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneUser(object): class TestUnitApiZoneUser(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self):
self.oauth_setting_patcher = patch( self.google_setting_patcher = patch(
'app.oauth.Setting', 'powerdnsadmin.services.google.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch( self.api_setting_patcher = patch(
'app.blueprints.api.Setting', 'powerdnsadmin.routes.api.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.base_route_user_patcher = patch(
self.views_setting_patcher = patch( 'powerdnsadmin.routes.base.User',
'app.views.Setting', spec=powerdnsadmin.models.user.User)
spec=app.models.Setting
)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'app.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'app.models.Setting', 'powerdnsadmin.models.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch( self.decorators_setting_patcher = patch(
'app.decorators.Setting' 'powerdnsadmin.decorators.Setting',
) spec=powerdnsadmin.models.setting.Setting)
self.mock_user_patcher = patch( self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
'app.decorators.User' spec=powerdnsadmin.models.user.User)
)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'app.blueprints.api.History', 'powerdnsadmin.routes.api.History',
spec=app.models.History spec=powerdnsadmin.models.history.History)
)
self.mock_oauth_setting = self.oauth_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.decorators_setting = self.decorators_setting_patcher.start() self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start() self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start() self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data self.api_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock() self.mockk = MagicMock()
self.mockk.role.name = "User" self.mockk.role.name = "User"
self.mock_user.query.filter.return_value.first.return_value = self.mockk self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
def test_create_zone( def test_create_zone(self, client, common_data_mock, zone_data,
self, basic_auth_user_headers, created_zone_data):
client, with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
basic_auth_user_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201 mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {} mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
res = client.post( res = client.post("/api/v1/pdnsadmin/zones",
"/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers,
headers=basic_auth_user_headers, data=json.dumps(zone_data),
data=json.dumps(zone_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
def test_get_multiple_zones( def test_get_multiple_zones(self, client, common_data_mock, zone_data,
self, basic_auth_user_headers):
client, with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
common_data_mock,
zone_data,
basic_auth_user_headers
):
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain]
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain] mock_user_domains.return_value = [test_domain]
res = client.get("/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(self, client, common_data_mock, zone_data,
basic_auth_user_headers):
with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.return_value.update.return_value = True
mock_user_domains.return_value = [test_domain]
mock_delete.return_value.status_code = 204 mock_delete.return_value.status_code = 204
mock_delete.return_value.content = '' mock_delete.return_value.content = ''
zone_url_format = "/api/v1/pdnsadmin/zones/{0}" zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=basic_auth_user_headers)
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204 assert res.status_code == 204

View file

@ -1,145 +1,134 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json import json
import pytest
from unittest.mock import patch
from base64 import b64encode from base64 import b64encode
from collections import namedtuple from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd()) import powerdnsadmin
import app from powerdnsadmin.models.setting import Setting
from app.validators import validate_zone from powerdnsadmin.models.domain import Domain
from app.models import Setting, Domain, ApiKey, Role from powerdnsadmin.models.api_key import ApiKey
from app.schema import DomainSchema, ApiKeySchema from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, zone_data from tests.fixtures import user_apikey, zone_data
from tests.fixtures import user_apikey_data, load_data from tests.fixtures import user_apikey_data, load_data
class TestUnitApiZoneUserApiKey(object): class TestUnitApiZoneUserApiKey(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self):
self.oauth_setting_patcher = patch( self.google_setting_patcher = patch(
'app.oauth.Setting', 'powerdnsadmin.services.google.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.github_setting_patcher = patch(
self.views_setting_patcher = patch( 'powerdnsadmin.services.github.Setting',
'app.views.Setting', spec=powerdnsadmin.models.setting.Setting)
spec=app.models.Setting self.oidc_setting_patcher = patch(
) 'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'app.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'app.models.Setting', 'powerdnsadmin.models.setting.Setting',
spec=app.models.Setting spec=powerdnsadmin.models.setting.Setting)
) self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch( self.mock_apikey_patcher = patch(
'app.decorators.ApiKey', 'powerdnsadmin.decorators.ApiKey',
spec=app.models.ApiKey spec=powerdnsadmin.models.api_key.ApiKey)
)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'app.blueprints.api.History', 'powerdnsadmin.routes.api.History',
spec=app.models.History spec=powerdnsadmin.models.history.History)
)
self.mock_oauth_setting = self.oauth_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start() self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
data = user_apikey_data() data = user_apikey_data()
domain = Domain(name=data['domains'][0]) domain = Domain(name=data['domains'][0])
api_key = ApiKey( api_key = ApiKey(desc=data['description'],
desc=data['description'], role_name=data['role'],
role_name=data['role'], domains=[domain])
domains=[domain]
)
api_key.role = Role(name=data['role']) api_key.role = Role(name=data['role'])
self.mock_apikey.return_value.is_validate.return_value = api_key self.mock_apikey.return_value.is_validate.return_value = api_key
def test_create_zone( def test_create_zone(self, client, common_data_mock, zone_data,
self, user_apikey, created_zone_data):
client, with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
user_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201 mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {} mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
res = client.post( res = client.post("/api/v1/servers/localhost/zones",
"/api/v1/servers/localhost/zones", headers=user_apikey,
headers=user_apikey, data=json.dumps(zone_data),
data=json.dumps(zone_data), content_type="application/json")
content_type="application/json"
)
data = res.get_json(force=True) data = res.get_json(force=True)
data['rrsets'] = [] data['rrsets'] = []
validate_zone(data) validate_zone(data)
assert res.status_code == 201 assert res.status_code == 201
def test_get_multiple_zones( def test_get_multiple_zones(self, client, common_data_mock, zone_data,
self, user_apikey):
client, with patch('powerdnsadmin.routes.api.Domain') as mock_domain:
common_data_mock,
zone_data,
user_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain] mock_domain.query.all.return_value = [test_domain]
res = client.get( res = client.get("/api/v1/servers/pdnsadmin/zones",
"/api/v1/servers/localhost/zones", headers=user_apikey)
headers=user_apikey
)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple( fake_domain = namedtuple("Domain",
"Domain", data[0].keys())(*data[0].values())
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain])) json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200 assert res.status_code == 200
def test_delete_zone( def test_delete_zone(self, client, common_data_mock, zone_data,
self, user_apikey):
client, with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \
common_data_mock, patch('powerdnsadmin.routes.api.Domain') as mock_domain:
zone_data,
user_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204 mock_delete.return_value.status_code = 204
mock_delete.return_value.content = '' mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}" zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip(".")) zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete( res = client.delete(zone_url, headers=user_apikey)
zone_url,
headers=user_apikey
)
assert res.status_code == 204 assert res.status_code == 204