Parcourir la source

API fails to authenticate

auth_API_bug_fix
Lorenzo Pollutri il y a 3 semaines
Parent
révision
4b94416ef3
3 fichiers modifiés avec 109 ajouts et 8 suppressions
  1. +21
    -0
      app.py
  2. +80
    -8
      config_env.py
  3. +8
    -0
      security.py

+ 21
- 0
app.py Voir le fichier

@@ -329,6 +329,27 @@ async def get_ble_ai_maps():
return {"items": items, "count": len(items)} return {"items": items, "count": len(items)}




@app.get("/ble-ai/maps/{filename}", tags=["BLE-AI"], dependencies=[Depends(get_current_user)])
async def download_ble_ai_map(filename: str):
maps_dir = config_env.BLE_AI_MAPS_DIR
if not os.path.isdir(maps_dir):
raise HTTPException(status_code=404, detail="Maps directory not found")

safe_name = os.path.basename(filename)
if safe_name != filename or not safe_name.lower().endswith(".png"):
raise HTTPException(status_code=400, detail="Invalid map filename")

file_path = os.path.join(maps_dir, safe_name)
if not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail="Map file not found")

return FileResponse(
path=file_path,
filename=safe_name,
media_type="image/png",
)


@app.get("/openapi.json/", tags=["Documentation"]) @app.get("/openapi.json/", tags=["Documentation"])
async def get_open_api_endpoint(): async def get_open_api_endpoint():
#async def get_open_api_endpoint(current_user: User = Depends(get_current_active_user)): #async def get_open_api_endpoint(current_user: User = Depends(get_current_active_user)):


+ 80
- 8
config_env.py Voir le fichier

@@ -1,18 +1,90 @@
#This file reads the .env where the variables should be stored
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv


load_dotenv() load_dotenv()


#Keycloak configuration (look in the .env)

def _clean(value: str) -> str:
if value is None:
return ""
return str(value).strip().strip('"').strip("'")


def _has_http_scheme(value: str) -> bool:
return value.startswith("http://") or value.startswith("https://")


def _ensure_http_scheme(value: str, default_scheme: str = "https") -> str:
value = _clean(value)
if not value:
return ""
if _has_http_scheme(value):
return value
if value.startswith("/"):
return value
return f"{default_scheme}://{value}"


def _absolute_or_join(value: str, base_url: str) -> str:
value = _clean(value)
if not value:
return ""
if _has_http_scheme(value):
return value
if value.startswith("/") and base_url:
return f"{base_url.rstrip('/')}{value}"
return _ensure_http_scheme(value)


# Keycloak configuration (look in the .env)
SECRET = os.getenv("SECRET") SECRET = os.getenv("SECRET")
KEYCLOAK_AUDIENCE = os.getenv("KEYCLOAK_AUDIENCE") KEYCLOAK_AUDIENCE = os.getenv("KEYCLOAK_AUDIENCE")
KEYCLOAK_SERVER = os.getenv("KEYCLOAK_SERVER")
KEYCLOAK_ISSUER = os.getenv("KEYCLOAK_ISSUER")
KEYCLOAK_PROTOCOL_ENDPOINT = os.getenv("KEYCLOAK_PROTOCOL_ENDPOINT")
KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL")
KEYCLOAK_AUTH_URL = os.getenv("KEYCLOAK_AUTH_URL")
KEYCLOAK_TOKEN_URL = os.getenv("KEYCLOAK_TOKEN_URL")

_raw_keycloak_server = _clean(os.getenv("KEYCLOAK_SERVER"))
KEYCLOAK_SERVER = _ensure_http_scheme(_raw_keycloak_server)

_default_realm = _clean(os.getenv("KEYCLOAK_REALM")) or "API.Server.local"

_raw_keycloak_issuer = _clean(os.getenv("KEYCLOAK_ISSUER"))
if _raw_keycloak_issuer and "${" not in _raw_keycloak_issuer:
KEYCLOAK_ISSUER = _absolute_or_join(_raw_keycloak_issuer, KEYCLOAK_SERVER)
elif KEYCLOAK_SERVER:
KEYCLOAK_ISSUER = f"{KEYCLOAK_SERVER.rstrip('/')}/realms/{_default_realm}"
else:
KEYCLOAK_ISSUER = ""

