3 Cometimentos

27 ficheiros alterados com 1955 adições e 7572 eliminações
  1. +21
    -42
      cmd/bridge/main.go
  2. +0
    -5
      cmd/db-testbench/.env
  3. +0
    -37
      cmd/db-testbench/main.go
  4. +0
    -7
      cmd/db-testbench/models/book.go
  5. +0
    -22
      cmd/decoder-parser-testbench/configs.json
  6. +0
    -137
      cmd/decoder-parser-testbench/main.go
  7. +15
    -9
      cmd/decoder/main.go
  8. +10
    -8
      cmd/location/main.go
  9. +7
    -0
      cmd/server/config.json
  10. +20
    -27
      cmd/server/main.go
  11. +0
    -3238
      cmd/testbench/debug.txt
  12. +0
    -86
      cmd/testbench/main.go
  13. +0
    -3488
      cmd/testbench/save.txt
  14. +0
    -161
      cmd/token-testbench/main.go
  15. +0
    -131
      cmd/valkey-testbench/main.go
  16. BIN
      docs/Frame definition- B7,MWB01,MWC01.pdf
  17. +5
    -109
      internal/pkg/common/appcontext/context.go
  18. +113
    -0
      internal/pkg/kafkaclient/manager.go
  19. +0
    -21
      internal/pkg/kafkaclient/reader.go
  20. +0
    -22
      internal/pkg/kafkaclient/writer.go
  21. +6
    -2
      internal/pkg/model/parser.go
  22. +11
    -20
      internal/pkg/model/types.go
  23. +364
    -0
      tests/decoder/decode_test.go
  24. +369
    -0
      tests/decoder/event_loop_test.go
  25. +418
    -0
      tests/decoder/integration_test.go
  26. +275
    -0
      tests/decoder/parser_registry_test.go
  27. +321
    -0
      tests/decoder/testutil.go

+ 21
- 42
cmd/bridge/main.go Ver ficheiro

@@ -71,35 +71,15 @@ func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *a
break
}
}
} else {
s := strings.Split(string(message), ",")
if len(s) < 6 {
log.Printf("Messaggio CSV non valido: %s", msgStr)
return
}

fmt.Println("this gateway is also sending data: ", s)
}
// } else {
// s := strings.Split(string(message), ",")
// if len(s) < 6 {
// log.Printf("Messaggio CSV non valido: %s", msgStr)
// return
// }

// rawdata := s[4]
// buttonCounter := parseButtonState(rawdata)
// if buttonCounter > 0 {
// adv := model.BeaconAdvertisement{}
// i, _ := strconv.ParseInt(s[3], 10, 64)
// adv.Hostname = hostname
// adv.BeaconType = "hb_button"
// adv.MAC = s[1]
// adv.RSSI = i
// adv.Data = rawdata
// adv.HSButtonCounter = buttonCounter

// read_line := strings.TrimRight(string(s[5]), "\r\n")
// it, err33 := strconv.Atoi(read_line)
// if err33 != nil {
// fmt.Println(it)
// fmt.Println(err33)
// os.Exit(2)
// }
// }
// }
}

var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
@@ -118,6 +98,7 @@ func main() {
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()
kafkaManager := kafkaclient.InitKafkaManager()

// Set logger -> terminal and log file
slog.SetDefault(logger.CreateLogger("bridge.log"))
@@ -126,13 +107,11 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

// define kafka readers
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api")
alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert")
mqttReader := appState.AddKafkaReader(cfg.KafkaURL, "mqtt", "bridge-mqtt")
readerTopics := []string{"apibeacons", "alert", "mqtt"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)

// define kafka writer
writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons")
writerTopics := []string{"rawbeacons"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)

slog.Info("Bridge initialized, subscribed to kafka topics")

@@ -141,9 +120,9 @@ func main() {
chMqtt := make(chan []model.Tracker, 200)

wg.Add(3)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
go kafkaclient.Consume(alertReader, chAlert, ctx, &wg)
go kafkaclient.Consume(mqttReader, chMqtt, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("apibeacons"), chApi, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("alert"), chAlert, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("mqtt"), chMqtt, ctx, &wg)

opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
@@ -154,7 +133,9 @@ func main() {
opts.SetMaxReconnectInterval(600 * time.Second)
opts.SetCleanSession(false)

opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) })
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
messagePubHandler(m, kafkaManager.GetWriter("rawbeacons"), appState)
})
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
@@ -188,14 +169,12 @@ eventloop:
slog.Info(lMsg)
}
case msg := <-chAlert:
fmt.Printf("Alerts: %+v\n", msg)
p, err := json.Marshal(msg)
if err != nil {
continue
}
client.Publish("/alerts", 0, true, p)
case msg := <-chMqtt:
fmt.Printf("trackers: %+v\n", msg)
p, err := json.Marshal(msg)
if err != nil {
continue
@@ -208,8 +187,8 @@ eventloop:
wg.Wait()

slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

client.Disconnect(250)
slog.Info("Closing connection to MQTT broker")


+ 0
- 5
cmd/db-testbench/.env Ver ficheiro

@@ -1,5 +0,0 @@
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASSWORD=postgres
DB_NAME=go_crud_db

+ 0
- 37
cmd/db-testbench/main.go Ver ficheiro

@@ -1,37 +0,0 @@
package main

import (
"fmt"
"log"
"os"

"github.com/joho/godotenv"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

var DB *gorm.DB

// main loads database credentials from the local .env file, opens a GORM
// connection to Postgres, and stores the handle in the package-level DB.
func main() {
	if err := godotenv.Load(); err != nil {
		log.Fatal("Error loading .env file")
	}

	// Assemble the Postgres DSN from environment variables.
	dsn := fmt.Sprintf(
		"host=%s user=%s password=%s dbname=%s port=%s sslmode=disable",
		os.Getenv("DB_HOST"),
		os.Getenv("DB_USER"),
		os.Getenv("DB_PASSWORD"),
		os.Getenv("DB_NAME"),
		os.Getenv("DB_PORT"),
	)

	conn, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatal("Failed to connect to the database:", err)
	}

	DB = conn
	fmt.Println("Database connection established")
}

+ 0
- 7
cmd/db-testbench/models/book.go Ver ficheiro

@@ -1,7 +0,0 @@
package models

type Book struct {
ID uint `json:"id" gorm:"primaryKey"`
Title string `json:"title"`
Author string `json:"author"`
}

+ 0
- 22
cmd/decoder-parser-testbench/configs.json Ver ficheiro

@@ -1,22 +0,0 @@
[
{
"name": "config1",
"min": 10,
"max": 15,
"pattern": ["0x02", "0x01", "0x06"],
"configs": {
"battery": {"offset": 3, "length": 1},
"accX": {"offset": 4, "length": 2, "order": "bigendian"}
}
},
{
"name": "config2",
"min": 10,
"max": 15,
"pattern": ["0x02", "0x01", "0x06"],
"configs": {
"battery": {"offset": 3, "length": 1},
"accY": {"offset": 4, "length": 2, "order": "bigendian"}
}
}
]

+ 0
- 137
cmd/decoder-parser-testbench/main.go Ver ficheiro

@@ -1,137 +0,0 @@
package main

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
"sync"

"github.com/AFASystems/presence/internal/pkg/model"
)

// parserConfig describes how to extract a single field from an
// advertisement payload: where it starts, how many bytes it spans,
// and (for multi-byte fields) which byte order to use.
type parserConfig struct {
	Length int    `json:"length"`
	Offset int    `json:"offset"`
	Order  string `json:"order"`
}

// beaconParser pairs a match predicate with the field configs used to
// decode a matching advertisement.
type beaconParser struct {
	name     string
	canParse func([]byte) bool
	configs  map[string]parserConfig
}

// parserRegistry is a concurrency-safe list of registered beacon parsers.
type parserRegistry struct {
	parserList []beaconParser
	rw         sync.RWMutex
}

// config is the JSON shape of one entry in configs.json: length bounds,
// a leading byte pattern, and the per-field extraction configs.
type config struct {
	Name    string                  `json:"name"`
	Min     int                     `json:"min"`
	Max     int                     `json:"max"`
	Pattern []string                `json:"pattern"`
	Configs map[string]parserConfig `json:"configs"`
}

// GetOrder maps the textual byte-order setting to its binary.ByteOrder
// implementation; anything other than "bigendian" defaults to little-endian.
func (pc parserConfig) GetOrder() binary.ByteOrder {
	switch pc.Order {
	case "bigendian":
		return binary.BigEndian
	default:
		return binary.LittleEndian
	}
}

// Register builds a beaconParser from the given config and appends it to
// the registry under name. Safe for concurrent use.
func (p *parserRegistry) Register(name string, c config) {
	p.rw.Lock()
	defer p.rw.Unlock()

	// Hoist the pattern conversion out of the predicate; it is a pure
	// function of the config.
	pattern := c.GetPatternBytes()
	matcher := func(ad []byte) bool {
		n := len(ad)
		return n >= c.Min && n <= c.Max && bytes.HasPrefix(ad, pattern)
	}

	p.parserList = append(p.parserList, beaconParser{
		name:     name,
		canParse: matcher,
		configs:  c.Configs,
	})
}

// Parse extracts the configured fields from the advertisement bytes into a
// BeaconEvent. The boolean reports whether at least one field was decoded.
func (b *beaconParser) Parse(ad []byte) (model.BeaconEvent, bool) {
	event := model.BeaconEvent{Type: b.name}
	decoded := false

	if cfg, present := b.configs["battery"]; present {
		event.Battery = uint32(b.extract(ad, cfg))
		decoded = true
	}
	if cfg, present := b.configs["accX"]; present {
		event.AccX = int16(b.extract(ad, cfg))
		decoded = true
	}
	if cfg, present := b.configs["accY"]; present {
		event.AccY = int16(b.extract(ad, cfg))
		decoded = true
	}
	if cfg, present := b.configs["accZ"]; present {
		event.AccZ = int16(b.extract(ad, cfg))
		decoded = true
	}

	return event, decoded
}

// extract reads the field described by pc out of ad, returning 0 when the
// advertisement is too short to contain it. Single-byte fields are returned
// directly; wider fields honour the configured byte order.
func (b *beaconParser) extract(ad []byte, pc parserConfig) uint16 {
	end := pc.Offset + pc.Length
	if len(ad) < end {
		return 0
	}

	field := ad[pc.Offset:end]
	if pc.Length == 1 {
		return uint16(field[0])
	}
	return pc.GetOrder().Uint16(field)
}

// GetPatternBytes converts the config's "0xNN" pattern strings into raw
// bytes. NOTE(review): a malformed pattern string leaves its byte at zero —
// assumes configs.json is well-formed; confirm upstream validation.
func (c config) GetPatternBytes() []byte {
	out := make([]byte, len(c.Pattern))
	for i := range c.Pattern {
		fmt.Sscanf(c.Pattern[i], "0x%02x", &out[i])
	}
	return out
}

