Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 
 

83 wiersze
2.5 KiB

  1. import json
  2. from typing import List, Dict, Any, Optional
  3. from fastapi.encoders import jsonable_encoder
  4. from schemas.reslevis import TrackItem
  5. from .config import TRACK_JSON_PATH
  6. from .gateway import _LockedFile, _atomic_write, _norm_str, _index_by_id
  7. class TrackJsonRepository:
  8. def __init__(self, json_path: str = TRACK_JSON_PATH):
  9. self.path = json_path
  10. def _read_all(self) -> List[Dict[str, Any]]:
  11. with _LockedFile(self.path, "r") as fp:
  12. try:
  13. fp.seek(0)
  14. data = fp.read().strip()
  15. return json.loads(data) if data else []
  16. except json.JSONDecodeError:
  17. return []
  18. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  19. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  20. _atomic_write(self.path, payload)
  21. def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
  22. rows = self._read_all()
  23. if search_string:
  24. s = search_string.lower()
  25. rows = [
  26. r
  27. for r in rows
  28. if any(
  29. isinstance(r.get(k), str) and s in (r.get(k) or "").lower()
  30. for k in (
  31. "id",
  32. "status",
  33. "type",
  34. "gatewayName",
  35. "trackerName",
  36. "subjectName",
  37. "zoneName",
  38. "buildingName",
  39. )
  40. )
  41. ]
  42. return rows
  43. def add(self, item: TrackItem) -> None:
  44. rows = self._read_all()
  45. obj = jsonable_encoder(item)
  46. obj_id = _norm_str(obj.get("id"))
  47. if any(_norm_str(r.get("id")) == obj_id for r in rows):
  48. raise ValueError(f"Track con id '{obj_id}' già presente")
  49. rows.append(obj)
  50. self._write_all(rows)
  51. def update(self, item: TrackItem) -> None:
  52. rows = self._read_all()
  53. obj = jsonable_encoder(item)
  54. obj_id = _norm_str(obj.get("id"))
  55. idx = _index_by_id(rows, obj_id)
  56. if idx is None:
  57. raise ValueError(f"Track con id '{obj_id}' non trovato")
  58. rows[idx] = obj
  59. self._write_all(rows)
  60. def remove(self, track_id: str) -> None:
  61. rows = self._read_all()
  62. idx = _index_by_id(rows, track_id)
  63. if idx is None:
  64. raise ValueError(f"Track con id '{track_id}' non trovato")
  65. del rows[idx]
  66. self._write_all(rows)