選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 

230 行
7.6 KiB

  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. import time
  6. from typing import Dict, Optional, Tuple
  7. import config_env
  8. from logica_reslevis.gateway import GatewayJsonRepository
  9. LOG_DIR = "/data/var/log/FastAPI"
  10. LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log")
  11. LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s"
  12. def _configure_logger() -> logging.Logger:
  13. logger = logging.getLogger("mqtt_gateway_monitor")
  14. logger.setLevel(logging.INFO)
  15. has_handler = any(
  16. isinstance(h, logging.FileHandler)
  17. and getattr(h, "baseFilename", None) == LOG_PATH
  18. for h in logger.handlers
  19. )
  20. if has_handler:
  21. return logger
  22. try:
  23. os.makedirs(LOG_DIR, exist_ok=True)
  24. open(LOG_PATH, "a").close()
  25. handler = logging.FileHandler(LOG_PATH)
  26. handler.setFormatter(logging.Formatter(LOG_FORMAT))
  27. logger.addHandler(handler)
  28. logger.propagate = False
  29. except Exception:
  30. # fallback to root logger if file handler cannot be created
  31. logger = logging.getLogger(__name__)
  32. return logger
  33. log = _configure_logger()
  34. def _norm_mac(value: str) -> str:
  35. if value is None:
  36. return ""
  37. return "".join(ch for ch in str(value).strip().lower() if ch.isalnum())
  38. def _parse_line(line: str) -> Optional[Tuple[str, bool]]:
  39. line = line.strip()
  40. if not line or " " not in line:
  41. return None
  42. topic, payload = line.split(" ", 1)
  43. if not topic.startswith("publish_out/"):
  44. return None
  45. try:
  46. data = json.loads(payload)
  47. except Exception:
  48. return None
  49. if not isinstance(data, list) or not data:
  50. return None
  51. gateway_entry = None
  52. for item in data:
  53. if isinstance(item, dict) and item.get("type") == "Gateway":
  54. gateway_entry = item
  55. break
  56. mac = gateway_entry.get("mac") if gateway_entry else None
  57. if not mac:
  58. parts = topic.split("/", 1)
  59. mac = parts[1] if len(parts) > 1 else None
  60. if not mac:
  61. return None
  62. nums = gateway_entry.get("nums") if gateway_entry else None
  63. has_data = len(data) > 1
  64. if nums is not None:
  65. try:
  66. has_data = has_data and int(nums) > 0
  67. except (TypeError, ValueError):
  68. pass
  69. return mac, has_data
  70. class MqttGatewayMonitor:
  71. def __init__(
  72. self,
  73. host: str = None,
  74. port: int = None,
  75. topic: str = None,
  76. version: str = None,
  77. status_interval: int = None,
  78. stale_after: int = None,
  79. retry_delay: int = 5,
  80. gateway_repo: GatewayJsonRepository = None,
  81. ) -> None:
  82. self._host = host or config_env.MQTT_HOST
  83. self._port = port or config_env.MQTT_PORT
  84. self._topic = topic or config_env.MQTT_TOPIC
  85. self._version = version or config_env.MQTT_VERSION
  86. self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL
  87. self._stale_after = stale_after or config_env.MQTT_STALE_AFTER
  88. self._retry_delay = retry_delay
  89. self._gateway_repo = gateway_repo or GatewayJsonRepository()
  90. self._last_seen: Dict[str, float] = {}
  91. self._last_has_data: Dict[str, bool] = {}
  92. self._lock = asyncio.Lock()
  93. self._stop_event = asyncio.Event()
  94. self._reader_task: Optional[asyncio.Task] = None
  95. self._status_task: Optional[asyncio.Task] = None
  96. self._proc: Optional[asyncio.subprocess.Process] = None
  97. async def start(self) -> None:
  98. if self._reader_task:
  99. return
  100. self._stop_event.clear()
  101. self._reader_task = asyncio.create_task(self._reader_loop())
  102. self._status_task = asyncio.create_task(self._status_loop())
  103. async def stop(self) -> None:
  104. self._stop_event.set()
  105. if self._proc and self._proc.returncode is None:
  106. self._proc.terminate()
  107. tasks = [t for t in (self._reader_task, self._status_task) if t]
  108. for task in tasks:
  109. task.cancel()
  110. if tasks:
  111. await asyncio.gather(*tasks, return_exceptions=True)
  112. self._reader_task = None
  113. self._status_task = None
  114. async def _reader_loop(self) -> None:
  115. while not self._stop_event.is_set():
  116. try:
  117. self._proc = await asyncio.create_subprocess_exec(
  118. "mosquitto_sub",
  119. "-v",
  120. "-h",
  121. str(self._host),
  122. "-p",
  123. str(self._port),
  124. "-t",
  125. str(self._topic),
  126. "-V",
  127. str(self._version),
  128. stdout=asyncio.subprocess.PIPE,
  129. stderr=asyncio.subprocess.STDOUT,
  130. )
  131. except FileNotFoundError:
  132. log.error("mosquitto_sub not found in PATH; retrying in %ss", self._retry_delay)
  133. await asyncio.sleep(self._retry_delay)
  134. continue
  135. except Exception:
  136. log.exception("Failed to start mosquitto_sub; retrying in %ss", self._retry_delay)
  137. await asyncio.sleep(self._retry_delay)
  138. continue
  139. try:
  140. assert self._proc.stdout is not None
  141. while not self._stop_event.is_set():
  142. line = await self._proc.stdout.readline()
  143. if not line:
  144. break
  145. parsed = _parse_line(line.decode("utf-8", errors="ignore"))
  146. if not parsed:
  147. continue
  148. mac, has_data = parsed
  149. mac_norm = _norm_mac(mac)
  150. if not mac_norm:
  151. continue
  152. async with self._lock:
  153. self._last_seen[mac_norm] = time.monotonic()
  154. self._last_has_data[mac_norm] = bool(has_data)
  155. finally:
  156. if self._proc and self._proc.returncode is None:
  157. self._proc.terminate()
  158. await self._proc.wait()
  159. self._proc = None
  160. await asyncio.sleep(self._retry_delay)
  161. async def _status_loop(self) -> None:
  162. while not self._stop_event.is_set():
  163. await self._update_statuses()
  164. await asyncio.sleep(self._status_interval)
  165. async def _update_statuses(self) -> None:
  166. now = time.monotonic()
  167. gateways = self._gateway_repo.list()
  168. async with self._lock:
  169. last_seen = dict(self._last_seen)
  170. last_has_data = dict(self._last_has_data)
  171. status_by_mac: Dict[str, str] = {}
  172. for gw in gateways:
  173. mac_norm = _norm_mac(gw.get("mac"))
  174. if not mac_norm:
  175. continue
  176. seen_at = last_seen.get(mac_norm)
  177. has_data = last_has_data.get(mac_norm, False)
  178. if seen_at is None or (now - seen_at) > self._stale_after or not has_data:
  179. status = "disabled"
  180. else:
  181. status = "active"
  182. status_by_mac[mac_norm] = status
  183. changes = self._gateway_repo.update_statuses(status_by_mac)
  184. for change in changes:
  185. mac_label = change.get("mac_raw") or change.get("mac")
  186. if change.get("first_set"):
  187. log.info(
  188. "Gateway status initialized: mac=%s status=%s",
  189. mac_label,
  190. change.get("new_status"),
  191. )
  192. else:
  193. log.info(
  194. "Gateway status changed: mac=%s %s -> %s",
  195. mac_label,
  196. change.get("old_status"),
  197. change.get("new_status"),
  198. )