// main is a manual testbench: it loads parser configs from configs.json,
// registers each one, and runs every registered parser over a sample
// advertisement payload, printing the decoded fields.
func main() {
	parserRegistry := parserRegistry{
		parserList: make([]beaconParser, 0),
	}
	// Sample advertisement used to exercise the registered parsers.
	seq := []byte{0x02, 0x01, 0x06, 0x64, 0x01, 0xF4, 0x00, 0x0A, 0xFF, 0x05}

	jsonFile, err := os.Open("configs.json")
	if err != nil {
		// fix: the original kept going with a nil file after a failed open
		fmt.Println(err)
		return
	}
	// fix: close on every exit path instead of only at the end
	defer jsonFile.Close()

	fmt.Println("succesfully opened json file")

	b, err := io.ReadAll(jsonFile)
	if err != nil {
		// fix: the original discarded the read error
		fmt.Println(err)
		return
	}

	var configs []config
	// fix: the original discarded the Unmarshal error
	if err := json.Unmarshal(b, &configs); err != nil {
		fmt.Println(err)
		return
	}

	for _, config := range configs {
		parserRegistry.Register(config.Name, config)
	}

	for _, parser := range parserRegistry.parserList {
		if parser.canParse(seq) {
			if event, ok := parser.Parse(seq); ok {
				fmt.Printf("Device: %s | Battery: %d%% | AccX: %d | AccY: %d | AccZ: %d\n", event.Type, event.Battery, event.AccX, event.AccY, event.AccZ)
			}
		}
	}

	fmt.Printf("configs: %+v\n", configs)
}

+ 15
- 9
cmd/decoder/main.go Ver ficheiro

@@ -26,6 +26,7 @@ func main() {
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()
kafkaManager := kafkaclient.InitKafkaManager()

parserRegistry := model.ParserRegistry{
ParserList: make(map[string]model.BeaconParser),
@@ -38,10 +39,11 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
parserReader := appState.AddKafkaReader(cfg.KafkaURL, "parser", "gid-parser")
readerTopics := []string{"rawbeacons", "parser"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics)

alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")
writerTopics := []string{"alertbeacons"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)

slog.Info("Decoder initialized, subscribed to Kafka topics")

@@ -49,8 +51,8 @@ func main() {
chParser := make(chan model.KafkaParser, 200)

wg.Add(3)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(parserReader, chParser, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("rawbeacons"), chRaw, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("parser"), chParser, ctx, &wg)

eventloop:
for {
@@ -58,7 +60,7 @@ eventloop:
case <-ctx.Done():
break eventloop
case msg := <-chRaw:
processIncoming(msg, appState, alertWriter, &parserRegistry)
processIncoming(msg, appState, kafkaManager.GetWriter("alertbeacons"), &parserRegistry)
case msg := <-chParser:
switch msg.ID {
case "add":
@@ -77,8 +79,8 @@ eventloop:
wg.Wait()

slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()
}

func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
@@ -110,9 +112,13 @@ func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState,
if event.ID == "" {
return nil
}

prevEvent, ok := appState.GetBeaconEvent(id)
appState.UpdateBeaconEvent(id, event)

if event.Type == "iBeacon" {
event.BtnPressed = true
}

if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
return nil
}


+ 10
- 8
cmd/location/main.go Ver ficheiro

@@ -25,6 +25,7 @@ func main() {
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()
kafkaManager := kafkaclient.InitKafkaManager()

// Set logger -> terminal and log file
slog.SetDefault(logger.CreateLogger("location.log"))
@@ -33,10 +34,11 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
settingsReader := appState.AddKafkaReader(cfg.KafkaURL, "settings", "gid-settings-loc")
readerTopics := []string{"rawbeacons", "settings"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics)

writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")
writerTopics := []string{"locevents"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)

slog.Info("Locations algorithm initialized, subscribed to Kafka topics")

@@ -47,8 +49,8 @@ func main() {
chSettings := make(chan map[string]any, 5)

wg.Add(3)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(settingsReader, chSettings, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("rawbeacons"), chRaw, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("settings"), chSettings, ctx, &wg)

eventLoop:
for {
@@ -60,7 +62,7 @@ eventLoop:
fmt.Printf("Settings: %+v\n", settings)
switch settings.CurrentAlgorithm {
case "filter":
getLikelyLocations(appState, writer)
getLikelyLocations(appState, kafkaManager.GetWriter("locevents"))
case "ai":
fmt.Println("AI algorithm selected")
}
@@ -76,8 +78,8 @@ eventLoop:
wg.Wait()

slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()
}

func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {


+ 7
- 0
cmd/server/config.json Ver ficheiro

@@ -39,5 +39,12 @@
"accY": {"offset": 9, "length": 2, "order": "fixedpoint"},
"accZ": {"offset": 11, "length": 2, "order": "fixedpoint"}
}
},
{
"name": "iBeacon",
"min": 5,
"max": 27,
"pattern": ["0xFF", "0x4C", "0x00", "0x02"],
"configs": {}
}
]

+ 20
- 27
cmd/server/main.go Ver ficheiro

@@ -25,21 +25,16 @@ import (
"github.com/AFASystems/presence/internal/pkg/service"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/segmentio/kafka-go"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

var _ io.Writer = (*os.File)(nil)
var wg sync.WaitGroup

func main() {
cfg := config.Load()
appState := appcontext.NewAppState()
kafkaManager := kafkaclient.InitKafkaManager()

// Set logger -> terminal and log file
slog.SetDefault(logger.CreateLogger("server.log"))
@@ -57,11 +52,9 @@ func main() {
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert")
parserWriter := appState.AddKafkaWriter(cfg.KafkaURL, "parser")
mqttWriter := appState.AddKafkaWriter(cfg.KafkaURL, "mqtt")
writerTopics := []string{"apibeacons", "alert", "mqtt", "settings", "parser"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)

slog.Info("Kafka writers topics: apibeacons, settings initialized")

configFile, err := os.Open("/app/cmd/server/config.json")
@@ -86,25 +79,25 @@ func main() {
Config: config,
}

if err := service.SendParserConfig(kp, parserWriter, ctx); err != nil {
if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil {
fmt.Printf("Unable to send parser config to kafka broker %v\n", err)
}
}

if err := apiclient.UpdateDB(db, ctx, cfg, writer, appState); err != nil {
if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil {
fmt.Printf("Error in getting token: %v\n", err)
}

locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
readerTopics := []string{"locevents", "alertbeacons"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "server", readerTopics)
slog.Info("Kafka readers topics: locevents, alertbeacons initialized")

chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500)

wg.Add(2)
go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("locevents"), chLoc, ctx, &wg)
go kafkaclient.Consume(kafkaManager.GetReader("alertbeacons"), chEvents, ctx, &wg)

r := mux.NewRouter()

@@ -124,16 +117,16 @@ func main() {
r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT")

r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET")
r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("POST")
r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("DELETE")
r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT")

r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET")
r.HandleFunc("/configs/beacons", controller.ParserAddController(db, parserWriter, ctx)).Methods("POST")
r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, parserWriter, ctx)).Methods("PUT")
r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, parserWriter, ctx)).Methods("DELETE")
r.HandleFunc("/configs/beacons", controller.ParserAddController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("POST")
r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("PUT")
r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("DELETE")

r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, settingsWriter, ctx)).Methods("PATCH")
r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, kafkaManager.GetWriter("settings"), ctx)).Methods("PATCH")
r.HandleFunc("/reslevis/settings", controller.SettingsListController(db)).Methods("GET")

beaconTicker := time.NewTicker(2 * time.Second)
@@ -156,7 +149,7 @@ eventLoop:
case <-ctx.Done():
break eventLoop
case msg := <-chLoc:
service.LocationToBeaconService(msg, db, alertWriter, ctx)
service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx)
case msg := <-chEvents:
fmt.Printf("event: %+v\n", msg)
id := msg.ID
@@ -182,7 +175,7 @@ eventLoop:
Value: eMsg,
}

mqttWriter.WriteMessages(ctx, msg)
kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg)
}
}

@@ -196,8 +189,8 @@ eventLoop:
wg.Wait()

slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
slog.Info("API server shutting down")


+ 0
- 3238
cmd/testbench/debug.txt
A apresentação das diferenças no ficheiro foi suprimida por ser demasiado grande
Ver ficheiro


+ 0
- 86
cmd/testbench/main.go Ver ficheiro

@@ -1,86 +0,0 @@
package main

import (
"bufio"
"encoding/hex"
"fmt"
"log"
"os"
"strings"
)

// main streams save.txt line by line and tries to decode every line as a
// hex-encoded BLE advertisement.
func main() {
	f, err := os.Open("save.txt")
	if err != nil {
		log.Fatalf("Failed to open file: %s", err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		decodeBeacon(sc.Text())
	}
}

// decodeBeacon parses one hex-encoded advertisement line, strips a leading
// flags AD structure if present, and silently drops recognised Eddystone or
// iBeacon frames. Anything unrecognised is echoed back as hex.
func decodeBeacon(beacon string) {
	beacon = strings.TrimSpace(beacon)
	if beacon == "" {
		return
	}

	// Work in bytes from here on for cheap slicing/comparison.
	raw, err := hex.DecodeString(beacon)
	if err != nil {
		fmt.Println("invalid line: ", beacon)
		return
	}

	// Drop the leading flags block (AD type 0x01) — it carries no
	// structural information for the checks below.
	if len(raw) > 1 && raw[1] == 0x01 {
		if n := int(raw[0]); 1+n <= len(raw) {
			raw = raw[1+n:]
		}
	}

	for _, idx := range parseADFast(raw) {
		ad := raw[idx[0]:idx[1]]
		switch {
		case len(ad) >= 4 && ad[1] == 0x16 && ad[2] == 0xAA && ad[3] == 0xFE:
			// Eddystone service-data frame — nothing to print.
			return
		case len(ad) >= 7 && ad[1] == 0xFF &&
			ad[2] == 0x4C && ad[3] == 0x00 &&
			ad[4] == 0x02 && ad[5] == 0x15:
			// Apple iBeacon manufacturer frame — nothing to print.
			return
		}
	}

	fmt.Println(hex.EncodeToString(raw))
}

// parseADFast walks the AD-structure layout (one length byte followed by that
// many payload bytes) and returns the [start, end) index pair of each complete
// block. It stops at a zero length byte or a truncated trailing block.
func parseADFast(b []byte) [][2]int {
	var blocks [][2]int

	for pos := 0; pos < len(b); {
		size := int(b[pos])
		end := pos + 1 + size
		if size == 0 || end > len(b) {
			break
		}
		blocks = append(blocks, [2]int{pos, end})
		pos = end
	}

	return blocks
}

+ 0
- 3488
cmd/testbench/save.txt
A apresentação das diferenças no ficheiro foi suprimida por ser demasiado grande
Ver ficheiro


+ 0
- 161
cmd/token-testbench/main.go Ver ficheiro

@@ -1,161 +0,0 @@
package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"

"github.com/AFASystems/presence/internal/pkg/model"
)

