"""fingerprint.py

Collect RSSI samples over a time window and aggregate them into a
feature vector.

Key choices:
- Internal matching uses MAC addresses in **compact** form
  (12 hex digits without ':').
- The gateway CSV headers are kept in their original format
  (often with ':').

Per-gateway filters / robustness:
- rssi_min / rssi_max range
- outlier_method: none | mad | iqr
- min_samples_per_gateway
- max_stddev (optional)
- aggregate: mean | median
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import math
import statistics


def _mad_filter(values: List[float], z: float = 3.5) -> List[float]:
    """Drop outliers using the modified z-score based on the MAD.

    Args:
        values: Samples to filter.
        z: Largest absolute modified z-score that is still kept.

    Returns:
        The samples whose modified z-score is within ``z``. The input list
        itself is returned unchanged when it has fewer than three samples,
        when the MAD is zero, or when the filter would reject everything.
    """
    if len(values) < 3:
        return values
    centre = statistics.median(values)
    mad = statistics.median([abs(v - centre) for v in values])
    if mad == 0:
        # No spread to scale by; the score would divide by zero.
        return values
    # 0.6745 is the scale constant of the modified z-score.
    kept = [v for v in values if abs(0.6745 * (v - centre) / mad) <= z]
    return kept if kept else values


def _iqr_filter(values: List[float], k: float = 1.5) -> List[float]:
    """Drop outliers lying outside ``[q1 - k*iqr, q3 + k*iqr]``.

    The quartiles are taken directly from the sorted samples at indices
    ``len // 4`` and ``(len * 3) // 4`` (no interpolation).

    Args:
        values: Samples to filter.
        k: Multiplier applied to the inter-quartile range.

    Returns:
        The samples inside the fences, in their original order. The input
        list itself is returned unchanged when it has fewer than four
        samples or when the filter would reject everything.
    """
    if len(values) < 4:
        return values
    ordered = sorted(values)
    q1 = ordered[len(ordered) // 4]
    q3 = ordered[(len(ordered) * 3) // 4]
    iqr = q3 - q1
    low = q1 - k * iqr
    high = q3 + k * iqr
    kept = [v for v in values if low <= v <= high]
    return kept if kept else values


@dataclass
class FingerprintWindow:
    """Accumulate RSSI samples per (beacon, gateway) and aggregate them.

    ``gateway_headers`` and ``gateway_keys`` are iterated in lockstep, so
    entry *i* of one is expected to describe the same gateway as entry *i*
    of the other.
    """

    beacon_keys: List[str]  # compact
    gateway_headers: List[str]  # as in gateway.csv (often colon-separated)
    gateway_keys: List[str]  # compact (aligned with gateway_headers)

    rssi_min: float = -110.0
    rssi_max: float = -25.0
    outlier_method: str = "none"  # none | mad | iqr
    mad_z: float = 3.5
    min_samples_per_gateway: int = 1
    max_stddev: Optional[float] = None

    # values[beacon_key][gateway_key] -> accepted RSSI samples
    values: Dict[str, Dict[str, List[float]]] = field(default_factory=dict)

    # IQR multiplier used when outlier_method == "iqr". Declared last so
    # that existing positional construction keeps working.
    iqr_k: float = 1.5

    def __post_init__(self) -> None:
        """Build the lookup sets and an empty sample list per pair."""
        self.beacon_set = set(self.beacon_keys)
        self.gw_set = set(self.gateway_keys)
        for b in self.beacon_keys:
            self.values[b] = {gk: [] for gk in self.gateway_keys}

    def add(self, gw_key: str, beacon_key: str, rssi: float) -> bool:
        """Record one RSSI sample.

        Args:
            gw_key: Gateway key; must be one of ``gateway_keys``.
            beacon_key: Beacon key; must be one of ``beacon_keys``.
            rssi: RSSI value, anything ``float()`` can convert.

        Returns:
            True when the sample was stored. False when either key is
            unknown, the value cannot be converted to a float, the value
            is NaN, or it falls outside ``[rssi_min, rssi_max]``.
        """
        if gw_key not in self.gw_set or beacon_key not in self.beacon_set:
            return False
        try:
            r = float(rssi)
        except (TypeError, ValueError, OverflowError):
            return False
        # The chained comparison is False for NaN, so NaN samples are
        # rejected instead of silently poisoning the aggregates.
        if not (self.rssi_min <= r <= self.rssi_max):
            return False
        self.values[beacon_key][gw_key].append(r)
        return True

    def _aggregate_one(self, xs: List[float], method: str) -> float:
        """Filter one gateway's samples and reduce them to a single value.

        Args:
            xs: Samples collected for one (beacon, gateway) pair.
            method: ``"mean"`` for the arithmetic mean; any other value
                selects the median.

        Returns:
            The aggregated RSSI, or NaN when there are no samples, when
            fewer than ``min_samples_per_gateway`` remain after outlier
            filtering, or when the sample standard deviation exceeds
            ``max_stddev``.
        """
        if not xs:
            return math.nan

        vals = xs
        if self.outlier_method == "mad":
            vals = _mad_filter(vals, z=self.mad_z)
        elif self.outlier_method == "iqr":
            vals = _iqr_filter(vals, k=self.iqr_k)

        if len(vals) < max(1, int(self.min_samples_per_gateway)):
            return math.nan

        # The sample standard deviation needs at least two points.
        if self.max_stddev is not None and len(vals) >= 2:
            mean = sum(vals) / len(vals)
            var = sum((v - mean) ** 2 for v in vals) / (len(vals) - 1)
            if math.sqrt(var) > float(self.max_stddev):
                return math.nan

        if method == "mean":
            return sum(vals) / len(vals)
        return float(statistics.median(vals))

    def features_for(
        self, beacon_key: str, aggregate: str = "median"
    ) -> Dict[str, float]:
        """Return the feature vector of one beacon.

        Args:
            beacon_key: Beacon key; must be one of ``beacon_keys``.
            aggregate: Aggregation method passed to ``_aggregate_one``.

        Returns:
            A dict mapping each gateway header to its aggregated RSSI
            (NaN where no usable value exists).

        Raises:
            KeyError: If ``beacon_key`` is not a known beacon.
        """
        per_gateway = self.values[beacon_key]
        return {
            hdr: self._aggregate_one(per_gateway[gk], aggregate)
            for gk, hdr in zip(self.gateway_keys, self.gateway_headers)
        }

    def top_gateways(
        self, beacon_key: str, aggregate: str = "median", top_n: int = 5
    ) -> List[Tuple[int, str, float]]:
        """Return the gateways with the most stored samples for a beacon.

        Args:
            beacon_key: Beacon key; must be one of ``beacon_keys``.
            aggregate: Aggregation method passed to ``_aggregate_one``.
            top_n: Maximum number of rows to return.

        Returns:
            ``(sample_count, gateway_header, aggregated_rssi)`` tuples
            sorted by descending sample count. The count is the number of
            stored samples, before outlier filtering. Gateways without
            samples are omitted.

        Raises:
            KeyError: If ``beacon_key`` is not a known beacon.
        """
        per_gateway = self.values[beacon_key]
        rows: List[Tuple[int, str, float]] = []
        for gk, hdr in zip(self.gateway_keys, self.gateway_headers):
            samples = per_gateway[gk]
            if not samples:
                continue
            rows.append(
                (len(samples), hdr, self._aggregate_one(samples, aggregate))
            )
        rows.sort(key=lambda t: t[0], reverse=True)
        return rows[:top_n]