|
|
|
@@ -1,7 +1,11 @@ |
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request |
|
|
|
import asyncio |
|
|
|
import json |
|
|
|
import logging |
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request |
|
|
|
import httpx |
|
|
|
import config_env |
|
|
|
from typing import List |
|
|
|
from typing import List, Optional |
|
|
|
from urllib.parse import urlencode |
|
|
|
|
|
|
|
from schemas.reslevis import ( |
|
|
|
BuildingItem, |
|
|
|
@@ -13,6 +17,7 @@ from schemas.reslevis import ( |
|
|
|
SubjectItem, |
|
|
|
AlarmItem, |
|
|
|
TrackItem, |
|
|
|
TrackHistoryItem, |
|
|
|
TrackerZoneItem, |
|
|
|
SettingItem, |
|
|
|
) |
|
|
|
@@ -33,7 +38,9 @@ from security import get_current_user |
|
|
|
|
|
|
|
#CORE SYNC |
|
|
|
CORE_BASE_URL = config_env.CORE_API_URL.rstrip("/") |
|
|
|
TRACKS_CORE_BASE_URL = "http://localhost:1902" |
|
|
|
CORE_TIMEOUT = 2.0 # secondi |
|
|
|
logger = logging.getLogger("reslevis") |
|
|
|
|
|
|
|
async def sync_core_get(request: Request) -> None: |
|
|
|
if request.method != "GET": |
|
|
|
@@ -120,6 +127,7 @@ def _normalize_tracker(row: dict) -> dict: |
|
|
|
|
|
|
|
def _normalize_track(row: dict) -> dict: |
|
|
|
row = dict(row) |
|
|
|
row["ID"] = row.get("ID") |
|
|
|
row["gateway"] = _none_if_empty(row.get("gateway")) |
|
|
|
row["tracker"] = _none_if_empty(row.get("tracker")) |
|
|
|
row["subject"] = _none_if_empty(row.get("subject")) |
|
|
|
@@ -131,6 +139,9 @@ def _normalize_track(row: dict) -> dict: |
|
|
|
row["gatewayMac"] = _str_or_none(row.get("gatewayMac")) |
|
|
|
row["trackerMac"] = _str_or_none(row.get("trackerMac")) |
|
|
|
row["subjectName"] = _str_or_none(row.get("subjectName")) |
|
|
|
row["x"] = None if row.get("x") in ("", None) else row.get("x") |
|
|
|
row["y"] = None if row.get("y") in ("", None) else row.get("y") |
|
|
|
row["z"] = None if row.get("z") in ("", None) else row.get("z") |
|
|
|
# signal resta float o None |
|
|
|
row["signal"] = None if row.get("signal") in ("", None) else row.get("signal") |
|
|
|
return row |
|
|
|
@@ -146,10 +157,120 @@ CORE_GET_SYNC = { |
|
|
|
"/reslevis/getGateways": (gateway_repo, _normalize_gateway), |
|
|
|
"/reslevis/getZones": (zone_repo, _normalize_zone), |
|
|
|
"/reslevis/getTrackers": (tracker_repo, _normalize_tracker), |
|
|
|
"/reslevis/getTracks": (track_repo, _normalize_track), |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
async def _core_get_json( |
|
|
|
client: httpx.AsyncClient, |
|
|
|
path: str, |
|
|
|
params: Optional[dict] = None, |
|
|
|
): |
|
|
|
resp = await client.get(f"{CORE_BASE_URL}{path}", params=params) |
|
|
|
if resp.status_code >= 400: |
|
|
|
detail = resp.text.strip() or "CORE request failed" |
|
|
|
raise HTTPException(status_code=resp.status_code, detail=detail) |
|
|
|
try: |
|
|
|
return resp.json() |
|
|
|
except ValueError as exc: |
|
|
|
raise HTTPException(status_code=502, detail="Invalid CORE response") from exc |
|
|
|
|
|
|
|
|
|
|
|
async def _core_get_json_via_curl( |
|
|
|
path: str, |
|
|
|
params: Optional[dict] = None, |
|
|
|
): |
|
|
|
query_string = urlencode(params or {}) |
|
|
|
url = f"{TRACKS_CORE_BASE_URL}{path}" |
|
|
|
if query_string: |
|
|
|
url = f"{url}?{query_string}" |
|
|
|
|
|
|
|
cmd = ["curl", "-sS", "-X", "GET"] |
|
|
|
if TRACKS_CORE_BASE_URL.startswith("https://"): |
|
|
|
cmd.append("-k") |
|
|
|
cmd.append(url) |
|
|
|
|
|
|
|
logger.warning("tracks curl url=%s", url) |
|
|
|
|
|
|
|
process = await asyncio.create_subprocess_exec( |
|
|
|
*cmd, |
|
|
|
stdout=asyncio.subprocess.PIPE, |
|
|
|
stderr=asyncio.subprocess.PIPE, |
|
|
|
) |
|
|
|
stdout, stderr = await process.communicate() |
|
|
|
|
|
|
|
if process.returncode != 0: |
|
|
|
detail = (stderr or stdout).decode("utf-8", errors="replace").strip() or "CORE curl request failed" |
|
|
|
logger.error("tracks curl failed url=%s detail=%s", url, detail) |
|
|
|
raise HTTPException(status_code=502, detail=detail) |
|
|
|
|
|
|
|
preview = stdout.decode("utf-8", errors="replace") |
|
|
|
logger.warning("tracks curl response url=%s preview=%s", url, preview[:500]) |
|
|
|
|
|
|
|
try: |
|
|
|
return json.loads(preview) |
|
|
|
except ValueError as exc: |
|
|
|
logger.error("tracks curl invalid json url=%s", url) |
|
|
|
raise HTTPException(status_code=502, detail="Invalid CORE response") from exc |
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_tracks_for_tracker( |
|
|
|
tracker_id: str, |
|
|
|
params: Optional[dict] = None, |
|
|
|
) -> List[dict]: |
|
|
|
logger.warning("tracks fetch tracker_id=%s params=%s", tracker_id, params) |
|
|
|
payload = await _core_get_json_via_curl(f"/reslevis/getTracks/{tracker_id}", params=params) |
|
|
|
if not isinstance(payload, list): |
|
|
|
raise HTTPException(status_code=502, detail="Unexpected CORE response type") |
|
|
|
rows = [_normalize_track(row) for row in payload if isinstance(row, dict)] |
|
|
|
logger.warning("tracks fetch tracker_id=%s rows=%s", tracker_id, len(rows)) |
|
|
|
return rows |
|
|
|
|
|
|
|
|
|
|
|
def _sort_tracks_desc(rows: List[dict]) -> List[dict]: |
|
|
|
return sorted(rows, key=lambda row: row.get("timestamp") or "", reverse=True) |
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_all_tracks(params: dict) -> List[dict]: |
|
|
|
trackers_payload = await _core_get_json_via_curl("/reslevis/getTrackers") |
|
|
|
if not isinstance(trackers_payload, list): |
|
|
|
raise HTTPException(status_code=502, detail="Unexpected CORE tracker response type") |
|
|
|
|
|
|
|
tracker_ids = [] |
|
|
|
for item in trackers_payload: |
|
|
|
if not isinstance(item, dict): |
|
|
|
continue |
|
|
|
tracker_id = item.get("id") |
|
|
|
if tracker_id: |
|
|
|
tracker_ids.append(str(tracker_id)) |
|
|
|
|
|
|
|
batches = await asyncio.gather( |
|
|
|
*[_fetch_tracks_for_tracker(tracker_id, params) for tracker_id in tracker_ids] |
|
|
|
) |
|
|
|
|
|
|
|
merged = [row for batch in batches for row in batch] |
|
|
|
merged = _sort_tracks_desc(merged) |
|
|
|
|
|
|
|
limit = params.get("limit") |
|
|
|
if isinstance(limit, int): |
|
|
|
return merged[:limit] |
|
|
|
return merged |
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_first_tracker_tracks(params: dict) -> List[dict]: |
|
|
|
trackers_payload = await _core_get_json_via_curl("/reslevis/getTrackers") |
|
|
|
if not isinstance(trackers_payload, list): |
|
|
|
raise HTTPException(status_code=502, detail="Unexpected CORE tracker response type") |
|
|
|
|
|
|
|
for item in trackers_payload: |
|
|
|
if not isinstance(item, dict): |
|
|
|
continue |
|
|
|
tracker_id = item.get("id") |
|
|
|
if tracker_id: |
|
|
|
return await _fetch_tracks_for_tracker(str(tracker_id), params) |
|
|
|
|
|
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
@router.get( |
|
|
|
"/getGateways", |
|
|
|
response_model=List[GatewayItem], |
|
|
|
@@ -319,12 +440,53 @@ def removeTrackerZone(tracker_zone_id: str): |
|
|
|
|
|
|
|
@router.get( |
|
|
|
"/getTracks", |
|
|
|
response_model=List[TrackItem], |
|
|
|
response_model=List[TrackHistoryItem], |
|
|
|
tags=["Reslevis"], |
|
|
|
dependencies=[Depends(get_current_user)], |
|
|
|
) |
|
|
|
async def getTracks( |
|
|
|
request: Request, |
|
|
|
tracker_id: Optional[str] = Query(None), |
|
|
|
limit: Optional[int] = Query(None, ge=1), |
|
|
|
from_: Optional[str] = Query(None, alias="from"), |
|
|
|
to: Optional[str] = Query(None), |
|
|
|
): |
|
|
|
params = {} |
|
|
|
if limit is not None: |
|
|
|
params["limit"] = limit |
|
|
|
if from_: |
|
|
|
params["from"] = from_ |
|
|
|
if to: |
|
|
|
params["to"] = to |
|
|
|
|
|
|
|
selected_tracker_id = tracker_id or request.query_params.get("id") |
|
|
|
|
|
|
|
if selected_tracker_id: |
|
|
|
return await _fetch_tracks_for_tracker(selected_tracker_id, params) |
|
|
|
return await _fetch_first_tracker_tracks(params) |
|
|
|
|
|
|
|
|
|
|
|
@router.get( |
|
|
|
"/getTracks/{tracker_id}", |
|
|
|
response_model=List[TrackHistoryItem], |
|
|
|
tags=["Reslevis"], |
|
|
|
dependencies=[Depends(get_current_user)], |
|
|
|
) |
|
|
|
def getTracks(): |
|
|
|
return track_repo.list() |
|
|
|
async def getTrack( |
|
|
|
tracker_id: str, |
|
|
|
limit: Optional[int] = Query(None, ge=1), |
|
|
|
from_: Optional[str] = Query(None, alias="from"), |
|
|
|
to: Optional[str] = Query(None), |
|
|
|
): |
|
|
|
params = {} |
|
|
|
if limit is not None: |
|
|
|
params["limit"] = limit |
|
|
|
if from_: |
|
|
|
params["from"] = from_ |
|
|
|
if to: |
|
|
|
params["to"] = to |
|
|
|
|
|
|
|
return await _fetch_tracks_for_tracker(tracker_id, params) |
|
|
|
|
|
|
|
|
|
|
|
@router.post("/postTrack", tags=["Reslevis"], dependencies=[Depends(get_current_user)]) |
|
|
|
|