4 Revīzijas

18 mainītis faili ar 162 papildinājumiem un 97 dzēšanām
Dalītais skats
  1. Binārs
      bridge
  2. +15
    -0
      build/init-scripts/create_topic.sh
  3. Binārs
      decoder
  4. +19
    -1
      internal/app/bridge/app.go
  5. +21
    -1
      internal/app/decoder/app.go
  6. +17
    -1
      internal/app/location/app.go
  7. +21
    -12
      internal/app/server/app.go
  8. +6
    -0
      internal/app/server/events.go
  9. +0
    -8
      internal/pkg/apiclient/data.go
  10. +0
    -1
      internal/pkg/apiclient/updatedb.go
  11. +2
    -7
      internal/pkg/bridge/handler.go
  12. +47
    -50
      internal/pkg/common/appcontext/context.go
  13. +11
    -14
      internal/pkg/common/appcontext/health.go
  14. +1
    -1
      internal/pkg/database/database.go
  15. +2
    -0
      internal/pkg/decoder/process.go
  16. +0
    -1
      internal/pkg/service/beacon_service.go
  17. Binārs
      location
  18. Binārs
      server

Binārs
bridge Parādīt failu


+ 15
- 0
build/init-scripts/create_topic.sh Parādīt failu

@@ -28,4 +28,19 @@
# create topic alert
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic alert \
--partitions 1 --replication-factor 1

# create topic healthlocation
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic healthlocation \
--partitions 1 --replication-factor 1

# create topic healthdecoder
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic healthdecoder \
--partitions 1 --replication-factor 1

# create topic healthbridge
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic healthbridge \
--partitions 1 --replication-factor 1

Binārs
decoder Parādīt failu


+ 19
- 1
internal/app/bridge/app.go Parādīt failu

