|
- """fingerprint.py
-
- Raccolta RSSI durante una finestra temporale e aggregazione in feature-vector.
-
- Scelte chiave:
- - Matching interno su MAC in formato **compact** (12 hex senza ':').
- - Header CSV dei gateway mantenuto nel formato originale (spesso con ':').
-
- Filtri/robustezza (per-gateway):
- - range rssi_min/rssi_max
- - outlier_method: none | mad | iqr
- - min_samples_per_gateway
- - max_stddev (opzionale)
- - aggregate: mean | median
- """
-
- from __future__ import annotations
-
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple
-
- import math
- import statistics
-
-
- def _mad_filter(values: List[float], z: float = 3.5) -> List[float]:
- if len(values) < 3:
- return values
- med = statistics.median(values)
- dev = [abs(v - med) for v in values]
- mad = statistics.median(dev)
- if mad == 0:
- return values
- kept: List[float] = []
- for v in values:
- mz = 0.6745 * (v - med) / mad # modified z-score
- if abs(mz) <= z:
- kept.append(v)
- return kept if kept else values
-
-
- def _iqr_filter(values: List[float], k: float = 1.5) -> List[float]:
- if len(values) < 4:
- return values
- vs = sorted(values)
- q1 = vs[len(vs) // 4]
- q3 = vs[(len(vs) * 3) // 4]
- iqr = q3 - q1
- low = q1 - k * iqr
- high = q3 + k * iqr
- kept = [v for v in values if low <= v <= high]
- return kept if kept else values
-
-
@dataclass
class FingerprintWindow:
    """Collect RSSI samples per (beacon, gateway) pair and aggregate them.

    Samples are stored in ``values[beacon_key][gateway_key]``. Lookups use the
    compact keys; aggregated results are reported under the gateway headers.

    Note: ``__post_init__`` (re)initialises the sample list of every
    configured beacon, so samples passed in through ``values`` for those
    beacons are discarded.
    """

    beacon_keys: List[str]  # compact
    gateway_headers: List[str]  # as in gateway.csv (often colon-separated)
    gateway_keys: List[str]  # compact (aligned with gateway_headers)

    rssi_min: float = -110.0
    rssi_max: float = -25.0
    outlier_method: str = "none"  # none | mad | iqr
    mad_z: float = 3.5
    min_samples_per_gateway: int = 1
    max_stddev: Optional[float] = None

    values: Dict[str, Dict[str, List[float]]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Build the membership sets and one empty sample list per pair."""
        self.beacon_set = set(self.beacon_keys)
        self.gw_set = set(self.gateway_keys)
        for b in self.beacon_keys:
            self.values[b] = {gk: [] for gk in self.gateway_keys}

    def add(self, gw_key: str, beacon_key: str, rssi: float) -> bool:
        """Record one RSSI reading.

        Args:
            gw_key: Compact key of the gateway that heard the beacon.
            beacon_key: Compact key of the beacon.
            rssi: Reading; anything ``float()`` accepts.

        Returns:
            True when the sample was stored. False when the gateway or beacon
            is unknown, the value is not numeric, is NaN, or falls outside
            ``[rssi_min, rssi_max]``.
        """
        if gw_key not in self.gw_set or beacon_key not in self.beacon_set:
            return False
        try:
            r = float(rssi)
        except (TypeError, ValueError, OverflowError):
            return False
        # Positive range test on purpose: every comparison with NaN is False,
        # so "r < min or r > max" would let NaN through and poison the
        # mean/median computed later.
        if not (self.rssi_min <= r <= self.rssi_max):
            return False
        self.values[beacon_key][gw_key].append(r)
        return True

    def _aggregate_one(self, xs: List[float], method: str) -> float:
        """Reduce the samples of one gateway to a single value.

        Applies the configured outlier filter, then the minimum-sample and
        maximum-standard-deviation checks.

        Args:
            xs: Raw samples for one (beacon, gateway) pair.
            method: ``"mean"`` for the arithmetic mean; any other value
                yields the median.

        Returns:
            The aggregated value, or NaN when there are no samples or a
            robustness check rejects them.
        """
        if not xs:
            return math.nan

        vals = xs
        if self.outlier_method == "mad":
            vals = _mad_filter(vals, z=self.mad_z)
        elif self.outlier_method == "iqr":
            vals = _iqr_filter(vals, k=1.5)

        # At least one sample is always required, whatever the setting.
        if len(vals) < max(1, int(self.min_samples_per_gateway)):
            return math.nan

        # Sample standard deviation needs two points; a single sample passes.
        if self.max_stddev is not None and len(vals) >= 2:
            if statistics.stdev(vals) > float(self.max_stddev):
                return math.nan

        if method == "mean":
            return sum(vals) / len(vals)
        return float(statistics.median(vals))

    def features_for(self, beacon_key: str, aggregate: str = "median") -> Dict[str, float]:
        """Return the feature vector of a beacon.

        Args:
            beacon_key: Compact key of the beacon.
            aggregate: Aggregation method passed to ``_aggregate_one``.

        Returns:
            A dict mapping each gateway header to its aggregated value
            (NaN where no usable samples exist).

        Raises:
            KeyError: If ``beacon_key`` is not one of ``beacon_keys``.
        """
        per_gateway = self.values[beacon_key]
        return {
            hdr: self._aggregate_one(per_gateway[gk], aggregate)
            for gk, hdr in zip(self.gateway_keys, self.gateway_headers)
        }

    def top_gateways(self, beacon_key: str, aggregate: str = "median", top_n: int = 5) -> List[Tuple[int, str, float]]:
        """Return the gateways with the most raw samples for a beacon.

        Args:
            beacon_key: Compact key of the beacon.
            aggregate: Aggregation method passed to ``_aggregate_one``.
            top_n: Maximum number of rows to return.

        Returns:
            ``(sample_count, gateway_header, aggregated_value)`` tuples sorted
            by sample count, highest first. Gateways without samples are
            omitted; ties keep gateway order. The count is taken before
            outlier filtering.

        Raises:
            KeyError: If ``beacon_key`` is not one of ``beacon_keys``.
        """
        per_gateway = self.values[beacon_key]
        rows: List[Tuple[int, str, float]] = []
        for gk, hdr in zip(self.gateway_keys, self.gateway_headers):
            samples = per_gateway[gk]
            if not samples:
                continue
            rows.append((len(samples), hdr, self._aggregate_one(samples, aggregate)))
        rows.sort(key=lambda t: t[0], reverse=True)
        return rows[:top_n]
|