|
- from __future__ import annotations
-
- from dataclasses import dataclass, field
- from typing import Dict, Set, List
- import time
-
-
- def canonical_mac(mac: str) -> str:
- """
- Normalize MAC to 'AA:BB:CC:DD:EE:FF' (uppercase).
- Accepts:
- - 'AC233FC1DCCB'
- - 'ac:23:3f:c1:dc:cb'
- - 'AC-23-3F-C1-DC-CB'
- - other variants (best-effort)
- """
- s = str(mac).strip()
- if not s:
- return s
-
- # keep only hex chars
- hex_only = "".join(ch for ch in s if ch.isalnum()).upper()
-
- # If it's exactly 12 hex chars -> format with ':'
- if len(hex_only) == 12:
- return ":".join(hex_only[i:i + 2] for i in range(0, 12, 2))
-
- # Fallback: uppercase original (best-effort)
- return s.upper()
-
-
- @dataclass
- class OnlineMonitor:
- gateway_macs: Set[str]
- beacon_macs: Set[str]
- offline_after_seconds_gateways: int = 120
- offline_after_seconds_beacons: int = 600
-
- last_seen_gw: Dict[str, float] = field(default_factory=dict)
- last_seen_beacon: Dict[str, float] = field(default_factory=dict)
-
- def __post_init__(self) -> None:
- # normalize allowed sets once
- self.gateway_macs = {canonical_mac(m) for m in self.gateway_macs}
- self.beacon_macs = {canonical_mac(m) for m in self.beacon_macs}
-
- def mark_gateway_seen(self, gw_mac: str) -> None:
- self.last_seen_gw[canonical_mac(gw_mac)] = time.time()
-
- def mark_beacon_seen(self, beacon_mac: str) -> None:
- self.last_seen_beacon[canonical_mac(beacon_mac)] = time.time()
-
- def _offline(self, allowed: Set[str], last_seen: Dict[str, float], threshold: int) -> List[str]:
- now = time.time()
- off = []
- for mac in sorted(allowed):
- t = last_seen.get(mac)
- if t is None:
- off.append(mac)
- elif (now - t) > threshold:
- off.append(mac)
- return off
-
- def offline_gateways(self) -> List[str]:
- return self._offline(self.gateway_macs, self.last_seen_gw, self.offline_after_seconds_gateways)
-
- def offline_beacons(self) -> List[str]:
- return self._offline(self.beacon_macs, self.last_seen_beacon, self.offline_after_seconds_beacons)
-
- def _age(self, mac: str, last_seen: Dict[str, float]) -> int:
- t = last_seen.get(mac)
- if t is None:
- return -1
- return int(time.time() - t)
-
- def offline_gateways_with_age(self):
- off = self.offline_gateways()
- return [(m, self._age(m, self.last_seen_gw)) for m in off]
-
- def offline_beacons_with_age(self):
- off = self.offline_beacons()
- return [(m, self._age(m, self.last_seen_beacon)) for m in off]
|