|
- import os
- import csv
- import io
- from pathlib import Path
- from typing import List, Dict, Any, Optional
- import pandas as pd
- from .normalize import norm_mac
-
- def _coord_token(v: float) -> str:
- """Trasforma coordinata in stringa sicura per nome file (es. -1 -> -1, 120.5 -> 120_5)."""
- try:
- fv = float(v)
- except:
- return str(v)
- if abs(fv - round(fv)) < 1e-9:
- return str(int(round(fv)))
- s = f"{fv:.3f}".rstrip('0').rstrip('.')
- return s.replace('.', '_')
-
- def read_job_csv(job_path: Path, delimiter: str) -> List[Dict[str, Any]]:
- """Legge il Job CSV esteso e normalizza i campi per il Core."""
- text = job_path.read_text(encoding="utf-8", errors="replace")
- if not text.strip():
- return []
-
- # Rilevamento automatico delimitatore se diverso dal config
- first_line = next((ln for ln in text.splitlines() if ln.strip()), "")
- use_delim = delimiter
- if ";" in first_line: use_delim = ";"
- elif "," in first_line: use_delim = ","
-
- f = io.StringIO(text)
- r = csv.reader(f, delimiter=use_delim)
- header = next(r, None)
- if not header:
- return []
-
- # Normalizzazione Header (minuscolo, senza spazi)
- header_norm = [h.strip().lower() for h in header]
- idx = {name: i for i, name in enumerate(header_norm)}
-
- rows = []
- for cols in r:
- if not cols or len(cols) < len(header):
- continue
- try:
- # Estrazione usando i nomi esatti dei tuoi file
- rows.append({
- "mac": cols[idx["mac"]].strip(),
- "x": float(cols[idx["x"]]),
- "y": float(cols[idx["y"]]),
- "z": float(cols[idx["z"]]),
- "beaconname": cols[idx["beaconname"]].strip()
- })
- except Exception as e:
- continue
- return rows
-
- def write_samples_csv(path: Path, rows: List[Dict[str, Any]], gateway_headers: List[str], delimiter: str = ";", rssi_decimals: int = 0):
- """Scrive il campione con l'ordine colonne corretto: mac, x, y, z, ts_start, ts_end, GW..."""
- # Aggiungiamo i timestamp all'header prima dei gateway
- header = ["mac", "x", "y", "z", "ts_start", "ts_end"] + gateway_headers
-
- tmp_path = path.with_suffix(".tmp")
- with open(tmp_path, "w", newline="", encoding="utf-8") as f:
- w = csv.writer(f, delimiter=delimiter)
- w.writerow(header)
- for row in rows:
- line = []
- for col in header:
- val = row.get(col, "nan")
- if col == "mac":
- line.append(val)
- elif isinstance(val, float):
- if pd.isna(val): line.append("nan")
- elif col in ["x", "y", "z"]: line.append(f"{val:.1f}")
- # Timestamp e RSSI
- else: line.append(f"{val:.{rssi_decimals if col not in ['ts_start', 'ts_end'] else 3}f}")
- else:
- line.append(str(val))
- w.writerow(line)
- os.replace(tmp_path, path)
|