# Logging Standards and Guidelines

## Overview

This document defines comprehensive logging standards for the AFASystems Presence services using Go's structured logging (`log/slog`). The goal is to provide consistent, searchable, and actionable logs across all services.

## Logger Configuration

### Enhanced Logger Package

**File:** [internal/pkg/logger/logger.go](internal/pkg/logger/logger.go)

Update the logger to support both JSON and text formats with configurable log levels:

```go
package logger

import (
	"io"
	"log"
	"log/slog"
	"os"
	"path/filepath"
)

type LogConfig struct {
	Filename  string
	Level     string
	Format    string // "json" or "text"
	AddSource bool
}

func CreateLogger(config LogConfig) *slog.Logger {
	// Ensure log directory exists
	dir := filepath.Dir(config.Filename)
	if err := os.MkdirAll(dir, 0755); err != nil {
		log.Fatalf("Failed to create log directory: %v\n", err)
	}

	f, err := os.OpenFile(config.Filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalf("Failed to open log file: %v\n", err)
	}

	// Multi-writer: stderr + file
	w := io.MultiWriter(os.Stderr, f)

	// Parse log level
	var level slog.Level
	switch config.Level {
	case "debug":
		level = slog.LevelDebug
	case "info":
		level = slog.LevelInfo
	case "warn":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	default:
		level = slog.LevelInfo
	}

	// Create handler options
	opts := &slog.HandlerOptions{
		Level:     level,
		AddSource: config.AddSource,
	}

	// Choose handler based on format
	var handler slog.Handler
	if config.Format == "json" {
		handler = slog.NewJSONHandler(w, opts)
	} else {
		handler = slog.NewTextHandler(w, opts)
	}

	logger := slog.New(handler)
	return logger
}

// Convenience function for backward compatibility
func CreateLoggerSimple(fname string) *slog.Logger {
	return CreateLogger(LogConfig{
		Filename:  fname,
		Level:     "info",
		Format:    "json",
		AddSource: false,
	})
}
```

### Usage in Services

```go
// Replace:
slog.SetDefault(logger.CreateLogger("bridge.log"))

// With:
slog.SetDefault(logger.CreateLogger(logger.LogConfig{
	Filename:  "bridge.log",
	Level:     cfg.LogLevel,     // Add to config
	Format:    cfg.LogFormat,    // Add to config
	AddSource: cfg.LogAddSource, // Add to config
}))
```

---

## Structured Logging Best Practices

### Log Levels

| Level | Usage | Examples |
|-------|-------|----------|
| **DEBUG** | Detailed debugging information | Message payloads, internal state changes |
| **INFO** | General informational messages | Service started, connection established, normal operations |
| **WARN** | Unexpected but recoverable situations | Retry attempts, fallback to defaults, degraded performance |
| **ERROR** | Error events that might still allow the application to continue | Failed operations, API errors, parsing failures |
| **FATAL** | Critical errors requiring immediate shutdown. `log/slog` defines no FATAL level, so log at ERROR and then exit | Cannot connect to essential services, invalid configuration |

### Structured Fields

Always use structured attributes instead of formatting strings:

```go
// ❌ BAD
slog.Info(fmt.Sprintf("Processing beacon %s with RSSI %d", id, rssi))

// ✅ GOOD
slog.Info("Processing beacon",
	"beacon_id", id,
	"rssi", rssi,
	"hostname", hostname,
)
```

### Standard Field Names

| Field Name | Type | Description |
|------------|------|-------------|
| `service` | string | Service name (bridge, decoder, location, server) |
| `component` | string | Component within service (mqtt, kafka, http) |
| `topic` | string | Kafka topic or MQTT topic |
| `beacon_id` | string | Beacon/tracker identifier |
| `gateway_id` | string | Gateway identifier |
| `mac` | string | MAC address |
| `rssi` | int64 | Received Signal Strength Indicator |
| `duration` | string/int | Duration of operation |
| `error` | error | Error object (for error-level logs) |
| `count` | int | Count of items processed |
| `status` | string | Status of operation (success, failed, retry) |

---

## Service-Specific Logging Guidelines

### 1. Bridge Service

**File:** [cmd/bridge/main.go](cmd/bridge/main.go)

#### Initialization

