#1 GatewayStatusUpdate

Unito
pollutri ha unito 3 commit da GatewayStatusUpdate a master 4 giorni fa
  1. +12
    -0
      app.py
  2. +7
    -2
      config_env.py
  3. +57
    -0
      logica_reslevis/gateway.py
  4. +220
    -0
      mqtt_gateway_monitor.py

+ 12
- 0
app.py Vedi File

@@ -65,6 +65,7 @@ from fastapi.security import OAuth2AuthorizationCodeBearer


#Proxy al CORE ResLevis #Proxy al CORE ResLevis
import httpx import httpx
from mqtt_gateway_monitor import MqttGatewayMonitor




AUTH_URL = config_env.KEYCLOAK_AUTH_URL AUTH_URL = config_env.KEYCLOAK_AUTH_URL
@@ -132,6 +133,17 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )


# MQTT gateway monitor
mqtt_monitor = MqttGatewayMonitor()

@app.on_event("startup")
async def start_mqtt_monitor():
await mqtt_monitor.start()

@app.on_event("shutdown")
async def stop_mqtt_monitor():
await mqtt_monitor.stop()

#ResLevis CORE middleware #ResLevis CORE middleware
@app.middleware("http") @app.middleware("http")
async def local_then_core(request: Request, call_next): async def local_then_core(request: Request, call_next):


+ 7
- 2
config_env.py Vedi File

@@ -13,7 +13,12 @@ KEYCLOAK_PROTOCOL_ENDPOINT = os.getenv("KEYCLOAK_PROTOCOL_ENDPOINT")
KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL") KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL")
KEYCLOAK_AUTH_URL = os.getenv("KEYCLOAK_AUTH_URL") KEYCLOAK_AUTH_URL = os.getenv("KEYCLOAK_AUTH_URL")
KEYCLOAK_TOKEN_URL = os.getenv("KEYCLOAK_TOKEN_URL") KEYCLOAK_TOKEN_URL = os.getenv("KEYCLOAK_TOKEN_URL")

#API PROXYING TO THE CORE
CORE_API_URL = os.getenv("CORE_API_URL", "http://localhost:1902") CORE_API_URL = os.getenv("CORE_API_URL", "http://localhost:1902")


MQTT_HOST = os.getenv("MQTT_HOST", "192.168.1.101")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "#")
MQTT_VERSION = os.getenv("MQTT_VERSION", "mqttv311")
MQTT_STATUS_INTERVAL = int(os.getenv("MQTT_STATUS_INTERVAL", "30"))
MQTT_STALE_AFTER = int(os.getenv("MQTT_STALE_AFTER", "30"))


+ 57
- 0
logica_reslevis/gateway.py Vedi File

@@ -62,6 +62,12 @@ def _norm_str(v: Any) -> str:
"""Normalizza un valore per confronti case-insensitive e safe su None.""" """Normalizza un valore per confronti case-insensitive e safe su None."""
return str(v).strip().lower() if v is not None else "" return str(v).strip().lower() if v is not None else ""


def _norm_mac(v: Any) -> str:
"""Normalizza MAC rimuovendo separatori e forzando lowercase."""
if v is None:
return ""
return "".join(ch for ch in str(v).strip().lower() if ch.isalnum())

def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]: def _index_by_id(rows: List[Dict[str, Any]], gateway_id: str) -> Optional[int]:
gid = _norm_str(gateway_id) gid = _norm_str(gateway_id)
for i, r in enumerate(rows): for i, r in enumerate(rows):
@@ -146,3 +152,54 @@ class GatewayJsonRepository:
del rows[idx] del rows[idx]
self._write_all(rows) self._write_all(rows)


def update_status_by_mac(self, mac: str, status: str) -> bool:
"""Aggiorna lo status dei gateway con il MAC specificato."""
rows = self._read_all()
target = _norm_mac(mac)
if not target:
return False

updated = False
for row in rows:
if _norm_mac(row.get("mac")) == target:
if row.get("status") != status:
row["status"] = status
updated = True

if updated:
self._write_all(rows)
return updated

def update_statuses(self, status_by_mac: Dict[str, str]) -> List[Dict[str, Any]]:
"""Aggiorna lo status per più MAC in una singola scrittura."""
if not status_by_mac:
return []

rows = self._read_all()
changes: List[Dict[str, Any]] = []
for row in rows:
mac = _norm_mac(row.get("mac"))
if not mac:
continue
new_status = status_by_mac.get(mac)
if new_status is None:
continue
old_status = row.get("status")
if old_status != new_status:
first_set = old_status in (None, "")
row["status"] = new_status
changes.append(
{
"mac": mac,
"mac_raw": row.get("mac"),
"name": row.get("name"),
"old_status": old_status,
"new_status": new_status,
"first_set": first_set,
}
)

if changes:
self._write_all(rows)
return changes


+ 220
- 0
mqtt_gateway_monitor.py Vedi File

@@ -0,0 +1,220 @@
import asyncio
import json
import logging
import os
import time
from typing import Dict, Optional

import config_env
from logica_reslevis.gateway import GatewayJsonRepository

LOG_DIR = "/data/var/log/FastAPI"
LOG_PATH = os.path.join(LOG_DIR, "UpdateBeaconStatus.log")
LOG_FORMAT = "%(name)s - %(levelname)s - %(message)s"