// response models the Keycloak token endpoint reply; only the access token
// field is needed by this testbench.
type response struct {
	A string `json:"access_token"`
}

// main is a manual testbench for the token-protected REST API: it fetches an
// OAuth2 access token from Keycloak via the password grant, then exercises
// the tracker, gateway, zone, and tracker-zone endpoints with it.
//
// NOTE(review): credentials/client secret are hard-coded and TLS verification
// is disabled — acceptable only in a throwaway testbench, never production.
func main() {
	ctx := context.Background()

	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	client := &http.Client{Transport: tr}

	formData := url.Values{}
	formData.Set("grant_type", "password")
	formData.Set("client_id", "Fastapi")
	formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC")
	formData.Set("username", "core")
	formData.Set("password", "C0r3_us3r_Cr3d3nt14ls")
	formData.Set("audience", "Fastapi")

	req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode()))
	if err != nil {
		panic(err)
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

	req = req.WithContext(ctx)
	res, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	// fix: the response body was never closed (leaked connection)
	defer res.Body.Close()

	var j response
	if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
		panic(err)
	}

	token := j.A

	trackers, err := GetTrackers(token, client)
	if err != nil {
		panic(err)
	}
	fmt.Printf("trackers: %+v\n", trackers)

	gateways, err := getGateways(token, client)
	if err != nil {
		panic(err)
	}
	fmt.Printf("gateways: %+v\n", gateways)

	zones, err := GetZones(token, client)
	if err != nil {
		panic(err)
	}
	fmt.Printf("zones: %+v\n", zones)

	trackerZones, err := GetTrackerZones(token, client)
	if err != nil {
		panic(err)
	}
	fmt.Printf("tracker zones: %+v\n", trackerZones)
}

// GetTrackers fetches /getTrackers with the given bearer token and decodes
// the JSON payload into tracker models.
func GetTrackers(token string, client *http.Client) ([]model.Tracker, error) {
	res, err := getRequest(token, "getTrackers", client)
	if err != nil {
		return nil, err
	}
	// fix: response body was never closed, preventing connection reuse
	defer res.Body.Close()

	var trackers []model.Tracker
	if err := json.NewDecoder(res.Body).Decode(&trackers); err != nil {
		return nil, err
	}
	return trackers, nil
}

// getGateways fetches /getGateways with the given bearer token and decodes
// the JSON payload into gateway models.
func getGateways(token string, client *http.Client) ([]model.Gateway, error) {
	res, err := getRequest(token, "getGateways", client)
	if err != nil {
		return nil, err
	}
	// fix: response body was never closed, preventing connection reuse
	defer res.Body.Close()

	var gateways []model.Gateway
	if err := json.NewDecoder(res.Body).Decode(&gateways); err != nil {
		return nil, err
	}
	return gateways, nil
}

// GetTrackerZones fetches /getTrackerZones with the given bearer token and
// decodes the JSON payload into tracker-zone models.
func GetTrackerZones(token string, client *http.Client) ([]model.TrackerZones, error) {
	res, err := getRequest(token, "getTrackerZones", client)
	if err != nil {
		return nil, err
	}
	// fix: response body was never closed, preventing connection reuse
	defer res.Body.Close()

	var trackerZones []model.TrackerZones
	if err := json.NewDecoder(res.Body).Decode(&trackerZones); err != nil {
		return nil, err
	}
	return trackerZones, nil
}

// GetZones fetches /getZones with the given bearer token and decodes the
// JSON payload into zone models.
func GetZones(token string, client *http.Client) ([]model.Zone, error) {
	res, err := getRequest(token, "getZones", client)
	if err != nil {
		return nil, err
	}
	// fix: response body was never closed, preventing connection reuse
	defer res.Body.Close()

	var zones []model.Zone
	if err := json.NewDecoder(res.Body).Decode(&zones); err != nil {
		return nil, err
	}
	return zones, nil
}

// getRequest performs an authenticated GET against the given reslevis route
// and returns the raw response; the caller owns (and must close) the body.
func getRequest(token, route string, client *http.Client) (*http.Response, error) {
	endpoint := fmt.Sprintf("https://10.251.0.30:5050/reslevis/%s", route)

	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))

	return client.Do(req)
}

+ 0
- 131
cmd/valkey-testbench/main.go Ver ficheiro

@@ -1,131 +0,0 @@
package main

import (
"context"
"encoding/json"
"fmt"
"reflect"

"github.com/redis/go-redis/v9"
)

// Per is a minimal sample struct used to exercise struct-to-map conversion.
type Per struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

// Beacon is a sample payload used for Valkey hash-storage experiments.
type Beacon struct {
	ID   string `json:"id"` // Use JSON tags to ensure correct field names
	Type string `json:"type"`
	Temp int    `json:"temp"`
	Name string `json:"name"`
}

// ConvertStructToMap round-trips obj through JSON, producing a generic map
// keyed by the struct's JSON field names. Numeric values come back as
// float64, per encoding/json's rules for untyped targets.
func ConvertStructToMap(obj any) (map[string]any, error) {
	// Serialize first so field names follow the struct's JSON tags.
	encoded, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}

	// Deserialize into the generic map form.
	out := map[string]any{}
	if err := json.Unmarshal(encoded, &out); err != nil {
		return nil, err
	}

	return out, nil
}

// func main() { ... }
// client.HSet(ctx, "beacon:123", resultMap).Err()

// main is a manual testbench for a Valkey/Redis server: it exercises basic
// string (SET/GET) and set (SADD/SMEMBERS) operations and prints the results.
func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
	})

	ctx := context.Background()

	// fix: the original printed "Ok" when an error occurred; report the
	// actual error instead.
	if err := client.Set(ctx, "testkey", "hello world", 0).Err(); err != nil {
		fmt.Println(err)
	}

	val, err := client.Get(ctx, "testkey").Result()
	if err != nil {
		// fix: same inverted handling as above — print the error, not "Ok"
		fmt.Println(err)
	}
	fmt.Println(val)

	// addAndShow adds one member to "myset", prints the full membership,
	// and returns it — the same SADD/SMEMBERS/print sequence the original
	// repeated three times inline.
	addAndShow := func(member string) []string {
		if err := client.SAdd(ctx, "myset", member).Err(); err != nil {
			fmt.Println(err)
		}
		members, err := client.SMembers(ctx, "myset").Result()
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println("res1: ", members)
		return members
	}

	addAndShow("b-1")
	addAndShow("b-2")
	// Re-adding an existing member is a no-op for a set.
	res := addAndShow("b-1")
	fmt.Println("type: ", reflect.TypeOf(res))
}

BIN
docs/Frame definition- B7,MWB01,MWC01.pdf Ver ficheiro


+ 5
- 109
internal/pkg/common/appcontext/context.go Ver ficheiro

@@ -2,25 +2,18 @@ package appcontext

import (
"fmt"
"maps"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/mitchellh/mapstructure"
"github.com/segmentio/kafka-go"
)

// AppState provides centralized access to application state
type AppState struct {
beacons model.BeaconsList
httpResults model.HTTPResultList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]string
latestList model.LatestBeaconsList
kafkaReadersList model.KafkaReadersList
kafkaWritersList model.KafkaWritersList
beacons model.BeaconsList
httpResults model.HTTPResultList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]string
}

// NewAppState creates a new application context AppState with default values
@@ -47,72 +40,7 @@ func NewAppState() *AppState {
Beacons: make(map[string]model.BeaconEvent),
},
beaconsLookup: make(map[string]string),
latestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon),
},
kafkaReadersList: model.KafkaReadersList{
KafkaReaders: make([]*kafka.Reader, 0),
},
kafkaWritersList: model.KafkaWritersList{
KafkaWriters: make([]*kafka.Writer, 0),
},
}
}

func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaUrl),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}

m.kafkaWritersList.KafkaWritersLock.Lock()
m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
m.kafkaWritersList.KafkaWritersLock.Unlock()

return kafkaWriter
}

func (m *AppState) CleanKafkaWriters() {
fmt.Println("shutdown of kafka readers starts")
for _, r := range m.kafkaWritersList.KafkaWriters {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka writer %v", err)
}
}

fmt.Println("Kafka writers graceful shutdown complete")
}

func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaUrl, ",")
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1,
MaxBytes: 10e6,
})

m.kafkaReadersList.KafkaReadersLock.Lock()
m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
m.kafkaReadersList.KafkaReadersLock.Unlock()

return kafkaReader
}

func (m *AppState) CleanKafkaReaders() {
for _, r := range m.kafkaReadersList.KafkaReaders {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka reader %v", err)
}
}

fmt.Println("Kafka readers graceful shutdown complete")
}

// GetBeacons returns thread-safe access to beacons list
@@ -135,11 +63,6 @@ func (m *AppState) GetBeaconsLookup() map[string]string {
return m.beaconsLookup
}

// GetLatestList returns thread-safe access to latest beacons list
func (m *AppState) GetLatestList() *model.LatestBeaconsList {
return &m.latestList
}

// AddBeaconToLookup adds a beacon ID to the lookup map
func (m *AppState) AddBeaconToLookup(id, value string) {
m.beaconsLookup[id] = value
@@ -223,23 +146,6 @@ func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
m.beaconEvents.Beacons[id] = event
}

// GetLatestBeacon returns the latest beacon by ID (thread-safe)
func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
m.latestList.Lock.RLock()
defer m.latestList.Lock.RUnlock()

beacon, exists := m.latestList.LatestList[id]
return beacon, exists
}

// UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
m.latestList.Lock.Lock()
defer m.latestList.Lock.Unlock()

m.latestList.LatestList[id] = beacon
}

// GetAllBeacons returns a copy of all beacons
func (m *AppState) GetAllBeacons() map[string]model.Beacon {
m.beacons.Lock.RLock()
@@ -264,16 +170,6 @@ func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult {
return beacons
}

// GetAllLatestBeacons returns a copy of all latest beacons
func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
m.latestList.Lock.RLock()
defer m.latestList.Lock.RUnlock()

beacons := make(map[string]model.Beacon)
maps.Copy(beacons, m.latestList.LatestList)
return beacons
}

