4 Commits

8 arquivos alterados com 195 adições e 38 exclusões
Visão dividida
  1. +56
    -0
      build/docker-compose.yaml
  2. +6
    -1
      build/init-scripts/create_topic.sh
  3. +23
    -5
      cmd/decoder/main.go
  4. +29
    -11
      cmd/location/main.go
  5. +71
    -18
      cmd/server/main.go
  6. +7
    -1
      internal/pkg/controller/beacons_controller.go
  7. +0
    -0
      internal/pkg/model/type_methods.go
  8. +3
    -2
      scripts/testAPI.sh

+ 56
- 0
build/docker-compose.yaml Ver arquivo

@@ -54,4 +54,60 @@ services:
ports:
- "127.0.0.1:6379:6379"

presense-decoder:
build:
context: ../
dockerfile: build/package/Dockerfile.decoder
image: presense-decoder
container_name: presense-decoder
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always
presense-server:
build:
context: ../
dockerfile: build/package/Dockerfile.server
image: presense-server
container_name: presense-server
environment:
- VALKEY_URL=valkey:6379
- KAFKA_URL=kafka:29092
ports:
- "127.0.0.1:1902:1902"
depends_on:
- kafka-init
- valkey
restart: always

presense-bridge:
build:
context: ../
dockerfile: build/package/Dockerfile.bridge
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:29092
- MQTT_HOST=192.168.1.101:1883
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
depends_on:
- kafka-init
restart: always

presense-location:
build:
context: ../
dockerfile: build/package/Dockerfile.location
image: presense-location
container_name: presense-location
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always



+ 6
- 1
build/init-scripts/create_topic.sh Ver arquivo

@@ -15,7 +15,12 @@
--create --if-not-exists --topic alertbeacons \
--partitions 1 --replication-factor 1

# create topic alertBeacons
# create topic locevents
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic locevents \
--partitions 1 --replication-factor 1

# create topic settings
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic settings \
--partitions 1 --replication-factor 1

+ 23
- 5
cmd/decoder/main.go Ver arquivo

@@ -5,6 +5,10 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/signal"
"strings"
"sync"
@@ -25,6 +29,16 @@ func main() {
appState := appcontext.NewAppState()
cfg := config.Load()

// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
@@ -34,7 +48,7 @@ func main() {

alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")

fmt.Println("Decoder initialized, subscribed to Kafka topics")
slog.Info("Decoder initialized, subscribed to Kafka topics")

chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 200)
@@ -55,18 +69,21 @@ eventloop:
case "POST":
id := msg.Beacon.ID
appState.AddBeaconToLookup(id)
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
case "DELETE":
id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id)
fmt.Println("Beacon removed from lookup: ", id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
}
}
}

fmt.Println("broken out of the main event loop")
slog.Info("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}
@@ -80,7 +97,8 @@ func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppStat

err := decodeBeacon(adv, appState, writer)
if err != nil {
fmt.Println("error in decoding")
eMsg := fmt.Sprintf("Error in decoding: %v", err)
fmt.Println(eMsg)
return
}
}


+ 29
- 11
cmd/location/main.go Ver arquivo

@@ -4,6 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
@@ -24,6 +28,16 @@ func main() {
appState := appcontext.NewAppState()
cfg := config.Load()

// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// Define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
@@ -34,7 +48,7 @@ func main() {

writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")

fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")
slog.Info("Locations algorithm initialized, subscribed to Kafka topics")

locTicker := time.NewTicker(1 * time.Second)
defer locTicker.Stop()
@@ -61,22 +75,24 @@ eventLoop:
switch msg.Method {
case "POST":
id := msg.Beacon.ID
fmt.Println("Beacon added to lookup: ", id)
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
appState.AddBeaconToLookup(id)
case "DELETE":
id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id)
fmt.Println("Beacon removed from lookup: ", id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
}
case msg := <-chSettings:
appState.UpdateSettings(msg)
}
}

fmt.Println("broken out of the main event loop")
slog.Info("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}
@@ -98,7 +114,7 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
mSize := len(beacon.BeaconMetrics)

if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold {
fmt.Println("Beacon is too old")
slog.Warn("beacon is too old")
continue
}

@@ -133,7 +149,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
beacon.LocationConfidence = 0

// Why do I need this if I am sending entire structure anyways? who knows
fmt.Println("this is called")
js, err := json.Marshal(model.LocationChange{
Method: "LocationChange",
BeaconRef: beacon,
@@ -144,7 +159,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
})

if err != nil {
fmt.Println("This error happens: ", err)
eMsg := fmt.Sprintf("Error in marshaling: %v", err)
slog.Error(eMsg)
beacon.PreviousConfidentLocation = bestLocName
beacon.PreviousLocation = bestLocName
appState.UpdateBeacon(beacon.ID, beacon)
@@ -166,7 +182,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {

js, err := json.Marshal(r)
if err != nil {
fmt.Println("Error in marshaling location: ", err)
eMsg := fmt.Sprintf("Error in marshaling location: %v", err)
slog.Error(eMsg)
continue
}

@@ -176,7 +193,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {

err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in sending Kafka message: ", err)
eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err)
slog.Error(eMsg)
}
}
}
@@ -194,7 +212,7 @@ func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppS
settings := appState.GetSettingsValue()