@@ -5,6 +5,7 @@ import (
"encoding/json"
"log/slog"
"sync"
"time"

"github.com/AFASystems/presence/internal/pkg/bridge"
"github.com/AFASystems/presence/internal/pkg/common/appcontext"
@@ -13,6 +14,7 @@ import (
"github.com/AFASystems/presence/internal/pkg/logger"
"github.com/AFASystems/presence/internal/pkg/model"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/segmentio/kafka-go"
)

// BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka).
@@ -37,7 +39,7 @@ func New(cfg *config.Config) (*BridgeApp, error) {
slog.SetDefault(srvLogger)

readerTopics := []string{"apibeacons", "alert", "mqtt"}
writerTopics := []string{"rawbeacons"}
writerTopics := []string{"rawbeacons", "healthbridge"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics)
@@ -71,10 +73,26 @@ func (a *BridgeApp) Run(ctx context.Context) {
go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg)

healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
defer healthTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-healthTicker.C:
health, err := a.AppState.GetBridgeHealth(a.KafkaManager)
if err != nil {
slog.Error("getting bridge health", "err", err)
continue
}
m := kafka.Message{
Value: health,
}
if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil {
slog.Error("writing bridge health", "err", err)
continue
}
case msg := <-a.ChApi:
switch msg.Method {
case "POST":


+ 21
- 1
internal/app/decoder/app.go Parādīt failu

@@ -2,8 +2,10 @@ package decoder

import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/config"
@@ -11,6 +13,7 @@ import (
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/logger"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// DecoderApp holds dependencies for the decoder service.
@@ -34,7 +37,7 @@ func New(cfg *config.Config) (*DecoderApp, error) {
slog.SetDefault(srvLogger)

readerTopics := []string{"rawbeacons", "parser"}
writerTopics := []string{"alertbeacons"}
writerTopics := []string{"alertbeacons", "healthdecoder"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics)
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics)
@@ -60,11 +63,28 @@ func (a *DecoderApp) Run(ctx context.Context) {
go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("parser"), a.ChParser, ctx, &a.wg)

healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
defer healthTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-healthTicker.C:
health, err := a.AppState.GetDecoderHealth(a.KafkaManager)
if err != nil {
slog.Error("getting decoder health", "err", err)
continue
}
m := kafka.Message{
Value: health,
}
if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthdecoder"), m); err != nil {
slog.Error("writing decoder health", "err", err)
continue
}
case msg := <-a.ChRaw:
fmt.Println("msg: ", msg)
decoder.ProcessIncoming(msg, a.AppState, a.KafkaManager.GetWriter("alertbeacons"), a.ParserRegistry)
case msg := <-a.ChParser:
switch msg.ID {


+ 17
- 1
internal/app/location/app.go Parādīt failu

@@ -37,7 +37,7 @@ func New(cfg *config.Config) (*LocationApp, error) {
slog.SetDefault(srvLogger)

readerTopics := []string{"rawbeacons", "settings"}
writerTopics := []string{"locevents"}
writerTopics := []string{"locevents", "healthlocation"}
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics)
kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics)
@@ -62,10 +62,26 @@ func (a *LocationApp) Run(ctx context.Context) {
locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
defer locTicker.Stop()

healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
defer healthTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-healthTicker.C:
health, err := a.AppState.GetLocationHealth(a.KafkaManager)
if err != nil {
slog.Error("getting location health", "err", err)
continue
}
m := kafka.Message{
Value: health,
}
if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthlocation"), m); err != nil {
slog.Error("writing location health", "err", err)
continue
}
case <-locTicker.C:
settings := a.AppState.GetSettings()
slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm)


+ 21
- 12
internal/app/server/app.go Parādīt failu

@@ -23,16 +23,19 @@ import (

// ServerApp holds dependencies and state for the server service.
type ServerApp struct {
Cfg *config.Config
DB *gorm.DB
KafkaManager *kafkaclient.KafkaManager
AppState *appcontext.AppState
ChLoc chan model.HTTPLocation
ChEvents chan appcontext.BeaconEvent
ctx context.Context
Server *http.Server
Cleanup func()
wg sync.WaitGroup
Cfg *config.Config
DB *gorm.DB
KafkaManager *kafkaclient.KafkaManager
AppState *appcontext.AppState
ChLoc chan model.HTTPLocation
ChEvents chan appcontext.BeaconEvent
ChHealthLocation chan appcontext.LocationHealth
ChHealthDecoder chan appcontext.DecoderHealth
ChHealthBridge chan appcontext.BridgeHealth
ctx context.Context
Server *http.Server
Cleanup func()
wg sync.WaitGroup
}

// New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers.
@@ -98,16 +101,22 @@ func (a *ServerApp) Init(ctx context.Context) error {
slog.Error("UpdateDB", "err", err)
}

readerTopics := []string{"locevents", "alertbeacons", "health"}
readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"}
a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics)
slog.Info("Kafka readers initialized", "topics", readerTopics)

a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE)
a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE)
a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE)
a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE)
a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE)

a.wg.Add(2)
a.wg.Add(5)
go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg)
go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg)

