Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
 
 
 
 

141 строка
3.2 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "log/slog"
  11. "os"
  12. "os/signal"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  17. "github.com/AFASystems/presence/internal/pkg/common/utils"
  18. "github.com/AFASystems/presence/internal/pkg/config"
  19. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  20. "github.com/AFASystems/presence/internal/pkg/model"
  21. "github.com/segmentio/kafka-go"
  22. )
  23. var wg sync.WaitGroup
  24. func main() {
  25. // Load global context to init beacons and latest list
  26. appState := appcontext.NewAppState()
  27. cfg := config.Load()
  28. parserRegistry := model.ParserRegistry{
  29. ParserList: make([]model.BeaconParser, 0),
  30. }
  31. configFile, err := os.Open("/app/cmd/decoder/config.json")
  32. if err != nil {
  33. panic(err)
  34. }
  35. b, _ := io.ReadAll(configFile)
  36. var configs []model.Config
  37. json.Unmarshal(b, &configs)
  38. for _, config := range configs {
  39. parserRegistry.Register(config.Name, config)
  40. }
  41. // Create log file
  42. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  43. if err != nil {
  44. log.Fatalf("Failed to open log file: %v\n", err)
  45. }
  46. // shell and log file multiwriter
  47. w := io.MultiWriter(os.Stderr, logFile)
  48. logger := slog.New(slog.NewJSONHandler(w, nil))
  49. slog.SetDefault(logger)
  50. // define context
  51. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  52. defer stop()
  53. rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
  54. alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")
  55. slog.Info("Decoder initialized, subscribed to Kafka topics")
  56. chRaw := make(chan model.BeaconAdvertisement, 2000)
  57. wg.Add(2)
  58. go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
  59. eventloop:
  60. for {
  61. select {
  62. case <-ctx.Done():
  63. break eventloop
  64. case msg := <-chRaw:
  65. processIncoming(msg, appState, alertWriter, &parserRegistry)
  66. }
  67. }
  68. slog.Info("broken out of the main event loop")
  69. wg.Wait()
  70. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  71. appState.CleanKafkaReaders()
  72. appState.CleanKafkaWriters()
  73. }
  74. func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
  75. err := decodeBeacon(adv, appState, writer, parserRegistry)
  76. if err != nil {
  77. eMsg := fmt.Sprintf("Error in decoding: %v", err)
  78. fmt.Println(eMsg)
  79. return
  80. }
  81. }
  82. func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
  83. beacon := strings.TrimSpace(adv.Data)
  84. id := adv.ID
  85. if beacon == "" {
  86. return nil
  87. }
  88. b, err := hex.DecodeString(beacon)
  89. if err != nil {
  90. return err
  91. }
  92. b = utils.RemoveFlagBytes(b)
  93. indeces := utils.ParseADFast(b)
  94. event := utils.LoopADStructures(b, indeces, id, parserRegistry)
  95. if event.ID == "" {
  96. return nil
  97. }
  98. prevEvent, ok := appState.GetBeaconEvent(id)
  99. appState.UpdateBeaconEvent(id, event)
  100. if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
  101. return nil
  102. }
  103. eMsg, err := event.ToJSON()
  104. if err != nil {
  105. return err
  106. }
  107. if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
  108. return err
  109. }
  110. return nil
  111. }