```go
// Line 100-116: Replace basic info logs
slog.Info("Initializing bridge service",
	"kafka_url", cfg.KafkaURL,
	"mqtt_host", cfg.MQTTHost,
	"reader_topics", []string{"apibeacons", "alert", "mqtt"},
	"writer_topics", []string{"rawbeacons"},
)

slog.Info("Kafka manager initialized",
	"readers_count", len(readerTopics),
	"writers_count", len(writerTopics),
)

// Line 127-144: MQTT connection
slog.Info("Connecting to MQTT broker",
	"broker", fmt.Sprintf("%s:%d", cfg.MQTTHost, 1883),
	"client_id", "go_mqtt_client",
)

if token := client.Connect(); token.Wait() && token.Error() != nil {
	slog.Error("Failed to connect to MQTT broker",
		"error", token.Error(),
		"broker", cfg.MQTTHost,
	)
	panic(token.Error())
}

slog.Info("Successfully connected to MQTT broker",
	"broker", cfg.MQTTHost,
)

// Line 146-202: MQTT subscription
slog.Info("Subscribed to MQTT topic",
	"topic", topic,
)
```

#### MQTT Message Handler

```go
// Line 26-83: Replace with structured logging
func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
	hostname := strings.Split(topic, "/")[1]

	slog.Debug("Received MQTT message",
		"topic", topic,
		"hostname", hostname,
		"message_length", len(message),
	)

	msgStr := string(message)

	if strings.HasPrefix(msgStr, "[") {
		var readings []model.RawReading
		if err := json.Unmarshal(message, &readings); err != nil {
			slog.Error("Failed to parse JSON message",
				"error", err,
				"topic", topic,
				"message", msgStr,
			)
			return
		}

		slog.Debug("Parsed JSON readings",
			"count", len(readings),
			"topic", topic,
		)

		processed := 0
		skipped := 0

		for _, reading := range readings {
			if reading.Type == "Gateway" {
				skipped++
				continue
			}

			val, ok := appState.BeaconExists(reading.MAC)
			if !ok {
				slog.Debug("Skipping unregistered beacon",
					"mac", reading.MAC,
					"hostname", hostname,
				)
				skipped++
				continue
			}

			adv := model.BeaconAdvertisement{
				ID:       val,
				Hostname: hostname,
				MAC:      reading.MAC,
				RSSI:     int64(reading.RSSI),
				Data:     reading.RawData,
			}

			encodedMsg, err := json.Marshal(adv)
			if err != nil {
				slog.Error("Failed to marshal beacon advertisement",
					"error", err,
					"beacon_id", val,
				)
				continue
			}

			msg := kafka.Message{Value: encodedMsg}
			if err := writer.WriteMessages(context.Background(), msg); err != nil {
				slog.Error("Failed to write to Kafka",
					"error", err,
					"beacon_id", val,
					"topic", "rawbeacons",
				)
				time.Sleep(1 * time.Second)
				break
			}

			processed++
		}

		slog.Info("Processed MQTT readings",
			"processed", processed,
			"skipped", skipped,
			"hostname", hostname,
		)
	} else {
		slog.Warn("Received non-JSON message format",
			"message", msgStr,
			"topic", topic,
		)
	}
}
```

#### Event Loop

```go
// Line 148-184: Event loop logging
case msg := <-chApi:
	slog.Debug("Received API update from Kafka",
		"method", msg.Method,
		"id", msg.ID,
		"mac", msg.MAC,
	)

	switch msg.Method {
	case "POST":
		id := msg.ID
		appState.AddBeaconToLookup(msg.MAC, id)
		slog.Info("Beacon added to lookup",
			"beacon_id", id,
			"mac", msg.MAC,
		)
	case "DELETE":
		// For DELETE, the lookup key is the MAC address (or "all"), not the beacon ID.
		mac := msg.MAC
		if mac == "all" {
			appState.CleanLookup()
			slog.Info("Cleared all beacons from lookup")
			continue
		}
		appState.RemoveBeaconFromLookup(mac)
		slog.Info("Beacon removed from lookup",
			"mac", mac,
		)
	}

case msg := <-chAlert:
	slog.Debug("Received alert from Kafka",
		"alert_id", msg.ID,
		"type", msg.Type,
	)
	// ... processing ...

case msg := <-chMqtt:
	slog.Debug("Received tracker list request",
		"trackers_count", len(msg),
	)
	// ... processing ...
```

#### Shutdown

```go
// Line 186-195: Graceful shutdown
slog.Info("Shutting down bridge service",
	"reason", "context_done",
)

wg.Wait()

slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

client.Disconnect(250)
slog.Info("Disconnected from MQTT broker")

slog.Info("Bridge service shutdown complete")
```

---

### 2. Decoder Service

**File:** [cmd/decoder/main.go](cmd/decoder/main.go)

#### Initialization

```go
// Line 42-48: Service initialization
slog.Info("Initializing decoder service",
	"kafka_url", cfg.KafkaURL,
	"reader_topics", []string{"rawbeacons", "parser"},
	"writer_topics", []string{"alertbeacons"},
)

slog.Info("Decoder service initialized",
	"parsers_loaded", len(parserRegistry.ParserList),
)
```

#### Beacon Processing

```go
// Line 86-93: Process incoming beacon
func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
	slog.Debug("Processing beacon advertisement",
		"beacon_id", adv.ID,
		"hostname", adv.Hostname,
		"rssi", adv.RSSI,
		"data_length", len(adv.Data),
	)

	if err := decodeBeacon(adv, appState, writer, parserRegistry); err != nil {
		slog.Error("Failed to decode beacon",
			"error", err,
			"beacon_id", adv.ID,
			"hostname", adv.Hostname,
		)
	}
}

// Line 95-136: Decode beacon
func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
	beacon := strings.TrimSpace(adv.Data)
	id := adv.ID

	if beacon == "" {
		slog.Debug("Skipping beacon with empty data",
			"beacon_id", id,
		)
		return nil
	}

	b, err := hex.DecodeString(beacon)
	if err != nil {
		slog.Warn("Failed to decode hex string",
			"error", err,
			"beacon_id", id,
			"data", beacon,
		)
		return err
	}

	slog.Debug("Decoded beacon data",
		"beacon_id", id,
		"bytes_length", len(b),
	)

	// ... parsing logic ...

	if event.ID == "" {
		slog.Debug("Skipping event with empty ID",
			"beacon_id", id,
		)
		return nil
	}

	prevEvent, ok := appState.GetBeaconEvent(id)
	appState.UpdateBeaconEvent(id, event)

	if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
		slog.Debug("Skipping duplicate event",
			"beacon_id", id,
			"event_type", event.Type,
		)
		return nil
	}

	slog.Info("Decoded beacon event",
		"beacon_id", id,
		"event_type", event.Type,
		"event", event.Event,
		"battery", event.Battery,
		"temperature", event.Temperature,
	)

	eMsg, err := event.ToJSON()
	if err != nil {
		slog.Error("Failed to marshal event to JSON",
			"error", err,
			"beacon_id", id,
		)
		return err
	}

	if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
		slog.Error("Failed to write event to Kafka",
			"error", err,
			"beacon_id", id,
			"topic", "alertbeacons",
		)
		return err
	}

	slog.Debug("Successfully sent event to Kafka",
		"beacon_id", id,
		"topic", "alertbeacons",
	)

	return nil
}
```

#### Parser Management

```go
// Line 64-74: Parser configuration
case msg := <-chParser:
	slog.Info("Received parser configuration update",
		"action", msg.ID,
		"parser_name", msg.Name,
	)

	switch msg.ID {
	case "add":
		config := msg.Config
		parserRegistry.Register(config.Name, config)
		slog.Info("Registered new parser",
			"parser_name", config.Name,
			"type", config.Type,
		)
	case "delete":
		parserRegistry.Unregister(msg.Name)
		slog.Info("Unregistered parser",
			"parser_name", msg.Name,
		)
	case "update":
		config := msg.Config
		parserRegistry.Register(config.Name, config)
		slog.Info("Updated parser configuration",
			"parser_name", config.Name,
			"type", config.Type,
		)
	}
```

---

### 3. Location Service

**File:** [cmd/location/main.go](cmd/location/main.go)

#### Initialization

```go
// Line 37-47: Service initialization
slog.Info("Initializing location service",
	"kafka_url", cfg.KafkaURL,
	"reader_topics", []string{"rawbeacons", "settings"},
	"writer_topics", []string{"locevents"},
	"algorithm", "filter",
)
```

#### Location Algorithm

```go
// Line 60-68: Algorithm execution
case <-locTicker.C:
	settings := appState.GetSettings()
	slog.Debug("Running location algorithm",
		"algorithm", settings.CurrentAlgorithm,
		"beacon_count", len(appState.GetAllBeacons()),
	)

	switch settings.CurrentAlgorithm {
	case "filter":
		getLikelyLocations(appState, kafkaManager.GetWriter("locevents"))
	case "ai":
		slog.Warn("AI algorithm not yet implemented")
	}
```

#### Location Processing

