No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

140 líneas
3.4 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "io"
  8. "log"
  9. "log/slog"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  16. "github.com/AFASystems/presence/internal/pkg/common/utils"
  17. "github.com/AFASystems/presence/internal/pkg/config"
  18. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  19. "github.com/AFASystems/presence/internal/pkg/model"
  20. "github.com/segmentio/kafka-go"
  21. )
  22. var wg sync.WaitGroup
  23. func main() {
  24. // Load global context to init beacons and latest list
  25. appState := appcontext.NewAppState()
  26. cfg := config.Load()
  27. parserRegistry := model.ParserRegistry{
  28. ParserList: make(map[string]model.BeaconParser),
  29. }
  30. // Create log file
  31. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  32. if err != nil {
  33. log.Fatalf("Failed to open log file: %v\n", err)
  34. }
  35. // shell and log file multiwriter
  36. w := io.MultiWriter(os.Stderr, logFile)
  37. logger := slog.New(slog.NewJSONHandler(w, nil))
  38. slog.SetDefault(logger)
  39. // define context
  40. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  41. defer stop()
  42. rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
  43. parserReader := appState.AddKafkaReader(cfg.KafkaURL, "parser", "gid-parser")
  44. alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")
  45. slog.Info("Decoder initialized, subscribed to Kafka topics")
  46. chRaw := make(chan model.BeaconAdvertisement, 2000)
  47. chParser := make(chan model.KafkaParser, 200)
  48. wg.Add(3)
  49. go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
  50. go kafkaclient.Consume(parserReader, chParser, ctx, &wg)
  51. eventloop:
  52. for {
  53. select {
  54. case <-ctx.Done():
  55. break eventloop
  56. case msg := <-chRaw:
  57. processIncoming(msg, appState, alertWriter, &parserRegistry)
  58. case msg := <-chParser:
  59. switch msg.ID {
  60. case "add":
  61. config := msg.Config
  62. parserRegistry.Register(config.Name, config)
  63. case "delete":
  64. parserRegistry.Unregister(msg.Name)
  65. case "update":
  66. config := msg.Config
  67. parserRegistry.Register(config.Name, config)
  68. }
  69. }
  70. }
  71. slog.Info("broken out of the main event loop")
  72. wg.Wait()
  73. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  74. appState.CleanKafkaReaders()
  75. appState.CleanKafkaWriters()
  76. }
  77. func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
  78. err := decodeBeacon(adv, appState, writer, parserRegistry)
  79. if err != nil {
  80. eMsg := fmt.Sprintf("Error in decoding: %v", err)
  81. fmt.Println(eMsg)
  82. return
  83. }
  84. }
  85. func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
  86. beacon := strings.TrimSpace(adv.Data)
  87. id := adv.ID
  88. if beacon == "" {
  89. return nil
  90. }
  91. b, err := hex.DecodeString(beacon)
  92. if err != nil {
  93. return err
  94. }
  95. b = utils.RemoveFlagBytes(b)
  96. indeces := utils.ParseADFast(b)
  97. event := utils.LoopADStructures(b, indeces, id, parserRegistry)
  98. if event.ID == "" {
  99. return nil
  100. }
  101. prevEvent, ok := appState.GetBeaconEvent(id)
  102. appState.UpdateBeaconEvent(id, event)
  103. if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
  104. return nil
  105. }
  106. eMsg, err := event.ToJSON()
  107. if err != nil {
  108. return err
  109. }
  110. if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
  111. return err
  112. }
  113. return nil
  114. }