|
- import json
- from datetime import datetime, timezone
- from typing import Any, Dict, List, Optional
- from uuid import NAMESPACE_URL, uuid5
-
- from fastapi.encoders import jsonable_encoder
-
- from schemas.reslevis import UserPreferencesUpdateItem
- from .config import USER_PREFERENCES_JSON_PATH
- from .gateway import _LockedFile, _atomic_write, _norm_str
-
-
def _model_to_dict(model: Any) -> Dict[str, Any]:
    """Dump *model* to a plain dict, leaving out unset and ``None`` fields.

    The object's ``model_dump`` method is used when it has one; otherwise
    its ``dict`` method is called. Both receive ``exclude_unset=True`` and
    ``exclude_none=True``.

    NOTE(review): these look like the pydantic v2 / v1 serialization
    methods respectively -- confirm against the schema package in use.
    """
    dumper_name = "model_dump" if hasattr(model, "model_dump") else "dict"
    return getattr(model, dumper_name)(exclude_unset=True, exclude_none=True)
-
-
class UserPreferencesJsonRepository:
    """Repository for per-user preference records kept in a single JSON file.

    The file is expected to hold a JSON array of objects. A record is
    located by its ``uid`` field, compared after passing both sides through
    ``_norm_str``. A user without a stored record is represented by a
    default record (language ``"it"``, empty ``tables``, ``updated_at`` of
    ``None``).

    NOTE(review): ``update`` is a read-modify-write; the ``_LockedFile``
    context used for reading is exited before ``_write_all`` runs, so two
    concurrent updates may overwrite each other unless ``_atomic_write`` or
    the callers serialize access -- confirm.
    """

    def __init__(self, json_path: str = USER_PREFERENCES_JSON_PATH):
        """Remember the path of the JSON file backing this repository."""
        self.path = json_path

    def _read_all(self) -> List[Dict[str, Any]]:
        """Return every stored row, or ``[]`` when nothing usable is stored.

        An empty file, content that is not valid JSON, and a JSON document
        whose root is not an array all yield an empty list.

        NOTE(review): because unparseable content reads as "no rows", the
        next ``update`` replaces the file with a single record -- confirm
        this best-effort behavior is intended.
        """
        with _LockedFile(self.path, "r") as fp:
            fp.seek(0)
            data = fp.read().strip()

        if not data:
            return []
        try:
            rows = json.loads(data)
        except json.JSONDecodeError:
            return []
        # A non-array root (e.g. ``{}``) cannot be searched row by row;
        # treat it the same way as unparseable content.
        return rows if isinstance(rows, list) else []

    def _write_all(self, rows: List[Dict[str, Any]]) -> None:
        """Serialize *rows* as indented, non-ASCII-escaped JSON and store it."""
        payload = json.dumps(rows, ensure_ascii=False, indent=2)
        _atomic_write(self.path, payload)

    def _index_by_uid(self, rows: List[Dict[str, Any]], uid: str) -> Optional[int]:
        """Return the position of the first row whose ``uid`` matches, else ``None``.

        Both the stored value and *uid* are normalized with ``_norm_str``
        before comparison. Rows that are not dicts are skipped rather than
        raising, and are left in place in *rows*.
        """
        target = _norm_str(uid)
        for i, row in enumerate(rows):
            if isinstance(row, dict) and _norm_str(row.get("uid")) == target:
                return i
        return None

    def _default_for_uid(self, uid: str) -> Dict[str, Any]:
        """Build the default record for *uid*.

        The ``id`` is a UUIDv5 derived from *uid*, so it is the same on
        every call for a given *uid*.
        """
        return {
            "id": str(uuid5(NAMESPACE_URL, f"reslevis:user-preferences:{uid}")),
            "uid": uid,
            "language": "it",
            "tables": {},
            "updated_at": None,
        }

    def _merge_with_defaults(self, uid: str, row: Dict[str, Any]) -> Dict[str, Any]:
        """Overlay a stored *row* on the defaults for *uid* and normalize it."""
        obj = self._default_for_uid(uid)
        obj.update(row)
        # The caller-supplied uid wins over whatever spelling was stored.
        obj["uid"] = uid
        # Stored nulls / empty values fall back to the defaults.
        obj["tables"] = obj.get("tables") or {}
        obj["language"] = obj.get("language") or "it"
        return obj

    def get(self, uid: str) -> Dict[str, Any]:
        """Return the preferences for *uid*.

        Returns the stored record merged over the defaults, or the plain
        default record when nothing is stored for *uid*. Nothing is written.
        """
        rows = self._read_all()
        idx = self._index_by_uid(rows, uid)
        if idx is None:
            return self._default_for_uid(uid)
        return self._merge_with_defaults(uid, rows[idx])

    def update(self, uid: str, item: UserPreferencesUpdateItem) -> Dict[str, Any]:
        """Apply a partial update to the preferences of *uid* and persist it.

        Only the ``language`` and ``tables`` fields of *item* are applied,
        and only when present and not ``None``. Entries in ``tables``
        replace the stored entry for the same table name; tables not
        mentioned are kept. A falsy table entry is stored as
        ``{"hiddenColumns": {}}``. ``updated_at`` is set to the current UTC
        time in ISO 8601 form.

        Returns:
            The record as written to the file.
        """
        rows = self._read_all()
        idx = self._index_by_uid(rows, uid)
        # Merge from the rows already in memory instead of calling ``get``,
        # which would read the file a second time and could see a different
        # snapshot than the one written back below.
        if idx is None:
            current = self._default_for_uid(uid)
        else:
            current = self._merge_with_defaults(uid, rows[idx])

        payload = jsonable_encoder(_model_to_dict(item))

        language = payload.get("language")
        if language is not None:
            current["language"] = language

        incoming_tables = payload.get("tables")
        if incoming_tables is not None:
            tables = current.get("tables") or {}
            for table_name, table_preferences in incoming_tables.items():
                tables[table_name] = table_preferences or {"hiddenColumns": {}}
            current["tables"] = tables

        current["uid"] = uid
        current["updated_at"] = datetime.now(timezone.utc).isoformat()

        if idx is None:
            rows.append(current)
        else:
            rows[idx] = current
        self._write_all(rows)
        return current
|