Allow user role to view history (#890)

This commit is contained in:
jodygilbert 2021-03-27 18:33:11 +00:00 committed by GitHub
parent 44c4531f02
commit 98db953820
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 177 additions and 46 deletions

View file

@ -1,4 +1,4 @@
FROM alpine:3.12 AS builder
FROM alpine:3.13 AS builder
LABEL maintainer="k@ndk.name"
ARG BUILD_DEPENDENCIES="build-base \
@ -8,7 +8,8 @@ ARG BUILD_DEPENDENCIES="build-base \
openldap-dev \
python3-dev \
xmlsec-dev \
yarn"
yarn \
cargo"
ENV LC_ALL=en_US.UTF-8 \
LANG=en_US.UTF-8 \
@ -68,12 +69,12 @@ RUN mkdir -p /app && \
RUN pip install pip-autoremove && \
pip-autoremove cssmin -y && \
pip-autoremove jsmin -y && \
pip-autoremove pytest -y && \
pip-autoremove pytest -y -L packaging && \
pip uninstall -y pip-autoremove && \
apk del ${BUILD_DEPENDENCIES}
# Build image
FROM alpine:3.12
FROM alpine:3.13
ENV FLASK_APP=/app/powerdnsadmin/__init__.py \
USER=pda

View file

@ -0,0 +1,34 @@
"""Add domain_id to history table
Revision ID: 0d3d93f1c2e0
Revises: 3f76448bb6de
Create Date: 2021-02-15 17:23:05.688241
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '0d3d93f1c2e0'
down_revision = '3f76448bb6de'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('history', schema=None) as batch_op:
batch_op.add_column(sa.Column('domain_id', sa.Integer(), nullable=True))
batch_op.create_foreign_key('fk_domain_id', 'domain', ['domain_id'], ['id'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('history', schema=None) as batch_op:
batch_op.drop_constraint('fk_domain_id', type_='foreignkey')
batch_op.drop_column('domain_id')
# ### end Alembic commands ###

View file

@ -8,7 +8,6 @@ from .models import User, ApiKey, Setting, Domain, Setting
from .lib.errors import RequestIsNotJSON, NotEnoughPrivileges
from .lib.errors import DomainAccessForbidden
def admin_role_required(f):
"""
Grant access if user is in Administrator role
@ -35,6 +34,21 @@ def operator_role_required(f):
return decorated_function
def history_access_required(f):
"""
Grant access if user is in Operator role or higher, or Users can view history
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.role.name not in [
'Administrator', 'Operator'
] and not Setting().get('allow_user_view_history'):
abort(403)
return f(*args, **kwargs)
return decorated_function
def can_access_domain(f):
"""
Grant access if:

View file

@ -13,12 +13,16 @@ class History(db.Model):
detail = db.Column(db.Text())
created_by = db.Column(db.String(128))
created_on = db.Column(db.DateTime, index=True, default=datetime.utcnow)
domain_id = db.Column(db.Integer,
db.ForeignKey('domain.id'),
nullable=True)
def __init__(self, id=None, msg=None, detail=None, created_by=None):
def __init__(self, id=None, msg=None, detail=None, created_by=None, domain_id=None):
self.id = id
self.msg = msg
self.detail = detail
self.created_by = created_by
self.domain_id = domain_id
def __repr__(self):
return '<History {0}>'.format(self.msg)
@ -31,6 +35,7 @@ class History(db.Model):
h.msg = self.msg
h.detail = self.detail
h.created_by = self.created_by
h.domain_id = self.domain_id
db.session.add(h)
db.session.commit()

View file

@ -26,6 +26,7 @@ class Setting(db.Model):
'pretty_ipv6_ptr': False,
'dnssec_admins_only': False,
'allow_user_create_domain': False,
'allow_user_view_history': False,
'bg_domain_updates': False,
'site_name': 'PowerDNS-Admin',
'site_url': 'http://localhost:9191',

View file

@ -6,7 +6,7 @@ from ast import literal_eval
from flask import Blueprint, render_template, make_response, url_for, current_app, request, redirect, jsonify, abort, flash, session
from flask_login import login_required, current_user
from ..decorators import operator_role_required, admin_role_required
from ..decorators import operator_role_required, admin_role_required, history_access_required
from ..models.user import User
from ..models.account import Account
from ..models.account_user import AccountUser
@ -15,10 +15,12 @@ from ..models.server import Server
from ..models.setting import Setting
from ..models.history import History
from ..models.domain import Domain
from ..models.domain_user import DomainUser
from ..models.record import Record
from ..models.domain_template import DomainTemplate
from ..models.domain_template_record import DomainTemplateRecord
from ..models.api_key import ApiKey
from ..models.base import db
from ..lib.schema import ApiPlainKeySchema
@ -579,7 +581,7 @@ def manage_account():
@admin_bp.route('/history', methods=['GET', 'POST'])
@login_required
@operator_role_required
@history_access_required
def history():
if request.method == 'POST':
if current_user.role.name != 'Administrator':
@ -608,7 +610,23 @@ def history():
}), 500)
if request.method == 'GET':
histories = History.query.all()
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.all()
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
# so include history for the domains for the user
histories = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
))
return render_template('admin_history.html', histories=histories)
@ -622,7 +640,7 @@ def setting_basic():
'login_ldap_first', 'default_record_table_size',
'default_domain_table_size', 'auto_ptr', 'record_quick_edit',
'pretty_ipv6_ptr', 'dnssec_admins_only',
'allow_user_create_domain', 'bg_domain_updates', 'site_name',
'allow_user_create_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name',
'session_timeout', 'warn_session_timeout', 'ttl_options',
'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email'
]