def _configure_logger() -> logging.Logger:
logger = logging.getLogger("mqtt_gateway_monitor")
logger.setLevel(logging.INFO)
has_handler = any(
isinstance(h, logging.FileHandler)
and getattr(h, "baseFilename", None) == LOG_PATH
for h in logger.handlers
)
if has_handler:
return logger
try:
os.makedirs(LOG_DIR, exist_ok=True)
open(LOG_PATH, "a").close()
handler = logging.FileHandler(LOG_PATH)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(handler)
logger.propagate = False
except Exception:
# fallback to root logger if file handler cannot be created
logger = logging.getLogger(__name__)
return logger

log = _configure_logger()


def _norm_mac(value: str) -> str:
if value is None:
return ""
return "".join(ch for ch in str(value).strip().lower() if ch.isalnum())


def _parse_line(line: str) -> Optional[str]:
line = line.strip()
if not line or " " not in line:
return None

topic, payload = line.split(" ", 1)
if not topic.startswith("publish_out/"):
return None

try:
data = json.loads(payload)
except Exception:
return None

if not isinstance(data, list) or not data:
return None

gateway_entry = None
for item in data:
if isinstance(item, dict) and item.get("type") == "Gateway":
gateway_entry = item
break

mac = gateway_entry.get("mac") if gateway_entry else None
if not mac:
parts = topic.split("/", 1)
mac = parts[1] if len(parts) > 1 else None

if not mac:
return None

nums = gateway_entry.get("nums") if gateway_entry else None
return mac


class MqttGatewayMonitor:
def __init__(
self,
host: str = None,
port: int = None,
topic: str = None,
version: str = None,
status_interval: int = None,
stale_after: int = None,
retry_delay: int = 5,
gateway_repo: GatewayJsonRepository = None,
) -> None:
self._host = host or config_env.MQTT_HOST
self._port = port or config_env.MQTT_PORT
self._topic = topic or config_env.MQTT_TOPIC
self._version = version or config_env.MQTT_VERSION
self._status_interval = status_interval or config_env.MQTT_STATUS_INTERVAL
self._stale_after = stale_after or config_env.MQTT_STALE_AFTER
self._retry_delay = retry_delay
self._gateway_repo = gateway_repo or GatewayJsonRepository()

self._last_seen: Dict[str, float] = {}
self._lock = asyncio.Lock()
self._stop_event = asyncio.Event()
self._reader_task: Optional[asyncio.Task] = None
self._status_task: Optional[asyncio.Task] = None
self._proc: Optional[asyncio.subprocess.Process] = None

async def start(self) -> None:
if self._reader_task:
return
self._stop_event.clear()
self._reader_task = asyncio.create_task(self._reader_loop())
self._status_task = asyncio.create_task(self._status_loop())

async def stop(self) -> None:
self._stop_event.set()
if self._proc and self._proc.returncode is None:
self._proc.terminate()
tasks = [t for t in (self._reader_task, self._status_task) if t]
for task in tasks:
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._reader_task = None
self._status_task = None

async def _reader_loop(self) -> None:
while not self._stop_event.is_set():
try:
self._proc = await asyncio.create_subprocess_exec(
"mosquitto_sub",
"-v",
"-h",
str(self._host),
"-p",
str(self._port),
"-t",
str(self._topic),
"-V",
str(self._version),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
except FileNotFoundError:
log.error("mosquitto_sub not found in PATH; retrying in %ss", self._retry_delay)
await asyncio.sleep(self._retry_delay)
continue
except Exception:
log.exception("Failed to start mosquitto_sub; retrying in %ss", self._retry_delay)
await asyncio.sleep(self._retry_delay)
continue

try:
assert self._proc.stdout is not None
while not self._stop_event.is_set():
line = await self._proc.stdout.readline()
if not line:
break
mac = _parse_line(line.decode("utf-8", errors="ignore"))
if not mac:
continue
mac_norm = _norm_mac(mac)
if not mac_norm:
continue
async with self._lock:
self._last_seen[mac_norm] = time.monotonic()
finally:
if self._proc and self._proc.returncode is None:
self._proc.terminate()
await self._proc.wait()
self._proc = None

await asyncio.sleep(self._retry_delay)

async def _status_loop(self) -> None:
while not self._stop_event.is_set():
await self._update_statuses()
await asyncio.sleep(self._status_interval)

async def _update_statuses(self) -> None:
now = time.monotonic()
gateways = self._gateway_repo.list()

async with self._lock:
last_seen = dict(self._last_seen)

status_by_mac: Dict[str, str] = {}
for gw in gateways:
mac_norm = _norm_mac(gw.get("mac"))
if not mac_norm:
continue

seen_at = last_seen.get(mac_norm)
if seen_at is None or (now - seen_at) > self._stale_after:
status = "disabled"
else:
status = "active"

status_by_mac[mac_norm] = status

changes = self._gateway_repo.update_statuses(status_by_mac)
for change in changes:
mac_label = change.get("mac_raw") or change.get("mac")
name_label = change.get("name") or ""
if change.get("first_set"):
log.info(
"Gateway status initialized: mac=%s name=%s status=%s",
mac_label,
name_label,
change.get("new_status"),
)
else:
log.info(
"Gateway status changed: mac=%s name=%s %s -> %s",
mac_label,
name_label,
change.get("old_status"),
change.get("new_status"),
)

Caricamento…
Annulla
Salva