選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 

89 行
3.0 KiB

  1. import json
  2. from datetime import datetime, timezone
  3. from typing import Any, Dict, List, Optional
  4. from uuid import NAMESPACE_URL, uuid5
  5. from fastapi.encoders import jsonable_encoder
  6. from schemas.reslevis import UserPreferencesUpdateItem
  7. from .config import USER_PREFERENCES_JSON_PATH
  8. from .gateway import _LockedFile, _atomic_write, _norm_str
  9. def _model_to_dict(model: Any) -> Dict[str, Any]:
  10. if hasattr(model, "model_dump"):
  11. return model.model_dump(exclude_unset=True, exclude_none=True)
  12. return model.dict(exclude_unset=True, exclude_none=True)
  13. class UserPreferencesJsonRepository:
  14. def __init__(self, json_path: str = USER_PREFERENCES_JSON_PATH):
  15. self.path = json_path
  16. def _read_all(self) -> List[Dict[str, Any]]:
  17. with _LockedFile(self.path, "r") as fp:
  18. try:
  19. fp.seek(0)
  20. data = fp.read().strip()
  21. return json.loads(data) if data else []
  22. except json.JSONDecodeError:
  23. return []
  24. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  25. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  26. _atomic_write(self.path, payload)
  27. def _index_by_uid(self, rows: List[Dict[str, Any]], uid: str) -> Optional[int]:
  28. target = _norm_str(uid)
  29. for i, row in enumerate(rows):
  30. if _norm_str(row.get("uid")) == target:
  31. return i
  32. return None
  33. def _default_for_uid(self, uid: str) -> Dict[str, Any]:
  34. return {
  35. "id": str(uuid5(NAMESPACE_URL, f"reslevis:user-preferences:{uid}")),
  36. "uid": uid,
  37. "language": "it",
  38. "tables": {},
  39. "updated_at": None,
  40. }
  41. def get(self, uid: str) -> Dict[str, Any]:
  42. rows = self._read_all()
  43. idx = self._index_by_uid(rows, uid)
  44. if idx is None:
  45. return self._default_for_uid(uid)
  46. obj = self._default_for_uid(uid)
  47. obj.update(rows[idx])
  48. obj["uid"] = uid
  49. obj["tables"] = obj.get("tables") or {}
  50. obj["language"] = obj.get("language") or "it"
  51. return obj
  52. def update(self, uid: str, item: UserPreferencesUpdateItem) -> Dict[str, Any]:
  53. rows = self._read_all()
  54. idx = self._index_by_uid(rows, uid)
  55. current = self._default_for_uid(uid) if idx is None else self.get(uid)
  56. payload = jsonable_encoder(_model_to_dict(item))
  57. if "language" in payload and payload["language"] is not None:
  58. current["language"] = payload["language"]
  59. if "tables" in payload and payload["tables"] is not None:
  60. tables = current.get("tables") or {}
  61. for table_name, table_preferences in payload["tables"].items():
  62. tables[table_name] = table_preferences or {"hiddenColumns": {}}
  63. current["tables"] = tables
  64. current["uid"] = uid
  65. current["updated_at"] = datetime.now(timezone.utc).isoformat()
  66. if idx is None:
  67. rows.append(current)
  68. else:
  69. rows[idx] = current
  70. self._write_all(rows)
  71. return current