您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

128 行
4.1 KiB

  1. """fingerprint.py
  2. Raccolta RSSI durante una finestra temporale e aggregazione in feature-vector.
  3. Scelte chiave:
  4. - Matching interno su MAC in formato **compact** (12 hex senza ':').
  5. - Header CSV dei gateway mantenuto nel formato originale (spesso con ':').
  6. Filtri/robustezza (per-gateway):
  7. - range rssi_min/rssi_max
  8. - outlier_method: none | mad | iqr
  9. - min_samples_per_gateway
  10. - max_stddev (opzionale)
  11. - aggregate: mean | median
  12. """
  13. from __future__ import annotations
  14. from dataclasses import dataclass, field
  15. from typing import Dict, List, Optional, Tuple
  16. import math
  17. import statistics
  18. def _mad_filter(values: List[float], z: float = 3.5) -> List[float]:
  19. if len(values) < 3:
  20. return values
  21. med = statistics.median(values)
  22. dev = [abs(v - med) for v in values]
  23. mad = statistics.median(dev)
  24. if mad == 0:
  25. return values
  26. kept: List[float] = []
  27. for v in values:
  28. mz = 0.6745 * (v - med) / mad # modified z-score
  29. if abs(mz) <= z:
  30. kept.append(v)
  31. return kept if kept else values
  32. def _iqr_filter(values: List[float], k: float = 1.5) -> List[float]:
  33. if len(values) < 4:
  34. return values
  35. vs = sorted(values)
  36. q1 = vs[len(vs) // 4]
  37. q3 = vs[(len(vs) * 3) // 4]
  38. iqr = q3 - q1
  39. low = q1 - k * iqr
  40. high = q3 + k * iqr
  41. kept = [v for v in values if low <= v <= high]
  42. return kept if kept else values
  43. @dataclass
  44. class FingerprintWindow:
  45. beacon_keys: List[str] # compact
  46. gateway_headers: List[str] # come in gateway.csv (spesso colon)
  47. gateway_keys: List[str] # compact (allineato a gateway_headers)
  48. rssi_min: float = -110.0
  49. rssi_max: float = -25.0
  50. outlier_method: str = "none" # none | mad | iqr
  51. mad_z: float = 3.5
  52. min_samples_per_gateway: int = 1
  53. max_stddev: Optional[float] = None
  54. values: Dict[str, Dict[str, List[float]]] = field(default_factory=dict)
  55. def __post_init__(self) -> None:
  56. self.beacon_set = set(self.beacon_keys)
  57. self.gw_set = set(self.gateway_keys)
  58. for b in self.beacon_keys:
  59. self.values[b] = {gk: [] for gk in self.gateway_keys}
  60. def add(self, gw_key: str, beacon_key: str, rssi: float) -> bool:
  61. if gw_key not in self.gw_set or beacon_key not in self.beacon_set:
  62. return False
  63. try:
  64. r = float(rssi)
  65. except Exception:
  66. return False
  67. if r < self.rssi_min or r > self.rssi_max:
  68. return False
  69. self.values[beacon_key][gw_key].append(r)
  70. return True
  71. def _aggregate_one(self, xs: List[float], method: str) -> float:
  72. if not xs:
  73. return math.nan
  74. vals = xs
  75. if self.outlier_method == "mad":
  76. vals = _mad_filter(vals, z=self.mad_z)
  77. elif self.outlier_method == "iqr":
  78. vals = _iqr_filter(vals, k=1.5)
  79. if len(vals) < max(1, int(self.min_samples_per_gateway)):
  80. return math.nan
  81. if self.max_stddev is not None and len(vals) >= 2:
  82. mean = sum(vals) / len(vals)
  83. var = sum((v - mean) ** 2 for v in vals) / (len(vals) - 1)
  84. sd = math.sqrt(var)
  85. if sd > float(self.max_stddev):
  86. return math.nan
  87. if method == "mean":
  88. return sum(vals) / len(vals)
  89. return float(statistics.median(vals))
  90. def features_for(self, beacon_key: str, aggregate: str = "median") -> Dict[str, float]:
  91. out: Dict[str, float] = {}
  92. for gk, hdr in zip(self.gateway_keys, self.gateway_headers):
  93. out[hdr] = self._aggregate_one(self.values[beacon_key][gk], aggregate)
  94. return out
  95. def top_gateways(self, beacon_key: str, aggregate: str = "median", top_n: int = 5) -> List[Tuple[int, str, float]]:
  96. rows: List[Tuple[int, str, float]] = []
  97. for gk, hdr in zip(self.gateway_keys, self.gateway_headers):
  98. n = len(self.values[beacon_key][gk])
  99. if n <= 0:
  100. continue
  101. agg = self._aggregate_one(self.values[beacon_key][gk], aggregate)
  102. rows.append((n, hdr, agg))
  103. rows.sort(key=lambda t: t[0], reverse=True)
  104. return rows[:top_n]