您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

592 行
18 KiB

  1. import asyncio
  2. import json
  3. import logging
  4. from fastapi import APIRouter, Depends, HTTPException, Query, Request
  5. import httpx
  6. import config_env
  7. from typing import List, Optional
  8. from urllib.parse import urlencode
  9. from schemas.reslevis import (
  10. BuildingItem,
  11. FloorItem,
  12. ZoneItem,
  13. GatewayItem,
  14. TrackerItem,
  15. OperatorItem,
  16. SubjectItem,
  17. AlarmItem,
  18. TrackItem,
  19. TrackHistoryItem,
  20. TrackerZoneItem,
  21. SettingItem,
  22. )
  23. from logica_reslevis.gateway import GatewayJsonRepository
  24. from logica_reslevis.building import BuildingJsonRepository
  25. from logica_reslevis.floor import FloorJsonRepository
  26. from logica_reslevis.zone import ZoneJsonRepository
  27. from logica_reslevis.tracker import TrackerJsonRepository
  28. from logica_reslevis.operator import OperatorJsonRepository
  29. from logica_reslevis.setting import SettingJsonRepository
  30. from logica_reslevis.subject import SubjectJsonRepository
  31. from logica_reslevis.alarm import AlarmJsonRepository
  32. from logica_reslevis.track import TrackJsonRepository
  33. from logica_reslevis.tracker_zone import TrackerZoneJsonRepository
  34. from security import get_current_user
  35. #CORE SYNC
  36. CORE_BASE_URL = config_env.CORE_API_URL.rstrip("/")
  37. TRACKS_CORE_BASE_URL = "http://localhost:1902"
  38. CORE_TIMEOUT = 2.0 # secondi
  39. logger = logging.getLogger("reslevis")
  40. async def sync_core_get(request: Request) -> None:
  41. if request.method != "GET":
  42. return
  43. sync = CORE_GET_SYNC.get(request.url.path)
  44. if sync is None:
  45. return
  46. repo, normalizer = sync
  47. try:
  48. async with httpx.AsyncClient(timeout=CORE_TIMEOUT) as client:
  49. resp = await client.get(
  50. f"{CORE_BASE_URL}{request.url.path}",
  51. params=request.query_params,
  52. )
  53. if 200 <= resp.status_code < 300:
  54. data = resp.json()
  55. if isinstance(data, list):
  56. if normalizer:
  57. data = [normalizer(r) for r in data if isinstance(r, dict)]
  58. repo._write_all(data) # aggiorna i file locali
  59. except (httpx.RequestError, ValueError):
  60. # CORE giù o risposta non valida -> uso il file locale
  61. pass
  62. router = APIRouter(dependencies=[Depends(sync_core_get)])
  63. gateway_repo = GatewayJsonRepository()
  64. building_repo = BuildingJsonRepository()
  65. floor_repo = FloorJsonRepository()
  66. zone_repo = ZoneJsonRepository()
  67. tracker_repo = TrackerJsonRepository()
  68. operator_repo = OperatorJsonRepository()
  69. subject_repo = SubjectJsonRepository()
  70. alarm_repo = AlarmJsonRepository()
  71. track_repo = TrackJsonRepository()
  72. tracker_zone_repo = TrackerZoneJsonRepository()
  73. setting_repo = SettingJsonRepository()
  74. def _none_if_empty(v):
  75. return None if v in ("", None, 0, "0") else v
  76. def _str_or_none(v):
  77. if v in ("", None):
  78. return None
  79. if isinstance(v, (int, float, bool)):
  80. return str(v)
  81. return v
  82. def _uuid_list(values):
  83. if values in ("", None):
  84. return []
  85. if isinstance(values, str):
  86. values = [v for v in values.split(",") if v]
  87. if isinstance(values, (list, tuple, set)):
  88. cleaned = []
  89. for v in values:
  90. if isinstance(v, dict):
  91. v = v.get("id") or v.get("uuid")
  92. if v in ("", None, 0, "0"):
  93. continue
  94. cleaned.append(v)
  95. return cleaned
  96. return [values] if values not in ("", None, 0, "0") else []
  97. def _normalize_gateway(row: dict) -> dict:
  98. row = dict(row)
  99. row["floor"] = _none_if_empty(row.get("floor"))
  100. row["building"] = _none_if_empty(row.get("building"))
  101. return row
  102. def _normalize_tracker(row: dict) -> dict:
  103. row = dict(row)
  104. row["floor"] = _none_if_empty(row.get("floor"))
  105. row["building"] = _none_if_empty(row.get("building"))
  106. row["battery"] = _str_or_none(row.get("battery"))
  107. row["temperature"] = _str_or_none(row.get("temperature"))
  108. row["acceleration"] = _str_or_none(row.get("acceleration"))
  109. row["heartRate"] = _str_or_none(row.get("heartRate"))
  110. return row
  111. def _normalize_track(row: dict) -> dict:
  112. row = dict(row)
  113. row["ID"] = row.get("ID")
  114. row["gateway"] = _none_if_empty(row.get("gateway"))
  115. row["tracker"] = _none_if_empty(row.get("tracker"))
  116. row["subject"] = _none_if_empty(row.get("subject"))
  117. row["floor"] = _none_if_empty(row.get("floor"))
  118. row["building"] = _none_if_empty(row.get("building"))
  119. row["timestamp"] = _str_or_none(row.get("timestamp"))
  120. row["type"] = _str_or_none(row.get("type"))
  121. row["status"] = _str_or_none(row.get("status"))
  122. row["gatewayMac"] = _str_or_none(row.get("gatewayMac"))
  123. row["trackerMac"] = _str_or_none(row.get("trackerMac"))
  124. row["subjectName"] = _str_or_none(row.get("subjectName"))
  125. row["x"] = None if row.get("x") in ("", None) else row.get("x")
  126. row["y"] = None if row.get("y") in ("", None) else row.get("y")
  127. row["z"] = None if row.get("z") in ("", None) else row.get("z")
  128. # signal resta float o None
  129. row["signal"] = None if row.get("signal") in ("", None) else row.get("signal")
  130. return row
  131. def _normalize_zone(row: dict) -> dict:
  132. row = dict(row)
  133. row["floor"] = _none_if_empty(row.get("floor"))
  134. row["building"] = _none_if_empty(row.get("building"))
  135. row["groups"] = _uuid_list(row.get("groups"))
  136. return row
  137. CORE_GET_SYNC = {
  138. "/reslevis/getGateways": (gateway_repo, _normalize_gateway),
  139. "/reslevis/getZones": (zone_repo, _normalize_zone),
  140. "/reslevis/getTrackers": (tracker_repo, _normalize_tracker),
  141. }
  142. async def _core_get_json(
  143. client: httpx.AsyncClient,
  144. path: str,
  145. params: Optional[dict] = None,
  146. ):
  147. resp = await client.get(f"{CORE_BASE_URL}{path}", params=params)
  148. if resp.status_code >= 400:
  149. detail = resp.text.strip() or "CORE request failed"
  150. raise HTTPException(status_code=resp.status_code, detail=detail)
  151. try:
  152. return resp.json()
  153. except ValueError as exc:
  154. raise HTTPException(status_code=502, detail="Invalid CORE response") from exc
  155. async def _core_get_json_via_curl(
  156. path: str,
  157. params: Optional[dict] = None,
  158. ):
  159. query_string = urlencode(params or {})
  160. url = f"{TRACKS_CORE_BASE_URL}{path}"
  161. if query_string:
  162. url = f"{url}?{query_string}"
  163. cmd = ["curl", "-sS", "-X", "GET"]
  164. if TRACKS_CORE_BASE_URL.startswith("https://"):
  165. cmd.append("-k")
  166. cmd.append(url)
  167. logger.warning("tracks curl url=%s", url)
  168. process = await asyncio.create_subprocess_exec(
  169. *cmd,
  170. stdout=asyncio.subprocess.PIPE,
  171. stderr=asyncio.subprocess.PIPE,
  172. )
  173. stdout, stderr = await process.communicate()
  174. if process.returncode != 0:
  175. detail = (stderr or stdout).decode("utf-8", errors="replace").strip() or "CORE curl request failed"
  176. logger.error("tracks curl failed url=%s detail=%s", url, detail)
  177. raise HTTPException(status_code=502, detail=detail)
  178. preview = stdout.decode("utf-8", errors="replace")
  179. logger.warning("tracks curl response url=%s preview=%s", url, preview[:500])
  180. try:
  181. return json.loads(preview)
  182. except ValueError as exc:
  183. logger.error("tracks curl invalid json url=%s", url)
  184. raise HTTPException(status_code=502, detail="Invalid CORE response") from exc
  185. async def _fetch_tracks_for_tracker(
  186. tracker_id: str,
  187. params: Optional[dict] = None,
  188. ) -> List[dict]:
  189. logger.warning("tracks fetch tracker_id=%s params=%s", tracker_id, params)
  190. payload = await _core_get_json_via_curl(f"/reslevis/getTracks/{tracker_id}", params=params)
  191. if not isinstance(payload, list):
  192. raise HTTPException(status_code=502, detail="Unexpected CORE response type")
  193. rows = [_normalize_track(row) for row in payload if isinstance(row, dict)]
  194. logger.warning("tracks fetch tracker_id=%s rows=%s", tracker_id, len(rows))
  195. return rows
  196. def _sort_tracks_desc(rows: List[dict]) -> List[dict]:
  197. return sorted(rows, key=lambda row: row.get("timestamp") or "", reverse=True)
  198. async def _fetch_all_tracks(params: dict) -> List[dict]:
  199. trackers_payload = await _core_get_json_via_curl("/reslevis/getTrackers")
  200. if not isinstance(trackers_payload, list):
  201. raise HTTPException(status_code=502, detail="Unexpected CORE tracker response type")
  202. tracker_ids = []
  203. for item in trackers_payload:
  204. if not isinstance(item, dict):
  205. continue
  206. tracker_id = item.get("id")
  207. if tracker_id:
  208. tracker_ids.append(str(tracker_id))
  209. batches = await asyncio.gather(
  210. *[_fetch_tracks_for_tracker(tracker_id, params) for tracker_id in tracker_ids]
  211. )
  212. merged = [row for batch in batches for row in batch]
  213. merged = _sort_tracks_desc(merged)
  214. limit = params.get("limit")
  215. if isinstance(limit, int):
  216. return merged[:limit]
  217. return merged
  218. async def _fetch_first_tracker_tracks(params: dict) -> List[dict]:
  219. trackers_payload = await _core_get_json_via_curl("/reslevis/getTrackers")
  220. if not isinstance(trackers_payload, list):
  221. raise HTTPException(status_code=502, detail="Unexpected CORE tracker response type")
  222. for item in trackers_payload:
  223. if not isinstance(item, dict):
  224. continue
  225. tracker_id = item.get("id")
  226. if tracker_id:
  227. return await _fetch_tracks_for_tracker(str(tracker_id), params)
  228. return []
  229. @router.get(
  230. "/getGateways",
  231. response_model=List[GatewayItem],
  232. tags=["Reslevis"],
  233. dependencies=[Depends(get_current_user)],
  234. )
  235. def getGateways():
  236. return gateway_repo.list()
  237. @router.post("/postGateway", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  238. def postGateway(item: GatewayItem):
  239. gateway_repo.add(item)
  240. return {"message": "OK"}
  241. @router.put("/updateGateway", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  242. def updateGateway(item: GatewayItem):
  243. gateway_repo.update(item)
  244. return {"message": "OK"}
  245. @router.delete("/removeGateway/{gateway_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  246. def removeGateway(gateway_id: str):
  247. gateway_repo.remove(gateway_id)
  248. return {"message": "OK"}
  249. @router.get(
  250. "/getBuildings",
  251. response_model=List[BuildingItem],
  252. tags=["Reslevis"],
  253. dependencies=[Depends(get_current_user)],
  254. )
  255. def getBuildings():
  256. return building_repo.list()
  257. @router.post("/postBuilding", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  258. def postBuilding(item: BuildingItem):
  259. building_repo.add(item)
  260. return {"message": "OK"}
  261. @router.put("/updateBuilding", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  262. def updateBuilding(item: BuildingItem):
  263. building_repo.update(item)
  264. return {"message": "OK"}
  265. @router.delete("/removeBuilding/{building_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  266. def removeBuilding(building_id: str):
  267. building_repo.remove(building_id)
  268. return {"message": "OK"}
  269. @router.get(
  270. "/getFloors",
  271. response_model=List[FloorItem],
  272. tags=["Reslevis"],
  273. dependencies=[Depends(get_current_user)],
  274. )
  275. def getFloors():
  276. return floor_repo.list()
  277. @router.post("/postFloor", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  278. def postFloor(item: FloorItem):
  279. floor_repo.add(item)
  280. return {"message": "OK"}
  281. @router.put("/updateFloor", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  282. def updateFloor(item: FloorItem):
  283. floor_repo.update(item)
  284. return {"message": "OK"}
  285. @router.delete("/removeFloor/{floor_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  286. def removeFloor(floor_id: str):
  287. floor_repo.remove(floor_id)
  288. return {"message": "OK"}
  289. @router.get(
  290. "/getZones",
  291. response_model=List[ZoneItem],
  292. tags=["Reslevis"],
  293. dependencies=[Depends(get_current_user)],
  294. )
  295. def getZones():
  296. return zone_repo.list()
  297. @router.post("/postZone", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  298. def postZone(item: ZoneItem):
  299. zone_repo.add(item)
  300. return {"message": "OK"}
  301. @router.put("/updateZone", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  302. def updateZone(item: ZoneItem):
  303. zone_repo.update(item)
  304. return {"message": "OK"}
  305. @router.delete("/removeZone/{zone_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  306. def removeZone(zone_id: str):
  307. zone_repo.remove(zone_id)
  308. return {"message": "OK"}
  309. @router.get(
  310. "/getTrackers",
  311. response_model=List[TrackerItem],
  312. tags=["Reslevis"],
  313. dependencies=[Depends(get_current_user)],
  314. )
  315. def getTrackers():
  316. return tracker_repo.list()
  317. @router.post("/postTracker", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  318. def postTracker(item: TrackerItem):
  319. tracker_repo.add(item)
  320. return {"message": "OK"}
  321. @router.put("/updateTracker", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  322. def updateTracker(item: TrackerItem):
  323. tracker_repo.update(item)
  324. return {"message": "OK"}
  325. @router.delete("/removeTracker/{tracker_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  326. def removeTracker(tracker_id: str):
  327. tracker_repo.remove(tracker_id)
  328. return {"message": "OK"}
  329. @router.get(
  330. "/getTrackerZones",
  331. response_model=List[TrackerZoneItem],
  332. tags=["Reslevis"],
  333. dependencies=[Depends(get_current_user)],
  334. )
  335. def getTrackerZones():
  336. return tracker_zone_repo.list()
  337. @router.post("/postTrackerZone", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  338. def postTrackerZone(item: TrackerZoneItem):
  339. tracker_zone_repo.add(item)
  340. return {"message": "OK"}
  341. @router.put("/updateTrackerZone", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  342. def updateTrackerZone(item: TrackerZoneItem):
  343. tracker_zone_repo.update(item)
  344. return {"message": "OK"}
  345. @router.delete("/removeTrackerZone/{tracker_zone_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  346. def removeTrackerZone(tracker_zone_id: str):
  347. tracker_zone_repo.remove(tracker_zone_id)
  348. return {"message": "OK"}
  349. @router.get(
  350. "/getTracks",
  351. response_model=List[TrackHistoryItem],
  352. tags=["Reslevis"],
  353. dependencies=[Depends(get_current_user)],
  354. )
  355. async def getTracks(
  356. request: Request,
  357. tracker_id: Optional[str] = Query(None),
  358. limit: Optional[int] = Query(None, ge=1),
  359. from_: Optional[str] = Query(None, alias="from"),
  360. to: Optional[str] = Query(None),
  361. ):
  362. params = {}
  363. if limit is not None:
  364. params["limit"] = limit
  365. if from_:
  366. params["from"] = from_
  367. if to:
  368. params["to"] = to
  369. selected_tracker_id = tracker_id or request.query_params.get("id")
  370. if selected_tracker_id:
  371. return await _fetch_tracks_for_tracker(selected_tracker_id, params)
  372. return await _fetch_first_tracker_tracks(params)
  373. @router.get(
  374. "/getTracks/{tracker_id}",
  375. response_model=List[TrackHistoryItem],
  376. tags=["Reslevis"],
  377. dependencies=[Depends(get_current_user)],
  378. )
  379. async def getTrack(
  380. tracker_id: str,
  381. limit: Optional[int] = Query(None, ge=1),
  382. from_: Optional[str] = Query(None, alias="from"),
  383. to: Optional[str] = Query(None),
  384. ):
  385. params = {}
  386. if limit is not None:
  387. params["limit"] = limit
  388. if from_:
  389. params["from"] = from_
  390. if to:
  391. params["to"] = to
  392. return await _fetch_tracks_for_tracker(tracker_id, params)
  393. @router.post("/postTrack", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  394. def postTrack(item: TrackItem):
  395. track_repo.add(item)
  396. return {"message": "OK"}
  397. @router.put("/updateTrack", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  398. def updateTrack(item: TrackItem):
  399. track_repo.update(item)
  400. return {"message": "OK"}
  401. @router.delete("/removeTrack/{track_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  402. def removeTrack(track_id: str):
  403. track_repo.remove(track_id)
  404. return {"message": "OK"}
  405. @router.get(
  406. "/getOperators",
  407. response_model=List[OperatorItem],
  408. tags=["Reslevis"],
  409. dependencies=[Depends(get_current_user)],
  410. )
  411. def getOperators():
  412. return operator_repo.list()
  413. @router.post("/postOperator", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  414. def postOperator(item: OperatorItem):
  415. operator_repo.add(item)
  416. return {"message": "OK"}
  417. @router.put("/updateOperator", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  418. def updateOperator(item: OperatorItem):
  419. operator_repo.update(item)
  420. return {"message": "OK"}
  421. @router.delete("/removeOperator/{operator_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  422. def removeOperator(operator_id: str):
  423. operator_repo.remove(operator_id)
  424. return {"message": "OK"}
  425. @router.get(
  426. "/getSubjects",
  427. response_model=List[SubjectItem],
  428. tags=["Reslevis"],
  429. dependencies=[Depends(get_current_user)],
  430. )
  431. def getSubjects():
  432. return subject_repo.list()
  433. @router.post("/postSubject", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  434. def postSubject(item: SubjectItem):
  435. subject_repo.add(item)
  436. return {"message": "OK"}
  437. @router.put("/updateSubject", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  438. def updateSubject(item: SubjectItem):
  439. subject_repo.update(item)
  440. return {"message": "OK"}
  441. @router.delete("/removeSubject/{subject_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  442. def removeSubject(subject_id: str):
  443. subject_repo.remove(subject_id)
  444. return {"message": "OK"}
  445. @router.get(
  446. "/getSettings",
  447. response_model=List[SettingItem],
  448. tags=["Reslevis"],
  449. dependencies=[Depends(get_current_user)],
  450. )
  451. def getSettings():
  452. return setting_repo.list()
  453. @router.post("/postSetting", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  454. def postSetting(item: SettingItem):
  455. setting_repo.add(item)
  456. return {"message": "OK"}
  457. @router.put("/updateSetting", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  458. def updateSetting(item: SettingItem):
  459. setting_repo.update(item)
  460. return {"message": "OK"}
  461. @router.delete("/removeSetting/{setting_id}", tags=["Reslevis"], dependencies=[Depends(get_current_user)])
  462. def removeSetting(setting_id: str):
  463. setting_repo.remove(setting_id)
  464. return {"message": "OK"}