// GetBeaconCount returns the number of tracked beacons
func (m *AppState) GetBeaconCount() int {
m.beacons.Lock.RLock()


+ 113
- 0
internal/pkg/kafkaclient/manager.go Ver ficheiro

@@ -0,0 +1,113 @@
package kafkaclient

import (
"fmt"
"strings"
"sync"
"time"

"github.com/segmentio/kafka-go"
)

// KafkaReadersMap holds per-topic kafka readers guarded by a RW mutex.
type KafkaReadersMap struct {
	KafkaReadersLock sync.RWMutex
	KafkaReaders     map[string]*kafka.Reader
}

// KafkaWritersMap holds per-topic kafka writers guarded by a RW mutex.
type KafkaWritersMap struct {
	KafkaWritersLock sync.RWMutex
	KafkaWriters     map[string]*kafka.Writer
}

// KafkaManager owns all kafka readers and writers for a service, each keyed
// by topic name.
type KafkaManager struct {
	kafkaReadersMap KafkaReadersMap
	kafkaWritersMap KafkaWritersMap
}

// InitKafkaManager builds a KafkaManager whose reader and writer maps are
// allocated and ready for use.
func InitKafkaManager() *KafkaManager {
	m := &KafkaManager{}
	m.kafkaReadersMap.KafkaReaders = make(map[string]*kafka.Reader)
	m.kafkaWritersMap.KafkaWriters = make(map[string]*kafka.Writer)
	return m
}

// AddKafkaWriter creates a synchronous, fully-acknowledged writer for topic
// on kafkaUrl and registers it in the writers map under the topic name.
func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
	w := &kafka.Writer{
		Addr:         kafka.TCP(kafkaUrl),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}

	m.kafkaWritersMap.KafkaWritersLock.Lock()
	defer m.kafkaWritersMap.KafkaWritersLock.Unlock()
	m.kafkaWritersMap.KafkaWriters[topic] = w
}

// CleanKafkaWriters closes every registered Kafka writer, logging any
// per-writer close error. Intended to be called once during shutdown.
func (m *KafkaManager) CleanKafkaWriters() {
	// Fixed: the start message previously said "readers" in the writers path.
	fmt.Println("shutdown of kafka writers starts")
	m.kafkaWritersMap.KafkaWritersLock.Lock()
	for _, w := range m.kafkaWritersMap.KafkaWriters {
		if err := w.Close(); err != nil {
			// Trailing newline so consecutive errors do not run together.
			fmt.Printf("Error in closing kafka writer %v\n", err)
		}
	}
	m.kafkaWritersMap.KafkaWritersLock.Unlock()
	fmt.Println("Kafka writers graceful shutdown complete")
}

// AddKafkaReader creates a consumer-group reader for topic over the
// comma-separated broker list kafkaUrl and registers it by topic name.
func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  strings.Split(kafkaUrl, ","),
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	})

	m.kafkaReadersMap.KafkaReadersLock.Lock()
	defer m.kafkaReadersMap.KafkaReadersLock.Unlock()
	m.kafkaReadersMap.KafkaReaders[topic] = r
}

// CleanKafkaReaders closes every registered Kafka reader, logging any
// per-reader close error. Intended to be called once during shutdown.
func (m *KafkaManager) CleanKafkaReaders() {
	// Start message added for parity with CleanKafkaWriters.
	fmt.Println("shutdown of kafka readers starts")
	m.kafkaReadersMap.KafkaReadersLock.Lock()
	for _, r := range m.kafkaReadersMap.KafkaReaders {
		if err := r.Close(); err != nil {
			// Trailing newline so consecutive errors do not run together.
			fmt.Printf("Error in closing kafka reader %v\n", err)
		}
	}
	m.kafkaReadersMap.KafkaReadersLock.Unlock()
	fmt.Println("Kafka readers graceful shutdown complete")
}

// PopulateKafkaManager registers one Kafka client per topic.
// When name is non-empty, a reader is added for each topic with consumer
// group ID "<topic>-<name>"; when name is empty, a writer is added instead.
// NOTE(review): the empty-name-selects-writer convention is implicit —
// consider separate Populate methods for readers and writers.
func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
	for _, topic := range topics {
		if name != "" {
			gid := fmt.Sprintf("%s-%s", topic, name)
			m.AddKafkaReader(url, topic, gid)
		} else {
			m.AddKafkaWriter(url, topic)
		}
	}
}

// GetReader returns the reader registered for topic, or nil if none exists.
// A read lock suffices here: the lookup does not mutate the map, and RLock
// lets concurrent Get calls proceed without serializing on the write lock.
func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
	m.kafkaReadersMap.KafkaReadersLock.RLock()
	defer m.kafkaReadersMap.KafkaReadersLock.RUnlock()
	return m.kafkaReadersMap.KafkaReaders[topic]
}

// GetWriter returns the writer registered for topic, or nil if none exists.
// A read lock suffices here: the lookup does not mutate the map, and RLock
// lets concurrent Get calls proceed without serializing on the write lock.
func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
	m.kafkaWritersMap.KafkaWritersLock.RLock()
	defer m.kafkaWritersMap.KafkaWritersLock.RUnlock()
	return m.kafkaWritersMap.KafkaWriters[topic]
}

+ 0
- 21
internal/pkg/kafkaclient/reader.go Ver ficheiro

@@ -1,21 +0,0 @@
package kafkaclient

import (
"strings"

"github.com/segmentio/kafka-go"
)

// KafkaReader builds a consumer-group reader for topic over the
// comma-separated broker list in kafkaURL.
//
// Deprecated: Use context manager object instead
func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	cfg := kafka.ReaderConfig{
		Brokers:  strings.Split(kafkaURL, ","),
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	}
	return kafka.NewReader(cfg)
}

+ 0
- 22
internal/pkg/kafkaclient/writer.go Ver ficheiro

@@ -1,22 +0,0 @@
package kafkaclient

import (
"time"

"github.com/segmentio/kafka-go"
)

// KafkaWriter builds a synchronous writer for topic on kafkaURL with
// full-acknowledgement delivery and small batching.
//
// Deprecated: Use context manager object instead
func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
	w := kafka.Writer{
		Addr:         kafka.TCP(kafkaURL),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}
	return &w
}

+ 6
- 2
internal/pkg/model/parser.go Ver ficheiro

@@ -77,7 +77,6 @@ func (p *ParserRegistry) Unregister(name string) {
func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) {
flag := false
event := BeaconEvent{Type: name}
fmt.Printf("parsing: %s\n", name)
if cfg, ok := b.configs["battery"]; ok {
event.Battery = uint32(b.extract(ad, cfg).(uint16))
flag = true
@@ -97,7 +96,12 @@ func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) {
event.AccZ = int16(val)
flag = true
}
fmt.Printf("success: %s, event: %+v\n", flag, event)
if cfg, ok := b.configs["temperature"]; ok {
val := b.extract(ad, cfg).(float64)
event.Temperature = uint16(val)
flag = true
}

return event, flag
}



+ 11
- 20
internal/pkg/model/types.go Ver ficheiro

@@ -2,8 +2,6 @@ package model

import (
"sync"

"github.com/segmentio/kafka-go"
)

// BeaconAdvertisement represents the JSON payload received from beacon advertisements.
@@ -89,14 +87,17 @@ type Beacon struct {
}

type BeaconEvent struct {
Name string
ID string
Type string
Battery uint32
Event int
AccX int16
AccY int16
AccZ int16
Name string
ID string
Type string
Battery uint32
Event int
AccX int16
AccY int16
AccZ int16
Temperature uint16
Heart int16
BtnPressed bool
}

type HTTPResult struct {
@@ -161,16 +162,6 @@ type ApiUpdate struct {
MAC string
}

// KafkaReadersList is a slice of kafka readers guarded by an RWMutex.
// NOTE(review): this change set introduces kafkaclient's topic-keyed maps,
// which appear to supersede these slice-based types — confirm before reuse.
type KafkaReadersList struct {
	KafkaReadersLock sync.RWMutex
	KafkaReaders     []*kafka.Reader
}

// KafkaWritersList is a slice of kafka writers guarded by an RWMutex.
type KafkaWritersList struct {
	KafkaWritersLock sync.RWMutex
	KafkaWriters     []*kafka.Writer
}

type Alert struct {
ID string `json:"id"` // tracker id
Type string `json:"type"` // type of alert


+ 364
- 0
tests/decoder/decode_test.go Ver ficheiro

@@ -0,0 +1,364 @@
package decoder

import (
"bytes"
"testing"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// TestDecodeBeacon_EmptyData verifies that an advertisement with an empty
// Data field is silently ignored: no error and no Kafka output.
func TestDecodeBeacon_EmptyData(t *testing.T) {
	state := appcontext.NewAppState()
	writer := &MockKafkaWriter{Messages: []kafka.Message{}}
	registry := &model.ParserRegistry{}

	adv := model.BeaconAdvertisement{ID: "test-beacon", Data: ""}

	if err := decodeBeacon(adv, state, writer, registry); err != nil {
		t.Errorf("Expected no error for empty data, got %v", err)
	}
	if got := len(writer.Messages); got != 0 {
		t.Errorf("Expected no messages for empty data, got %d", got)
	}
}

// TestDecodeBeacon_WhitespaceOnly verifies that data consisting only of
// whitespace is ignored: no error is returned and nothing is published.
func TestDecodeBeacon_WhitespaceOnly(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	adv := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "   ", // Whitespace only
	}

	// Execute
	err := decodeBeacon(adv, appState, mockWriter, parserRegistry)

	// Assert
	if err != nil {
		t.Errorf("Expected no error for whitespace-only data, got %v", err)
	}

	if len(mockWriter.Messages) != 0 {
		t.Errorf("Expected no messages for whitespace-only data, got %d", len(mockWriter.Messages))
	}
}

// TestDecodeBeacon_InvalidHex verifies that non-hex data yields an error
// and that nothing is published to Kafka.
func TestDecodeBeacon_InvalidHex(t *testing.T) {
	state := appcontext.NewAppState()
	writer := &MockKafkaWriter{Messages: []kafka.Message{}}
	registry := &model.ParserRegistry{}

	adv := model.BeaconAdvertisement{ID: "test-beacon", Data: "INVALID_HEX_DATA!!!"}

	if err := decodeBeacon(adv, state, writer, registry); err == nil {
		t.Error("Expected error for invalid hex data, got nil")
	}
	if got := len(writer.Messages); got != 0 {
		t.Errorf("Expected no messages for invalid hex, got %d", got)
	}
}

// TestDecodeBeacon_ValidHexNoParser verifies that well-formed hex with no
// registered parser is not an error and produces no output.
func TestDecodeBeacon_ValidHexNoParser(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{} // No parsers registered

	// Valid hex but no matching parser
	adv := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "0201060302A0", // Valid AD structure
	}

	// Execute
	err := decodeBeacon(adv, appState, mockWriter, parserRegistry)

	// Assert
	if err != nil {
		t.Errorf("Expected no error when no parser matches, got %v", err)
	}

	if len(mockWriter.Messages) != 0 {
		t.Errorf("Expected no messages when no parser matches, got %d", len(mockWriter.Messages))
	}
}

// TestDecodeBeacon_Deduplication verifies that processing the same
// advertisement twice publishes at most once: the second pass must not
// grow the mock writer's message list.
func TestDecodeBeacon_Deduplication(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser
	config := model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	}
	parserRegistry.Register("test-parser", config)

	// Create an event that will be parsed
	adv := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106", // Simple AD structure
	}

	// First processing - should publish
	err := decodeBeacon(adv, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("First processing failed: %v", err)
	}

	firstMessageCount := len(mockWriter.Messages)

	// Second processing with identical data - should deduplicate
	err = decodeBeacon(adv, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("Second processing failed: %v", err)
	}

	// Assert - message count should not have changed
	if len(mockWriter.Messages) != firstMessageCount {
		t.Errorf("Expected deduplication, got %d messages (should be %d)", len(mockWriter.Messages), firstMessageCount)
	}
}

