您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

206 行
7.0 KiB

  1. import json
  2. import os
  3. import tempfile
  4. import fcntl
  5. from typing import List, Dict, Any, Optional
  6. from fastapi.encoders import jsonable_encoder
  7. #Import logiche e schemi
  8. from schemas.reslevis import GatewayItem
  9. from .config import GATEWAY_JSON_PATH
# ============================================================
# Utilities for file locking and atomic writes
# ============================================================
  13. class _LockedFile:
  14. """Gestisce l'accesso concorrente al file JSON con lock a livello OS."""
  15. def __init__(self, path: str, mode: str):
  16. self.path = path
  17. self.mode = mode
  18. self.fp = None
  19. def __enter__(self):
  20. os.makedirs(os.path.dirname(self.path), exist_ok=True)
  21. if "r" in self.mode and not os.path.exists(self.path):
  22. open(self.path, "w").write("[]")
  23. self.fp = open(self.path, self.mode)
  24. lock_type = (
  25. fcntl.LOCK_SH
  26. if ("r" in self.mode and "w" not in self.mode and "+" not in self.mode)
  27. else fcntl.LOCK_EX
  28. )
  29. fcntl.flock(self.fp.fileno(), lock_type)
  30. return self.fp
  31. def __exit__(self, exc_type, exc, tb):
  32. try:
  33. fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN)
  34. finally:
  35. self.fp.close()
  36. def _atomic_write(path: str, data: str) -> None:
  37. """Scrive su file in modo atomico (evita corruzione in caso di crash)."""
  38. dirpath = os.path.dirname(path)
  39. os.makedirs(dirpath, exist_ok=True)
  40. with tempfile.NamedTemporaryFile("w", dir=dirpath, delete=False) as tmp:
  41. tmp.write(data)
  42. tmp.flush()
  43. os.fsync(tmp.fileno())
  44. temp_name = tmp.name
  45. os.replace(temp_name, path)
  46. try:
  47. os.chmod(path, 0o664)
  48. except Exception as e:
  49. print(f"Warning: impossibile impostare i permessi su {path}: {e}")
  50. def _norm_str(v: Any) -> str:
  51. """Normalizza un valore per confronti case-insensitive e safe su None."""
  52. return str(v).strip().lower() if v is not None else ""
  53. def _norm_mac(v: Any) -> str:
  54. """Normalizza MAC rimuovendo separatori e forzando lowercase."""
  55. if v is None:
  56. return ""
  57. return "".join(ch for ch in str(v).strip().lower() if ch.isalnum())
  58. def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]:
  59. gid = _norm_str(gateway_id)
  60. for i, r in enumerate(rows):
  61. if _norm_str(r.get("id")) == gid:
  62. return i
  63. return None
  64. class GatewayJsonRepository:
  65. """Gestisce lettura e scrittura dei Gateway nel file JSON."""
  66. def __init__(self, json_path: str = GATEWAY_JSON_PATH):
  67. self.path = json_path
  68. def _read_all(self) -> List[Dict[str, Any]]:
  69. """Legge tutti i record dal file JSON."""
  70. with _LockedFile(self.path, "r") as fp:
  71. try:
  72. fp.seek(0)
  73. data = fp.read().strip()
  74. return json.loads(data) if data else []
  75. except json.JSONDecodeError:
  76. return []
  77. def _write_all(self, rows: List[Dict[str, Any]]) -> None:
  78. """Sovrascrive completamente il file JSON con la lista fornita."""
  79. payload = json.dumps(rows, ensure_ascii=False, indent=2)
  80. _atomic_write(self.path, payload)
  81. def list(self, search_string: Optional[str] = None) -> List[Dict[str, Any]]:
  82. """Ritorna tutti i Gateway, eventualmente filtrati per testo."""
  83. rows = self._read_all()
  84. if search_string:
  85. s = search_string.lower()
  86. rows = [
  87. r for r in rows
  88. if any(
  89. (isinstance(r.get(k), str) and s in (r.get(k) or "").lower())
  90. for k in ("name", "mac", "model", "status", "ip", "notes")
  91. )
  92. ]
  93. return rows
  94. def add(self, item: GatewayItem) -> None:
  95. """Aggiunge un nuovo gateway con controllo duplicati su ID e MAC."""
  96. rows = self._read_all()
  97. obj = jsonable_encoder(item) # UUID -> str
  98. obj_id = _norm_str(obj.get("id"))
  99. mac = _norm_str(obj.get("mac"))
  100. if any(_norm_str(r.get("id")) == obj_id for r in rows):
  101. raise ValueError(f"Gateway con id '{obj_id}' già presente")
  102. if mac and any(_norm_str(r.get("mac")) == mac for r in rows):
  103. raise ValueError(f"Gateway con mac '{mac}' già presente")
  104. rows.append(obj)
  105. self._write_all(rows)
  106. def update(self, item: GatewayItem) -> None:
  107. """Sostituisce il gateway esistente con stesso ID. Controlla MAC duplicato su altri record."""
  108. rows = self._read_all()
  109. obj = jsonable_encoder(item)
  110. obj_id = _norm_str(obj.get("id"))
  111. mac = _norm_str(obj.get("mac"))
  112. idx = _index_by_id(rows, obj_id)
  113. if idx is None:
  114. raise ValueError(f"Gateway con id '{obj_id}' non trovato")
  115. if mac and any(_norm_str(r.get("mac")) == mac and _norm_str(r.get("id")) != obj_id for r in rows):
  116. raise ValueError(f"Gateway con mac '{mac}' già presente")
  117. rows[idx] = obj
  118. self._write_all(rows)
  119. def remove(self, gateway_id: str) -> None:
  120. """Rimuove un gateway per ID, altrimenti solleva ValueError."""
  121. rows = self._read_all()
  122. idx = _index_by_id(rows, gateway_id)
  123. if idx is None:
  124. raise ValueError(f"Gateway con id '{gateway_id}' non trovato")
  125. del rows[idx]
  126. self._write_all(rows)
  127. def update_status_by_mac(self, mac: str, status: str) -> bool:
  128. """Aggiorna lo status dei gateway con il MAC specificato."""
  129. rows = self._read_all()
  130. target = _norm_mac(mac)
  131. if not target:
  132. return False
  133. updated = False
  134. for row in rows:
  135. if _norm_mac(row.get("mac")) == target:
  136. if row.get("status") != status:
  137. row["status"] = status
  138. updated = True
  139. if updated:
  140. self._write_all(rows)
  141. return updated
  142. def update_statuses(self, status_by_mac: Dict[str, str]) -> List[Dict[str, Any]]:
  143. """Aggiorna lo status per più MAC in una singola scrittura."""
  144. if not status_by_mac:
  145. return []
  146. rows = self._read_all()
  147. changes: List[Dict[str, Any]] = []
  148. for row in rows:
  149. mac = _norm_mac(row.get("mac"))
  150. if not mac:
  151. continue
  152. new_status = status_by_mac.get(mac)
  153. if new_status is None:
  154. continue
  155. old_status = row.get("status")
  156. if old_status != new_status:
  157. first_set = old_status in (None, "")
  158. row["status"] = new_status
  159. changes.append(
  160. {
  161. "mac": mac,
  162. "mac_raw": row.get("mac"),
  163. "name": row.get("name"),
  164. "old_status": old_status,
  165. "new_status": new_status,
  166. "first_set": first_set,
  167. }
  168. )
  169. if changes:
  170. self._write_all(rows)
  171. return changes