```go
// Line 85-157: Location calculation
func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
	beacons := appState.GetAllBeacons()
	settings := appState.GetSettingsValue()

	slog.Debug("Calculating likely locations",
		"beacon_count", len(beacons),
		"algorithm", "filter",
	)

	processed := 0
	skipped := 0
	updated := 0

	for _, beacon := range beacons {
		mSize := len(beacon.BeaconMetrics)
		if mSize == 0 {
			skipped++
			continue
		}

		// Check if beacon is too old
		if (time.Now().Unix() - beacon.BeaconMetrics[mSize-1].Timestamp) > settings.LastSeenThreshold {
			slog.Debug("Beacon data too old",
				"beacon_id", beacon.ID,
				"last_seen", beacon.BeaconMetrics[mSize-1].Timestamp,
				"threshold", settings.LastSeenThreshold,
			)
			skipped++
			continue
		}

		// ... location calculation logic ...

		locationChanged := bestLocName != beacon.PreviousLocation
		if locationChanged {
			slog.Info("Beacon location changed",
				"beacon_id", beacon.ID,
				"previous_location", beacon.PreviousLocation,
				"new_location", bestLocName,
				"confidence", beacon.LocationConfidence,
				"distance", r.Distance,
			)
			updated++
		}

		appState.UpdateBeacon(beacon.ID, beacon)
		processed++

		// ... send to Kafka ...
	}

	slog.Info("Location calculation complete",
		"processed", processed,
		"updated", updated,
		"skipped", skipped,
	)
}
```

#### Beacon Assignment

```go
// Line 159-199: Assign beacon to list
func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
	id := adv.ID
	now := time.Now().Unix()
	settings := appState.GetSettingsValue()

	slog.Debug("Assigning beacon to tracking list",
		"beacon_id", id,
		"hostname", adv.Hostname,
		"rssi", adv.RSSI,
	)

	if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
		slog.Debug("Beacon RSSI below threshold, skipping",
			"beacon_id", id,
			"rssi", adv.RSSI,
			"threshold", settings.RSSIMinThreshold,
		)
		return
	}

	beacon, ok := appState.GetBeacon(id)
	if !ok {
		slog.Debug("Creating new beacon entry",
			"beacon_id", id,
		)
		beacon = model.Beacon{ID: id}
	}

	// ... update beacon metrics ...

	slog.Debug("Updated beacon metrics",
		"beacon_id", id,
		"metrics_count", len(beacon.BeaconMetrics),
		"distance", metric.Distance,
		"location", metric.Location,
	)

	appState.UpdateBeacon(id, beacon)
}
```

#### Settings Update

```go
case msg := <-chSettings:
	slog.Info("Received settings update",
		"settings", msg,
	)
	appState.UpdateSettings(msg)
	slog.Info("Settings updated successfully",
		"current_algorithm", appState.GetSettings().CurrentAlgorithm,
	)
```

---

### 4. Server Service

**File:** [cmd/server/main.go](cmd/server/main.go)

#### Initialization

```go
// Line 46-49: Database connection
db, err := database.Connect(cfg)
if err != nil {
	// Do not log cfg.DBConnectionString: connection strings usually embed credentials.
	slog.Error("Failed to connect to database",
		"error", err,
	)
	log.Fatalf("Failed to open database connection: %v\n", err)
}

slog.Info("Successfully connected to database")

// Line 60-89: Configuration loading
configFile, err := os.Open("/app/cmd/server/config.json")
if err != nil {
	slog.Error("Failed to open configuration file",
		"error", err,
		"file_path", "/app/cmd/server/config.json",
	)
	panic(err)
}

slog.Info("Loading parser configurations",
	"file_path", "/app/cmd/server/config.json",
)

// ... read configFile into b ...

var configs []model.Config
if err := json.Unmarshal(b, &configs); err != nil {
	slog.Error("Failed to parse configuration file",
		"error", err,
	)
	panic(err)
}

slog.Info("Loaded parser configurations",
	"count", len(configs),
)

// Persist configs
for _, config := range configs {
	if err := db.Create(&config).Error; err != nil {
		slog.Warn("Failed to persist config to database",
			"error", err,
			"config_name", config.Name,
		)
	}
}

// Send to Kafka
for _, config := range configs {
	kp := model.KafkaParser{
		ID:     "add",
		Config: config,
	}

	if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil {
		slog.Error("Failed to send parser config to Kafka",
			"error", err,
			"parser_name", config.Name,
		)
	} else {
		slog.Info("Sent parser config to Kafka",
			"parser_name", config.Name,
			"type", config.Type,
		)
	}
}

// Line 87-89: API client initialization
// Report success only when UpdateDB actually succeeded.
if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil {
	slog.Error("Failed to initialize API client",
		"error", err,
	)
} else {
	slog.Info("API client initialized successfully")
}

// Line 136-147: HTTP server
slog.Info("Starting HTTP server",
	"address", cfg.HTTPAddr,
	"cors_origins", "*",
)

go server.ListenAndServe()

slog.Info("HTTP server started",
	"address", cfg.HTTPAddr,
)
```

