Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 15 additions & 76 deletions postmaster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,6 @@
from postmaster.errors import ValidationError


def row2dict(row):
""" Return a dictionary from a sqlalchemy row We use this so
we don't have to deal with sa_instance_state stuff
"""
d = {}
for column in row.__table__.columns:
d[column.name] = str(getattr(row, column.name))
return d


def getAllFromTable(table):
""" Returns the entire table that is specified
"""
# Dynamically get the model to query based on the table variable
if getattr(models, table):
table = (getattr(models, table)).query.all()
listItemsDict = [row2dict(row) for row in table]
return listItemsDict

return None


def getDomain(name):
""" Returns a domain from VirtualDomains
"""
queriedDomain = models.VirtualDomains.query.filter_by(name=name).first()
if queriedDomain:
return row2dict(queriedDomain)
return None


def getUser(email):
""" Returns a user from VirtualUsers
"""
queriedUser = models.VirtualUsers.query.filter_by(email=email).first()

if queriedUser:
return row2dict(queriedUser)
return None


def getAlias(source):
""" Returns alias from VirtualAliases
"""
queriedAlias = models.VirtualAliases.query.filter_by(source=source).first()

if queriedAlias:
return row2dict(queriedAlias)
return None


def maildb_auditing_enabled():
""" Returns a bool based on if mail db auditing is enabled
"""
Expand Down Expand Up @@ -401,24 +350,25 @@ def get_distinguished_name(self, sAMAccountName):
else:
raise ADException('You must be logged into LDAP to search')

def get_nested_group_members(self, group_distinguishedName):
""" Gets the members and nested members of a group by supplying a distinguishedName for the group.
A list with the members will be returned
def check_nested_group_membership(self, group_sAMAccountName, user_sAMAccountName):
""" Checks the nested group membership of a user by supplying the sAMAccountName, and verifies if the user is a
part of that supplied group. A list with the groups the user is a member of will be returned
"""
# Check if the ldap_connection is in a logged in state
if self.ldap_connection.whoami_s():
group_members = list()
group_dn = self.get_distinguished_name(group_sAMAccountName)
user_dn = self.get_distinguished_name(user_sAMAccountName)

if group_distinguishedName:
if group_dn and user_dn:
# Get the base distinguished name based on the domain name
base_dn = 'dc=' + (self.domain.replace('.', ',dc='))
search_filter = '(memberOf:1.2.840.113556.1.4.1941:={0})'.format(group_distinguishedName)
search_filter = '(member:1.2.840.113556.1.4.1941:={0})'.format(user_dn)
search_result = self.ldap_connection.search_s(base_dn, ldap.SCOPE_SUBTREE,
search_filter, ['distinguishedName'])
for member in search_result:
if member[0]:
group_members.append(member[0])
return group_members
for group in search_result:
if group[0] == group_dn:
return True
return False
else:
raise ADException('You must be logged into LDAP to search')

Expand Down Expand Up @@ -472,14 +422,6 @@ def check_group_membership(self):
if self.ldap_connection.whoami_s():
# AD returns the username as DOMAIN\username, so this gets the sAMAccountName
username = sub(r'(^.*(?<=\\))', '', self.ldap_connection.whoami_s())
# Get the distinguished name of the logged in user
user_distinguished_name = self.get_distinguished_name(username)

if not user_distinguished_name:
json_logger(
'error', username,
'The LDAP user "{0}" authenticated but could not be found'.format(username))
raise ADException('There was an error searching LDAP. Please try again.')
# Get the distinguished name of the admin group in the database
group_distinguished_name = self.get_distinguished_name(self.ldap_admin_group)

Expand All @@ -489,10 +431,8 @@ def check_group_membership(self):
'The PostMaster Admin group "{0}" could not be found'.format(self.ldap_admin_group))
raise ADException('There was an error searching LDAP. Please try again.')

group_members = self.get_nested_group_members(group_distinguished_name)
for member in group_members:
if user_distinguished_name == member:
return True
if self.check_nested_group_membership(self.ldap_admin_group, username):
return True

# If the user was not a member of the group, check to see if the admin group is the primary group
# of the user which is not included in memberOf (this is typically Domain Users)
Expand All @@ -502,9 +442,8 @@ def check_group_membership(self):

json_logger(
'auth', username,
'The LDAP user "{0}" authenticated but the login failed \
because they weren\'t a PostMaster administrator'.format(
username))
('The LDAP user "{0}" authenticated but the login failed because they weren\'t '
'a PostMaster administrator').format(username))
raise ADException('The user account is not authorized to login to PostMaster')
else:
raise ADException('You must be logged into LDAP to search')
Expand Down
Loading