| Autor | SHA1 | Wiadomość | Data |
|---|---|---|---|
|
|
c7499b5c22 | fix: remove reading of the body in server get trackers | 2 tygodni temu |
|
|
34cadb4cec | chore: remove interface | 2 tygodni temu |
|
|
6490cd2dd6 | feat: services sending their data to the server | 2 tygodni temu |
|
|
5fd72d8151 | feat: add new topics, readers and writers | 2 tygodni temu |
| @@ -28,4 +28,19 @@ | |||||
| # create topic alert | # create topic alert | ||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | ||||
| --create --if-not-exists --topic alert \ | --create --if-not-exists --topic alert \ | ||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthlocation | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthlocation \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthdecoder | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthdecoder \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthbridge | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthbridge \ | |||||
| --partitions 1 --replication-factor 1 | --partitions 1 --replication-factor 1 | ||||
| @@ -5,6 +5,7 @@ import ( | |||||
| "encoding/json" | "encoding/json" | ||||
| "log/slog" | "log/slog" | ||||
| "sync" | "sync" | ||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/bridge" | "github.com/AFASystems/presence/internal/pkg/bridge" | ||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | "github.com/AFASystems/presence/internal/pkg/common/appcontext" | ||||
| @@ -13,6 +14,7 @@ import ( | |||||
| "github.com/AFASystems/presence/internal/pkg/logger" | "github.com/AFASystems/presence/internal/pkg/logger" | ||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| mqtt "github.com/eclipse/paho.mqtt.golang" | mqtt "github.com/eclipse/paho.mqtt.golang" | ||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka). | // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka). | ||||
| @@ -37,7 +39,7 @@ func New(cfg *config.Config) (*BridgeApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"apibeacons", "alert", "mqtt"} | readerTopics := []string{"apibeacons", "alert", "mqtt"} | ||||
| writerTopics := []string{"rawbeacons"} | |||||
| writerTopics := []string{"rawbeacons", "healthbridge"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -71,10 +73,26 @@ func (a *BridgeApp) Run(ctx context.Context) { | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg) | ||||
| healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | |||||
| defer healthTicker.Stop() | |||||
| for { | for { | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| return | return | ||||
| case <-healthTicker.C: | |||||
| health, err := a.AppState.GetBridgeHealth(a.KafkaManager) | |||||
| if err != nil { | |||||
| slog.Error("getting bridge health", "err", err) | |||||
| continue | |||||
| } | |||||
| m := kafka.Message{ | |||||
| Value: health, | |||||
| } | |||||
| if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil { | |||||
| slog.Error("writing bridge health", "err", err) | |||||
| continue | |||||
| } | |||||
| case msg := <-a.ChApi: | case msg := <-a.ChApi: | ||||
| switch msg.Method { | switch msg.Method { | ||||
| case "POST": | case "POST": | ||||
| @@ -2,8 +2,10 @@ package decoder | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "fmt" | |||||
| "log/slog" | "log/slog" | ||||
| "sync" | "sync" | ||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | "github.com/AFASystems/presence/internal/pkg/common/appcontext" | ||||
| "github.com/AFASystems/presence/internal/pkg/config" | "github.com/AFASystems/presence/internal/pkg/config" | ||||
| @@ -11,6 +13,7 @@ import ( | |||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| "github.com/AFASystems/presence/internal/pkg/logger" | "github.com/AFASystems/presence/internal/pkg/logger" | ||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| // DecoderApp holds dependencies for the decoder service. | // DecoderApp holds dependencies for the decoder service. | ||||
| @@ -34,7 +37,7 @@ func New(cfg *config.Config) (*DecoderApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"rawbeacons", "parser"} | readerTopics := []string{"rawbeacons", "parser"} | ||||
| writerTopics := []string{"alertbeacons"} | |||||
| writerTopics := []string{"alertbeacons", "healthdecoder"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -60,11 +63,28 @@ func (a *DecoderApp) Run(ctx context.Context) { | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("parser"), a.ChParser, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("parser"), a.ChParser, ctx, &a.wg) | ||||
| healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | |||||
| defer healthTicker.Stop() | |||||
| for { | for { | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| return | return | ||||
| case <-healthTicker.C: | |||||
| health, err := a.AppState.GetDecoderHealth(a.KafkaManager) | |||||
| if err != nil { | |||||
| slog.Error("getting decoder health", "err", err) | |||||
| continue | |||||
| } | |||||
| m := kafka.Message{ | |||||
| Value: health, | |||||
| } | |||||
| if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthdecoder"), m); err != nil { | |||||
| slog.Error("writing decoder health", "err", err) | |||||
| continue | |||||
| } | |||||
| case msg := <-a.ChRaw: | case msg := <-a.ChRaw: | ||||
| fmt.Println("msg: ", msg) | |||||
| decoder.ProcessIncoming(msg, a.AppState, a.KafkaManager.GetWriter("alertbeacons"), a.ParserRegistry) | decoder.ProcessIncoming(msg, a.AppState, a.KafkaManager.GetWriter("alertbeacons"), a.ParserRegistry) | ||||
| case msg := <-a.ChParser: | case msg := <-a.ChParser: | ||||
| switch msg.ID { | switch msg.ID { | ||||
| @@ -37,7 +37,7 @@ func New(cfg *config.Config) (*LocationApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"rawbeacons", "settings"} | readerTopics := []string{"rawbeacons", "settings"} | ||||
| writerTopics := []string{"locevents"} | |||||
| writerTopics := []string{"locevents", "healthlocation"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -62,10 +62,26 @@ func (a *LocationApp) Run(ctx context.Context) { | |||||
| locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | ||||
| defer locTicker.Stop() | defer locTicker.Stop() | ||||
| healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | |||||
| defer healthTicker.Stop() | |||||
| for { | for { | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| return | return | ||||
| case <-healthTicker.C: | |||||
| health, err := a.AppState.GetLocationHealth(a.KafkaManager) | |||||
| if err != nil { | |||||
| slog.Error("getting location health", "err", err) | |||||
| continue | |||||
| } | |||||
| m := kafka.Message{ | |||||
| Value: health, | |||||
| } | |||||
| if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthlocation"), m); err != nil { | |||||
| slog.Error("writing location health", "err", err) | |||||
| continue | |||||
| } | |||||
| case <-locTicker.C: | case <-locTicker.C: | ||||
| settings := a.AppState.GetSettings() | settings := a.AppState.GetSettings() | ||||
| slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm) | slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm) | ||||
| @@ -23,16 +23,19 @@ import ( | |||||
| // ServerApp holds dependencies and state for the server service. | // ServerApp holds dependencies and state for the server service. | ||||
| type ServerApp struct { | type ServerApp struct { | ||||
| Cfg *config.Config | |||||
| DB *gorm.DB | |||||
| KafkaManager *kafkaclient.KafkaManager | |||||
| AppState *appcontext.AppState | |||||
| ChLoc chan model.HTTPLocation | |||||
| ChEvents chan appcontext.BeaconEvent | |||||
| ctx context.Context | |||||
| Server *http.Server | |||||
| Cleanup func() | |||||
| wg sync.WaitGroup | |||||
| Cfg *config.Config | |||||
| DB *gorm.DB | |||||
| KafkaManager *kafkaclient.KafkaManager | |||||
| AppState *appcontext.AppState | |||||
| ChLoc chan model.HTTPLocation | |||||
| ChEvents chan appcontext.BeaconEvent | |||||
| ChHealthLocation chan appcontext.LocationHealth | |||||
| ChHealthDecoder chan appcontext.DecoderHealth | |||||
| ChHealthBridge chan appcontext.BridgeHealth | |||||
| ctx context.Context | |||||
| Server *http.Server | |||||
| Cleanup func() | |||||
| wg sync.WaitGroup | |||||
| } | } | ||||
| // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers. | // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers. | ||||
| @@ -98,16 +101,22 @@ func (a *ServerApp) Init(ctx context.Context) error { | |||||
| slog.Error("UpdateDB", "err", err) | slog.Error("UpdateDB", "err", err) | ||||
| } | } | ||||
| readerTopics := []string{"locevents", "alertbeacons", "health"} | |||||
| readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"} | |||||
| a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics) | a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics) | ||||
| slog.Info("Kafka readers initialized", "topics", readerTopics) | slog.Info("Kafka readers initialized", "topics", readerTopics) | ||||
| a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE) | a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE) | ||||
| a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE) | a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE) | ||||
| a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.wg.Add(2) | |||||
| a.wg.Add(5) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg) | |||||
| a.Server = &http.Server{ | a.Server = &http.Server{ | ||||
| Addr: a.Cfg.HTTPAddr, | Addr: a.Cfg.HTTPAddr, | ||||
| @@ -22,6 +22,12 @@ func RunEventLoop(ctx context.Context, a *ServerApp) { | |||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| return | return | ||||
| case msg := <-a.ChHealthLocation: | |||||
| slog.Info("location health", "health", msg) | |||||
| case msg := <-a.ChHealthDecoder: | |||||
| slog.Info("decoder health", "health", msg) | |||||
| case msg := <-a.ChHealthBridge: | |||||
| slog.Info("bridge health", "health", msg) | |||||
| case msg := <-a.ChLoc: | case msg := <-a.ChLoc: | ||||
| switch msg.Method { | switch msg.Method { | ||||
| case "Standard": | case "Standard": | ||||
| @@ -3,7 +3,6 @@ package apiclient | |||||
| import ( | import ( | ||||
| "encoding/json" | "encoding/json" | ||||
| "fmt" | "fmt" | ||||
| "io" | |||||
| "net/http" | "net/http" | ||||
| "github.com/AFASystems/presence/internal/pkg/config" | "github.com/AFASystems/presence/internal/pkg/config" | ||||
| @@ -17,13 +16,6 @@ func GetTrackers(token string, client *http.Client, cfg *config.Config) ([]model | |||||
| return []model.Tracker{}, err | return []model.Tracker{}, err | ||||
| } | } | ||||
| bodyBytes, err := io.ReadAll(res.Body) | |||||
| if err != nil { | |||||
| fmt.Printf("error read body: %+v\n", err) | |||||
| return []model.Tracker{}, err | |||||
| } | |||||
| fmt.Printf("body: %s\n", string(bodyBytes)) | |||||
| var i []model.Tracker | var i []model.Tracker | ||||
| err = json.NewDecoder(res.Body).Decode(&i) | err = json.NewDecoder(res.Body).Decode(&i) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -29,7 +29,6 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk | |||||
| } | } | ||||
| if trackers, err := GetTrackers(token, client, cfg); err == nil { | if trackers, err := GetTrackers(token, client, cfg); err == nil { | ||||
| fmt.Printf("trackers: %+v\n", trackers) | |||||
| syncTable(db, trackers) | syncTable(db, trackers) | ||||
| if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil { | if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil { | ||||
| msg := fmt.Sprintf("Error in sending delete all from lookup message: %v", err) | msg := fmt.Sprintf("Error in sending delete all from lookup message: %v", err) | ||||
| @@ -13,15 +13,10 @@ import ( | |||||
| "github.com/segmentio/kafka-go" | "github.com/segmentio/kafka-go" | ||||
| ) | ) | ||||
| // BeaconLookup provides MAC->ID lookup (e.g. AppState). | |||||
| type BeaconLookup interface { | |||||
| BeaconExists(mac string) (id string, ok bool) | |||||
| } | |||||
| // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. | // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. | ||||
| // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. | // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. | ||||
| // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". | // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". | ||||
| func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer *kafka.Writer) { | |||||
| func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| parts := strings.SplitN(topic, "/", 2) | parts := strings.SplitN(topic, "/", 2) | ||||
| hostname := "" | hostname := "" | ||||
| if len(parts) >= 2 { | if len(parts) >= 2 { | ||||
| @@ -39,7 +34,7 @@ func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer | |||||
| if reading.Type == "Gateway" { | if reading.Type == "Gateway" { | ||||
| continue | continue | ||||
| } | } | ||||
| id, ok := lookup.BeaconExists(reading.MAC) | |||||
| id, ok := appState.BeaconExists(reading.MAC) | |||||
| if !ok { | if !ok { | ||||
| continue | continue | ||||
| } | } | ||||
| @@ -4,7 +4,6 @@ import ( | |||||
| "fmt" | "fmt" | ||||
| "log/slog" | "log/slog" | ||||
| "os" | "os" | ||||
| "sync" | |||||
| "time" | "time" | ||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| @@ -13,14 +12,12 @@ import ( | |||||
| // AppState provides centralized access to application state | // AppState provides centralized access to application state | ||||
| type AppState struct { | type AppState struct { | ||||
| beacons BeaconsList | |||||
| settings Settings | |||||
| beaconEvents BeaconEventList | |||||
| beaconsLookup BeaconsLookup | |||||
| locationHealth LocationHealth | |||||
| decoderHealth DecoderHealth | |||||
| bridgeHealth BridgeHealth | |||||
| startTime time.Time | |||||
| beacons BeaconsList | |||||
| settings Settings | |||||
| beaconEvents BeaconEventList | |||||
| beaconsLookup BeaconsLookup | |||||
| health Health | |||||
| startTime time.Time | |||||
| } | } | ||||
| func getEnv(key, def string) string { | func getEnv(key, def string) string { | ||||
| @@ -54,58 +51,58 @@ func NewAppState() *AppState { | |||||
| beaconsLookup: BeaconsLookup{ | beaconsLookup: BeaconsLookup{ | ||||
| Lookup: make(map[string]string), | Lookup: make(map[string]string), | ||||
| }, | }, | ||||
| locationHealth: LocationHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Lock: sync.RWMutex{}, | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| health: Health{ | |||||
| ID: "1", | |||||
| Location: LocationHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| }, | |||||
| }, | }, | ||||
| }, | |||||
| decoderHealth: DecoderHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Lock: sync.RWMutex{}, | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| Decoder: DecoderHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| }, | |||||
| }, | }, | ||||
| }, | |||||
| bridgeHealth: BridgeHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Lock: sync.RWMutex{}, | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| Bridge: BridgeHealth{ | |||||
| BaseHealth: BaseHealth{ | |||||
| Uptime: 0, | |||||
| ActiveReaders: []string{}, | |||||
| ActiveWriters: []string{}, | |||||
| ActiveBeacons: []string{}, | |||||
| }, | |||||
| }, | }, | ||||
| }, | }, | ||||
| } | } | ||||
| } | } | ||||
| func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) *LocationHealth { | |||||
| m.locationHealth.GetUptime(m.startTime) | |||||
| m.locationHealth.GetActiveReaders(k) | |||||
| m.locationHealth.GetActiveWriters(k) | |||||
| m.locationHealth.GetActiveBeacons(m) | |||||
| return &m.locationHealth | |||||
| func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | |||||
| m.health.Location.GetUptime(m.startTime) | |||||
| m.health.Location.GetActiveReaders(k) | |||||
| m.health.Location.GetActiveWriters(k) | |||||
| m.health.Location.GetActiveBeacons(m) | |||||
| return m.health.Location.Marshal() | |||||
| } | } | ||||
| func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) *DecoderHealth { | |||||
| m.decoderHealth.GetUptime(m.startTime) | |||||
| m.decoderHealth.GetActiveReaders(k) | |||||
| m.decoderHealth.GetActiveWriters(k) | |||||
| m.decoderHealth.GetActiveBeacons(m) | |||||
| return &m.decoderHealth | |||||
| func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | |||||
| m.health.Decoder.GetUptime(m.startTime) | |||||
| m.health.Decoder.GetActiveReaders(k) | |||||
| m.health.Decoder.GetActiveWriters(k) | |||||
| m.health.Decoder.GetActiveBeacons(m) | |||||
| return m.health.Decoder.Marshal() | |||||
| } | } | ||||
| func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) *BridgeHealth { | |||||
| m.bridgeHealth.GetUptime(m.startTime) | |||||
| m.bridgeHealth.GetActiveReaders(k) | |||||
| m.bridgeHealth.GetActiveWriters(k) | |||||
| m.bridgeHealth.GetActiveBeacons(m) | |||||
| return &m.bridgeHealth | |||||
| func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | |||||
| m.health.Bridge.GetUptime(m.startTime) | |||||
| m.health.Bridge.GetActiveReaders(k) | |||||
| m.health.Bridge.GetActiveWriters(k) | |||||
| m.health.Bridge.GetActiveBeacons(m) | |||||
| return m.health.Bridge.Marshal() | |||||
| } | } | ||||
| // GetBeacons returns thread-safe access to beacons list | // GetBeacons returns thread-safe access to beacons list | ||||
| @@ -2,18 +2,16 @@ package appcontext | |||||
| import ( | import ( | ||||
| "encoding/json" | "encoding/json" | ||||
| "sync" | |||||
| "time" | "time" | ||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| ) | ) | ||||
| type BaseHealth struct { | type BaseHealth struct { | ||||
| Lock sync.RWMutex | |||||
| Uptime time.Duration `json:"uptime"` | Uptime time.Duration `json:"uptime"` | ||||
| ActiveReaders []string `json:"activeReaders"` | |||||
| ActiveWriters []string `json:"activeWriters"` | |||||
| ActiveBeacons []string `json:"activeBeacons"` | |||||
| ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` | |||||
| ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"` | |||||
| ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"` | |||||
| } | } | ||||
| type DecoderHealth struct { | type DecoderHealth struct { | ||||
| @@ -23,38 +21,37 @@ type DecoderHealth struct { | |||||
| type LocationHealth struct { | type LocationHealth struct { | ||||
| BaseHealth | BaseHealth | ||||
| ActiveSettings []Settings `json:"activeSettings"` | |||||
| ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"` | |||||
| } | } | ||||
| type BridgeHealth struct { | type BridgeHealth struct { | ||||
| BaseHealth | BaseHealth | ||||
| } | } | ||||
| type Health struct { | |||||
| ID string `json:"id" gorm:"primaryKey"` | |||||
| Location LocationHealth `gorm:"embedded;embeddedPrefix:loc_"` | |||||
| Decoder DecoderHealth `gorm:"embedded;embeddedPrefix:dec_"` | |||||
| Bridge BridgeHealth `gorm:"embedded;embeddedPrefix:brg_"` | |||||
| } | |||||
| func (b *BaseHealth) GetUptime(startTime time.Time) { | func (b *BaseHealth) GetUptime(startTime time.Time) { | ||||
| b.Lock.Lock() | |||||
| b.Uptime = time.Since(startTime) | b.Uptime = time.Since(startTime) | ||||
| b.Lock.Unlock() | |||||
| } | } | ||||
| func (b *BaseHealth) GetActiveReaders(m *kafkaclient.KafkaManager) { | func (b *BaseHealth) GetActiveReaders(m *kafkaclient.KafkaManager) { | ||||
| b.Lock.Lock() | |||||
| b.ActiveReaders = m.GetReaders() | b.ActiveReaders = m.GetReaders() | ||||
| b.Lock.Unlock() | |||||
| } | } | ||||
| func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) { | func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) { | ||||
| b.Lock.Lock() | |||||
| b.ActiveWriters = m.GetWriters() | b.ActiveWriters = m.GetWriters() | ||||
| b.Lock.Unlock() | |||||
| } | } | ||||
| func (b *BaseHealth) GetActiveBeacons(m *AppState) { | func (b *BaseHealth) GetActiveBeacons(m *AppState) { | ||||
| b.Lock.Lock() | |||||
| beacons := m.GetAllBeacons() | beacons := m.GetAllBeacons() | ||||
| for beacon := range beacons { | for beacon := range beacons { | ||||
| b.ActiveBeacons = append(b.ActiveBeacons, beacon) | b.ActiveBeacons = append(b.ActiveBeacons, beacon) | ||||
| } | } | ||||
| b.Lock.Unlock() | |||||
| } | } | ||||
| func (d *DecoderHealth) Marshal() ([]byte, error) { | func (d *DecoderHealth) Marshal() ([]byte, error) { | ||||
| @@ -26,7 +26,7 @@ func Connect(cfg *config.Config) (*gorm.DB, error) { | |||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}); err != nil { | |||||
| if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}, appcontext.Health{}); err != nil { | |||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| @@ -30,6 +30,8 @@ func DecodeBeacon(adv appcontext.BeaconAdvertisement, appState *appcontext.AppSt | |||||
| return nil | return nil | ||||
| } | } | ||||
| fmt.Println("beacon: ", beacon) | |||||
| b, err := hex.DecodeString(beacon) | b, err := hex.DecodeString(beacon) | ||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| @@ -18,7 +18,6 @@ import ( | |||||
| ) | ) | ||||
| func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) { | func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) { | ||||
| fmt.Printf("Finding tracker for MAC: %s, ID: %s\n", msg.MAC, msg.ID) | |||||
| var tracker model.Tracker | var tracker model.Tracker | ||||
| if msg.MAC != "" { | if msg.MAC != "" { | ||||
| if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", ""))).Find(&tracker).Error; err != nil { | if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", ""))).Find(&tracker).Error; err != nil { | ||||