Você não pode selecionar mais de 25 tópicos. Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

85 linhas
2.7 KiB

  1. import json
  2. from typing import List, Dict, Any, Optional
  3. from fastapi.encoders import jsonable_encoder
  4. from schemas.reslevis import TrackerItem
  5. from .config import TRACKER_JSON_PATH
  6. from .gateway import _LockedFile, _atomic_write, _norm_str, _index_by_id
  7. class TrackerJsonRepository:
  8. def __init__(self, json_path: str = TRACKER_JSON_PATH):
  9. self.path = json_path
  10. def _read_all(self) -> List[Dict[str, Any]]:
  11. with _LockedFile(self.path, "r") as fp:
  12. try:
  13. fp.seek(0)
  14. data = fp.read().strip()
  15. return json.loads(data) if data else []
  16. except json.JSONDecodeError:
  17. return []
  18. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  19. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  20. _atomic_write(self.path, payload)
  21. def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
  22. rows = self._read_all()
  23. if search_string:
  24. s = search_string.lower()
  25. rows = [
  26. r
  27. for r in rows
  28. if any(
  29. isinstance(r.get(k), str) and s in (r.get(k) or "").lower()
  30. for k in ("name", "mac", "model", "status", "position", "notes")
  31. )
  32. ]
  33. return rows
  34. def add(self, item: TrackerItem) -> None:
  35. rows = self._read_all()
  36. obj = jsonable_encoder(item)
  37. obj_id = _norm_str(obj.get("id"))
  38. mac = _norm_str(obj.get("mac"))
  39. if any(_norm_str(r.get("id")) == obj_id for r in rows):
  40. raise ValueError(f"Tracker con id '{obj_id}' già presente")
  41. if mac and any(_norm_str(r.get("mac")) == mac for r in rows):
  42. raise ValueError(f"Tracker con mac '{mac}' già presente")
  43. rows.append(obj)
  44. self._write_all(rows)
  45. def update(self, item: TrackerItem) -> None:
  46. rows = self._read_all()
  47. obj = jsonable_encoder(item)
  48. obj_id = _norm_str(obj.get("id"))
  49. mac = _norm_str(obj.get("mac"))
  50. idx = _index_by_id(rows, obj_id)
  51. if idx is None:
  52. raise ValueError(f"Tracker con id '{obj_id}' non trovato")
  53. if mac and any(
  54. _norm_str(r.get("mac")) == mac and _norm_str(r.get("id")) != obj_id
  55. for r in rows
  56. ):
  57. raise ValueError(f"Tracker con mac '{mac}' già presente")
  58. rows[idx] = obj
  59. self._write_all(rows)
  60. def remove(self, tracker_id: str) -> None:
  61. rows = self._read_all()
  62. idx = _index_by_id(rows, tracker_id)
  63. if idx is None:
  64. raise ValueError(f"Tracker con id '{tracker_id}' non trovato")
  65. del rows[idx]
  66. self._write_all(rows)