|
- import json
- from typing import List, Dict, Any, Optional
-
- from fastapi.encoders import jsonable_encoder
-
- from schemas.reslevis import TrackItem
- from .config import TRACK_JSON_PATH
- from .gateway import _LockedFile, _atomic_write, _norm_str, _index_by_id
-
-
class TrackJsonRepository:
    """Repository of track records persisted as a JSON array in one file.

    Every operation reloads the whole file and every mutation rewrites it in
    full through ``_atomic_write``.

    NOTE(review): the handle opened by ``_LockedFile`` in ``_read_all`` is
    released before ``_write_all`` runs, so the read-modify-write sequence in
    ``add``/``update``/``remove`` is not covered by that handle. Whether
    ``_atomic_write`` takes its own lock is not visible here -- confirm before
    relying on this class with concurrent writers.
    """

    # Record fields inspected by the case-insensitive search in ``list``.
    _SEARCH_FIELDS = (
        "id",
        "status",
        "type",
        "gatewayName",
        "trackerName",
        "subjectName",
        "zoneName",
        "buildingName",
    )

    def __init__(self, json_path: str = TRACK_JSON_PATH):
        """Bind the repository to *json_path* (``TRACK_JSON_PATH`` by default)."""
        self.path = json_path

    def _read_all(self) -> List[Dict[str, Any]]:
        """Load and return the decoded content of the JSON file.

        Returns:
            The decoded JSON value, or an empty list when the file is empty,
            contains only whitespace, or is not valid JSON.

        NOTE(review): the decoded value is returned as-is; a root that is not
        a list is not rejected here.
        """
        with _LockedFile(self.path, "r") as fp:
            fp.seek(0)
            raw = fp.read().strip()
        if not raw:
            return []
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Best-effort: unreadable content is treated as "no records".
            return []

    def _write_all(self, rows: List[Dict[str, Any]]) -> None:
        """Serialise *rows* as indented, non-ASCII-escaped JSON and store it."""
        payload = json.dumps(rows, ensure_ascii=False, indent=2)
        _atomic_write(self.path, payload)

    def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return the stored records, optionally filtered by a substring.

        Args:
            search_string: When non-empty, only records whose string value in
                at least one of ``_SEARCH_FIELDS`` contains it
                (case-insensitively) are returned. ``None`` or ``""`` disables
                filtering.

        Returns:
            The matching records, in file order.
        """
        rows = self._read_all()
        if not search_string:
            return rows

        needle = search_string.lower()

        def matches(row: Dict[str, Any]) -> bool:
            # Only string-valued fields take part in the search.
            for key in self._SEARCH_FIELDS:
                value = row.get(key)
                if isinstance(value, str) and needle in value.lower():
                    return True
            return False

        return [row for row in rows if matches(row)]

    def add(self, item: TrackItem) -> None:
        """Append *item* to the file.

        Raises:
            ValueError: If a stored record already has the same normalised id.
        """
        rows = self._read_all()
        obj = jsonable_encoder(item)
        obj_id = _norm_str(obj.get("id"))

        if any(_norm_str(row.get("id")) == obj_id for row in rows):
            raise ValueError(f"Track con id '{obj_id}' già presente")

        rows.append(obj)
        self._write_all(rows)

    def update(self, item: TrackItem) -> None:
        """Replace the stored record that has the same id as *item*.

        Raises:
            ValueError: If ``_index_by_id`` finds no record for the id.
        """
        rows = self._read_all()
        obj = jsonable_encoder(item)
        obj_id = _norm_str(obj.get("id"))

        idx = _index_by_id(rows, obj_id)
        if idx is None:
            raise ValueError(f"Track con id '{obj_id}' non trovato")

        rows[idx] = obj
        self._write_all(rows)

    def remove(self, track_id: str) -> None:
        """Delete the stored record identified by *track_id*.

        Raises:
            ValueError: If ``_index_by_id`` finds no record for the id.
        """
        rows = self._read_all()
        # Normalise the lookup key exactly as ``add`` and ``update`` do, so an
        # id that can be updated can also be removed. The error message keeps
        # the caller-supplied value.
        idx = _index_by_id(rows, _norm_str(track_id))
        if idx is None:
            raise ValueError(f"Track con id '{track_id}' non trovato")

        del rows[idx]
        self._write_all(rows)
|