# flectra/addons/auth_ldap/models/res_company_ldap.py
# 2018-01-16 02:34:37 -08:00
#
# 204 lines
# 7.4 KiB
# Python

# -*- coding: utf-8 -*-
# Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details.
import ldap
import logging
from ldap.filter import filter_format
from flectra import api, fields, models, tools
from flectra.tools.pycompat import to_native
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
class CompanyLDAP(models.Model):
    """LDAP server configuration attached to a company.

    Each record describes one LDAP server (address, port, bind credentials,
    search base and filter) together with the policy for creating local
    users. Records are ordered by ``sequence``.
    """
    _name = 'res.company.ldap'
    _order = 'sequence'
    _rec_name = 'ldap_server'

    sequence = fields.Integer(default=10)
    company = fields.Many2one('res.company', string='Company', required=True, ondelete='cascade')
    ldap_server = fields.Char(string='LDAP Server address', required=True, default='127.0.0.1')
    ldap_server_port = fields.Integer(string='LDAP Server port', required=True, default=389)
    ldap_binddn = fields.Char('LDAP binddn',
        help="The user account on the LDAP server that is used to query the directory. "
             "Leave empty to connect anonymously.")
    ldap_password = fields.Char(string='LDAP password',
        help="The password of the user account on the LDAP server that is used to query the directory.")
    ldap_filter = fields.Char(string='LDAP filter', required=True)
    ldap_base = fields.Char(string='LDAP base', required=True)
    user = fields.Many2one('res.users', string='Template User',
        help="User to copy when creating new users")
    create_user = fields.Boolean(default=True,
        help="Automatically create local user accounts for new users authenticating via LDAP")
    ldap_tls = fields.Boolean(string='Use TLS',
        help="Request secure TLS/SSL encryption when connecting to the LDAP server. "
             "This option requires a server with STARTTLS enabled, "
             "otherwise all authentication attempts will fail.")

    @api.multi
    def get_ldap_dicts(self):
        """
        Retrieve res_company_ldap resources from the database in dictionary
        format.

        The search is performed with ``sudo()`` and covers every record that
        has an ``ldap_server`` set, regardless of the records in ``self``.

        :return: ldap configurations
        :rtype: list of dictionaries
        """
        ldaps = self.sudo().search([('ldap_server', '!=', False)], order='sequence')
        return ldaps.read([
            'id',
            'company',
            'ldap_server',
            'ldap_server_port',
            'ldap_binddn',
            'ldap_password',
            'ldap_filter',
            'ldap_base',
            'user',
            'create_user',
            'ldap_tls',
        ])

    def connect(self, conf):
        """
        Connect to an LDAP server specified by an ldap
        configuration dictionary.

        :param dict conf: LDAP configuration
        :return: an LDAP object
        """
        uri = 'ldap://%s:%d' % (conf['ldap_server'], conf['ldap_server_port'])
        connection = ldap.initialize(uri)
        if conf['ldap_tls']:
            connection.start_tls_s()
        return connection

    def authenticate(self, conf, login, password):
        """
        Authenticate a user against the specified LDAP server.

        In order to prevent an unintended 'unauthenticated authentication',
        which is an anonymous bind with a valid dn and a blank password,
        check for empty passwords explicitly (:rfc:`4513#section-6.3.1`)

        :param dict conf: LDAP configuration
        :param login: username
        :param password: Password for the LDAP user
        :return: LDAP entry of authenticated user or False
        :rtype: dictionary of attributes
        """
        if not password:
            return False

        try:
            ldap_filter = filter_format(conf['ldap_filter'], (login,))
        except TypeError:
            _logger.warning('Could not format LDAP filter. Your filter should contain one \'%s\'.')
            return False

        entry = False
        try:
            results = self.query(conf, tools.ustr(ldap_filter))

            # Get rid of (None, attrs) for searchResultReference replies
            results = [result for result in results if result[0]]
            # Only an unambiguous match is accepted.
            if len(results) == 1:
                dn = results[0][0]
                conn = self.connect(conf)
                try:
                    conn.simple_bind_s(dn, to_native(password))
                finally:
                    # Release the connection even when the bind is rejected,
                    # which is the normal path for a wrong password.
                    conn.unbind()
                entry = results[0]
        except ldap.INVALID_CREDENTIALS:
            return False
        except ldap.LDAPError as e:
            _logger.error('An LDAP exception occurred: %s', e)
        return entry

    def query(self, conf, filter, retrieve_attributes=None):
        """
        Query an LDAP server with the filter argument and scope subtree.

        Allow for all authentication methods of the simple authentication
        method:

        - authenticated bind (non-empty binddn + valid password)
        - anonymous bind (empty binddn + empty password)
        - unauthenticated authentication (non-empty binddn + empty password)

        .. seealso::
           :rfc:`4513#section-5.1` - LDAP: Simple Authentication Method.

        LDAP errors are logged and not raised; in that case the results
        gathered so far (possibly an empty list) are returned.

        :param dict conf: LDAP configuration
        :param filter: valid LDAP filter
        :param list retrieve_attributes: LDAP attributes to be retrieved. \
        If not specified, return all attributes.
        :return: ldap entries
        :rtype: list of tuples (dn, attrs)
        """
        results = []
        try:
            conn = self.connect(conf)
            try:
                ldap_password = conf['ldap_password'] or ''
                ldap_binddn = conf['ldap_binddn'] or ''
                conn.simple_bind_s(to_native(ldap_binddn), to_native(ldap_password))
                results = conn.search_st(to_native(conf['ldap_base']), ldap.SCOPE_SUBTREE,
                                         filter, retrieve_attributes, timeout=60)
            finally:
                # Always release the connection, including when the bind or
                # the search fails.
                conn.unbind()
        except ldap.INVALID_CREDENTIALS:
            _logger.error('LDAP bind failed.')
        except ldap.LDAPError as e:
            _logger.error('An LDAP exception occurred: %s', e)
        return results

    def map_ldap_attributes(self, conf, login, ldap_entry):
        """
        Compose values for a new resource of model res_users,
        based upon the retrieved ldap entry and the LDAP settings.

        :param dict conf: LDAP configuration
        :param login: the new user's login
        :param tuple ldap_entry: single LDAP result (dn, attrs)
        :return: parameters for a new resource of model res_users
        :rtype: dict
        """
        return {
            # python-ldap hands attribute values back as bytes on Python 3;
            # decode so the user's name is stored as text.
            'name': tools.ustr(ldap_entry[1]['cn'][0]),
            'login': login,
            'company_id': conf['company'][0],
        }

    @api.model
    def get_or_create_user(self, conf, login, ldap_entry):
        """
        Retrieve an active resource of model res_users with the specified
        login. Create the user if it is not initially found.

        If a user with that login exists but is inactive, no user is created
        and ``False`` is returned.

        :param dict conf: LDAP configuration
        :param login: the user's login
        :param tuple ldap_entry: single LDAP result (dn, attrs)
        :return: res_users id
        :rtype: int
        """
        user_id = False
        login = tools.ustr(login.lower().strip())
        self.env.cr.execute("SELECT id, active FROM res_users WHERE lower(login)=%s", (login,))
        res = self.env.cr.fetchone()
        if res:
            # res is (id, active): only an active user is returned.
            if res[1]:
                user_id = res[0]
        elif conf['create_user']:
            _logger.debug("Creating new Flectra user \"%s\" from LDAP", login)
            values = self.map_ldap_attributes(conf, login, ldap_entry)
            SudoUser = self.env['res.users'].sudo()
            if conf['user']:
                # Copy the template user, forcing the copy to be active.
                values['active'] = True
                user_id = SudoUser.browse(conf['user'][0]).copy(default=values).id
            else:
                user_id = SudoUser.create(values).id
        return user_id