Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.
 
 
 
 

133 Zeilen
3.3 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "log/slog"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  13. "github.com/AFASystems/presence/internal/pkg/common/utils"
  14. "github.com/AFASystems/presence/internal/pkg/config"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/logger"
  17. "github.com/AFASystems/presence/internal/pkg/model"
  18. "github.com/segmentio/kafka-go"
  19. )
  20. var wg sync.WaitGroup
  21. func main() {
  22. // Load global context to init beacons and latest list
  23. appState := appcontext.NewAppState()
  24. cfg := config.Load()
  25. kafkaManager := kafkaclient.InitKafkaManager()
  26. parserRegistry := model.ParserRegistry{
  27. ParserList: make(map[string]model.BeaconParser),
  28. }
  29. // Set logger -> terminal and log file
  30. slog.SetDefault(logger.CreateLogger("decoder.log"))
  31. // define context
  32. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  33. defer stop()
  34. readerTopics := []string{"rawbeacons", "parser"}
  35. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics)
  36. writerTopics := []string{"alertbeacons"}
  37. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  38. slog.Info("Decoder initialized, subscribed to Kafka topics")
  39. chRaw := make(chan model.BeaconAdvertisement, 2000)
  40. chParser := make(chan model.KafkaParser, 200)
  41. wg.Add(3)
  42. go kafkaclient.Consume(kafkaManager.GetReader("rawbeacons"), chRaw, ctx, &wg)
  43. go kafkaclient.Consume(kafkaManager.GetReader("parser"), chParser, ctx, &wg)
  44. eventloop:
  45. for {
  46. select {
  47. case <-ctx.Done():
  48. break eventloop
  49. case msg := <-chRaw:
  50. processIncoming(msg, appState, kafkaManager.GetWriter("alertbeacons"), &parserRegistry)
  51. case msg := <-chParser:
  52. switch msg.ID {
  53. case "add":
  54. config := msg.Config
  55. parserRegistry.Register(config.Name, config)
  56. case "delete":
  57. parserRegistry.Unregister(msg.Name)
  58. case "update":
  59. config := msg.Config
  60. parserRegistry.Register(config.Name, config)
  61. }
  62. }
  63. }
  64. slog.Info("broken out of the main event loop")
  65. wg.Wait()
  66. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  67. kafkaManager.CleanKafkaReaders()
  68. kafkaManager.CleanKafkaWriters()
  69. }
  70. func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
  71. err := decodeBeacon(adv, appState, writer, parserRegistry)
  72. if err != nil {
  73. eMsg := fmt.Sprintf("Error in decoding: %v", err)
  74. fmt.Println(eMsg)
  75. return
  76. }
  77. }
  78. func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
  79. beacon := strings.TrimSpace(adv.Data)
  80. id := adv.ID
  81. if beacon == "" {
  82. return nil
  83. }
  84. b, err := hex.DecodeString(beacon)
  85. if err != nil {
  86. return err
  87. }
  88. b = utils.RemoveFlagBytes(b)
  89. indeces := utils.ParseADFast(b)
  90. event := utils.LoopADStructures(b, indeces, id, parserRegistry)
  91. if event.ID == "" {
  92. return nil
  93. }
  94. prevEvent, ok := appState.GetBeaconEvent(id)
  95. appState.UpdateBeaconEvent(id, event)
  96. if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
  97. return nil
  98. }
  99. eMsg, err := event.ToJSON()
  100. if err != nil {
  101. return err
  102. }
  103. if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
  104. return err
  105. }
  106. return nil
  107. }