import json import os import tempfile import fcntl from typing import List, Dict, Any, Optional from fastapi.encoders import jsonable_encoder #Import logiche e schemi from schemas.reslevis import GatewayItem from .config import GATEWAY_JSON_PATH # ============================================================ # Utility per lock e scrittura atomica # ============================================================ class _LockedFile: """Gestisce l'accesso concorrente al file JSON con lock a livello OS.""" def __init__(self, path: str, mode: str): self.path = path self.mode = mode self.fp = None def __enter__(self): os.makedirs(os.path.dirname(self.path), exist_ok=True) if "r" in self.mode and not os.path.exists(self.path): open(self.path, "w").write("[]") self.fp = open(self.path, self.mode) lock_type = ( fcntl.LOCK_SH if ("r" in self.mode and "w" not in self.mode and "+" not in self.mode) else fcntl.LOCK_EX ) fcntl.flock(self.fp.fileno(), lock_type) return self.fp def __exit__(self, exc_type, exc, tb): try: fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN) finally: self.fp.close() def _atomic_write(path: str, data: str) -> None: """Scrive su file in modo atomico (evita corruzione in caso di crash).""" dirpath = os.path.dirname(path) os.makedirs(dirpath, exist_ok=True) with tempfile.NamedTemporaryFile("w", dir=dirpath, delete=False) as tmp: tmp.write(data) tmp.flush() os.fsync(tmp.fileno()) temp_name = tmp.name os.replace(temp_name, path) try: os.chmod(path, 0o664) except Exception as e: print(f"Warning: impossibile impostare i permessi su {path}: {e}") def _norm_str(v: Any) -> str: """Normalizza un valore per confronti case-insensitive e safe su None.""" return str(v).strip().lower() if v is not None else "" def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]: gid = _norm_str(gateway_id) for i, r in enumerate(rows): if _norm_str(r.get("id")) == gid: return i return None class GatewayJsonRepository: """Gestisce lettura e scrittura dei Gateway nel file JSON.""" def __init__(self, json_path: str = GATEWAY_JSON_PATH): self.path = json_path def _read_all(self) -> List[Dict[str, Any]]: """Legge tutti i record dal file JSON.""" with _LockedFile(self.path, "r") as fp: try: fp.seek(0) data = fp.read().strip() return json.loads(data) if data else [] except json.JSONDecodeError: return [] def _write_all(self, rows: List[Dict[str, Any]]) -> None: """Sovrascrive completamente il file JSON con la lista fornita.""" payload = json.dumps(rows, ensure_ascii=False, indent=2) _atomic_write(self.path, payload) def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]: """Ritorna tutti i Gateway, eventualmente filtrati per testo.""" rows = self._read_all() if search_string: s = search_string.lower() rows = [ r for r in rows if any( (isinstance(r.get(k), str) and s in (r.get(k) or "").lower()) for k in ("name", "mac", "model", "status", "ip", "notes") ) ] return rows def add(self, item: GatewayItem) -> None: """Aggiunge un nuovo gateway con controllo duplicati su ID e MAC.""" rows = self._read_all() obj = jsonable_encoder(item) # UUID -> str obj_id = _norm_str(obj.get("id")) mac = _norm_str(obj.get("mac")) if any(_norm_str(r.get("id")) == obj_id for r in rows): raise ValueError(f"Gateway con id '{obj_id}' già presente") if mac and any(_norm_str(r.get("mac")) == mac for r in rows): raise ValueError(f"Gateway con mac '{mac}' già presente") rows.append(obj) self._write_all(rows) def update(self, item: GatewayItem) -> None: """Sostituisce il gateway esistente con stesso ID. Controlla MAC duplicato su altri record.""" rows = self._read_all() obj = jsonable_encoder(item) obj_id = _norm_str(obj.get("id")) mac = _norm_str(obj.get("mac")) idx = _index_by_id(rows, obj_id) if idx is None: raise ValueError(f"Gateway con id '{obj_id}' non trovato") if mac and any(_norm_str(r.get("mac")) == mac and _norm_str(r.get("id")) != obj_id for r in rows): raise ValueError(f"Gateway con mac '{mac}' già presente") rows[idx] = obj self._write_all(rows) def remove(self, gateway_id: str) -> None: """Rimuove un gateway per ID, altrimenti solleva ValueError.""" rows = self._read_all() idx = _index_by_id(rows, gateway_id) if idx is None: raise ValueError(f"Gateway con id '{gateway_id}' non trovato") del rows[idx] self._write_all(rows)