Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 
 

83 lignes
2.6 KiB

  1. from __future__ import annotations
  2. from dataclasses import dataclass, field
  3. from typing import Dict, Set, List
  4. import time
  5. def canonical_mac(mac: str) -> str:
  6. """
  7. Normalize MAC to 'AA:BB:CC:DD:EE:FF' (uppercase).
  8. Accepts:
  9. - 'AC233FC1DCCB'
  10. - 'ac:23:3f:c1:dc:cb'
  11. - 'AC-23-3F-C1-DC-CB'
  12. - other variants (best-effort)
  13. """
  14. s = str(mac).strip()
  15. if not s:
  16. return s
  17. # keep only hex chars
  18. hex_only = "".join(ch for ch in s if ch.isalnum()).upper()
  19. # If it's exactly 12 hex chars -> format with ':'
  20. if len(hex_only) == 12:
  21. return ":".join(hex_only[i:i + 2] for i in range(0, 12, 2))
  22. # Fallback: uppercase original (best-effort)
  23. return s.upper()
  24. @dataclass
  25. class OnlineMonitor:
  26. gateway_macs: Set[str]
  27. beacon_macs: Set[str]
  28. offline_after_seconds_gateways: int = 120
  29. offline_after_seconds_beacons: int = 600
  30. last_seen_gw: Dict[str, float] = field(default_factory=dict)
  31. last_seen_beacon: Dict[str, float] = field(default_factory=dict)
  32. def __post_init__(self) -> None:
  33. # normalize allowed sets once
  34. self.gateway_macs = {canonical_mac(m) for m in self.gateway_macs}
  35. self.beacon_macs = {canonical_mac(m) for m in self.beacon_macs}
  36. def mark_gateway_seen(self, gw_mac: str) -> None:
  37. self.last_seen_gw[canonical_mac(gw_mac)] = time.time()
  38. def mark_beacon_seen(self, beacon_mac: str) -> None:
  39. self.last_seen_beacon[canonical_mac(beacon_mac)] = time.time()
  40. def _offline(self, allowed: Set[str], last_seen: Dict[str, float], threshold: int) -> List[str]:
  41. now = time.time()
  42. off = []
  43. for mac in sorted(allowed):
  44. t = last_seen.get(mac)
  45. if t is None:
  46. off.append(mac)
  47. elif (now - t) > threshold:
  48. off.append(mac)
  49. return off
  50. def offline_gateways(self) -> List[str]:
  51. return self._offline(self.gateway_macs, self.last_seen_gw, self.offline_after_seconds_gateways)
  52. def offline_beacons(self) -> List[str]:
  53. return self._offline(self.beacon_macs, self.last_seen_beacon, self.offline_after_seconds_beacons)
  54. def _age(self, mac: str, last_seen: Dict[str, float]) -> int:
  55. t = last_seen.get(mac)
  56. if t is None:
  57. return -1
  58. return int(time.time() - t)
  59. def offline_gateways_with_age(self):
  60. off = self.offline_gateways()
  61. return [(m, self._age(m, self.last_seen_gw)) for m in off]
  62. def offline_beacons_with_age(self):
  63. off = self.offline_beacons()
  64. return [(m, self._age(m, self.last_seen_beacon)) for m in off]