You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

89 lines
3.0 KiB

  1. import json
  2. from datetime import datetime, timezone
  3. from typing import Any, Dict, List, Optional
  4. from uuid import NAMESPACE_URL, uuid5
  5. from fastapi.encoders import jsonable_encoder
  6. from schemas.reslevis import UserPreferencesUpdateItem
  7. from .config import USER_PREFERENCES_JSON_PATH
  8. from .gateway import _LockedFile, _atomic_write, _norm_str
  9. def _model_to_dict(model: Any) -> Dict[str, Any]:
  10. if hasattr(model, "model_dump"):
  11. return model.model_dump(exclude_unset=True, exclude_none=True)
  12. return model.dict(exclude_unset=True, exclude_none=True)
  13. class UserPreferencesJsonRepository:
  14. def __init__(self, json_path: str = USER_PREFERENCES_JSON_PATH):
  15. self.path = json_path
  16. def _read_all(self) -> List[Dict[str, Any]]:
  17. with _LockedFile(self.path, "r") as fp:
  18. try:
  19. fp.seek(0)
  20. data = fp.read().strip()
  21. return json.loads(data) if data else []
  22. except json.JSONDecodeError:
  23. return []
  24. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  25. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  26. _atomic_write(self.path, payload)
  27. def _index_by_uid(self, rows: List[Dict[str, Any]], uid: str) -> Optional[int]:
  28. target = _norm_str(uid)
  29. for i, row in enumerate(rows):
  30. if _norm_str(row.get("uid")) == target:
  31. return i
  32. return None
  33. def _default_for_uid(self, uid: str) -> Dict[str, Any]:
  34. return {
  35. "id": str(uuid5(NAMESPACE_URL, f"reslevis:user-preferences:{uid}")),
  36. "uid": uid,
  37. "language": "it",
  38. "tables": {},
  39. "updated_at": None,
  40. }
  41. def get(self, uid: str) -> Dict[str, Any]:
  42. rows = self._read_all()
  43. idx = self._index_by_uid(rows, uid)
  44. if idx is None:
  45. return self._default_for_uid(uid)
  46. obj = self._default_for_uid(uid)
  47. obj.update(rows[idx])
  48. obj["uid"] = uid
  49. obj["tables"] = obj.get("tables") or {}
  50. obj["language"] = obj.get("language") or "it"
  51. return obj
  52. def update(self, uid: str, item: UserPreferencesUpdateItem) -> Dict[str, Any]:
  53. rows = self._read_all()
  54. idx = self._index_by_uid(rows, uid)
  55. current = self._default_for_uid(uid) if idx is None else self.get(uid)
  56. payload = jsonable_encoder(_model_to_dict(item))
  57. if "language" in payload and payload["language"] is not None:
  58. current["language"] = payload["language"]
  59. if "tables" in payload and payload["tables"] is not None:
  60. tables = current.get("tables") or {}
  61. for table_name, table_preferences in payload["tables"].items():
  62. tables[table_name] = table_preferences or {"hiddenColumns": {}}
  63. current["tables"] = tables
  64. current["uid"] = uid
  65. current["updated_at"] = datetime.now(timezone.utc).isoformat()
  66. if idx is None:
  67. rows.append(current)
  68. else:
  69. rows[idx] = current
  70. self._write_all(rows)
  71. return current