2 Commits

10 changed files with 270 additions and 121 deletions
Unified View
  1. +2
    -2
      cmd/bridge/main.go
  2. +25
    -10
      cmd/decoder/main.go
  3. +24
    -10
      cmd/location/main.go
  4. +84
    -82
      cmd/server/main.go
  5. +95
    -5
      internal/pkg/common/appcontext/context.go
  6. +2
    -0
      internal/pkg/config/config.go
  7. +20
    -12
      internal/pkg/kafkaclient/consumer.go
  8. +3
    -0
      internal/pkg/kafkaclient/reader.go
  9. +3
    -0
      internal/pkg/kafkaclient/writer.go
  10. +12
    -0
      internal/pkg/model/types.go

+ 2
- 2
cmd/bridge/main.go View File

@@ -29,11 +29,11 @@ func main() {
}) })


if err != nil { if err != nil {
fmt.Println("Could not connect to MQTT broker")
fmt.Printf("Could not connect to MQTT broker, addr: %s", cfg.MQTTHost)
panic(err) panic(err)
} }


fmt.Println("Successfuly connected to MQTT broker")
fmt.Printf("Successfuly connected to MQTT broker, addr: %s", cfg.MQTTHost)


writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons") writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons")
defer writer.Close() defer writer.Close()


+ 25
- 10
cmd/decoder/main.go View File

@@ -5,7 +5,10 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"os/signal"
"strings" "strings"
"sync"
"syscall"


"github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/common/utils" "github.com/AFASystems/presence/internal/pkg/common/utils"
@@ -15,31 +18,36 @@ import (
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )


var wg sync.WaitGroup

func main() { func main() {
// Load global context to init beacons and latest list // Load global context to init beacons and latest list
appState := appcontext.NewAppState() appState := appcontext.NewAppState()
cfg := config.Load() cfg := config.Load()


// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
defer rawReader.Close()
// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()


// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
defer apiReader.Close()
rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")


alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons")
defer alertWriter.Close()
alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")


fmt.Println("Decoder initialized, subscribed to Kafka topics") fmt.Println("Decoder initialized, subscribed to Kafka topics")

chRaw := make(chan model.BeaconAdvertisement, 2000) chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 2000) chApi := make(chan model.ApiUpdate, 2000)


go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
wg.Add(2)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)


eventloop:
for { for {
select { select {
case <-ctx.Done():
break eventloop
case msg := <-chRaw: case msg := <-chRaw:
processIncoming(msg, appState, alertWriter) processIncoming(msg, appState, alertWriter)
case msg := <-chApi: case msg := <-chApi:
@@ -52,6 +60,13 @@ func main() {
} }
} }
} }

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
} }


func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) { func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) {


+ 24
- 10
cmd/location/main.go View File

@@ -4,6 +4,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os/signal"
"sync"
"syscall"
"time" "time"


"github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/common/appcontext"
@@ -14,21 +17,21 @@ import (
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )


var wg sync.WaitGroup

func main() { func main() {
// Load global context to init beacons and latest list // Load global context to init beacons and latest list
appState := appcontext.NewAppState() appState := appcontext.NewAppState()
cfg := config.Load() cfg := config.Load()


// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
defer rawReader.Close()
// Define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()


// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
defer apiReader.Close()
rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")


writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents")
defer writer.Close()
writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")


fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")


@@ -38,11 +41,15 @@ func main() {
chRaw := make(chan model.BeaconAdvertisement, 2000) chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 2000) chApi := make(chan model.ApiUpdate, 2000)


go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
wg.Add(2)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)


eventLoop:
for { for {
select { select {
case <-ctx.Done():
break eventLoop
case <-locTicker.C: case <-locTicker.C:
getLikelyLocations(appState, writer) getLikelyLocations(appState, writer)
case msg := <-chRaw: case msg := <-chRaw:
@@ -58,6 +65,13 @@ func main() {
} }
} }
} }

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
} }


func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {


+ 84
- 82
cmd/server/main.go View File

@@ -6,7 +6,10 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"os/signal"
"strings" "strings"
"sync"
"syscall"
"time" "time"


"github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/common/appcontext"
@@ -24,102 +27,35 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
} }


func main() {
HttpServer("0.0.0.0:1902")
}
var wg sync.WaitGroup


func HttpServer(addr string) {
func main() {
cfg := config.Load() cfg := config.Load()
appState := appcontext.NewAppState() appState := appcontext.NewAppState()


// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"}) originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})


// Kafka writer that relays messages
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons")
defer writer.Close()

settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings")
defer settingsWriter.Close()
writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")


// Kafka reader for Raw MQTT beacons
locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv")
defer locationReader.Close()
locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")


// Kafka reader for API server updates
alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
defer alertsReader.Close()

client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})

ctx := context.Background()
client := appState.AddValkeyClient(cfg.ValkeyURL)


// Separate channel for location change? // Separate channel for location change?
chLoc := make(chan model.HTTPLocation, 200) chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500) chEvents := make(chan model.BeaconEvent, 500)


go kafkaclient.Consume(locationReader, chLoc)
go kafkaclient.Consume(alertsReader, chEvents)

go func() {
for {
select {
case msg := <-chLoc:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
} else {
beacon.ID = msg.ID
beacon.Location = msg.Location
beacon.Distance = msg.Distance
beacon.LastSeen = msg.LastSeen
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
case msg := <-chEvents:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
} else {
beacon.ID = msg.ID
beacon.BeaconType = msg.Type
beacon.HSBattery = int64(msg.Battery)
beacon.Event = msg.Event
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
}
}
}()
wg.Add(2)
go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)


r := mux.NewRouter() r := mux.NewRouter()


@@ -146,7 +82,73 @@ func HttpServer(addr string) {
// r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))


http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r))