#### Event Loop

```go
// Line 153-181: Event loop
case msg := <-chLoc:
	slog.Debug("Received location event",
		"beacon_id", msg.ID,
		"location", msg.Location,
		"distance", msg.Distance,
	)
	service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx)

case msg := <-chEvents:
	slog.Debug("Received beacon event",
		"beacon_id", msg.ID,
		"event_type", msg.Type,
		"event", msg.Event,
	)

	id := msg.ID
	if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
		slog.Warn("Received event for untracked beacon",
			"beacon_id", id,
			"error", err,
		)
		continue
	}

	if err := db.Updates(&model.Tracker{
		ID:          id,
		Battery:     msg.Battery,
		Temperature: msg.Temperature,
	}).Error; err != nil {
		slog.Error("Failed to update tracker metrics",
			"error", err,
			"beacon_id", id,
		)
	} else {
		slog.Info("Updated tracker metrics",
			"beacon_id", id,
			"battery", msg.Battery,
			"temperature", msg.Temperature,
		)
	}

case <-beaconTicker.C:
	slog.Debug("Refreshing tracker list for MQTT")

	var list []model.Tracker
	db.Find(&list)

	eMsg, err := json.Marshal(list)
	if err != nil {
		slog.Error("Failed to marshal tracker list",
			"error", err,
			"count", len(list),
		)
		continue
	}

	msg := kafka.Message{Value: eMsg}
	if err := kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg); err != nil {
		slog.Error("Failed to send tracker list to MQTT",
			"error", err,
			"count", len(list),
		)
	} else {
		slog.Debug("Sent tracker list to MQTT",
			"count", len(list),
		)
	}
```

#### Shutdown

```go
// Line 184-199: Graceful shutdown
if err := server.Shutdown(context.Background()); err != nil {
	slog.Error("Failed to shutdown HTTP server",
		"error", err,
	)
}

slog.Info("HTTP server stopped")

wg.Wait()

slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

slog.Info("All services shutdown complete")
```

---

## Supporting Packages

### Kafka Client Package

**File:** [internal/pkg/kafkaclient/consumer.go](internal/pkg/kafkaclient/consumer.go)

```go
// Line 12-35: Consumer function
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	slog.Info("Starting Kafka consumer",
		"topic", r.Config().Topic,
		"group_id", r.Config().GroupID,
	)

	messageCount := 0
	errorCount := 0

	for {
		select {
		case <-ctx.Done():
			slog.Info("Kafka consumer shutting down",
				"topic", r.Config().Topic,
				"messages_processed", messageCount,
				"errors", errorCount,
			)
			return
		default:
			msg, err := r.ReadMessage(ctx)
			if err != nil {
				errorCount++
				slog.Error("Error reading Kafka message",
					"error", err,
					"topic", r.Config().Topic,
					"error_count", errorCount,
				)
				continue
			}

			var data T
			if err := json.Unmarshal(msg.Value, &data); err != nil {
				errorCount++
				slog.Error("Error decoding Kafka message",
					"error", err,
					"topic", r.Config().Topic,
					"message_length", len(msg.Value),
				)
				continue
			}

			messageCount++
			if messageCount%100 == 0 {
				slog.Debug("Kafka consumer progress",
					"topic", r.Config().Topic,
					"messages_processed", messageCount,
				)
			}

			ch <- data
		}
	}
}
```

**File:** [internal/pkg/kafkaclient/manager.go](internal/pkg/kafkaclient/manager.go)

