4 Commit

8 ha cambiato i file con 195 aggiunte e 38 eliminazioni
  1. +56
    -0
      build/docker-compose.yaml
  2. +6
    -1
      build/init-scripts/create_topic.sh
  3. +23
    -5
      cmd/decoder/main.go
  4. +29
    -11
      cmd/location/main.go
  5. +71
    -18
      cmd/server/main.go
  6. +7
    -1
      internal/pkg/controller/beacons_controller.go
  7. +0
    -0
      internal/pkg/model/type_methods.go
  8. +3
    -2
      scripts/testAPI.sh

+ 56
- 0
build/docker-compose.yaml Vedi File

@@ -54,4 +54,60 @@ services:
ports: ports:
- "127.0.0.1:6379:6379" - "127.0.0.1:6379:6379"


presense-decoder:
build:
context: ../
dockerfile: build/package/Dockerfile.decoder
image: presense-decoder
container_name: presense-decoder
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always
presense-server:
build:
context: ../
dockerfile: build/package/Dockerfile.server
image: presense-server
container_name: presense-server
environment:
- VALKEY_URL=valkey:6379
- KAFKA_URL=kafka:29092
ports:
- "127.0.0.1:1902:1902"
depends_on:
- kafka-init
- valkey
restart: always

presense-bridge:
build:
context: ../
dockerfile: build/package/Dockerfile.bridge
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:29092
- MQTT_HOST=192.168.1.101:1883
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
depends_on:
- kafka-init
restart: always

presense-location:
build:
context: ../
dockerfile: build/package/Dockerfile.location
image: presense-location
container_name: presense-location
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always




+ 6
- 1
build/init-scripts/create_topic.sh Vedi File

@@ -15,7 +15,12 @@
--create --if-not-exists --topic alertbeacons \ --create --if-not-exists --topic alertbeacons \
--partitions 1 --replication-factor 1 --partitions 1 --replication-factor 1


# create topic alertBeacons
# create topic locevents
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic locevents \ --create --if-not-exists --topic locevents \
--partitions 1 --replication-factor 1

# create topic settings
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic settings \
--partitions 1 --replication-factor 1 --partitions 1 --replication-factor 1

+ 23
- 5
cmd/decoder/main.go Vedi File

@@ -5,6 +5,10 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io"
"log"
"log/slog"
"os"
"os/signal" "os/signal"
"strings" "strings"
"sync" "sync"
@@ -25,6 +29,16 @@ func main() {
appState := appcontext.NewAppState() appState := appcontext.NewAppState()
cfg := config.Load() cfg := config.Load()


// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// define context // define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop() defer stop()
@@ -34,7 +48,7 @@ func main() {


alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")


fmt.Println("Decoder initialized, subscribed to Kafka topics")
slog.Info("Decoder initialized, subscribed to Kafka topics")


chRaw := make(chan model.BeaconAdvertisement, 2000) chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 200) chApi := make(chan model.ApiUpdate, 200)
@@ -55,18 +69,21 @@ eventloop:
case "POST": case "POST":
id := msg.Beacon.ID id := msg.Beacon.ID
appState.AddBeaconToLookup(id) appState.AddBeaconToLookup(id)
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
case "DELETE": case "DELETE":
id := msg.Beacon.ID id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id) appState.RemoveBeaconFromLookup(id)
fmt.Println("Beacon removed from lookup: ", id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
} }
} }
} }


fmt.Println("broken out of the main event loop")
slog.Info("broken out of the main event loop")
wg.Wait() wg.Wait()


fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders() appState.CleanKafkaReaders()
appState.CleanKafkaWriters() appState.CleanKafkaWriters()
} }
@@ -80,7 +97,8 @@ func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppStat


err := decodeBeacon(adv, appState, writer) err := decodeBeacon(adv, appState, writer)
if err != nil { if err != nil {
fmt.Println("error in decoding")
eMsg := fmt.Sprintf("Error in decoding: %v", err)
fmt.Println(eMsg)
return return
} }
} }


+ 29
- 11
cmd/location/main.go Vedi File

