You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

221 rivejä
7.2 KiB

  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. import time
  6. from typing import Dict, Optional
  7. import config_env
  8. from logica_reslevis.gateway import GatewayJsonRepository
  9. LOG_DIR = "/data/var/log/FastAPI"
  10. LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log")
  11. LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s"
  12. def _configure_logger() -> logging.Logger:
  13. logger = logging.getLogger("mqtt_gateway_monitor")
  14. logger.setLevel(logging.INFO)
  15. has_handler = any(
  16. isinstance(h, logging.FileHandler)
  17. and getattr(h, "baseFilename", None) == LOG_PATH
  18. for h in logger.handlers
  19. )
  20. if has_handler:
  21. return logger
  22. try:
  23. os.makedirs(LOG_DIR, exist_ok=True)
  24. open(LOG_PATH, "a").close()
  25. handler = logging.FileHandler(LOG_PATH)
  26. handler.setFormatter(logging.Formatter(LOG_FORMAT))
  27. logger.addHandler(handler)
  28. logger.propagate = False
  29. except Exception:
  30. # fallback to root logger if file handler cannot be created
  31. logger = logging.getLogger(__name__)
  32. return logger
  33. log = _configure_logger()
  34. def _norm_mac(value: str) -> str:
  35. if value is None:
  36. return ""
  37. return "".join(ch for ch in str(value).strip().lower() if ch.isalnum())
  38. def _parse_line(line: str) -> Optional[str]:
  39. line = line.strip()
  40. if not line or " " not in line:
  41. return None
  42. topic, payload = line.split(" ", 1)
  43. if not topic.startswith("publish_out/"):
  44. return None
  45. try:
  46. data = json.loads(payload)
  47. except Exception:
  48. return None
  49. if not isinstance(data, list) or not data:
  50. return None
  51. gateway_entry = None
  52. for item in data:
  53. if isinstance(item, dict) and item.get("type") == "Gateway":
  54. gateway_entry = item
  55. break
  56. mac = gateway_entry.get("mac") if gateway_entry else None
  57. if not mac:
  58. parts = topic.split("/", 1)
  59. mac = parts[1] if len(parts) > 1 else None
  60. if not mac:
  61. return None
  62. nums = gateway_entry.get("nums") if gateway_entry else None
  63. return mac
  64. class MqttGatewayMonitor:
  65. def __init__(
  66. self,
  67. host: str = None,
  68. port: int = None,
  69. topic: str = None,
  70. version: str = None,
  71. status_interval: int = None,
  72. stale_after: int = None,
  73. retry_delay: int = 5,
  74. gateway_repo: GatewayJsonRepository = None,
  75. ) -> None:
  76. self._host = host or config_env.MQTT_HOST
  77. self._port = port or config_env.MQTT_PORT
  78. self._topic = topic or config_env.MQTT_TOPIC
  79. self._version = version or config_env.MQTT_VERSION
  80. self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL
  81. self._stale_after = stale_after or config_env.MQTT_STALE_AFTER
  82. self._retry_delay = retry_delay
  83. self._gateway_repo = gateway_repo or GatewayJsonRepository()
  84. self._last_seen: Dict[str, float] = {}
  85. self._lock = asyncio.Lock()
  86. self._stop_event = asyncio.Event()
  87. self._reader_task: Optional[asyncio.Task] = None
  88. self._status_task: Optional[asyncio.Task] = None
  89. self._proc: Optional[asyncio.subprocess.Process] = None
  90. async def start(self) -> None:
  91. if self._reader_task:
  92. return
  93. self._stop_event.clear()
  94. self._reader_task = asyncio.create_task(self._reader_loop())
  95. self._status_task = asyncio.create_task(self._status_loop())
  96. async def stop(self) -> None:
  97. self._stop_event.set()
  98. if self._proc and self._proc.returncode is None:
  99. self._proc.terminate()
  100. tasks = [t for t in (self._reader_task, self._status_task) if t]
  101. for task in tasks:
  102. task.cancel()
  103. if tasks:
  104. await asyncio.gather(*tasks, return_exceptions=True)
  105. self._reader_task = None
  106. self._status_task = None
  107. async def _reader_loop(self) -> None:
  108. while not self._stop_event.is_set():
  109. try:
  110. self._proc = await asyncio.create_subprocess_exec(
  111. "mosquitto_sub",
  112. "-v",
  113. "-h",
  114. str(self._host),
  115. "-p",
  116. str(self._port),
  117. "-t",
  118. str(self._topic),
  119. "-V",
  120. str(self._version),
  121. stdout=asyncio.subprocess.PIPE,
  122. stderr=asyncio.subprocess.STDOUT,
  123. )
  124. except FileNotFoundError:
  125. log.error("mosquitto_sub not found in PATH; retrying in %ss", self._retry_delay)
  126. await asyncio.sleep(self._retry_delay)
  127. continue
  128. except Exception:
  129. log.exception("Failed to start mosquitto_sub; retrying in %ss", self._retry_delay)
  130. await asyncio.sleep(self._retry_delay)
  131. continue
  132. try:
  133. assert self._proc.stdout is not None
  134. while not self._stop_event.is_set():
  135. line = await self._proc.stdout.readline()
  136. if not line:
  137. break
  138. mac = _parse_line(line.decode("utf-8", errors="ignore"))
  139. if not mac:
  140. continue
  141. mac_norm = _norm_mac(mac)
  142. if not mac_norm:
  143. continue
  144. async with self._lock:
  145. self._last_seen[mac_norm] = time.monotonic()
  146. finally:
  147. if self._proc and self._proc.returncode is None:
  148. self._proc.terminate()
  149. await self._proc.wait()
  150. self._proc = None
  151. await asyncio.sleep(self._retry_delay)
  152. async def _status_loop(self) -> None:
  153. while not self._stop_event.is_set():
  154. await self._update_statuses()
  155. await asyncio.sleep(self._status_interval)
  156. async def _update_statuses(self) -> None:
  157. now = time.monotonic()
  158. gateways = self._gateway_repo.list()
  159. async with self._lock:
  160. last_seen = dict(self._last_seen)
  161. status_by_mac: Dict[str, str] = {}
  162. for gw in gateways:
  163. mac_norm = _norm_mac(gw.get("mac"))
  164. if not mac_norm:
  165. continue
  166. seen_at = last_seen.get(mac_norm)
  167. if seen_at is None or (now - seen_at) > self._stale_after:
  168. status = "disabled"
  169. else:
  170. status = "active"
  171. status_by_mac[mac_norm] = status
  172. changes = self._gateway_repo.update_statuses(status_by_mac)
  173. for change in changes:
  174. mac_label = change.get("mac_raw") or change.get("mac")
  175. name_label = change.get("name") or ""
  176. if change.get("first_set"):
  177. log.info(
  178. "Gateway status initialized: mac=%s name=%s status=%s",
  179. mac_label,
  180. name_label,
  181. change.get("new_status"),
  182. )
  183. else:
  184. log.info(
  185. "Gateway status changed: mac=%s name=%s %s -> %s",
  186. mac_label,
  187. name_label,
  188. change.get("old_status"),
  189. change.get("new_status"),
  190. )