From e38ba38bc2204cbf4073194a54e85000c9c2fd90 Mon Sep 17 00:00:00 2001 From: pollutri Date: Fri, 30 Jan 2026 16:00:45 +0100 Subject: [PATCH] Implementation of dynamic gateway status update --- app.py | 12 ++ config_env.py | 9 +- logica_reslevis/gateway.py | 56 +++++++++ mqtt_gateway_monitor.py | 229 +++++++++++++++++++++++++++++++++++++ 4 files changed, 304 insertions(+), 2 deletions(-) create mode 100644 mqtt_gateway_monitor.py diff --git a/app.py b/app.py index 9209c36..4475dc1 100644 --- a/app.py +++ b/app.py @@ -65,6 +65,7 @@ from fastapi.security import OAuth2AuthorizationCodeBearer #Proxy al CORE ResLevis import httpx +from mqtt_gateway_monitor import MqttGatewayMonitor AUTH_URL = config_env.KEYCLOAK_AUTH_URL @@ -132,6 +133,17 @@ app.add_middleware( allow_headers=["*"], ) +# MQTT gateway monitor +mqtt_monitor = MqttGatewayMonitor() + +@app.on_event("startup") +async def start_mqtt_monitor(): + await mqtt_monitor.start() + +@app.on_event("shutdown") +async def stop_mqtt_monitor(): + await mqtt_monitor.stop() + #ResLevis CORE middleware @app.middleware("http") async def local_then_core(request: Request, call_next): diff --git a/config_env.py b/config_env.py index c9c0500..cf91b24 100644 --- a/config_env.py +++ b/config_env.py @@ -13,7 +13,12 @@ KEYCLOAK_PROTOCOL_ENDPOINT = os.getenv("KEYCLOAK_PROTOCOL_ENDPOINT") KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL") KEYCLOAK_AUTH_URL = os.getenv("KEYCLOAK_AUTH_URL") KEYCLOAK_TOKEN_URL = os.getenv("KEYCLOAK_TOKEN_URL") - -#API PROXYING TO THE CORE CORE_API_URL = os.getenv("CORE_API_URL", "http://localhost:1902") +MQTT_HOST = os.getenv("MQTT_HOST", "192.168.1.101") +MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) +MQTT_TOPIC = os.getenv("MQTT_TOPIC", "#") +MQTT_VERSION = os.getenv("MQTT_VERSION", "mqttv311") +MQTT_STATUS_INTERVAL = int(os.getenv("MQTT_STATUS_INTERVAL", "30")) +MQTT_STALE_AFTER = int(os.getenv("MQTT_STALE_AFTER", "30")) + diff --git a/logica_reslevis/gateway.py b/logica_reslevis/gateway.py index 08aa910..f9d9122 100644 --- a/logica_reslevis/gateway.py +++ b/logica_reslevis/gateway.py @@ -62,6 +62,12 @@ def _norm_str(v: Any) -> str: """Normalizza un valore per confronti case-insensitive e safe su None.""" return str(v).strip().lower() if v is not None else "" +def _norm_mac(v: Any) -> str: + """Normalizza MAC rimuovendo separatori e forzando lowercase.""" + if v is None: + return "" + return "".join(ch for ch in str(v).strip().lower() if ch.isalnum()) + def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]: gid = _norm_str(gateway_id) for i, r in enumerate(rows): @@ -146,3 +152,53 @@ class GatewayJsonRepository: del rows[idx] self._write_all(rows) + def update_status_by_mac(self, mac: str, status: str) -> bool: + """Aggiorna lo status dei gateway con il MAC specificato.""" + rows = self._read_all() + target = _norm_mac(mac) + if not target: + return False + + updated = False + for row in rows: + if _norm_mac(row.get("mac")) == target: + if row.get("status") != status: + row["status"] = status + updated = True + + if updated: + self._write_all(rows) + return updated + + def update_statuses(self, status_by_mac: Dict[str, str]) -> List[Dict[str, Any]]: + """Aggiorna lo status per più MAC in una singola scrittura.""" + if not status_by_mac: + return [] + + rows = self._read_all() + changes: List[Dict[str, Any]] = [] + for row in rows: + mac = _norm_mac(row.get("mac")) + if not mac: + continue + new_status = status_by_mac.get(mac) + if new_status is None: + continue + old_status = row.get("status") + if old_status != new_status: + first_set = old_status in (None, "") + row["status"] = new_status + changes.append( + { + "mac": mac, + "mac_raw": row.get("mac"), + "old_status": old_status, + "new_status": new_status, + "first_set": first_set, + } + ) + + if changes: + self._write_all(rows) + return changes + diff --git a/mqtt_gateway_monitor.py b/mqtt_gateway_monitor.py new file mode 100644 index 0000000..410a42e --- /dev/null +++ b/mqtt_gateway_monitor.py @@ -0,0 +1,229 @@ +import asyncio +import json +import logging +import os +import time +from typing import Dict, Optional, Tuple + +import config_env +from logica_reslevis.gateway import GatewayJsonRepository + +LOG_DIR = "/data/var/log/FastAPI" +LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log") +LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s" + +def _configure_logger() -> logging.Logger: + logger = logging.getLogger("mqtt_gateway_monitor") + logger.setLevel(logging.INFO) + has_handler = any( + isinstance(h, logging.FileHandler) + and getattr(h, "baseFilename", None) == LOG_PATH + for h in logger.handlers + ) + if has_handler: + return logger + try: + os.makedirs(LOG_DIR, exist_ok=True) + open(LOG_PATH, "a").close() + handler = logging.FileHandler(LOG_PATH) + handler.setFormatter(logging.Formatter(LOG_FORMAT)) + logger.addHandler(handler) + logger.propagate = False + except Exception: + # fallback to root logger if file handler cannot be created + logger = logging.getLogger(__name__) + return logger + +log = _configure_logger() + + +def _norm_mac(value: str) -> str: + if value is None: + return "" + return "".join(ch for ch in str(value).strip().lower() if ch.isalnum()) + + +def _parse_line(line: str) -> Optional[Tuple[str, bool]]: + line = line.strip() + if not line or " " not in line: + return None + + topic, payload = line.split(" ", 1) + if not topic.startswith("publish_out/"): + return None + + try: + data = json.loads(payload) + except Exception: + return None + + if not isinstance(data, list) or not data: + return None + + gateway_entry = None + for item in data: + if isinstance(item, dict) and item.get("type") == "Gateway": + gateway_entry = item + break + + mac = gateway_entry.get("mac") if gateway_entry else None + if not mac: + parts = topic.split("/", 1) + mac = parts[1] if len(parts) > 1 else None + + if not mac: + return None + + nums = gateway_entry.get("nums") if gateway_entry else None + has_data = len(data) > 1 + if nums is not None: + try: + has_data = has_data and int(nums) > 0 + except (TypeError, ValueError): + pass + + return mac, has_data + + +class MqttGatewayMonitor: + def __init__( + self, + host: str = None, + port: int = None, + topic: str = None, + version: str = None, + status_interval: int = None, + stale_after: int = None, + retry_delay: int = 5, + gateway_repo: GatewayJsonRepository = None, + ) -> None: + self._host = host or config_env.MQTT_HOST + self._port = port or config_env.MQTT_PORT + self._topic = topic or config_env.MQTT_TOPIC + self._version = version or config_env.MQTT_VERSION + self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL + self._stale_after = stale_after or config_env.MQTT_STALE_AFTER + self._retry_delay = retry_delay + self._gateway_repo = gateway_repo or GatewayJsonRepository() + + self._last_seen: Dict[str, float] = {} + self._last_has_data: Dict[str, bool] = {} + self._lock = asyncio.Lock() + self._stop_event = asyncio.Event() + self._reader_task: Optional[asyncio.Task] = None + self._status_task: Optional[asyncio.Task] = None + self._proc: Optional[asyncio.subprocess.Process] = None + + async def start(self) -> None: + if self._reader_task: + return + self._stop_event.clear() + self._reader_task = asyncio.create_task(self._reader_loop()) + self._status_task = asyncio.create_task(self._status_loop()) + + async def stop(self) -> None: + self._stop_event.set() + if self._proc and self._proc.returncode is None: + self._proc.terminate() + tasks = [t for t in (self._reader_task, self._status_task) if t] + for task in tasks: + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + self._reader_task = None + self._status_task = None + + async def _reader_loop(self) -> None: + while not self._stop_event.is_set(): + try: + self._proc = await asyncio.create_subprocess_exec( + "mosquitto_sub", + "-v", + "-h", + str(self._host), + "-p", + str(self._port), + "-t", + str(self._topic), + "-V", + str(self._version), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + except FileNotFoundError: + log.error("mosquitto_sub not found in PATH; retrying in %ss", self._retry_delay) + await asyncio.sleep(self._retry_delay) + continue + except Exception: + log.exception("Failed to start mosquitto_sub; retrying in %ss", self._retry_delay) + await asyncio.sleep(self._retry_delay) + continue + + try: + assert self._proc.stdout is not None + while not self._stop_event.is_set(): + line = await self._proc.stdout.readline() + if not line: + break + parsed = _parse_line(line.decode("utf-8", errors="ignore")) + if not parsed: + continue + mac, has_data = parsed + mac_norm = _norm_mac(mac) + if not mac_norm: + continue + async with self._lock: + self._last_seen[mac_norm] = time.monotonic() + self._last_has_data[mac_norm] = bool(has_data) + finally: + if self._proc and self._proc.returncode is None: + self._proc.terminate() + await self._proc.wait() + self._proc = None + + await asyncio.sleep(self._retry_delay) + + async def _status_loop(self) -> None: + while not self._stop_event.is_set(): + await self._update_statuses() + await asyncio.sleep(self._status_interval) + + async def _update_statuses(self) -> None: + now = time.monotonic() + gateways = self._gateway_repo.list() + + async with self._lock: + last_seen = dict(self._last_seen) + last_has_data = dict(self._last_has_data) + + status_by_mac: Dict[str, str] = {} + for gw in gateways: + mac_norm = _norm_mac(gw.get("mac")) + if not mac_norm: + continue + + seen_at = last_seen.get(mac_norm) + has_data = last_has_data.get(mac_norm, False) + if seen_at is None or (now - seen_at) > self._stale_after or not has_data: + status = "disabled" + else: + status = "active" + + status_by_mac[mac_norm] = status + + changes = self._gateway_repo.update_statuses(status_by_mac) + for change in changes: + mac_label = change.get("mac_raw") or change.get("mac") + if change.get("first_set"): + log.info( + "Gateway status initialized: mac=%s status=%s", + mac_label, + change.get("new_status"), + ) + else: + log.info( + "Gateway status changed: mac=%s %s -> %s", + mac_label, + change.get("old_status"), + change.get("new_status"), + )