|
- # -*- coding: utf-8 -*-
- import json
- from flask import current_app
- from six import add_metaclass
- from ldap3.utils.dn import safe_dn
- from ldap3.utils.conv import check_json_dict, format_json
- from ldap3.core.exceptions import LDAPAttributeError
-
- from .query import BaseQuery
- from .attribute import LDAPAttribute, LdapField
-
-
- __all__ = ('LDAPEntry',)
-
-
- class LDAPEntryMeta(type):
-
- def __init__(cls, name, bases, attr):
- cls._fields = {}
- for key, value in attr.items():
- if isinstance(value, LdapField):
- cls._fields[key] = value
-
- for base in bases:
- if isinstance(base, LDAPEntryMeta):
- cls._fields.update(base._fields)
- # Deduplicate object classes
- cls.object_classes = list(
- set(cls.object_classes + base.object_classes))
-
- @property
- def query(cls):
- return BaseQuery(cls)
-
-
- @add_metaclass(LDAPEntryMeta)
- class LDAPEntry(object):
-
- base_dn = None
- entry_rdn = ['cn']
- object_classes = ['top']
- sub_tree = True
- operational_attributes = False
- _changetype = 'add'
-
- def __init__(self, dn=None, changetype='add', **kwargs):
- self._attributes = {}
- self._dn = dn
- self._changetype = changetype
- if kwargs:
- for key, value in kwargs.items():
- self._store_attr(key, value, init=True)
- for key, ldap_attr in self._fields.items():
- if not self._isstored(key):
- self._store_attr(key, [])
-
- @property
- def dn(self):
- if self._dn is None:
- self.generate_dn_from_entry()
- return self._dn
-
- def generate_dn_from_entry(self):
- rdn_list = list()
- for key, attr in self._attributes.items():
- if attr.name in self.entry_rdn:
- if len(self._attributes[key]) == 1:
- rdn = '{attr}={value}'.format(
- attr=attr.name,
- value=self._attributes[key].value
- )
- rdn_list.append(rdn)
- dn = '{rdn},{base_dn}'.format(rdn='+'.join(rdn_list),
- base_dn=self.base_dn)
- self._dn = safe_dn(dn)
-
- @classmethod
- def _get_field(cls, attr):
- return cls._fields.get(attr)
-
- @classmethod
- def _get_field_name(cls, attr):
- if cls._get_field(attr):
- return cls._get_field(attr).name
-
- def _store_attr(self, attr, value=[], init=False):
- if not self._get_field(attr):
- raise LDAPAttributeError('attribute not found')
- if value is None:
- value = []
- if not self._attributes.get(attr):
- self._attributes[attr] = LDAPAttribute(self._get_field_name(attr))
- self._attributes[attr].value = value
- if init:
- self._attributes[attr].__dict__['changetype'] = None
-
- def _isstored(self, attr):
- return self._attributes.get(attr)
-
- def _get_attr(self, attr):
- if self._isstored(attr):
- return self._attributes[attr].value
- return None
-
- def __getattribute__(self, item):
- if item != '_fields' and item in self._fields:
- return self._get_attr(item)
- return super(LDAPModel, self).__getattribute__(item)
-
- def __setattr__(self, key, value):
- if key != '_fields' and key in self._fields:
- self._store_attr(key, value)
- else:
- return super(LDAPModel, self).__setattr__(key, value)
-
- def get_attributes_dict(self):
- return dict((attribute_key, attribute_value.values) for (attribute_key,
- attribute_value) in self._attributes.items())
-
- def get_entry_add_dict(self, attr_dict):
- add_dict = dict()
- for attribute_key, attribute_value in attr_dict.items():
- if self._attributes[attribute_key].value != []:
- add_dict.update({self._get_field_name(attribute_key): attribute_value})
- return add_dict
-
- def get_entry_modify_dict(self, attr_dict):
- modify_dict = dict()
- for attribute_key in attr_dict.keys():
- if self._attributes[attribute_key].changetype is not None:
- changes = self._attributes[attribute_key].get_changes_tuple()
- modify_dict.update({self._get_field_name(attribute_key): changes})
- return modify_dict
-
- @property
- def connection(self):
- return current_app.extensions.get('ldap_conn')
-
- def delete(self):
- '''Delete this entry from LDAP server'''
- return self.connection.connection.delete(self.dn)
-
- def save(self):
- '''Save the current instance'''
- attrs = self.get_attributes_dict()
- if self._changetype == 'add':
- changes = self.get_entry_add_dict(attrs)
- return self.connection.connection.add(self.dn,
- self.object_classes,
- changes)
- elif self._changetype == 'modify':
- changes = self.get_entry_modify_dict(attrs)
- return self.connection.connection.modify(self.dn, changes)
-
- return False
-
- def authenticate(self, password):
- '''Authenticate a user with an LDAPModel class
-
- Args:
- password (str): The user password.
-
- '''
- return self.connection.authenticate(self.dn, password)
-
- def to_json(self, indent=2, sort=True, str_values=False):
- json_entry = dict()
- json_entry['dn'] = self.dn
-
- # Get "single values" from attributes as str instead list if
- # `str_values=True` else get all attributes as list. This only
- # works if `FORCE_ATTRIBUTE_VALUE_AS_LIST` is False (default).
- if str_values is True:
- json_entry['attributes'] = {}
- for attr in self._attributes.keys():
- json_entry['attributes'][attr] = self._attributes[attr].value
- else:
- json_entry['attributes'] = self.get_attributes_dict()
-
- if str == bytes:
- check_json_dict(json_entry)
-
- json_output = json.dumps(json_entry,
- ensure_ascii=True,
- sort_keys=sort,
- indent=indent,
- check_circular=True,
- default=format_json,
- separators=(',', ': '))
-
- return json_output
-
-
- LDAPModel = LDAPEntry
|