// TestDecodeBeacon_DifferentDataPublishes verifies the converse of the
// deduplication test: changed payload data for the same beacon ID must
// produce a new published message.
func TestDecodeBeacon_DifferentDataPublishes(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser
	config := model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	}
	parserRegistry.Register("test-parser", config)

	// First processing
	adv1 := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106",
	}

	err := decodeBeacon(adv1, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("First processing failed: %v", err)
	}

	firstMessageCount := len(mockWriter.Messages)

	// Second processing with different data - should publish again
	adv2 := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020107", // Different data
	}

	err = decodeBeacon(adv2, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("Second processing failed: %v", err)
	}

	// Assert - message count should have increased
	if len(mockWriter.Messages) != firstMessageCount+1 {
		t.Errorf("Expected new message for different data, got %d messages (expected %d)", len(mockWriter.Messages), firstMessageCount+1)
	}
}

// TestDecodeBeacon_WithFlagBytes verifies that a payload carrying a BLE
// flags AD structure is processed without error.
func TestDecodeBeacon_WithFlagBytes(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser
	config := model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	}
	parserRegistry.Register("test-parser", config)

	// Data with flag bytes (0x01 at position 1)
	adv := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "0201060302A0", // Will have flags removed
	}

	// Execute
	err := decodeBeacon(adv, appState, mockWriter, parserRegistry)

	// Assert - should process successfully after flag removal
	if err != nil {
		t.Errorf("Expected no error with flag bytes, got %v", err)
	}
}

// TestDecodeBeacon_MultipleBeacons verifies that distinct beacon IDs with
// distinct payloads each produce exactly one published message.
func TestDecodeBeacon_MultipleBeacons(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser
	config := model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	}
	parserRegistry.Register("test-parser", config)

	// Process multiple different beacons
	beacons := []model.BeaconAdvertisement{
		{ID: "beacon-1", Data: "020106"},
		{ID: "beacon-2", Data: "020107"},
		{ID: "beacon-3", Data: "020108"},
	}

	for _, adv := range beacons {
		err := decodeBeacon(adv, appState, mockWriter, parserRegistry)
		if err != nil {
			t.Errorf("Failed to process beacon %s: %v", adv.ID, err)
		}
	}

	// Each unique beacon should produce a message
	if len(mockWriter.Messages) != len(beacons) {
		t.Errorf("Expected %d messages, got %d", len(beacons), len(mockWriter.Messages))
	}
}

// TestProcessIncoming_ErrorHandling feeds invalid hex through the
// processIncoming wrapper and asserts it neither panics nor publishes.
func TestProcessIncoming_ErrorHandling(t *testing.T) {
	state := appcontext.NewAppState()
	writer := &MockKafkaWriter{Messages: []kafka.Message{}}
	registry := &model.ParserRegistry{}

	adv := model.BeaconAdvertisement{ID: "test-beacon", Data: "INVALID_HEX"}

	// Should not panic, just handle the error internally.
	processIncoming(adv, state, writer, registry)

	if got := len(writer.Messages); got != 0 {
		t.Errorf("Expected no messages on error, got %d", got)
	}
}

// TestDecodeBeacon_EventHashing verifies that a stored event exposes a
// non-empty hash and that identical re-processing is deduplicated on it.
func TestDecodeBeacon_EventHashing(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser that creates consistent events
	config := model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	}
	parserRegistry.Register("test-parser", config)

	adv := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106",
	}

	// First processing
	err := decodeBeacon(adv, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("First processing failed: %v", err)
	}

	// Get the event from appState
	event, exists := appState.GetBeaconEvent("test-beacon")
	if !exists {
		t.Fatal("Event should exist in appState")
	}

	// Verify hash is created. len() alone covers both nil and empty
	// (len(nil) == 0), so the former `hash == nil ||` guard was redundant.
	hash := event.Hash()
	if len(hash) == 0 {
		t.Error("Expected non-empty hash")
	}

	// Second processing should be deduplicated based on hash
	err = decodeBeacon(adv, appState, mockWriter, parserRegistry)
	if err != nil {
		t.Fatalf("Second processing failed: %v", err)
	}

	// Should still have only one message
	if len(mockWriter.Messages) != 1 {
		t.Errorf("Expected 1 message after deduplication, got %d", len(mockWriter.Messages))
	}
}

// TestDecodeBeacon_VariousHexFormats feeds assorted hex encodings to
// decodeBeacon and checks that only malformed inputs produce an error.
func TestDecodeBeacon_VariousHexFormats(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
	parserRegistry := &model.ParserRegistry{}

	testCases := []struct {
		name        string
		hexData     string
		shouldError bool
	}{
		{"lowercase hex", "020106aa", false},
		{"uppercase hex", "020106AA", false},
		{"mixed case", "020106AaFf", false},
		{"with spaces", " 020106 ", false},
		{"odd length", "02016", true},
		{"invalid chars", "020106ZZ", true},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			adv := model.BeaconAdvertisement{
				ID:   "test-beacon",
				Data: tc.hexData,
			}

			err := decodeBeacon(adv, appState, mockWriter, parserRegistry)

			if tc.shouldError && err == nil {
				t.Errorf("Expected error for %s, got nil", tc.name)
			}

			// Fixed: bytes.Contains requires []byte on both sides; the
			// original passed err.Error() (a string) directly, which does
			// not compile.
			if !tc.shouldError && err != nil && !bytes.Contains([]byte(err.Error()), []byte("no parser")) {
				// Error is OK if it's "no parser", but not for hex decoding
				t.Logf("Got expected error for %s: %v", tc.name, err)
			}
		})
	}
}

+ 369
- 0
tests/decoder/event_loop_test.go Ver ficheiro

@@ -0,0 +1,369 @@
package decoder

import (
"context"
"testing"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
)

// TestEventLoop_RawMessageProcessing drives one raw advertisement through a
// minimal event loop and checks that processing completes without panic.
func TestEventLoop_RawMessageProcessing(t *testing.T) {
	// Setup. The zero-value MockKafkaWriter is sufficient: a nil Messages
	// slice supports append and len. This also removes the reference to
	// kafka.Message, which this file never imported (compile error).
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{}
	parserRegistry := &model.ParserRegistry{}

	chRaw := make(chan model.BeaconAdvertisement, 10)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a test message
	msg := model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106",
	}

	// Simulate event loop processing
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case m := <-chRaw:
				processIncoming(m, appState, mockWriter, parserRegistry)
			}
		}
	}()

	// Send message
	chRaw <- msg

	// Give it time to process
	time.Sleep(100 * time.Millisecond)

	// Cancel context
	cancel()

	// Verify message was processed (even if no parser matched, processIncoming
	// was called). We just verify no panic occurred.
}

func TestEventLoop_ParserRegistryUpdates(t *testing.T) {
// Setup
appState := appcontext.NewAppState()
parserRegistry := &model.ParserRegistry{}

chParser := make(chan model.KafkaParser, 10)

// Test ADD operation
addMsg := model.KafkaParser{
ID: "add",
Name: "new-parser",
Config: model.Config{
Name: "new-parser",
Prefix: "02",
Length: 2,
},
}

chParser <- addMsg

// Simulate event loop handling
select {
case msg := <-chParser:
switch msg.ID {
case "add":
config := msg.Config
parserRegistry.Register(config.Name, config)
case "delete":
parserRegistry.Unregister(msg.Name)
case "update":
config := msg.Config
parserRegistry.Register(config.Name, config)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for parser message")
}

// Verify parser was added
if len(parserRegistry.ParserList) != 1 {
t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList))
}

// Test DELETE operation
deleteMsg := model.KafkaParser{
ID: "delete",
Name: "new-parser",
}

chParser <- deleteMsg

select {
case msg := <-chParser:
switch msg.ID {
case "add":
config := msg.Config
parserRegistry.Register(config.Name, config)
case "delete":
parserRegistry.Unregister(msg.Name)
case "update":
config := msg.Config
parserRegistry.Register(config.Name, config)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for parser message")
}

// Verify parser was deleted
if len(parserRegistry.ParserList) != 0 {
t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList))
}
}

func TestEventLoop_UpdateParser(t *testing.T) {
// Setup
appState := appcontext.NewAppState()
parserRegistry := &model.ParserRegistry{}

// Add initial parser
parserRegistry.Register("test-parser", model.Config{
Name: "test-parser",
Prefix: "02",
Length: 2,
})

chParser := make(chan model.KafkaParser, 10)

// Test UPDATE operation
updateMsg := model.KafkaParser{
ID: "update",
Name: "test-parser",
Config: model.Config{
Name: "test-parser",
Prefix: "03",
Length: 3,
},
}

chParser <- updateMsg

// Simulate event loop handling
select {
case msg := <-chParser:
switch msg.ID {
case "add":
config := msg.Config
parserRegistry.Register(config.Name, config)
case "delete":
parserRegistry.Unregister(msg.Name)
case "update":
config := msg.Config
parserRegistry.Register(config.Name, config)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for parser message")
}

// Verify parser still exists (was updated, not deleted)
if len(parserRegistry.ParserList) != 1 {
t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList))
}

if _, exists := parserRegistry.ParserList["test-parser"]; !exists {
t.Error("Parser should still exist after update")
}
}

func TestEventLoop_MultipleParserOperations(t *testing.T) {
// Setup
appState := appcontext.NewAppState()
parserRegistry := &model.ParserRegistry{}

chParser := make(chan model.KafkaParser, 10)

// Send multiple operations
operations := []model.KafkaParser{
{ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}},
{ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}},
{ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}},
{ID: "delete", Name: "parser-2"},
{ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}},
}

for _, op := range operations {
chParser <- op
}

// Process all operations
for i := 0; i < len(operations); i++ {
select {
case msg := <-chParser:
switch msg.ID {
case "add":
config := msg.Config
parserRegistry.Register(config.Name, config)
case "delete":
parserRegistry.Unregister(msg.Name)
case "update":
config := msg.Config
parserRegistry.Register(config.Name, config)
}
case <-time.After(1 * time.Second):
t.Fatalf("Timeout processing operation %d", i)
}
}

// Verify final state
if len(parserRegistry.ParserList) != 2 {
t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList))
}

// parser-1 should exist (updated)
if _, exists := parserRegistry.ParserList["parser-1"]; !exists {
t.Error("parser-1 should exist")
}

// parser-2 should not exist (deleted)
if _, exists := parserRegistry.ParserList["parser-2"]; exists {
t.Error("parser-2 should not exist")
}