```go
// Line 38-52: Add Kafka writer
func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
	slog.Debug("Creating Kafka writer",
		"topic", topic,
		"brokers", kafkaUrl,
	)

	kafkaWriter := &kafka.Writer{
		Addr:         kafka.TCP(kafkaUrl),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}

	m.kafkaWritersMap.KafkaWritersLock.Lock()
	m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
	m.kafkaWritersMap.KafkaWritersLock.Unlock()

	slog.Info("Kafka writer created",
		"topic", topic,
	)
}

// Line 66-79: Add Kafka reader
func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
	slog.Debug("Creating Kafka reader",
		"topic", topic,
		"group_id", groupID,
		"brokers", kafkaUrl,
	)

	brokers := strings.Split(kafkaUrl, ",")
	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	})

	m.kafkaReadersMap.KafkaReadersLock.Lock()
	m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
	m.kafkaReadersMap.KafkaReadersLock.Unlock()

	slog.Info("Kafka reader created",
		"topic", topic,
		"group_id", groupID,
	)
}

// Line 54-64: Clean writers
func (m *KafkaManager) CleanKafkaWriters() {
	// Take the lock before reading the map, including for len().
	m.kafkaWritersMap.KafkaWritersLock.Lock()
	defer m.kafkaWritersMap.KafkaWritersLock.Unlock()

	slog.Info("Shutting down Kafka writers",
		"count", len(m.kafkaWritersMap.KafkaWriters),
	)

	for topic, w := range m.kafkaWritersMap.KafkaWriters {
		if err := w.Close(); err != nil {
			slog.Error("Error closing Kafka writer",
				"error", err,
				"topic", topic,
			)
		} else {
			slog.Info("Kafka writer closed",
				"topic", topic,
			)
		}
	}

	slog.Info("Kafka writers shutdown complete")
}

// Line 81-90: Clean readers
func (m *KafkaManager) CleanKafkaReaders() {
	// Take the lock before reading the map, including for len().
	m.kafkaReadersMap.KafkaReadersLock.Lock()
	defer m.kafkaReadersMap.KafkaReadersLock.Unlock()

	slog.Info("Shutting down Kafka readers",
		"count", len(m.kafkaReadersMap.KafkaReaders),
	)

	for topic, r := range m.kafkaReadersMap.KafkaReaders {
		if err := r.Close(); err != nil {
			slog.Error("Error closing Kafka reader",
				"error", err,
				"topic", topic,
			)
		} else {
			slog.Info("Kafka reader closed",
				"topic", topic,
			)
		}
	}

	slog.Info("Kafka readers shutdown complete")
}
```

### API Client Package

**File:** [internal/pkg/apiclient/updatedb.go](internal/pkg/apiclient/updatedb.go)

```go
// Line 19-74: Update database
func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error {
	slog.Info("Initializing database from API",
		"api_url", cfg.APIURL,
	)

	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	client := &http.Client{Transport: tr}

	token, err := GetToken(ctx, cfg, client)
	if err != nil {
		slog.Error("Failed to get authentication token",
			"error", err,
		)
		return err
	}

	slog.Info("Successfully authenticated with API")

	// Sync trackers
	if trackers, err := GetTrackers(token, client); err == nil {
		slog.Info("Fetched trackers from API",
			"count", len(trackers),
		)
		syncTable(db, trackers)

		if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
			slog.Error("Failed to send clear lookup message",
				"error", err,
			)
		}

		for _, v := range trackers {
			apiUpdate := model.ApiUpdate{
				Method: "POST",
				ID:     v.ID,
				MAC:    v.MAC,
			}

			if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
				slog.Error("Failed to send tracker registration",
					"error", err,
					"tracker_id", v.ID,
				)
			} else {
				slog.Debug("Sent tracker registration",
					"tracker_id", v.ID,
					"mac", v.MAC,
				)
			}
		}
	} else {
		slog.Error("Failed to fetch trackers from API",
			"error", err,
		)
	}

	// Sync gateways
	if gateways, err := GetGateways(token, client); err == nil {
		slog.Info("Fetched gateways from API",
			"count", len(gateways),
		)
		syncTable(db, gateways)
	} else {
		slog.Error("Failed to fetch gateways from API",
			"error", err,
		)
	}

	// Sync zones
	if zones, err := GetZones(token, client); err == nil {
		slog.Info("Fetched zones from API",
			"count", len(zones),
		)
		syncTable(db, zones)
	} else {
		slog.Error("Failed to fetch zones from API",
			"error", err,
		)
	}

	// Sync tracker zones
	if trackerZones, err := GetTrackerZones(token, client); err == nil {
		slog.Info("Fetched tracker zones from API",
			"count", len(trackerZones),
		)
		syncTable(db, trackerZones)
	} else {
		slog.Error("Failed to fetch tracker zones from API",
			"error", err,
		)
	}

	// Initialize settings
	var settings model.Settings
	db.First(&settings)
	if settings.ID == 0 {
		slog.Info("Initializing default settings")
		db.Create(appState.GetSettings())
	}

	slog.Info("Database sync complete")
	return nil
}

// Line 76-91: Sync table
func syncTable[T any](db *gorm.DB, data []T) {
	if len(data) == 0 {
		slog.Debug("No data to sync",
			"type", fmt.Sprintf("%T", data),
		)
		return
	}

	slog.Debug("Syncing table",
		"count", len(data),
		"type", fmt.Sprintf("%T", data),
	)

	var ids []string
	for _, item := range data {
		v := reflect.ValueOf(item).FieldByName("ID").String()
		ids = append(ids, v)
	}

	db.Transaction(func(tx *gorm.DB) error {
		result := tx.Where("id NOT IN ?", ids).Delete(new(T))
		if result.Error != nil {
			slog.Error("Failed to delete stale records",
				"error", result.Error,
			)
		}
		// Capture the delete count now; result is reused for the upsert below.
		deleted := result.RowsAffected

		result = tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data)
		if result.Error != nil {
			slog.Error("Failed to upsert records",
				"error", result.Error,
			)
			return result.Error
		}

		slog.Info("Table sync complete",
			"upserted", len(data),
			"deleted", deleted,
		)
		return nil
	})
}
```

