"""Track gateway liveness from MQTT traffic and persist an active/disabled status.

A ``mosquitto_sub`` child process is read line by line; every message that
identifies a gateway refreshes an in-memory "last seen" table, and a periodic
task converts that table into statuses stored through the gateway repository.
"""

import asyncio
import json
import logging
import os
import time
from typing import Dict, Optional, Tuple

import config_env
from logica_reslevis.gateway import GatewayJsonRepository

LOG_DIR = "/data/var/log/FastAPI"
LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log")
LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s"

# Maximum length of a single mosquitto_sub output line. asyncio's default
# StreamReader limit is 64 KiB, and readline() raises ValueError beyond it.
_STREAM_LIMIT = 1024 * 1024


def _configure_logger() -> logging.Logger:
    """Return the monitor logger, attaching the file handler at most once.

    If the log directory or file cannot be created, the function falls back to
    the module-level ``__name__`` logger (which keeps default propagation)
    instead of failing at import time.
    """
    logger = logging.getLogger("mqtt_gateway_monitor")
    logger.setLevel(logging.INFO)

    # Avoid stacking duplicate handlers when the module is imported repeatedly.
    has_handler = any(
        isinstance(h, logging.FileHandler)
        and getattr(h, "baseFilename", None) == LOG_PATH
        for h in logger.handlers
    )
    if has_handler:
        return logger

    try:
        os.makedirs(LOG_DIR, exist_ok=True)
        # FileHandler opens the file in append mode immediately, creating it
        # if needed, so no separate "touch" is required.
        handler = logging.FileHandler(LOG_PATH)
    except OSError:
        # Fall back to the module logger if the file handler cannot be created.
        return logging.getLogger(__name__)

    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(handler)
    logger.propagate = False
    return logger


log = _configure_logger()


def _norm_mac(value: Optional[str]) -> str:
    """Return *value* lower-cased with every non-alphanumeric character removed.

    ``None`` yields an empty string; any other value is converted with ``str``.
    """
    if value is None:
        return ""
    return "".join(ch for ch in str(value).strip().lower() if ch.isalnum())


def _parse_line(line: str) -> Optional[Tuple[str, bool]]:
    """Parse one ``"<topic> <json payload>"`` line into ``(mac, has_data)``.

    Returns ``None`` when the line is blank, has no payload, the topic does not
    start with ``publish_out/``, the payload is not a non-empty JSON list, or
    no MAC can be determined.

    The MAC comes from the first list item that is a dict with
    ``type == "Gateway"``; if that is missing or has no ``mac``, the part of
    the topic after the first ``/`` is used. ``has_data`` is true when the list
    holds more than one item and, if the gateway entry has an
    integer-convertible ``nums``, that number is positive.
    """
    line = line.strip()
    if not line or " " not in line:
        return None

    topic, payload = line.split(" ", 1)
    if not topic.startswith("publish_out/"):
        return None

    try:
        data = json.loads(payload)
    except Exception:
        return None
    if not isinstance(data, list) or not data:
        return None

    gateway_entry = next(
        (
            item
            for item in data
            if isinstance(item, dict) and item.get("type") == "Gateway"
        ),
        None,
    )

    mac = gateway_entry.get("mac") if gateway_entry else None
    if not mac:
        parts = topic.split("/", 1)
        mac = parts[1] if len(parts) > 1 else None
    if not mac:
        return None

    nums = gateway_entry.get("nums") if gateway_entry else None
    has_data = len(data) > 1
    if nums is not None:
        try:
            has_data = has_data and int(nums) > 0
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts "Infinity", and int(inf)
            # raises it. An unusable count is ignored rather than fatal.
            pass

    return mac, has_data


