From 7411f34002ed960421e79a896da9ecac307b7177 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Tue, 20 Jan 2026 15:42:37 +0100 Subject: [PATCH] chore: remove unecessary types --- cmd/server/main.go | 2 +- internal/pkg/apiclient/updatedb.go | 8 +- internal/pkg/model/trackers.go | 30 +- internal/pkg/model/types.go | 28 - internal/pkg/service/beacon_service.go | 10 +- logging.md | 1467 ++++++++++++++++++++++++ 6 files changed, 1496 insertions(+), 49 deletions(-) create mode 100644 logging.md diff --git a/cmd/server/main.go b/cmd/server/main.go index bd343a2..1b1314d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -160,7 +160,7 @@ eventLoop: continue } - if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil { + if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery, Temperature: msg.Temperature}).Error; err != nil { fmt.Printf("Error in saving decoder event for beacon: %s\n", id) continue } diff --git a/internal/pkg/apiclient/updatedb.go b/internal/pkg/apiclient/updatedb.go index aecc1fe..c670644 100644 --- a/internal/pkg/apiclient/updatedb.go +++ b/internal/pkg/apiclient/updatedb.go @@ -50,10 +50,10 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk syncTable(db, gateways) } - if tracks, err := GetTracks(token, client); err == nil { - fmt.Printf("Tracks: %+v\n", tracks) - syncTable(db, tracks) - } + // if tracks, err := GetTracks(token, client); err == nil { + // fmt.Printf("Tracks: %+v\n", tracks) + // syncTable(db, tracks) + // } if zones, err := GetZones(token, client); err == nil { syncTable(db, zones) diff --git a/internal/pkg/model/trackers.go b/internal/pkg/model/trackers.go index 6ddf694..af373a3 100644 --- a/internal/pkg/model/trackers.go +++ b/internal/pkg/model/trackers.go @@ -1,18 +1,20 @@ package model type Tracker struct { - ID string `json:"id" gorm:"primaryKey"` - Name string `json:"name"` - MAC string `json:"mac"` - Status string `json:"status"` - Model string `json:"model"` - Position string `json:"position"` - Notes string `json:"notes"` - X float32 `json:"x"` - Y float32 `json:"y"` - Floor string `json:"floor"` - Building string `json:"building"` - Location string `json:"location"` - Distance float64 `json:"distance"` - Battery uint32 `json:"battery"` + ID string `json:"id" gorm:"primaryKey"` + Name string `json:"name"` + MAC string `json:"mac"` + Status string `json:"status"` + Model string `json:"model"` + Position string `json:"position"` + Notes string `json:"notes"` + X float32 `json:"x"` + Y float32 `json:"y"` + Floor string `json:"floor"` + Building string `json:"building"` + Location string `json:"location"` + Distance float64 `json:"distance"` + Battery uint32 `json:"battery"` + BatteryThreshold uint32 `json:"batteryThreshold"` + Temperature uint16 `json:"temperature"` } diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index 3892d54..0062035 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -35,12 +35,6 @@ type BeaconMetric struct { Timestamp int64 } -// Location defines a physical location and synchronization control. -type Location struct { - Name string - Lock sync.RWMutex -} - // HTTPLocation describes a beacon's state as served over HTTP. type HTTPLocation struct { Method string `json:"method"` @@ -51,17 +45,6 @@ type HTTPLocation struct { LastSeen int64 `json:"last_seen"` } -// LocationChange defines a change event for a beacon's detected location. -type LocationChange struct { - Method string `json:"method"` - BeaconRef Beacon `json:"beacon_info"` - Name string `json:"name"` - BeaconName string `json:"beacon_name"` - PreviousLocation string `json:"previous_location"` - NewLocation string `json:"new_location"` - Timestamp int64 `json:"timestamp"` -} - // Beacon holds all relevant information about a tracked beacon device. type Beacon struct { Name string `json:"name"` @@ -136,12 +119,6 @@ type BeaconEventList struct { Lock sync.RWMutex } -// LocationsList holds all known locations with concurrency protection. -type LocationsList struct { - Locations map[string]Location - Lock sync.RWMutex -} - // RawReading represents an incoming raw sensor reading. type RawReading struct { Timestamp string `json:"timestamp"` @@ -151,11 +128,6 @@ type RawReading struct { RawData string `json:"rawData"` } -type LatestBeaconsList struct { - LatestList map[string]Beacon - Lock sync.RWMutex -} - type ApiUpdate struct { Method string ID string diff --git a/internal/pkg/service/beacon_service.go b/internal/pkg/service/beacon_service.go index e481b6e..e005bf6 100644 --- a/internal/pkg/service/beacon_service.go +++ b/internal/pkg/service/beacon_service.go @@ -25,6 +25,11 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka. return } + var tracker model.Tracker + if err := db.Where("id = ?", msg.ID).Find(&tracker).Error; err != nil { + return + } + var allowedZones []string for _, z := range zones { allowedZones = append(allowedZones, z.ZoneList...) @@ -55,12 +60,13 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka. } } - if err := db.Create(&model.Tracks{UUID: msg.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: msg.ID}).Error; err != nil { + // status, subject, subject name? + if err := db.Create(&model.Tracks{UUID: msg.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: msg.ID, Floor: gw.Floor, Building: gw.Building, TrackerMac: tracker.MAC}).Error; err != nil { fmt.Println("Error in saving distance for beacon: ", err) return } - if err := db.Updates(&model.Tracker{ID: msg.ID, Location: gw.ID, Distance: msg.Distance}).Error; err != nil { + if err := db.Updates(&model.Tracker{ID: msg.ID, Location: gw.ID, Distance: msg.Distance, X: gw.X, Y: gw.Y}).Error; err != nil { fmt.Println("Error in saving distance for beacon: ", err) return } diff --git a/logging.md b/logging.md new file mode 100644 index 0000000..f5935d8 --- /dev/null +++ b/logging.md @@ -0,0 +1,1467 @@ +# Logging Standards and Guidelines + +## Overview +This document defines comprehensive logging standards for the AFASystems Presence services using Go's structured logging (`log/slog`). The goal is to provide consistent, searchable, and actionable logs across all services. + +## Logger Configuration + +### Enhanced Logger Package +**File:** [internal/pkg/logger/logger.go](internal/pkg/logger/logger.go) + +Update the logger to support both JSON and text formats with configurable log levels: + +```go +package logger + +import ( + "io" + "log" + "log/slog" + "os" + "path/filepath" +) + +type LogConfig struct { + Filename string + Level string + Format string // "json" or "text" + AddSource bool +} + +func CreateLogger(config LogConfig) *slog.Logger { + // Ensure log directory exists + dir := filepath.Dir(config.Filename) + if err := os.MkdirAll(dir, 0755); err != nil { + log.Fatalf("Failed to create log directory: %v\n", err) + } + + f, err := os.OpenFile(config.Filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v\n", err) + } + + // Multi-writer: stderr + file + w := io.MultiWriter(os.Stderr, f) + + // Parse log level + var level slog.Level + switch config.Level { + case "debug": + level = slog.LevelDebug + case "info": + level = slog.LevelInfo + case "warn": + level = slog.LevelWarn + case "error": + level = slog.LevelError + default: + level = slog.LevelInfo + } + + // Create handler options + opts := &slog.HandlerOptions{ + Level: level, + AddSource: config.AddSource, + } + + // Choose handler based on format + var handler slog.Handler + if config.Format == "json" { + handler = slog.NewJSONHandler(w, opts) + } else { + handler = slog.NewTextHandler(w, opts) + } + + logger := slog.New(handler) + return logger +} + +// Convenience function for backward compatibility +func CreateLoggerSimple(fname string) *slog.Logger { + return CreateLogger(LogConfig{ + Filename: fname, + Level: "info", + Format: "json", + AddSource: false, + }) +} +``` + +### Usage in Services + +```go +// Replace: slog.SetDefault(logger.CreateLogger("bridge.log")) +// With: +slog.SetDefault(logger.CreateLogger(logger.LogConfig{ + Filename: "bridge.log", + Level: cfg.LogLevel, // Add to config + Format: cfg.LogFormat, // Add to config + AddSource: cfg.LogAddSource, // Add to config +})) +``` + +--- + +## Structured Logging Best Practices + +### Log Levels + +| Level | Usage | Examples | +|-------|-------|----------| +| **DEBUG** | Detailed debugging information | Message payloads, internal state changes | +| **INFO** | General informational messages | Service started, connection established, normal operations | +| **WARN** | Unexpected but recoverable situations | Retry attempts, fallback to defaults, degraded performance | +| **ERROR** | Error events that might still allow the application to continue | Failed operations, API errors, parsing failures | +| **FATAL** | Critical errors requiring immediate shutdown | Cannot connect to essential services, invalid configuration | + +### Structured Fields + +Always use structured attributes instead of formatting strings: + +```go +// ❌ BAD +slog.Info(fmt.Sprintf("Processing beacon %s with RSSI %d", id, rssi)) + +// ✅ GOOD +slog.Info("Processing beacon", + "id", id, + "rssi", rssi, + "hostname", hostname, +) +``` + +### Standard Field Names + +| Field Name | Type | Description | +|------------|------|-------------| +| `service` | string | Service name (bridge, decoder, location, server) | +| `component` | string | Component within service (mqtt, kafka, http) | +| `topic` | string | Kafka topic or MQTT topic | +| `beacon_id` | string | Beacon/tracker identifier | +| `gateway_id` | string | Gateway identifier | +| `mac` | string | MAC address | +| `rssi` | int64 | Received Signal Strength Indicator | +| `duration` | string/int | Duration of operation | +| `error` | error | Error object (for error-level logs) | +| `count` | int | Count of items processed | +| `status` | string | Status of operation (success, failed, retry) | + +--- + +## Service-Specific Logging Guidelines + +### 1. Bridge Service +**File:** [cmd/bridge/main.go](cmd/bridge/main.go) + +#### Initialization + +```go +// Line 100-116: Replace basic info logs +slog.Info("Initializing bridge service", + "kafka_url", cfg.KafkaURL, + "mqtt_host", cfg.MQTTHost, + "reader_topics", []string{"apibeacons", "alert", "mqtt"}, + "writer_topics", []string{"rawbeacons"}, +) + +slog.Info("Kafka manager initialized", + "readers_count", len(readerTopics), + "writers_count", len(writerTopics), +) + +// Line 127-144: MQTT connection +slog.Info("Connecting to MQTT broker", + "broker", fmt.Sprintf("%s:%d", cfg.MQTTHost, 1883), + "client_id", "go_mqtt_client", +) + +if token := client.Connect(); token.Wait() && token.Error() != nil { + slog.Error("Failed to connect to MQTT broker", + "error", token.Error(), + "broker", cfg.MQTTHost, + ) + panic(token.Error()) +} + +slog.Info("Successfully connected to MQTT broker", + "broker", cfg.MQTTHost, +) + +// Line 146-202: MQTT subscription +slog.Info("Subscribed to MQTT topic", + "topic", topic, +) +``` + +#### MQTT Message Handler + +```go +// Line 26-83: Replace with structured logging +func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) { + hostname := strings.Split(topic, "/")[1] + + slog.Debug("Received MQTT message", + "topic", topic, + "hostname", hostname, + "message_length", len(message), + ) + + msgStr := string(message) + + if strings.HasPrefix(msgStr, "[") { + var readings []model.RawReading + if err := json.Unmarshal(message, &readings); err != nil { + slog.Error("Failed to parse JSON message", + "error", err, + "topic", topic, + "message", msgStr, + ) + return + } + + slog.Debug("Parsed JSON readings", + "count", len(readings), + "topic", topic, + ) + + processed := 0 + skipped := 0 + for _, reading := range readings { + if reading.Type == "Gateway" { + skipped++ + continue + } + + val, ok := appState.BeaconExists(reading.MAC) + if !ok { + slog.Debug("Skipping unregistered beacon", + "mac", reading.MAC, + "hostname", hostname, + ) + skipped++ + continue + } + + adv := model.BeaconAdvertisement{ + ID: val, + Hostname: hostname, + MAC: reading.MAC, + RSSI: int64(reading.RSSI), + Data: reading.RawData, + } + + encodedMsg, err := json.Marshal(adv) + if err != nil { + slog.Error("Failed to marshal beacon advertisement", + "error", err, + "beacon_id", val, + ) + continue + } + + msg := kafka.Message{Value: encodedMsg} + + if err := writer.WriteMessages(context.Background(), msg); err != nil { + slog.Error("Failed to write to Kafka", + "error", err, + "beacon_id", val, + "topic", "rawbeacons", + ) + time.Sleep(1 * time.Second) + break + } + processed++ + } + + slog.Info("Processed MQTT readings", + "processed", processed, + "skipped", skipped, + "hostname", hostname, + ) + } else { + slog.Warn("Received non-JSON message format", + "message", msgStr, + "topic", topic, + ) + } +} +``` + +#### Event Loop + +```go +// Line 148-184: Event loop logging +case msg := <-chApi: + slog.Debug("Received API update from Kafka", + "method", msg.Method, + "id", msg.ID, + "mac", msg.MAC, + ) + + switch msg.Method { + case "POST": + id := msg.ID + appState.AddBeaconToLookup(msg.MAC, id) + slog.Info("Beacon added to lookup", + "beacon_id", id, + "mac", msg.MAC, + ) + case "DELETE": + id := msg.MAC + if id == "all" { + appState.CleanLookup() + slog.Info("Cleared all beacons from lookup") + continue + } + appState.RemoveBeaconFromLookup(id) + slog.Info("Beacon removed from lookup", + "beacon_id", id, + ) + } + +case msg := <-chAlert: + slog.Debug("Received alert from Kafka", + "alert_id", msg.ID, + "type", msg.Type, + ) + // ... processing ... + +case msg := <-chMqtt: + slog.Debug("Received tracker list request", + "trackers_count", len(msg), + ) + // ... processing ... +``` + +#### Shutdown + +```go +// Line 186-195: Graceful shutdown +slog.Info("Shutting down bridge service", + "reason", "context_done", +) + +wg.Wait() + +slog.Info("All goroutines stopped, closing Kafka connections") +kafkaManager.CleanKafkaReaders() +kafkaManager.CleanKafkaWriters() + +client.Disconnect(250) +slog.Info("Disconnected from MQTT broker") + +slog.Info("Bridge service shutdown complete") +``` + +--- + +### 2. Decoder Service +**File:** [cmd/decoder/main.go](cmd/decoder/main.go) + +#### Initialization + +```go +// Line 42-48: Service initialization +slog.Info("Initializing decoder service", + "kafka_url", cfg.KafkaURL, + "reader_topics", []string{"rawbeacons", "parser"}, + "writer_topics", []string{"alertbeacons"}, +) + +slog.Info("Decoder service initialized", + "parsers_loaded", len(parserRegistry.ParserList), +) +``` + +#### Beacon Processing + +```go +// Line 86-93: Process incoming beacon +func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) { + slog.Debug("Processing beacon advertisement", + "beacon_id", adv.ID, + "hostname", adv.Hostname, + "rssi", adv.RSSI, + "data_length", len(adv.Data), + ) + + if err := decodeBeacon(adv, appState, writer, parserRegistry); err != nil { + slog.Error("Failed to decode beacon", + "error", err, + "beacon_id", adv.ID, + "hostname", adv.Hostname, + ) + } +} + +// Line 95-136: Decode beacon +func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error { + beacon := strings.TrimSpace(adv.Data) + id := adv.ID + + if beacon == "" { + slog.Debug("Skipping beacon with empty data", + "beacon_id", id, + ) + return nil + } + + b, err := hex.DecodeString(beacon) + if err != nil { + slog.Warn("Failed to decode hex string", + "error", err, + "beacon_id", id, + "data", beacon, + ) + return err + } + + slog.Debug("Decoded beacon data", + "beacon_id", id, + "bytes_length", len(b), + ) + + // ... parsing logic ... + + if event.ID == "" { + slog.Debug("Skipping event with empty ID", + "beacon_id", id, + ) + return nil + } + + prevEvent, ok := appState.GetBeaconEvent(id) + appState.UpdateBeaconEvent(id, event) + + if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) { + slog.Debug("Skipping duplicate event", + "beacon_id", id, + "event_type", event.Type, + ) + return nil + } + + slog.Info("Decoded beacon event", + "beacon_id", id, + "event_type", event.Type, + "event", event.Event, + "battery", event.Battery, + "temperature", event.Temperature, + ) + + eMsg, err := event.ToJSON() + if err != nil { + slog.Error("Failed to marshal event to JSON", + "error", err, + "beacon_id", id, + ) + return err + } + + if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil { + slog.Error("Failed to write event to Kafka", + "error", err, + "beacon_id", id, + "topic", "alertbeacons", + ) + return err + } + + slog.Debug("Successfully sent event to Kafka", + "beacon_id", id, + "topic", "alertbeacons", + ) + + return nil +} +``` + +#### Parser Management + +```go +// Line 64-74: Parser configuration +case msg := <-chParser: + slog.Info("Received parser configuration update", + "action", msg.ID, + "parser_name", msg.Name, + ) + + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + slog.Info("Registered new parser", + "parser_name", config.Name, + "type", config.Type, + ) + case "delete": + parserRegistry.Unregister(msg.Name) + slog.Info("Unregistered parser", + "parser_name", msg.Name, + ) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + slog.Info("Updated parser configuration", + "parser_name", config.Name, + "type", config.Type, + ) + } +``` + +--- + +### 3. Location Service +**File:** [cmd/location/main.go](cmd/location/main.go) + +#### Initialization + +```go +// Line 37-47: Service initialization +slog.Info("Initializing location service", + "kafka_url", cfg.KafkaURL, + "reader_topics", []string{"rawbeacons", "settings"}, + "writer_topics", []string{"locevents"}, + "algorithm", "filter", +) +``` + +#### Location Algorithm + +```go +// Line 60-68: Algorithm execution +case <-locTicker.C: + settings := appState.GetSettings() + slog.Debug("Running location algorithm", + "algorithm", settings.CurrentAlgorithm, + "beacon_count", len(appState.GetAllBeacons()), + ) + + switch settings.CurrentAlgorithm { + case "filter": + getLikelyLocations(appState, kafkaManager.GetWriter("locevents")) + case "ai": + slog.Warn("AI algorithm not yet implemented") + } +``` + +#### Location Processing + +```go +// Line 85-157: Location calculation +func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { + beacons := appState.GetAllBeacons() + settings := appState.GetSettingsValue() + + slog.Debug("Calculating likely locations", + "beacon_count", len(beacons), + "algorithm", "filter", + ) + + processed := 0 + skipped := 0 + updated := 0 + + for _, beacon := range beacons { + mSize := len(beacon.BeaconMetrics) + if mSize == 0 { + skipped++ + continue + } + + // Check if beacon is too old + if (int64(time.Now().Unix()) - beacon.BeaconMetrics[mSize-1].Timestamp) > settings.LastSeenThreshold { + slog.Debug("Beacon data too old", + "beacon_id", beacon.ID, + "last_seen", beacon.BeaconMetrics[mSize-1].Timestamp, + "threshold", settings.LastSeenThreshold, + ) + skipped++ + continue + } + + // ... location calculation logic ... + + locationChanged := bestLocName != beacon.PreviousLocation + + if locationChanged { + slog.Info("Beacon location changed", + "beacon_id", beacon.ID, + "previous_location", beacon.PreviousLocation, + "new_location", bestLocName, + "confidence", beacon.LocationConfidence, + "distance", r.Distance, + ) + updated++ + } + + appState.UpdateBeacon(beacon.ID, beacon) + processed++ + + // ... send to Kafka ... + } + + slog.Info("Location calculation complete", + "processed", processed, + "updated", updated, + "skipped", skipped, + ) +} +``` + +#### Beacon Assignment + +```go +// Line 159-199: Assign beacon to list +func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) { + id := adv.ID + now := time.Now().Unix() + settings := appState.GetSettingsValue() + + slog.Debug("Assigning beacon to tracking list", + "beacon_id", id, + "hostname", adv.Hostname, + "rssi", adv.RSSI, + ) + + if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { + slog.Debug("Beacon RSSI below threshold, skipping", + "beacon_id", id, + "rssi", adv.RSSI, + "threshold", settings.RSSIMinThreshold, + ) + return + } + + beacon, ok := appState.GetBeacon(id) + if !ok { + slog.Debug("Creating new beacon entry", + "beacon_id", id, + ) + beacon = model.Beacon{ID: id} + } + + // ... update beacon metrics ... + + slog.Debug("Updated beacon metrics", + "beacon_id", id, + "metrics_count", len(beacon.BeaconMetrics), + "distance", metric.Distance, + "location", metric.Location, + ) + + appState.UpdateBeacon(id, beacon) +} +``` + +#### Settings Update + +```go +case msg := <-chSettings: + slog.Info("Received settings update", + "settings", msg, + ) + appState.UpdateSettings(msg) + slog.Info("Settings updated successfully", + "current_algorithm", appState.GetSettings().CurrentAlgorithm, + ) +``` + +--- + +### 4. Server Service +**File:** [cmd/server/main.go](cmd/server/main.go) + +#### Initialization + +```go +// Line 46-49: Database connection +db, err := database.Connect(cfg) +if err != nil { + slog.Error("Failed to connect to database", + "error", err, + "connection_string", cfg.DBConnectionString, + ) + log.Fatalf("Failed to open database connection: %v\n", err) +} + +slog.Info("Successfully connected to database") + +// Line 60-89: Configuration loading +configFile, err := os.Open("/app/cmd/server/config.json") +if err != nil { + slog.Error("Failed to open configuration file", + "error", err, + "file_path", "/app/cmd/server/config.json", + ) + panic(err) +} + +slog.Info("Loading parser configurations", + "file_path", "/app/cmd/server/config.json", +) + +var configs []model.Config +if err := json.Unmarshal(b, &configs); err != nil { + slog.Error("Failed to parse configuration file", + "error", err, + ) + panic(err) +} + +slog.Info("Loaded parser configurations", + "count", len(configs), +) + +// Persist configs +for _, config := range configs { + if err := db.Create(&config).Error; err != nil { + slog.Warn("Failed to persist config to database", + "error", err, + "config_name", config.Name, + ) + } +} + +// Send to Kafka +for _, config := range configs { + kp := model.KafkaParser{ + ID: "add", + Config: config, + } + + if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil { + slog.Error("Failed to send parser config to Kafka", + "error", err, + "parser_name", config.Name, + ) + } else { + slog.Info("Sent parser config to Kafka", + "parser_name", config.Name, + "type", config.Type, + ) + } +} + +// Line 87-89: API client initialization +if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil { + slog.Error("Failed to initialize API client", + "error", err, + ) + fmt.Printf("Error in getting token: %v\n", err) +} + +slog.Info("API client initialized successfully") + +// Line 136-147: HTTP server +slog.Info("Starting HTTP server", + "address", cfg.HTTPAddr, + "cors_origins", "*", +) + +go server.ListenAndServe() + +slog.Info("HTTP server started", + "address", cfg.HTTPAddr, +) +``` + +#### Event Loop + +```go +// Line 153-181: Event loop +case msg := <-chLoc: + slog.Debug("Received location event", + "beacon_id", msg.ID, + "location", msg.Location, + "distance", msg.Distance, + ) + service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx) + +case msg := <-chEvents: + slog.Debug("Received beacon event", + "beacon_id", msg.ID, + "event_type", msg.Type, + "event", msg.Event, + ) + + id := msg.ID + if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil { + slog.Warn("Received event for untracked beacon", + "beacon_id", id, + "error", err, + ) + continue + } + + if err := db.Updates(&model.Tracker{ + ID: id, + Battery: msg.Battery, + Temperature: msg.Temperature, + }).Error; err != nil { + slog.Error("Failed to update tracker metrics", + "error", err, + "beacon_id", id, + ) + } else { + slog.Info("Updated tracker metrics", + "beacon_id", id, + "battery", msg.Battery, + "temperature", msg.Temperature, + ) + } + +case <-beaconTicker.C: + slog.Debug("Refreshing tracker list for MQTT") + var list []model.Tracker + db.Find(&list) + + eMsg, err := json.Marshal(list) + if err != nil { + slog.Error("Failed to marshal tracker list", + "error", err, + "count", len(list), + ) + continue + } + + msg := kafka.Message{Value: eMsg} + if err := kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg); err != nil { + slog.Error("Failed to send tracker list to MQTT", + "error", err, + "count", len(list), + ) + } else { + slog.Debug("Sent tracker list to MQTT", + "count", len(list), + ) + } +``` + +#### Shutdown + +```go +// Line 184-199: Graceful shutdown +if err := server.Shutdown(context.Background()); err != nil { + slog.Error("Failed to shutdown HTTP server", + "error", err, + ) +} + +slog.Info("HTTP server stopped") + +wg.Wait() + +slog.Info("All goroutines stopped, closing Kafka connections") +kafkaManager.CleanKafkaReaders() +kafkaManager.CleanKafkaWriters() + +slog.Info("All services shutdown complete") +``` + +--- + +## Supporting Packages + +### Kafka Client Package +**File:** [internal/pkg/kafkaclient/consumer.go](internal/pkg/kafkaclient/consumer.go) + +```go +// Line 12-35: Consumer function +func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + slog.Info("Starting Kafka consumer", + "topic", r.Config().Topic, + "group_id", r.Config().GroupID, + ) + + messageCount := 0 + errorCount := 0 + + for { + select { + case <-ctx.Done(): + slog.Info("Kafka consumer shutting down", + "topic", r.Config().Topic, + "messages_processed", messageCount, + "errors", errorCount, + ) + return + default: + msg, err := r.ReadMessage(ctx) + if err != nil { + errorCount++ + slog.Error("Error reading Kafka message", + "error", err, + "topic", r.Config().Topic, + "error_count", errorCount, + ) + continue + } + + var data T + if err := json.Unmarshal(msg.Value, &data); err != nil { + errorCount++ + slog.Error("Error decoding Kafka message", + "error", err, + "topic", r.Config().Topic, + "message_length", len(msg.Value), + ) + continue + } + + messageCount++ + if messageCount%100 == 0 { + slog.Debug("Kafka consumer progress", + "topic", r.Config().Topic, + "messages_processed", messageCount, + ) + } + + ch <- data + } + } +} +``` + +**File:** [internal/pkg/kafkaclient/manager.go](internal/pkg/kafkaclient/manager.go) + +```go +// Line 38-52: Add Kafka writer +func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) { + slog.Debug("Creating Kafka writer", + "topic", topic, + "brokers", kafkaUrl, + ) + + kafkaWriter := &kafka.Writer{ + Addr: kafka.TCP(kafkaUrl), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + RequiredAcks: kafka.RequireAll, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + } + + m.kafkaWritersMap.KafkaWritersLock.Lock() + m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter + m.kafkaWritersMap.KafkaWritersLock.Unlock() + + slog.Info("Kafka writer created", + "topic", topic, + ) +} + +// Line 66-79: Add Kafka reader +func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) { + slog.Debug("Creating Kafka reader", + "topic", topic, + "group_id", groupID, + "brokers", kafkaUrl, + ) + + brokers := strings.Split(kafkaUrl, ",") + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + GroupID: groupID, + Topic: topic, + MinBytes: 1, + MaxBytes: 10e6, + }) + + m.kafkaReadersMap.KafkaReadersLock.Lock() + m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader + m.kafkaReadersMap.KafkaReadersLock.Unlock() + + slog.Info("Kafka reader created", + "topic", topic, + "group_id", groupID, + ) +} + +// Line 54-64: Clean writers +func (m *KafkaManager) CleanKafkaWriters() { + slog.Info("Shutting down Kafka writers", + "count", len(m.kafkaWritersMap.KafkaWriters), + ) + + m.kafkaWritersMap.KafkaWritersLock.Lock() + for topic, r := range m.kafkaWritersMap.KafkaWriters { + if err := r.Close(); err != nil { + slog.Error("Error closing Kafka writer", + "error", err, + "topic", topic, + ) + } else { + slog.Info("Kafka writer closed", + "topic", topic, + ) + } + } + m.kafkaWritersMap.KafkaWritersLock.Unlock() + + slog.Info("Kafka writers shutdown complete") +} + +// Line 81-90: Clean readers +func (m *KafkaManager) CleanKafkaReaders() { + slog.Info("Shutting down Kafka readers", + "count", len(m.kafkaReadersMap.KafkaReaders), + ) + + m.kafkaReadersMap.KafkaReadersLock.Lock() + for topic, r := range m.kafkaReadersMap.KafkaReaders { + if err := r.Close(); err != nil { + slog.Error("Error closing Kafka reader", + "error", err, + "topic", topic, + ) + } else { + slog.Info("Kafka reader closed", + "topic", topic, + ) + } + } + m.kafkaReadersMap.KafkaReadersLock.Unlock() + + slog.Info("Kafka readers shutdown complete") +} +``` + +### API Client Package +**File:** [internal/pkg/apiclient/updatedb.go](internal/pkg/apiclient/updatedb.go) + +```go +// Line 19-74: Update database +func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error { + slog.Info("Initializing database from API", + "api_url", cfg.APIURL, + ) + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + token, err := GetToken(ctx, cfg, client) + if err != nil { + slog.Error("Failed to get authentication token", + "error", err, + ) + return err + } + + slog.Info("Successfully authenticated with API") + + // Sync trackers + if trackers, err := GetTrackers(token, client); err == nil { + slog.Info("Fetched trackers from API", + "count", len(trackers), + ) + syncTable(db, trackers) + + if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil { + slog.Error("Failed to send clear lookup message", + "error", err, + ) + } + + for _, v := range trackers { + apiUpdate := model.ApiUpdate{ + Method: "POST", + ID: v.ID, + MAC: v.MAC, + } + + if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil { + slog.Error("Failed to send tracker registration", + "error", err, + "tracker_id", v.ID, + ) + } else { + slog.Debug("Sent tracker registration", + "tracker_id", v.ID, + "mac", v.MAC, + ) + } + } + } else { + slog.Error("Failed to fetch trackers from API", + "error", err, + ) + } + + // Sync gateways + if gateways, err := GetGateways(token, client); err == nil { + slog.Info("Fetched gateways from API", + "count", len(gateways), + ) + syncTable(db, gateways) + } else { + slog.Error("Failed to fetch gateways from API", + "error", err, + ) + } + + // Sync zones + if zones, err := GetZones(token, client); err == nil { + slog.Info("Fetched zones from API", + "count", len(zones), + ) + syncTable(db, zones) + } else { + slog.Error("Failed to fetch zones from API", + "error", err, + ) + } + + // Sync tracker zones + if trackerZones, err := GetTrackerZones(token, client); err == nil { + slog.Info("Fetched tracker zones from API", + "count", len(trackerZones), + ) + syncTable(db, trackerZones) + } else { + slog.Error("Failed to fetch tracker zones from API", + "error", err, + ) + } + + // Initialize settings + var settings model.Settings + db.First(&settings) + if settings.ID == 0 { + slog.Info("Initializing default settings") + db.Create(appState.GetSettings()) + } + + slog.Info("Database sync complete") + return nil +} + +// Line 76-91: Sync table +func syncTable[T any](db *gorm.DB, data []T) { + if len(data) == 0 { + slog.Debug("No data to sync", + "type", fmt.Sprintf("%T", data), + ) + return + } + + slog.Debug("Syncing table", + "count", len(data), + "type", fmt.Sprintf("%T", data), + ) + + var ids []string + for _, item := range data { + v := reflect.ValueOf(item).FieldByName("ID").String() + ids = append(ids, v) + } + + db.Transaction(func(tx *gorm.DB) error { + result := tx.Where("id NOT IN ?", ids).Delete(new(T)) + if result.Error != nil { + slog.Error("Failed to delete stale records", + "error", result.Error, + "deleted_count", result.RowsAffected, + ) + } + + result = tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data) + if result.Error != nil { + slog.Error("Failed to upsert records", + "error", result.Error, + ) + return result.Error + } + + slog.Info("Table sync complete", + "upserted", len(data), + "deleted", result.RowsAffected, + ) + return nil + }) +} +``` + +**File:** [internal/pkg/apiclient/auth.go](internal/pkg/apiclient/auth.go) + +```go +// Line 17-45: Get token +func GetToken(ctx context.Context, cfg *config.Config, client *http.Client) (string, error) { + slog.Debug("Requesting authentication token", + "url", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", + ) + + formData := url.Values{} + formData.Set("grant_type", "password") + formData.Set("client_id", "Fastapi") + formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC") + formData.Set("username", "core") + formData.Set("password", "C0r3_us3r_Cr3d3nt14ls") + formData.Set("audience", "Fastapi") + + req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode())) + if err != nil { + slog.Error("Failed to create auth request", + "error", err, + ) + return "", err + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + req = req.WithContext(ctx) + res, err := client.Do(req) + if err != nil { + slog.Error("Failed to send auth request", + "error", err, + ) + return "", err + } + + var j response + if err := json.NewDecoder(res.Body).Decode(&j); err != nil { + slog.Error("Failed to decode auth response", + "error", err, + "status_code", res.StatusCode, + ) + return "", err + } + + slog.Info("Successfully obtained authentication token", + "token_length", len(j.Token), + ) + + return j.Token, nil +} +``` + +### Service Package +**File:** [internal/pkg/service/beacon_service.go](internal/pkg/service/beacon_service.go) + +```go +// Line 17-67: Location to beacon service +func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) { + if msg.ID == "" { + slog.Warn("Received location message with empty ID") + return + } + + slog.Debug("Processing location message", + "beacon_id", msg.ID, + "location", msg.Location, + "distance", msg.Distance, + ) + + var zones []model.TrackerZones + if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil { + slog.Error("Failed to fetch tracker zones", + "error", err, + "beacon_id", msg.ID, + ) + return + } + + slog.Debug("Fetched tracker zones", + "beacon_id", msg.ID, + "zones_count", len(zones), + ) + + var allowedZones []string + for _, z := range zones { + allowedZones = append(allowedZones, z.ZoneList...) + } + + var gw model.Gateway + mac := formatMac(msg.Location) + if err := db.Select("id").Where("mac = ?", mac).First(&gw).Error; err != nil { + slog.Warn("Gateway not found for location", + "mac", mac, + "beacon_id", msg.ID, + "error", err, + ) + return + } + + slog.Debug("Found gateway for location", + "gateway_id", gw.ID, + "mac", mac, + ) + + if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) { + alert := model.Alert{ + ID: msg.ID, + Type: "Restricted zone", + Value: gw.ID, + } + + slog.Warn("Beacon in restricted zone", + "beacon_id", msg.ID, + "gateway_id", gw.ID, + "allowed_zones", len(allowedZones), + ) + + eMsg, err := json.Marshal(alert) + if err != nil { + slog.Error("Failed to marshal alert", + "error", err, + "beacon_id", msg.ID, + ) + } else { + msg := kafka.Message{Value: eMsg} + if err := writer.WriteMessages(ctx, msg); err != nil { + slog.Error("Failed to send alert to Kafka", + "error", err, + "beacon_id", msg.ID, + "alert_type", "Restricted zone", + ) + } else { + slog.Info("Sent restricted zone alert", + "beacon_id", msg.ID, + "gateway_id", gw.ID, + ) + } + } + } + + // Save track + if err := db.Create(&model.Tracks{ + UUID: msg.ID, + Timestamp: time.Now(), + Gateway: gw.ID, + GatewayMac: gw.MAC, + Tracker: msg.ID, + }).Error; err != nil { + slog.Error("Failed to save track", + "error", err, + "beacon_id", msg.ID, + "gateway_id", gw.ID, + ) + return + } + + slog.Debug("Saved track record", + "beacon_id", msg.ID, + "gateway_id", gw.ID, + ) + + // Update tracker + if err := db.Updates(&model.Tracker{ + ID: msg.ID, + Location: gw.ID, + Distance: msg.Distance, + }).Error; err != nil { + slog.Error("Failed to update tracker location", + "error", err, + "beacon_id", msg.ID, + ) + } else { + slog.Info("Updated tracker location", + "beacon_id", msg.ID, + "gateway_id", gw.ID, + "distance", msg.Distance, + ) + } +} +``` + +--- + +## Configuration Updates + +Add these fields to your configuration structure: + +```go +// internal/pkg/config/config.go +type Config struct { + // ... existing fields ... + + // Logging configuration + LogLevel string `json:"log_level" env:"LOG_LEVEL" default:"info"` + LogFormat string `json:"log_format" env:"LOG_FORMAT" default:"json"` + LogAddSource bool `json:"log_add_source" env:"LOG_ADD_SOURCE" default:"false"` +} +``` + +--- + +## Log Aggregation and Monitoring + +### Recommended Log Queries + +**All errors across services:** +```bash +jq 'select(.level == "error")' *.log +``` + +**Beacon processing by ID:** +```bash +jq 'select(.beacon_id == "beacon-123")' bridge.log decoder.log location.log +``` + +**Kafka message flow:** +```bash +jq 'select(.topic != null)' bridge.log decoder.log location.log server.log +``` + +**Performance analysis:** +```bash +jq 'select(.duration != null)' *.log +``` + +### Metrics to Track + +1. **Message throughput** - Messages processed per second per service +2. **Error rates** - Errors per 1000 messages by type +3. **Processing latency** - Time from ingestion to processing +4. **Kafka lag** - Consumer lag per topic/group +5. **Connection failures** - MQTT/Kafka reconnection frequency + +--- + +## Migration Checklist + +- [ ] Update `internal/pkg/logger/logger.go` with new configuration options +- [ ] Add logging configuration fields to `internal/pkg/config/config.go` +- [ ] Replace `fmt.Println`, `fmt.Printf`, `log.Printf` with `slog` calls in: + - [ ] `cmd/bridge/main.go` + - [ ] `cmd/decoder/main.go` + - [ ] `cmd/location/main.go` + - [ ] `cmd/server/main.go` + - [ ] `internal/pkg/kafkaclient/consumer.go` + - [ ] `internal/pkg/kafkaclient/manager.go` + - [ ] `internal/pkg/apiclient/updatedb.go` + - [ ] `internal/pkg/apiclient/auth.go` + - [ ] `internal/pkg/service/beacon_service.go` +- [ ] Test all services with new logging +- [ ] Verify log rotation and disk space management +- [ ] Set up log aggregation/monitoring dashboard + +--- + +## Example Log Output + +### JSON Format (Production) +```json +{"time":"2026-01-20T10:15:30.123456Z","level":"INFO","msg":"Processing beacon","service":"bridge","beacon_id":"beacon-001","rssi":-65,"hostname":"gateway-1"} +{"time":"2026-01-20T10:15:30.234567Z","level":"ERROR","msg":"Failed to write to Kafka","service":"bridge","error":"connection timeout","beacon_id":"beacon-001","topic":"rawbeacons"} +``` + +### Text Format (Development) +``` +time=2026-01-20T10:15:30.123Z level=INFO msg="Processing beacon" service=bridge beacon_id=beacon-001 rssi=-65 hostname=gateway-1 +time=2026-01-20T10:15:30.234Z level=ERROR msg="Failed to write to Kafka" service=bridge error="connection timeout" beacon_id=beacon-001 topic=rawbeacons +``` + +--- + +## Notes + +1. **Security**: Be careful not to log sensitive information (passwords, tokens, personal data) +2. **Performance**: Use `slog.Debug` for high-frequency logs in production +3. **Disk space**: Implement log rotation to prevent disk exhaustion +4. **Testing**: Add unit tests to verify logging behavior where critical +5. **Context propagation**: Consider adding request/context IDs for tracing