**File:** [internal/pkg/apiclient/auth.go](internal/pkg/apiclient/auth.go)

```go
// Line 17-45: Get token
func GetToken(ctx context.Context, cfg *config.Config, client *http.Client) (string, error) {
	slog.Debug("Requesting authentication token",
		"url", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token",
	)

	formData := url.Values{}
	formData.Set("grant_type", "password")
	formData.Set("client_id", "Fastapi")
	formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC")
	formData.Set("username", "core")
	formData.Set("password", "C0r3_us3r_Cr3d3nt14ls")
	formData.Set("audience", "Fastapi")

	req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode()))
	if err != nil {
		slog.Error("Failed to create auth request",
			"error", err,
		)
		return "", err
	}

	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	req = req.WithContext(ctx)

	res, err := client.Do(req)
	if err != nil {
		slog.Error("Failed to send auth request",
			"error", err,
		)
		return "", err
	}
	// Close the body so the underlying connection can be reused.
	defer res.Body.Close()

	var j response
	if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
		slog.Error("Failed to decode auth response",
			"error", err,
			"status_code", res.StatusCode,
		)
		return "", err
	}

	slog.Info("Successfully obtained authentication token",
		"token_length", len(j.Token),
	)

	return j.Token, nil
}
```

### Service Package

**File:** [internal/pkg/service/beacon_service.go](internal/pkg/service/beacon_service.go)

```go
// Line 17-67: Location to beacon service
func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
	if msg.ID == "" {
		slog.Warn("Received location message with empty ID")
		return
	}

	slog.Debug("Processing location message",
		"beacon_id", msg.ID,
		"location", msg.Location,
		"distance", msg.Distance,
	)

	var zones []model.TrackerZones
	if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil {
		slog.Error("Failed to fetch tracker zones",
			"error", err,
			"beacon_id", msg.ID,
		)
		return
	}

	slog.Debug("Fetched tracker zones",
		"beacon_id", msg.ID,
		"zones_count", len(zones),
	)

	var allowedZones []string
	for _, z := range zones {
		allowedZones = append(allowedZones, z.ZoneList...)
	}

	var gw model.Gateway
	mac := formatMac(msg.Location)
	if err := db.Select("id").Where("mac = ?", mac).First(&gw).Error; err != nil {
		slog.Warn("Gateway not found for location",
			"mac", mac,
			"beacon_id", msg.ID,
			"error", err,
		)
		return
	}

	slog.Debug("Found gateway for location",
		"gateway_id", gw.ID,
		"mac", mac,
	)

	if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) {
		alert := model.Alert{
			ID:    msg.ID,
			Type:  "Restricted zone",
			Value: gw.ID,
		}

		slog.Warn("Beacon in restricted zone",
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
			"allowed_zones", len(allowedZones),
		)

		eMsg, err := json.Marshal(alert)
		if err != nil {
			slog.Error("Failed to marshal alert",
				"error", err,
				"beacon_id", msg.ID,
			)
		} else {
			// Named kMsg so it does not shadow msg; kafka.Message has no ID field.
			kMsg := kafka.Message{Value: eMsg}
			if err := writer.WriteMessages(ctx, kMsg); err != nil {
				slog.Error("Failed to send alert to Kafka",
					"error", err,
					"beacon_id", msg.ID,
					"alert_type", "Restricted zone",
				)
			} else {
				slog.Info("Sent restricted zone alert",
					"beacon_id", msg.ID,
					"gateway_id", gw.ID,
				)
			}
		}
	}

	// Save track
	if err := db.Create(&model.Tracks{
		UUID:       msg.ID,
		Timestamp:  time.Now(),
		Gateway:    gw.ID,
		GatewayMac: gw.MAC,
		Tracker:    msg.ID,
	}).Error; err != nil {
		slog.Error("Failed to save track",
			"error", err,
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
		)
		return
	}

	slog.Debug("Saved track record",
		"beacon_id", msg.ID,
		"gateway_id", gw.ID,
	)

	// Update tracker
	if err := db.Updates(&model.Tracker{
		ID:       msg.ID,
		Location: gw.ID,
		Distance: msg.Distance,
	}).Error; err != nil {
		slog.Error("Failed to update tracker location",
			"error", err,
			"beacon_id", msg.ID,
		)
	} else {
		slog.Info("Updated tracker location",
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
			"distance", msg.Distance,
		)
	}
}
```

