Procházet zdrojové kódy

API per ottenere i gruppi natural se presenti dei utenti

master
Lorenzo Pollutri před 2 dny
rodič
revize
ca393054f3
2 změnil soubory, kde provedl 70 přidání a 2 odebrání
  1. +6
    -0
      models/mnuser.py
  2. +64
    -2
      routes/majornet.py

+ 6
- 0
models/mnuser.py Zobrazit soubor

@@ -21,3 +21,9 @@ class post_mnuser(BaseModel):
return_code: int return_code: int
return_str: str return_str: str


class mnuser_groups(BaseModel):

uid: str
usergroups: List[str]

+ 64
- 2
routes/majornet.py Zobrazit soubor

@@ -1,4 +1,4 @@
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from fastapi.param_functions import Depends from fastapi.param_functions import Depends
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login.exceptions import InvalidCredentialsException
@@ -6,6 +6,7 @@ from fastapi_login.exceptions import InvalidCredentialsException




import xml.dom.minidom import xml.dom.minidom
import xml.etree.ElementTree as ET
from binascii import unhexlify from binascii import unhexlify
from ldap3.protocol.formatters.formatters import format_sid from ldap3.protocol.formatters.formatters import format_sid
import argparse import argparse
@@ -19,19 +20,26 @@ import hashlib
import binascii import binascii
from datetime import datetime, timedelta from datetime import datetime, timedelta
import subprocess import subprocess
from typing import List


#from db import get_session #from db import get_session
from core.actions import get_user_by_name from core.actions import get_user_by_name
from core.security import manager from core.security import manager
from core.ldap import LDAPConsole from core.ldap import LDAPConsole
from security import get_current_user


from models.httpresponse import httpResponse400, httpResponse200, httpResponse500 from models.httpresponse import httpResponse400, httpResponse200, httpResponse500
from models.mnuser import mnuser, post_mnuser
from models.mnuser import mnuser, post_mnuser, mnuser_groups


router = APIRouter( router = APIRouter(
prefix="/majornet" prefix="/majornet"
) )


MNUSERS_XML_PATH = os.getenv(
"MNUSERS_XML_PATH",
os.getenv("MNUSERS_XML", "/conf/etc/useradmin/mnusers.xml"),
)

def cast_to_dict(cid): def cast_to_dict(cid):
out = {} out = {}
for key, value in cid.items(): for key, value in cid.items():
@@ -89,6 +97,43 @@ def dict_path_access(d, path):
return d return d




def _element_text(element, tag_name):
child = element.find(tag_name)
if child is None or child.text is None:
return ""
return child.text.strip()


def _split_usergroups(value):
if not value:
return []
return [group.strip() for group in value.split(",") if group.strip()]


def _iter_mnuser_nodes(root):
user_nodes = root.findall(".//useradmin")
if user_nodes:
return user_nodes
return [node for node in root.iter() if node.find("uid") is not None]


def read_mnusers_groups(file_path=MNUSERS_XML_PATH):
tree = ET.parse(file_path)
root = tree.getroot()
users = []

for node in _iter_mnuser_nodes(root):
uid = _element_text(node, "uid")
if not uid:
continue
users.append({
"uid": uid,
"usergroups": _split_usergroups(_element_text(node, "usergroups")),
})

return users








@@ -110,6 +155,23 @@ lc = LDAPConsole(debug=True) if (USE_LDAP and LDAPConsole) else None






@router.get(
"/getUsersGroups",
response_model=List[mnuser_groups],
tags=["MajorNet"],
dependencies=[Depends(get_current_user)],
)
async def get_users_groups():
try:
return read_mnusers_groups()
except FileNotFoundError:
raise HTTPException(status_code=404, detail="mnusers.xml not found")
except ET.ParseError:
raise HTTPException(status_code=500, detail="Invalid mnusers.xml")
except OSError:
raise HTTPException(status_code=500, detail="Unable to read mnusers.xml")


@router.get("/users/",tags=["MajorNet"], responses={200: {"model": httpResponse200}, 400: {"model": httpResponse400}, 500: {"model": httpResponse500}}) @router.get("/users/",tags=["MajorNet"], responses={200: {"model": httpResponse200}, 400: {"model": httpResponse400}, 500: {"model": httpResponse500}})
async def get_majornet_users(current_user= Depends(manager)): async def get_majornet_users(current_user= Depends(manager)):


Načítá se…
Zrušit
Uložit