Logging Standards and Guidelines

Overview

This document defines comprehensive logging standards for the AFASystems Presence services using Go’s structured logging (log/slog). The goal is to provide consistent, searchable, and actionable logs across all services.

Logger Configuration

Enhanced Logger Package

File: internal/pkg/logger/logger.go

Update the logger to support both JSON and text formats with configurable log levels:

package logger

import (
    "io"
    "log"
    "log/slog"
    "os"
    "path/filepath"
)

type LogConfig struct {
    Filename   string
    Level      string
    Format     string // "json" or "text"
    AddSource  bool
}

func CreateLogger(config LogConfig) *slog.Logger {
    // Ensure log directory exists
    dir := filepath.Dir(config.Filename)
    if err := os.MkdirAll(dir, 0755); err != nil {
        log.Fatalf("Failed to create log directory: %v\n", err)
    }

    f, err := os.OpenFile(config.Filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
    if err != nil {
        log.Fatalf("Failed to open log file: %v\n", err)
    }

    // Multi-writer: stderr + file
    w := io.MultiWriter(os.Stderr, f)

    // Parse log level
    var level slog.Level
    switch config.Level {
    case "debug":
        level = slog.LevelDebug
    case "info":
        level = slog.LevelInfo
    case "warn":
        level = slog.LevelWarn
    case "error":
        level = slog.LevelError
    default:
        level = slog.LevelInfo
    }

    // Create handler options
    opts := &slog.HandlerOptions{
        Level:     level,
        AddSource: config.AddSource,
    }

    // Choose handler based on format
    var handler slog.Handler
    if config.Format == "json" {
        handler = slog.NewJSONHandler(w, opts)
    } else {
        handler = slog.NewTextHandler(w, opts)
    }

    logger := slog.New(handler)
    return logger
}

// Convenience function for backward compatibility
func CreateLoggerSimple(fname string) *slog.Logger {
    return CreateLogger(LogConfig{
        Filename:  fname,
        Level:     "info",
        Format:    "json",
        AddSource: false,
    })
}

Usage in Services

// Replace: slog.SetDefault(logger.CreateLogger("bridge.log"))
// With:
slog.SetDefault(logger.CreateLogger(logger.LogConfig{
    Filename:  "bridge.log",
    Level:     cfg.LogLevel,      // Add to config
    Format:    cfg.LogFormat,     // Add to config
    AddSource: cfg.LogAddSource,  // Add to config
}))

Structured Logging Best Practices

Log Levels

| Level | Usage | Examples |
|-------|-------|----------|
| DEBUG | Detailed debugging information | Message payloads, internal state changes |
| INFO | General informational messages | Service started, connection established, normal operations |
| WARN | Unexpected but recoverable situations | Retry attempts, fallback to defaults, degraded performance |
| ERROR | Error events that might still allow the application to continue | Failed operations, API errors, parsing failures |
| FATAL | Critical errors requiring immediate shutdown | Cannot connect to essential services, invalid configuration |

Note: log/slog defines no FATAL level. Log at ERROR and then exit the process, as CreateLogger does with log.Fatalf for unrecoverable setup failures.
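
Since slog has no Fatal method, a small helper keeps fatal paths structured; a minimal sketch (the Fatal helper is illustrative, not part of the existing logger package):

// Fatal logs the message at ERROR level with structured attributes,
// then terminates the process with a non-zero exit code.
func Fatal(msg string, args ...any) {
    slog.Error(msg, args...)
    os.Exit(1)
}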

Structured Fields

Always use structured attributes instead of formatting strings:

// ❌ BAD
slog.Info(fmt.Sprintf("Processing beacon %s with RSSI %d", id, rssi))

// ✅ GOOD
slog.Info("Processing beacon",
    "id", id,
    "rssi", rssi,
    "hostname", hostname,
)
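
For related attributes, slog.Group keeps keys namespaced instead of flattening everything to the top level; a short sketch (the beacon group name is illustrative):

// With the JSON handler this renders as {"beacon":{"id":...,"rssi":...}}
// rather than separate top-level keys.
slog.Info("Processing beacon",
    slog.Group("beacon",
        "id", id,
        "rssi", rssi,
    ),
    "hostname", hostname,
)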

Standard Field Names

| Field Name | Type | Description |
|------------|------|-------------|
| service | string | Service name (bridge, decoder, location, server) |
| component | string | Component within service (mqtt, kafka, http) |
| topic | string | Kafka topic or MQTT topic |
| beacon_id | string | Beacon/tracker identifier |
| gateway_id | string | Gateway identifier |
| mac | string | MAC address |
| rssi | int64 | Received Signal Strength Indicator |
| duration | string or int64 | Duration of operation (time.Duration string, or milliseconds) |
| error | error | Error object (for error-level logs) |
| count | int | Count of items processed |
| status | string | Status of operation (success, failed, retry) |
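
The service and component fields are best attached once per logger rather than repeated on every call; a minimal sketch using Logger.With:

// Every record written through mqttLog automatically carries
// service=bridge and component=mqtt.
mqttLog := slog.Default().With(
    "service", "bridge",
    "component", "mqtt",
)
mqttLog.Info("Connected to broker", "broker", cfg.MQTTHost)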

Service-Specific Logging Guidelines

1. Bridge Service

File: cmd/bridge/main.go

Initialization

// Line 100-116: Replace basic info logs
slog.Info("Initializing bridge service",
    "kafka_url", cfg.KafkaURL,
    "mqtt_host", cfg.MQTTHost,
    "reader_topics", []string{"apibeacons", "alert", "mqtt"},
    "writer_topics", []string{"rawbeacons"},
)

slog.Info("Kafka manager initialized",
    "readers_count", len(readerTopics),
    "writers_count", len(writerTopics),
)

// Line 127-144: MQTT connection
slog.Info("Connecting to MQTT broker",
    "broker", fmt.Sprintf("%s:%d", cfg.MQTTHost, 1883),
    "client_id", "go_mqtt_client",
)

if token := client.Connect(); token.Wait() && token.Error() != nil {
    slog.Error("Failed to connect to MQTT broker",
        "error", token.Error(),
        "broker", cfg.MQTTHost,
    )
    panic(token.Error())
}

slog.Info("Successfully connected to MQTT broker",
    "broker", cfg.MQTTHost,
)

// Line 146-202: MQTT subscription
slog.Info("Subscribed to MQTT topic",
    "topic", topic,
)

MQTT Message Handler

// Line 26-83: Replace with structured logging
func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
    // Guard against topics without a "/" separator to avoid an
    // index-out-of-range panic when extracting the hostname.
    parts := strings.Split(topic, "/")
    if len(parts) < 2 {
        slog.Warn("Received MQTT message on unexpected topic",
            "topic", topic,
        )
        return
    }
    hostname := parts[1]

    slog.Debug("Received MQTT message",
        "topic", topic,
        "hostname", hostname,
        "message_length", len(message),
    )

    msgStr := string(message)

    if strings.HasPrefix(msgStr, "[") {
        var readings []model.RawReading
        if err := json.Unmarshal(message, &readings); err != nil {
            slog.Error("Failed to parse JSON message",
                "error", err,
                "topic", topic,
                "message", msgStr,
            )
            return
        }

        slog.Debug("Parsed JSON readings",
            "count", len(readings),
            "topic", topic,
        )

        processed := 0
        skipped := 0
        for _, reading := range readings {
            if reading.Type == "Gateway" {
                skipped++
                continue
            }

            val, ok := appState.BeaconExists(reading.MAC)
            if !ok {
                slog.Debug("Skipping unregistered beacon",
                    "mac", reading.MAC,
                    "hostname", hostname,
                )
                skipped++
                continue
            }

            adv := model.BeaconAdvertisement{
                ID:       val,
                Hostname: hostname,
                MAC:      reading.MAC,
                RSSI:     int64(reading.RSSI),
                Data:     reading.RawData,
            }

            encodedMsg, err := json.Marshal(adv)
            if err != nil {
                slog.Error("Failed to marshal beacon advertisement",
                    "error", err,
                    "beacon_id", val,
                )
                continue
            }

            msg := kafka.Message{Value: encodedMsg}

            if err := writer.WriteMessages(context.Background(), msg); err != nil {
                slog.Error("Failed to write to Kafka",
                    "error", err,
                    "beacon_id", val,
                    "topic", "rawbeacons",
                )
                time.Sleep(1 * time.Second)
                break
            }
            processed++
        }

        slog.Info("Processed MQTT readings",
            "processed", processed,
            "skipped", skipped,
            "hostname", hostname,
        )
    } else {
        slog.Warn("Received non-JSON message format",
            "message", msgStr,
            "topic", topic,
        )
    }
}

Event Loop

// Line 148-184: Event loop logging
case msg := <-chApi:
    slog.Debug("Received API update from Kafka",
        "method", msg.Method,
        "id", msg.ID,
        "mac", msg.MAC,
    )

    switch msg.Method {
    case "POST":
        id := msg.ID
        appState.AddBeaconToLookup(msg.MAC, id)
        slog.Info("Beacon added to lookup",
            "beacon_id", id,
            "mac", msg.MAC,
        )
    case "DELETE":
        id := msg.MAC
        if id == "all" {
            appState.CleanLookup()
            slog.Info("Cleared all beacons from lookup")
            continue
        }
        appState.RemoveBeaconFromLookup(id)
        slog.Info("Beacon removed from lookup",
            "beacon_id", id,
        )
    }

case msg := <-chAlert:
    slog.Debug("Received alert from Kafka",
        "alert_id", msg.ID,
        "type", msg.Type,
    )
    // ... processing ...

case msg := <-chMqtt:
    slog.Debug("Received tracker list request",
        "trackers_count", len(msg),
    )
    // ... processing ...

Shutdown

// Line 186-195: Graceful shutdown
slog.Info("Shutting down bridge service",
    "reason", "context_done",
)

wg.Wait()

slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

client.Disconnect(250)
slog.Info("Disconnected from MQTT broker")

slog.Info("Bridge service shutdown complete")

2. Decoder Service

File: cmd/decoder/main.go

Initialization

// Line 42-48: Service initialization
slog.Info("Initializing decoder service",
    "kafka_url", cfg.KafkaURL,
    "reader_topics", []string{"rawbeacons", "parser"},
    "writer_topics", []string{"alertbeacons"},
)

slog.Info("Decoder service initialized",
    "parsers_loaded", len(parserRegistry.ParserList),
)

Beacon Processing

// Line 86-93: Process incoming beacon
func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
    slog.Debug("Processing beacon advertisement",
        "beacon_id", adv.ID,
        "hostname", adv.Hostname,
        "rssi", adv.RSSI,
        "data_length", len(adv.Data),
    )

    if err := decodeBeacon(adv, appState, writer, parserRegistry); err != nil {
        slog.Error("Failed to decode beacon",
            "error", err,
            "beacon_id", adv.ID,
            "hostname", adv.Hostname,
        )
    }
}

// Line 95-136: Decode beacon
func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
    beacon := strings.TrimSpace(adv.Data)
    id := adv.ID

    if beacon == "" {
        slog.Debug("Skipping beacon with empty data",
            "beacon_id", id,
        )
        return nil
    }

    b, err := hex.DecodeString(beacon)
    if err != nil {
        slog.Warn("Failed to decode hex string",
            "error", err,
            "beacon_id", id,
            "data", beacon,
        )
        return err
    }

    slog.Debug("Decoded beacon data",
        "beacon_id", id,
        "bytes_length", len(b),
    )

    // ... parsing logic ...

    if event.ID == "" {
        slog.Debug("Skipping event with empty ID",
            "beacon_id", id,
        )
        return nil
    }

    prevEvent, ok := appState.GetBeaconEvent(id)
    appState.UpdateBeaconEvent(id, event)

    if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
        slog.Debug("Skipping duplicate event",
            "beacon_id", id,
            "event_type", event.Type,
        )
        return nil
    }

    slog.Info("Decoded beacon event",
        "beacon_id", id,
        "event_type", event.Type,
        "event", event.Event,
        "battery", event.Battery,
        "temperature", event.Temperature,
    )

    eMsg, err := event.ToJSON()
    if err != nil {
        slog.Error("Failed to marshal event to JSON",
            "error", err,
            "beacon_id", id,
        )
        return err
    }

    if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
        slog.Error("Failed to write event to Kafka",
            "error", err,
            "beacon_id", id,
            "topic", "alertbeacons",
        )
        return err
    }

    slog.Debug("Successfully sent event to Kafka",
        "beacon_id", id,
        "topic", "alertbeacons",
    )

    return nil
}

Parser Management

// Line 64-74: Parser configuration
case msg := <-chParser:
    slog.Info("Received parser configuration update",
        "action", msg.ID,
        "parser_name", msg.Name,
    )

    switch msg.ID {
    case "add":
        config := msg.Config
        parserRegistry.Register(config.Name, config)
        slog.Info("Registered new parser",
            "parser_name", config.Name,
            "type", config.Type,
        )
    case "delete":
        parserRegistry.Unregister(msg.Name)
        slog.Info("Unregistered parser",
            "parser_name", msg.Name,
        )
    case "update":
        config := msg.Config
        parserRegistry.Register(config.Name, config)
        slog.Info("Updated parser configuration",
            "parser_name", config.Name,
            "type", config.Type,
        )
    }

3. Location Service

File: cmd/location/main.go

Initialization

// Line 37-47: Service initialization
slog.Info("Initializing location service",
    "kafka_url", cfg.KafkaURL,
    "reader_topics", []string{"rawbeacons", "settings"},
    "writer_topics", []string{"locevents"},
    "algorithm", "filter",
)

Location Algorithm

// Line 60-68: Algorithm execution
case <-locTicker.C:
    settings := appState.GetSettings()
    slog.Debug("Running location algorithm",
        "algorithm", settings.CurrentAlgorithm,
        "beacon_count", len(appState.GetAllBeacons()),
    )

    switch settings.CurrentAlgorithm {
    case "filter":
        getLikelyLocations(appState, kafkaManager.GetWriter("locevents"))
    case "ai":
        slog.Warn("AI algorithm not yet implemented")
    }

Location Processing

// Line 85-157: Location calculation
func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
    beacons := appState.GetAllBeacons()
    settings := appState.GetSettingsValue()

    slog.Debug("Calculating likely locations",
        "beacon_count", len(beacons),
        "algorithm", "filter",
    )

    processed := 0
    skipped := 0
    updated := 0

    for _, beacon := range beacons {
        mSize := len(beacon.BeaconMetrics)
        if mSize == 0 {
            skipped++
            continue
        }

        // Check if beacon is too old
        if (time.Now().Unix() - beacon.BeaconMetrics[mSize-1].Timestamp) > settings.LastSeenThreshold {
            slog.Debug("Beacon data too old",
                "beacon_id", beacon.ID,
                "last_seen", beacon.BeaconMetrics[mSize-1].Timestamp,
                "threshold", settings.LastSeenThreshold,
            )
            skipped++
            continue
        }

        // ... location calculation logic ...

        locationChanged := bestLocName != beacon.PreviousLocation

        if locationChanged {
            slog.Info("Beacon location changed",
                "beacon_id", beacon.ID,
                "previous_location", beacon.PreviousLocation,
                "new_location", bestLocName,
                "confidence", beacon.LocationConfidence,
                "distance", r.Distance,
            )
            updated++
        }

        appState.UpdateBeacon(beacon.ID, beacon)
        processed++

        // ... send to Kafka ...
    }

    slog.Info("Location calculation complete",
        "processed", processed,
        "updated", updated,
        "skipped", skipped,
    )
}

Beacon Assignment

// Line 159-199: Assign beacon to list
func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
    id := adv.ID
    now := time.Now().Unix()
    settings := appState.GetSettingsValue()

    slog.Debug("Assigning beacon to tracking list",
        "beacon_id", id,
        "hostname", adv.Hostname,
        "rssi", adv.RSSI,
    )

    if settings.RSSIEnforceThreshold && (adv.RSSI < settings.RSSIMinThreshold) {
        slog.Debug("Beacon RSSI below threshold, skipping",
            "beacon_id", id,
            "rssi", adv.RSSI,
            "threshold", settings.RSSIMinThreshold,
        )
        return
    }

    beacon, ok := appState.GetBeacon(id)
    if !ok {
        slog.Debug("Creating new beacon entry",
            "beacon_id", id,
        )
        beacon = model.Beacon{ID: id}
    }

    // ... update beacon metrics ...

    slog.Debug("Updated beacon metrics",
        "beacon_id", id,
        "metrics_count", len(beacon.BeaconMetrics),
        "distance", metric.Distance,
        "location", metric.Location,
    )

    appState.UpdateBeacon(id, beacon)
}

Settings Update

case msg := <-chSettings:
    slog.Info("Received settings update",
        "settings", msg,
    )
    appState.UpdateSettings(msg)
    slog.Info("Settings updated successfully",
        "current_algorithm", appState.GetSettings().CurrentAlgorithm,
    )

4. Server Service

File: cmd/server/main.go

Initialization

// Line 46-49: Database connection
db, err := database.Connect(cfg)
if err != nil {
    // Do not log the connection string itself: it may embed credentials
    // (see the Security note at the end of this document).
    slog.Error("Failed to connect to database",
        "error", err,
    )
    log.Fatalf("Failed to open database connection: %v\n", err)
}

slog.Info("Successfully connected to database")

// Line 60-89: Configuration loading
configFile, err := os.Open("/app/cmd/server/config.json")
if err != nil {
    slog.Error("Failed to open configuration file",
        "error", err,
        "file_path", "/app/cmd/server/config.json",
    )
    panic(err)
}

slog.Info("Loading parser configurations",
    "file_path", "/app/cmd/server/config.json",
)

var configs []model.Config
if err := json.Unmarshal(b, &configs); err != nil {
    slog.Error("Failed to parse configuration file",
        "error", err,
    )
    panic(err)
}

slog.Info("Loaded parser configurations",
    "count", len(configs),
)

// Persist configs
for _, config := range configs {
    if err := db.Create(&config).Error; err != nil {
        slog.Warn("Failed to persist config to database",
            "error", err,
            "config_name", config.Name,
        )
    }
}

// Send to Kafka
for _, config := range configs {
    kp := model.KafkaParser{
        ID:     "add",
        Config: config,
    }

    if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil {
        slog.Error("Failed to send parser config to Kafka",
            "error", err,
            "parser_name", config.Name,
        )
    } else {
        slog.Info("Sent parser config to Kafka",
            "parser_name", config.Name,
            "type", config.Type,
        )
    }
}

// Line 87-89: API client initialization
if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil {
    slog.Error("Failed to initialize API client",
        "error", err,
    )
} else {
    slog.Info("API client initialized successfully")
}

// Line 136-147: HTTP server
slog.Info("Starting HTTP server",
    "address", cfg.HTTPAddr,
    "cors_origins", "*",
)

go func() {
    // ListenAndServe always returns a non-nil error; http.ErrServerClosed
    // is expected on graceful shutdown and not worth logging.
    if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        slog.Error("HTTP server error", "error", err)
    }
}()

slog.Info("HTTP server started",
    "address", cfg.HTTPAddr,
)

Event Loop

// Line 153-181: Event loop
case msg := <-chLoc:
    slog.Debug("Received location event",
        "beacon_id", msg.ID,
        "location", msg.Location,
        "distance", msg.Distance,
    )
    service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx)

case msg := <-chEvents:
    slog.Debug("Received beacon event",
        "beacon_id", msg.ID,
        "event_type", msg.Type,
        "event", msg.Event,
    )

    id := msg.ID
    if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
        slog.Warn("Received event for untracked beacon",
            "beacon_id", id,
            "error", err,
        )
        continue
    }

    if err := db.Updates(&model.Tracker{
        ID:          id,
        Battery:     msg.Battery,
        Temperature: msg.Temperature,
    }).Error; err != nil {
        slog.Error("Failed to update tracker metrics",
            "error", err,
            "beacon_id", id,
        )
    } else {
        slog.Info("Updated tracker metrics",
            "beacon_id", id,
            "battery", msg.Battery,
            "temperature", msg.Temperature,
        )
    }

case <-beaconTicker.C:
    slog.Debug("Refreshing tracker list for MQTT")
    var list []model.Tracker
    db.Find(&list)

    eMsg, err := json.Marshal(list)
    if err != nil {
        slog.Error("Failed to marshal tracker list",
            "error", err,
            "count", len(list),
        )
        continue
    }

    msg := kafka.Message{Value: eMsg}
    if err := kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg); err != nil {
        slog.Error("Failed to send tracker list to MQTT",
            "error", err,
            "count", len(list),
        )
    } else {
        slog.Debug("Sent tracker list to MQTT",
            "count", len(list),
        )
    }

Shutdown

// Line 184-199: Graceful shutdown
if err := server.Shutdown(context.Background()); err != nil {
    slog.Error("Failed to shutdown HTTP server",
        "error", err,
    )
}

slog.Info("HTTP server stopped")

wg.Wait()

slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()

slog.Info("All services shutdown complete")

Supporting Packages

Kafka Client Package

File: internal/pkg/kafkaclient/consumer.go

// Line 12-35: Consumer function
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    slog.Info("Starting Kafka consumer",
        "topic", r.Config().Topic,
        "group_id", r.Config().GroupID,
    )

    messageCount := 0
    errorCount := 0

    for {
        select {
        case <-ctx.Done():
            slog.Info("Kafka consumer shutting down",
                "topic", r.Config().Topic,
                "messages_processed", messageCount,
                "errors", errorCount,
            )
            return
        default:
            msg, err := r.ReadMessage(ctx)
            if err != nil {
                errorCount++
                slog.Error("Error reading Kafka message",
                    "error", err,
                    "topic", r.Config().Topic,
                    "error_count", errorCount,
                )
                continue
            }

            var data T
            if err := json.Unmarshal(msg.Value, &data); err != nil {
                errorCount++
                slog.Error("Error decoding Kafka message",
                    "error", err,
                    "topic", r.Config().Topic,
                    "message_length", len(msg.Value),
                )
                continue
            }

            messageCount++
            if messageCount%100 == 0 {
                slog.Debug("Kafka consumer progress",
                    "topic", r.Config().Topic,
                    "messages_processed", messageCount,
                )
            }

            ch <- data
        }
    }
}

File: internal/pkg/kafkaclient/manager.go

// Line 38-52: Add Kafka writer
func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
    slog.Debug("Creating Kafka writer",
        "topic", topic,
        "brokers", kafkaUrl,
    )

    kafkaWriter := &kafka.Writer{
        Addr:         kafka.TCP(kafkaUrl),
        Topic:        topic,
        Balancer:     &kafka.LeastBytes{},
        Async:        false,
        RequiredAcks: kafka.RequireAll,
        BatchSize:    100,
        BatchTimeout: 10 * time.Millisecond,
    }

    m.kafkaWritersMap.KafkaWritersLock.Lock()
    m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
    m.kafkaWritersMap.KafkaWritersLock.Unlock()

    slog.Info("Kafka writer created",
        "topic", topic,
    )
}

// Line 66-79: Add Kafka reader
func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
    slog.Debug("Creating Kafka reader",
        "topic", topic,
        "group_id", groupID,
        "brokers", kafkaUrl,
    )

    brokers := strings.Split(kafkaUrl, ",")
    kafkaReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        GroupID:  groupID,
        Topic:    topic,
        MinBytes: 1,
        MaxBytes: 10e6,
    })

    m.kafkaReadersMap.KafkaReadersLock.Lock()
    m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
    m.kafkaReadersMap.KafkaReadersLock.Unlock()

    slog.Info("Kafka reader created",
        "topic", topic,
        "group_id", groupID,
    )
}

// Line 54-64: Clean writers
func (m *KafkaManager) CleanKafkaWriters() {
    slog.Info("Shutting down Kafka writers",
        "count", len(m.kafkaWritersMap.KafkaWriters),
    )

    m.kafkaWritersMap.KafkaWritersLock.Lock()
    for topic, r := range m.kafkaWritersMap.KafkaWriters {
        if err := r.Close(); err != nil {
            slog.Error("Error closing Kafka writer",
                "error", err,
                "topic", topic,
            )
        } else {
            slog.Info("Kafka writer closed",
                "topic", topic,
            )
        }
    }
    m.kafkaWritersMap.KafkaWritersLock.Unlock()

    slog.Info("Kafka writers shutdown complete")
}

// Line 81-90: Clean readers
func (m *KafkaManager) CleanKafkaReaders() {
    slog.Info("Shutting down Kafka readers",
        "count", len(m.kafkaReadersMap.KafkaReaders),
    )

    m.kafkaReadersMap.KafkaReadersLock.Lock()
    for topic, r := range m.kafkaReadersMap.KafkaReaders {
        if err := r.Close(); err != nil {
            slog.Error("Error closing Kafka reader",
                "error", err,
                "topic", topic,
            )
        } else {
            slog.Info("Kafka reader closed",
                "topic", topic,
            )
        }
    }
    m.kafkaReadersMap.KafkaReadersLock.Unlock()

    slog.Info("Kafka readers shutdown complete")
}

API Client Package

File: internal/pkg/apiclient/updatedb.go

// Line 19-74: Update database
func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error {
    slog.Info("Initializing database from API",
        "api_url", cfg.APIURL,
    )

    tr := &http.Transport{
        // InsecureSkipVerify disables certificate verification; keep this
        // only for trusted internal endpoints (see the Security note).
        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
    }
    client := &http.Client{Transport: tr}

    token, err := GetToken(ctx, cfg, client)
    if err != nil {
        slog.Error("Failed to get authentication token",
            "error", err,
        )
        return err
    }

    slog.Info("Successfully authenticated with API")

    // Sync trackers
    if trackers, err := GetTrackers(token, client); err == nil {
        slog.Info("Fetched trackers from API",
            "count", len(trackers),
        )
        syncTable(db, trackers)

        if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
            slog.Error("Failed to send clear lookup message",
                "error", err,
            )
        }

        for _, v := range trackers {
            apiUpdate := model.ApiUpdate{
                Method: "POST",
                ID:     v.ID,
                MAC:    v.MAC,
            }

            if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
                slog.Error("Failed to send tracker registration",
                    "error", err,
                    "tracker_id", v.ID,
                )
            } else {
                slog.Debug("Sent tracker registration",
                    "tracker_id", v.ID,
                    "mac", v.MAC,
                )
            }
        }
    } else {
        slog.Error("Failed to fetch trackers from API",
            "error", err,
        )
    }

    // Sync gateways
    if gateways, err := GetGateways(token, client); err == nil {
        slog.Info("Fetched gateways from API",
            "count", len(gateways),
        )
        syncTable(db, gateways)
    } else {
        slog.Error("Failed to fetch gateways from API",
            "error", err,
        )
    }

    // Sync zones
    if zones, err := GetZones(token, client); err == nil {
        slog.Info("Fetched zones from API",
            "count", len(zones),
        )
        syncTable(db, zones)
    } else {
        slog.Error("Failed to fetch zones from API",
            "error", err,
        )
    }

    // Sync tracker zones
    if trackerZones, err := GetTrackerZones(token, client); err == nil {
        slog.Info("Fetched tracker zones from API",
            "count", len(trackerZones),
        )
        syncTable(db, trackerZones)
    } else {
        slog.Error("Failed to fetch tracker zones from API",
            "error", err,
        )
    }

    // Initialize settings
    var settings model.Settings
    db.First(&settings)
    if settings.ID == 0 {
        slog.Info("Initializing default settings")
        db.Create(appState.GetSettings())
    }

    slog.Info("Database sync complete")
    return nil
}

// Line 76-91: Sync table
func syncTable[T any](db *gorm.DB, data []T) {
    if len(data) == 0 {
        slog.Debug("No data to sync",
            "type", fmt.Sprintf("%T", data),
        )
        return
    }

    slog.Debug("Syncing table",
        "count", len(data),
        "type", fmt.Sprintf("%T", data),
    )

    var ids []string
    for _, item := range data {
        v := reflect.ValueOf(item).FieldByName("ID").String()
        ids = append(ids, v)
    }

    db.Transaction(func(tx *gorm.DB) error {
        // Keep the delete result separate so the summary log below reports
        // the number of stale rows actually removed (result is reused for
        // the upsert, so its RowsAffected would be the upsert count).
        deleteResult := tx.Where("id NOT IN ?", ids).Delete(new(T))
        if deleteResult.Error != nil {
            slog.Error("Failed to delete stale records",
                "error", deleteResult.Error,
            )
            return deleteResult.Error
        }

        result := tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data)
        if result.Error != nil {
            slog.Error("Failed to upsert records",
                "error", result.Error,
            )
            return result.Error
        }

        slog.Info("Table sync complete",
            "upserted", len(data),
            "deleted", deleteResult.RowsAffected,
        )
        return nil
    })
}

File: internal/pkg/apiclient/auth.go

// Line 17-45: Get token
func GetToken(ctx context.Context, cfg *config.Config, client *http.Client) (string, error) {
    slog.Debug("Requesting authentication token",
        "url", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token",
    )

    // These credentials are hard-coded; per the Security note, never
    // include them in log output, and prefer loading them from config.
    formData := url.Values{}
    formData.Set("grant_type", "password")
    formData.Set("client_id", "Fastapi")
    formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC")
    formData.Set("username", "core")
    formData.Set("password", "C0r3_us3r_Cr3d3nt14ls")
    formData.Set("audience", "Fastapi")

    req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode()))
    if err != nil {
        slog.Error("Failed to create auth request",
            "error", err,
        )
        return "", err
    }
    req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

    req = req.WithContext(ctx)
    res, err := client.Do(req)
    if err != nil {
        slog.Error("Failed to send auth request",
            "error", err,
        )
        return "", err
    }

    var j response
    if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
        slog.Error("Failed to decode auth response",
            "error", err,
            "status_code", res.StatusCode,
        )
        return "", err
    }

    slog.Info("Successfully obtained authentication token",
        "token_length", len(j.Token),
    )

    return j.Token, nil
}

Service Package

File: internal/pkg/service/beacon_service.go

// Line 17-67: Location to beacon service
func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
    if msg.ID == "" {
        slog.Warn("Received location message with empty ID")
        return
    }

    slog.Debug("Processing location message",
        "beacon_id", msg.ID,
        "location", msg.Location,
        "distance", msg.Distance,
    )

    var zones []model.TrackerZones
    if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil {
        slog.Error("Failed to fetch tracker zones",
            "error", err,
            "beacon_id", msg.ID,
        )
        return
    }

    slog.Debug("Fetched tracker zones",
        "beacon_id", msg.ID,
        "zones_count", len(zones),
    )

    var allowedZones []string
    for _, z := range zones {
        allowedZones = append(allowedZones, z.ZoneList...)
    }

    var gw model.Gateway
    mac := formatMac(msg.Location)
    // Select mac as well as id: gw.MAC is used when saving the track below.
    if err := db.Select("id", "mac").Where("mac = ?", mac).First(&gw).Error; err != nil {
        slog.Warn("Gateway not found for location",
            "mac", mac,
            "beacon_id", msg.ID,
            "error", err,
        )
        return
    }

    slog.Debug("Found gateway for location",
        "gateway_id", gw.ID,
        "mac", mac,
    )

    if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) {
        alert := model.Alert{
            ID:    msg.ID,
            Type:  "Restricted zone",
            Value: gw.ID,
        }

        slog.Warn("Beacon in restricted zone",
            "beacon_id", msg.ID,
            "gateway_id", gw.ID,
            "allowed_zones", len(allowedZones),
        )

        eMsg, err := json.Marshal(alert)
        if err != nil {
            slog.Error("Failed to marshal alert",
                "error", err,
                "beacon_id", msg.ID,
            )
        } else {
            // Name the Kafka message kMsg so it does not shadow the incoming
            // msg (kafka.Message has no ID field, so msg.ID would not compile).
            kMsg := kafka.Message{Value: eMsg}
            if err := writer.WriteMessages(ctx, kMsg); err != nil {
                slog.Error("Failed to send alert to Kafka",
                    "error", err,
                    "beacon_id", msg.ID,
                    "alert_type", "Restricted zone",
                )
            } else {
                slog.Info("Sent restricted zone alert",
                    "beacon_id", msg.ID,
                    "gateway_id", gw.ID,
                )
            }
        }
    }

    // Save track
    if err := db.Create(&model.Tracks{
        UUID:       msg.ID,
        Timestamp:  time.Now(),
        Gateway:    gw.ID,
        GatewayMac: gw.MAC,
        Tracker:    msg.ID,
    }).Error; err != nil {
        slog.Error("Failed to save track",
            "error", err,
            "beacon_id", msg.ID,
            "gateway_id", gw.ID,
        )
        return
    }

    slog.Debug("Saved track record",
        "beacon_id", msg.ID,
        "gateway_id", gw.ID,
    )

    // Update tracker
    if err := db.Updates(&model.Tracker{
        ID:       msg.ID,
        Location: gw.ID,
        Distance: msg.Distance,
    }).Error; err != nil {
        slog.Error("Failed to update tracker location",
            "error", err,
            "beacon_id", msg.ID,
        )
    } else {
        slog.Info("Updated tracker location",
            "beacon_id", msg.ID,
            "gateway_id", gw.ID,
            "distance", msg.Distance,
        )
    }
}

Configuration Updates

Add these fields to your configuration structure:

// internal/pkg/config/config.go
type Config struct {
    // ... existing fields ...

    // Logging configuration
    LogLevel     string `json:"log_level" env:"LOG_LEVEL" default:"info"`
    LogFormat    string `json:"log_format" env:"LOG_FORMAT" default:"json"`
    LogAddSource bool   `json:"log_add_source" env:"LOG_ADD_SOURCE" default:"false"`
}
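
The env and default struct tags assume a config loader that honors them; if none is wired up, the fields can be filled by hand. A minimal sketch (getEnv is a hypothetical helper, not an existing function in the codebase):

// getEnv returns an environment variable's value, or fallback when unset.
func getEnv(key, fallback string) string {
    if v, ok := os.LookupEnv(key); ok {
        return v
    }
    return fallback
}

cfg.LogLevel = getEnv("LOG_LEVEL", "info")
cfg.LogFormat = getEnv("LOG_FORMAT", "json")
cfg.LogAddSource = getEnv("LOG_ADD_SOURCE", "false") == "true"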

Log Aggregation and Monitoring

All errors across services:

jq 'select(.level == "ERROR")' *.log

Beacon processing by ID:

jq 'select(.beacon_id == "beacon-123")' bridge.log decoder.log location.log

Kafka message flow:

jq 'select(.topic != null)' bridge.log decoder.log location.log server.log

Performance analysis:

jq 'select(.duration != null)' *.log

Metrics to Track

  1. Message throughput - Messages processed per second per service
  2. Error rates - Errors per 1000 messages by type
  3. Processing latency - Time from ingestion to processing (requires emitting a duration field; see the sketch after this list)
  4. Kafka lag - Consumer lag per topic/group
  5. Connection failures - MQTT/Kafka reconnection frequency
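
None of the examples above emit a duration field yet; a minimal sketch of timing an operation in the decoder so the Performance analysis query has data to match:

// Time the call and log it under the standard duration field so that
// jq 'select(.duration != null)' can find it.
start := time.Now()
processIncoming(adv, appState, writer, parserRegistry)
slog.Debug("Processed beacon advertisement",
    "beacon_id", adv.ID,
    "duration", time.Since(start).String(),
)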

Migration Checklist

  1. Update internal/pkg/logger/logger.go to the CreateLogger(LogConfig) version above
  2. Add LogLevel, LogFormat, and LogAddSource to each service's configuration
  3. Replace fmt.Sprintf-style log messages with structured attributes
  4. Adopt the standard field names across all four services
  5. Verify output in both JSON and text formats
  6. Set up log rotation and the aggregation queries above

Example Log Output

JSON Format (Production)

{"time":"2026-01-20T10:15:30.123456Z","level":"INFO","msg":"Processing beacon","service":"bridge","beacon_id":"beacon-001","rssi":-65,"hostname":"gateway-1"}
{"time":"2026-01-20T10:15:30.234567Z","level":"ERROR","msg":"Failed to write to Kafka","service":"bridge","error":"connection timeout","beacon_id":"beacon-001","topic":"rawbeacons"}

Text Format (Development)

time=2026-01-20T10:15:30.123Z level=INFO msg="Processing beacon" service=bridge beacon_id=beacon-001 rssi=-65 hostname=gateway-1
time=2026-01-20T10:15:30.234Z level=ERROR msg="Failed to write to Kafka" service=bridge error="connection timeout" beacon_id=beacon-001 topic=rawbeacons

Notes

  1. Security: Be careful not to log sensitive information (passwords, tokens, personal data)
  2. Performance: Use slog.Debug for high-frequency logs in production
  3. Disk space: Implement log rotation to prevent disk exhaustion (see the sketch after this list)
  4. Testing: Add unit tests to verify logging behavior where critical
  5. Context propagation: Consider adding request/context IDs for tracing
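
For rotation, one option is to replace the raw file handle in CreateLogger with a rotating writer; a minimal sketch assuming the gopkg.in/natefinch/lumberjack.v2 package:

// lumberjack.Logger is an io.Writer that rotates the file it writes to,
// so it can stand in for the *os.File inside CreateLogger.
w := io.MultiWriter(os.Stderr, &lumberjack.Logger{
    Filename:   config.Filename,
    MaxSize:    50, // megabytes before rotation
    MaxBackups: 5,  // rotated files to keep
    MaxAge:     30, // days to retain rotated files
    Compress:   true,
})
handler := slog.NewJSONHandler(w, opts)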