4 次程式碼提交

共有 5 個文件被更改,包括 30 次插入23 次删除
分割檢視
  1. +1
    -1
      cmd/bridge/main.go
  2. +2
    -2
      internal/app/bridge/app.go
  3. +3
    -5
      internal/pkg/bridge/handler.go
  4. +22
    -6
      internal/pkg/common/appcontext/context.go
  5. +2
    -9
      internal/pkg/common/appcontext/health.go

+ 1
- 1
cmd/bridge/main.go 查看文件

@@ -15,7 +15,7 @@ func main() {
defer stop()

cfg := config.LoadBridge()
app, err := bridge.New(cfg)
app, err := bridge.New(cfg, ctx)
if err != nil {
log.Fatalf("bridge: %v", err)
}


+ 2
- 2
internal/app/bridge/app.go 查看文件

@@ -31,7 +31,7 @@ type BridgeApp struct {
}

// New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client.
func New(cfg *config.Config) (*BridgeApp, error) {
func New(cfg *config.Config, ctx context.Context) (*BridgeApp, error) {
appState := appcontext.NewAppState()
kafkaManager := kafkaclient.InitKafkaManager()

@@ -46,7 +46,7 @@ func New(cfg *config.Config) (*BridgeApp, error) {

writer := kafkaManager.GetWriter("rawbeacons")
mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) {
bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer)
bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer, ctx)
})
if err != nil {
cleanup()


+ 3
- 5
internal/pkg/bridge/handler.go 查看文件

@@ -5,7 +5,6 @@ import (
"encoding/json"
"log/slog"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
@@ -16,7 +15,7 @@ import (
// HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV.
// For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup.
// Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/".
func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer) {
func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer, ctx context.Context) {
parts := strings.SplitN(topic, "/", 2)
hostname := ""
if len(parts) >= 2 {
@@ -51,10 +50,9 @@ func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppSta
slog.Error("marshaling beacon advertisement", "err", err)
break
}
if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: encoded}); err != nil {
if err := kafkaclient.Write(ctx, writer, kafka.Message{Value: encoded}); err != nil {
slog.Error("writing to Kafka", "err", err)
time.Sleep(1 * time.Second)
break
continue
}
}
return


+ 22
- 6
internal/pkg/common/appcontext/context.go 查看文件

@@ -59,15 +59,14 @@ func NewAppState() *AppState {
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
ActiveBeacons: []string{},
},
Decoder: DecoderHealth{
BaseHealth: BaseHealth{
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
},
Bridge: BridgeHealth{
@@ -75,8 +74,8 @@ func NewAppState() *AppState {
Uptime: 0,
ActiveReaders: []string{},
ActiveWriters: []string{},
ActiveBeacons: []string{},
},
ActiveBeacons: []string{},
},
Kafka: ServiceStatus{Status: "unknown"},
Database: ServiceStatus{Status: "unknown"},
@@ -117,10 +116,14 @@ func (m *AppState) UpdateServerHealth(kafka, database ServiceStatus) {
}

func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) {
beacons := m.GetAllBeacons()
m.health.Location.ActiveBeacons = make([]string, 0, len(beacons))
for beacon := range beacons {
m.health.Location.ActiveBeacons = append(m.health.Location.ActiveBeacons, beacon)
}
m.health.Location.GetUptime(m.startTime)
m.health.Location.GetActiveReaders(k)
m.health.Location.GetActiveWriters(k)
m.health.Location.GetActiveBeacons(m)
return m.health.Location.Marshal()
}

@@ -128,15 +131,18 @@ func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) ([]byte, error)
m.health.Decoder.GetUptime(m.startTime)
m.health.Decoder.GetActiveReaders(k)
m.health.Decoder.GetActiveWriters(k)
m.health.Decoder.GetActiveBeacons(m)
return m.health.Decoder.Marshal()
}

func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) {
beacons := m.GetBeaconLookup()
m.health.Bridge.ActiveBeacons = make([]string, 0, len(beacons))
for beacon := range beacons {
m.health.Bridge.ActiveBeacons = append(m.health.Bridge.ActiveBeacons, beacon)
}
m.health.Bridge.GetUptime(m.startTime)
m.health.Bridge.GetActiveReaders(k)
m.health.Bridge.GetActiveWriters(k)
m.health.Bridge.GetActiveBeacons(m)
return m.health.Bridge.Marshal()
}

@@ -179,6 +185,16 @@ func (m *AppState) CleanLookup() {
m.beaconsLookup.Lock.Unlock()
}

func (m *AppState) GetBeaconLookup() map[string]string {
m.beaconsLookup.Lock.RLock()
defer m.beaconsLookup.Lock.RUnlock()
cp := make(map[string]string, len(m.beaconsLookup.Lookup))
for k, v := range m.beaconsLookup.Lookup {
cp[k] = v
}
return cp
}

// BeaconExists checks if a beacon exists in the lookup
func (m *AppState) BeaconExists(id string) (string, bool) {
m.beaconsLookup.Lock.RLock()


+ 2
- 9
internal/pkg/common/appcontext/health.go 查看文件

@@ -17,7 +17,6 @@ type BaseHealth struct {
Uptime time.Duration `json:"uptime"`
ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"`
ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"`
ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"`
}

type DecoderHealth struct {
@@ -27,11 +26,13 @@ type DecoderHealth struct {

type LocationHealth struct {
BaseHealth
ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"`
ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"`
}

type BridgeHealth struct {
BaseHealth
ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"`
}

type Health struct {
@@ -54,14 +55,6 @@ func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) {
b.ActiveWriters = m.GetWriters()
}

func (b *BaseHealth) GetActiveBeacons(m *AppState) {
beacons := m.GetAllBeacons()
b.ActiveBeacons = []string{}
for beacon := range beacons {
b.ActiveBeacons = append(b.ActiveBeacons, beacon)
}
}

func (d *DecoderHealth) Marshal() ([]byte, error) {
return json.Marshal(d)
}


Loading…
取消
儲存