@@ -4,6 +4,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log"
"log/slog"
"os"
"os/signal" "os/signal"
"sync" "sync"
"syscall" "syscall"
@@ -24,6 +28,16 @@ func main() {
appState := appcontext.NewAppState() appState := appcontext.NewAppState()
cfg := config.Load() cfg := config.Load()


// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// Define context // Define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop() defer stop()
@@ -34,7 +48,7 @@ func main() {


writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")


fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")
slog.Info("Locations algorithm initialized, subscribed to Kafka topics")


locTicker := time.NewTicker(1 * time.Second) locTicker := time.NewTicker(1 * time.Second)
defer locTicker.Stop() defer locTicker.Stop()
@@ -61,22 +75,24 @@ eventLoop:
switch msg.Method { switch msg.Method {
case "POST": case "POST":
id := msg.Beacon.ID id := msg.Beacon.ID
fmt.Println("Beacon added to lookup: ", id)
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
appState.AddBeaconToLookup(id) appState.AddBeaconToLookup(id)
case "DELETE": case "DELETE":
id := msg.Beacon.ID id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id) appState.RemoveBeaconFromLookup(id)
fmt.Println("Beacon removed from lookup: ", id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
} }
case msg := <-chSettings: case msg := <-chSettings:
appState.UpdateSettings(msg) appState.UpdateSettings(msg)
} }
} }


fmt.Println("broken out of the main event loop")
slog.Info("broken out of the main event loop")
wg.Wait() wg.Wait()


fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders() appState.CleanKafkaReaders()
appState.CleanKafkaWriters() appState.CleanKafkaWriters()
} }
@@ -98,7 +114,7 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
mSize := len(beacon.BeaconMetrics) mSize := len(beacon.BeaconMetrics)


if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold {
fmt.Println("Beacon is too old")
slog.Warn("beacon is too old")
continue continue
} }


@@ -133,7 +149,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
beacon.LocationConfidence = 0 beacon.LocationConfidence = 0


// Why do I need this if I am sending entire structure anyways? who knows // Why do I need this if I am sending entire structure anyways? who knows
fmt.Println("this is called")
js, err := json.Marshal(model.LocationChange{ js, err := json.Marshal(model.LocationChange{
Method: "LocationChange", Method: "LocationChange",
BeaconRef: beacon, BeaconRef: beacon,
@@ -144,7 +159,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
}) })


if err != nil { if err != nil {
fmt.Println("This error happens: ", err)
eMsg := fmt.Sprintf("Error in marshaling: %v", err)
slog.Error(eMsg)
beacon.PreviousConfidentLocation = bestLocName beacon.PreviousConfidentLocation = bestLocName
beacon.PreviousLocation = bestLocName beacon.PreviousLocation = bestLocName
appState.UpdateBeacon(beacon.ID, beacon) appState.UpdateBeacon(beacon.ID, beacon)
@@ -166,7 +182,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {


js, err := json.Marshal(r) js, err := json.Marshal(r)
if err != nil { if err != nil {
fmt.Println("Error in marshaling location: ", err)
eMsg := fmt.Sprintf("Error in marshaling location: %v", err)
slog.Error(eMsg)
continue continue
} }


@@ -176,7 +193,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {


err = writer.WriteMessages(context.Background(), msg) err = writer.WriteMessages(context.Background(), msg)
if err != nil { if err != nil {
fmt.Println("Error in sending Kafka message: ", err)
eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err)
slog.Error(eMsg)
} }
} }
} }
@@ -194,7 +212,7 @@ func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppS
settings := appState.GetSettingsValue() settings := appState.GetSettingsValue()


if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
fmt.Println("Settings returns")
slog.Info("Settings returns")
return return
} }




+ 71
- 18
cmd/server/main.go Vedi File

@@ -4,9 +4,13 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"log/slog"
"net/http" "net/http"
"os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@@ -27,12 +31,24 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1024, WriteBufferSize: 1024,
} }


var _ io.Writer = (*os.File)(nil)

var wg sync.WaitGroup var wg sync.WaitGroup


func main() { func main() {
cfg := config.Load() cfg := config.Load()
appState := appcontext.NewAppState() appState := appcontext.NewAppState()


// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// define context // define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop() defer stop()
@@ -43,12 +59,14 @@ func main() {


writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
slog.Info("Kafka writers topics: apibeacons, settings initialized")


locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
slog.Info("Kafka readers topics: locevents, alertbeacons initialized")


client := appState.AddValkeyClient(cfg.ValkeyURL) client := appState.AddValkeyClient(cfg.ValkeyURL)
// Need Lua script to pull all of the beacons in one go on init
slog.Info("Valkey DB client created")


chLoc := make(chan model.HTTPLocation, 200) chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500) chEvents := make(chan model.BeaconEvent, 500)
@@ -59,7 +77,6 @@ func main() {


r := mux.NewRouter() r := mux.NewRouter()


// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
@@ -69,9 +86,23 @@ func main() {
r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")


r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx))
wsHandler := http.HandlerFunc(serveWs(appState, ctx))
restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
wsHandler.ServeHTTP(w, r)
return
}

restApiHandler.ServeHTTP(w, r)
})


