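import argparse
import binascii
import datetime
import hashlib
import json
import logging
import os
import re
import ssl
import sys
import xml.dom.minidom
from binascii import unhexlify
from functools import wraps
from pathlib import Path  # <-- fixes "import Path"

import ldap3
from ldap3.protocol.formatters.formatters import format_sid
from ldap3.utils.conv import escape_filter_chars

# NOTE(review): no name `LDAP` is defined in the visible part of this module,
# so `from <module> import *` would raise AttributeError. Presumably this
# should list 'LDAPConsole' -- confirm before changing.
__all__ = ['LDAP']


def _cast_scalar(value):
    """Convert one attribute value to a JSON-friendly form.

    bytes become ``str(value)``, datetimes are formatted as
    ``YYYY-MM-DD HH:MM:SS``, timedeltas become their ``seconds`` field;
    anything else is returned unchanged.
    """
    if isinstance(value, bytes):
        # NOTE(review): str() on bytes yields the "b'...'" repr, not decoded
        # text. Kept as-is because callers may rely on it -- confirm.
        return str(value)
    if isinstance(value, datetime.datetime):
        return value.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(value, datetime.timedelta):
        # NOTE(review): `.seconds` ignores the days component; total_seconds()
        # may be what is wanted -- confirm with callers.
        return value.seconds
    return value


def cast_to_dict(cid):
    """Return a plain dict copy of *cid* with values made serialisable.

    Single-element lists are unwrapped to their only element. Longer (or
    empty) lists are converted element by element. Elements of other types
    are kept unchanged (the previous implementation silently dropped them
    from multi-valued lists).
    """
    out = {}
    for key, value in cid.items():
        if isinstance(value, list):
            if len(value) == 1:
                out[key] = _cast_scalar(value[0])
            else:
                out[key] = [_cast_scalar(element) for element in value]
        else:
            out[key] = _cast_scalar(value)
    return out


def dict_get_paths(d):
    """Return the key paths to every non-dict leaf of the nested dict *d*.

    Each path is a list of keys. An empty nested dict contributes no path.
    """
    paths = []
    for key, value in d.items():
        if isinstance(value, dict):
            # Extend instead of rebinding: the previous code assigned here
            # and so discarded every path collected before this key.
            paths.extend([key] + sub for sub in dict_get_paths(value))
        else:
            paths.append([key])
    return paths


def dict_path_access(d, path):
    """Follow *path* (a sequence of keys) into *d*; return None if a key is missing."""
    for key in path:
        if key not in d.keys():
            return None
        d = d[key]
    return d


class LDAPConsole(object):
    """LDAP client configured from the ``<sso>`` element of an XML file.

    The constructor reads host, suffix, bind DN and bind password from the
    configuration file and immediately opens a bound connection.
    """

    def __init__(self, debug=True):
        """Load the XML configuration and bind to the directory.

        Args:
            debug: when true, progress information is printed to stdout.

        Raises:
            ValueError: if the configuration contains no ``<sso>`` element.
        """
        super(LDAPConsole, self).__init__()
        self.ldap_server = None
        self.ldap_session = None
        self.tls_version = None
        self.delegate_from = None
        self.target_dn = None
        self.debug = debug
        self.host = None
        self.binddn = None
        self.bindpwd = None
        self.suffix = None

        # Load the XML configuration file: use the FASTAPI_CONFUR environment
        # variable when set, otherwise default to FastAPI/include/confur.xml
        # relative to the project.
        base_dir = Path(__file__).resolve().parents[1]  # /var/opt/FastAPI
        conf_path = os.getenv("FASTAPI_CONFUR", str(base_dir / "include" / "confur.xml"))
        # NOTE(security): the file holds the bind password in clear text and
        # decides which host receives it, so both the file and the
        # FASTAPI_CONFUR variable are assumed to be writable by trusted
        # accounts only. minidom is not hardened against hostile XML.
        doc = xml.dom.minidom.parse(conf_path)
        ssonode = doc.getElementsByTagName("sso")
        if not ssonode:
            raise ValueError("no <sso> element found in " + str(conf_path))
        for skill in ssonode:
            self.binddn = self.getText(skill.getElementsByTagName("bind-dn")[0].childNodes)
            self.suffix = self.getText(skill.getElementsByTagName("suffix")[0].childNodes)
            self.host = self.getText(skill.getElementsByTagName("host")[0].childNodes)
            self.bindpwd = self.getText(skill.getElementsByTagName("bind-pwd")[0].childNodes)
        if self.debug:
            # The bind password is deliberately not echoed: debug defaults to
            # True, so printing it would put the credential into stdout/logs.
            print(f"{self.binddn} {self.suffix} {self.host} <redacted>")
        self.init_ldap_connection()

    def getText(self, nodelist):
        """Concatenate the data of all text nodes in *nodelist*."""
        return ''.join(node.data for node in nodelist if node.nodeType == node.TEXT_NODE)

    def init_ldap_connection(self):
        """Create the ldap3 server object and open a SIMPLE-bind session.

        Uses LDAPS on port 636 when ``self.tls_version`` is set, otherwise
        plain LDAP on port 389.
        """
        if self.tls_version is not None:
            use_ssl = True
            port = 636
            # NOTE(security): CERT_NONE disables server certificate
            # validation, so the server identity is not verified and the bind
            # password can be intercepted. ssl.CERT_REQUIRED (with the proper
            # CA configured) is the safe setting; left unchanged here because
            # switching may break deployments using self-signed certificates.
            tls = ldap3.Tls(validate=ssl.CERT_NONE, version=self.tls_version)
        else:
            # NOTE(security): __init__ sets tls_version to None right before
            # calling this method, so the constructor always takes this
            # branch: the SIMPLE bind sends the DN and password unencrypted
            # over port 389.
            use_ssl = False
            port = 389
            tls = None
        self.ldap_server = ldap3.Server(self.host, get_info=ldap3.ALL, port=port, use_ssl=use_ssl, tls=tls)
        self.ldap_session = ldap3.Connection(self.ldap_server, self.binddn, self.bindpwd,
                                             authentication='SIMPLE', auto_bind=True)

    def queryallusers(self, subtree, attributes=['*']):
        """Return ``{dn: attributes}`` for every entry with a uid under *subtree*.

        Args:
            subtree: DN fragment prepended to the configured suffix.
            attributes: attributes to request; the default list is only read,
                never modified.
        """
        results = {}
        base_dn = subtree + ',' + self.suffix
        try:
            if self.debug:
                print(base_dn)
            self.ldap_session.search(base_dn, '(&(uid=*))', attributes=attributes)
            for entry in self.ldap_session.response:
                if self.debug:
                    # NOTE(security): with the default ['*'] this prints every
                    # returned attribute of every entry to stdout.
                    print(entry)
            for entry in self.ldap_session.response:
                if entry['type'] != 'searchResEntry':
                    continue
                results[entry['dn']] = entry["attributes"]
        except ldap3.core.exceptions.LDAPInvalidFilterError:
            print("Invalid Filter. (ldap3.core.exceptions.LDAPInvalidFilterError)")
        return results

    def doLdapGetUser(self, subtree, username):
        """Look up *username* under *subtree* and report whether it has an ntPassword.

        Returns a dict with ``'authen'`` set to ``"yeah"`` when a matching
        entry carries a non-empty ``ntPassword`` attribute (``'none'``
        otherwise) and, when an entry was found, ``'data'`` holding its
        attributes.

        NOTE(security): no credential is verified here; ``'authen'`` only
        reflects that the attribute exists. ``'data'`` contains the
        ``ntPassword`` value itself, so callers should not log or forward it.
        """
        base_dn = subtree + ',' + self.suffix
        if self.debug:
            print(base_dn)
        results = {'authen': 'none'}
        # Escape filter metacharacters such as * ( ) \ and NUL so a
        # caller-supplied username cannot change the structure of the search
        # filter (LDAP filter injection).
        user_filter = f'(&(uid={escape_filter_chars(str(username))}))'
        try:
            self.ldap_session.search(base_dn, user_filter, attributes=['ntPassword'])
            for entry in self.ldap_session.response:
                if self.debug:
                    print("response present")
                if entry['type'] != 'searchResEntry':
                    continue
                results['data'] = entry["attributes"]
                if entry['attributes'].get('ntPassword'):
                    results['authen'] = "yeah"
        except ldap3.core.exceptions.LDAPInvalidFilterError:
            print("Invalid Filter. (ldap3.core.exceptions.LDAPInvalidFilterError)")
        return results


def bytessize(data):
    """Return the length of *data* as a human-readable string (B up to PB, base 1024)."""
    size = len(data)
    units = ['B', 'kB', 'MB', 'GB', 'TB', 'PB']
    # Pick the first unit whose next power of 1024 exceeds the size; sizes
    # beyond the table fall through to the last unit.
    for k in range(len(units)):
        if size < (1024 ** (k + 1)):
            break
    return "%4.2f %s" % (round(size / (1024 ** k), 2), units[k])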