Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 

149 righe
5.1 KiB

  1. import json
  2. import os
  3. import tempfile
  4. import fcntl
  5. from typing import List, Dict, Any, Optional
  6. from fastapi.encoders import jsonable_encoder
  7. #Import logiche e schemi
  8. from schemas.reslevis import GatewayItem
  9. from .config import GATEWAY_JSON_PATH
  10. # ============================================================
  11. # Utility per lock e scrittura atomica
  12. # ============================================================
  13. class _LockedFile:
  14. """Gestisce l'accesso concorrente al file JSON con lock a livello OS."""
  15. def __init__(self, path: str, mode: str):
  16. self.path = path
  17. self.mode = mode
  18. self.fp = None
  19. def __enter__(self):
  20. os.makedirs(os.path.dirname(self.path), exist_ok=True)
  21. if "r" in self.mode and not os.path.exists(self.path):
  22. open(self.path, "w").write("[]")
  23. self.fp = open(self.path, self.mode)
  24. lock_type = (
  25. fcntl.LOCK_SH
  26. if ("r" in self.mode and "w" not in self.mode and "+" not in self.mode)
  27. else fcntl.LOCK_EX
  28. )
  29. fcntl.flock(self.fp.fileno(), lock_type)
  30. return self.fp
  31. def __exit__(self, exc_type, exc, tb):
  32. try:
  33. fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN)
  34. finally:
  35. self.fp.close()
  36. def _atomic_write(path: str, data: str) -> None:
  37. """Scrive su file in modo atomico (evita corruzione in caso di crash)."""
  38. dirpath = os.path.dirname(path)
  39. os.makedirs(dirpath, exist_ok=True)
  40. with tempfile.NamedTemporaryFile("w", dir=dirpath, delete=False) as tmp:
  41. tmp.write(data)
  42. tmp.flush()
  43. os.fsync(tmp.fileno())
  44. temp_name = tmp.name
  45. os.replace(temp_name, path)
  46. try:
  47. os.chmod(path, 0o664)
  48. except Exception as e:
  49. print(f"Warning: impossibile impostare i permessi su {path}: {e}")
  50. def _norm_str(v: Any) -> str:
  51. """Normalizza un valore per confronti case-insensitive e safe su None."""
  52. return str(v).strip().lower() if v is not None else ""
  53. def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]:
  54. gid = _norm_str(gateway_id)
  55. for i, r in enumerate(rows):
  56. if _norm_str(r.get("id")) == gid:
  57. return i
  58. return None
  59. class GatewayJsonRepository:
  60. """Gestisce lettura e scrittura dei Gateway nel file JSON."""
  61. def __init__(self, json_path: str = GATEWAY_JSON_PATH):
  62. self.path = json_path
  63. def _read_all(self) -> List[Dict[str, Any]]:
  64. """Legge tutti i record dal file JSON."""
  65. with _LockedFile(self.path, "r") as fp:
  66. try:
  67. fp.seek(0)
  68. data = fp.read().strip()
  69. return json.loads(data) if data else []
  70. except json.JSONDecodeError:
  71. return []
  72. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  73. """Sovrascrive completamente il file JSON con la lista fornita."""
  74. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  75. _atomic_write(self.path, payload)
  76. def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
  77. """Ritorna tutti i Gateway, eventualmente filtrati per testo."""
  78. rows = self._read_all()
  79. if search_string:
  80. s = search_string.lower()
  81. rows = [
  82. r for r in rows
  83. if any(
  84. (isinstance(r.get(k), str) and s in (r.get(k) or "").lower())
  85. for k in ("name", "mac", "model", "status", "ip", "notes")
  86. )
  87. ]
  88. return rows
  89. def add(self, item: GatewayItem) -> None:
  90. """Aggiunge un nuovo gateway con controllo duplicati su ID e MAC."""
  91. rows = self._read_all()
  92. obj = jsonable_encoder(item) # UUID -> str
  93. obj_id = _norm_str(obj.get("id"))
  94. mac = _norm_str(obj.get("mac"))
  95. if any(_norm_str(r.get("id")) == obj_id for r in rows):
  96. raise ValueError(f"Gateway con id '{obj_id}' già presente")
  97. if mac and any(_norm_str(r.get("mac")) == mac for r in rows):
  98. raise ValueError(f"Gateway con mac '{mac}' già presente")
  99. rows.append(obj)
  100. self._write_all(rows)
  101. def update(self, item: GatewayItem) -> None:
  102. """Sostituisce il gateway esistente con stesso ID. Controlla MAC duplicato su altri record."""
  103. rows = self._read_all()
  104. obj = jsonable_encoder(item)
  105. obj_id = _norm_str(obj.get("id"))
  106. mac = _norm_str(obj.get("mac"))
  107. idx = _index_by_id(rows, obj_id)
  108. if idx is None:
  109. raise ValueError(f"Gateway con id '{obj_id}' non trovato")
  110. if mac and any(_norm_str(r.get("mac")) == mac and _norm_str(r.get("id")) != obj_id for r in rows):
  111. raise ValueError(f"Gateway con mac '{mac}' già presente")
  112. rows[idx] = obj
  113. self._write_all(rows)
  114. def remove(self, gateway_id: str) -> None:
  115. """Rimuove un gateway per ID, altrimenti solleva ValueError."""
  116. rows = self._read_all()
  117. idx = _index_by_id(rows, gateway_id)
  118. if idx is None:
  119. raise ValueError(f"Gateway con id '{gateway_id}' non trovato")
  120. del rows[idx]
  121. self._write_all(rows)