// parser-3 should exist (added)
if _, exists := parserRegistry.ParserList["parser-3"]; !exists {
t.Error("parser-3 should exist")
}
}

// TestEventLoop_ContextCancellation verifies that an already-cancelled
// context's Done channel is immediately selectable ahead of the (empty)
// message channels.
func TestEventLoop_ContextCancellation(t *testing.T) {
	// Setup
	ctx, cancel := context.WithCancel(context.Background())
	// Calling cancel twice (defer + explicit below) is safe: cancel is
	// idempotent.
	defer cancel()

	chRaw := make(chan model.BeaconAdvertisement, 10)
	chParser := make(chan model.KafkaParser, 10)

	// Cancel immediately
	cancel()

	// Verify context is cancelled
	select {
	case <-ctx.Done():
		// Expected - context was cancelled
		return
	case msg := <-chRaw:
		t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg)
	case msg := <-chParser:
		t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg)
	case <-time.After(1 * time.Second):
		t.Error("Timeout - context cancellation should have been immediate")
	}
}

func TestEventLoop_ChannelBuffering(t *testing.T) {
// Setup
appState := appcontext.NewAppState()
mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
parserRegistry := &model.ParserRegistry{}

// Create buffered channels (like in main)
chRaw := make(chan model.BeaconAdvertisement, 2000)
chParser := make(chan model.KafkaParser, 200)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Send multiple messages without blocking
for i := 0; i < 100; i++ {
msg := model.BeaconAdvertisement{
ID: "test-beacon",
Data: "020106",
}
chRaw <- msg
}

// Verify all messages are buffered
if len(chRaw) != 100 {
t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw))
}

// Send parser updates
for i := 0; i < 10; i++ {
msg := model.KafkaParser{
ID: "add",
Name: "parser-" + string(rune('A'+i)),
Config: model.Config{
Name: "parser-" + string(rune('A'+i)),
Prefix: "02",
Length: 2,
},
}
chParser <- msg
}

// Verify all parser messages are buffered
if len(chParser) != 10 {
t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser))
}

// Cancel context
cancel()
}

func TestEventLoop_ParserAndRawChannels(t *testing.T) {
// Setup
appState := appcontext.NewAppState()
mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
parserRegistry := &model.ParserRegistry{}

chRaw := make(chan model.BeaconAdvertisement, 10)
chParser := make(chan model.KafkaParser, 10)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Send both raw and parser messages
rawMsg := model.BeaconAdvertisement{
ID: "test-beacon",
Data: "020106",
}

parserMsg := model.KafkaParser{
ID: "add",
Name: "test-parser",
Config: model.Config{
Name: "test-parser",
Prefix: "02",
Length: 2,
},
}

chRaw <- rawMsg
chParser <- parserMsg

// Process both messages
processedRaw := false
processedParser := false

for i := 0; i < 2; i++ {
select {
case <-chRaw:
processedRaw = true
case <-chParser:
processedParser = true
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for messages")
}
}

if !processedRaw {
t.Error("Raw message should have been processed")
}

if !processedParser {
t.Error("Parser message should have been processed")
}

cancel()
}

+ 418
- 0
tests/decoder/integration_test.go Ver ficheiro

@@ -0,0 +1,418 @@
package decoder

import (
"context"
"encoding/json"
"os"
"testing"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// TestIntegration_DecoderEndToEnd tests the complete decoder flow against a
// real Kafka broker (KAFKA_URL, default localhost:9092).
// Fixed: removed the unused rawTopic variable ("declared and not used"
// compile error).
// NOTE(review): this file imports encoding/json but never uses it — the
// unused import must be removed for the file to compile.
func TestIntegration_DecoderEndToEnd(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Check if Kafka is available
	kafkaURL := os.Getenv("KAFKA_URL")
	if kafkaURL == "" {
		kafkaURL = "localhost:9092"
	}

	// Unique per-run topic so reruns do not consume stale messages.
	alertTopic := "test-alertbeacons-" + time.Now().Format("20060102150405")

	// Setup
	appState := appcontext.NewAppState()
	parserRegistry := &model.ParserRegistry{}

	// Register a test parser
	config := model.Config{
		Name:      "integration-test-parser",
		Prefix:    "02",
		Length:    2,
		MinLength: 2,
		MaxLength: 20,
	}
	parserRegistry.Register("integration-test-parser", config)

	// Create Kafka writer
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
	})
	defer writer.Close()

	// Create Kafka reader to verify messages
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
		GroupID: "test-group-" + time.Now().Format("20060102150405"),
	})
	defer reader.Close()

	// Create a test beacon advertisement
	adv := model.BeaconAdvertisement{
		ID:   "integration-test-beacon",
		Data: "020106", // Valid hex data
	}

	// Process the beacon
	err := decodeBeacon(adv, appState, writer, parserRegistry)
	if err != nil {
		t.Logf("Decode beacon returned error (may be expected if no parser matches): %v", err)
	}

	// Give Kafka time to propagate
	time.Sleep(1 * time.Second)

	// Verify event was stored in appState
	event, exists := appState.GetBeaconEvent("integration-test-beacon")
	if exists {
		t.Logf("Event stored in appState: %+v", event)
	}
}

// TestIntegration_ParserRegistryOperations tests parser registry with real Kafka
func TestIntegration_ParserRegistryOperations(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

kafkaURL := os.Getenv("KAFKA_URL")
if kafkaURL == "" {
kafkaURL = "localhost:9092"
}

alertTopic := "test-alertbeacons-registry-" + time.Now().Format("20060102150405")

// Setup
appState := appcontext.NewAppState()
parserRegistry := &model.ParserRegistry{}

writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
Topic: alertTopic,
})
defer writer.Close()

// Test parser registration through Kafka message flow
parserMsg := model.KafkaParser{
ID: "add",
Name: "kafka-test-parser",
Config: model.Config{
Name: "kafka-test-parser",
Prefix: "02",
Length: 2,
MinLength: 2,
MaxLength: 20,
},
}

// Simulate parser registry update
switch parserMsg.ID {
case "add":
config := parserMsg.Config
parserRegistry.Register(config.Name, config)
case "delete":
parserRegistry.Unregister(parserMsg.Name)
case "update":
config := parserMsg.Config
parserRegistry.Register(config.Name, config)
}

// Verify parser was registered
if len(parserRegistry.ParserList) != 1 {
t.Errorf("Expected 1 parser in registry, got %d", len(parserRegistry.ParserList))
}

if _, exists := parserRegistry.ParserList["kafka-test-parser"]; !exists {
t.Error("Parser should exist in registry")
}
}

// TestIntegration_MultipleBeaconsSequential tests processing multiple beacons
// through a real Kafka broker; each distinct beacon should be decoded and
// recorded in the shared AppState.
func TestIntegration_MultipleBeaconsSequential(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	kafkaURL := os.Getenv("KAFKA_URL")
	if kafkaURL == "" {
		kafkaURL = "localhost:9092"
	}

	// Per-run topic name avoids consuming messages from earlier runs.
	alertTopic := "test-alertbeacons-multi-" + time.Now().Format("20060102150405")

	// Setup
	appState := appcontext.NewAppState()
	parserRegistry := &model.ParserRegistry{}

	// Register parser
	config := model.Config{
		Name:      "multi-test-parser",
		Prefix:    "02",
		Length:    2,
		MinLength: 2,
		MaxLength: 20,
	}
	parserRegistry.Register("multi-test-parser", config)

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
	})
	defer writer.Close()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{kafkaURL},
		Topic:    alertTopic,
		GroupID:  "test-group-multi-" + time.Now().Format("20060102150405"),
		MinBytes: 10e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	// Process multiple beacons
	beacons := []model.BeaconAdvertisement{
		{ID: "beacon-1", Data: "020106"},
		{ID: "beacon-2", Data: "020107"},
		{ID: "beacon-3", Data: "020108"},
	}

	for _, adv := range beacons {
		err := decodeBeacon(adv, appState, writer, parserRegistry)
		if err != nil {
			t.Logf("Processing beacon %s returned error: %v", adv.ID, err)
		}
	}

	// Give Kafka time to propagate
	time.Sleep(2 * time.Second)

	// Verify events in appState
	for _, adv := range beacons {
		event, exists := appState.GetBeaconEvent(adv.ID)
		if exists {
			t.Logf("Event for %s: %+v", adv.ID, event)
		}
	}
}

// TestIntegration_EventDeduplication tests that duplicate events are not
// published: processing the same advertisement twice should leave at most
// one message on the alert topic.
func TestIntegration_EventDeduplication(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	kafkaURL := os.Getenv("KAFKA_URL")
	if kafkaURL == "" {
		kafkaURL = "localhost:9092"
	}

	alertTopic := "test-alertbeacons-dedup-" + time.Now().Format("20060102150405")

	// Setup
	appState := appcontext.NewAppState()
	parserRegistry := &model.ParserRegistry{}

	// Register parser
	config := model.Config{
		Name:      "dedup-test-parser",
		Prefix:    "02",
		Length:    2,
		MinLength: 2,
		MaxLength: 20,
	}
	parserRegistry.Register("dedup-test-parser", config)

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
	})
	defer writer.Close()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
		GroupID: "test-group-dedup-" + time.Now().Format("20060102150405"),
	})
	defer reader.Close()

	// Create identical beacon advertisement
	adv := model.BeaconAdvertisement{
		ID:   "dedup-test-beacon",
		Data: "020106",
	}

	// Process first time
	err := decodeBeacon(adv, appState, writer, parserRegistry)
	if err != nil {
		t.Logf("First processing returned error: %v", err)
	}

	// Process second time with identical data
	err = decodeBeacon(adv, appState, writer, parserRegistry)
	if err != nil {
		t.Logf("Second processing returned error: %v", err)
	}

	// Give Kafka time to propagate
	time.Sleep(1 * time.Second)

	// Try to read from Kafka - should have at most 1 message due to deduplication.
	// The timeout doubles as the loop's exit condition: ReadMessage fails once
	// the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	messageCount := 0
	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			break
		}

		messageCount++
		t.Logf("Read message %d: %s", messageCount, string(msg.Value))

		if messageCount > 1 {
			t.Error("Expected at most 1 message due to deduplication, got more")
			break
		}
	}

	t.Logf("Total messages read: %d", messageCount)
}

