|
- import csv
- import logging
- from typing import Any, Dict, List, Optional
-
- import httpx
-
- log = logging.getLogger(__name__)
-
-
- def _norm_mac(v: Any) -> str:
- if v is None:
- return ""
- return "".join(ch for ch in str(v).strip().lower() if ch.isalnum())
-
-
- def _none_if_empty(v: Any) -> Any:
- return None if v in ("", None, 0, "0") else v
-
-
- def _str_or_none(v: Any) -> Optional[str]:
- if v in ("", None):
- return None
- if isinstance(v, (int, float, bool)):
- return str(v)
- return v
-
-
- def _normalize_tracker(row: dict) -> dict:
- row = dict(row)
- row["floor"] = _none_if_empty(row.get("floor"))
- row["building"] = _none_if_empty(row.get("building"))
- row["battery"] = _str_or_none(row.get("battery"))
- row["temperature"] = _str_or_none(row.get("temperature"))
- row["acceleration"] = _str_or_none(row.get("acceleration"))
- row["heartRate"] = _str_or_none(row.get("heartRate"))
- return row
-
-
- async def _fetch_algorithm(core_base_url: str, timeout: float) -> Optional[str]:
- try:
- async with httpx.AsyncClient(timeout=timeout) as client:
- resp = await client.get(f"{core_base_url}/reslevis/settings")
- if 200 <= resp.status_code < 300:
- payload = resp.json()
- if isinstance(payload, list) and payload:
- value = payload[0].get("current_algorithm")
- if value is not None:
- return str(value).lower()
- except (httpx.RequestError, ValueError):
- pass
- return None
-
-
- async def _filter_mode_trackers(
- tracker_repo, core_base_url: str, timeout: float
- ) -> List[Dict[str, Any]]:
- try:
- async with httpx.AsyncClient(timeout=timeout) as client:
- resp = await client.get(f"{core_base_url}/reslevis/getTrackers")
- if 200 <= resp.status_code < 300:
- data = resp.json()
- if isinstance(data, list):
- normalized = [_normalize_tracker(r) for r in data if isinstance(r, dict)]
- tracker_repo._write_all(normalized)
- return normalized
- except (httpx.RequestError, ValueError):
- pass
- return tracker_repo.list()
-
-
- def _read_infer_positions(infer_csv_path: str) -> Dict[str, Dict[str, Optional[float]]]:
- positions: Dict[str, Dict[str, Optional[float]]] = {}
- try:
- with open(infer_csv_path, newline="") as f:
- reader = csv.DictReader(f, delimiter=";")
- for row in reader:
- mac = _norm_mac(row.get("mac"))
- if not mac:
- continue
- try:
- positions[mac] = {
- "x": int(row["x"]) if row.get("x") not in (None, "") else None,
- "y": int(row["y"]) if row.get("y") not in (None, "") else None,
- }
- except (KeyError, ValueError):
- continue
- except OSError:
- log.warning("BLE-AI infer CSV not found: %s", infer_csv_path)
- return positions
-
-
- def _ai_mode_trackers(
- tracker_repo, infer_csv_path: str
- ) -> List[Dict[str, Any]]:
- trackers = tracker_repo.list()
- positions = _read_infer_positions(infer_csv_path)
- if not positions:
- return trackers
- result = []
- for tracker in trackers:
- t = dict(tracker)
- mac = _norm_mac(t.get("mac"))
- if mac and mac in positions:
- t["x"] = positions[mac]["x"]
- t["y"] = positions[mac]["y"]
- result.append(t)
- return result
-
-
- async def get_mode_aware_trackers(
- tracker_repo,
- core_base_url: str,
- infer_csv_path: str,
- timeout: float,
- ) -> List[Dict[str, Any]]:
- algorithm = await _fetch_algorithm(core_base_url, timeout)
-
- if algorithm == "filter":
- return await _filter_mode_trackers(tracker_repo, core_base_url, timeout)
-
- if algorithm == "ai":
- return _ai_mode_trackers(tracker_repo, infer_csv_path)
-
- return tracker_repo.list()
|