if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
fmt.Println("Settings returns")
slog.Info("Settings returns")
return
}



+ 71
- 18
cmd/server/main.go Ver arquivo

@@ -4,9 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@@ -27,12 +31,24 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
}

var _ io.Writer = (*os.File)(nil)

var wg sync.WaitGroup

func main() {
cfg := config.Load()
appState := appcontext.NewAppState()

// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
@@ -43,12 +59,14 @@ func main() {

writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
slog.Info("Kafka writers topics: apibeacons, settings initialized")

locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
slog.Info("Kafka readers topics: locevents, alertbeacons initialized")

client := appState.AddValkeyClient(cfg.ValkeyURL)
// Need Lua script to pull all of the beacons in one go on init
slog.Info("Valkey DB client created")

chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500)
@@ -59,7 +77,6 @@ func main() {

r := mux.NewRouter()

// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
@@ -69,9 +86,23 @@ func main() {
r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")

r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx))
wsHandler := http.HandlerFunc(serveWs(appState, ctx))
restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
wsHandler.ServeHTTP(w, r)
return
}

restApiHandler.ServeHTTP(w, r)
})

http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
server := http.Server{
Addr: cfg.HTTPAddr,
Handler: mainHandler,
}

go server.ListenAndServe()

eventLoop:
for {
@@ -80,23 +111,35 @@ eventLoop:
break eventLoop
case msg := <-chLoc:
if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
fmt.Printf("Error in writing location change to beacon: %v\n", err)
eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
slog.Error(eMsg)
}
case msg := <-chEvents:
if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
fmt.Printf("Error in writing event change to beacon: %v\n", err)
eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
slog.Error(eMsg)
}
}
}

fmt.Println("broken out of the main event loop")
if err := server.Shutdown(context.Background()); err != nil {
eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
slog.Error(eMsg)
}

slog.Info("API SERVER: \n")
slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")

slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
appState.CleanValkeyClient()

slog.Info("API server shutting down")
logFile.Close()
}

func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
@@ -104,13 +147,14 @@ func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFun
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
slog.Error(eMsg)
}
return
}
wg.Add(1)
wg.Add(2)
go writer(ws, appstate, ctx)
reader(ws)
go reader(ws, ctx)
}
}

@@ -126,7 +170,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte
for {
select {
case <-ctx.Done():
log.Println("WebSocket writer received shutdown signal.")
slog.Info("WebSocket writer received shutdown signal.")
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return
@@ -150,15 +194,24 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte
}
}

func reader(ws *websocket.Conn) {
defer ws.Close()
func reader(ws *websocket.Conn, ctx context.Context) {
defer func() {
ws.Close()
wg.Done()
}()
ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
select {
case <-ctx.Done():
slog.Info("closing ws reader")
return
default:
_, _, err := ws.ReadMessage()
if err != nil {
return
}
}
}
}

+ 7
- 1
internal/pkg/controller/beacons_controller.go Ver arquivo

@@ -87,24 +87,30 @@ func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.Handle
decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon
err := decoder.Decode(&inBeacon)
fmt.Printf("hello world\n")

if err != nil {
http.Error(w, err.Error(), 400)
return
}
fmt.Printf("hello world\n")
fmt.Printf("in beacon: %+v\n", inBeacon)

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
}

fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID)
fmt.Printf("Adding new print here also\n")
// fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID)

apiUpdate := model.ApiUpdate{
Method: "POST",
Beacon: inBeacon,
}

fmt.Printf("message: %+v\n", apiUpdate)

if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
fmt.Println("error in sending Kafka POST message")
http.Error(w, "Error in sending kafka message", 500)


internal/pkg/model/typeMethods.go → internal/pkg/model/type_methods.go Ver arquivo


+ 3
- 2
scripts/testAPI.sh Ver arquivo

@@ -10,5 +10,6 @@ echo -e "\n"

sleep 1

echo "GET beacon ID: $BEACON_ID"
curl -X GET $URL/$BEACON_ID
curl -X GET $URL

sleep 1

Carregando…
Cancelar
Salvar