_raw_keycloak_protocol = _clean(os.getenv("KEYCLOAK_PROTOCOL_ENDPOINT"))
if _raw_keycloak_protocol and "${" not in _raw_keycloak_protocol:
KEYCLOAK_PROTOCOL_ENDPOINT = _absolute_or_join(_raw_keycloak_protocol, KEYCLOAK_SERVER)
elif KEYCLOAK_ISSUER:
KEYCLOAK_PROTOCOL_ENDPOINT = f"{KEYCLOAK_ISSUER.rstrip('/')}/protocol/openid-connect"
else:
KEYCLOAK_PROTOCOL_ENDPOINT = ""

_raw_jwks = _clean(os.getenv("KEYCLOAK_JWKS_URL"))
if _raw_jwks and "${" not in _raw_jwks:
KEYCLOAK_JWKS_URL = _absolute_or_join(_raw_jwks, KEYCLOAK_SERVER)
elif KEYCLOAK_PROTOCOL_ENDPOINT:
KEYCLOAK_JWKS_URL = f"{KEYCLOAK_PROTOCOL_ENDPOINT.rstrip('/')}/certs"
else:
KEYCLOAK_JWKS_URL = ""

_raw_auth = _clean(os.getenv("KEYCLOAK_AUTH_URL"))
if _raw_auth and "${" not in _raw_auth:
KEYCLOAK_AUTH_URL = _absolute_or_join(_raw_auth, KEYCLOAK_SERVER)
elif KEYCLOAK_PROTOCOL_ENDPOINT:
KEYCLOAK_AUTH_URL = f"{KEYCLOAK_PROTOCOL_ENDPOINT.rstrip('/')}/auth"
else:
KEYCLOAK_AUTH_URL = ""

_raw_token = _clean(os.getenv("KEYCLOAK_TOKEN_URL"))
if _raw_token and "${" not in _raw_token:
KEYCLOAK_TOKEN_URL = _absolute_or_join(_raw_token, KEYCLOAK_SERVER)
elif KEYCLOAK_PROTOCOL_ENDPOINT:
KEYCLOAK_TOKEN_URL = f"{KEYCLOAK_PROTOCOL_ENDPOINT.rstrip('/')}/token"
else:
KEYCLOAK_TOKEN_URL = ""

CORE_API_URL = os.getenv("CORE_API_URL", "http://localhost:1902") CORE_API_URL = os.getenv("CORE_API_URL", "http://localhost:1902")


MQTT_HOST = os.getenv("MQTT_HOST", "192.168.1.101") MQTT_HOST = os.getenv("MQTT_HOST", "192.168.1.101")


+ 8
- 0
security.py Voir le fichier

@@ -39,6 +39,14 @@ http_bearer = HTTPBearer(auto_error=True)


async def _get_jwks() -> Dict[str, Any]: async def _get_jwks() -> Dict[str, Any]:
global _cached_jwks global _cached_jwks
if not KEYCLOAK_JWKS_URL or not (
KEYCLOAK_JWKS_URL.startswith("http://") or KEYCLOAK_JWKS_URL.startswith("https://")
):
logger.error("_get_jwks: invalid KEYCLOAK_JWKS_URL=%r", KEYCLOAK_JWKS_URL)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Invalid Keycloak JWKS URL configuration",
)
if _cached_jwks is None: if _cached_jwks is None:
logger.info(f"_get_jwks: cache miss, fetching from {KEYCLOAK_JWKS_URL}") logger.info(f"_get_jwks: cache miss, fetching from {KEYCLOAK_JWKS_URL}")
resp = await _http.get(KEYCLOAK_JWKS_URL) resp = await _http.get(KEYCLOAK_JWKS_URL)


Chargement…
Annuler
Enregistrer