Просмотр исходного кода

feat: add switching between ml and filter algorithm, various fixes in context use, script for creating docker images, various bug fixes regarding persistence in db, persisting alerts in db and CRUD operations for alerts

master
Blaz Smehov 3 недель назад
Родитель
Сommit
a08f381800
34 измененных файлов: 630 добавлений и 376 удалений
  1. +16
    -46
      build/docker-compose.dev.yml
  2. +17
    -49
      build/docker-compose.yaml
  3. +2
    -0
      build/env/db.env
  4. +1
    -0
      build/env/kafdrop.env
  5. +1
    -0
      build/env/kafka-init.env
  6. +13
    -0
      build/env/kafka.env
  7. +5
    -0
      build/env/presense-bridge.env
  8. +1
    -0
      build/env/presense-decoder.env
  9. +8
    -0
      build/env/presense-location.env
  10. +15
    -0
      build/env/presense-server.env
  11. +0
    -1
      build/package/Dockerfile.decoder
  12. +1
    -0
      build/package/Dockerfile.server
  13. +27
    -4
      internal/app/location/app.go
  14. +10
    -2
      internal/app/server/events.go
  15. +21
    -17
      internal/app/server/routes.go
  16. +1
    -0
      internal/pkg/apiclient/data.go
  17. +9
    -1
      internal/pkg/common/appcontext/context.go
  18. +41
    -34
      internal/pkg/config/config.go
  19. +50
    -0
      internal/pkg/controller/alerts_controller.go
  20. +33
    -24
      internal/pkg/controller/gateways_controller.go
  21. +41
    -39
      internal/pkg/controller/parser_controller.go
  22. +15
    -27
      internal/pkg/controller/settings_controller.go
  23. +43
    -34
      internal/pkg/controller/trackers_controller.go
  24. +32
    -20
      internal/pkg/controller/trackerzones_controller.go
  25. +6
    -8
      internal/pkg/controller/tracks_controller.go
  26. +32
    -20
      internal/pkg/controller/zone_controller.go
  27. +1
    -1
      internal/pkg/database/database.go
  28. +13
    -7
      internal/pkg/location/inference.go
  29. +8
    -0
      internal/pkg/model/alerts.go
  30. +2
    -3
      internal/pkg/model/trackers.go
  31. +4
    -6
      internal/pkg/model/types.go
  32. +39
    -0
      internal/pkg/service/alert_service.go
  33. +107
    -33
      internal/pkg/service/beacon_service.go
  34. +15
    -0
      scripts/build/build.sh

+ 16
- 46
build/docker-compose.dev.yml Просмотреть файл

@@ -6,9 +6,8 @@ services:
restart: always
ports:
- "127.0.0.1:5432:5432"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
env_file:
- ./env/db.env
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
@@ -21,8 +20,8 @@ services:
restart: "no"
ports:
- "127.0.0.1:9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
env_file:
- ./env/kafdrop.env
depends_on:
- "kafka"
kafka:
@@ -32,26 +31,14 @@ services:
# - "127.0.0.1:2181:2181"
- "127.0.0.1:9092:9092"
- "127.0.0.1:9093:9093"
env_file:
- ./env/kafka.env
healthcheck: # <-- ADD THIS BLOCK
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
interval: 10s
timeout: 5s
retries: 10
start_period: 20s
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092,CONTROLLER://127.0.0.1:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3

kafka-init:
image: apache/kafka:3.9.0
@@ -61,8 +48,8 @@ services:
condition: service_healthy
volumes:
- ./init-scripts/create_topic.sh:/tmp/create_topic.sh
environment:
- TOPIC_NAMES=topic1,topic2,topic3
env_file:
- ./env/kafka-init.env
valkey:
image: valkey/valkey:9.0.0
@@ -76,8 +63,8 @@ services:
dockerfile: build/package/Dockerfile.dev
image: presense-decoder
container_name: presense-decoder
environment:
- KAFKA_URL=kafka:29092
env_file:
- ./env/presense-decoder.env
depends_on:
kafka-init:
condition: service_completed_successfully
@@ -94,21 +81,8 @@ services:
dockerfile: build/package/Dockerfile.dev
image: presense-server
container_name: presense-server
environment:
- VALKEY_URL=valkey:6379
- KAFKA_URL=kafka:29092
- DBHost=db
- DBUser=postgres
- DBPass=postgres
- DBName=postgres
- HTTPClientID=Fastapi
- ClientSecret=wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC
- HTTPUsername=core
- HTTPPassword=C0r3_us3r_Cr3d3nt14ls
- HTTPAudience=Fastapi
- HTTPADDR=0.0.0.0:1902
- CONFIG_PATH=/app/cmd/server/config.json
- API_BASE_URL=https://10.251.0.30:5050
env_file:
- ./env/presense-server.env
ports:
- "127.0.0.1:1902:1902"
depends_on:
@@ -129,12 +103,8 @@ services:
dockerfile: build/package/Dockerfile.dev
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:29092
- MQTT_HOST=192.168.1.101
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
- MQTT_CLIENT_ID=bridge
env_file:
- ./env/presense-bridge.env
depends_on:
kafka-init:
condition: service_completed_successfully
@@ -151,8 +121,8 @@ services:
dockerfile: build/package/Dockerfile.dev
image: presense-location
container_name: presense-location
environment:
- KAFKA_URL=kafka:29092
env_file:
- ./env/presense-location.env
depends_on:
kafka-init:
condition: service_completed_successfully


+ 17
- 49
build/docker-compose.yaml Просмотреть файл

@@ -1,4 +1,3 @@
version: "2"
services:
db:
image: postgres
@@ -6,9 +5,8 @@ services:
restart: always
ports:
- "127.0.0.1:5432:5432"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
env_file:
- ./env/db.env
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
@@ -21,37 +19,24 @@ services:
restart: "no"
ports:
- "127.0.0.1:9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
env_file:
- ./env/kafdrop.env
depends_on:
- "kafka"
kafka:
image: apache/kafka:3.9.0
restart: "no"
ports:
# - "127.0.0.1:2181:2181"
- "127.0.0.1:9092:9092"
- "127.0.0.1:9093:9093"
healthcheck: # <-- ADD THIS BLOCK
env_file:
- ./env/kafka.env
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
interval: 10s
timeout: 5s
retries: 10
start_period: 20s
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092,CONTROLLER://127.0.0.1:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3

