25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 
 

83 satır
3.0 KiB

  1. import os
  2. import csv
  3. import io
  4. from pathlib import Path
  5. from typing import List, Dict, Any, Optional
  6. import pandas as pd
  7. from .normalize import norm_mac
  8. def _coord_token(v: float) -> str:
  9. """Trasforma coordinata in stringa sicura per nome file (es. -1 -> -1, 120.5 -> 120_5)."""
  10. try:
  11. fv = float(v)
  12. except:
  13. return str(v)
  14. if abs(fv - round(fv)) < 1e-9:
  15. return str(int(round(fv)))
  16. s = f"{fv:.3f}".rstrip('0').rstrip('.')
  17. return s.replace('.', '_')
  18. def read_job_csv(job_path: Path, delimiter: str) -> List[Dict[str, Any]]:
  19. """Legge il Job CSV esteso e normalizza i campi per il Core."""
  20. text = job_path.read_text(encoding="utf-8", errors="replace")
  21. if not text.strip():
  22. return []
  23. # Rilevamento automatico delimitatore se diverso dal config
  24. first_line = next((ln for ln in text.splitlines() if ln.strip()), "")
  25. use_delim = delimiter
  26. if ";" in first_line: use_delim = ";"
  27. elif "," in first_line: use_delim = ","
  28. f = io.StringIO(text)
  29. r = csv.reader(f, delimiter=use_delim)
  30. header = next(r, None)
  31. if not header:
  32. return []
  33. # Normalizzazione Header (minuscolo, senza spazi)
  34. header_norm = [h.strip().lower() for h in header]
  35. idx = {name: i for i, name in enumerate(header_norm)}
  36. rows = []
  37. for cols in r:
  38. if not cols or len(cols) < len(header):
  39. continue
  40. try:
  41. # Estrazione usando i nomi esatti dei tuoi file
  42. rows.append({
  43. "mac": cols[idx["mac"]].strip(),
  44. "x": float(cols[idx["x"]]),
  45. "y": float(cols[idx["y"]]),
  46. "z": float(cols[idx["z"]]),
  47. "beaconname": cols[idx["beaconname"]].strip()
  48. })
  49. except Exception as e:
  50. continue
  51. return rows
  52. def write_samples_csv(path: Path, rows: List[Dict[str, Any]], gateway_headers: List[str], delimiter: str = ";", rssi_decimals: int = 0):
  53. """Scrive il campione con l'ordine colonne corretto: mac, x, y, z, ts_start, ts_end, GW..."""
  54. # Aggiungiamo i timestamp all'header prima dei gateway
  55. header = ["mac", "x", "y", "z", "ts_start", "ts_end"] + gateway_headers
  56. tmp_path = path.with_suffix(".tmp")
  57. with open(tmp_path, "w", newline="", encoding="utf-8") as f:
  58. w = csv.writer(f, delimiter=delimiter)
  59. w.writerow(header)
  60. for row in rows:
  61. line = []
  62. for col in header:
  63. val = row.get(col, "nan")
  64. if col == "mac":
  65. line.append(val)
  66. elif isinstance(val, float):
  67. if pd.isna(val): line.append("nan")
  68. elif col in ["x", "y", "z"]: line.append(f"{val:.1f}")
  69. # Timestamp e RSSI
  70. else: line.append(f"{val:.{rssi_decimals if col not in ['ts_start', 'ts_end'] else 3}f}")
  71. else:
  72. line.append(str(val))
  73. w.writerow(line)
  74. os.replace(tmp_path, path)