diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index 03dd205..c1997a5 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -7,6 +7,11 @@ Upgrades will be unaffected by this, but please note that reinstalls will requir ([Ubuntu Instructions](https://github.com/PyMySQL/mysqlclient-python#install) or change the start of your database URI with "mysql+pymysql://" instead of "mysql://" [GH-170]. +Features: + +* Supports Two-Factor Authentication on the backend [GH-169] + + ### v1.1.0 - A Hard Day's Night BACKWARDS INCOMPATIBILITIES / NOTES: diff --git a/migrations/versions/e3a72926f808_.py b/migrations/versions/e3a72926f808_.py new file mode 100644 index 0000000..3327d35 --- /dev/null +++ b/migrations/versions/e3a72926f808_.py @@ -0,0 +1,24 @@ +"""Adds table for 2 factor secret and checking if active + +Revision ID: e3a72926f808 +Revises: c3803ac9b7dd +Create Date: 2016-08-10 19:12:49.647496 + +""" + +# revision identifiers, used by Alembic. +revision = 'e3a72926f808' +down_revision = 'c3803ac9b7dd' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('postmaster_admins', sa.Column('otp_active', sa.Boolean(), nullable=True)) + op.add_column('postmaster_admins', sa.Column('otp_secret', sa.String(length=16), nullable=True)) + + +def downgrade(): + op.drop_column('postmaster_admins', 'otp_secret') + op.drop_column('postmaster_admins', 'otp_active') diff --git a/postmaster/apiv1/admins.py b/postmaster/apiv1/admins.py index 9353d0c..5ef2197 100644 --- a/postmaster/apiv1/admins.py +++ b/postmaster/apiv1/admins.py @@ -7,8 +7,10 @@ from flask import request from flask_login import login_required, current_user +import pyqrcode +from StringIO import StringIO from postmaster import db -from postmaster.models import Admins, Configs +from postmaster.models import Admins from postmaster.utils import json_logger, clear_lockout_fields_on_user from postmaster.decorators import json_wrap, paginate from postmaster.errors import ValidationError, GenericError @@ -147,3 +149,115 @@ def unlock_admin(admin_id): admin = Admins.query.get_or_404(admin_id) clear_lockout_fields_on_user(admin.username) return {}, 200 + + +@apiv1.route('/admins//twofactor', methods=['GET']) +@login_required +@json_wrap +def twofactor_status(admin_id): + """ Returns if 2 factor is enabled or not + + This information is in the main user route, + but I added it here as a stub for the URI. + """ + admin = Admins.query.get_or_404(admin_id) + return dict(enabled=admin.otp_active) + + +@apiv1.route('/admins//twofactor', methods=['PUT']) +@login_required +@json_wrap +def twofactor_disable(admin_id): + """ Disable 2 factor using API. + + Enabling 2 factor from this route is not possible. + """ + admin = Admins.query.get_or_404(admin_id) + status = request.get_json(force=True).get('enabled') + if status: + if status.lower() == "false": + admin.otp_active = False + try: + db.session.add(admin) + db.session.commit() + except ValidationError as e: + raise e + except Exception as e: + db.session.rollback() + json_logger( + 'error', current_user.username, + 'The following error occurred in twofactor_disable: {0}'.format(str(e))) + raise GenericError('The administrator could not be updated') + return {}, 200 + elif status.lower() == "true": + raise GenericError("Cannot enable 2 factor from this route - see docs") + raise GenericError("An invalid parameter was supplied") + + +@apiv1.route('/admins//twofactor/qrcode', methods=['GET']) +@login_required +def qrcode(admin_id): + """ Presents the user with a QR code to scan to setup 2 factor authentication + """ + # render qrcode for FreeTOTP + admin = Admins.query.get_or_404(admin_id) + if admin.id != current_user.id: + raise GenericError('You may not view other admin\'s QR codes') + if admin.otp_active: + return ('', 204) + admin.generate_otp_secret() + try: + db.session.add(admin) + db.session.commit() + except ValidationError as e: + raise e + except Exception as e: + db.session.rollback() + json_logger( + 'error', current_user.username, + 'The following error occurred in qrcode: {0}'.format(str(e))) + raise GenericError('The administrator could not be updated') + url = pyqrcode.create(admin.get_totp_uri()) + stream = StringIO() + url.svg(stream, scale=5) + return stream.getvalue().encode('utf-8'), 200, { + 'Content-Type': 'image/svg+xml', + 'Cache-Control': 'no-cache, no-store, must-revalidate', + 'Pragma': 'no-cache', + 'Expires': '0'} + + +@apiv1.route('/admins//twofactor/verify', methods=['POST']) +@login_required +@json_wrap +def verify_qrcode(admin_id): + """ Verifies if the 2 factor token provided is correct + + This will enable 2 factor for a user. + """ + admin = Admins.query.get_or_404(admin_id) + if request.get_json(force=True).get('code'): + if not admin.otp_secret: + raise GenericError("The 2 Factor Secret has not been generated yet") + if admin.verify_totp(request.get_json(force=True).get('code')): + if not admin.otp_active: + audit_message = 'The administrator "{0}" enabled 2 factor'.format( + admin.username) + admin.otp_active = True + try: + db.session.add(admin) + db.session.commit() + json_logger('audit', current_user.username, audit_message) + except ValidationError as e: + raise e + except Exception as e: + db.session.rollback() + json_logger( + 'error', current_user.username, + 'The following error occurred in verify_qrcode: {0}'.format(str(e))) + raise GenericError('The administrator could not be updated') + return {}, 200 + else: + raise GenericError("An invalid code was supplied") + else: + raise ValidationError("The code was not supplied") diff --git a/postmaster/forms.py b/postmaster/forms.py index 3865858..b61b7e0 100644 --- a/postmaster/forms.py +++ b/postmaster/forms.py @@ -5,8 +5,8 @@ """ from flask_wtf import Form -from wtforms import StringField, PasswordField, SelectField -from wtforms.validators import DataRequired +from wtforms import StringField, PasswordField, SelectField, IntegerField +from wtforms.validators import DataRequired, Optional from postmaster import models from postmaster.utils import validate_wtforms_password @@ -16,6 +16,7 @@ class LoginForm(Form): """ username = StringField(label='Username', validators=[DataRequired()]) password = PasswordField(label='Password', validators=[DataRequired(), validate_wtforms_password]) + two_factor = IntegerField(label='Two Factor', validators=(Optional(),)) auth_source = SelectField('PostMaster User', validators=[DataRequired()]) @classmethod diff --git a/postmaster/models.py b/postmaster/models.py index 7bf4079..ac50ce3 100644 --- a/postmaster/models.py +++ b/postmaster/models.py @@ -8,8 +8,10 @@ from postmaster.errors import ValidationError from re import search, match from os import urandom +import base64 from passlib.hash import sha512_crypt as sha512 # pylint: disable=no-name-in-module from hashlib import sha1 +import onetimepass class VirtualDomains(db.Model): @@ -226,6 +228,8 @@ class Admins(db.Model): username = db.Column(db.String(120), unique=True) password = db.Column(db.String(64)) source = db.Column(db.String(64)) + otp_secret = db.Column(db.String(16)) + otp_active = db.Column(db.Boolean, default=False) active = db.Column(db.Boolean, default=True) failed_attempts = db.Column(db.Integer) last_failed_date = db.Column(db.DateTime) @@ -262,7 +266,8 @@ def to_json(self): """ return {'id': self.id, 'name': self.name, 'username': self.username, 'failed_attempts': self.failed_attempts, 'last_failed_date': self.last_failed_date, 'unlock_date': self.unlock_date, - 'locked': (self.unlock_date is not None and self.unlock_date > datetime.utcnow())} + 'locked': (self.unlock_date is not None and self.unlock_date > datetime.utcnow()), + 'two_factor': self.otp_active} def from_json(self, json): if not json.get('username', None): @@ -366,6 +371,17 @@ def set_password(self, new_password): self.password = bcrypt.generate_password_hash(new_password) + def generate_otp_secret(self, **kwargs): + # generate a random secret + self.otp_secret = base64.b32encode(urandom(10)).decode('utf-8') + + def get_totp_uri(self): + return 'otpauth://totp/PostMaster:{0}?secret={1}&issuer=PostMaster' \ + .format(self.username, self.otp_secret) + + def verify_totp(self, token): + return onetimepass.valid_totp(token, self.otp_secret) + class Configs(db.Model): """ Table to store configuration items diff --git a/postmaster/utils.py b/postmaster/utils.py index a69b8e5..b03c404 100644 --- a/postmaster/utils.py +++ b/postmaster/utils.py @@ -256,24 +256,23 @@ def validate_wtforms_password(form, field): """ username = form.username.data password = form.password.data + two_factor = form.two_factor.data if form.two_factor.data else None try: if form.auth_source.data == 'PostMaster User': admin = models.Admins.query.filter_by(username=username, source='local').first() - if admin: - if admin.is_unlocked(): - if bcrypt.check_password_hash(admin.password, password): + if admin.otp_active: + if not admin.verify_totp(two_factor): + raise WtfStopValidation('The two factor authentication code is incorrect') form.admin = admin return else: increment_failed_login(username) - else: raise WtfStopValidation('The user is currently locked out. Please try logging in again later.') - json_logger( 'auth', username, 'The administrator "{0}" entered an incorrect username or password'.format( @@ -281,19 +280,17 @@ def validate_wtforms_password(form, field): raise WtfStopValidation('The username or password was incorrect') else: ad_object = AD() - if ad_object.login(username, password): - if ad_object.check_group_membership(): friendly_username = ad_object.get_loggedin_user() display_name = ad_object.get_loggedin_user_display_name() - if not models.Admins.query.filter_by(username=friendly_username, source='ldap').first(): add_ldap_user_to_db(friendly_username, display_name) - admin = models.Admins.query.filter_by(username=friendly_username, source='ldap').first() + if admin.otp_active: + if not admin.verify_totp(two_factor): + raise WtfStopValidation('The two factor authentication code is incorrect') form.admin = admin - except ADException as e: raise WtfStopValidation(e.message) diff --git a/requirements.in b/requirements.in index 4a2de51..6f0881c 100644 --- a/requirements.in +++ b/requirements.in @@ -8,9 +8,11 @@ Flask-SQLAlchemy Flask-WTF mock mockldap -pymysql +onetimepass passlib pylint +pymysql +pyqrcode pytest pytest-cov python-ldap diff --git a/requirements.txt b/requirements.txt index 3949c7f..71f1a3e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile # To update, run: # -# pip-compile --output-file .\requirements.txt .\requirements.in +# pip-compile --output-file requirements.txt requirements.in # alembic==0.8.7 # via flask-migrate astroid==1.4.8 # via pylint @@ -31,17 +31,19 @@ MarkupSafe==0.23 # via jinja2, mako mccabe==0.5.2 # via pylint mock==2.0.0 mockldap==0.2.8 +onetimepass==1.0.1 passlib==1.6.5 pbr==1.10.0 # via mock py==1.4.31 # via pytest pycparser==2.14 # via cffi pylint==1.6.4 -PyMySQL==0.7.6 +pymysql==0.7.9 +PyQRCode==1.2.1 pytest-cov==2.3.1 pytest==2.9.2 python-editor==1.0.1 # via alembic python-ldap==2.4.27 -six==1.10.0 # via astroid, bcrypt, mock, pylint +six==1.10.0 # via astroid, bcrypt, mock, onetimepass, pylint SQLAlchemy==1.0.14 # via alembic, flask-sqlalchemy Werkzeug==0.11.10 # via flask, flask-wtf wrapt==1.10.8 # via astroid diff --git a/tests/api/test_api_functions.py b/tests/api/test_api_functions.py index c6a5037..c8324f3 100644 --- a/tests/api/test_api_functions.py +++ b/tests/api/test_api_functions.py @@ -2,6 +2,7 @@ import random import json from mock import patch +import onetimepass from datetime import datetime, timedelta from postmaster import app, db from postmaster.models import Configs, Admins @@ -328,6 +329,94 @@ def test_admins_unlock_not_found(self, loggedin_client): rv = loggedin_client.put("/api/v1/admins/50/unlock", follow_redirects=True) assert rv.status_code == 404 + def test_admins_twofactor_qrcode(self, loggedin_client): + rv = loggedin_client.get("/api/v1/admins/1/twofactor/qrcode") + assert rv.status_code == 200 + + def test_admins_twofactor_status(self, loggedin_client): + rv = loggedin_client.get("/api/v1/admins/1/twofactor") + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 200 + + def test_admins_twofactor_update_fail(self, loggedin_client): + rv = loggedin_client.put("/api/v1/admins/1/twofactor") + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 400 + assert "invalid request" in rv.data + + def test_admins_twofactor_enable_fail(self, loggedin_client): + rv = loggedin_client.put("/api/v1/admins/1/twofactor", data=json.dumps({"enabled": "True"})) + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 400 + assert "Cannot enable 2 factor" in rv.data + + def test_admins_twofactor_verify_invalid(self, loggedin_client): + test_admin = Admins().from_json({ + 'username': 'test_admin', + 'password': 'S0meG00dP@ss', + 'name': 'Test Admin' + }) + test_admin.generate_otp_secret() + test_admin.otp_active = 1 + + db.session.add(test_admin) + db.session.commit() + rv = loggedin_client.post("/api/v1/admins/{0}/twofactor/verify".format(test_admin.id), data=json.dumps({"code": 123456})) + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 400 + assert "invalid code" in rv.data + + def test_admins_twofactor_verify_secret_fail(self, loggedin_client): + test_admin = Admins().from_json({ + 'username': 'test_admin', + 'password': 'S0meG00dP@ss', + 'name': 'Test Admin' + }) + + db.session.add(test_admin) + db.session.commit() + rv = loggedin_client.post("/api/v1/admins/{0}/twofactor/verify".format(test_admin.id), data=json.dumps({"code": 123456})) + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 400 + assert "2 Factor Secret" in rv.data + + def test_admins_twofactor_verify_valid(self, loggedin_client): + test_admin = Admins().from_json({ + 'username': 'test_admin', + 'password': 'S0meG00dP@ss', + 'name': 'Test Admin' + }) + test_admin.generate_otp_secret() + test_admin.otp_active = 1 + + db.session.add(test_admin) + db.session.commit() + + secret = test_admin.otp_secret + token = onetimepass.get_totp(secret) + assert test_admin.verify_totp(token) + rv = loggedin_client.post("/api/v1/admins/{0}/twofactor/verify".format(test_admin.id), data=json.dumps({"code": token})) + try: + json.loads(rv.data) + except: + assert False, "Not json" + assert rv.status_code == 200 + def test_configs_get_one(self, loggedin_client): rv = loggedin_client.get("/api/v1/configs/1", follow_redirects=True) try: