Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 

130 rader
3.9 KiB

  1. import json
  2. import time
  3. import logging
  4. from datetime import datetime, timezone
  5. from typing import Any, Dict, Optional
  6. from fastapi import Request
  7. from starlette.middleware.base import BaseHTTPMiddleware
  8. from jose import jwt
  9. from security import _get_key, KEYCLOAK_ISSUER, ALGORITHMS
  10. audit_logger = logging.getLogger("audit")
  11. audit_logger.setLevel(logging.INFO)
  12. def _ts_minute_utc() -> str:
  13. # formato: 2025-12-12 15:47 (UTC)
  14. return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
  15. def _compact_json(obj: Any, max_len: int = 2000) -> str:
  16. try:
  17. s = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
  18. except Exception:
  19. s = str(obj)
  20. if len(s) > max_len:
  21. return s[:max_len] + "...(truncated)"
  22. return s
  23. async def _user_from_bearer(request: Request) -> Optional[str]:
  24. auth = request.headers.get("authorization", "")
  25. if not auth.lower().startswith("bearer "):
  26. return None
  27. token = auth.split(" ", 1)[1].strip()
  28. try:
  29. key = await _get_key(token)
  30. claims = jwt.decode(
  31. token,
  32. key,
  33. algorithms=ALGORITHMS,
  34. issuer=KEYCLOAK_ISSUER,
  35. options={"verify_aud": False, "verify_iss": True},
  36. )
  37. return (
  38. claims.get("preferred_username")
  39. or claims.get("username")
  40. or claims.get("email")
  41. )
  42. except Exception:
  43. return None
  44. async def _read_request_body_if_any(request: Request) -> Optional[str]:
  45. # logga solo metodi che tipicamente hanno body
  46. if request.method not in ("POST", "PUT", "PATCH", "DELETE"):
  47. return None
  48. ctype = (request.headers.get("content-type") or "").lower()
  49. if not ctype:
  50. return None
  51. # Evita file upload / form-data grossi
  52. if "multipart/form-data" in ctype:
  53. return "<multipart/form-data omitted>"
  54. try:
  55. raw = await request.body()
  56. if not raw:
  57. return None
  58. # Prova JSON
  59. if "application/json" in ctype:
  60. try:
  61. return _compact_json(json.loads(raw.decode("utf-8", errors="replace")))
  62. except Exception:
  63. # se non parseabile, logga raw truncato
  64. txt = raw.decode("utf-8", errors="replace")
  65. return (txt[:2000] + "...(truncated)") if len(txt) > 2000 else txt
  66. # Testo semplice
  67. if "text/" in ctype or "application/x-www-form-urlencoded" in ctype:
  68. txt = raw.decode("utf-8", errors="replace")
  69. return (txt[:2000] + "...(truncated)") if len(txt) > 2000 else txt
  70. # Altro (binary)
  71. return f"<body omitted content-type={ctype} bytes={len(raw)}>"
  72. except Exception:
  73. return "<body read error>"
  74. class AuditMiddleware(BaseHTTPMiddleware):
  75. async def dispatch(self, request: Request, call_next):
  76. start = time.time()
  77. method = request.method
  78. path = request.url.path
  79. # 1) via header del proxy (preferito)
  80. user = request.headers.get("x-authenticated-user")
  81. # 2) fallback: decodifica bearer senza usare verify_token (evita spam del logger security)
  82. if not user:
  83. user = await _user_from_bearer(request) or "-"
  84. body_str = await _read_request_body_if_any(request)
  85. status_code = 500
  86. try:
  87. response = await call_next(request)
  88. status_code = response.status_code
  89. return response
  90. finally:
  91. duration_ms = int((time.time() - start) * 1000)
  92. ts = _ts_minute_utc()
  93. if body_str:
  94. audit_logger.info(
  95. "%s user=%s %s %s status=%s ms=%s body=%s",
  96. ts, user, method, path, status_code, duration_ms, body_str
  97. )
  98. else:
  99. audit_logger.info(
  100. "%s user=%s %s %s status=%s ms=%s",
  101. ts, user, method, path, status_code, duration_ms
  102. )