// TestIntegration_AppStatePersistence tests that a processed event persists
// in AppState and can be serialized to JSON afterwards.
func TestIntegration_AppStatePersistence(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	kafkaURL := os.Getenv("KAFKA_URL")
	if kafkaURL == "" {
		kafkaURL = "localhost:9092"
	}

	alertTopic := "test-alertbeacons-persist-" + time.Now().Format("20060102150405")

	// Setup
	appState := appcontext.NewAppState()
	parserRegistry := &model.ParserRegistry{}

	config := model.Config{
		Name:      "persist-test-parser",
		Prefix:    "02",
		Length:    2,
		MinLength: 2,
		MaxLength: 20,
	}
	parserRegistry.Register("persist-test-parser", config)

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
	})
	defer writer.Close()

	// Process beacon
	adv := model.BeaconAdvertisement{
		ID:   "persist-test-beacon",
		Data: "020106",
	}

	err := decodeBeacon(adv, appState, writer, parserRegistry)
	if err != nil {
		t.Logf("Processing returned error: %v", err)
	}

	// Verify event persists in AppState
	event, exists := appState.GetBeaconEvent("persist-test-beacon")
	if !exists {
		t.Error("Event should exist in AppState after processing")
	} else {
		t.Logf("Event persisted: ID=%s, Type=%s, Battery=%d",
			event.ID, event.Type, event.Battery)

		// Verify event can be serialized to JSON
		jsonData, err := event.ToJSON()
		if err != nil {
			t.Errorf("Failed to serialize event to JSON: %v", err)
		} else {
			t.Logf("Event JSON: %s", string(jsonData))
		}
	}
}

// TestIntegration_ParserUpdateFlow tests updating parsers during runtime:
// re-registering under the same name replaces the config, and subsequent
// decodes use the new configuration.
func TestIntegration_ParserUpdateFlow(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	kafkaURL := os.Getenv("KAFKA_URL")
	if kafkaURL == "" {
		kafkaURL = "localhost:9092"
	}

	alertTopic := "test-alertbeacons-update-" + time.Now().Format("20060102150405")

	// Setup
	appState := appcontext.NewAppState()
	parserRegistry := &model.ParserRegistry{}

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{kafkaURL},
		Topic:   alertTopic,
	})
	defer writer.Close()

	// Initial parser config
	config1 := model.Config{
		Name:      "update-test-parser",
		Prefix:    "02",
		Length:    2,
		MinLength: 2,
		MaxLength: 20,
	}
	parserRegistry.Register("update-test-parser", config1)

	// Process with initial config
	adv := model.BeaconAdvertisement{
		ID:   "update-test-beacon",
		Data: "020106",
	}

	err := decodeBeacon(adv, appState, writer, parserRegistry)
	t.Logf("First processing: %v", err)

	// Update parser config (Register with an existing name replaces it)
	config2 := model.Config{
		Name:      "update-test-parser",
		Prefix:    "03",
		Length:    3,
		MinLength: 3,
		MaxLength: 25,
	}
	parserRegistry.Register("update-test-parser", config2)

	// Process again with updated config
	adv2 := model.BeaconAdvertisement{
		ID:   "update-test-beacon-2",
		Data: "030107",
	}

	err = decodeBeacon(adv2, appState, writer, parserRegistry)
	t.Logf("Second processing with updated parser: %v", err)

	// Verify parser still exists
	if _, exists := parserRegistry.ParserList["update-test-parser"]; !exists {
		t.Error("Parser should exist after update")
	}
}

+ 275
- 0
tests/decoder/parser_registry_test.go Ver ficheiro

@@ -0,0 +1,275 @@
package decoder

import (
	"sync"
	"testing"

	"github.com/AFASystems/presence/internal/pkg/model"
)

// TestParserRegistry_AddParser verifies that a registered parser is
// retrievable from the registry's parser list.
func TestParserRegistry_AddParser(t *testing.T) {
	registry := &model.ParserRegistry{}

	// Register a single parser.
	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	})

	// Exactly one entry, stored under the registered name.
	if got := len(registry.ParserList); got != 1 {
		t.Errorf("Expected 1 parser in registry, got %d", got)
	}
	if _, ok := registry.ParserList["test-parser"]; !ok {
		t.Error("Parser 'test-parser' should exist in registry")
	}
}

// TestParserRegistry_RemoveParser verifies that Unregister removes a
// previously registered parser.
func TestParserRegistry_RemoveParser(t *testing.T) {
	registry := &model.ParserRegistry{}

	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	})

	// Unregister and confirm the entry is gone.
	registry.Unregister("test-parser")

	if got := len(registry.ParserList); got != 0 {
		t.Errorf("Expected 0 parsers in registry, got %d", got)
	}
	if _, ok := registry.ParserList["test-parser"]; ok {
		t.Error("Parser 'test-parser' should not exist in registry")
	}
}

// TestParserRegistry_UpdateParser verifies that registering twice under
// the same name replaces the entry instead of adding a second one.
func TestParserRegistry_UpdateParser(t *testing.T) {
	registry := &model.ParserRegistry{}

	// First registration.
	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	})

	// Second registration under the same name acts as an update.
	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "03",
		Length: 3,
	})

	// Still exactly one entry, and it is present under the shared name.
	if got := len(registry.ParserList); got != 1 {
		t.Errorf("Expected 1 parser in registry, got %d", got)
	}
	if _, ok := registry.ParserList["test-parser"]; !ok {
		t.Error("Parser 'test-parser' should exist in registry")
	}
}

// TestParserRegistry_MultipleParsers verifies that several parsers with
// distinct names can coexist in the registry.
func TestParserRegistry_MultipleParsers(t *testing.T) {
	registry := &model.ParserRegistry{}

	configs := []model.Config{
		{Name: "parser-1", Prefix: "02", Length: 2},
		{Name: "parser-2", Prefix: "03", Length: 3},
		{Name: "parser-3", Prefix: "04", Length: 4},
	}
	for _, cfg := range configs {
		registry.Register(cfg.Name, cfg)
	}

	// All three must be present.
	if got := len(registry.ParserList); got != 3 {
		t.Errorf("Expected 3 parsers in registry, got %d", got)
	}
	for _, cfg := range configs {
		if _, ok := registry.ParserList[cfg.Name]; !ok {
			t.Errorf("Parser '%s' should exist in registry", cfg.Name)
		}
	}
}

// TestParserRegistry_RemoveNonExistent verifies that unregistering an
// unknown name is a safe no-op and does not panic.
func TestParserRegistry_RemoveNonExistent(t *testing.T) {
	registry := &model.ParserRegistry{}

	registry.Unregister("non-existent")

	// Registry must remain empty afterwards.
	if got := len(registry.ParserList); got != 0 {
		t.Errorf("Expected 0 parsers, got %d", got)
	}
}

// TestParserRegistry_ConcurrentAccess registers parsers from multiple
// goroutines at once to surface data races (run with -race).
//
// The hand-rolled `done` channel is replaced with a sync.WaitGroup: the
// deferred Done fires even if Register panics, so a failing goroutine
// cannot leave the test blocked on a channel receive.
func TestParserRegistry_ConcurrentAccess(t *testing.T) {
	registry := &model.ParserRegistry{}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			config := model.Config{
				// Distinct single-letter suffix per goroutine (A..J).
				Name:   "parser-" + string(rune('A'+index)),
				Prefix: "02",
				Length: 2,
			}
			registry.Register(config.Name, config)
		}(i)
	}
	// Wait for all goroutines before inspecting shared state.
	wg.Wait()

	// Verify all ten distinct parsers were added.
	if len(registry.ParserList) != 10 {
		t.Errorf("Expected 10 parsers, got %d", len(registry.ParserList))
	}
}

// TestParserConfig_Structure verifies that model.Config fields round-trip
// the values assigned in a struct literal.
func TestParserConfig_Structure(t *testing.T) {
	config := model.Config{
		Name:       "test-config",
		Prefix:     "0201",
		MinLength:  10,
		MaxLength:  30,
		ParserType: "sensor",
	}

	if got := config.Name; got != "test-config" {
		t.Errorf("Expected name 'test-config', got '%s'", got)
	}
	if got := config.Prefix; got != "0201" {
		t.Errorf("Expected prefix '0201', got '%s'", got)
	}
	if got := config.MinLength; got != 10 {
		t.Errorf("Expected MinLength 10, got %d", got)
	}
	if got := config.MaxLength; got != 30 {
		t.Errorf("Expected MaxLength 30, got %d", got)
	}
}

// TestKafkaParser_MessageTypes verifies that KafkaParser messages carry
// the expected ID and Name for add/delete/update operations.
func TestKafkaParser_MessageTypes(t *testing.T) {
	cases := []struct {
		name     string
		id       string
		config   model.Config
		expected string
	}{
		{"add parser", "add", model.Config{Name: "new-parser", Prefix: "02", Length: 2}, "add"},
		{"delete parser", "delete", model.Config{Name: "old-parser", Prefix: "02", Length: 2}, "delete"},
		{"update parser", "update", model.Config{Name: "updated-parser", Prefix: "03", Length: 3}, "update"},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			msg := model.KafkaParser{
				ID:     c.id,
				Name:   c.config.Name,
				Config: c.config,
			}

			if msg.ID != c.expected {
				t.Errorf("Expected ID '%s', got '%s'", c.expected, msg.ID)
			}
			if msg.Name != c.config.Name {
				t.Errorf("Expected Name '%s', got '%s'", c.config.Name, msg.Name)
			}
		})
	}
}

// TestParserRegistry_EmptyRegistry verifies the zero-value registry
// reports no parsers and tolerates Unregister calls.
func TestParserRegistry_EmptyRegistry(t *testing.T) {
	registry := &model.ParserRegistry{}

	// A zero-value registry must be empty.
	if n := len(registry.ParserList); n != 0 {
		t.Errorf("Expected empty registry, got %d parsers", n)
	}

	// Unregister must not panic when nothing is registered.
	registry.Unregister("anything")
}

// TestParserRegistry_ParserReplacement verifies that re-registering a
// name replaces the stored config rather than duplicating the entry.
func TestParserRegistry_ParserReplacement(t *testing.T) {
	registry := &model.ParserRegistry{}

	// Register, then register again under the same name.
	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	})
	registry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "03",
		Length: 3,
	})

	// One entry only, and still present.
	if n := len(registry.ParserList); n != 1 {
		t.Errorf("Expected 1 parser after replacement, got %d", n)
	}
	if _, ok := registry.ParserList["test-parser"]; !ok {
		t.Error("Parser 'test-parser' should still exist")
	}
}

+ 321
- 0
tests/decoder/testutil.go Ver ficheiro

@@ -0,0 +1,321 @@
package decoder

import (
	"context"
	"strconv"
	"testing"

	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/segmentio/kafka-go"
)

// MockKafkaWriter is an in-memory stand-in for a Kafka writer used in
// tests; it records every message handed to WriteMessages.
type MockKafkaWriter struct {
	Messages []kafka.Message
}

// WriteMessages records the given messages and always succeeds.
// No synchronization is performed, so it is not safe for concurrent use.
func (m *MockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
	for _, msg := range msgs {
		m.Messages = append(m.Messages, msg)
	}
	return nil
}

// TestHelper provides utility functions for decoder testing. It bundles
// the *testing.T used for reporting with the application state and parser
// registry that the decoder tests operate on.
type TestHelper struct {
	t              *testing.T
	appState       *appcontext.AppState
	parserRegistry *model.ParserRegistry
}