class MqttGatewayMonitor:
    """Watch MQTT gateway messages and keep repository statuses up to date.

    Two background tasks are run: a reader that consumes ``mosquitto_sub``
    output and records when each gateway MAC was last seen, and a status task
    that periodically marks every known gateway ``"active"`` or ``"disabled"``.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        topic: Optional[str] = None,
        version: Optional[str] = None,
        status_interval: Optional[int] = None,
        stale_after: Optional[int] = None,
        retry_delay: int = 5,
        gateway_repo: Optional[GatewayJsonRepository] = None,
    ) -> None:
        """Store connection settings, defaulting to ``config_env`` values.

        Any falsy argument (``None``, ``0``, ``""``) is replaced by the
        corresponding ``config_env.MQTT_*`` setting. *retry_delay* is the
        number of seconds to wait before restarting ``mosquitto_sub``.
        """
        self._host = host or config_env.MQTT_HOST
        self._port = port or config_env.MQTT_PORT
        self._topic = topic or config_env.MQTT_TOPIC
        self._version = version or config_env.MQTT_VERSION
        self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL
        self._stale_after = stale_after or config_env.MQTT_STALE_AFTER
        self._retry_delay = retry_delay
        self._gateway_repo = gateway_repo or GatewayJsonRepository()

        # Normalized MAC -> time.monotonic() of the last message / whether
        # that message carried data. Both are guarded by self._lock.
        self._last_seen: Dict[str, float] = {}
        self._last_has_data: Dict[str, bool] = {}
        self._lock = asyncio.Lock()

        self._stop_event = asyncio.Event()
        self._reader_task: Optional[asyncio.Task] = None
        self._status_task: Optional[asyncio.Task] = None
        self._proc: Optional[asyncio.subprocess.Process] = None

    @staticmethod
    def _terminate(proc: "asyncio.subprocess.Process") -> None:
        """Send SIGTERM to *proc* if it is still running; ignore a vanished child."""
        if proc.returncode is not None:
            return
        try:
            proc.terminate()
        except ProcessLookupError:
            # The child exited between the returncode check and the signal.
            pass

    async def start(self) -> None:
        """Start the reader and status tasks; do nothing if already started."""
        if self._reader_task:
            return
        self._stop_event.clear()
        self._reader_task = asyncio.create_task(self._reader_loop())
        self._status_task = asyncio.create_task(self._status_loop())

    async def stop(self) -> None:
        """Signal shutdown, terminate the subscriber and await both tasks."""
        self._stop_event.set()
        if self._proc is not None:
            self._terminate(self._proc)

        tasks = [t for t in (self._reader_task, self._status_task) if t]
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True keeps the expected CancelledError (or any
            # task failure) from propagating out of stop().
            await asyncio.gather(*tasks, return_exceptions=True)

        self._reader_task = None
        self._status_task = None

    async def _reader_loop(self) -> None:
        """Run ``mosquitto_sub`` and restart it after ``retry_delay`` seconds.

        The loop survives a missing binary, a failed spawn, the child exiting,
        and unexpected errors while reading, so monitoring does not stop
        silently.
        """
        while not self._stop_event.is_set():
            try:
                proc = await asyncio.create_subprocess_exec(
                    "mosquitto_sub",
                    "-v",
                    "-h",
                    str(self._host),
                    "-p",
                    str(self._port),
                    "-t",
                    str(self._topic),
                    "-V",
                    str(self._version),
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT,
                    limit=_STREAM_LIMIT,
                )
            except FileNotFoundError:
                log.error(
                    "mosquitto_sub not found in PATH; retrying in %ss",
                    self._retry_delay,
                )
                await asyncio.sleep(self._retry_delay)
                continue
            except Exception:
                log.exception(
                    "Failed to start mosquitto_sub; retrying in %ss",
                    self._retry_delay,
                )
                await asyncio.sleep(self._retry_delay)
                continue

            self._proc = proc
            try:
                await self._consume_output(proc)
            except asyncio.CancelledError:
                raise
            except Exception:
                # Without this the task would die and every gateway would
                # eventually be reported as disabled.
                log.exception(
                    "mosquitto_sub reader failed; restarting in %ss",
                    self._retry_delay,
                )
            finally:
                if proc.returncode is None:
                    self._terminate(proc)
                    await proc.wait()
                self._proc = None

            await asyncio.sleep(self._retry_delay)

    async def _consume_output(self, proc: "asyncio.subprocess.Process") -> None:
        """Read *proc* stdout until EOF or stop, recording each gateway sighting."""
        stream = proc.stdout
        if stream is None:
            raise RuntimeError("mosquitto_sub was started without a stdout pipe")

        while not self._stop_event.is_set():
            try:
                raw = await stream.readline()
            except ValueError:
                # readline() discards the oversized data before raising, so
                # reading can simply continue with the next line.
                log.warning(
                    "Skipping mosquitto_sub line longer than %s bytes",
                    _STREAM_LIMIT,
                )
                continue
            if not raw:
                # EOF: the subscriber exited.
                return

            parsed = _parse_line(raw.decode("utf-8", errors="ignore"))
            if not parsed:
                continue
            mac, has_data = parsed
            mac_norm = _norm_mac(mac)
            if not mac_norm:
                continue

            async with self._lock:
                self._last_seen[mac_norm] = time.monotonic()
                self._last_has_data[mac_norm] = bool(has_data)

    async def _status_loop(self) -> None:
        """Refresh gateway statuses every ``status_interval`` seconds.

        A failing update is logged and retried on the next tick instead of
        ending the task.
        """
        while not self._stop_event.is_set():
            try:
                await self._update_statuses()
            except asyncio.CancelledError:
                raise
            except Exception:
                log.exception(
                    "Gateway status update failed; retrying in %ss",
                    self._status_interval,
                )
            await asyncio.sleep(self._status_interval)

    async def _update_statuses(self) -> None:
        """Compute a status for each repository gateway, store it, log changes.

        A gateway is ``"active"`` only if it was seen within ``stale_after``
        seconds and its latest message carried data; otherwise ``"disabled"``.
        """
        now = time.monotonic()
        gateways = self._gateway_repo.list()

        # Snapshot under the lock so the reader is not blocked while the
        # repository is updated.
        async with self._lock:
            last_seen = dict(self._last_seen)
            last_has_data = dict(self._last_has_data)

        status_by_mac: Dict[str, str] = {}
        for gw in gateways:
            mac_norm = _norm_mac(gw.get("mac"))
            if not mac_norm:
                continue
            seen_at = last_seen.get(mac_norm)
            has_data = last_has_data.get(mac_norm, False)
            is_active = (
                seen_at is not None
                and (now - seen_at) <= self._stale_after
                and has_data
            )
            status_by_mac[mac_norm] = "active" if is_active else "disabled"

        changes = self._gateway_repo.update_statuses(status_by_mac)
        for change in changes:
            mac_label = change.get("mac_raw") or change.get("mac")
            name_label = change.get("name") or ""
            if change.get("first_set"):
                log.info(
                    "Gateway status initialized: mac=%s name=%s status=%s",
                    mac_label,
                    name_label,
                    change.get("new_status"),
                )
            else:
                log.info(
                    "Gateway status changed: mac=%s name=%s %s -> %s",
                    mac_label,
                    name_label,
                    change.get("old_status"),
                    change.get("new_status"),
                )