---

## Configuration Updates

Add these fields to your configuration structure:

```go
// internal/pkg/config/config.go
type Config struct {
	// ... existing fields ...

	// Logging configuration
	LogLevel     string `json:"log_level" env:"LOG_LEVEL" default:"info"`
	LogFormat    string `json:"log_format" env:"LOG_FORMAT" default:"json"`
	LogAddSource bool   `json:"log_add_source" env:"LOG_ADD_SOURCE" default:"false"`
}
```

---

## Log Aggregation and Monitoring

### Recommended Log Queries

The queries below assume the JSON log format. Note that `slog` writes level names in upper case (`"ERROR"`, not `"error"`).

**All errors across services:**

```bash
jq 'select(.level == "ERROR")' *.log
```

**Beacon processing by ID:**

```bash
jq 'select(.beacon_id == "beacon-123")' bridge.log decoder.log location.log
```

**Kafka message flow:**

```bash
jq 'select(.topic != null)' bridge.log decoder.log location.log server.log
```

**Performance analysis:**

```bash
jq 'select(.duration != null)' *.log
```

### Metrics to Track

1. **Message throughput** - Messages processed per second per service
2. **Error rates** - Errors per 1000 messages by type
3. **Processing latency** - Time from ingestion to processing
4. **Kafka lag** - Consumer lag per topic/group
5. **Connection failures** - MQTT/Kafka reconnection frequency

---

## Migration Checklist

- [ ] Update `internal/pkg/logger/logger.go` with new configuration options
- [ ] Add logging configuration fields to `internal/pkg/config/config.go`
- [ ] Replace `fmt.Println`, `fmt.Printf`, `log.Printf` with `slog` calls in:
  - [ ] `cmd/bridge/main.go`
  - [ ] `cmd/decoder/main.go`
  - [ ] `cmd/location/main.go`
  - [ ] `cmd/server/main.go`
  - [ ] `internal/pkg/kafkaclient/consumer.go`
  - [ ] `internal/pkg/kafkaclient/manager.go`
  - [ ] `internal/pkg/apiclient/updatedb.go`
  - [ ] `internal/pkg/apiclient/auth.go`
  - [ ] `internal/pkg/service/beacon_service.go`
- [ ] Test all services with new logging
- [ ] Verify log rotation and disk space management
- [ ] Set up log aggregation/monitoring dashboard

---

## Example Log Output

### JSON Format (Production)

```json
{"time":"2026-01-20T10:15:30.123456Z","level":"INFO","msg":"Processing beacon","service":"bridge","beacon_id":"beacon-001","rssi":-65,"hostname":"gateway-1"}
{"time":"2026-01-20T10:15:30.234567Z","level":"ERROR","msg":"Failed to write to Kafka","service":"bridge","error":"connection timeout","beacon_id":"beacon-001","topic":"rawbeacons"}
```

### Text Format (Development)

```
time=2026-01-20T10:15:30.123Z level=INFO msg="Processing beacon" service=bridge beacon_id=beacon-001 rssi=-65 hostname=gateway-1
time=2026-01-20T10:15:30.234Z level=ERROR msg="Failed to write to Kafka" service=bridge error="connection timeout" beacon_id=beacon-001 topic=rawbeacons
```

---

## Notes

1. **Security**: Never log sensitive information (passwords, tokens, connection strings, personal data)
2. **Performance**: Log high-frequency events at `slog.Debug` so they are filtered out when production runs at the `info` level
3. **Disk space**: Implement log rotation to prevent disk exhaustion
4. **Testing**: Add unit tests to verify logging behavior where critical
5. **Context propagation**: Consider adding request/context IDs for tracing