a.Server = &http.Server{
Addr: a.Cfg.HTTPAddr,


+ 6
- 0
internal/app/server/events.go Parādīt failu

@@ -22,6 +22,12 @@ func RunEventLoop(ctx context.Context, a *ServerApp) {
select {
case <-ctx.Done():
return
case msg := <-a.ChHealthLocation:
slog.Info("location health", "health", msg)
case msg := <-a.ChHealthDecoder:
slog.Info("decoder health", "health", msg)
case msg := <-a.ChHealthBridge:
slog.Info("bridge health", "health", msg)
case msg := <-a.ChLoc:
switch msg.Method {
case "Standard":


+ 0
- 8
internal/pkg/apiclient/data.go Parādīt failu

@@ -3,7 +3,6 @@ package apiclient
import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/AFASystems/presence/internal/pkg/config"
@@ -17,13 +16,6 @@ func GetTrackers(token string, client *http.Client, cfg *config.Config) ([]model
return []model.Tracker{}, err
}

bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
fmt.Printf("error read body: %+v\n", err)
return []model.Tracker{}, err
}
fmt.Printf("body: %s\n", string(bodyBytes))

var i []model.Tracker
err = json.NewDecoder(res.Body).Decode(&i)
if err != nil {


+ 0
- 1
internal/pkg/apiclient/updatedb.go Parādīt failu

@@ -29,7 +29,6 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk
}

if trackers, err := GetTrackers(token, client, cfg); err == nil {
fmt.Printf("trackers: %+v\n", trackers)
syncTable(db, trackers)
if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
msg := fmt.Sprintf("Error in sending delete all from lookup message: %v", err)


+ 2
- 7
internal/pkg/bridge/handler.go Parādīt failu

@@ -13,15 +13,10 @@ import (
"github.com/segmentio/kafka-go"
)

// BeaconLookup provides MAC->ID lookup (e.g. AppState).
type BeaconLookup interface {
BeaconExists(mac string) (id string, ok bool)
}

// HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV.
// For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup.
// Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/".
func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer *kafka.Writer) {
func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer) {
parts := strings.SplitN(topic, "/", 2)
hostname := ""
if len(parts) >= 2 {
@@ -39,7 +34,7 @@ func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer
if reading.Type == "Gateway" {
continue
}
id, ok := lookup.BeaconExists(reading.MAC)
id, ok := appState.BeaconExists(reading.MAC)
if !ok {
continue
}


+ 47
- 50
internal/pkg/common/appcontext/context.go Parādīt failu

@@ -4,7 +4,6 @@ import (
"fmt"
"log/slog"
"os"
"sync"
"time"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
@@ -13,14 +12,12 @@ import (

// AppState provides centralized access to application state
type AppState struct {
beacons BeaconsList
settings Settings
beaconEvents BeaconEventList
beaconsLookup BeaconsLookup
locationHealth LocationHealth
decoderHealth DecoderHealth
bridgeHealth BridgeHealth
startTime time.Time
beacons BeaconsList
settings Settings
beaconEvents BeaconEventList
beaconsLookup BeaconsLookup
health Health
startTime time.Time
}

func getEnv(key, def string) string {
@@ -54,58 +51,58 @@ func NewAppState() *AppState {
beaconsLookup: BeaconsLookup{
Lookup: make(map[string]string),
},
locationHealth: LocationHealth{
BaseHealth: BaseHealth{
Lock: sync.RWMutex{},
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
health: Health{
ID: "1",
Location: LocationHealth{
BaseHealth: BaseHealth{
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
},
},
decoderHealth: DecoderHealth{
BaseHealth: BaseHealth{
Lock: sync.RWMutex{},
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
Decoder: DecoderHealth{
BaseHealth: BaseHealth{
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
},
},
bridgeHealth: BridgeHealth{
BaseHealth: BaseHealth{
Lock: sync.RWMutex{},
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
Bridge: BridgeHealth{
BaseHealth: BaseHealth{
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
},
},
}
}

func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) *LocationHealth {
m.locationHealth.GetUptime(m.startTime)
m.locationHealth.GetActiveReaders(k)
m.locationHealth.GetActiveWriters(k)
m.locationHealth.GetActiveBeacons(m)
return &m.locationHealth
func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) {
m.health.Location.GetUptime(m.startTime)
m.health.Location.GetActiveReaders(k)
m.health.Location.GetActiveWriters(k)
m.health.Location.GetActiveBeacons(m)
return m.health.Location.Marshal()
}

func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) *DecoderHealth {
m.decoderHealth.GetUptime(m.startTime)
m.decoderHealth.GetActiveReaders(k)
m.decoderHealth.GetActiveWriters(k)
m.decoderHealth.GetActiveBeacons(m)
return &m.decoderHealth
func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) ([]byte, error) {
m.health.Decoder.GetUptime(m.startTime)
m.health.Decoder.GetActiveReaders(k)
m.health.Decoder.GetActiveWriters(k)
m.health.Decoder.GetActiveBeacons(m)
return m.health.Decoder.Marshal()
}

func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) *BridgeHealth {
m.bridgeHealth.GetUptime(m.startTime)
m.bridgeHealth.GetActiveReaders(k)
m.bridgeHealth.GetActiveWriters(k)
m.bridgeHealth.GetActiveBeacons(m)
return &m.bridgeHealth
func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) {
m.health.Bridge.GetUptime(m.startTime)
m.health.Bridge.GetActiveReaders(k)
m.health.Bridge.GetActiveWriters(k)
m.health.Bridge.GetActiveBeacons(m)
return m.health.Bridge.Marshal()
}

// GetBeacons returns thread-safe access to beacons list


+ 11
- 14
internal/pkg/common/appcontext/health.go Parādīt failu

@@ -2,18 +2,16 @@ package appcontext

import (
"encoding/json"
"sync"
"time"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
)

type BaseHealth struct {
Lock sync.RWMutex
Uptime time.Duration `json:"uptime"`
ActiveReaders []string `json:"activeReaders"`
ActiveWriters []string `json:"activeWriters"`
ActiveBeacons []string `json:"activeBeacons"`
ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"`
ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"`
ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"`
}

type DecoderHealth struct {
@@ -23,38 +21,37 @@ type DecoderHealth struct {

type LocationHealth struct {
BaseHealth
ActiveSettings []Settings `json:"activeSettings"`
ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"`
}

type BridgeHealth struct {
BaseHealth
}

type Health struct {
ID string `json:"id" gorm:"primaryKey"`
Location LocationHealth `gorm:"embedded;embeddedPrefix:loc_"`
Decoder DecoderHealth `gorm:"embedded;embeddedPrefix:dec_"`
Bridge BridgeHealth `gorm:"embedded;embeddedPrefix:brg_"`
}

func (b *BaseHealth) GetUptime(startTime time.Time) {
b.Lock.Lock()
b.Uptime = time.Since(startTime)
b.Lock.Unlock()
}

func (b *BaseHealth) GetActiveReaders(m *kafkaclient.KafkaManager) {
b.Lock.Lock()
b.ActiveReaders = m.GetReaders()
b.Lock.Unlock()
}

func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) {
b.Lock.Lock()
b.ActiveWriters = m.GetWriters()
b.Lock.Unlock()
}

func (b *BaseHealth) GetActiveBeacons(m *AppState) {
b.Lock.Lock()
beacons := m.GetAllBeacons()
for beacon := range beacons {
b.ActiveBeacons = append(b.ActiveBeacons, beacon)
}
b.Lock.Unlock()
}

func (d *DecoderHealth) Marshal() ([]byte, error) {


+ 1
- 1
internal/pkg/database/database.go Parādīt failu

@@ -26,7 +26,7 @@ func Connect(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}); err != nil {
if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}, appcontext.Health{}); err != nil {
return nil, err
}



+ 2
- 0
internal/pkg/decoder/process.go Parādīt failu

@@ -30,6 +30,8 @@ func DecodeBeacon(adv appcontext.BeaconAdvertisement, appState *appcontext.AppSt
return nil
}

fmt.Println("beacon: ", beacon)

b, err := hex.DecodeString(beacon)
if err != nil {
return err


+ 0
- 1
internal/pkg/service/beacon_service.go Parādīt failu

@@ -18,7 +18,6 @@ import (
)

func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) {
fmt.Printf("Finding tracker for MAC: %s, ID: %s\n", msg.MAC, msg.ID)
var tracker model.Tracker
if msg.MAC != "" {
if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", ""))).Find(&tracker).Error; err != nil {


Binārs
location Parādīt failu


Binārs
server Parādīt failu


Notiek ielāde…
Atcelt
Saglabāt