http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
server := http.Server{
Addr: cfg.HTTPAddr,
Handler: mainHandler,
}

go server.ListenAndServe()


eventLoop: eventLoop:
for { for {
@@ -80,23 +111,35 @@ eventLoop:
break eventLoop break eventLoop
case msg := <-chLoc: case msg := <-chLoc:
if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
fmt.Printf("Error in writing location change to beacon: %v\n", err)
eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
slog.Error(eMsg)
} }
case msg := <-chEvents: case msg := <-chEvents:
if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
fmt.Printf("Error in writing event change to beacon: %v\n", err)
eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
slog.Error(eMsg)
} }
} }
} }


fmt.Println("broken out of the main event loop")
if err := server.Shutdown(context.Background()); err != nil {
eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
slog.Error(eMsg)
}

slog.Info("API SERVER: \n")
slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
wg.Wait() wg.Wait()


fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
appState.CleanKafkaReaders() appState.CleanKafkaReaders()
appState.CleanKafkaWriters() appState.CleanKafkaWriters()
fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")

slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
appState.CleanValkeyClient() appState.CleanValkeyClient()

slog.Info("API server shutting down")
logFile.Close()
} }


func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
@@ -104,13 +147,14 @@ func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFun
ws, err := upgrader.Upgrade(w, r, nil) ws, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok { if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
slog.Error(eMsg)
} }
return return
} }
wg.Add(1)
wg.Add(2)
go writer(ws, appstate, ctx) go writer(ws, appstate, ctx)
reader(ws)
go reader(ws, ctx)
} }
} }


@@ -126,7 +170,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Println("WebSocket writer received shutdown signal.")
slog.Info("WebSocket writer received shutdown signal.")
ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return return
@@ -150,15 +194,24 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte
} }
} }


func reader(ws *websocket.Conn) {
defer ws.Close()
func reader(ws *websocket.Conn, ctx context.Context) {
defer func() {
ws.Close()
wg.Done()
}()
ws.SetReadLimit(512) ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
for { for {
_, _, err := ws.ReadMessage()
if err != nil {
break
select {
case <-ctx.Done():
slog.Info("closing ws reader")
return
default:
_, _, err := ws.ReadMessage()
if err != nil {
return
}
} }
} }
} }

+ 7
- 1
internal/pkg/controller/beacons_controller.go Vedi File

@@ -87,24 +87,30 @@ func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.Handle
decoder := json.NewDecoder(r.Body) decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon var inBeacon model.Beacon
err := decoder.Decode(&inBeacon) err := decoder.Decode(&inBeacon)
fmt.Printf("hello world\n")


if err != nil { if err != nil {
http.Error(w, err.Error(), 400) http.Error(w, err.Error(), 400)
return return
} }
fmt.Printf("hello world\n")
fmt.Printf("in beacon: %+v\n", inBeacon)


if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400) http.Error(w, "name and beacon_id cannot be blank", 400)
return return
} }


fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID)
fmt.Printf("Adding new print here also\n")
// fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID)


apiUpdate := model.ApiUpdate{ apiUpdate := model.ApiUpdate{
Method: "POST", Method: "POST",
Beacon: inBeacon, Beacon: inBeacon,
} }


fmt.Printf("message: %+v\n", apiUpdate)

if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
fmt.Println("error in sending Kafka POST message") fmt.Println("error in sending Kafka POST message")
http.Error(w, "Error in sending kafka message", 500) http.Error(w, "Error in sending kafka message", 500)


internal/pkg/model/typeMethods.go → internal/pkg/model/type_methods.go Vedi File


+ 3
- 2
scripts/testAPI.sh Vedi File

@@ -10,5 +10,6 @@ echo -e "\n"


sleep 1 sleep 1


echo "GET beacon ID: $BEACON_ID"
curl -X GET $URL/$BEACON_ID
curl -X GET $URL

sleep 1

Caricamento…
Annulla
Salva