You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

77 regels
2.4 KiB

  1. import json
  2. from typing import List, Dict, Any, Optional
  3. from fastapi.encoders import jsonable_encoder
  4. from schemas.reslevis import TrackerZoneItem
  5. from .config import TRACKER_ZONE_JSON_PATH
  6. from .gateway import _LockedFile, _atomic_write, _norm_str, _index_by_id
  7. class TrackerZoneJsonRepository:
  8. def __init__(self, json_path: str = TRACKER_ZONE_JSON_PATH):
  9. self.path = json_path
  10. def _read_all(self) -> List[Dict[str, Any]]:
  11. with _LockedFile(self.path, "r") as fp:
  12. try:
  13. fp.seek(0)
  14. data = fp.read().strip()
  15. return json.loads(data) if data else []
  16. except json.JSONDecodeError:
  17. return []
  18. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  19. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  20. _atomic_write(self.path, payload)
  21. def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
  22. rows = self._read_all()
  23. if search_string:
  24. s = search_string.lower()
  25. rows = [
  26. r
  27. for r in rows
  28. if any(
  29. isinstance(r.get(k), str) and s in (r.get(k) or "").lower()
  30. for k in ("days", "time", "id", "tracker")
  31. )
  32. or any(
  33. s in str(z).lower()
  34. for z in (r.get("zoneList") or [])
  35. )
  36. ]
  37. return rows
  38. def add(self, item: TrackerZoneItem) -> None:
  39. rows = self._read_all()
  40. obj = jsonable_encoder(item)
  41. obj_id = _norm_str(obj.get("id"))
  42. if any(_norm_str(r.get("id")) == obj_id for r in rows):
  43. raise ValueError(f"TrackerZone con id '{obj_id}' già presente")
  44. rows.append(obj)
  45. self._write_all(rows)
  46. def update(self, item: TrackerZoneItem) -> None:
  47. rows = self._read_all()
  48. obj = jsonable_encoder(item)
  49. obj_id = _norm_str(obj.get("id"))
  50. idx = _index_by_id(rows, obj_id)
  51. if idx is None:
  52. raise ValueError(f"TrackerZone con id '{obj_id}' non trovato")
  53. rows[idx] = obj
  54. self._write_all(rows)
  55. def remove(self, tracker_zone_id: str) -> None:
  56. rows = self._read_all()
  57. idx = _index_by_id(rows, tracker_zone_id)
  58. if idx is None:
  59. raise ValueError(f"TrackerZone con id '{tracker_zone_id}' non trovato")
  60. del rows[idx]
  61. self._write_all(rows)