| Автор | SHA1 | Повідомлення | Дата |
|---|---|---|---|
|
|
0bac1939c6 | chore: update parsing configuration | 2 тижднів тому |
|
|
84124cff13 | chore: refactor | 2 тижднів тому |
|
|
748348c9b5 | refactor | 2 тижднів тому |
| @@ -71,35 +71,15 @@ func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *a | |||||
| break | break | ||||
| } | } | ||||
| } | } | ||||
| } else { | |||||
| s := strings.Split(string(message), ",") | |||||
| if len(s) < 6 { | |||||
| log.Printf("Messaggio CSV non valido: %s", msgStr) | |||||
| return | |||||
| } | |||||
| fmt.Println("this gateway is also sending data: ", s) | |||||
| } | } | ||||
| // } else { | |||||
| // s := strings.Split(string(message), ",") | |||||
| // if len(s) < 6 { | |||||
| // log.Printf("Messaggio CSV non valido: %s", msgStr) | |||||
| // return | |||||
| // } | |||||
| // rawdata := s[4] | |||||
| // buttonCounter := parseButtonState(rawdata) | |||||
| // if buttonCounter > 0 { | |||||
| // adv := model.BeaconAdvertisement{} | |||||
| // i, _ := strconv.ParseInt(s[3], 10, 64) | |||||
| // adv.Hostname = hostname | |||||
| // adv.BeaconType = "hb_button" | |||||
| // adv.MAC = s[1] | |||||
| // adv.RSSI = i | |||||
| // adv.Data = rawdata | |||||
| // adv.HSButtonCounter = buttonCounter | |||||
| // read_line := strings.TrimRight(string(s[5]), "\r\n") | |||||
| // it, err33 := strconv.Atoi(read_line) | |||||
| // if err33 != nil { | |||||
| // fmt.Println(it) | |||||
| // fmt.Println(err33) | |||||
| // os.Exit(2) | |||||
| // } | |||||
| // } | |||||
| // } | |||||
| } | } | ||||
| var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) { | var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) { | ||||
| @@ -118,6 +98,7 @@ func main() { | |||||
| // Load global context to init beacons and latest list | // Load global context to init beacons and latest list | ||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| kafkaManager := kafkaclient.InitKafkaManager() | |||||
| // Set logger -> terminal and log file | // Set logger -> terminal and log file | ||||
| slog.SetDefault(logger.CreateLogger("bridge.log")) | slog.SetDefault(logger.CreateLogger("bridge.log")) | ||||
| @@ -126,13 +107,11 @@ func main() { | |||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| // define kafka readers | |||||
| apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api") | |||||
| alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert") | |||||
| mqttReader := appState.AddKafkaReader(cfg.KafkaURL, "mqtt", "bridge-mqtt") | |||||
| readerTopics := []string{"apibeacons", "alert", "mqtt"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) | |||||
| // define kafka writer | |||||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons") | |||||
| writerTopics := []string{"rawbeacons"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | |||||
| slog.Info("Bridge initialized, subscribed to kafka topics") | slog.Info("Bridge initialized, subscribed to kafka topics") | ||||
| @@ -141,9 +120,9 @@ func main() { | |||||
| chMqtt := make(chan []model.Tracker, 200) | chMqtt := make(chan []model.Tracker, 200) | ||||
| wg.Add(3) | wg.Add(3) | ||||
| go kafkaclient.Consume(apiReader, chApi, ctx, &wg) | |||||
| go kafkaclient.Consume(alertReader, chAlert, ctx, &wg) | |||||
| go kafkaclient.Consume(mqttReader, chMqtt, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("apibeacons"), chApi, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("alert"), chAlert, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("mqtt"), chMqtt, ctx, &wg) | |||||
| opts := mqtt.NewClientOptions() | opts := mqtt.NewClientOptions() | ||||
| opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883)) | opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883)) | ||||
| @@ -154,7 +133,9 @@ func main() { | |||||
| opts.SetMaxReconnectInterval(600 * time.Second) | opts.SetMaxReconnectInterval(600 * time.Second) | ||||
| opts.SetCleanSession(false) | opts.SetCleanSession(false) | ||||
| opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) }) | |||||
| opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { | |||||
| messagePubHandler(m, kafkaManager.GetWriter("rawbeacons"), appState) | |||||
| }) | |||||
| opts.OnConnect = connectHandler | opts.OnConnect = connectHandler | ||||
| opts.OnConnectionLost = connectLostHandler | opts.OnConnectionLost = connectLostHandler | ||||
| client := mqtt.NewClient(opts) | client := mqtt.NewClient(opts) | ||||
| @@ -188,14 +169,12 @@ eventloop: | |||||
| slog.Info(lMsg) | slog.Info(lMsg) | ||||
| } | } | ||||
| case msg := <-chAlert: | case msg := <-chAlert: | ||||
| fmt.Printf("Alerts: %+v\n", msg) | |||||
| p, err := json.Marshal(msg) | p, err := json.Marshal(msg) | ||||
| if err != nil { | if err != nil { | ||||
| continue | continue | ||||
| } | } | ||||
| client.Publish("/alerts", 0, true, p) | client.Publish("/alerts", 0, true, p) | ||||
| case msg := <-chMqtt: | case msg := <-chMqtt: | ||||
| fmt.Printf("trackers: %+v\n", msg) | |||||
| p, err := json.Marshal(msg) | p, err := json.Marshal(msg) | ||||
| if err != nil { | if err != nil { | ||||
| continue | continue | ||||
| @@ -208,8 +187,8 @@ eventloop: | |||||
| wg.Wait() | wg.Wait() | ||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | slog.Info("All go routines have stopped, Beggining to close Kafka connections") | ||||
| appState.CleanKafkaReaders() | |||||
| appState.CleanKafkaWriters() | |||||
| kafkaManager.CleanKafkaReaders() | |||||
| kafkaManager.CleanKafkaWriters() | |||||
| client.Disconnect(250) | client.Disconnect(250) | ||||
| slog.Info("Closing connection to MQTT broker") | slog.Info("Closing connection to MQTT broker") | ||||
| @@ -1,5 +0,0 @@ | |||||
| DB_HOST=localhost | |||||
| DB_PORT=5432 | |||||
| DB_USER=postgres | |||||
| DB_PASSWORD=postgres | |||||
| DB_NAME=go_crud_db | |||||
| @@ -1,37 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "fmt" | |||||
| "log" | |||||
| "os" | |||||
| "github.com/joho/godotenv" | |||||
| "gorm.io/driver/postgres" | |||||
| "gorm.io/gorm" | |||||
| ) | |||||
| var DB *gorm.DB | |||||
| func main() { | |||||
| err := godotenv.Load() | |||||
| if err != nil { | |||||
| log.Fatal("Error loading .env file") | |||||
| } | |||||
| dsn := fmt.Sprintf( | |||||
| "host=%s user=%s password=%s dbname=%s port=%s sslmode=disable", | |||||
| os.Getenv("DB_HOST"), | |||||
| os.Getenv("DB_USER"), | |||||
| os.Getenv("DB_PASSWORD"), | |||||
| os.Getenv("DB_NAME"), | |||||
| os.Getenv("DB_PORT"), | |||||
| ) | |||||
| db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) | |||||
| if err != nil { | |||||
| log.Fatal("Failed to connect to the database:", err) | |||||
| } | |||||
| DB = db | |||||
| fmt.Println("Database connection established") | |||||
| } | |||||
| @@ -1,7 +0,0 @@ | |||||
| package models | |||||
| type Book struct { | |||||
| ID uint `json:"id" gorm:"primaryKey"` | |||||
| Title string `json:"title"` | |||||
| Author string `json:"author"` | |||||
| } | |||||
| @@ -1,22 +0,0 @@ | |||||
| [ | |||||
| { | |||||
| "name": "config1", | |||||
| "min": 10, | |||||
| "max": 15, | |||||
| "pattern": ["0x02", "0x01", "0x06"], | |||||
| "configs": { | |||||
| "battery": {"offset": 3, "length": 1}, | |||||
| "accX": {"offset": 4, "length": 2, "order": "bigendian"} | |||||
| } | |||||
| }, | |||||
| { | |||||
| "name": "config2", | |||||
| "min": 10, | |||||
| "max": 15, | |||||
| "pattern": ["0x02", "0x01", "0x06"], | |||||
| "configs": { | |||||
| "battery": {"offset": 3, "length": 1}, | |||||
| "accY": {"offset": 4, "length": 2, "order": "bigendian"} | |||||
| } | |||||
| } | |||||
| ] | |||||
| @@ -1,137 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "bytes" | |||||
| "encoding/binary" | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "io" | |||||
| "os" | |||||
| "sync" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| type parserConfig struct { | |||||
| Length int `json:"length"` | |||||
| Offset int `json:"offset"` | |||||
| Order string `json:"order"` | |||||
| } | |||||
| type beaconParser struct { | |||||
| name string | |||||
| canParse func([]byte) bool | |||||
| configs map[string]parserConfig | |||||
| } | |||||
| type parserRegistry struct { | |||||
| parserList []beaconParser | |||||
| rw sync.RWMutex | |||||
| } | |||||
| type config struct { | |||||
| Name string `json:"name"` | |||||
| Min int `json:"min"` | |||||
| Max int `json:"max"` | |||||
| Pattern []string `json:"pattern"` | |||||
| Configs map[string]parserConfig `json:"configs"` | |||||
| } | |||||
| func (pc parserConfig) GetOrder() binary.ByteOrder { | |||||
| if pc.Order == "bigendian" { | |||||
| return binary.BigEndian | |||||
| } | |||||
| return binary.LittleEndian | |||||
| } | |||||
| func (p *parserRegistry) Register(name string, c config) { | |||||
| p.rw.Lock() | |||||
| defer p.rw.Unlock() | |||||
| b := beaconParser{ | |||||
| name: name, | |||||
| canParse: func(ad []byte) bool { | |||||
| return len(ad) >= c.Min && len(ad) <= c.Max && bytes.HasPrefix(ad, c.GetPatternBytes()) | |||||
| }, | |||||
| configs: c.Configs, | |||||
| } | |||||
| p.parserList = append(p.parserList, b) | |||||
| } | |||||
| func (b *beaconParser) Parse(ad []byte) (model.BeaconEvent, bool) { | |||||
| flag := false | |||||
| event := model.BeaconEvent{Type: b.name} | |||||
| if cfg, ok := b.configs["battery"]; ok { | |||||
| event.Battery = uint32(b.extract(ad, cfg)) | |||||
| flag = true | |||||
| } | |||||
| if cfg, ok := b.configs["accX"]; ok { | |||||
| event.AccX = int16(b.extract(ad, cfg)) | |||||
| flag = true | |||||
| } | |||||
| if cfg, ok := b.configs["accY"]; ok { | |||||
| event.AccY = int16(b.extract(ad, cfg)) | |||||
| flag = true | |||||
| } | |||||
| if cfg, ok := b.configs["accZ"]; ok { | |||||
| event.AccZ = int16(b.extract(ad, cfg)) | |||||
| flag = true | |||||
| } | |||||
| return event, flag | |||||
| } | |||||
| func (b *beaconParser) extract(ad []byte, pc parserConfig) uint16 { | |||||
| if len(ad) < pc.Offset+pc.Length { | |||||
| return 0 | |||||
| } | |||||
| data := ad[pc.Offset : pc.Offset+pc.Length] | |||||
| if pc.Length == 1 { | |||||
| return uint16(data[0]) | |||||
| } | |||||
| return pc.GetOrder().Uint16(data) | |||||
| } | |||||
| func (c config) GetPatternBytes() []byte { | |||||
| res := make([]byte, len(c.Pattern)) | |||||
| for i, s := range c.Pattern { | |||||
| fmt.Sscanf(s, "0x%02x", &res[i]) | |||||
| } | |||||
| return res | |||||
| } | |||||
| func main() { | |||||
| parserRegistry := parserRegistry{ | |||||
| parserList: make([]beaconParser, 0), | |||||
| } | |||||
| seq := []byte{0x02, 0x01, 0x06, 0x64, 0x01, 0xF4, 0x00, 0x0A, 0xFF, 0x05} | |||||
| jsonFile, err := os.Open("configs.json") | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| fmt.Println("succesfully opened json file") | |||||
| b, _ := io.ReadAll(jsonFile) | |||||
| var configs []config | |||||
| json.Unmarshal(b, &configs) | |||||
| for _, config := range configs { | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| for _, parser := range parserRegistry.parserList { | |||||
| if parser.canParse(seq) { | |||||
| event, ok := parser.Parse(seq) | |||||
| if ok { | |||||
| fmt.Printf("Device: %s | Battery: %d%% | AccX: %d | AccY: %d | AccZ: %d\n", event.Type, event.Battery, event.AccX, event.AccY, event.AccZ) | |||||
| } | |||||
| } | |||||
| } | |||||
| fmt.Printf("configs: %+v\n", configs) | |||||
| jsonFile.Close() | |||||
| } | |||||
| @@ -26,6 +26,7 @@ func main() { | |||||
| // Load global context to init beacons and latest list | // Load global context to init beacons and latest list | ||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| kafkaManager := kafkaclient.InitKafkaManager() | |||||
| parserRegistry := model.ParserRegistry{ | parserRegistry := model.ParserRegistry{ | ||||
| ParserList: make(map[string]model.BeaconParser), | ParserList: make(map[string]model.BeaconParser), | ||||
| @@ -38,10 +39,11 @@ func main() { | |||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") | |||||
| parserReader := appState.AddKafkaReader(cfg.KafkaURL, "parser", "gid-parser") | |||||
| readerTopics := []string{"rawbeacons", "parser"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) | |||||
| alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") | |||||
| writerTopics := []string{"alertbeacons"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | |||||
| slog.Info("Decoder initialized, subscribed to Kafka topics") | slog.Info("Decoder initialized, subscribed to Kafka topics") | ||||
| @@ -49,8 +51,8 @@ func main() { | |||||
| chParser := make(chan model.KafkaParser, 200) | chParser := make(chan model.KafkaParser, 200) | ||||
| wg.Add(3) | wg.Add(3) | ||||
| go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) | |||||
| go kafkaclient.Consume(parserReader, chParser, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("rawbeacons"), chRaw, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("parser"), chParser, ctx, &wg) | |||||
| eventloop: | eventloop: | ||||
| for { | for { | ||||
| @@ -58,7 +60,7 @@ eventloop: | |||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| break eventloop | break eventloop | ||||
| case msg := <-chRaw: | case msg := <-chRaw: | ||||
| processIncoming(msg, appState, alertWriter, &parserRegistry) | |||||
| processIncoming(msg, appState, kafkaManager.GetWriter("alertbeacons"), &parserRegistry) | |||||
| case msg := <-chParser: | case msg := <-chParser: | ||||
| switch msg.ID { | switch msg.ID { | ||||
| case "add": | case "add": | ||||
| @@ -77,8 +79,8 @@ eventloop: | |||||
| wg.Wait() | wg.Wait() | ||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | slog.Info("All go routines have stopped, Beggining to close Kafka connections") | ||||
| appState.CleanKafkaReaders() | |||||
| appState.CleanKafkaWriters() | |||||
| kafkaManager.CleanKafkaReaders() | |||||
| kafkaManager.CleanKafkaWriters() | |||||
| } | } | ||||
| func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) { | func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) { | ||||
| @@ -110,9 +112,13 @@ func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, | |||||
| if event.ID == "" { | if event.ID == "" { | ||||
| return nil | return nil | ||||
| } | } | ||||
| prevEvent, ok := appState.GetBeaconEvent(id) | prevEvent, ok := appState.GetBeaconEvent(id) | ||||
| appState.UpdateBeaconEvent(id, event) | appState.UpdateBeaconEvent(id, event) | ||||
| if event.Type == "iBeacon" { | |||||
| event.BtnPressed = true | |||||
| } | |||||
| if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) { | if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) { | ||||
| return nil | return nil | ||||
| } | } | ||||
| @@ -25,6 +25,7 @@ func main() { | |||||
| // Load global context to init beacons and latest list | // Load global context to init beacons and latest list | ||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| kafkaManager := kafkaclient.InitKafkaManager() | |||||
| // Set logger -> terminal and log file | // Set logger -> terminal and log file | ||||
| slog.SetDefault(logger.CreateLogger("location.log")) | slog.SetDefault(logger.CreateLogger("location.log")) | ||||
| @@ -33,10 +34,11 @@ func main() { | |||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") | |||||
| settingsReader := appState.AddKafkaReader(cfg.KafkaURL, "settings", "gid-settings-loc") | |||||
| readerTopics := []string{"rawbeacons", "settings"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) | |||||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") | |||||
| writerTopics := []string{"locevents"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | |||||
| slog.Info("Locations algorithm initialized, subscribed to Kafka topics") | slog.Info("Locations algorithm initialized, subscribed to Kafka topics") | ||||
| @@ -47,8 +49,8 @@ func main() { | |||||
| chSettings := make(chan map[string]any, 5) | chSettings := make(chan map[string]any, 5) | ||||
| wg.Add(3) | wg.Add(3) | ||||
| go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) | |||||
| go kafkaclient.Consume(settingsReader, chSettings, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("rawbeacons"), chRaw, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("settings"), chSettings, ctx, &wg) | |||||
| eventLoop: | eventLoop: | ||||
| for { | for { | ||||
| @@ -60,7 +62,7 @@ eventLoop: | |||||
| fmt.Printf("Settings: %+v\n", settings) | fmt.Printf("Settings: %+v\n", settings) | ||||
| switch settings.CurrentAlgorithm { | switch settings.CurrentAlgorithm { | ||||
| case "filter": | case "filter": | ||||
| getLikelyLocations(appState, writer) | |||||
| getLikelyLocations(appState, kafkaManager.GetWriter("locevents")) | |||||
| case "ai": | case "ai": | ||||
| fmt.Println("AI algorithm selected") | fmt.Println("AI algorithm selected") | ||||
| } | } | ||||
| @@ -76,8 +78,8 @@ eventLoop: | |||||
| wg.Wait() | wg.Wait() | ||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | slog.Info("All go routines have stopped, Beggining to close Kafka connections") | ||||
| appState.CleanKafkaReaders() | |||||
| appState.CleanKafkaWriters() | |||||
| kafkaManager.CleanKafkaReaders() | |||||
| kafkaManager.CleanKafkaWriters() | |||||
| } | } | ||||
| func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | ||||
| @@ -39,5 +39,12 @@ | |||||
| "accY": {"offset": 9, "length": 2, "order": "fixedpoint"}, | "accY": {"offset": 9, "length": 2, "order": "fixedpoint"}, | ||||
| "accZ": {"offset": 11, "length": 2, "order": "fixedpoint"} | "accZ": {"offset": 11, "length": 2, "order": "fixedpoint"} | ||||
| } | } | ||||
| }, | |||||
| { | |||||
| "name": "iBeacon", | |||||
| "min": 5, | |||||
| "max": 27, | |||||
| "pattern": ["0xFF", "0x4C", "0x00", "0x02"], | |||||
| "configs": {} | |||||
| } | } | ||||
| ] | ] | ||||
| @@ -25,21 +25,16 @@ import ( | |||||
| "github.com/AFASystems/presence/internal/pkg/service" | "github.com/AFASystems/presence/internal/pkg/service" | ||||
| "github.com/gorilla/handlers" | "github.com/gorilla/handlers" | ||||
| "github.com/gorilla/mux" | "github.com/gorilla/mux" | ||||
| "github.com/gorilla/websocket" | |||||
| "github.com/segmentio/kafka-go" | "github.com/segmentio/kafka-go" | ||||
| ) | ) | ||||
| var upgrader = websocket.Upgrader{ | |||||
| ReadBufferSize: 1024, | |||||
| WriteBufferSize: 1024, | |||||
| } | |||||
| var _ io.Writer = (*os.File)(nil) | var _ io.Writer = (*os.File)(nil) | ||||
| var wg sync.WaitGroup | var wg sync.WaitGroup | ||||
| func main() { | func main() { | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| kafkaManager := kafkaclient.InitKafkaManager() | |||||
| // Set logger -> terminal and log file | // Set logger -> terminal and log file | ||||
| slog.SetDefault(logger.CreateLogger("server.log")) | slog.SetDefault(logger.CreateLogger("server.log")) | ||||
| @@ -57,11 +52,9 @@ func main() { | |||||
| originsOk := handlers.AllowedOrigins([]string{"*"}) | originsOk := handlers.AllowedOrigins([]string{"*"}) | ||||
| methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | ||||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") | |||||
| settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") | |||||
| alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert") | |||||
| parserWriter := appState.AddKafkaWriter(cfg.KafkaURL, "parser") | |||||
| mqttWriter := appState.AddKafkaWriter(cfg.KafkaURL, "mqtt") | |||||
| writerTopics := []string{"apibeacons", "alert", "mqtt", "settings", "parser"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | |||||
| slog.Info("Kafka writers topics: apibeacons, settings initialized") | slog.Info("Kafka writers topics: apibeacons, settings initialized") | ||||
| configFile, err := os.Open("/app/cmd/server/config.json") | configFile, err := os.Open("/app/cmd/server/config.json") | ||||
| @@ -86,25 +79,25 @@ func main() { | |||||
| Config: config, | Config: config, | ||||
| } | } | ||||
| if err := service.SendParserConfig(kp, parserWriter, ctx); err != nil { | |||||
| if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil { | |||||
| fmt.Printf("Unable to send parser config to kafka broker %v\n", err) | fmt.Printf("Unable to send parser config to kafka broker %v\n", err) | ||||
| } | } | ||||
| } | } | ||||
| if err := apiclient.UpdateDB(db, ctx, cfg, writer, appState); err != nil { | |||||
| if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil { | |||||
| fmt.Printf("Error in getting token: %v\n", err) | fmt.Printf("Error in getting token: %v\n", err) | ||||
| } | } | ||||
| locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") | |||||
| alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | |||||
| readerTopics := []string{"locevents", "alertbeacons"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "server", readerTopics) | |||||
| slog.Info("Kafka readers topics: locevents, alertbeacons initialized") | slog.Info("Kafka readers topics: locevents, alertbeacons initialized") | ||||
| chLoc := make(chan model.HTTPLocation, 200) | chLoc := make(chan model.HTTPLocation, 200) | ||||
| chEvents := make(chan model.BeaconEvent, 500) | chEvents := make(chan model.BeaconEvent, 500) | ||||
| wg.Add(2) | wg.Add(2) | ||||
| go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) | |||||
| go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("locevents"), chLoc, ctx, &wg) | |||||
| go kafkaclient.Consume(kafkaManager.GetReader("alertbeacons"), chEvents, ctx, &wg) | |||||
| r := mux.NewRouter() | r := mux.NewRouter() | ||||
| @@ -124,16 +117,16 @@ func main() { | |||||
| r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT") | r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT") | ||||
| r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET") | r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET") | ||||
| r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST") | |||||
| r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE") | |||||
| r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("POST") | |||||
| r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("DELETE") | |||||
| r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT") | r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT") | ||||
| r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET") | r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET") | ||||
| r.HandleFunc("/configs/beacons", controller.ParserAddController(db, parserWriter, ctx)).Methods("POST") | |||||
| r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, parserWriter, ctx)).Methods("PUT") | |||||
| r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, parserWriter, ctx)).Methods("DELETE") | |||||
| r.HandleFunc("/configs/beacons", controller.ParserAddController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("POST") | |||||
| r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("PUT") | |||||
| r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("DELETE") | |||||
| r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, settingsWriter, ctx)).Methods("PATCH") | |||||
| r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, kafkaManager.GetWriter("settings"), ctx)).Methods("PATCH") | |||||
| r.HandleFunc("/reslevis/settings", controller.SettingsListController(db)).Methods("GET") | r.HandleFunc("/reslevis/settings", controller.SettingsListController(db)).Methods("GET") | ||||
| beaconTicker := time.NewTicker(2 * time.Second) | beaconTicker := time.NewTicker(2 * time.Second) | ||||
| @@ -156,7 +149,7 @@ eventLoop: | |||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| break eventLoop | break eventLoop | ||||
| case msg := <-chLoc: | case msg := <-chLoc: | ||||
| service.LocationToBeaconService(msg, db, alertWriter, ctx) | |||||
| service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx) | |||||
| case msg := <-chEvents: | case msg := <-chEvents: | ||||
| fmt.Printf("event: %+v\n", msg) | fmt.Printf("event: %+v\n", msg) | ||||
| id := msg.ID | id := msg.ID | ||||
| @@ -182,7 +175,7 @@ eventLoop: | |||||
| Value: eMsg, | Value: eMsg, | ||||
| } | } | ||||
| mqttWriter.WriteMessages(ctx, msg) | |||||
| kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg) | |||||
| } | } | ||||
| } | } | ||||
| @@ -196,8 +189,8 @@ eventLoop: | |||||
| wg.Wait() | wg.Wait() | ||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") | slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") | ||||
| appState.CleanKafkaReaders() | |||||
| appState.CleanKafkaWriters() | |||||
| kafkaManager.CleanKafkaReaders() | |||||
| kafkaManager.CleanKafkaWriters() | |||||
| slog.Info("All kafka clients shutdown, starting shutdown of valkey client") | slog.Info("All kafka clients shutdown, starting shutdown of valkey client") | ||||
| slog.Info("API server shutting down") | slog.Info("API server shutting down") | ||||
| @@ -1,86 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "bufio" | |||||
| "encoding/hex" | |||||
| "fmt" | |||||
| "log" | |||||
| "os" | |||||
| "strings" | |||||
| ) | |||||
| func main() { | |||||
| file, err := os.Open("save.txt") | |||||
| if err != nil { | |||||
| log.Fatalf("Failed to open file: %s", err) | |||||
| } | |||||
| defer file.Close() | |||||
| scanner := bufio.NewScanner(file) | |||||
| for scanner.Scan() { | |||||
| line := scanner.Text() | |||||
| decodeBeacon(line) | |||||
| } | |||||
| } | |||||
| func decodeBeacon(beacon string) { | |||||
| beacon = strings.TrimSpace(beacon) | |||||
| if beacon == "" { | |||||
| return | |||||
| } | |||||
| // convert to bytes for faster operations | |||||
| b, err := hex.DecodeString(beacon) | |||||
| if err != nil { | |||||
| fmt.Println("invalid line: ", beacon) | |||||
| return | |||||
| } | |||||
| // remove flag bytes - they hold no structural information | |||||
| if len(b) > 1 && b[1] == 0x01 { | |||||
| l := int(b[0]) | |||||
| if 1+l <= len(b) { | |||||
| b = b[1+l:] | |||||
| } | |||||
| } | |||||
| adBlockIndeces := parseADFast(b) | |||||
| for _, r := range adBlockIndeces { | |||||
| ad := b[r[0]:r[1]] | |||||
| if len(ad) >= 4 && | |||||
| ad[1] == 0x16 && | |||||
| ad[2] == 0xAA && | |||||
| ad[3] == 0xFE { | |||||
| // fmt.Println("Eddystone:", hex.EncodeToString(b)) | |||||
| return | |||||
| } | |||||
| if len(ad) >= 7 && | |||||
| ad[1] == 0xFF && | |||||
| ad[2] == 0x4C && ad[3] == 0x00 && | |||||
| ad[4] == 0x02 && ad[5] == 0x15 { | |||||
| // fmt.Println("iBeacon:", hex.EncodeToString(b)) | |||||
| return | |||||
| } | |||||
| } | |||||
| fmt.Println(hex.EncodeToString(b)) | |||||
| } | |||||
| func parseADFast(b []byte) [][2]int { | |||||
| var res [][2]int | |||||
| i := 0 | |||||
| for i < len(b) { | |||||
| l := int(b[i]) | |||||
| if l == 0 || i+1+l > len(b) { | |||||
| break | |||||
| } | |||||
| res = append(res, [2]int{i, i + 1 + l}) | |||||
| i += 1 + l | |||||
| } | |||||
| return res | |||||
| } | |||||
| @@ -1,161 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "context" | |||||
| "crypto/tls" | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "net/http" | |||||
| "net/url" | |||||
| "strings" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| type response struct { | |||||
| A string `json:"access_token"` | |||||
| } | |||||
| func main() { | |||||
| ctx := context.Background() | |||||
| tr := &http.Transport{ | |||||
| TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, | |||||
| } | |||||
| client := &http.Client{Transport: tr} | |||||
| formData := url.Values{} | |||||
| formData.Set("grant_type", "password") | |||||
| formData.Set("client_id", "Fastapi") | |||||
| formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC") | |||||
| formData.Set("username", "core") | |||||
| formData.Set("password", "C0r3_us3r_Cr3d3nt14ls") | |||||
| formData.Set("audience", "Fastapi") | |||||
| req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode())) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| req.Header.Add("Content-Type", "application/x-www-form-urlencoded") | |||||
| req = req.WithContext(ctx) | |||||
| res, err := client.Do(req) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| var j response | |||||
| err = json.NewDecoder(res.Body).Decode(&j) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| token := j.A | |||||
| trackers, err := GetTrackers(token, client) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| fmt.Printf("trackers: %+v\n", trackers) | |||||
| gateways, err := getGateways(token, client) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| fmt.Printf("gateways: %+v\n", gateways) | |||||
| zones, err := GetZones(token, client) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| fmt.Printf("zones: %+v\n", zones) | |||||
| trackerZones, err := GetTrackerZones(token, client) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| fmt.Printf("tracker zones: %+v\n", trackerZones) | |||||
| } | |||||
| func GetTrackers(token string, client *http.Client) ([]model.Tracker, error) { | |||||
| res, err := getRequest(token, "getTrackers", client) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| var i []model.Tracker | |||||
| err = json.NewDecoder(res.Body).Decode(&i) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return i, nil | |||||
| } | |||||
| func getGateways(token string, client *http.Client) ([]model.Gateway, error) { | |||||
| res, err := getRequest(token, "getGateways", client) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| var i []model.Gateway | |||||
| err = json.NewDecoder(res.Body).Decode(&i) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return i, nil | |||||
| } | |||||
| func GetTrackerZones(token string, client *http.Client) ([]model.TrackerZones, error) { | |||||
| res, err := getRequest(token, "getTrackerZones", client) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| var i []model.TrackerZones | |||||
| err = json.NewDecoder(res.Body).Decode(&i) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return i, nil | |||||
| } | |||||
| func GetZones(token string, client *http.Client) ([]model.Zone, error) { | |||||
| res, err := getRequest(token, "getZones", client) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| var i []model.Zone | |||||
| err = json.NewDecoder(res.Body).Decode(&i) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return i, nil | |||||
| } | |||||
| func getRequest(token, route string, client *http.Client) (*http.Response, error) { | |||||
| url := fmt.Sprintf("https://10.251.0.30:5050/reslevis/%s", route) | |||||
| req, err := http.NewRequest("GET", url, nil) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| header := fmt.Sprintf("Bearer %s", token) | |||||
| req.Header.Add("Authorization", header) | |||||
| res, err := client.Do(req) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return res, nil | |||||
| } | |||||
| @@ -1,131 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "context" | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "reflect" | |||||
| "github.com/redis/go-redis/v9" | |||||
| ) | |||||
| type Per struct { | |||||
| Name string `json:"name"` | |||||
| Age int `json:"age"` | |||||
| } | |||||
| type Beacon struct { | |||||
| ID string `json:"id"` // Use JSON tags to ensure correct field names | |||||
| Type string `json:"type"` | |||||
| Temp int `json:"temp"` | |||||
| Name string `json:"name"` | |||||
| } | |||||
| func ConvertStructToMap(obj any) (map[string]any, error) { | |||||
| // 1. Marshal the struct into a JSON byte slice | |||||
| data, err := json.Marshal(obj) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| // 2. Unmarshal the JSON byte slice into the map | |||||
| var result map[string]any | |||||
| err = json.Unmarshal(data, &result) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return result, nil | |||||
| } | |||||
| // func main() { ... } | |||||
| // client.HSet(ctx, "beacon:123", resultMap).Err() | |||||
| func main() { | |||||
| client := redis.NewClient(&redis.Options{ | |||||
| Addr: "127.0.0.1:6379", | |||||
| Password: "", | |||||
| }) | |||||
| ctx := context.Background() | |||||
| err := client.Set(ctx, "testkey", "hello world", 0).Err() | |||||
| if err != nil { | |||||
| fmt.Println("Ok") | |||||
| } | |||||
| val, err := client.Get(ctx, "testkey").Result() | |||||
| if err != nil { | |||||
| fmt.Println("Ok") | |||||
| } | |||||
| fmt.Println(val) | |||||
| err = client.SAdd(ctx, "myset", "b-1").Err() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| res, err := client.SMembers(ctx, "myset").Result() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| fmt.Println("res1: ", res) | |||||
| err = client.SAdd(ctx, "myset", "b-2").Err() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| res, err = client.SMembers(ctx, "myset").Result() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| fmt.Println("res1: ", res) | |||||
| err = client.SAdd(ctx, "myset", "b-1").Err() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| res, err = client.SMembers(ctx, "myset").Result() | |||||
| if err != nil { | |||||
| fmt.Println(err) | |||||
| } | |||||
| fmt.Println("res1: ", res) | |||||
| fmt.Println("type: ", reflect.TypeOf(res)) | |||||
| // b := Beacon{ | |||||
| // ID: "hello", | |||||
| // Type: "node", | |||||
| // Temp: 10, | |||||
| // Name: "Peter", | |||||
| // } | |||||
| // per := Per{ | |||||
| // Name: "Janez", | |||||
| // Age: 10, | |||||
| // } | |||||
| // bEncoded, err := ConvertStructToMap(b) | |||||
| // if err != nil { | |||||
| // fmt.Print("error\n") | |||||
| // } | |||||
| // perEncoded, err := ConvertStructToMap(per) | |||||
| // if err != nil { | |||||
| // fmt.Print("error\n") | |||||
| // } | |||||
| // err = client.HSet(ctx, "myhash", bEncoded).Err() | |||||
| // fmt.Println(err) | |||||
| // res, _ := client.HGetAll(ctx, "myhash").Result() | |||||
| // fmt.Println(res) | |||||
| // err = client.HSet(ctx, "myhash", perEncoded).Err() | |||||
| // fmt.Println(err) | |||||
| // res, _ = client.HGetAll(ctx, "myhash").Result() | |||||
| // fmt.Println(res) | |||||
| } | |||||
| @@ -2,25 +2,18 @@ package appcontext | |||||
| import ( | import ( | ||||
| "fmt" | "fmt" | ||||
| "maps" | |||||
| "strings" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| "github.com/mitchellh/mapstructure" | "github.com/mitchellh/mapstructure" | ||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| // AppState provides centralized access to application state | // AppState provides centralized access to application state | ||||
| type AppState struct { | type AppState struct { | ||||
| beacons model.BeaconsList | |||||
| httpResults model.HTTPResultList | |||||
| settings model.Settings | |||||
| beaconEvents model.BeaconEventList | |||||
| beaconsLookup map[string]string | |||||
| latestList model.LatestBeaconsList | |||||
| kafkaReadersList model.KafkaReadersList | |||||
| kafkaWritersList model.KafkaWritersList | |||||
| beacons model.BeaconsList | |||||
| httpResults model.HTTPResultList | |||||
| settings model.Settings | |||||
| beaconEvents model.BeaconEventList | |||||
| beaconsLookup map[string]string | |||||
| } | } | ||||
| // NewAppState creates a new application context AppState with default values | // NewAppState creates a new application context AppState with default values | ||||
| @@ -47,72 +40,7 @@ func NewAppState() *AppState { | |||||
| Beacons: make(map[string]model.BeaconEvent), | Beacons: make(map[string]model.BeaconEvent), | ||||
| }, | }, | ||||
| beaconsLookup: make(map[string]string), | beaconsLookup: make(map[string]string), | ||||
| latestList: model.LatestBeaconsList{ | |||||
| LatestList: make(map[string]model.Beacon), | |||||
| }, | |||||
| kafkaReadersList: model.KafkaReadersList{ | |||||
| KafkaReaders: make([]*kafka.Reader, 0), | |||||
| }, | |||||
| kafkaWritersList: model.KafkaWritersList{ | |||||
| KafkaWriters: make([]*kafka.Writer, 0), | |||||
| }, | |||||
| } | |||||
| } | |||||
| func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer { | |||||
| kafkaWriter := &kafka.Writer{ | |||||
| Addr: kafka.TCP(kafkaUrl), | |||||
| Topic: topic, | |||||
| Balancer: &kafka.LeastBytes{}, | |||||
| Async: false, | |||||
| RequiredAcks: kafka.RequireAll, | |||||
| BatchSize: 100, | |||||
| BatchTimeout: 10 * time.Millisecond, | |||||
| } | |||||
| m.kafkaWritersList.KafkaWritersLock.Lock() | |||||
| m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter) | |||||
| m.kafkaWritersList.KafkaWritersLock.Unlock() | |||||
| return kafkaWriter | |||||
| } | |||||
| func (m *AppState) CleanKafkaWriters() { | |||||
| fmt.Println("shutdown of kafka readers starts") | |||||
| for _, r := range m.kafkaWritersList.KafkaWriters { | |||||
| if err := r.Close(); err != nil { | |||||
| fmt.Printf("Error in closing kafka writer %v", err) | |||||
| } | |||||
| } | |||||
| fmt.Println("Kafka writers graceful shutdown complete") | |||||
| } | |||||
| func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader { | |||||
| brokers := strings.Split(kafkaUrl, ",") | |||||
| kafkaReader := kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: brokers, | |||||
| GroupID: groupID, | |||||
| Topic: topic, | |||||
| MinBytes: 1, | |||||
| MaxBytes: 10e6, | |||||
| }) | |||||
| m.kafkaReadersList.KafkaReadersLock.Lock() | |||||
| m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader) | |||||
| m.kafkaReadersList.KafkaReadersLock.Unlock() | |||||
| return kafkaReader | |||||
| } | |||||
| func (m *AppState) CleanKafkaReaders() { | |||||
| for _, r := range m.kafkaReadersList.KafkaReaders { | |||||
| if err := r.Close(); err != nil { | |||||
| fmt.Printf("Error in closing kafka reader %v", err) | |||||
| } | |||||
| } | } | ||||
| fmt.Println("Kafka readers graceful shutdown complete") | |||||
| } | } | ||||
| // GetBeacons returns thread-safe access to beacons list | // GetBeacons returns thread-safe access to beacons list | ||||
| @@ -135,11 +63,6 @@ func (m *AppState) GetBeaconsLookup() map[string]string { | |||||
| return m.beaconsLookup | return m.beaconsLookup | ||||
| } | } | ||||
| // GetLatestList returns thread-safe access to latest beacons list | |||||
| func (m *AppState) GetLatestList() *model.LatestBeaconsList { | |||||
| return &m.latestList | |||||
| } | |||||
| // AddBeaconToLookup adds a beacon ID to the lookup map | // AddBeaconToLookup adds a beacon ID to the lookup map | ||||
| func (m *AppState) AddBeaconToLookup(id, value string) { | func (m *AppState) AddBeaconToLookup(id, value string) { | ||||
| m.beaconsLookup[id] = value | m.beaconsLookup[id] = value | ||||
| @@ -223,23 +146,6 @@ func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) { | |||||
| m.beaconEvents.Beacons[id] = event | m.beaconEvents.Beacons[id] = event | ||||
| } | } | ||||
| // GetLatestBeacon returns the latest beacon by ID (thread-safe) | |||||
| func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) { | |||||
| m.latestList.Lock.RLock() | |||||
| defer m.latestList.Lock.RUnlock() | |||||
| beacon, exists := m.latestList.LatestList[id] | |||||
| return beacon, exists | |||||
| } | |||||
| // UpdateLatestBeacon updates the latest beacon in the list (thread-safe) | |||||
| func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) { | |||||
| m.latestList.Lock.Lock() | |||||
| defer m.latestList.Lock.Unlock() | |||||
| m.latestList.LatestList[id] = beacon | |||||
| } | |||||
| // GetAllBeacons returns a copy of all beacons | // GetAllBeacons returns a copy of all beacons | ||||
| func (m *AppState) GetAllBeacons() map[string]model.Beacon { | func (m *AppState) GetAllBeacons() map[string]model.Beacon { | ||||
| m.beacons.Lock.RLock() | m.beacons.Lock.RLock() | ||||
| @@ -264,16 +170,6 @@ func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult { | |||||
| return beacons | return beacons | ||||
| } | } | ||||
| // GetAllLatestBeacons returns a copy of all latest beacons | |||||
| func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon { | |||||
| m.latestList.Lock.RLock() | |||||
| defer m.latestList.Lock.RUnlock() | |||||
| beacons := make(map[string]model.Beacon) | |||||
| maps.Copy(beacons, m.latestList.LatestList) | |||||
| return beacons | |||||
| } | |||||
| // GetBeaconCount returns the number of tracked beacons | // GetBeaconCount returns the number of tracked beacons | ||||
| func (m *AppState) GetBeaconCount() int { | func (m *AppState) GetBeaconCount() int { | ||||
| m.beacons.Lock.RLock() | m.beacons.Lock.RLock() | ||||
| @@ -0,0 +1,113 @@ | |||||
| package kafkaclient | |||||
| import ( | |||||
| "fmt" | |||||
| "strings" | |||||
| "sync" | |||||
| "time" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| type KafkaReadersMap struct { | |||||
| KafkaReadersLock sync.RWMutex | |||||
| KafkaReaders map[string]*kafka.Reader | |||||
| } | |||||
| type KafkaWritersMap struct { | |||||
| KafkaWritersLock sync.RWMutex | |||||
| KafkaWriters map[string]*kafka.Writer | |||||
| } | |||||
| type KafkaManager struct { | |||||
| kafkaReadersMap KafkaReadersMap | |||||
| kafkaWritersMap KafkaWritersMap | |||||
| } | |||||
| func InitKafkaManager() *KafkaManager { | |||||
| return &KafkaManager{ | |||||
| kafkaReadersMap: KafkaReadersMap{ | |||||
| KafkaReaders: make(map[string]*kafka.Reader), | |||||
| }, | |||||
| kafkaWritersMap: KafkaWritersMap{ | |||||
| KafkaWriters: make(map[string]*kafka.Writer), | |||||
| }, | |||||
| } | |||||
| } | |||||
| func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) { | |||||
| kafkaWriter := &kafka.Writer{ | |||||
| Addr: kafka.TCP(kafkaUrl), | |||||
| Topic: topic, | |||||
| Balancer: &kafka.LeastBytes{}, | |||||
| Async: false, | |||||
| RequiredAcks: kafka.RequireAll, | |||||
| BatchSize: 100, | |||||
| BatchTimeout: 10 * time.Millisecond, | |||||
| } | |||||
| m.kafkaWritersMap.KafkaWritersLock.Lock() | |||||
| m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter | |||||
| m.kafkaWritersMap.KafkaWritersLock.Unlock() | |||||
| } | |||||
| func (m *KafkaManager) CleanKafkaWriters() { | |||||
| fmt.Println("shutdown of kafka readers starts") | |||||
| m.kafkaWritersMap.KafkaWritersLock.Lock() | |||||
| for _, r := range m.kafkaWritersMap.KafkaWriters { | |||||
| if err := r.Close(); err != nil { | |||||
| fmt.Printf("Error in closing kafka writer %v", err) | |||||
| } | |||||
| } | |||||
| m.kafkaWritersMap.KafkaWritersLock.Unlock() | |||||
| fmt.Println("Kafka writers graceful shutdown complete") | |||||
| } | |||||
| func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) { | |||||
| brokers := strings.Split(kafkaUrl, ",") | |||||
| kafkaReader := kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: brokers, | |||||
| GroupID: groupID, | |||||
| Topic: topic, | |||||
| MinBytes: 1, | |||||
| MaxBytes: 10e6, | |||||
| }) | |||||
| m.kafkaReadersMap.KafkaReadersLock.Lock() | |||||
| m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader | |||||
| m.kafkaReadersMap.KafkaReadersLock.Unlock() | |||||
| } | |||||
| func (m *KafkaManager) CleanKafkaReaders() { | |||||
| m.kafkaReadersMap.KafkaReadersLock.Lock() | |||||
| for _, r := range m.kafkaReadersMap.KafkaReaders { | |||||
| if err := r.Close(); err != nil { | |||||
| fmt.Printf("Error in closing kafka reader %v", err) | |||||
| } | |||||
| } | |||||
| m.kafkaReadersMap.KafkaReadersLock.Unlock() | |||||
| fmt.Println("Kafka readers graceful shutdown complete") | |||||
| } | |||||
| func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) { | |||||
| for _, topic := range topics { | |||||
| if name != "" { | |||||
| gid := fmt.Sprintf("%s-%s", topic, name) | |||||
| m.AddKafkaReader(url, topic, gid) | |||||
| } else { | |||||
| m.AddKafkaWriter(url, topic) | |||||
| } | |||||
| } | |||||
| } | |||||
| func (m *KafkaManager) GetReader(topic string) *kafka.Reader { | |||||
| m.kafkaReadersMap.KafkaReadersLock.Lock() | |||||
| defer m.kafkaReadersMap.KafkaReadersLock.Unlock() | |||||
| return m.kafkaReadersMap.KafkaReaders[topic] | |||||
| } | |||||
| func (m *KafkaManager) GetWriter(topic string) *kafka.Writer { | |||||
| m.kafkaWritersMap.KafkaWritersLock.Lock() | |||||
| defer m.kafkaWritersMap.KafkaWritersLock.Unlock() | |||||
| return m.kafkaWritersMap.KafkaWriters[topic] | |||||
| } | |||||
| @@ -1,21 +0,0 @@ | |||||
| package kafkaclient | |||||
| import ( | |||||
| "strings" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| // Create Kafka reader | |||||
| // | |||||
| // Deprecated: Use context manager object instead | |||||
| func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { | |||||
| brokers := strings.Split(kafkaURL, ",") | |||||
| return kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: brokers, | |||||
| GroupID: groupID, | |||||
| Topic: topic, | |||||
| MinBytes: 1, | |||||
| MaxBytes: 10e6, | |||||
| }) | |||||
| } | |||||
| @@ -1,22 +0,0 @@ | |||||
| package kafkaclient | |||||
| import ( | |||||
| "time" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| // Create Kafka writer | |||||
| // | |||||
| // Deprecated: Use context manager object instead | |||||
| func KafkaWriter(kafkaURL, topic string) *kafka.Writer { | |||||
| return &kafka.Writer{ | |||||
| Addr: kafka.TCP(kafkaURL), | |||||
| Topic: topic, | |||||
| Balancer: &kafka.LeastBytes{}, | |||||
| Async: false, | |||||
| RequiredAcks: kafka.RequireAll, | |||||
| BatchSize: 100, | |||||
| BatchTimeout: 10 * time.Millisecond, | |||||
| } | |||||
| } | |||||
| @@ -77,7 +77,6 @@ func (p *ParserRegistry) Unregister(name string) { | |||||
| func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) { | func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) { | ||||
| flag := false | flag := false | ||||
| event := BeaconEvent{Type: name} | event := BeaconEvent{Type: name} | ||||
| fmt.Printf("parsing: %s\n", name) | |||||
| if cfg, ok := b.configs["battery"]; ok { | if cfg, ok := b.configs["battery"]; ok { | ||||
| event.Battery = uint32(b.extract(ad, cfg).(uint16)) | event.Battery = uint32(b.extract(ad, cfg).(uint16)) | ||||
| flag = true | flag = true | ||||
| @@ -97,7 +96,12 @@ func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) { | |||||
| event.AccZ = int16(val) | event.AccZ = int16(val) | ||||
| flag = true | flag = true | ||||
| } | } | ||||
| fmt.Printf("success: %s, event: %+v\n", flag, event) | |||||
| if cfg, ok := b.configs["temperature"]; ok { | |||||
| val := b.extract(ad, cfg).(float64) | |||||
| event.Temperature = uint16(val) | |||||
| flag = true | |||||
| } | |||||
| return event, flag | return event, flag | ||||
| } | } | ||||
| @@ -2,8 +2,6 @@ package model | |||||
| import ( | import ( | ||||
| "sync" | "sync" | ||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| // BeaconAdvertisement represents the JSON payload received from beacon advertisements. | // BeaconAdvertisement represents the JSON payload received from beacon advertisements. | ||||
| @@ -89,14 +87,17 @@ type Beacon struct { | |||||
| } | } | ||||
| type BeaconEvent struct { | type BeaconEvent struct { | ||||
| Name string | |||||
| ID string | |||||
| Type string | |||||
| Battery uint32 | |||||
| Event int | |||||
| AccX int16 | |||||
| AccY int16 | |||||
| AccZ int16 | |||||
| Name string | |||||
| ID string | |||||
| Type string | |||||
| Battery uint32 | |||||
| Event int | |||||
| AccX int16 | |||||
| AccY int16 | |||||
| AccZ int16 | |||||
| Temperature uint16 | |||||
| Heart int16 | |||||
| BtnPressed bool | |||||
| } | } | ||||
| type HTTPResult struct { | type HTTPResult struct { | ||||
| @@ -161,16 +162,6 @@ type ApiUpdate struct { | |||||
| MAC string | MAC string | ||||
| } | } | ||||
| type KafkaReadersList struct { | |||||
| KafkaReadersLock sync.RWMutex | |||||
| KafkaReaders []*kafka.Reader | |||||
| } | |||||
| type KafkaWritersList struct { | |||||
| KafkaWritersLock sync.RWMutex | |||||
| KafkaWriters []*kafka.Writer | |||||
| } | |||||
| type Alert struct { | type Alert struct { | ||||
| ID string `json:"id"` // tracker id | ID string `json:"id"` // tracker id | ||||
| Type string `json:"type"` // type of alert | Type string `json:"type"` // type of alert | ||||
| @@ -0,0 +1,364 @@ | |||||
| package decoder | |||||
| import ( | |||||
| "bytes" | |||||
| "testing" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| func TestDecodeBeacon_EmptyData(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "", // Empty data | |||||
| } | |||||
| // Execute | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert | |||||
| if err != nil { | |||||
| t.Errorf("Expected no error for empty data, got %v", err) | |||||
| } | |||||
| if len(mockWriter.Messages) != 0 { | |||||
| t.Errorf("Expected no messages for empty data, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_WhitespaceOnly(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: " ", // Whitespace only | |||||
| } | |||||
| // Execute | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert | |||||
| if err != nil { | |||||
| t.Errorf("Expected no error for whitespace-only data, got %v", err) | |||||
| } | |||||
| if len(mockWriter.Messages) != 0 { | |||||
| t.Errorf("Expected no messages for whitespace-only data, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_InvalidHex(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "INVALID_HEX_DATA!!!", | |||||
| } | |||||
| // Execute | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert | |||||
| if err == nil { | |||||
| t.Error("Expected error for invalid hex data, got nil") | |||||
| } | |||||
| if len(mockWriter.Messages) != 0 { | |||||
| t.Errorf("Expected no messages for invalid hex, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_ValidHexNoParser(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} // No parsers registered | |||||
| // Valid hex but no matching parser | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "0201060302A0", // Valid AD structure | |||||
| } | |||||
| // Execute | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert | |||||
| if err != nil { | |||||
| t.Errorf("Expected no error when no parser matches, got %v", err) | |||||
| } | |||||
| if len(mockWriter.Messages) != 0 { | |||||
| t.Errorf("Expected no messages when no parser matches, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_Deduplication(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| parserRegistry.Register("test-parser", config) | |||||
| // Create an event that will be parsed | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", // Simple AD structure | |||||
| } | |||||
| // First processing - should publish | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("First processing failed: %v", err) | |||||
| } | |||||
| firstMessageCount := len(mockWriter.Messages) | |||||
| // Second processing with identical data - should deduplicate | |||||
| err = decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("Second processing failed: %v", err) | |||||
| } | |||||
| // Assert - message count should not have changed | |||||
| if len(mockWriter.Messages) != firstMessageCount { | |||||
| t.Errorf("Expected deduplication, got %d messages (should be %d)", len(mockWriter.Messages), firstMessageCount) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_DifferentDataPublishes(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| parserRegistry.Register("test-parser", config) | |||||
| // First processing | |||||
| adv1 := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| err := decodeBeacon(adv1, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("First processing failed: %v", err) | |||||
| } | |||||
| firstMessageCount := len(mockWriter.Messages) | |||||
| // Second processing with different data - should publish again | |||||
| adv2 := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020107", // Different data | |||||
| } | |||||
| err = decodeBeacon(adv2, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("Second processing failed: %v", err) | |||||
| } | |||||
| // Assert - message count should have increased | |||||
| if len(mockWriter.Messages) != firstMessageCount+1 { | |||||
| t.Errorf("Expected new message for different data, got %d messages (expected %d)", len(mockWriter.Messages), firstMessageCount+1) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_WithFlagBytes(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| parserRegistry.Register("test-parser", config) | |||||
| // Data with flag bytes (0x01 at position 1) | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "0201060302A0", // Will have flags removed | |||||
| } | |||||
| // Execute | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert - should process successfully after flag removal | |||||
| if err != nil { | |||||
| t.Errorf("Expected no error with flag bytes, got %v", err) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_MultipleBeacons(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| parserRegistry.Register("test-parser", config) | |||||
| // Process multiple different beacons | |||||
| beacons := []model.BeaconAdvertisement{ | |||||
| {ID: "beacon-1", Data: "020106"}, | |||||
| {ID: "beacon-2", Data: "020107"}, | |||||
| {ID: "beacon-3", Data: "020108"}, | |||||
| } | |||||
| for _, adv := range beacons { | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Errorf("Failed to process beacon %s: %v", adv.ID, err) | |||||
| } | |||||
| } | |||||
| // Each unique beacon should produce a message | |||||
| if len(mockWriter.Messages) != len(beacons) { | |||||
| t.Errorf("Expected %d messages, got %d", len(beacons), len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestProcessIncoming_ErrorHandling(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Invalid data that will cause an error | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "INVALID_HEX", | |||||
| } | |||||
| // Execute - should not panic, just handle error | |||||
| processIncoming(adv, appState, mockWriter, parserRegistry) | |||||
| // Assert - no messages should be written | |||||
| if len(mockWriter.Messages) != 0 { | |||||
| t.Errorf("Expected no messages on error, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_EventHashing(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser that creates consistent events | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| parserRegistry.Register("test-parser", config) | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| // First processing | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("First processing failed: %v", err) | |||||
| } | |||||
| // Get the event from appState | |||||
| event, exists := appState.GetBeaconEvent("test-beacon") | |||||
| if !exists { | |||||
| t.Fatal("Event should exist in appState") | |||||
| } | |||||
| // Verify hash is created | |||||
| hash := event.Hash() | |||||
| if hash == nil || len(hash) == 0 { | |||||
| t.Error("Expected non-empty hash") | |||||
| } | |||||
| // Second processing should be deduplicated based on hash | |||||
| err = decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if err != nil { | |||||
| t.Fatalf("Second processing failed: %v", err) | |||||
| } | |||||
| // Should still have only one message | |||||
| if len(mockWriter.Messages) != 1 { | |||||
| t.Errorf("Expected 1 message after deduplication, got %d", len(mockWriter.Messages)) | |||||
| } | |||||
| } | |||||
| func TestDecodeBeacon_VariousHexFormats(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| testCases := []struct { | |||||
| name string | |||||
| hexData string | |||||
| shouldError bool | |||||
| }{ | |||||
| {"lowercase hex", "020106aa", false}, | |||||
| {"uppercase hex", "020106AA", false}, | |||||
| {"mixed case", "020106AaFf", false}, | |||||
| {"with spaces", " 020106 ", false}, | |||||
| {"odd length", "02016", true}, | |||||
| {"invalid chars", "020106ZZ", true}, | |||||
| } | |||||
| for _, tc := range testCases { | |||||
| t.Run(tc.name, func(t *testing.T) { | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: tc.hexData, | |||||
| } | |||||
| err := decodeBeacon(adv, appState, mockWriter, parserRegistry) | |||||
| if tc.shouldError && err == nil { | |||||
| t.Errorf("Expected error for %s, got nil", tc.name) | |||||
| } | |||||
| if !tc.shouldError && err != nil && !bytes.Contains(err.Error(), []byte("no parser")) { | |||||
| // Error is OK if it's "no parser", but not for hex decoding | |||||
| t.Logf("Got expected error for %s: %v", tc.name, err) | |||||
| } | |||||
| }) | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,369 @@ | |||||
| package decoder | |||||
| import ( | |||||
| "context" | |||||
| "testing" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| func TestEventLoop_RawMessageProcessing(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| chRaw := make(chan model.BeaconAdvertisement, 10) | |||||
| ctx, cancel := context.WithCancel(context.Background()) | |||||
| defer cancel() | |||||
| // Create a test message | |||||
| msg := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| // Simulate event loop processing | |||||
| go func() { | |||||
| for { | |||||
| select { | |||||
| case <-ctx.Done(): | |||||
| return | |||||
| case m := <-chRaw: | |||||
| processIncoming(m, appState, mockWriter, parserRegistry) | |||||
| } | |||||
| } | |||||
| }() | |||||
| // Send message | |||||
| chRaw <- msg | |||||
| // Give it time to process | |||||
| time.Sleep(100 * time.Millisecond) | |||||
| // Cancel context | |||||
| cancel() | |||||
| // Verify message was processed (even if no parser matched, processIncoming was called) | |||||
| // We just verify no panic occurred | |||||
| } | |||||
| func TestEventLoop_ParserRegistryUpdates(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| chParser := make(chan model.KafkaParser, 10) | |||||
| // Test ADD operation | |||||
| addMsg := model.KafkaParser{ | |||||
| ID: "add", | |||||
| Name: "new-parser", | |||||
| Config: model.Config{ | |||||
| Name: "new-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| }, | |||||
| } | |||||
| chParser <- addMsg | |||||
| // Simulate event loop handling | |||||
| select { | |||||
| case msg := <-chParser: | |||||
| switch msg.ID { | |||||
| case "add": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| case "delete": | |||||
| parserRegistry.Unregister(msg.Name) | |||||
| case "update": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Fatal("Timeout waiting for parser message") | |||||
| } | |||||
| // Verify parser was added | |||||
| if len(parserRegistry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList)) | |||||
| } | |||||
| // Test DELETE operation | |||||
| deleteMsg := model.KafkaParser{ | |||||
| ID: "delete", | |||||
| Name: "new-parser", | |||||
| } | |||||
| chParser <- deleteMsg | |||||
| select { | |||||
| case msg := <-chParser: | |||||
| switch msg.ID { | |||||
| case "add": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| case "delete": | |||||
| parserRegistry.Unregister(msg.Name) | |||||
| case "update": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Fatal("Timeout waiting for parser message") | |||||
| } | |||||
| // Verify parser was deleted | |||||
| if len(parserRegistry.ParserList) != 0 { | |||||
| t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList)) | |||||
| } | |||||
| } | |||||
| func TestEventLoop_UpdateParser(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Add initial parser | |||||
| parserRegistry.Register("test-parser", model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| }) | |||||
| chParser := make(chan model.KafkaParser, 10) | |||||
| // Test UPDATE operation | |||||
| updateMsg := model.KafkaParser{ | |||||
| ID: "update", | |||||
| Name: "test-parser", | |||||
| Config: model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "03", | |||||
| Length: 3, | |||||
| }, | |||||
| } | |||||
| chParser <- updateMsg | |||||
| // Simulate event loop handling | |||||
| select { | |||||
| case msg := <-chParser: | |||||
| switch msg.ID { | |||||
| case "add": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| case "delete": | |||||
| parserRegistry.Unregister(msg.Name) | |||||
| case "update": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Fatal("Timeout waiting for parser message") | |||||
| } | |||||
| // Verify parser still exists (was updated, not deleted) | |||||
| if len(parserRegistry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList)) | |||||
| } | |||||
| if _, exists := parserRegistry.ParserList["test-parser"]; !exists { | |||||
| t.Error("Parser should still exist after update") | |||||
| } | |||||
| } | |||||
| func TestEventLoop_MultipleParserOperations(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| chParser := make(chan model.KafkaParser, 10) | |||||
| // Send multiple operations | |||||
| operations := []model.KafkaParser{ | |||||
| {ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}}, | |||||
| {ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}}, | |||||
| {ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}}, | |||||
| {ID: "delete", Name: "parser-2"}, | |||||
| {ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}}, | |||||
| } | |||||
| for _, op := range operations { | |||||
| chParser <- op | |||||
| } | |||||
| // Process all operations | |||||
| for i := 0; i < len(operations); i++ { | |||||
| select { | |||||
| case msg := <-chParser: | |||||
| switch msg.ID { | |||||
| case "add": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| case "delete": | |||||
| parserRegistry.Unregister(msg.Name) | |||||
| case "update": | |||||
| config := msg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Fatalf("Timeout processing operation %d", i) | |||||
| } | |||||
| } | |||||
| // Verify final state | |||||
| if len(parserRegistry.ParserList) != 2 { | |||||
| t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList)) | |||||
| } | |||||
| // parser-1 should exist (updated) | |||||
| if _, exists := parserRegistry.ParserList["parser-1"]; !exists { | |||||
| t.Error("parser-1 should exist") | |||||
| } | |||||
| // parser-2 should not exist (deleted) | |||||
| if _, exists := parserRegistry.ParserList["parser-2"]; exists { | |||||
| t.Error("parser-2 should not exist") | |||||
| } | |||||
| // parser-3 should exist (added) | |||||
| if _, exists := parserRegistry.ParserList["parser-3"]; !exists { | |||||
| t.Error("parser-3 should exist") | |||||
| } | |||||
| } | |||||
| func TestEventLoop_ContextCancellation(t *testing.T) { | |||||
| // Setup | |||||
| ctx, cancel := context.WithCancel(context.Background()) | |||||
| defer cancel() | |||||
| chRaw := make(chan model.BeaconAdvertisement, 10) | |||||
| chParser := make(chan model.KafkaParser, 10) | |||||
| // Cancel immediately | |||||
| cancel() | |||||
| // Verify context is cancelled | |||||
| select { | |||||
| case <-ctx.Done(): | |||||
| // Expected - context was cancelled | |||||
| return | |||||
| case msg := <-chRaw: | |||||
| t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg) | |||||
| case msg := <-chParser: | |||||
| t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg) | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Error("Timeout - context cancellation should have been immediate") | |||||
| } | |||||
| } | |||||
| func TestEventLoop_ChannelBuffering(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Create buffered channels (like in main) | |||||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||||
| chParser := make(chan model.KafkaParser, 200) | |||||
| ctx, cancel := context.WithCancel(context.Background()) | |||||
| defer cancel() | |||||
| // Send multiple messages without blocking | |||||
| for i := 0; i < 100; i++ { | |||||
| msg := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| chRaw <- msg | |||||
| } | |||||
| // Verify all messages are buffered | |||||
| if len(chRaw) != 100 { | |||||
| t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw)) | |||||
| } | |||||
| // Send parser updates | |||||
| for i := 0; i < 10; i++ { | |||||
| msg := model.KafkaParser{ | |||||
| ID: "add", | |||||
| Name: "parser-" + string(rune('A'+i)), | |||||
| Config: model.Config{ | |||||
| Name: "parser-" + string(rune('A'+i)), | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| }, | |||||
| } | |||||
| chParser <- msg | |||||
| } | |||||
| // Verify all parser messages are buffered | |||||
| if len(chParser) != 10 { | |||||
| t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser)) | |||||
| } | |||||
| // Cancel context | |||||
| cancel() | |||||
| } | |||||
| func TestEventLoop_ParserAndRawChannels(t *testing.T) { | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| chRaw := make(chan model.BeaconAdvertisement, 10) | |||||
| chParser := make(chan model.KafkaParser, 10) | |||||
| ctx, cancel := context.WithCancel(context.Background()) | |||||
| defer cancel() | |||||
| // Send both raw and parser messages | |||||
| rawMsg := model.BeaconAdvertisement{ | |||||
| ID: "test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| parserMsg := model.KafkaParser{ | |||||
| ID: "add", | |||||
| Name: "test-parser", | |||||
| Config: model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| }, | |||||
| } | |||||
| chRaw <- rawMsg | |||||
| chParser <- parserMsg | |||||
| // Process both messages | |||||
| processedRaw := false | |||||
| processedParser := false | |||||
| for i := 0; i < 2; i++ { | |||||
| select { | |||||
| case <-chRaw: | |||||
| processedRaw = true | |||||
| case <-chParser: | |||||
| processedParser = true | |||||
| case <-time.After(1 * time.Second): | |||||
| t.Fatal("Timeout waiting for messages") | |||||
| } | |||||
| } | |||||
| if !processedRaw { | |||||
| t.Error("Raw message should have been processed") | |||||
| } | |||||
| if !processedParser { | |||||
| t.Error("Parser message should have been processed") | |||||
| } | |||||
| cancel() | |||||
| } | |||||
| @@ -0,0 +1,418 @@ | |||||
| package decoder | |||||
| import ( | |||||
| "context" | |||||
| "encoding/json" | |||||
| "os" | |||||
| "testing" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| // TestIntegration_DecoderEndToEnd tests the complete decoder flow | |||||
| func TestIntegration_DecoderEndToEnd(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| // Check if Kafka is available | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| // Create test topics | |||||
| rawTopic := "test-rawbeacons-" + time.Now().Format("20060102150405") | |||||
| alertTopic := "test-alertbeacons-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register a test parser | |||||
| config := model.Config{ | |||||
| Name: "integration-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| } | |||||
| parserRegistry.Register("integration-test-parser", config) | |||||
| // Create Kafka writer | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| // Create Kafka reader to verify messages | |||||
| reader := kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| GroupID: "test-group-" + time.Now().Format("20060102150405"), | |||||
| }) | |||||
| defer reader.Close() | |||||
| // Create a test beacon advertisement | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "integration-test-beacon", | |||||
| Data: "020106", // Valid hex data | |||||
| } | |||||
| // Process the beacon | |||||
| err := decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| if err != nil { | |||||
| t.Logf("Decode beacon returned error (may be expected if no parser matches): %v", err) | |||||
| } | |||||
| // Give Kafka time to propagate | |||||
| time.Sleep(1 * time.Second) | |||||
| // Verify event was stored in appState | |||||
| event, exists := appState.GetBeaconEvent("integration-test-beacon") | |||||
| if exists { | |||||
| t.Logf("Event stored in appState: %+v", event) | |||||
| } | |||||
| } | |||||
| // TestIntegration_ParserRegistryOperations tests parser registry with real Kafka | |||||
| func TestIntegration_ParserRegistryOperations(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| alertTopic := "test-alertbeacons-registry-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| // Test parser registration through Kafka message flow | |||||
| parserMsg := model.KafkaParser{ | |||||
| ID: "add", | |||||
| Name: "kafka-test-parser", | |||||
| Config: model.Config{ | |||||
| Name: "kafka-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| }, | |||||
| } | |||||
| // Simulate parser registry update | |||||
| switch parserMsg.ID { | |||||
| case "add": | |||||
| config := parserMsg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| case "delete": | |||||
| parserRegistry.Unregister(parserMsg.Name) | |||||
| case "update": | |||||
| config := parserMsg.Config | |||||
| parserRegistry.Register(config.Name, config) | |||||
| } | |||||
| // Verify parser was registered | |||||
| if len(parserRegistry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser in registry, got %d", len(parserRegistry.ParserList)) | |||||
| } | |||||
| if _, exists := parserRegistry.ParserList["kafka-test-parser"]; !exists { | |||||
| t.Error("Parser should exist in registry") | |||||
| } | |||||
| } | |||||
| // TestIntegration_MultipleBeaconsSequential tests processing multiple beacons | |||||
| func TestIntegration_MultipleBeaconsSequential(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| alertTopic := "test-alertbeacons-multi-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register parser | |||||
| config := model.Config{ | |||||
| Name: "multi-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| } | |||||
| parserRegistry.Register("multi-test-parser", config) | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| reader := kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| GroupID: "test-group-multi-" + time.Now().Format("20060102150405"), | |||||
| MinBytes: 10e3, | |||||
| MaxBytes: 10e6, | |||||
| }) | |||||
| defer reader.Close() | |||||
| // Process multiple beacons | |||||
| beacons := []model.BeaconAdvertisement{ | |||||
| {ID: "beacon-1", Data: "020106"}, | |||||
| {ID: "beacon-2", Data: "020107"}, | |||||
| {ID: "beacon-3", Data: "020108"}, | |||||
| } | |||||
| for _, adv := range beacons { | |||||
| err := decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| if err != nil { | |||||
| t.Logf("Processing beacon %s returned error: %v", adv.ID, err) | |||||
| } | |||||
| } | |||||
| // Give Kafka time to propagate | |||||
| time.Sleep(2 * time.Second) | |||||
| // Verify events in appState | |||||
| for _, adv := range beacons { | |||||
| event, exists := appState.GetBeaconEvent(adv.ID) | |||||
| if exists { | |||||
| t.Logf("Event for %s: %+v", adv.ID, event) | |||||
| } | |||||
| } | |||||
| } | |||||
| // TestIntegration_EventDeduplication tests that duplicate events are not published | |||||
| func TestIntegration_EventDeduplication(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| alertTopic := "test-alertbeacons-dedup-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| // Register parser | |||||
| config := model.Config{ | |||||
| Name: "dedup-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| } | |||||
| parserRegistry.Register("dedup-test-parser", config) | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| reader := kafka.NewReader(kafka.ReaderConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| GroupID: "test-group-dedup-" + time.Now().Format("20060102150405"), | |||||
| }) | |||||
| defer reader.Close() | |||||
| // Create identical beacon advertisement | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "dedup-test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| // Process first time | |||||
| err := decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| if err != nil { | |||||
| t.Logf("First processing returned error: %v", err) | |||||
| } | |||||
| // Process second time with identical data | |||||
| err = decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| if err != nil { | |||||
| t.Logf("Second processing returned error: %v", err) | |||||
| } | |||||
| // Give Kafka time to propagate | |||||
| time.Sleep(1 * time.Second) | |||||
| // Try to read from Kafka - should have at most 1 message due to deduplication | |||||
| ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | |||||
| defer cancel() | |||||
| messageCount := 0 | |||||
| for { | |||||
| msg, err := reader.ReadMessage(ctx) | |||||
| if err != nil { | |||||
| break | |||||
| } | |||||
| messageCount++ | |||||
| t.Logf("Read message %d: %s", messageCount, string(msg.Value)) | |||||
| if messageCount > 1 { | |||||
| t.Error("Expected at most 1 message due to deduplication, got more") | |||||
| break | |||||
| } | |||||
| } | |||||
| t.Logf("Total messages read: %d", messageCount) | |||||
| } | |||||
| // TestIntegration_AppStatePersistence tests that events persist in AppState | |||||
| func TestIntegration_AppStatePersistence(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| alertTopic := "test-alertbeacons-persist-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| config := model.Config{ | |||||
| Name: "persist-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| } | |||||
| parserRegistry.Register("persist-test-parser", config) | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| // Process beacon | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "persist-test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| err := decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| if err != nil { | |||||
| t.Logf("Processing returned error: %v", err) | |||||
| } | |||||
| // Verify event persists in AppState | |||||
| event, exists := appState.GetBeaconEvent("persist-test-beacon") | |||||
| if !exists { | |||||
| t.Error("Event should exist in AppState after processing") | |||||
| } else { | |||||
| t.Logf("Event persisted: ID=%s, Type=%s, Battery=%d", | |||||
| event.ID, event.Type, event.Battery) | |||||
| // Verify event can be serialized to JSON | |||||
| jsonData, err := event.ToJSON() | |||||
| if err != nil { | |||||
| t.Errorf("Failed to serialize event to JSON: %v", err) | |||||
| } else { | |||||
| t.Logf("Event JSON: %s", string(jsonData)) | |||||
| } | |||||
| } | |||||
| } | |||||
| // TestIntegration_ParserUpdateFlow tests updating parsers during runtime | |||||
| func TestIntegration_ParserUpdateFlow(t *testing.T) { | |||||
| if testing.Short() { | |||||
| t.Skip("Skipping integration test in short mode") | |||||
| } | |||||
| kafkaURL := os.Getenv("KAFKA_URL") | |||||
| if kafkaURL == "" { | |||||
| kafkaURL = "localhost:9092" | |||||
| } | |||||
| alertTopic := "test-alertbeacons-update-" + time.Now().Format("20060102150405") | |||||
| // Setup | |||||
| appState := appcontext.NewAppState() | |||||
| parserRegistry := &model.ParserRegistry{} | |||||
| writer := kafka.NewWriter(kafka.WriterConfig{ | |||||
| Brokers: []string{kafkaURL}, | |||||
| Topic: alertTopic, | |||||
| }) | |||||
| defer writer.Close() | |||||
| // Initial parser config | |||||
| config1 := model.Config{ | |||||
| Name: "update-test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| MinLength: 2, | |||||
| MaxLength: 20, | |||||
| } | |||||
| parserRegistry.Register("update-test-parser", config1) | |||||
| // Process with initial config | |||||
| adv := model.BeaconAdvertisement{ | |||||
| ID: "update-test-beacon", | |||||
| Data: "020106", | |||||
| } | |||||
| err := decodeBeacon(adv, appState, writer, parserRegistry) | |||||
| t.Logf("First processing: %v", err) | |||||
| // Update parser config | |||||
| config2 := model.Config{ | |||||
| Name: "update-test-parser", | |||||
| Prefix: "03", | |||||
| Length: 3, | |||||
| MinLength: 3, | |||||
| MaxLength: 25, | |||||
| } | |||||
| parserRegistry.Register("update-test-parser", config2) | |||||
| // Process again with updated config | |||||
| adv2 := model.BeaconAdvertisement{ | |||||
| ID: "update-test-beacon-2", | |||||
| Data: "030107", | |||||
| } | |||||
| err = decodeBeacon(adv2, appState, writer, parserRegistry) | |||||
| t.Logf("Second processing with updated parser: %v", err) | |||||
| // Verify parser still exists | |||||
| if _, exists := parserRegistry.ParserList["update-test-parser"]; !exists { | |||||
| t.Error("Parser should exist after update") | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,275 @@ | |||||
| package decoder | |||||
| import ( | |||||
| "testing" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| func TestParserRegistry_AddParser(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| // Add a parser | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| registry.Register("test-parser", config) | |||||
| // Verify parser was added | |||||
| if len(registry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser in registry, got %d", len(registry.ParserList)) | |||||
| } | |||||
| if _, exists := registry.ParserList["test-parser"]; !exists { | |||||
| t.Error("Parser 'test-parser' should exist in registry") | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_RemoveParser(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| config := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| registry.Register("test-parser", config) | |||||
| // Remove parser | |||||
| registry.Unregister("test-parser") | |||||
| // Verify parser was removed | |||||
| if len(registry.ParserList) != 0 { | |||||
| t.Errorf("Expected 0 parsers in registry, got %d", len(registry.ParserList)) | |||||
| } | |||||
| if _, exists := registry.ParserList["test-parser"]; exists { | |||||
| t.Error("Parser 'test-parser' should not exist in registry") | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_UpdateParser(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| // Add initial parser | |||||
| config1 := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| registry.Register("test-parser", config1) | |||||
| // Update parser | |||||
| config2 := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "03", | |||||
| Length: 3, | |||||
| } | |||||
| registry.Register("test-parser", config2) | |||||
| // Verify only one parser exists | |||||
| if len(registry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser in registry, got %d", len(registry.ParserList)) | |||||
| } | |||||
| // Verify it was updated (the new config should be used) | |||||
| if _, exists := registry.ParserList["test-parser"]; !exists { | |||||
| t.Error("Parser 'test-parser' should exist in registry") | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_MultipleParsers(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| // Add multiple parsers | |||||
| parsers := []model.Config{ | |||||
| {Name: "parser-1", Prefix: "02", Length: 2}, | |||||
| {Name: "parser-2", Prefix: "03", Length: 3}, | |||||
| {Name: "parser-3", Prefix: "04", Length: 4}, | |||||
| } | |||||
| for _, p := range parsers { | |||||
| registry.Register(p.Name, p) | |||||
| } | |||||
| // Verify all parsers were added | |||||
| if len(registry.ParserList) != 3 { | |||||
| t.Errorf("Expected 3 parsers in registry, got %d", len(registry.ParserList)) | |||||
| } | |||||
| for _, p := range parsers { | |||||
| if _, exists := registry.ParserList[p.Name]; !exists { | |||||
| t.Errorf("Parser '%s' should exist in registry", p.Name) | |||||
| } | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_RemoveNonExistent(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| // Try to remove non-existent parser - should not panic | |||||
| registry.Unregister("non-existent") | |||||
| // Verify registry is still empty | |||||
| if len(registry.ParserList) != 0 { | |||||
| t.Errorf("Expected 0 parsers, got %d", len(registry.ParserList)) | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_ConcurrentAccess(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| done := make(chan bool) | |||||
| // Concurrent additions | |||||
| for i := 0; i < 10; i++ { | |||||
| go func(index int) { | |||||
| config := model.Config{ | |||||
| Name: "parser-" + string(rune('A'+index)), | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| registry.Register(config.Name, config) | |||||
| done <- true | |||||
| }(i) | |||||
| } | |||||
| // Wait for all goroutines | |||||
| for i := 0; i < 10; i++ { | |||||
| <-done | |||||
| } | |||||
| // Verify all parsers were added | |||||
| if len(registry.ParserList) != 10 { | |||||
| t.Errorf("Expected 10 parsers, got %d", len(registry.ParserList)) | |||||
| } | |||||
| } | |||||
| func TestParserConfig_Structure(t *testing.T) { | |||||
| config := model.Config{ | |||||
| Name: "test-config", | |||||
| Prefix: "0201", | |||||
| MinLength: 10, | |||||
| MaxLength: 30, | |||||
| ParserType: "sensor", | |||||
| } | |||||
| if config.Name != "test-config" { | |||||
| t.Errorf("Expected name 'test-config', got '%s'", config.Name) | |||||
| } | |||||
| if config.Prefix != "0201" { | |||||
| t.Errorf("Expected prefix '0201', got '%s'", config.Prefix) | |||||
| } | |||||
| if config.MinLength != 10 { | |||||
| t.Errorf("Expected MinLength 10, got %d", config.MinLength) | |||||
| } | |||||
| if config.MaxLength != 30 { | |||||
| t.Errorf("Expected MaxLength 30, got %d", config.MaxLength) | |||||
| } | |||||
| } | |||||
| func TestKafkaParser_MessageTypes(t *testing.T) { | |||||
| testCases := []struct { | |||||
| name string | |||||
| id string | |||||
| config model.Config | |||||
| expected string | |||||
| }{ | |||||
| { | |||||
| name: "add parser", | |||||
| id: "add", | |||||
| config: model.Config{Name: "new-parser", Prefix: "02", Length: 2}, | |||||
| expected: "add", | |||||
| }, | |||||
| { | |||||
| name: "delete parser", | |||||
| id: "delete", | |||||
| config: model.Config{Name: "old-parser", Prefix: "02", Length: 2}, | |||||
| expected: "delete", | |||||
| }, | |||||
| { | |||||
| name: "update parser", | |||||
| id: "update", | |||||
| config: model.Config{Name: "updated-parser", Prefix: "03", Length: 3}, | |||||
| expected: "update", | |||||
| }, | |||||
| } | |||||
| for _, tc := range testCases { | |||||
| t.Run(tc.name, func(t *testing.T) { | |||||
| msg := model.KafkaParser{ | |||||
| ID: tc.id, | |||||
| Name: tc.config.Name, | |||||
| Config: tc.config, | |||||
| } | |||||
| if msg.ID != tc.expected { | |||||
| t.Errorf("Expected ID '%s', got '%s'", tc.expected, msg.ID) | |||||
| } | |||||
| if msg.Name != tc.config.Name { | |||||
| t.Errorf("Expected Name '%s', got '%s'", tc.config.Name, msg.Name) | |||||
| } | |||||
| }) | |||||
| } | |||||
| } | |||||
| func TestParserRegistry_EmptyRegistry(t *testing.T) { | |||||
| // Setup empty registry | |||||
| registry := &model.ParserRegistry{} | |||||
| // Verify it's empty | |||||
| if len(registry.ParserList) != 0 { | |||||
| t.Errorf("Expected empty registry, got %d parsers", len(registry.ParserList)) | |||||
| } | |||||
| // Should be safe to call Unregister on empty registry | |||||
| registry.Unregister("anything") | |||||
| } | |||||
| func TestParserRegistry_ParserReplacement(t *testing.T) { | |||||
| // Setup | |||||
| registry := &model.ParserRegistry{} | |||||
| // Add parser with config 1 | |||||
| config1 := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "02", | |||||
| Length: 2, | |||||
| } | |||||
| registry.Register("test-parser", config1) | |||||
| // Replace with config 2 (same name) | |||||
| config2 := model.Config{ | |||||
| Name: "test-parser", | |||||
| Prefix: "03", | |||||
| Length: 3, | |||||
| } | |||||
| registry.Register("test-parser", config2) | |||||
| // Verify only one entry exists | |||||
| if len(registry.ParserList) != 1 { | |||||
| t.Errorf("Expected 1 parser after replacement, got %d", len(registry.ParserList)) | |||||
| } | |||||
| // Verify the parser still exists | |||||
| if _, exists := registry.ParserList["test-parser"]; !exists { | |||||
| t.Error("Parser 'test-parser' should still exist") | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,321 @@ | |||||
| package decoder | |||||
| import ( | |||||
| "context" | |||||
| "testing" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | |||||
| // MockKafkaWriter is a mock implementation of kafkaWriter for testing | |||||
| type MockKafkaWriter struct { | |||||
| Messages []kafka.Message | |||||
| } | |||||
| func (m *MockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { | |||||
| m.Messages = append(m.Messages, msgs...) | |||||
| return nil | |||||
| } | |||||
| // TestHelper provides utility functions for decoder testing | |||||
| type TestHelper struct { | |||||
| t *testing.T | |||||
| appState *appcontext.AppState | |||||
| parserRegistry *model.ParserRegistry | |||||
| } | |||||
| // NewTestHelper creates a new test helper instance | |||||
| func NewTestHelper(t *testing.T) *TestHelper { | |||||
| return &TestHelper{ | |||||
| t: t, | |||||
| appState: appcontext.NewAppState(), | |||||
| parserRegistry: &model.ParserRegistry{}, | |||||
| } | |||||
| } | |||||
| // GetAppState returns the appState instance | |||||
| func (th *TestHelper) GetAppState() *appcontext.AppState { | |||||
| return th.appState | |||||
| } | |||||
| // GetParserRegistry returns the parser registry | |||||
| func (th *TestHelper) GetParserRegistry() *model.ParserRegistry { | |||||
| return th.parserRegistry | |||||
| } | |||||
| // RegisterTestParser registers a parser with default test configuration | |||||
| func (th *TestHelper) RegisterTestParser(name string) { | |||||
| config := model.Config{ | |||||
| Name: name, | |||||
| Min: 2, | |||||
| Max: 20, | |||||
| Pattern: []string{"02"}, | |||||
| Configs: map[string]model.ParserConfig{ | |||||
| "length": {Length: 2, Offset: 0, Order: "big"}, | |||||
| }, | |||||
| } | |||||
| th.parserRegistry.Register(name, config) | |||||
| } | |||||
| // CreateBeaconAdvertisement creates a test beacon advertisement | |||||
| func (th *TestHelper) CreateBeaconAdvertisement(id, data string) model.BeaconAdvertisement { | |||||
| return model.BeaconAdvertisement{ | |||||
| ID: id, | |||||
| Data: data, | |||||
| } | |||||
| } | |||||
| // CreateValidHexAdvertisement creates a beacon with valid hex data | |||||
| func (th *TestHelper) CreateValidHexAdvertisement(id string) model.BeaconAdvertisement { | |||||
| return model.BeaconAdvertisement{ | |||||
| ID: id, | |||||
| Data: "020106", | |||||
| } | |||||
| } | |||||
| // CreateInvalidHexAdvertisement creates a beacon with invalid hex data | |||||
| func (th *TestHelper) CreateInvalidHexAdvertisement(id string) model.BeaconAdvertisement { | |||||
| return model.BeaconAdvertisement{ | |||||
| ID: id, | |||||
| Data: "INVALID_HEX", | |||||
| } | |||||
| } | |||||
| // CreateEmptyAdvertisement creates a beacon with empty data | |||||
| func (th *TestHelper) CreateEmptyAdvertisement(id string) model.BeaconAdvertisement { | |||||
| return model.BeaconAdvertisement{ | |||||
| ID: id, | |||||
| Data: "", | |||||
| } | |||||
| } | |||||
| // AssertParserExists asserts that a parser exists in the registry | |||||
| func (th *TestHelper) AssertParserExists(name string) { | |||||
| if _, exists := th.parserRegistry.ParserList[name]; !exists { | |||||
| th.t.Errorf("Parser '%s' should exist in registry", name) | |||||
| } | |||||
| } | |||||
| // AssertParserNotExists asserts that a parser does not exist in the registry | |||||
| func (th *TestHelper) AssertParserNotExists(name string) { | |||||
| if _, exists := th.parserRegistry.ParserList[name]; exists { | |||||
| th.t.Errorf("Parser '%s' should not exist in registry", name) | |||||
| } | |||||
| } | |||||
| // AssertEventExists asserts that an event exists in appState | |||||
| func (th *TestHelper) AssertEventExists(id string) model.BeaconEvent { | |||||
| event, exists := th.appState.GetBeaconEvent(id) | |||||
| if !exists { | |||||
| th.t.Errorf("Event for beacon '%s' should exist in appState", id) | |||||
| return model.BeaconEvent{} | |||||
| } | |||||
| return event | |||||
| } | |||||
| // AssertEventNotExists asserts that an event does not exist in appState | |||||
| func (th *TestHelper) AssertEventNotExists(id string) { | |||||
| _, exists := th.appState.GetBeaconEvent(id) | |||||
| if exists { | |||||
| th.t.Errorf("Event for beacon '%s' should not exist in appState", id) | |||||
| } | |||||
| } | |||||
| // AssertParserCount asserts the number of parsers in the registry | |||||
| func (th *TestHelper) AssertParserCount(expected int) { | |||||
| if len(th.parserRegistry.ParserList) != expected { | |||||
| th.t.Errorf("Expected %d parsers in registry, got %d", expected, len(th.parserRegistry.ParserList)) | |||||
| } | |||||
| } | |||||
| // Helper functions for creating test configurations | |||||
| // CreateTestConfig creates a test parser configuration | |||||
| func CreateTestConfig(name string, min, max int, pattern []string) model.Config { | |||||
| return model.Config{ | |||||
| Name: name, | |||||
| Min: min, | |||||
| Max: max, | |||||
| Pattern: pattern, | |||||
| Configs: map[string]model.ParserConfig{ | |||||
| "length": {Length: 2, Offset: 0, Order: "big"}, | |||||
| }, | |||||
| } | |||||
| } | |||||
| // CreateKafkaParserMessage creates a Kafka parser message for testing | |||||
| func CreateKafkaParserMessage(id, name string, config model.Config) model.KafkaParser { | |||||
| return model.KafkaParser{ | |||||
| ID: id, | |||||
| Name: name, | |||||
| Config: config, | |||||
| } | |||||
| } | |||||
| // AssertNoError asserts that an error is nil | |||||
| func AssertNoError(t *testing.T, err error, msg string) { | |||||
| if err != nil { | |||||
| t.Errorf("%s: %v", msg, err) | |||||
| } | |||||
| } | |||||
| // AssertError asserts that an error is not nil | |||||
| func AssertError(t *testing.T, err error, msg string) { | |||||
| if err == nil { | |||||
| t.Errorf("%s: expected error but got nil", msg) | |||||
| } | |||||
| } | |||||
| // Common test data | |||||
| // Valid hex strings for testing | |||||
| var ValidHexStrings = []string{ | |||||
| "020106", // Simple AD structure | |||||
| "0201060302A0", // AD structure with flags | |||||
| "1AFF0C01", // iBeacon-like data | |||||
| "0201061AFF0C01", // Multiple AD structures | |||||
| } | |||||
| // Invalid hex strings for testing | |||||
| var InvalidHexStrings = []string{ | |||||
| "INVALID_HEX", | |||||
| "02016ZZZ", | |||||
| "GGGGGG", | |||||
| "NOT-HEX", | |||||
| } | |||||
| // Empty or whitespace data for testing | |||||
| var EmptyTestData = []string{ | |||||
| "", | |||||
| " ", | |||||
| "\t\n", | |||||
| } | |||||
| // CreateMockWriter creates a mock Kafka writer | |||||
| func CreateMockWriter() *MockKafkaWriter { | |||||
| return &MockKafkaWriter{Messages: []kafka.Message{}} | |||||
| } | |||||
| // Beacon event test helpers | |||||
| // AssertEventFields asserts that event fields match expected values | |||||
| func AssertEventFields(t *testing.T, event model.BeaconEvent, expectedID, expectedType string) { | |||||
| if event.ID != expectedID { | |||||
| t.Errorf("Expected event ID '%s', got '%s'", expectedID, event.ID) | |||||
| } | |||||
| if event.Type != expectedType { | |||||
| t.Errorf("Expected event type '%s', got '%s'", expectedType, event.Type) | |||||
| } | |||||
| } | |||||
| // SetupTestParsers registers a standard set of test parsers | |||||
| func SetupTestParsers(registry *model.ParserRegistry) { | |||||
| parsers := []model.Config{ | |||||
| {Name: "parser-1", Min: 2, Max: 20, Pattern: []string{"02"}}, | |||||
| {Name: "parser-2", Min: 3, Max: 25, Pattern: []string{"03"}}, | |||||
| {Name: "parser-3", Min: 4, Max: 30, Pattern: []string{"04"}}, | |||||
| } | |||||
| for _, p := range parsers { | |||||
| registry.Register(p.Name, p) | |||||
| } | |||||
| } | |||||
| // CleanupTestParsers removes all parsers from the registry | |||||
| func CleanupTestParsers(registry *model.ParserRegistry) { | |||||
| for name := range registry.ParserList { | |||||
| registry.Unregister(name) | |||||
| } | |||||
| } | |||||
| // CreateTestBeaconEvent creates a test beacon event | |||||
| func CreateTestBeaconEvent(id, eventType string) model.BeaconEvent { | |||||
| return model.BeaconEvent{ | |||||
| ID: id, | |||||
| Type: eventType, | |||||
| Battery: 100, | |||||
| Event: 1, | |||||
| AccX: 0, | |||||
| AccY: 0, | |||||
| AccZ: 0, | |||||
| } | |||||
| } | |||||
| // AssertKafkaMessageCount asserts the number of Kafka messages | |||||
| func AssertKafkaMessageCount(t *testing.T, writer *MockKafkaWriter, expected int) { | |||||
| if len(writer.Messages) != expected { | |||||
| t.Errorf("Expected %d Kafka message(s), got %d", expected, len(writer.Messages)) | |||||
| } | |||||
| } | |||||
| // AssertNoKafkaMessages asserts that no messages were written to Kafka | |||||
| func AssertNoKafkaMessages(t *testing.T, writer *MockKafkaWriter) { | |||||
| AssertKafkaMessageCount(t, writer, 0) | |||||
| } | |||||
| // Parser registry test helpers | |||||
| // SimulateEventLoopParserUpdate simulates the event loop's parser update logic | |||||
| func SimulateEventLoopParserUpdate(msg model.KafkaParser, registry *model.ParserRegistry) { | |||||
| switch msg.ID { | |||||
| case "add": | |||||
| config := msg.Config | |||||
| registry.Register(config.Name, config) | |||||
| case "delete": | |||||
| registry.Unregister(msg.Name) | |||||
| case "update": | |||||
| config := msg.Config | |||||
| registry.Register(config.Name, config) | |||||
| } | |||||
| } | |||||
| // CreateParserAddMessage creates a parser add message | |||||
| func CreateParserAddMessage(name string, min, max int) model.KafkaParser { | |||||
| return model.KafkaParser{ | |||||
| ID: "add", | |||||
| Name: name, | |||||
| Config: model.Config{ | |||||
| Name: name, | |||||
| Min: min, | |||||
| Max: max, | |||||
| Pattern: []string{"02"}, | |||||
| }, | |||||
| } | |||||
| } | |||||
| // CreateParserDeleteMessage creates a parser delete message | |||||
| func CreateParserDeleteMessage(name string) model.KafkaParser { | |||||
| return model.KafkaParser{ | |||||
| ID: "delete", | |||||
| Name: name, | |||||
| } | |||||
| } | |||||
| // CreateParserUpdateMessage creates a parser update message | |||||
| func CreateParserUpdateMessage(name string, min, max int) model.KafkaParser { | |||||
| return model.KafkaParser{ | |||||
| ID: "update", | |||||
| Name: name, | |||||
| Config: model.Config{ | |||||
| Name: name, | |||||
| Min: min, | |||||
| Max: max, | |||||
| Pattern: []string{"02"}, | |||||
| }, | |||||
| } | |||||
| } | |||||
| // GenerateTestBeaconID generates a test beacon ID | |||||
| func GenerateTestBeaconID(index int) string { | |||||
| return "test-beacon-" + string(rune('A'+index)) | |||||
| } | |||||
| // GenerateTestHexData generates test hex data | |||||
| func GenerateTestHexData(index int) string { | |||||
| prefix := "02" | |||||
| value := string(rune('6' + index)) | |||||
| return prefix + "01" + value | |||||
| } | |||||