"""User-related API routes, mounted under the ``/user`` prefix."""

import json
import os
import subprocess
import sys

from fastapi import APIRouter, Depends
from fastapi import Security
from fastapi.requests import Request
from sqlalchemy.exc import IntegrityError

# from db import get_session
from exceptions import InvalidPermissions, InvalidUserName, UsernameAlreadyTaken
from models.user import UserCreate, User
from core.actions import get_user_by_name
from core.security import manager, is_admin
# from security import common_parameters

router = APIRouter(
    prefix="/user"
)

# @router.post('/register', response_model=UserResponse, status_code=201)
# def register(user: UserCreate, db=Depends(get_session)) -> UserResponse:
#     """
#     Registers a new user
#     """
#     try:
#         user = create_user(user.username, user.password, db)
#         return UserResponse.from_orm(user)
#     except IntegrityError:
#         raise UsernameAlreadyTaken


# user=Security(manager, scopes=["required", "is_admin"]),
# NOTE: this fixed path must stay registered before the '/{username}' routes,
# otherwise a GET on '/profili' would be captured by the path parameter.
@router.get('/profili')
def get_profili(active_user=Depends(manager)):
    """
    Return the profile names found in the default-users directory.

    The result is the dictionary built by ``path_to_dict`` for
    ``/conf/etc/useradmin/usersdefault``.
    """
    path = "/conf/etc/useradmin/usersdefault"
    return path_to_dict(path)
    # return UserResponse.from_orm(username)


@router.get('/{username}')
def read_user(username: str,
              user=Security(manager, scopes=["required", "is_admin"]),
              active_user=Depends(manager)):
    """
    Shows information about the user.

    Only the user themselves or an admin may read the information.

    Raises:
        InvalidUserName: if the active user or the requested user cannot be
            found by ``get_user_by_name``.
        InvalidPermissions: if a non-admin asks for another user's data.
    """
    print(user)
    print("Active " + active_user.username + "required " + username)

    # Look the caller up first and validate the result *before* touching its
    # attributes; dereferencing a missing user would raise AttributeError
    # instead of the intended InvalidUserName.
    current = get_user_by_name(active_user.username)
    if current is None:
        raise InvalidUserName
    print("Get info user " + current.username)

    # Only allow admins and oneself to access this information
    if username != active_user.username and not active_user.is_admin:
        raise InvalidPermissions

    ruser = get_user_by_name(username)
    if ruser is None:
        # The requested user does not exist.
        raise InvalidUserName
    return {"dettaglio": f"is admin {ruser.is_admin}"}


# @router.get("/private")
# def private_route(user=Depends(manager)):
#     print (user)
#     ruser = get_user_by_name(user.username)
#     return {"detail": f"Welcome {ruser.is_admin}"}


def path_to_dict(path):
    """
    List the entries of directory *path* with their last extension removed.

    Returns:
        ``{'name': 'profiles', 'list': [...]}``; the list is empty when
        *path* is not a directory.
    """
    names = []
    result = {'name': 'profiles'}
    if os.path.isdir(path):
        for entry in os.listdir(path):
            print(entry)
            # Drop only the part after the last dot ("a.b.conf" -> "a.b").
            names.append(entry.rsplit(".", 1)[0])
    result['list'] = names
    return result


@router.delete("/{username}")
def delete_user(username: str,
                user=Security(manager, scopes=["required", "is_admin"]),
                active_user=Depends(manager)):
    """
    Delete *username* by running the external ``delusr.pl`` Perl script.

    The script's standard output is split on ``;``: the first field is
    returned as ``return_code`` and the second as ``return_str``
    (presumably the script prints ``<code>;<message>`` - confirm against
    the script). When the output has no second field, ``return_str`` is an
    empty string instead of the request failing with an IndexError.
    """
    # Arguments are passed as a list (no shell), so the username is never
    # interpreted by a shell.
    completed = subprocess.run(
        ['perl', '/var/opt/FastAPI/delusr.pl', username],
        stdout=subprocess.PIPE,
    )
    fields = completed.stdout.decode('UTF-8').split(';')
    return {
        "return_code": fields[0],
        "return_str": fields[1] if len(fields) > 1 else "",
    }
    # return {"ok": True}