Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 

221 righe
7.2 KiB

  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. import time
  6. from typing import Dict, Optional
  7. import config_env
  8. from logica_reslevis.gateway import GatewayJsonRepository
  9. LOG_DIR = "/data/var/log/FastAPI"
  10. LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log")
  11. LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s"
  12. def _configure_logger() -> logging.Logger:
  13. logger = logging.getLogger("mqtt_gateway_monitor")
  14. logger.setLevel(logging.INFO)
  15. has_handler = any(
  16. isinstance(h, logging.FileHandler)
  17. and getattr(h, "baseFilename", None) == LOG_PATH
  18. for h in logger.handlers
  19. )
  20. if has_handler:
  21. return logger
  22. try:
  23. os.makedirs(LOG_DIR, exist_ok=True)
  24. open(LOG_PATH, "a").close()
  25. handler = logging.FileHandler(LOG_PATH)
  26. handler.setFormatter(logging.Formatter(LOG_FORMAT))
  27. logger.addHandler(handler)
  28. logger.propagate = False
  29. except Exception:
  30. # fallback to root logger if file handler cannot be created
  31. logger = logging.getLogger(__name__)
  32. return logger
  33. log = _configure_logger()
  34. def _norm_mac(value: str) -> str:
  35. if value is None:
  36. return ""
  37. return "".join(ch for ch in str(value).strip().lower() if ch.isalnum())
  38. def _parse_line(line: str) -> Optional[str]:
  39. line = line.strip()
  40. if not line or " " not in line:
  41. return None
  42. topic, payload = line.split(" ", 1)
  43. if not topic.startswith("publish_out/"):
  44. return None
  45. try:
  46. data = json.loads(payload)
  47. except Exception:
  48. return None
  49. if not isinstance(data, list) or not data:
  50. return None
  51. gateway_entry = None
  52. for item in data:
  53. if isinstance(item, dict) and item.get("type") == "Gateway":
  54. gateway_entry = item
  55. break
  56. mac = gateway_entry.get("mac") if gateway_entry else None
  57. if not mac:
  58. parts = topic.split("/", 1)
  59. mac = parts[1] if len(parts) > 1 else None
  60. if not mac:
  61. return None
  62. nums = gateway_entry.get("nums") if gateway_entry else None
  63. return mac
  64. class MqttGatewayMonitor:
  65. def __init__(
  66. self,
  67. host: str = None,
  68. port: int = None,
  69. topic: str = None,
  70. version: str = None,
  71. status_interval: int = None,
  72. stale_after: int = None,
  73. retry_delay: int = 5,
  74. gateway_repo: GatewayJsonRepository = None,
  75. ) -> None:
  76. self._host = host or config_env.MQTT_HOST
  77. self._port = port or config_env.MQTT_PORT
  78. self._topic = topic or config_env.MQTT_TOPIC
  79. self._version = version or config_env.MQTT_VERSION
  80. self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL
  81. self._stale_after = stale_after or config_env.MQTT_STALE_AFTER
  82. self._retry_delay = retry_delay
  83. self._gateway_repo = gateway_repo or GatewayJsonRepository()
  84. self._last_seen: Dict[str, float] = {}
  85. self._lock = asyncio.Lock()
  86. self._stop_event = asyncio.Event()
  87. self._reader_task: Optional[asyncio.Task] = None
  88. self._status_task: Optional[asyncio.Task] = None
  89. self._proc: Optional[asyncio.subprocess.Process] = None
  90. async def start(self) -> None:
  91. if self._reader_task:
  92. return
  93. self._stop_event.clear()
  94. self._reader_task = asyncio.create_task(self._reader_loop())
  95. self._status_task = asyncio.create_task(self._status_loop())
  96. async def stop(self) -> None:
  97. self._stop_event.set()
  98. if self._proc and self._proc.returncode is None:
  99. self._proc.terminate()
  100. tasks = [t for t in (self._reader_task, self._status_task) if t]
  101. for task in tasks:
  102. task.cancel()
  103. if tasks:
  104. await asyncio.gather(*tasks, return_exceptions=True)
  105. self._reader_task = None
  106. self._status_task = None
  107. async def _reader_loop(self) -> None:
  108. while not self._stop_event.is_set():
  109. try:
  110. self._proc = await asyncio.create_subprocess_exec(
  111. "mosquitto_sub",
  112. "-v",
  113. "-h",
  114. str(self._host),
  115. "-p",
  116. str(self._port),
  117. "-t",
  118. str(self._topic),
  119. "-V",
  120. str(self._version),
  121. stdout=asyncio.subprocess.PIPE,
  122. stderr=asyncio.subprocess.STDOUT,
  123. )
  124. except FileNotFoundError:
  125. log.error("mosquitto_sub not found in PATH; retrying in %ss", self._retry_delay)
  126. await asyncio.sleep(self._retry_delay)
  127. continue
  128. except Exception:
  129. log.exception("Failed to start mosquitto_sub; retrying in %ss", self._retry_delay)
  130. await asyncio.sleep(self._retry_delay)
  131. continue
  132. try:
  133. assert self._proc.stdout is not None
  134. while not self._stop_event.is_set():
  135. line = await self._proc.stdout.readline()
  136. if not line:
  137. break
  138. mac = _parse_line(line.decode("utf-8", errors="ignore"))
  139. if not mac:
  140. continue
  141. mac_norm = _norm_mac(mac)
  142. if not mac_norm:
  143. continue
  144. async with self._lock:
  145. self._last_seen[mac_norm] = time.monotonic()
  146. finally:
  147. if self._proc and self._proc.returncode is None:
  148. self._proc.terminate()
  149. await self._proc.wait()
  150. self._proc = None
  151. await asyncio.sleep(self._retry_delay)
  152. async def _status_loop(self) -> None:
  153. while not self._stop_event.is_set():
  154. await self._update_statuses()
  155. await asyncio.sleep(self._status_interval)
  156. async def _update_statuses(self) -> None:
  157. now = time.monotonic()
  158. gateways = self._gateway_repo.list()
  159. async with self._lock:
  160. last_seen = dict(self._last_seen)
  161. status_by_mac: Dict[str, str] = {}
  162. for gw in gateways:
  163. mac_norm = _norm_mac(gw.get("mac"))
  164. if not mac_norm:
  165. continue
  166. seen_at = last_seen.get(mac_norm)
  167. if seen_at is None or (now - seen_at) > self._stale_after:
  168. status = "disabled"
  169. else:
  170. status = "active"
  171. status_by_mac[mac_norm] = status
  172. changes = self._gateway_repo.update_statuses(status_by_mac)
  173. for change in changes:
  174. mac_label = change.get("mac_raw") or change.get("mac")
  175. name_label = change.get("name") or ""
  176. if change.get("first_set"):
  177. log.info(
  178. "Gateway status initialized: mac=%s name=%s status=%s",
  179. mac_label,
  180. name_label,
  181. change.get("new_status"),
  182. )
  183. else:
  184. log.info(
  185. "Gateway status changed: mac=%s name=%s %s -> %s",
  186. mac_label,
  187. name_label,
  188. change.get("old_status"),
  189. change.get("new_status"),
  190. )