You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

144 rivejä
3.3 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "io"
  8. "log"
  9. "log/slog"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  16. "github.com/AFASystems/presence/internal/pkg/common/utils"
  17. "github.com/AFASystems/presence/internal/pkg/config"
  18. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  19. "github.com/AFASystems/presence/internal/pkg/model"
  20. "github.com/segmentio/kafka-go"
  21. )
  22. var wg sync.WaitGroup
  23. func main() {
  24. // Load global context to init beacons and latest list
  25. appState := appcontext.NewAppState()
  26. cfg := config.Load()
  27. // Create log file
  28. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  29. if err != nil {
  30. log.Fatalf("Failed to open log file: %v\n", err)
  31. }
  32. // shell and log file multiwriter
  33. w := io.MultiWriter(os.Stderr, logFile)
  34. logger := slog.New(slog.NewJSONHandler(w, nil))
  35. slog.SetDefault(logger)
  36. // define context
  37. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  38. defer stop()
  39. rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
  40. apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
  41. alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")
  42. slog.Info("Decoder initialized, subscribed to Kafka topics")
  43. chRaw := make(chan model.BeaconAdvertisement, 2000)
  44. chApi := make(chan model.ApiUpdate, 200)
  45. wg.Add(2)
  46. go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
  47. go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
  48. eventloop:
  49. for {
  50. select {
  51. case <-ctx.Done():
  52. break eventloop
  53. case msg := <-chRaw:
  54. processIncoming(msg, appState, alertWriter)
  55. case msg := <-chApi:
  56. switch msg.Method {
  57. case "POST":
  58. id := msg.Beacon.ID
  59. appState.AddBeaconToLookup(id)
  60. lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
  61. slog.Info(lMsg)
  62. case "DELETE":
  63. id := msg.Beacon.ID
  64. appState.RemoveBeaconFromLookup(id)
  65. lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
  66. slog.Info(lMsg)
  67. }
  68. }
  69. }
  70. slog.Info("broken out of the main event loop")
  71. wg.Wait()
  72. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  73. appState.CleanKafkaReaders()
  74. appState.CleanKafkaWriters()
  75. }
  76. func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) {
  77. id := adv.MAC
  78. ok := appState.BeaconExists(id)
  79. if !ok {
  80. return
  81. }
  82. err := decodeBeacon(adv, appState, writer)
  83. if err != nil {
  84. eMsg := fmt.Sprintf("Error in decoding: %v", err)
  85. fmt.Println(eMsg)
  86. return
  87. }
  88. }
  89. func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) error {
  90. beacon := strings.TrimSpace(adv.Data)
  91. id := adv.MAC
  92. if beacon == "" {
  93. return nil
  94. }
  95. b, err := hex.DecodeString(beacon)
  96. if err != nil {
  97. return err
  98. }
  99. b = utils.RemoveFlagBytes(b)
  100. indeces := utils.ParseADFast(b)
  101. event := utils.LoopADStructures(b, indeces, id)
  102. if event.ID == "" {
  103. return nil
  104. }
  105. prevEvent, ok := appState.GetBeaconEvent(id)
  106. appState.UpdateBeaconEvent(id, event)
  107. if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
  108. return nil
  109. }
  110. eMsg, err := event.ToJSON()
  111. if err != nil {
  112. return err
  113. }
  114. if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
  115. return err
  116. }
  117. return nil
  118. }