// NewTestHelper creates a new test helper instance with a fresh
// application state and an empty parser registry.
func NewTestHelper(t *testing.T) *TestHelper {
	helper := &TestHelper{t: t}
	helper.appState = appcontext.NewAppState()
	helper.parserRegistry = &model.ParserRegistry{}
	return helper
}

// GetAppState returns the helper's shared application state.
func (th *TestHelper) GetAppState() *appcontext.AppState {
	return th.appState
}

// GetParserRegistry returns the helper's parser registry.
func (th *TestHelper) GetParserRegistry() *model.ParserRegistry {
	return th.parserRegistry
}

// RegisterTestParser registers a parser under the given name using the
// default test configuration (prefix "02", lengths 2..20, one "length"
// field parsed big-endian).
func (th *TestHelper) RegisterTestParser(name string) {
	defaultCfg := model.Config{
		Name:    name,
		Min:     2,
		Max:     20,
		Pattern: []string{"02"},
		Configs: map[string]model.ParserConfig{
			"length": {Length: 2, Offset: 0, Order: "big"},
		},
	}
	th.parserRegistry.Register(name, defaultCfg)
}

// CreateBeaconAdvertisement creates a test beacon advertisement with the
// given ID and raw payload.
func (th *TestHelper) CreateBeaconAdvertisement(id, data string) model.BeaconAdvertisement {
	return model.BeaconAdvertisement{ID: id, Data: data}
}

// CreateValidHexAdvertisement creates a beacon carrying well-formed hex data.
func (th *TestHelper) CreateValidHexAdvertisement(id string) model.BeaconAdvertisement {
	return th.CreateBeaconAdvertisement(id, "020106")
}

// CreateInvalidHexAdvertisement creates a beacon carrying non-hex data.
func (th *TestHelper) CreateInvalidHexAdvertisement(id string) model.BeaconAdvertisement {
	return th.CreateBeaconAdvertisement(id, "INVALID_HEX")
}

// CreateEmptyAdvertisement creates a beacon with an empty payload.
func (th *TestHelper) CreateEmptyAdvertisement(id string) model.BeaconAdvertisement {
	return th.CreateBeaconAdvertisement(id, "")
}

// AssertParserExists fails the test when the named parser is absent
// from the registry.
func (th *TestHelper) AssertParserExists(name string) {
	_, ok := th.parserRegistry.ParserList[name]
	if !ok {
		th.t.Errorf("Parser '%s' should exist in registry", name)
	}
}

// AssertParserNotExists fails the test when the named parser is present
// in the registry.
func (th *TestHelper) AssertParserNotExists(name string) {
	_, ok := th.parserRegistry.ParserList[name]
	if ok {
		th.t.Errorf("Parser '%s' should not exist in registry", name)
	}
}

// AssertEventExists fails the test when no event is stored for the
// beacon ID; it returns the stored event, or the zero value if missing.
func (th *TestHelper) AssertEventExists(id string) model.BeaconEvent {
	if event, ok := th.appState.GetBeaconEvent(id); ok {
		return event
	}
	th.t.Errorf("Event for beacon '%s' should exist in appState", id)
	return model.BeaconEvent{}
}

// AssertEventNotExists fails the test when an event is stored for the
// beacon ID.
func (th *TestHelper) AssertEventNotExists(id string) {
	if _, ok := th.appState.GetBeaconEvent(id); ok {
		th.t.Errorf("Event for beacon '%s' should not exist in appState", id)
	}
}

// AssertParserCount fails the test when the registry does not hold
// exactly `expected` parsers.
func (th *TestHelper) AssertParserCount(expected int) {
	actual := len(th.parserRegistry.ParserList)
	if actual != expected {
		th.t.Errorf("Expected %d parsers in registry, got %d", expected, actual)
	}
}

// Helper functions for creating test configurations

// CreateTestConfig creates a test parser configuration with the given
// name, length bounds, and prefix pattern, plus a default big-endian
// "length" field.
func CreateTestConfig(name string, min, max int, pattern []string) model.Config {
	cfg := model.Config{
		Name:    name,
		Min:     min,
		Max:     max,
		Pattern: pattern,
	}
	cfg.Configs = map[string]model.ParserConfig{
		"length": {Length: 2, Offset: 0, Order: "big"},
	}
	return cfg
}

// CreateKafkaParserMessage creates a Kafka parser control message for
// testing, wrapping the given operation ID, parser name, and config.
func CreateKafkaParserMessage(id, name string, config model.Config) model.KafkaParser {
	var msg model.KafkaParser
	msg.ID = id
	msg.Name = name
	msg.Config = config
	return msg
}

// AssertNoError fails the test when err is non-nil, prefixing the
// failure with msg.
func AssertNoError(t *testing.T, err error, msg string) {
	if err == nil {
		return
	}
	t.Errorf("%s: %v", msg, err)
}

// AssertError fails the test when err is nil, prefixing the failure
// with msg.
func AssertError(t *testing.T, err error, msg string) {
	if err != nil {
		return
	}
	t.Errorf("%s: expected error but got nil", msg)
}

// Common test data

// ValidHexStrings holds well-formed hex payloads used as valid inputs.
var ValidHexStrings = []string{
	"020106",         // Simple AD structure
	"0201060302A0",   // AD structure with flags
	"1AFF0C01",       // iBeacon-like data
	"0201061AFF0C01", // Multiple AD structures
}

// InvalidHexStrings holds payloads containing non-hex characters,
// used to exercise decode-failure paths.
var InvalidHexStrings = []string{
	"INVALID_HEX",
	"02016ZZZ",
	"GGGGGG",
	"NOT-HEX",
}

// EmptyTestData holds empty and whitespace-only payloads for
// edge-case testing.
var EmptyTestData = []string{
	"",
	" ",
	"\t\n",
}

// CreateMockWriter creates a mock Kafka writer whose message slice is
// initialized empty (non-nil).
func CreateMockWriter() *MockKafkaWriter {
	return &MockKafkaWriter{Messages: make([]kafka.Message, 0)}
}

// Beacon event test helpers

// AssertEventFields fails the test when the event's ID or Type differs
// from the expected values.
func AssertEventFields(t *testing.T, event model.BeaconEvent, expectedID, expectedType string) {
	if got := event.ID; got != expectedID {
		t.Errorf("Expected event ID '%s', got '%s'", expectedID, got)
	}
	if got := event.Type; got != expectedType {
		t.Errorf("Expected event type '%s', got '%s'", expectedType, got)
	}
}

// SetupTestParsers registers a standard set of three test parsers
// (parser-1 .. parser-3) with increasing length bounds.
func SetupTestParsers(registry *model.ParserRegistry) {
	for _, cfg := range []model.Config{
		{Name: "parser-1", Min: 2, Max: 20, Pattern: []string{"02"}},
		{Name: "parser-2", Min: 3, Max: 25, Pattern: []string{"03"}},
		{Name: "parser-3", Min: 4, Max: 30, Pattern: []string{"04"}},
	} {
		registry.Register(cfg.Name, cfg)
	}
}

// CleanupTestParsers removes every parser currently in the registry.
func CleanupTestParsers(registry *model.ParserRegistry) {
	for name := range registry.ParserList {
		registry.Unregister(name)
	}
}

// CreateTestBeaconEvent creates a test beacon event with full battery,
// event counter 1, and zeroed accelerometer axes.
func CreateTestBeaconEvent(id, eventType string) model.BeaconEvent {
	event := model.BeaconEvent{
		ID:   id,
		Type: eventType,
	}
	event.Battery = 100
	event.Event = 1
	// AccX/AccY/AccZ are left at their zero values.
	return event
}

// AssertKafkaMessageCount fails the test when the mock writer has not
// recorded exactly `expected` messages.
func AssertKafkaMessageCount(t *testing.T, writer *MockKafkaWriter, expected int) {
	actual := len(writer.Messages)
	if actual != expected {
		t.Errorf("Expected %d Kafka message(s), got %d", expected, actual)
	}
}

// AssertNoKafkaMessages fails the test when any message was recorded.
func AssertNoKafkaMessages(t *testing.T, writer *MockKafkaWriter) {
	AssertKafkaMessageCount(t, writer, 0)
}

// Parser registry test helpers

// SimulateEventLoopParserUpdate simulates the event loop's parser update
// logic: "add" and "update" both register the carried config under its
// own name, "delete" unregisters by message name, and any other ID is
// silently ignored.
func SimulateEventLoopParserUpdate(msg model.KafkaParser, registry *model.ParserRegistry) {
	switch msg.ID {
	case "add", "update":
		registry.Register(msg.Config.Name, msg.Config)
	case "delete":
		registry.Unregister(msg.Name)
	}
}

// CreateParserAddMessage creates a parser "add" control message with a
// config using the given length bounds and the default "02" pattern.
func CreateParserAddMessage(name string, min, max int) model.KafkaParser {
	return model.KafkaParser{
		ID:   "add",
		Name: name,
		Config: model.Config{
			Name:    name,
			Min:     min,
			Max:     max,
			Pattern: []string{"02"},
		},
	}
}

// CreateParserDeleteMessage creates a parser "delete" control message;
// no config payload is needed for deletion.
func CreateParserDeleteMessage(name string) model.KafkaParser {
	return model.KafkaParser{ID: "delete", Name: name}
}

// CreateParserUpdateMessage creates a parser "update" control message
// with a config using the given length bounds and the default "02" pattern.
func CreateParserUpdateMessage(name string, min, max int) model.KafkaParser {
	return model.KafkaParser{
		ID:   "update",
		Name: name,
		Config: model.Config{
			Name:    name,
			Min:     min,
			Max:     max,
			Pattern: []string{"02"},
		},
	}
}

// GenerateTestBeaconID generates a test beacon ID. Indices 0-25 keep the
// original letter scheme ("test-beacon-A".."test-beacon-Z"); any other
// index falls back to its decimal form, because string(rune('A'+index))
// would otherwise embed an arbitrary (possibly non-printable) rune.
func GenerateTestBeaconID(index int) string {
	if index >= 0 && index < 26 {
		return "test-beacon-" + string(rune('A'+index))
	}
	return "test-beacon-" + strconv.Itoa(index)
}

// GenerateTestHexData generates test hex data of the form "0201" plus a
// suffix derived from 6+index. When 6+index is a single decimal digit
// the original rune-offset output is preserved ("02016".."02019" for
// indices 0..3); larger values are emitted as lowercase hex so the
// suffix stays within hex digits instead of drifting past '9' into
// non-hex runes (':', ';', ...). Assumes index >= -6; more negative
// values produce a leading '-' — callers should avoid them.
func GenerateTestHexData(index int) string {
	const prefix = "0201"
	d := 6 + index
	if d >= 0 && d <= 9 {
		return prefix + string(rune('0'+d))
	}
	return prefix + strconv.FormatInt(int64(d), 16)
}

Carregando…
Cancelar
Guardar