kafka-init:
image: apache/kafka:3.9.0
@@ -61,8 +46,8 @@ services:
condition: service_healthy
volumes:
- ./init-scripts/create_topic.sh:/tmp/create_topic.sh
environment:
- TOPIC_NAMES=topic1,topic2,topic3
env_file:
- ./env/kafka-init.env
valkey:
image: valkey/valkey:9.0.0
@@ -76,8 +61,8 @@ services:
dockerfile: build/package/Dockerfile.decoder
image: presense-decoder
container_name: presense-decoder
environment:
- KAFKA_URL=kafka:29092
env_file:
- ./env/presense-decoder.env
depends_on:
kafka-init:
condition: service_completed_successfully
@@ -91,21 +76,8 @@ services:
dockerfile: build/package/Dockerfile.server
image: presense-server
container_name: presense-server
environment:
- KAFKA_URL=kafka:29092
- DBHost=db
- DBUser=postgres
- DBPass=postgres
- DBName=postgres
- HTTPClientID=Fastapi
- ClientSecret=wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC
- HTTPUsername=core
- HTTPPassword=C0r3_us3r_Cr3d3nt14ls
- HTTPAudience=Fastapi
- HTTPADDR=0.0.0.0:1902
- CONFIG_PATH=/app/cmd/server/config.json
- API_BASE_URL=https://10.251.0.30:5050
- API_AUTH_URL=https://10.251.0.30:10002
env_file:
- ./env/presense-server.env
ports:
- "127.0.0.1:1902:1902"
depends_on:
@@ -123,12 +95,8 @@ services:
dockerfile: build/package/Dockerfile.bridge
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:29092
- MQTT_HOST=192.168.1.101
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
- MQTT_CLIENT_ID=bridge
env_file:
- ./env/presense-bridge.env
depends_on:
kafka-init:
condition: service_completed_successfully
@@ -142,8 +110,8 @@ services:
dockerfile: build/package/Dockerfile.location
image: presense-location
container_name: presense-location
environment:
- KAFKA_URL=kafka:29092
env_file:
- ./env/presense-location.env
depends_on:
kafka-init:
condition: service_completed_successfully


+ 2
- 0
build/env/db.env Просмотреть файл

@@ -0,0 +1,2 @@
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres

+ 1
- 0
build/env/kafdrop.env Просмотреть файл

@@ -0,0 +1 @@
KAFKA_BROKERCONNECT=kafka:29092

+ 1
- 0
build/env/kafka-init.env Просмотреть файл

@@ -0,0 +1 @@
TOPIC_NAMES=topic1,topic2,topic3

+ 13
- 0
build/env/kafka.env Просмотреть файл

@@ -0,0 +1,13 @@
KAFKA_NODE_ID=1
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_LISTENERS=INTERNAL://:29092,EXTERNAL://:9092,CONTROLLER://127.0.0.1:9093
KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
KAFKA_NUM_PARTITIONS=3

+ 5
- 0
build/env/presense-bridge.env Просмотреть файл

@@ -0,0 +1,5 @@
KAFKA_URL=kafka:29092
MQTT_HOST=192.168.1.101
MQTT_USERNAME=user
MQTT_PASSWORD=pass
MQTT_CLIENT_ID=bridge

+ 1
- 0
build/env/presense-decoder.env Просмотреть файл

@@ -0,0 +1 @@
KAFKA_URL=kafka:29092

+ 8
- 0
build/env/presense-location.env Просмотреть файл

@@ -0,0 +1,8 @@
KAFKA_URL=kafka:29092
HTTPClientID=Fastapi
ClientSecret=wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC
HTTPUsername=core
HTTPPassword=C0r3_us3r_Cr3d3nt14ls
HTTPAudience=Fastapi
API_AUTH_URL=https://10.251.0.30:10002
ALGORITHM=ai

+ 15
- 0
build/env/presense-server.env Просмотреть файл

@@ -0,0 +1,15 @@
KAFKA_URL=kafka:29092
DBHost=db
DBUser=postgres
DBPass=postgres
DBName=postgres
HTTPClientID=Fastapi
ClientSecret=wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC
HTTPUsername=core
HTTPPassword=C0r3_us3r_Cr3d3nt14ls
HTTPAudience=Fastapi
HTTPADDR=0.0.0.0:1902
CONFIG_PATH=/app/cmd/server/config.json
API_BASE_URL=https://10.251.0.30:5050
API_AUTH_URL=https://10.251.0.30:10002
ALGORITHM=ai

+ 0
- 1
build/package/Dockerfile.decoder Просмотреть файл

@@ -13,6 +13,5 @@ FROM alpine:latest
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /app/decoder .
COPY --from=builder /app/cmd/decoder/config.json ./cmd/decoder/config.json

ENTRYPOINT ["./decoder"]

+ 1
- 0
build/package/Dockerfile.server Просмотреть файл

@@ -13,6 +13,7 @@ FROM alpine:latest
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /app/server .
COPY --from=builder /app/cmd/server/config.json ./cmd/server/config.json

EXPOSE 1902
ENTRYPOINT ["./server"]

+ 27
- 4
internal/app/location/app.go Просмотреть файл

@@ -2,7 +2,7 @@ package location