eventLoop:
for {
select {
case <-ctx.Done():
break eventLoop
case msg := <-chLoc:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
} else {
beacon.ID = msg.ID
beacon.Location = msg.Location
beacon.Distance = msg.Distance
beacon.LastSeen = msg.LastSeen
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
case msg := <-chEvents:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
} else {
beacon.ID = msg.ID
beacon.BeaconType = msg.Type
beacon.HSBattery = int64(msg.Battery)
beacon.Event = msg.Event
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
}
}

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")
appState.CleanValkeyClient()
} }


func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc {


+ 95
- 5
internal/pkg/common/appcontext/context.go View File

@@ -1,16 +1,25 @@
package appcontext package appcontext


import ( import (
"fmt"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
) )


// AppState provides centralized access to application state // AppState provides centralized access to application state
type AppState struct { type AppState struct {
beacons model.BeaconsList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]struct{}
latestList model.LatestBeaconsList
beacons model.BeaconsList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]struct{}
latestList model.LatestBeaconsList
kafkaReadersList model.KafkaReadersList
kafkaWritersList model.KafkaWritersList
valkeyDB *redis.Client
} }


// NewAppState creates a new application context AppState with default values // NewAppState creates a new application context AppState with default values
@@ -37,7 +46,88 @@ func NewAppState() *AppState {
latestList: model.LatestBeaconsList{ latestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon), LatestList: make(map[string]model.Beacon),
}, },
kafkaReadersList: model.KafkaReadersList{
KafkaReaders: make([]*kafka.Reader, 0),
},
kafkaWritersList: model.KafkaWritersList{
KafkaWriters: make([]*kafka.Writer, 0),
},
}
}

func (m *AppState) AddValkeyClient(url string) *redis.Client {
valkeyDB := redis.NewClient(&redis.Options{
Addr: url,
Password: "",
})

m.valkeyDB = valkeyDB
return valkeyDB
}

func (m *AppState) CleanValkeyClient() {
fmt.Println("shutdown of valkey client starts")
if err := m.valkeyDB.Close(); err != nil {
fmt.Println("Error in shuting down valkey client")
}

fmt.Println("Succesfully shutting down valkey client")
}

func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaUrl),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
} }

m.kafkaWritersList.KafkaWritersLock.Lock()
m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
m.kafkaWritersList.KafkaWritersLock.Unlock()

return kafkaWriter
}

func (m *AppState) CleanKafkaWriters() {
fmt.Println("shutdown of kafka readers starts")
for _, r := range m.kafkaWritersList.KafkaWriters {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka writer %v", err)
}
}

fmt.Println("Kafka writers graceful shutdown complete")
}

func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaUrl, ",")
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1,
MaxBytes: 10e6,
})

m.kafkaReadersList.KafkaReadersLock.Lock()
m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
m.kafkaReadersList.KafkaReadersLock.Unlock()

return kafkaReader
}

func (m *AppState) CleanKafkaReaders() {
for _, r := range m.kafkaReadersList.KafkaReaders {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka reader %v", err)
}
}

fmt.Println("Kafka readers graceful shutdown complete")
} }


// GetBeacons returns thread-safe access to beacons list // GetBeacons returns thread-safe access to beacons list


+ 2
- 0
internal/pkg/config/config.go View File

@@ -12,6 +12,7 @@ type Config struct {
DBPath string DBPath string
KafkaURL string KafkaURL string
RedisURL string RedisURL string
ValkeyURL string
} }


// getEnv returns env var value or a default if not set. // getEnv returns env var value or a default if not set.
@@ -32,5 +33,6 @@ func Load() *Config {
MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"),
DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
ValkeyURL: getEnv("VALKEY_URL", "127.0.0.1:6379"),
} }
} }

+ 20
- 12
internal/pkg/kafkaclient/consumer.go View File

@@ -4,24 +4,32 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"


"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )


func Consume[T any](r *kafka.Reader, ch chan<- T) {
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for { for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Println("error reading message:", err)
continue
}
select {
case <-ctx.Done():
fmt.Println("consumer closed")
return
default:
msg, err := r.ReadMessage(ctx)
if err != nil {
fmt.Println("error reading message:", err)
continue
}


var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}
var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}


ch <- data
ch <- data
}
} }
} }

+ 3
- 0
internal/pkg/kafkaclient/reader.go View File

@@ -6,6 +6,9 @@ import (
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )


// Create Kafka reader
//
// Deprecated: Use context manager object instead
func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",") brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{ return kafka.NewReader(kafka.ReaderConfig{


+ 3
- 0
internal/pkg/kafkaclient/writer.go View File

@@ -6,6 +6,9 @@ import (
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )


// Create Kafka writer
//
// Deprecated: Use context manager object instead
func KafkaWriter(kafkaURL, topic string) *kafka.Writer { func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{ return &kafka.Writer{
Addr: kafka.TCP(kafkaURL), Addr: kafka.TCP(kafkaURL),


+ 12
- 0
internal/pkg/model/types.go View File

@@ -2,6 +2,8 @@ package model


import ( import (
"sync" "sync"

"github.com/segmentio/kafka-go"
) )


// Settings defines configuration parameters for presence detection behavior. // Settings defines configuration parameters for presence detection behavior.
@@ -198,5 +200,15 @@ type ApiUpdate struct {
ID string ID string
} }


type KafkaReadersList struct {
KafkaReadersLock sync.RWMutex
KafkaReaders []*kafka.Reader
}

type KafkaWritersList struct {
KafkaWritersLock sync.RWMutex
KafkaWriters []*kafka.Writer
}

var HTTPHostPathPtr *string var HTTPHostPathPtr *string
var HTTPWSHostPathPtr *string var HTTPWSHostPathPtr *string

Loading…
Cancel
Save