|
- import re
- from functools import wraps
-
- from binascii import unhexlify
- from ldap3.protocol.formatters.formatters import format_sid
- import argparse
- import datetime
- import json
- import ldap3
- import logging
- import os
- import ssl
- import sys
- import hashlib
- import binascii
- import xml.dom.minidom
- from pathlib import Path # <-- corregge "import Path"
-
# Public API exported by `from <module> import *`.
# NOTE(review): no name `LDAP` is defined in the visible part of this module
# (the class defined here is `LDAPConsole`). If `LDAP` is not defined
# elsewhere in the file, a star-import raises AttributeError -- confirm and
# update this list accordingly.
__all__ = ['LDAP']
-
-
def _cast_ldap_value(value):
    """Convert one attribute value into a plain, printable Python value.

    * ``bytes``     -> ``str(value)`` (i.e. the ``"b'...'"`` representation,
      kept as-is because existing callers may rely on it)
    * ``datetime``  -> ``'YYYY-mm-dd HH:MM:SS'`` string
    * ``timedelta`` -> its ``.seconds`` component (days are not included)
    * anything else is returned unchanged
    """
    if isinstance(value, bytes):
        return str(value)
    if isinstance(value, datetime.datetime):
        return value.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(value, datetime.timedelta):
        return value.seconds
    return value


def cast_to_dict(cid):
    """Return a plain ``dict`` copy of *cid* with simplified values.

    Args:
        cid: A mapping exposing ``.items()`` (for example the ``attributes``
            mapping of a search result entry).

    Returns:
        dict: One entry per key of *cid*. Scalar values are converted with
        the rules of ``_cast_ldap_value``. A list holding exactly one element
        is collapsed to that (converted) element. Any other list is converted
        element by element.
    """
    out = {}
    for key, value in cid.items():
        if isinstance(value, list):
            if len(value) == 1:
                # Single-valued attribute: unwrap the list.
                out[key] = _cast_ldap_value(value[0])
            else:
                # Every element is kept. The previous implementation had no
                # fallback branch here and silently dropped elements that
                # were not bytes/datetime/timedelta (e.g. plain strings).
                out[key] = [_cast_ldap_value(element) for element in value]
        else:
            out[key] = _cast_ldap_value(value)
    return out
-
-
def dict_get_paths(d):
    """Return the key path of every leaf value in a nested dict.

    Args:
        d: A (possibly nested) dictionary.

    Returns:
        list[list]: One list of keys per non-dict value, in iteration order.
        For ``{'a': 1, 'b': {'c': 2}}`` the result is ``[['a'], ['b', 'c']]``.
        An empty nested dict contributes no path.
    """
    paths = []
    for key, value in d.items():
        if isinstance(value, dict):
            # Extend, do not reassign: reassigning discarded every path that
            # had been collected before the nested dict was reached.
            paths.extend([key] + sub_path for sub_path in dict_get_paths(value))
        else:
            paths.append([key])
    return paths
-
-
def dict_path_access(d, path):
    """Follow *path* (a sequence of keys) through nested mappings.

    Args:
        d: The outermost mapping.
        path: Iterable of keys, outermost first.

    Returns:
        The value found at the end of *path*, or ``None`` when a key is
        missing or when the path tries to descend into a value that is not a
        mapping. An empty *path* returns *d* itself.
    """
    current = d
    for key in path:
        # Duck-typed check (anything exposing ``keys()``) so that mapping
        # classes that do not inherit from ``dict`` keep working. A non-mapping
        # intermediate value used to raise AttributeError here.
        if not hasattr(current, 'keys') or key not in current.keys():
            return None
        current = current[key]
    return current