import (
"context"
"fmt"
"encoding/json"
"log/slog"
"sync"
"time"
@@ -10,9 +10,10 @@ import (
"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/logger"
pkglocation "github.com/AFASystems/presence/internal/pkg/location"
"github.com/AFASystems/presence/internal/pkg/logger"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// LocationApp holds dependencies for the location service.
@@ -58,7 +59,7 @@ func (a *LocationApp) Run(ctx context.Context) {
go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("settings"), a.ChSettings, ctx, &a.wg)

locTicker := time.NewTicker(config.SMALL_TICKER_INTERVAL)
locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
defer locTicker.Stop()

for {
@@ -67,7 +68,7 @@ func (a *LocationApp) Run(ctx context.Context) {
return
case <-locTicker.C:
settings := a.AppState.GetSettings()
slog.Info("location tick", "settings", fmt.Sprintf("%+v", settings))
slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm)
switch settings.CurrentAlgorithm {
case "filter":
pkglocation.GetLikelyLocations(a.AppState, a.KafkaManager.GetWriter("locevents"))
@@ -77,6 +78,28 @@ func (a *LocationApp) Run(ctx context.Context) {
slog.Error("AI inference", "err", err)
continue
}

for _, item := range inferred.Items {
r := model.HTTPLocation{
Method: "AI",
Y: item.Y,
X: item.X,
Z: item.Z,
MAC: item.Mac,
LastSeen: time.Now().Unix(),
}

js, err := json.Marshal(r)
if err != nil {
slog.Error("marshaling location", "err", err, "beacon_id", item.Mac)
continue
}

if err := a.KafkaManager.GetWriter("locevents").WriteMessages(ctx, kafka.Message{Value: js}); err != nil {
slog.Error("sending kafka location message", "err", err, "beacon_id", item.Mac)
}
}

slog.Info("AI algorithm", "count", inferred.Count, "items", len(inferred.Items))
}
case msg := <-a.ChRaw:


+ 10
- 2
internal/app/server/events.go Просмотреть файл

@@ -23,9 +23,17 @@ func RunEventLoop(ctx context.Context, a *ServerApp) {
case <-ctx.Done():
return
case msg := <-a.ChLoc:
service.LocationToBeaconService(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
switch msg.Method {
case "Standard":
service.LocationToBeaconService(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
case "AI":
service.LocationToBeaconServiceAI(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
default:
slog.Error("unknown method", "method", msg.Method)
continue
}

case msg := <-a.ChEvents:
slog.Info("decoder event", "event", msg)
id := msg.ID
if err := a.DB.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
slog.Error("decoder event for untracked beacon", "id", id)


+ 21
- 17
internal/app/server/routes.go Просмотреть файл

@@ -18,41 +18,45 @@ func (a *ServerApp) RegisterRoutes() http.Handler {
r.HandleFunc("/ready", handler.Ready(a.DB)).Methods("GET")

// Gateways
r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(a.DB)).Methods("POST")
r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(a.DB)).Methods("DELETE")
r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(a.DB)).Methods("PUT")
r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(a.DB, a.ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(a.DB, a.ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(a.DB, a.ctx)).Methods("PUT")

// Zones
r.HandleFunc("/reslevis/getZones", controller.ZoneListController(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(a.DB)).Methods("POST")
r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(a.DB)).Methods("DELETE")
r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(a.DB)).Methods("PUT")
r.HandleFunc("/reslevis/getZones", controller.ZoneListController(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(a.DB, a.ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(a.DB, a.ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(a.DB, a.ctx)).Methods("PUT")

// Tracker zones
r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(a.DB)).Methods("POST")
r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(a.DB)).Methods("DELETE")
r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(a.DB)).Methods("PUT")
r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(a.DB, a.ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(a.DB, a.ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(a.DB, a.ctx)).Methods("PUT")

// Trackers
r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(a.DB, a.KafkaManager.GetWriter("apibeacons"), a.ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(a.DB, a.KafkaManager.GetWriter("apibeacons"), a.ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(a.DB)).Methods("PUT")
r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(a.DB, a.ctx)).Methods("PUT")

// Parser configs
r.HandleFunc("/configs/beacons", controller.ParserListController(a.DB)).Methods("GET")
r.HandleFunc("/configs/beacons", controller.ParserListController(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/configs/beacons", controller.ParserAddController(a.DB, a.KafkaManager.GetWriter("parser"), a.ctx)).Methods("POST")
r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(a.DB, a.KafkaManager.GetWriter("parser"), a.ctx)).Methods("PUT")
r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(a.DB, a.KafkaManager.GetWriter("parser"), a.ctx)).Methods("DELETE")

// Settings
r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(a.DB, a.KafkaManager.GetWriter("settings"), a.ctx)).Methods("PATCH")
r.HandleFunc("/reslevis/settings", controller.SettingsListController(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/settings", controller.SettingsListController(a.DB, a.ctx)).Methods("GET")

r.HandleFunc("/reslevis/alerts/{id}", controller.AlertDeleteController(a.DB, a.ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/alerts", controller.AlertsListController(a.DB, a.ctx)).Methods("GET")
r.HandleFunc("/reslevis/alerts/{id}", controller.ListAlertsByTrackerIDController(a.DB, a.ctx)).Methods("GET")

// Tracks
r.HandleFunc("/reslevis/getTracks/{id}", controller.TracksListController(a.DB)).Methods("GET")
r.HandleFunc("/reslevis/getTracks/{id}", controller.TracksListController(a.DB, a.ctx)).Methods("GET")

chain := middleware.Recovery(middleware.Logging(middleware.RequestID(middleware.CORS(nil, nil, nil)(r))))
return chain


+ 1
- 0
internal/pkg/apiclient/data.go Просмотреть файл

@@ -74,6 +74,7 @@ func GetZones(token string, client *http.Client, cfg *config.Config) ([]model.Zo

func InferPosition(token string, client *http.Client, cfg *config.Config) (model.PositionResponse, error) {
url := fmt.Sprintf("%s/ble-ai/infer", cfg.APIBaseURL)
fmt.Printf("url: %s\n", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fmt.Printf("error new request: %+v\n", err)


+ 9
- 1
internal/pkg/common/appcontext/context.go Просмотреть файл

@@ -3,6 +3,7 @@ package appcontext
import (
"fmt"
"log/slog"
"os"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/mitchellh/mapstructure"
@@ -16,6 +17,13 @@ type AppState struct {
beaconsLookup model.BeaconsLookup
}

func getEnv(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}

// NewAppState creates a new application context AppState with default values
func NewAppState() *AppState {
return &AppState{
@@ -24,7 +32,7 @@ func NewAppState() *AppState {
},
settings: model.Settings{
ID: 1,
CurrentAlgorithm: "filter", // possible values filter or AI
CurrentAlgorithm: getEnv("ALGORITHM", "filter"),
LocationConfidence: 4,
LastSeenThreshold: 15,
BeaconMetricSize: 30,


+ 41
- 34
internal/pkg/config/config.go Просмотреть файл

@@ -56,25 +56,25 @@ func getEnvPanic(key string) string {

func Load() *Config {
return &Config{
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"),
WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"),
MQTTHost: getEnv("MQTT_HOST", "192.168.1.101"),
MQTTUser: getEnvPanic("MQTT_USERNAME"),
MQTTPass: getEnvPanic("MQTT_PASSWORD"),
MQTTClientID: getEnvPanic("MQTT_CLIENT_ID"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
DBHost: getEnv("DBHost", "127.0.0.1"),
DBUser: getEnvPanic("DBUser"),
DBPass: getEnvPanic("DBPass"),
DBName: getEnv("DBName", "go_crud_db"),
HTTPClientID: getEnvPanic("HTTPClientID"),
ClientSecret: getEnvPanic("ClientSecret"),
HTTPUsername: getEnvPanic("HTTPUsername"),
HTTPPassword: getEnvPanic("HTTPPassword"),
HTTPAudience: getEnvPanic("HTTPAudience"),
ConfigPath: getEnv("CONFIG_PATH", "/app/cmd/server/config.json"),
APIBaseURL: getEnv("API_BASE_URL", "https://10.251.0.30:5050"),
TLSInsecureSkipVerify: getEnvBool("TLS_INSECURE_SKIP_VERIFY", false),
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"),
WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"),
MQTTHost: getEnv("MQTT_HOST", "192.168.1.101"),
MQTTUser: getEnvPanic("MQTT_USERNAME"),
MQTTPass: getEnvPanic("MQTT_PASSWORD"),
MQTTClientID: getEnvPanic("MQTT_CLIENT_ID"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
DBHost: getEnv("DBHost", "127.0.0.1"),
DBUser: getEnvPanic("DBUser"),
DBPass: getEnvPanic("DBPass"),
DBName: getEnv("DBName", "go_crud_db"),
HTTPClientID: getEnvPanic("HTTPClientID"),
ClientSecret: getEnvPanic("ClientSecret"),
HTTPUsername: getEnvPanic("HTTPUsername"),
HTTPPassword: getEnvPanic("HTTPPassword"),
HTTPAudience: getEnvPanic("HTTPAudience"),
ConfigPath: getEnv("CONFIG_PATH", "/app/cmd/server/config.json"),
APIBaseURL: getEnv("API_BASE_URL", "https://10.251.0.30:5050"),
TLSInsecureSkipVerify: getEnvBool("TLS_INSECURE_SKIP_VERIFY", false),
}
}

@@ -86,21 +86,21 @@ func LoadDecoder() *Config {

func LoadServer() *Config {
return &Config{
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"),
DBHost: getEnv("DBHost", "127.0.0.1"),
DBUser: getEnvPanic("DBUser"),
DBPass: getEnvPanic("DBPass"),
DBName: getEnv("DBName", "go_crud_db"),
HTTPClientID: getEnvPanic("HTTPClientID"),
ClientSecret: getEnvPanic("ClientSecret"),
HTTPUsername: getEnvPanic("HTTPUsername"),
HTTPPassword: getEnvPanic("HTTPPassword"),
HTTPAudience: getEnvPanic("HTTPAudience"),
ConfigPath: getEnv("CONFIG_PATH", "/app/cmd/server/config.json"),
APIBaseURL: getEnv("API_BASE_URL", "https://10.251.0.30:5050"),
APIAuthURL: getEnv("API_AUTH_URL", "https://10.251.0.30:10002"),
TLSInsecureSkipVerify: getEnvBool("TLS_INSECURE_SKIP_VERIFY", false),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"),
DBHost: getEnv("DBHost", "127.0.0.1"),
DBUser: getEnvPanic("DBUser"),
DBPass: getEnvPanic("DBPass"),
DBName: getEnv("DBName", "go_crud_db"),
HTTPClientID: getEnvPanic("HTTPClientID"),
ClientSecret: getEnvPanic("ClientSecret"),
HTTPUsername: getEnvPanic("HTTPUsername"),
HTTPPassword: getEnvPanic("HTTPPassword"),
HTTPAudience: getEnvPanic("HTTPAudience"),
ConfigPath: getEnv("CONFIG_PATH", "/app/cmd/server/config.json"),
APIBaseURL: getEnv("API_BASE_URL", "https://10.251.0.30:5050"),
APIAuthURL: getEnv("API_AUTH_URL", "https://10.251.0.30:10002"),
TLSInsecureSkipVerify: getEnvBool("TLS_INSECURE_SKIP_VERIFY", false),
}
}

@@ -118,6 +118,13 @@ func LoadLocation() *Config {
return &Config{
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
TLSInsecureSkipVerify: getEnvBool("TLS_INSECURE_SKIP_VERIFY", false),
HTTPClientID: getEnvPanic("HTTPClientID"),
ClientSecret: getEnvPanic("ClientSecret"),
HTTPUsername: getEnvPanic("HTTPUsername"),
HTTPPassword: getEnvPanic("HTTPPassword"),
HTTPAudience: getEnvPanic("HTTPAudience"),
APIAuthURL: getEnv("API_AUTH_URL", "https://10.251.0.30:10002"),
APIBaseURL: getEnv("API_BASE_URL", "https://10.251.0.30:5050"),
}
}



+ 50
- 0
internal/pkg/controller/alerts_controller.go Просмотреть файл

@@ -0,0 +1,50 @@
package controller

import (
"context"
"errors"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/service"
"github.com/gorilla/mux"
"gorm.io/gorm"
)

func AlertsListController(db *gorm.DB, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
alerts, err := service.GetAllAlerts(db, ctx)
if err != nil {
response.InternalError(w, "failed to list alerts", err)
return
}
response.JSON(w, http.StatusOK, alerts)
}
}

func ListAlertsByTrackerIDController(db *gorm.DB, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
alert, err := service.GetAlertById(id, db, ctx)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
response.NotFound(w, "alert not found")
return
}
response.InternalError(w, "failed to get alert", err)
return
}
response.JSON(w, http.StatusOK, alert)
}
}

func AlertDeleteController(db *gorm.DB, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
if err := service.DeleteAlertByTrackerID(id, db, ctx); err != nil {
response.InternalError(w, "failed to delete alert", err)
return
}
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

+ 33
- 24
internal/pkg/controller/gateways_controller.go Просмотреть файл

@@ -1,73 +1,82 @@
package controller

import (
"context"
"encoding/json"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"gorm.io/gorm"
)

func GatewayAddController(db *gorm.DB) http.HandlerFunc {
func GatewayAddController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var gateway model.Gateway
if err := json.NewDecoder(r.Body).Decode(&gateway); err != nil {
response.BadRequest(w, "invalid request body")
return
}

if err := decoder.Decode(&gateway); err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Create(&gateway).Error; err != nil {
response.InternalError(w, "failed to create gateway", err)
return
}

db.Create(&gateway)
w.Write([]byte("ok"))
response.JSON(w, http.StatusCreated, map[string]string{"status": "created"})
}
}

func GatewayListController(db *gorm.DB) http.HandlerFunc {
func GatewayListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var gateways []model.Gateway
db.Find(&gateways)
res, err := json.Marshal(gateways)
if err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Find(&gateways).Error; err != nil {
response.InternalError(w, "failed to list gateways", err)
return
}

w.Write(res)
response.JSON(w, http.StatusOK, gateways)
}
}

func GatewayDeleteController(db *gorm.DB) http.HandlerFunc {
func GatewayDeleteController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
if res := db.Delete(&model.Gateway{}, "id = ?", id); res.RowsAffected == 0 {
http.Error(w, "no gateway with such ID found", 400)
res := db.WithContext(context).Delete(&model.Gateway{}, "id = ?", id)
if res.RowsAffected == 0 {
response.NotFound(w, "gateway not found")
return
}
if res.Error != nil {
response.InternalError(w, "failed to delete gateway", res.Error)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

func GatewayUpdateController(db *gorm.DB) http.HandlerFunc {
func GatewayUpdateController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]

if err := db.First(&model.Gateway{}, "id = ?", id).Error; err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).First(&model.Gateway{}, "id = ?", id).Error; err != nil {
response.NotFound(w, "gateway not found")
return
}

decoder := json.NewDecoder(r.Body)
var gateway model.Gateway
if err := json.NewDecoder(r.Body).Decode(&gateway); err != nil {
response.BadRequest(w, "invalid request body")
return
}

if err := decoder.Decode(&gateway); err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Save(&gateway).Error; err != nil {
response.InternalError(w, "failed to update gateway", err)
return
}

db.Save(&gateway)
w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

+ 41
- 39
internal/pkg/controller/parser_controller.go Просмотреть файл

@@ -3,10 +3,10 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/service"
"github.com/gorilla/mux"
@@ -14,53 +14,55 @@ import (
"gorm.io/gorm"
)

func ParserAddController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func ParserAddController(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var config model.Config

if err := decoder.Decode(&config); err != nil {
http.Error(w, err.Error(), 400)
if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
response.BadRequest(w, "invalid request body")
return
}

db.Create(&config)
if err := db.WithContext(context).Create(&config).Error; err != nil {
response.InternalError(w, "failed to create parser config", err)
return
}

kp := model.KafkaParser{
ID: "add",
Config: config,
}

if err := service.SendParserConfig(kp, writer, ctx); err != nil {
http.Error(w, "Unable to send parser config to kafka broker", 400)
msg := fmt.Sprintf("Unable to send parser config to kafka broker %v", err)
slog.Error(msg)
if err := service.SendParserConfig(kp, writer, context); err != nil {
slog.Error("failed to send parser config to Kafka", "err", err)
response.InternalError(w, "failed to publish parser config", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusCreated, map[string]string{"status": "created"})
}
}

func ParserListController(db *gorm.DB) http.HandlerFunc {
func ParserListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var configs []model.Config
db.Find(&configs)
res, err := json.Marshal(configs)
if err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Find(&configs).Error; err != nil {
response.InternalError(w, "failed to list parser configs", err)
return
}

w.Write(res)
response.JSON(w, http.StatusOK, configs)
}
}

func ParserDeleteController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func ParserDeleteController(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
if res := db.Delete(&model.Config{}, "name = ?", id); res.RowsAffected == 0 {
http.Error(w, "no parser config with such name found", 400)
res := db.WithContext(context).Delete(&model.Config{}, "name = ?", id)
if res.RowsAffected == 0 {
response.NotFound(w, "parser config not found")
return
}
if res.Error != nil {
response.InternalError(w, "failed to delete parser config", res.Error)
return
}

@@ -69,31 +71,33 @@ func ParserDeleteController(db *gorm.DB, writer *kafka.Writer, ctx context.Conte
Name: id,
}

if err := service.SendParserConfig(kp, writer, ctx); err != nil {
http.Error(w, "Unable to send parser config to kafka broker", 400)
msg := fmt.Sprintf("Unable to send parser config to kafka broker %v", err)
slog.Error(msg)
if err := service.SendParserConfig(kp, writer, context); err != nil {
slog.Error("failed to send parser config to Kafka", "err", err)
response.InternalError(w, "failed to publish parser config deletion", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

func ParserUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func ParserUpdateController(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]

if err := db.First(&model.Config{}, "name = ?", id).Error; err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).First(&model.Config{}, "name = ?", id).Error; err != nil {
response.NotFound(w, "parser config not found")
return
}

decoder := json.NewDecoder(r.Body)
var config model.Config
if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
response.BadRequest(w, "invalid request body")
return
}

if err := decoder.Decode(&config); err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Save(&config).Error; err != nil {
response.InternalError(w, "failed to update parser config", err)
return
}

@@ -103,14 +107,12 @@ func ParserUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Conte
Config: config,
}

db.Save(&config)
if err := service.SendParserConfig(kp, writer, ctx); err != nil {
http.Error(w, "Unable to send parser config to kafka broker", 400)
msg := fmt.Sprintf("Unable to send parser config to kafka broker %v", err)
slog.Error(msg)
if err := service.SendParserConfig(kp, writer, context); err != nil {
slog.Error("failed to send parser config to Kafka", "err", err)
response.InternalError(w, "failed to publish parser config update", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

+ 15
- 27
internal/pkg/controller/settings_controller.go Просмотреть файл

@@ -3,66 +3,54 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
"gorm.io/gorm"
)

func SettingsListController(db *gorm.DB) http.HandlerFunc {
func SettingsListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var settings []model.Settings
db.Find(&settings)
res, err := json.Marshal(settings)
if err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Find(&settings).Error; err != nil {
response.InternalError(w, "failed to list settings", err)
return
}

w.Write(res)
response.JSON(w, http.StatusOK, settings)
}
}

func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var updates map[string]any
if err := json.NewDecoder(r.Body).Decode(&updates); err != nil {
http.Error(w, "Invalid JSON", 400)
response.BadRequest(w, "invalid request body")
return
}

inMsg := fmt.Sprintf("updates: %+v", updates)
slog.Info(inMsg)
slog.Info("updating settings", "updates", updates)

if err := db.Model(&model.Settings{}).Where("id = ?", 1).Updates(updates).Error; err != nil {
msg := fmt.Sprintf("Error in updating settings: %v", err)
slog.Error(msg)
http.Error(w, err.Error(), 500)
if err := db.WithContext(context).Model(&model.Settings{}).Where("id = ?", 1).Updates(updates).Error; err != nil {
response.InternalError(w, "failed to update settings", err)
return
}

eMsg, err := json.Marshal(updates)
if err != nil {
http.Error(w, "Error in marshaling settings updates", 400)
msg := fmt.Sprintf("Error in marshaling settings updates: %v", err)
slog.Error(msg)
response.InternalError(w, "failed to marshal settings for publish", err)
return
}

msg := kafka.Message{
Value: eMsg,
}

if err := writer.WriteMessages(ctx, msg); err != nil {
kafkaMsg := kafka.Message{Value: eMsg}
if err := writer.WriteMessages(context, kafkaMsg); err != nil {
slog.Error("writing settings to Kafka", "err", err)
http.Error(w, "Failed to publish settings update", 500)
response.InternalError(w, "failed to publish settings update", err)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"Settings updated"}`))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

+ 43
- 34
internal/pkg/controller/trackers_controller.go Просмотреть файл

@@ -7,13 +7,14 @@ import (
"log/slog"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"github.com/segmentio/kafka-go"
"gorm.io/gorm"
)

func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error {
func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, context context.Context) error {
valueStr, err := json.Marshal(&value)
if err != nil {
msg := fmt.Sprintf("error in encoding: %v", err)
@@ -24,7 +25,7 @@ func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.
Value: valueStr,
}

if err := writer.WriteMessages(ctx, msg); err != nil {
if err := writer.WriteMessages(context, msg); err != nil {
msg := fmt.Sprintf("Error in sending kafka message: %v", err)
slog.Error(msg)
return err
@@ -33,14 +34,17 @@ func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.
return nil
}

func TrackerAdd(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func TrackerAdd(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var tracker model.Tracker
if err := json.NewDecoder(r.Body).Decode(&tracker); err != nil {
http.Error(w, err.Error(), 400)
response.BadRequest(w, "invalid request body")
return
}
if err := db.WithContext(context).Create(&tracker).Error; err != nil {
response.InternalError(w, "failed to create tracker", err)
return
}
db.Create(&tracker)

apiUpdate := model.ApiUpdate{
Method: "POST",
@@ -48,58 +52,66 @@ func TrackerAdd(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.Han
MAC: tracker.MAC,
}

if err := SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
msg := "error in sending Kafka POST message"
slog.Error(msg)
http.Error(w, "Error in sending kafka message", 500)
if err := SendKafkaMessage(writer, &apiUpdate, context); err != nil {
slog.Error("error sending Kafka POST message", "err", err)
response.InternalError(w, "failed to publish tracker update", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusCreated, map[string]string{"status": "created"})
}
}

func TrackerList(db *gorm.DB) http.HandlerFunc {
func TrackerList(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var list []model.Tracker
db.Find(&list)
json.NewEncoder(w).Encode(list)
if err := db.WithContext(context).Find(&list).Error; err != nil {
response.InternalError(w, "failed to list trackers", err)
return
}
response.JSON(w, http.StatusOK, list)
}
}

func TrackerUpdate(db *gorm.DB) http.HandlerFunc {
func TrackerUpdate(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var tracker model.Tracker

if err := json.NewDecoder(r.Body).Decode(&tracker); err != nil {
http.Error(w, "Invalid JSON", 400)
response.BadRequest(w, "invalid request body")
return
}

id := tracker.ID

if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).First(&model.Tracker{}, "id = ?", id).Error; err != nil {
response.NotFound(w, "tracker not found")
return
}

if err := db.Save(&tracker).Error; err != nil {
http.Error(w, err.Error(), 500)
if err := db.WithContext(context).Save(&tracker).Error; err != nil {
response.InternalError(w, "failed to update tracker", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

func TrackerDelete(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
func TrackerDelete(db *gorm.DB, writer *kafka.Writer, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var tracker model.Tracker
db.Find(&tracker, "id = ?", id)
if err := db.WithContext(context).First(&tracker, "id = ?", id).Error; err != nil {
response.NotFound(w, "tracker not found")
return
}

if res := db.Delete(&model.Tracker{}, "id = ?", id); res.RowsAffected == 0 {
http.Error(w, "no tracker with such ID found", 400)
res := db.WithContext(context).Delete(&model.Tracker{}, "id = ?", id)
if res.RowsAffected == 0 {
response.NotFound(w, "tracker not found")
return
}
if res.Error != nil {
response.InternalError(w, "failed to delete tracker", res.Error)
return
}

@@ -107,17 +119,14 @@ func TrackerDelete(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.
Method: "DELETE",
MAC: tracker.MAC,
}
slog.Info("sending DELETE tracker message", "id", id)

msg := fmt.Sprintf("Sending DELETE tracker id: %s message", id)
slog.Info(msg)

if err := SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
msg := "error in sending Kafka DELETE message"
slog.Error(msg)
http.Error(w, "Error in sending kafka message", 500)
if err := SendKafkaMessage(writer, &apiUpdate, context); err != nil {
slog.Error("error sending Kafka DELETE message", "err", err)
response.InternalError(w, "failed to publish tracker deletion", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

+ 32
- 20
internal/pkg/controller/trackerzones_controller.go Просмотреть файл

@@ -1,67 +1,79 @@
package controller

import (
"context"
"encoding/json"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"gorm.io/gorm"
)

func TrackerZoneAddController(db *gorm.DB) http.HandlerFunc {
func TrackerZoneAddController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var tz model.TrackerZones
if err := json.NewDecoder(r.Body).Decode(&tz); err != nil {
http.Error(w, err.Error(), 400)
response.BadRequest(w, "invalid request body")
return
}
db.Create(&tz)
w.Write([]byte("ok"))
if err := db.WithContext(context).Create(&tz).Error; err != nil {
response.InternalError(w, "failed to create tracker zone", err)
return
}

response.JSON(w, http.StatusCreated, map[string]string{"status": "created"})
}
}

func TrackerZoneListController(db *gorm.DB) http.HandlerFunc {
func TrackerZoneListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var list []model.TrackerZones
db.Find(&list)
json.NewEncoder(w).Encode(list)
if err := db.WithContext(context).Find(&list).Error; err != nil {
response.InternalError(w, "failed to list tracker zones", err)
return
}
response.JSON(w, http.StatusOK, list)
}
}

func TrackerZoneUpdateController(db *gorm.DB) http.HandlerFunc {
func TrackerZoneUpdateController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var tz model.TrackerZones

if err := json.NewDecoder(r.Body).Decode(&tz); err != nil {
http.Error(w, "Invalid JSON", 400)
response.BadRequest(w, "invalid request body")
return
}

id := tz.ID

if err := db.First(&model.TrackerZones{}, "id = ?", id).Error; err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).First(&model.TrackerZones{}, "id = ?", id).Error; err != nil {
response.NotFound(w, "tracker zone not found")
return
}

if err := db.Save(&tz).Error; err != nil {
http.Error(w, err.Error(), 500)
if err := db.WithContext(context).Save(&tz).Error; err != nil {
response.InternalError(w, "failed to update tracker zone", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

func TrackerZoneDeleteController(db *gorm.DB) http.HandlerFunc {
func TrackerZoneDeleteController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
if res := db.Delete(&model.TrackerZones{}, "id = ?", id); res.RowsAffected == 0 {
http.Error(w, "no tracker zone with such ID found", 400)
res := db.WithContext(context).Delete(&model.TrackerZones{}, "id = ?", id)
if res.RowsAffected == 0 {
response.NotFound(w, "tracker zone not found")
return
}
if res.Error != nil {
response.InternalError(w, "failed to delete tracker zone", res.Error)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

+ 6
- 8
internal/pkg/controller/tracks_controller.go Просмотреть файл

@@ -1,17 +1,18 @@
package controller

import (
"encoding/json"
"context"
"net/http"
"strconv"
"time"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"gorm.io/gorm"
)

func TracksListController(db *gorm.DB) http.HandlerFunc {
func TracksListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var tracks []model.Tracks
@@ -34,13 +35,10 @@ func TracksListController(db *gorm.DB) http.HandlerFunc {
from := parseTime("from", time.Now().AddDate(0, 0, -1))
to := parseTime("to", time.Now())

db.Where("uuid = ? AND timestamp BETWEEN ? AND ?", id, from, to).Order("timestamp DESC").Limit(limit).Find(&tracks)
res, err := json.Marshal(tracks)
if err != nil {
http.Error(w, err.Error(), 400)
if err := db.WithContext(context).Where("uuid = ? AND timestamp BETWEEN ? AND ?", id, from, to).Order("timestamp DESC").Limit(limit).Find(&tracks).Error; err != nil {
response.InternalError(w, "failed to list tracks", err)
return
}

w.Write(res)
response.JSON(w, http.StatusOK, tracks)
}
}

+ 32
- 20
internal/pkg/controller/zone_controller.go Просмотреть файл

@@ -1,67 +1,79 @@
package controller

import (
"context"
"encoding/json"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"gorm.io/gorm"
)

func ZoneAddController(db *gorm.DB) http.HandlerFunc {
func ZoneAddController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var zone model.Zone
if err := json.NewDecoder(r.Body).Decode(&zone); err != nil {
http.Error(w, err.Error(), 400)
response.BadRequest(w, "invalid request body")
return
}
db.Create(&zone)
w.Write([]byte("ok"))
if err := db.WithContext(context).Create(&zone).Error; err != nil {
response.InternalError(w, "failed to create zone", err)
return
}

response.JSON(w, http.StatusCreated, map[string]string{"status": "created"})
}
}

func ZoneListController(db *gorm.DB) http.HandlerFunc {
func ZoneListController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var zones []model.Zone
db.Find(&zones)
json.NewEncoder(w).Encode(zones) // Groups will appear as ["a", "b"] in JSON
if err := db.WithContext(context).Find(&zones).Error; err != nil {
response.InternalError(w, "failed to list zones", err)
return
}
response.JSON(w, http.StatusOK, zones)
}
}

func ZoneUpdateController(db *gorm.DB) http.HandlerFunc {
func ZoneUpdateController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var zone model.Zone

if err := json.NewDecoder(r.Body).Decode(&zone); err != nil {
http.Error(w, err.Error(), 400)
response.BadRequest(w, "invalid request body")
return
}

id := zone.ID

if err := db.First(&model.Zone{}, "id = ?", id); err != nil {
http.Error(w, "zone with this ID does not yet exist", 500)
if err := db.WithContext(context).First(&model.Zone{}, "id = ?", id).Error; err != nil {
response.NotFound(w, "zone not found")
return
}

if err := db.Save(&zone).Error; err != nil {
http.Error(w, err.Error(), 500)
if err := db.WithContext(context).Save(&zone).Error; err != nil {
response.InternalError(w, "failed to update zone", err)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "updated"})
}
}

func ZoneDeleteController(db *gorm.DB) http.HandlerFunc {
func ZoneDeleteController(db *gorm.DB, context context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
if res := db.Delete(&model.Zone{}, "id = ?", id); res.RowsAffected == 0 {
http.Error(w, "no zone with such ID found", 400)
res := db.WithContext(context).Delete(&model.Zone{}, "id = ?", id)
if res.RowsAffected == 0 {
response.NotFound(w, "zone not found")
return
}
if res.Error != nil {
response.InternalError(w, "failed to delete zone", res.Error)
return
}

w.Write([]byte("ok"))
response.JSON(w, http.StatusOK, map[string]string{"status": "deleted"})
}
}

+ 1
- 1
internal/pkg/database/database.go Просмотреть файл

@@ -25,7 +25,7 @@ func Connect(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, model.Settings{}, model.Tracks{}); err != nil {
if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, model.Settings{}, model.Tracks{}, &model.Alert{}); err != nil {
return nil, err
}



+ 13
- 7
internal/pkg/location/inference.go Просмотреть файл

@@ -3,6 +3,7 @@ package location
import (
"context"
"crypto/tls"
"fmt"
"net/http"

"github.com/AFASystems/presence/internal/pkg/apiclient"
@@ -18,24 +19,29 @@ type Inferencer interface {
// DefaultInferencer uses apiclient to get token and call the inference API.
type DefaultInferencer struct {
Client *http.Client
Token string
}

// NewDefaultInferencer creates an inferencer with optional TLS skip verify (e.g. from config.TLSInsecureSkipVerify).
func NewDefaultInferencer(skipTLSVerify bool) *DefaultInferencer {
tr := &http.Transport{}
if skipTLSVerify {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return &DefaultInferencer{
Client: &http.Client{Transport: tr},
Token: "",
}
}

// Infer gets a token and calls the inference API.
func (d *DefaultInferencer) Infer(ctx context.Context, cfg *config.Config) (model.PositionResponse, error) {
token, err := apiclient.GetToken(ctx, cfg, d.Client)
if err != nil {
return model.PositionResponse{}, err
if d.Token == "" {
fmt.Printf("getting token\n")
token, err := apiclient.GetToken(ctx, cfg, d.Client)
if err != nil {
return model.PositionResponse{}, err
}
d.Token = token
}
return apiclient.InferPosition(token, d.Client, cfg)

return apiclient.InferPosition(d.Token, d.Client, cfg)
}

+ 8
- 0
internal/pkg/model/alerts.go Просмотреть файл

@@ -0,0 +1,8 @@
package model

type Alert struct {
ID string `json:"id" gorm:"primaryKey"`
TrackerID string `json:"tracker_id"`
Type string `json:"type"`
Value string `json:"value"`
}

+ 2
- 3
internal/pkg/model/trackers.go Просмотреть файл

@@ -6,14 +6,13 @@ type Tracker struct {
MAC string `json:"mac"`
Status string `json:"status"`
Model string `json:"model"`
IP string `json:"ip"`
Position string `json:"position"`
Notes string `json:"notes"`
X float32 `json:"x"`
Y float32 `json:"y"`
Notes string `json:"notes"`
Floor string `json:"floor"`
Building string `json:"building"`
Location string `json:"location"`
Distance float64 `json:"distance"`
Battery uint32 `json:"battery,string"`
BatteryThreshold uint32 `json:"batteryThreshold"`
Temperature uint16 `json:"temperature,string"`


+ 4
- 6
internal/pkg/model/types.go Просмотреть файл

@@ -44,6 +44,10 @@ type HTTPLocation struct {
Location string `json:"location"`
LastSeen int64 `json:"last_seen"`
RSSI int64 `json:"rssi"`
X float32 `json:"x"`
Y float32 `json:"y"`
Z float32 `json:"z"`
MAC string `json:"mac"`
}

// Beacon holds all relevant information about a tracked beacon device.
@@ -134,9 +138,3 @@ type ApiUpdate struct {
ID string
MAC string
}

type Alert struct {
ID string `json:"id"` // tracker id
Type string `json:"type"` // type of alert
Value string `json:"value"` // possible value
}

+ 39
- 0
internal/pkg/service/alert_service.go Просмотреть файл

@@ -0,0 +1,39 @@
package service

import (
"context"

"github.com/AFASystems/presence/internal/pkg/model"
"gorm.io/gorm"
)

func InsertAlert(alert model.Alert, db *gorm.DB, ctx context.Context) error {
if err := db.WithContext(ctx).Create(&alert).Error; err != nil {
return err
}
return nil
}

func DeleteAlertByTrackerID(trackerID string, db *gorm.DB, ctx context.Context) error {
if err := db.WithContext(ctx).Where("id = ?", trackerID).Delete(&model.Alert{}).Error; err != nil {
return err
}
return nil
}

func GetAllAlerts(db *gorm.DB, ctx context.Context) ([]model.Alert, error) {
var alerts []model.Alert
if err := db.WithContext(ctx).Find(&alerts).Error; err != nil {
return []model.Alert{}, err
}

return alerts, nil
}

func GetAlertById(id string, db *gorm.DB, ctx context.Context) (model.Alert, error) {
var alert model.Alert
if err := db.WithContext(ctx).First(&alert, id).Error; err != nil {
return alert, err
}
return alert, nil
}

+ 107
- 33
internal/pkg/service/beacon_service.go Просмотреть файл

@@ -3,6 +3,7 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"slices"
@@ -10,6 +11,7 @@ import (
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"gorm.io/gorm"
)
@@ -19,45 +21,131 @@ type KafkaWriter interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) {
var tracker model.Tracker
if msg.MAC != "" {
if err := db.Where("mac = ?", msg.MAC).Find(&tracker).Error; err != nil {
return model.Tracker{}, err
}

return tracker, nil
}

if msg.ID != "" {
if err := db.Where("id = ?", msg.ID).Find(&tracker).Error; err != nil {
return model.Tracker{}, err
}

return tracker, nil
}

return model.Tracker{}, errors.New("both ID and MAC are not provided")
}

func findZones(trackerID string, db *gorm.DB) ([]string, error) {
var zones []model.TrackerZones
if err := db.Select("zoneList").Where("tracker = ?", trackerID).Find(&zones).Error; err != nil {
return nil, err
}

var allowedZones []string
for _, z := range zones {
allowedZones = append(allowedZones, z.ZoneList...)
}

return allowedZones, nil
}

func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) {
if msg.ID == "" {
msg := "empty ID"
tracker, err := findTracker(msg, db)
if err != nil {
msg := fmt.Sprintf("Error in finding tracker: %v", err)
slog.Error(msg)
return
}

var zones []model.TrackerZones
if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil {
msg := fmt.Sprintf("Error in selecting zones: %v", err)
allowedZones, err := findZones(tracker.ID, db)
if err != nil {
msg := fmt.Sprintf("Error in finding zones: %v", err)
slog.Error(msg)
return
}

var tracker model.Tracker
if err := db.Where("id = ?", msg.ID).Find(&tracker).Error; err != nil {
msg := fmt.Sprintf("Error in selecting tracker: %v", err)
var gw model.Gateway
mac := formatMac(msg.Location)
if err := db.Select("*").Where("mac = ?", mac).First(&gw).Error; err != nil {
msg := fmt.Sprintf("Gateway not found for MAC: %s", mac)
slog.Error(msg)
return
}

var allowedZones []string
for _, z := range zones {
allowedZones = append(allowedZones, z.ZoneList...)
if err := db.Create(&model.Tracks{UUID: msg.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: msg.ID, Floor: gw.Floor, Building: gw.Building, TrackerMac: tracker.MAC, Signal: msg.RSSI}).Error; err != nil {
msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
slog.Error(msg)
return
}

err = db.Where("id = ?", msg.ID).Updates(model.Tracker{Position: gw.ID, X: gw.X, Y: gw.Y}).Error
if err != nil {
msg := fmt.Sprintf("Error in updating tracker: %v", err)
slog.Error(msg)
return
}

sendAlert(gw.ID, msg.ID, writer, ctx, allowedZones, db)
}

func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) {
tracker, err := findTracker(msg, db)
if err != nil {
msg := fmt.Sprintf("Error in finding tracker: %v", err)
slog.Error(msg)
return
}

allowedZones, err := findZones(tracker.ID, db)
if err != nil {
msg := fmt.Sprintf("Error in finding zones: %v", err)
slog.Error(msg)
return
}

var gw model.Gateway
mac := formatMac(msg.Location)
if err := db.Select("id").Where("mac = ?", mac).First(&gw).Error; err != nil {
msg := fmt.Sprintf("Gateway not found for MAC: %s", mac)
if err := db.Order(fmt.Sprintf("POW(x - %f, 2) + POW(y - %f, 2)", msg.X, msg.Y)).First(&gw).Error; err != nil {
msg := fmt.Sprintf("Error in finding gateway: %v", err)
slog.Error(msg)
return
}

if err := db.Create(&model.Tracks{UUID: tracker.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: tracker.ID, Floor: gw.Floor, Building: gw.Building, TrackerMac: tracker.MAC}).Error; err != nil {
msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
slog.Error(msg)
return
}

err = db.Where("id = ?", tracker.ID).Updates(model.Tracker{Position: gw.ID, X: msg.X, Y: msg.Y}).Error
if err != nil {
msg := fmt.Sprintf("Error in updating tracker: %v", err)
slog.Error(msg)
return
}
}

sendAlert(gw.ID, tracker.ID, writer, ctx, allowedZones, db)
}

if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) {
func sendAlert(gwId, trackerId string, writer KafkaWriter, ctx context.Context, allowedZones []string, db *gorm.DB) {
if len(allowedZones) != 0 && !slices.Contains(allowedZones, gwId) {
alert := model.Alert{
ID: msg.ID,
Type: "Restricted zone",
Value: gw.ID,
ID: uuid.New().String(),
TrackerID: trackerId,
Type: "Restricted zone",
Value: gwId,
}

if err := InsertAlert(alert, db, ctx); err != nil {
msg := fmt.Sprintf("Error in inserting alert: %v", err)
slog.Error(msg)
return
}

eMsg, err := json.Marshal(alert)
@@ -70,22 +158,8 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWr
Value: eMsg,
}
writer.WriteMessages(ctx, msg)
return
}
}

// status, subject, subject name?
if err := db.Create(&model.Tracks{UUID: msg.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: msg.ID, Floor: gw.Floor, Building: gw.Building, TrackerMac: tracker.MAC, Signal: msg.RSSI}).Error; err != nil {
msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
slog.Error(msg)
return
}

if err := db.Updates(&model.Tracker{ID: msg.ID, Location: gw.ID, Distance: msg.Distance, X: gw.X, Y: gw.Y}).Error; err != nil {
msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
slog.Error(msg)
return
}
}

func formatMac(MAC string) string {


+ 15
- 0
scripts/build/build.sh Просмотреть файл

@@ -0,0 +1,15 @@
#!/bin/bash

# Build the server
docker build -t presense:server_v1.0.0 -f ../../build/package/Dockerfile.server ../../

# Build the location
docker build -t presense:location_v1.0.0 -f ../../build/package/Dockerfile.location ../../

# Build the decoder
docker build -t presense:decoder_v1.0.0 -f ../../build/package/Dockerfile.decoder ../../

# Build the bridge
docker build -t presense:bridge_v1.0.0 -f ../../build/package/Dockerfile.bridge ../../

docker image ls

Загрузка…
Отмена
Сохранить