View file

@ -206,10 +206,15 @@ def api_login_create_zone():
current_app.logger.debug("Request to powerdns API successful")
data = request.get_json(force=True)
domain = Domain()
domain.update()
domain_id = domain.get_id_by_name(data['name'].rstrip('.'))
history = History(msg='Add domain {0}'.format(
data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain_id)
history.add()
if current_user.role.name not in ['Administrator', 'Operator']:
@ -219,9 +224,6 @@ def api_login_create_zone():
domain.update()
domain.grant_privileges([current_user.id])
domain = Domain()
domain.update()
if resp.status_code == 409:
raise (DomainAlreadyExists)
@ -278,14 +280,18 @@ def api_login_delete_zone(domain_name):
if resp.status_code == 204:
current_app.logger.debug("Request to powerdns API successful")
domain = Domain()
domain_id = domain.get_id_by_name(domain_name)
domain.update()
history = History(msg='Delete domain {0}'.format(
pretty_domain_name(domain_name)),
detail='',
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain_id)
history.add()
domain = Domain()
domain.update()
except Exception as e:
current_app.logger.error('Error: {0}'.format(e))
abort(500)
@ -972,24 +978,27 @@ def api_zone_forward(server_id, zone_id):
status = resp.status_code
if 200 <= status < 300:
current_app.logger.debug("Request to powerdns API successful")
if request.method == 'POST':
if request.method in ['POST', 'PATCH'] :
data = request.get_json(force=True)
for rrset_data in data['rrsets']:
history = History(msg='{0} zone {1} record of {2}'.format(
rrset_data['changetype'].lower(), rrset_data['type'],
rrset_data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.apikey.description)
detail=json.dumps(data),
created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
history.add()
elif request.method == 'DELETE':
history = History(msg='Deleted zone {0}'.format(zone_id),
history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')),
detail='',
created_by=g.apikey.description)
created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
history.add()
elif request.method != 'GET':
history = History(msg='Updated zone {0}'.format(zone_id),
history = History(msg='Updated zone {0}'.format(zone_id.rstrip('.')),
detail='',
created_by=g.apikey.description)
created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
history.add()
return resp.content, resp.status_code, resp.headers.items()
@ -1010,12 +1019,6 @@ def api_create_zone(server_id):
current_app.logger.debug("Request to powerdns API successful")
data = request.get_json(force=True)
history = History(msg='Add domain {0}'.format(
data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.apikey.description)
history.add()
if g.apikey.role.name not in ['Administrator', 'Operator']:
current_app.logger.debug(
"Apikey is user key, assigning created domain")
@ -1025,6 +1028,13 @@ def api_create_zone(server_id):
domain = Domain()
domain.update()
history = History(msg='Add domain {0}'.format(
data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.apikey.description,
domain_id=domain.get_id_by_name(data['name'].rstrip('.')))
history.add()
return resp.content, resp.status_code, resp.headers.items()

View file

@ -151,10 +151,36 @@ def dashboard():
current_app.logger.info('Updating domains in background...')
# Stats for dashboard
domain_count = Domain.query.count()
domain_count = 0
history_number = 0
history = []
user_num = User.query.count()
history_number = History.query.count()
history = History.query.order_by(History.created_on.desc()).limit(4)
if current_user.role.name in ['Administrator', 'Operator']:
domain_count = Domain.query.count()
history_number = History.query.count()
history = History.query.order_by(History.created_on.desc()).limit(4)
elif Setting().get('allow_user_view_history'):
history = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).order_by(History.created_on.desc())
history_number = history.count()
history = history[:4]
domain_count = db.session.query(Domain) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).count()
server = Server(server_id='localhost')
statistics = server.get_statistic()
if statistics:

View file

@ -180,6 +180,7 @@ def add():
domain_master_ips=domain_master_ips,
account_name=account_name)
if result['status'] == 'ok':
domain_id = Domain().get_id_by_name(domain_name)
history = History(msg='Add domain {0}'.format(
pretty_domain_name(domain_name)),
detail=str({
@ -187,7 +188,8 @@ def add():
'domain_master_ips': domain_master_ips,
'account_id': account_id
}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain_id)
history.add()
# grant user access to the domain
@ -228,7 +230,8 @@ def add():
"del_rrests":
result['data'][1]['rrsets']
})),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain_id)
history.add()
else:
history = History(
@ -311,7 +314,8 @@ def setting(domain_name):
msg='Change domain {0} access control'.format(
pretty_domain_name(domain_name)),
detail=str({'user_has_access': new_user_list}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=d.id)
history.add()
return redirect(url_for('domain.setting', domain_name=domain_name))
@ -352,7 +356,8 @@ def change_type(domain_name):
"type": domain_type,
"masters": domain_master_ips
}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=Domain().get_id_by_name(domain_name))
history.add()
return redirect(url_for('domain.setting', domain_name = domain_name))
else:
@ -384,7 +389,8 @@ def change_soa_edit_api(domain_name):
"domain": domain_name,
"soa_edit_api": new_setting
}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=d.get_id_by_name(domain_name))
history.add()
return redirect(url_for('domain.setting', domain_name = domain_name))
else:
@ -452,7 +458,8 @@ def record_apply(domain_name):
"add_rrests": result['data'][0]['rrsets'],
"del_rrests": result['data'][1]['rrsets']
})),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain.id)
history.add()
return make_response(jsonify(result), 200)
else:
@ -584,8 +591,10 @@ def admin_setdomainsetting(domain_name):
if setting.set(new_value):
history = History(
msg='Setting {0} changed value to {1} for {2}'.
format(new_setting, new_value, pretty_domain_name(domain_name)),
created_by=current_user.username)
format(new_setting, new_value,
pretty_domain_name(domain_name)),
created_by=current_user.username,
domain_id=domain.id)
history.add()
return make_response(
jsonify({
@ -604,7 +613,8 @@ def admin_setdomainsetting(domain_name):
msg=
'New setting {0} with value {1} for {2} has been created'
.format(new_setting, new_value, pretty_domain_name(domain_name)),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain.id)
history.add()
return make_response(
jsonify({

View file

@ -771,7 +771,8 @@ def dyndns_update():
msg=
"DynDNS update: attempted update of {0} but record already up-to-date"
.format(hostname),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain.id)
history.add()
else:
oldip = r.data
@ -786,7 +787,8 @@ def dyndns_update():
"old_value": oldip,
"new_value": str(ip)
}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain.id)
history.add()
response = 'good'
else:
@ -825,7 +827,8 @@ def dyndns_update():
"record": hostname,
"value": str(ip)
}),
created_by=current_user.username)
created_by=current_user.username,
domain_id=domain.id)
history.add()
response = 'good'
else:

View file

@ -153,6 +153,11 @@
{% endif %}
</ul>
</li>
{% elif SETTING.get('allow_user_view_history') %}
<li class="header">ADMINISTRATION</li>
<li class="{{ 'active' if active_page == 'admin_history' else '' }}">
<a href="{{ url_for('admin.history') }}"><i class="fa fa-calendar"></i> <span>History</span></a>
</li>
{% endif %}
</ul>
{% endif %}

View file

@ -19,7 +19,7 @@
{% block content %}
<!-- Main content -->
<section class="content">
{% if current_user.role.name in ['Administrator', 'Operator'] %}
{% if current_user.role.name in ['Administrator', 'Operator'] or SETTING.get('allow_user_view_history') %}
<div class="row">
<div class="col-xs-3">
<div class="box">
@ -40,6 +40,7 @@
</div>
</div>
</div>
{% if current_user.role.name in ['Administrator', 'Operator'] %}
<div class="col-lg-6">
<a href="{{ url_for('admin.manage_user') }}">
<div class="small-box bg-green">
@ -53,6 +54,7 @@
</div>
</a>
</div>
{% endif %}
</div>
<div class="row">
<div class="col-lg-6">
@ -68,6 +70,7 @@
</div>
</a>
</div>
{% if current_user.role.name in ['Administrator', 'Operator'] %}
<div class="col-lg-6">
<a href="{{ url_for('admin.pdns_stats') }}">
<div class="small-box bg-green">
@ -81,6 +84,7 @@
</div>
</a>
</div>
{% endif %}
</div>
</div>
</div>