|
- package main
-
import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/common/utils"
	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/segmentio/kafka-go"
)
-
- func main() {
- // Load global context to init beacons and latest list
- appState := appcontext.NewAppState()
- cfg := config.Load()
-
- // Kafka reader for Raw MQTT beacons
- rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
- defer rawReader.Close()
-
- // Kafka reader for API server updates
- apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
- defer apiReader.Close()
-
- alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons")
- defer alertWriter.Close()
-
- fmt.Println("Decoder initialized, subscribed to Kafka topics")
- chRaw := make(chan model.BeaconAdvertisement, 2000)
- chApi := make(chan model.ApiUpdate, 2000)
-
- go kafkaclient.Consume(rawReader, chRaw)
- go kafkaclient.Consume(apiReader, chApi)
-
- for {
- select {
- case msg := <-chRaw:
- processIncoming(msg, appState, alertWriter)
- case msg := <-chApi:
- switch msg.Method {
- case "POST":
- id := msg.Beacon.ID
- appState.AddBeaconToLookup(id)
- case "DELETE":
- fmt.Println("Incoming delete message")
- }
- }
- }
- }
-
- func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) {
- id := adv.MAC
- ok := appState.BeaconExists(id)
- if !ok {
- return
- }
-
- err := decodeBeacon(adv, appState, writer)
- if err != nil {
- fmt.Println("error in decoding")
- return
- }
- }
-
- func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) error {
- beacon := strings.TrimSpace(adv.Data)
- id := adv.MAC
- if beacon == "" {
- return nil
- }
-
- b, err := hex.DecodeString(beacon)
- if err != nil {
- return err
- }
-
- b = utils.RemoveFlagBytes(b)
-
- indeces := utils.ParseADFast(b)
- event := utils.LoopADStructures(b, indeces, id)
-
- if event.ID == "" {
- return nil
- }
-
- prevEvent, ok := appState.GetBeaconEvent(id)
- appState.UpdateBeaconEvent(id, event)
- if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
- return nil
- }
-
- eMsg, err := event.ToJSON()
- if err != nil {
- return err
- }
-
- if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
- return err
- }
-
- return nil
- }
|