-
-
class LDAPConsole(object):
    """LDAP session configured from the ``<sso>`` section of an XML file.

    On construction the instance reads bind DN, bind password, host and
    search suffix from the configuration file and immediately opens a
    bound ldap3 connection (see ``init_ldap_connection``).
    """

    def __init__(self, debug=True):
        """Load the configuration and connect to the LDAP server.

        Args:
            debug: When true, diagnostic information is printed to stdout.

        The XML file path is taken from the ``FASTAPI_CONFUR`` environment
        variable when it is set; otherwise it defaults to
        ``include/confur.xml`` under the parent of this file's directory.
        """
        super().__init__()
        self.ldap_server = None
        self.ldap_session = None
        self.tls_version = None
        self.delegate_from = None
        self.target_dn = None
        self.debug = debug
        self.host = None
        self.binddn = None
        self.bindpwd = None
        self.suffix = None

        conf_path = os.getenv("FASTAPI_CONFUR")
        if conf_path is None:
            # Default location, relative to the project root.
            base_dir = Path(__file__).resolve().parents[1]
            conf_path = str(base_dir / "include" / "confur.xml")
        doc = xml.dom.minidom.parse(conf_path)

        # If several <sso> nodes exist, the last one wins.
        for sso in doc.getElementsByTagName("sso"):
            self.binddn = self._child_text(sso, "bind-dn")
            self.suffix = self._child_text(sso, "suffix")
            self.host = self._child_text(sso, "host")
            self.bindpwd = self._child_text(sso, "bind-pwd")

        if self.debug:
            # The bind password is deliberately NOT printed: debug output
            # must never leak credentials. The f-string also tolerates
            # fields that are still None (no <sso> node found).
            print(f"{self.binddn} {self.suffix} {self.host}")

        self.init_ldap_connection()

    def _child_text(self, element, tag):
        """Return the text of the first *tag* child element of *element*."""
        return self.getText(element.getElementsByTagName(tag)[0].childNodes)

    def getText(self, nodelist):
        """Concatenate the data of all text nodes found in *nodelist*."""
        return ''.join(
            node.data for node in nodelist if node.nodeType == node.TEXT_NODE
        )

    def init_ldap_connection(self):
        """Create ``self.ldap_server`` and a bound ``self.ldap_session``.

        LDAPS on port 636 is used when ``self.tls_version`` is set, plain
        LDAP on port 389 otherwise. The bind is a SIMPLE bind performed
        immediately (``auto_bind=True``).
        """
        if self.tls_version is not None:
            use_ssl = True
            port = 636
            # NOTE(review): CERT_NONE disables server certificate validation.
            tls = ldap3.Tls(validate=ssl.CERT_NONE, version=self.tls_version)
        else:
            use_ssl = False
            port = 389
            tls = None
        self.ldap_server = ldap3.Server(
            self.host, get_info=ldap3.ALL, port=port, use_ssl=use_ssl, tls=tls
        )
        self.ldap_session = ldap3.Connection(
            self.ldap_server, self.binddn, self.bindpwd,
            authentication='SIMPLE', auto_bind=True
        )

    def queryallusers(self, subtree, attributes=None):
        """Search ``<subtree>,<suffix>`` for every entry that has a ``uid``.

        Args:
            subtree: DN fragment prepended to the configured suffix.
            attributes: Attributes to request; defaults to ``['*']``.

        Returns:
            dict: Maps each entry DN to its ``attributes`` mapping. Empty
            when nothing matched or the filter was rejected.
        """
        if attributes is None:
            # Avoids sharing one mutable default list between calls.
            attributes = ['*']
        results = {}
        search_base = subtree + ',' + self.suffix
        if self.debug:
            print(search_base)
        try:
            self.ldap_session.search(search_base, '(&(uid=*))', attributes=attributes)
        except ldap3.core.exceptions.LDAPInvalidFilterError:
            print("Invalid Filter. (ldap3.core.exceptions.LDAPInvalidFilterError)")
            return results

        response = self.ldap_session.response
        if self.debug:
            for entry in response:
                print(entry)
        for entry in response:
            # Skip referrals and other non-entry responses.
            if entry['type'] != 'searchResEntry':
                continue
            results[entry['dn']] = entry["attributes"]
        return results

    def doLdapGetUser(self, subtree, username):
        """Look up *username* by ``uid`` and report whether it has ``ntPassword``.

        Args:
            subtree: DN fragment prepended to the configured suffix.
            username: Value matched against the ``uid`` attribute. It is
                escaped before being placed in the search filter.

        Returns:
            dict: ``{'authen': 'none'}`` when no matching entry carries a
            non-empty ``ntPassword``. Otherwise ``'authen'`` is ``'yeah'``.
            ``'data'`` holds the attributes of the last matching entry, if any.
        """
        # Local import: escaping keeps filter metacharacters in user input
        # (``*``, ``(``, ``)``, ``\``, NUL) from altering the search filter.
        from ldap3.utils.conv import escape_filter_chars

        search_base = subtree + ',' + self.suffix
        if self.debug:
            print(search_base)
        results = {'authen': 'none'}
        search_filter = f'(&(uid={escape_filter_chars(username)}))'
        try:
            self.ldap_session.search(search_base, search_filter, attributes=['ntPassword'])
        except ldap3.core.exceptions.LDAPInvalidFilterError:
            print("Invalid Filter. (ldap3.core.exceptions.LDAPInvalidFilterError)")
            return results

        for entry in self.ldap_session.response:
            if self.debug:
                print("response present")
            if entry['type'] != 'searchResEntry':
                continue
            results['data'] = entry["attributes"]
            if entry['attributes'].get('ntPassword'):
                results['authen'] = "yeah"
        return results
-
-
def bytessize(data):
    """Format ``len(data)`` as a human-readable size with a binary unit.

    The size is scaled by the largest power of 1024 that does not exceed it
    (capped at petabytes) and rendered with two decimals, e.g. ``'1.50 kB'``.
    """
    size = len(data)
    units = ('B', 'kB', 'MB', 'GB', 'TB', 'PB')
    last = len(units) - 1

    # Move up one unit at a time until the size fits, or units run out.
    index = 0
    while index < last and size >= 1024 ** (index + 1):
        index += 1

    scaled = round(size / (1024 ** index), 2)
    return "%4.2f %s" % (scaled, units[index])
|