|
- import os
- import json
- import time
- import joblib
- import pandas as pd
- import numpy as np
- from pathlib import Path
- from datetime import datetime
- from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
-
- # Import delle utility esistenti
- from .logger_utils import log_msg as log
- from .csv_config import load_gateway_features_csv
-
def _load_training_samples(samples_dir, campaign, gateways_order, nan_fill):
    """Read all sample CSVs for *campaign* and build training lists.

    Each sample file contributes one row: an RSSI feature vector ordered by
    the gateway CSV (missing gateways fall back to *nan_fill*), a floor
    label ``z`` and a target position ``(x, y)``.

    Returns:
        (X_list, y_z, y_xy) as plain Python lists. Malformed files are
        skipped (best-effort), not fatal.
    """
    X_list, y_z, y_xy = [], [], []
    for fp in samples_dir.glob(f"{campaign}_*.csv"):
        try:
            df = pd.read_csv(fp, sep=";")
            if df.empty:
                continue
            row = df.iloc[0]
            # Feature mapping keyed on gateway.csv order (fixes the 'mac' error).
            X_list.append([float(row.get(gw, nan_fill)) for gw in gateways_order])
            y_z.append(int(round(float(row.get("z")))))
            y_xy.append([float(row.get("x")), float(row.get("y"))])
        except Exception:
            # Was a bare `except:` (also swallowed KeyboardInterrupt/SystemExit).
            # Keep the best-effort skip, but log which file was dropped.
            log(f"[TRAIN-CORE] Campione non valido, ignorato: {fp.name}")
            continue
    return X_list, y_z, y_xy

def process_train_jobs():
    """Monitora ed esegue i job di addestramento salvando backup cronologici.

    Scans ``/data/train/train_jobs`` for ``*.lock`` job descriptors (JSON),
    trains a floor classifier plus one (x, y) regressor per floor, and dumps
    a timestamped model package to ``/data/model``. Each job file is removed
    after processing, whether it succeeded or failed.
    """
    JOBS_DIR = Path("/data/train/train_jobs")
    JOBS_DIR.mkdir(parents=True, exist_ok=True)

    job_files = list(JOBS_DIR.glob("*.lock"))
    if not job_files:
        return

    for job_path in job_files:
        try:
            log(f"[TRAIN-CORE] Rilevato nuovo job: {job_path.name}")

            with open(job_path, "r") as f:
                job = json.load(f)

            campagna = job["campaign"]
            knn_cfg = job["knn"]
            nan_fill = job["nan_fill"]
            gw_csv = job["gateways_csv"]

            # Timestamped filename so each training run is kept as a backup.
            now_str = datetime.now().strftime("%Y%m%d_%H%M%S")
            model_filename = f"model_camp_{campagna}_{now_str}.joblib"
            model_path = Path("/data/model") / model_filename

            # Gateway list defines the feature-column order of X.
            gws = load_gateway_features_csv(gw_csv)
            gateways_order = [g.mac for g in gws]

            X_list, y_z, y_xy = _load_training_samples(
                Path("/data/train/samples"), campagna, gateways_order, nan_fill
            )

            if not X_list:
                log(f"[TRAIN-CORE] ERRORE: Dati non validi per campagna {campagna}")
                job_path.unlink()
                continue

            X, Y_z, Y_xy = np.array(X_list), np.array(y_z), np.array(y_xy)

            # Hoist the shared KNN hyper-parameters out of the two fits.
            k = int(knn_cfg.get('k', 5))
            weights = knn_cfg.get('weights', 'distance')
            metric = knn_cfg.get('metric', 'euclidean')

            log(f"[TRAIN-CORE] Fitting modello per {model_filename}...")
            # Clamp n_neighbors to the sample count, like the per-floor
            # regressors below — otherwise predict() fails when len(X) < k.
            floor_clf = KNeighborsClassifier(
                n_neighbors=min(k, len(X)),
                weights=weights,
                metric=metric
            ).fit(X, Y_z)

            # One (x, y) regressor per distinct floor label.
            models_xy = {}
            for z in np.unique(Y_z):
                idx = np.where(Y_z == z)[0]
                models_xy[int(z)] = KNeighborsRegressor(
                    n_neighbors=min(k, len(idx)),
                    weights=weights,
                    metric=metric
                ).fit(X[idx], Y_xy[idx])

            # Package everything a consumer needs to run inference later.
            model_pkg = {
                "floor_clf": floor_clf,
                "xy_by_floor": models_xy,
                "gateways_order": gateways_order,
                "nan_fill": nan_fill,
                "created_at": datetime.now().isoformat(),
                "campaign": campagna,
                "filename": model_filename
            }

            Path("/data/model").mkdir(parents=True, exist_ok=True)
            joblib.dump(model_pkg, model_path)

            log(f"[TRAIN-CORE] ✅ Addestramento COMPLETATO: {model_filename}")

        except Exception as e:
            # Top-level job boundary: log and move on to the next job file.
            log(f"[TRAIN-CORE] ❌ ERRORE CRITICO: {str(e)}")

        finally:
            # Always consume the job file so a broken job cannot loop forever.
            if job_path.exists():
                job_path.unlink()
-
def run_train_monitor():
    """Core Orchestrator monitoring loop.

    Polls for pending training jobs forever, sleeping between scans.
    """
    poll_interval_s = 5
    while True:
        process_train_jobs()
        time.sleep(poll_interval_s)
|