import os import csv import io from pathlib import Path from typing import List, Dict, Any, Optional import pandas as pd from .normalize import norm_mac def _coord_token(v: float) -> str: """Trasforma coordinata in stringa sicura per nome file (es. -1 -> -1, 120.5 -> 120_5).""" try: fv = float(v) except: return str(v) if abs(fv - round(fv)) < 1e-9: return str(int(round(fv))) s = f"{fv:.3f}".rstrip('0').rstrip('.') return s.replace('.', '_') def read_job_csv(job_path: Path, delimiter: str) -> List[Dict[str, Any]]: """Legge il Job CSV esteso e normalizza i campi per il Core.""" text = job_path.read_text(encoding="utf-8", errors="replace") if not text.strip(): return [] # Rilevamento automatico delimitatore se diverso dal config first_line = next((ln for ln in text.splitlines() if ln.strip()), "") use_delim = delimiter if ";" in first_line: use_delim = ";" elif "," in first_line: use_delim = "," f = io.StringIO(text) r = csv.reader(f, delimiter=use_delim) header = next(r, None) if not header: return [] # Normalizzazione Header (minuscolo, senza spazi) header_norm = [h.strip().lower() for h in header] idx = {name: i for i, name in enumerate(header_norm)} rows = [] for cols in r: if not cols or len(cols) < len(header): continue try: # Estrazione usando i nomi esatti dei tuoi file rows.append({ "mac": cols[idx["mac"]].strip(), "x": float(cols[idx["x"]]), "y": float(cols[idx["y"]]), "z": float(cols[idx["z"]]), "beaconname": cols[idx["beaconname"]].strip() }) except Exception as e: continue return rows def write_samples_csv(path: Path, rows: List[Dict[str, Any]], gateway_headers: List[str], delimiter: str = ";", rssi_decimals: int = 0): """Scrive il campione con l'ordine colonne corretto: mac, x, y, z, ts_start, ts_end, GW...""" # Aggiungiamo i timestamp all'header prima dei gateway header = ["mac", "x", "y", "z", "ts_start", "ts_end"] + gateway_headers tmp_path = path.with_suffix(".tmp") with open(tmp_path, "w", newline="", encoding="utf-8") as f: w = csv.writer(f, delimiter=delimiter) w.writerow(header) for row in rows: line = [] for col in header: val = row.get(col, "nan") if col == "mac": line.append(val) elif isinstance(val, float): if pd.isna(val): line.append("nan") elif col in ["x", "y", "z"]: line.append(f"{val:.1f}") # Timestamp e RSSI else: line.append(f"{val:.{rssi_decimals if col not in ['ts_start', 'ts_end'] else 3}f}") else: line.append(str(val)) w.writerow(line) os.replace(tmp_path, path)