This document defines comprehensive logging standards for the AFASystems Presence services using Go’s structured logging (log/slog). The goal is to provide consistent, searchable, and actionable logs across all services.
File: internal/pkg/logger/logger.go
Update the logger to support both JSON and text formats with configurable log levels:
package logger
import (
"io"
"log"
"log/slog"
"os"
"path/filepath"
)
// LogConfig describes how CreateLogger builds a logger.
type LogConfig struct {
	// Filename is the path of the log file. Missing parent directories are
	// created.
	Filename string
	// Level is the minimum level to emit. It is parsed case-insensitively
	// ("debug", "INFO", "Warn", "error") and may carry a numeric offset such
	// as "info+2". Anything unrecognized falls back to info.
	Level string
	// Format selects the handler: "json" for JSON, anything else for text.
	Format string // "json" or "text"
	// AddSource adds the source file and line of the log call to each record.
	AddSource bool
}

// CreateLogger returns a logger that writes every record to both stderr and
// the file named by config.Filename (opened in append mode).
//
// The process is terminated via log.Fatalf when the log directory cannot be
// created or the log file cannot be opened. The file stays open for the
// lifetime of the returned logger.
func CreateLogger(config LogConfig) *slog.Logger {
	// Ensure log directory exists
	if err := os.MkdirAll(filepath.Dir(config.Filename), 0755); err != nil {
		log.Fatalf("Failed to create log directory: %v\n", err)
	}

	f, err := os.OpenFile(config.Filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalf("Failed to open log file: %v\n", err)
	}

	// Multi-writer: stderr + file
	w := io.MultiWriter(os.Stderr, f)

	opts := &slog.HandlerOptions{
		Level:     parseLogLevel(config.Level),
		AddSource: config.AddSource,
	}

	// Choose handler based on format
	if config.Format == "json" {
		return slog.New(slog.NewJSONHandler(w, opts))
	}
	return slog.New(slog.NewTextHandler(w, opts))
}

// parseLogLevel converts a configured level name into a slog.Level.
//
// slog's own parser is used so that the name is matched regardless of case;
// names it does not recognize (including the empty string) yield info.
func parseLogLevel(name string) slog.Level {
	var level slog.Level
	if err := level.UnmarshalText([]byte(name)); err != nil {
		return slog.LevelInfo
	}
	return level
}
// Convenience function for backward compatibility
func CreateLoggerSimple(fname string) *slog.Logger {
return CreateLogger(LogConfig{
Filename: fname,
Level: "info",
Format: "json",
AddSource: false,
})
}
// Replace: slog.SetDefault(logger.CreateLogger("bridge.log"))
// With:
slog.SetDefault(logger.CreateLogger(logger.LogConfig{
Filename: "bridge.log",
Level: cfg.LogLevel, // Add to config
Format: cfg.LogFormat, // Add to config
AddSource: cfg.LogAddSource, // Add to config
}))
| Level | Usage | Examples |
|---|---|---|
| DEBUG | Detailed debugging information | Message payloads, internal state changes |
| INFO | General informational messages | Service started, connection established, normal operations |
| WARN | Unexpected but recoverable situations | Retry attempts, fallback to defaults, degraded performance |
| ERROR | Error events that might still allow the application to continue | Failed operations, API errors, parsing failures |
| FATAL | Critical errors requiring immediate shutdown (slog has no FATAL level: log at ERROR, then exit) | Cannot connect to essential services, invalid configuration |
Always use structured attributes instead of formatting strings:
// ❌ BAD
slog.Info(fmt.Sprintf("Processing beacon %s with RSSI %d", id, rssi))
// ✅ GOOD
slog.Info("Processing beacon",
"id", id,
"rssi", rssi,
"hostname", hostname,
)
| Field Name | Type | Description |
|---|---|---|
| service | string | Service name (bridge, decoder, location, server) |
| component | string | Component within service (mqtt, kafka, http) |
| topic | string | Kafka topic or MQTT topic |
| beacon_id | string | Beacon/tracker identifier |
| gateway_id | string | Gateway identifier |
| mac | string | MAC address |
| rssi | int64 | Received Signal Strength Indicator |
| duration | string/int | Duration of operation |
| error | error | Error object (for error-level logs) |
| count | int | Count of items processed |
| status | string | Status of operation (success, failed, retry) |
File: cmd/bridge/main.go
// Line 100-116: Replace basic info logs
slog.Info("Initializing bridge service",
"kafka_url", cfg.KafkaURL,
"mqtt_host", cfg.MQTTHost,
"reader_topics", []string{"apibeacons", "alert", "mqtt"},
"writer_topics", []string{"rawbeacons"},
)
slog.Info("Kafka manager initialized",
"readers_count", len(readerTopics),
"writers_count", len(writerTopics),
)
// Line 127-144: MQTT connection
slog.Info("Connecting to MQTT broker",
"broker", fmt.Sprintf("%s:%d", cfg.MQTTHost, 1883),
"client_id", "go_mqtt_client",
)
if token := client.Connect(); token.Wait() && token.Error() != nil {
slog.Error("Failed to connect to MQTT broker",
"error", token.Error(),
"broker", cfg.MQTTHost,
)
panic(token.Error())
}
slog.Info("Successfully connected to MQTT broker",
"broker", cfg.MQTTHost,
)
// Line 146-202: MQTT subscription
slog.Info("Subscribed to MQTT topic",
"topic", topic,
)
// Line 26-83: Replace with structured logging
// mqtthandler converts one MQTT gateway message into Kafka beacon
// advertisements.
//
// The gateway hostname is the second "/"-separated segment of topic. A payload
// starting with "[" is decoded as a JSON array of model.RawReading; readings of
// type "Gateway" and readings whose MAC is unknown to appState are skipped, and
// each remaining reading is written to writer as a JSON-encoded
// model.BeaconAdvertisement. Any other payload is logged and dropped.
func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
	// Guard the index below: a topic without a "/" would otherwise panic with
	// an out-of-range access.
	segments := strings.Split(topic, "/")
	if len(segments) < 2 {
		slog.Warn("Received MQTT message on topic without hostname segment",
			"topic", topic,
			"message_length", len(message),
		)
		return
	}
	hostname := segments[1]

	slog.Debug("Received MQTT message",
		"topic", topic,
		"hostname", hostname,
		"message_length", len(message),
	)

	msgStr := string(message)
	if !strings.HasPrefix(msgStr, "[") {
		slog.Warn("Received non-JSON message format",
			"message", msgStr,
			"topic", topic,
		)
		return
	}

	var readings []model.RawReading
	if err := json.Unmarshal(message, &readings); err != nil {
		slog.Error("Failed to parse JSON message",
			"error", err,
			"topic", topic,
			"message", msgStr,
		)
		return
	}

	slog.Debug("Parsed JSON readings",
		"count", len(readings),
		"topic", topic,
	)

	processed := 0
	skipped := 0
	for _, reading := range readings {
		if reading.Type == "Gateway" {
			skipped++
			continue
		}

		val, ok := appState.BeaconExists(reading.MAC)
		if !ok {
			slog.Debug("Skipping unregistered beacon",
				"mac", reading.MAC,
				"hostname", hostname,
			)
			skipped++
			continue
		}

		adv := model.BeaconAdvertisement{
			ID:       val,
			Hostname: hostname,
			MAC:      reading.MAC,
			RSSI:     int64(reading.RSSI),
			Data:     reading.RawData,
		}

		encodedMsg, err := json.Marshal(adv)
		if err != nil {
			slog.Error("Failed to marshal beacon advertisement",
				"error", err,
				"beacon_id", val,
			)
			continue
		}

		msg := kafka.Message{Value: encodedMsg}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			slog.Error("Failed to write to Kafka",
				"error", err,
				"beacon_id", val,
				"topic", "rawbeacons",
			)
			// Back off briefly and drop the rest of this batch; the remaining
			// readings are neither counted as processed nor as skipped.
			time.Sleep(1 * time.Second)
			break
		}
		processed++
	}

	slog.Info("Processed MQTT readings",
		"processed", processed,
		"skipped", skipped,
		"hostname", hostname,
	)
}
// Line 148-184: Event loop logging
case msg := <-chApi:
	slog.Debug("Received API update from Kafka",
		"method", msg.Method,
		"id", msg.ID,
		"mac", msg.MAC,
	)
	switch msg.Method {
	case "POST":
		id := msg.ID
		appState.AddBeaconToLookup(msg.MAC, id)
		slog.Info("Beacon added to lookup",
			"beacon_id", id,
			"mac", msg.MAC,
		)
	case "DELETE":
		// The value removed here is msg.MAC (or the sentinel "all"), so it is
		// logged under "mac" rather than "beacon_id".
		mac := msg.MAC
		if mac == "all" {
			appState.CleanLookup()
			slog.Info("Cleared all beacons from lookup")
			continue
		}
		appState.RemoveBeaconFromLookup(mac)
		slog.Info("Beacon removed from lookup",
			"mac", mac,
		)
	}
case msg := <-chAlert:
	slog.Debug("Received alert from Kafka",
		"alert_id", msg.ID,
		"type", msg.Type,
	)
	// ... processing ...
case msg := <-chMqtt:
	slog.Debug("Received tracker list request",
		"trackers_count", len(msg),
	)
	// ... processing ...
// Line 186-195: Graceful shutdown
slog.Info("Shutting down bridge service",
"reason", "context_done",
)
wg.Wait()
slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()
client.Disconnect(250)
slog.Info("Disconnected from MQTT broker")
slog.Info("Bridge service shutdown complete")
File: cmd/decoder/main.go
// Line 42-48: Service initialization
slog.Info("Initializing decoder service",
"kafka_url", cfg.KafkaURL,
"reader_topics", []string{"rawbeacons", "parser"},
"writer_topics", []string{"alertbeacons"},
)
slog.Info("Decoder service initialized",
"parsers_loaded", len(parserRegistry.ParserList),
)
// Line 86-93: Process incoming beacon
// processIncoming logs the arrival of a beacon advertisement at debug level,
// hands it to decodeBeacon, and reports a decoding failure at error level.
func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) {
	slog.Debug("Processing beacon advertisement",
		"beacon_id", adv.ID,
		"hostname", adv.Hostname,
		"rssi", adv.RSSI,
		"data_length", len(adv.Data),
	)

	decodeErr := decodeBeacon(adv, appState, writer, parserRegistry)
	if decodeErr == nil {
		return
	}

	slog.Error("Failed to decode beacon",
		"error", decodeErr,
		"beacon_id", adv.ID,
		"hostname", adv.Hostname,
	)
}
// Line 95-136: Decode beacon
// decodeBeacon hex-decodes the advertisement payload, runs the (elided)
// parsing logic, and publishes the resulting event to Kafka unless its hash
// equals the event previously stored for the same beacon.
//
// It returns nil when the advertisement is skipped (empty payload, empty event
// ID, or duplicate event) and the underlying error when hex decoding, JSON
// encoding, or the Kafka write fails.
//
// NOTE(review): every error is logged here and then logged again by
// processIncoming, which reports the returned error as "Failed to decode
// beacon" — consider logging at one layer only.
func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error {
beacon := strings.TrimSpace(adv.Data)
id := adv.ID
if beacon == "" {
slog.Debug("Skipping beacon with empty data",
"beacon_id", id,
)
return nil
}
b, err := hex.DecodeString(beacon)
if err != nil {
slog.Warn("Failed to decode hex string",
"error", err,
"beacon_id", id,
"data", beacon,
)
return err
}
slog.Debug("Decoded beacon data",
"beacon_id", id,
"bytes_length", len(b),
)
// ... parsing logic ...
if event.ID == "" {
slog.Debug("Skipping event with empty ID",
"beacon_id", id,
)
return nil
}
// The stored event is replaced before the duplicate check, so the state always
// holds the most recent event even when nothing is published.
prevEvent, ok := appState.GetBeaconEvent(id)
appState.UpdateBeaconEvent(id, event)
if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
slog.Debug("Skipping duplicate event",
"beacon_id", id,
"event_type", event.Type,
)
return nil
}
slog.Info("Decoded beacon event",
"beacon_id", id,
"event_type", event.Type,
"event", event.Event,
"battery", event.Battery,
"temperature", event.Temperature,
)
eMsg, err := event.ToJSON()
if err != nil {
slog.Error("Failed to marshal event to JSON",
"error", err,
"beacon_id", id,
)
return err
}
if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
slog.Error("Failed to write event to Kafka",
"error", err,
"beacon_id", id,
"topic", "alertbeacons",
)
return err
}
slog.Debug("Successfully sent event to Kafka",
"beacon_id", id,
"topic", "alertbeacons",
)
return nil
}
// Line 64-74: Parser configuration
case msg := <-chParser:
slog.Info("Received parser configuration update",
"action", msg.ID,
"parser_name", msg.Name,
)
switch msg.ID {
case "add":
config := msg.Config
parserRegistry.Register(config.Name, config)
slog.Info("Registered new parser",
"parser_name", config.Name,
"type", config.Type,
)
case "delete":
parserRegistry.Unregister(msg.Name)
slog.Info("Unregistered parser",
"parser_name", msg.Name,
)
case "update":
config := msg.Config
parserRegistry.Register(config.Name, config)
slog.Info("Updated parser configuration",
"parser_name", config.Name,
"type", config.Type,
)
}
File: cmd/location/main.go
// Line 37-47: Service initialization
slog.Info("Initializing location service",
"kafka_url", cfg.KafkaURL,
"reader_topics", []string{"rawbeacons", "settings"},
"writer_topics", []string{"locevents"},
"algorithm", "filter",
)
// Line 60-68: Algorithm execution
case <-locTicker.C:
settings := appState.GetSettings()
slog.Debug("Running location algorithm",
"algorithm", settings.CurrentAlgorithm,
"beacon_count", len(appState.GetAllBeacons()),
)
switch settings.CurrentAlgorithm {
case "filter":
getLikelyLocations(appState, kafkaManager.GetWriter("locevents"))
case "ai":
slog.Warn("AI algorithm not yet implemented")
}
// Line 85-157: Location calculation
// getLikelyLocations walks every beacon held in appState, skips beacons that
// have no metrics or whose latest metric is older than
// settings.LastSeenThreshold, runs the (elided) location calculation on the
// rest, and stores the updated beacon back into appState.
//
// A summary with the processed, updated (location changed) and skipped counts
// is logged at info level once the loop finishes.
func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
beacons := appState.GetAllBeacons()
settings := appState.GetSettingsValue()
slog.Debug("Calculating likely locations",
"beacon_count", len(beacons),
"algorithm", "filter",
)
processed := 0
skipped := 0
updated := 0
for _, beacon := range beacons {
mSize := len(beacon.BeaconMetrics)
if mSize == 0 {
skipped++
continue
}
// Check if beacon is too old
// The age is the current Unix time in seconds minus the newest metric's
// Timestamp — presumably also Unix seconds; confirm against the writer of
// BeaconMetrics.
if (int64(time.Now().Unix()) - beacon.BeaconMetrics[mSize-1].Timestamp) > settings.LastSeenThreshold {
slog.Debug("Beacon data too old",
"beacon_id", beacon.ID,
"last_seen", beacon.BeaconMetrics[mSize-1].Timestamp,
"threshold", settings.LastSeenThreshold,
)
skipped++
continue
}
// ... location calculation logic ...
locationChanged := bestLocName != beacon.PreviousLocation
if locationChanged {
slog.Info("Beacon location changed",
"beacon_id", beacon.ID,
"previous_location", beacon.PreviousLocation,
"new_location", bestLocName,
"confidence", beacon.LocationConfidence,
"distance", r.Distance,
)
updated++
}
appState.UpdateBeacon(beacon.ID, beacon)
processed++
// ... send to Kafka ...
}
slog.Info("Location calculation complete",
"processed", processed,
"updated", updated,
"skipped", skipped,
)
}
// Line 159-199: Assign beacon to list
// assignBeaconToList records an advertisement against its beacon in appState.
//
// When settings.RSSIEnforceThreshold is set, advertisements whose RSSI is
// below settings.RSSIMinThreshold are dropped. Otherwise the beacon is looked
// up (a new model.Beacon is created when it is not yet known), its metrics are
// updated by the elided logic, and the result is stored back into appState.
func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
id := adv.ID
// NOTE(review): now is not referenced in the visible code; presumably the
// elided metrics update uses it.
now := time.Now().Unix()
settings := appState.GetSettingsValue()
slog.Debug("Assigning beacon to tracking list",
"beacon_id", id,
"hostname", adv.Hostname,
"rssi", adv.RSSI,
)
if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
slog.Debug("Beacon RSSI below threshold, skipping",
"beacon_id", id,
"rssi", adv.RSSI,
"threshold", settings.RSSIMinThreshold,
)
return
}
beacon, ok := appState.GetBeacon(id)
if !ok {
slog.Debug("Creating new beacon entry",
"beacon_id", id,
)
beacon = model.Beacon{ID: id}
}
// ... update beacon metrics ...
slog.Debug("Updated beacon metrics",
"beacon_id", id,
"metrics_count", len(beacon.BeaconMetrics),
"distance", metric.Distance,
"location", metric.Location,
)
appState.UpdateBeacon(id, beacon)
}
case msg := <-chSettings:
slog.Info("Received settings update",
"settings", msg,
)
appState.UpdateSettings(msg)
slog.Info("Settings updated successfully",
"current_algorithm", appState.GetSettings().CurrentAlgorithm,
)
File: cmd/server/main.go
// Line 46-49: Database connection
db, err := database.Connect(cfg)
if err != nil {
	// The connection string is deliberately left out of the log record:
	// connection strings commonly embed the database password.
	slog.Error("Failed to connect to database",
		"error", err,
	)
	log.Fatalf("Failed to open database connection: %v\n", err)
}
slog.Info("Successfully connected to database")
// Line 60-89: Configuration loading
configFile, err := os.Open("/app/cmd/server/config.json")
if err != nil {
slog.Error("Failed to open configuration file",
"error", err,
"file_path", "/app/cmd/server/config.json",
)
panic(err)
}
slog.Info("Loading parser configurations",
"file_path", "/app/cmd/server/config.json",
)
var configs []model.Config
if err := json.Unmarshal(b, &configs); err != nil {
slog.Error("Failed to parse configuration file",
"error", err,
)
panic(err)
}
slog.Info("Loaded parser configurations",
"count", len(configs),
)
// Persist configs
for _, config := range configs {
if err := db.Create(&config).Error; err != nil {
slog.Warn("Failed to persist config to database",
"error", err,
"config_name", config.Name,
)
}
}
// Send to Kafka
for _, config := range configs {
kp := model.KafkaParser{
ID: "add",
Config: config,
}
if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil {
slog.Error("Failed to send parser config to Kafka",
"error", err,
"parser_name", config.Name,
)
} else {
slog.Info("Sent parser config to Kafka",
"parser_name", config.Name,
"type", config.Type,
)
}
}
// Line 87-89: API client initialization
if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil {
	slog.Error("Failed to initialize API client",
		"error", err,
	)
	fmt.Printf("Error in getting token: %v\n", err)
} else {
	// Only report success when UpdateDB actually succeeded.
	slog.Info("API client initialized successfully")
}
// Line 136-147: HTTP server
slog.Info("Starting HTTP server",
"address", cfg.HTTPAddr,
"cors_origins", "*",
)
go server.ListenAndServe()
slog.Info("HTTP server started",
"address", cfg.HTTPAddr,
)
// Line 153-181: Event loop
case msg := <-chLoc:
slog.Debug("Received location event",
"beacon_id", msg.ID,
"location", msg.Location,
"distance", msg.Distance,
)
service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx)
case msg := <-chEvents:
slog.Debug("Received beacon event",
"beacon_id", msg.ID,
"event_type", msg.Type,
"event", msg.Event,
)
id := msg.ID
if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
slog.Warn("Received event for untracked beacon",
"beacon_id", id,
"error", err,
)
continue
}
if err := db.Updates(&model.Tracker{
ID: id,
Battery: msg.Battery,
Temperature: msg.Temperature,
}).Error; err != nil {
slog.Error("Failed to update tracker metrics",
"error", err,
"beacon_id", id,
)
} else {
slog.Info("Updated tracker metrics",
"beacon_id", id,
"battery", msg.Battery,
"temperature", msg.Temperature,
)
}
case <-beaconTicker.C:
slog.Debug("Refreshing tracker list for MQTT")
var list []model.Tracker
db.Find(&list)
eMsg, err := json.Marshal(list)
if err != nil {
slog.Error("Failed to marshal tracker list",
"error", err,
"count", len(list),
)
continue
}
msg := kafka.Message{Value: eMsg}
if err := kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg); err != nil {
slog.Error("Failed to send tracker list to MQTT",
"error", err,
"count", len(list),
)
} else {
slog.Debug("Sent tracker list to MQTT",
"count", len(list),
)
}
// Line 184-199: Graceful shutdown
if err := server.Shutdown(context.Background()); err != nil {
	slog.Error("Failed to shutdown HTTP server",
		"error", err,
	)
} else {
	// Only claim the server stopped when Shutdown returned without error.
	slog.Info("HTTP server stopped")
}
wg.Wait()
slog.Info("All goroutines stopped, closing Kafka connections")
kafkaManager.CleanKafkaReaders()
kafkaManager.CleanKafkaWriters()
slog.Info("All services shutdown complete")
File: internal/pkg/kafkaclient/consumer.go
// Line 12-35: Consumer function
// Consume reads messages from r until ctx is cancelled, decodes each message
// value as JSON into a T, and sends the decoded value on ch.
//
// Messages that cannot be read or decoded are logged, counted and skipped.
// wg.Done is called when the function returns. Both the read and the send on
// ch are abandoned as soon as ctx is cancelled, so the goroutine cannot stay
// blocked on a channel nobody drains during shutdown.
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	topic := r.Config().Topic
	slog.Info("Starting Kafka consumer",
		"topic", topic,
		"group_id", r.Config().GroupID,
	)

	messageCount := 0
	errorCount := 0
	logShutdown := func() {
		slog.Info("Kafka consumer shutting down",
			"topic", topic,
			"messages_processed", messageCount,
			"errors", errorCount,
		)
	}

	for {
		if ctx.Err() != nil {
			logShutdown()
			return
		}

		msg, err := r.ReadMessage(ctx)
		if err != nil {
			// A read interrupted by cancellation is a normal shutdown, not a
			// consumer error.
			if ctx.Err() != nil {
				logShutdown()
				return
			}
			errorCount++
			slog.Error("Error reading Kafka message",
				"error", err,
				"topic", topic,
				"error_count", errorCount,
			)
			continue
		}

		var data T
		if err := json.Unmarshal(msg.Value, &data); err != nil {
			errorCount++
			slog.Error("Error decoding Kafka message",
				"error", err,
				"topic", topic,
				"message_length", len(msg.Value),
			)
			continue
		}

		messageCount++
		if messageCount%100 == 0 {
			slog.Debug("Kafka consumer progress",
				"topic", topic,
				"messages_processed", messageCount,
			)
		}

		select {
		case ch <- data:
		case <-ctx.Done():
			logShutdown()
			return
		}
	}
}
File: internal/pkg/kafkaclient/manager.go
// Line 38-52: Add Kafka writer
// AddKafkaWriter creates a synchronous Kafka writer for topic and registers it
// in the manager's writer map under the topic name.
//
// kafkaUrl may list several brokers separated by commas, the same format
// AddKafkaReader accepts.
func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
	slog.Debug("Creating Kafka writer",
		"topic", topic,
		"brokers", kafkaUrl,
	)

	kafkaWriter := &kafka.Writer{
		// kafka.TCP takes one argument per broker address, so the
		// comma-separated list is split instead of being passed as a single
		// address.
		Addr:         kafka.TCP(strings.Split(kafkaUrl, ",")...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}

	m.kafkaWritersMap.KafkaWritersLock.Lock()
	m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
	m.kafkaWritersMap.KafkaWritersLock.Unlock()

	slog.Info("Kafka writer created",
		"topic", topic,
	)
}
// Line 66-79: Add Kafka reader
// AddKafkaReader builds a Kafka reader for topic in consumer group groupID and
// stores it in the manager's reader map under the topic name. kafkaUrl is a
// comma-separated list of broker addresses.
func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
	slog.Debug("Creating Kafka reader",
		"topic", topic,
		"group_id", groupID,
		"brokers", kafkaUrl,
	)

	readerCfg := kafka.ReaderConfig{
		Brokers:  strings.Split(kafkaUrl, ","),
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	}
	reader := kafka.NewReader(readerCfg)

	// Register the reader while holding the map lock.
	func() {
		m.kafkaReadersMap.KafkaReadersLock.Lock()
		defer m.kafkaReadersMap.KafkaReadersLock.Unlock()
		m.kafkaReadersMap.KafkaReaders[topic] = reader
	}()

	slog.Info("Kafka reader created",
		"topic", topic,
		"group_id", groupID,
	)
}
// Line 54-64: Clean writers
// CleanKafkaWriters closes every registered Kafka writer, logging the outcome
// for each topic. A writer that fails to close does not stop the others from
// being closed.
func (m *KafkaManager) CleanKafkaWriters() {
	// Take the lock before touching the map: reading its length outside the
	// lock races with goroutines that are still registering writers.
	m.kafkaWritersMap.KafkaWritersLock.Lock()
	defer m.kafkaWritersMap.KafkaWritersLock.Unlock()

	slog.Info("Shutting down Kafka writers",
		"count", len(m.kafkaWritersMap.KafkaWriters),
	)

	for topic, w := range m.kafkaWritersMap.KafkaWriters {
		if err := w.Close(); err != nil {
			slog.Error("Error closing Kafka writer",
				"error", err,
				"topic", topic,
			)
			continue
		}
		slog.Info("Kafka writer closed",
			"topic", topic,
		)
	}

	slog.Info("Kafka writers shutdown complete")
}
// Line 81-90: Clean readers
// CleanKafkaReaders closes every registered Kafka reader, logging the outcome
// for each topic. A reader that fails to close does not stop the others from
// being closed.
func (m *KafkaManager) CleanKafkaReaders() {
	// Take the lock before touching the map: reading its length outside the
	// lock races with goroutines that are still registering readers.
	m.kafkaReadersMap.KafkaReadersLock.Lock()
	defer m.kafkaReadersMap.KafkaReadersLock.Unlock()

	slog.Info("Shutting down Kafka readers",
		"count", len(m.kafkaReadersMap.KafkaReaders),
	)

	for topic, r := range m.kafkaReadersMap.KafkaReaders {
		if err := r.Close(); err != nil {
			slog.Error("Error closing Kafka reader",
				"error", err,
				"topic", topic,
			)
			continue
		}
		slog.Info("Kafka reader closed",
			"topic", topic,
		)
	}

	slog.Info("Kafka readers shutdown complete")
}
File: internal/pkg/apiclient/updatedb.go
// Line 19-74: Update database
// UpdateDB authenticates against the API and mirrors its trackers, gateways,
// zones and tracker zones into the database via syncTable.
//
// After the tracker sync it publishes a DELETE/"all" message followed by one
// POST message per tracker on writer. Finally it creates the default settings
// row when the settings table yields no row with a non-zero ID.
//
// Only a token failure is returned as an error. A failed fetch of any table is
// logged and the function continues, so "Database sync complete" is logged and
// nil is returned even when some or all tables could not be fetched.
func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error {
slog.Info("Initializing database from API",
"api_url", cfg.APIURL,
)
// NOTE(review): InsecureSkipVerify disables TLS certificate verification for
// every request made with this client, and the client has no timeout.
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
token, err := GetToken(ctx, cfg, client)
if err != nil {
slog.Error("Failed to get authentication token",
"error", err,
)
return err
}
slog.Info("Successfully authenticated with API")
// Sync trackers
if trackers, err := GetTrackers(token, client); err == nil {
slog.Info("Fetched trackers from API",
"count", len(trackers),
)
syncTable(db, trackers)
// Ask consumers to clear their lookup before the trackers are re-registered
// one by one below.
if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
slog.Error("Failed to send clear lookup message",
"error", err,
)
}
for _, v := range trackers {
apiUpdate := model.ApiUpdate{
Method: "POST",
ID: v.ID,
MAC: v.MAC,
}
if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
slog.Error("Failed to send tracker registration",
"error", err,
"tracker_id", v.ID,
)
} else {
slog.Debug("Sent tracker registration",
"tracker_id", v.ID,
"mac", v.MAC,
)
}
}
} else {
slog.Error("Failed to fetch trackers from API",
"error", err,
)
}
// Sync gateways
if gateways, err := GetGateways(token, client); err == nil {
slog.Info("Fetched gateways from API",
"count", len(gateways),
)
syncTable(db, gateways)
} else {
slog.Error("Failed to fetch gateways from API",
"error", err,
)
}
// Sync zones
if zones, err := GetZones(token, client); err == nil {
slog.Info("Fetched zones from API",
"count", len(zones),
)
syncTable(db, zones)
} else {
slog.Error("Failed to fetch zones from API",
"error", err,
)
}
// Sync tracker zones
if trackerZones, err := GetTrackerZones(token, client); err == nil {
slog.Info("Fetched tracker zones from API",
"count", len(trackerZones),
)
syncTable(db, trackerZones)
} else {
slog.Error("Failed to fetch tracker zones from API",
"error", err,
)
}
// Initialize settings
// NOTE(review): the results of db.First and db.Create are not checked, so a
// database failure here goes unnoticed.
var settings model.Settings
db.First(&settings)
if settings.ID == 0 {
slog.Info("Initializing default settings")
db.Create(appState.GetSettings())
}
slog.Info("Database sync complete")
return nil
}
// Line 76-91: Sync table
// syncTable makes the table for T mirror data: inside one transaction it
// deletes every row whose id is not present in data and then upserts all of
// data. An empty data slice is a no-op, so an empty API response never wipes
// the table.
//
// Any failure rolls the whole transaction back and is logged once, together
// with the element type. Success is logged only after the transaction has
// committed, with the real number of deleted rows.
func syncTable[T any](db *gorm.DB, data []T) {
	typeName := fmt.Sprintf("%T", data)
	if len(data) == 0 {
		slog.Debug("No data to sync",
			"type", typeName,
		)
		return
	}

	slog.Debug("Syncing table",
		"count", len(data),
		"type", typeName,
	)

	// NOTE(review): this assumes T is a struct with a string field named ID;
	// reflect panics for non-struct values and String() does not return the
	// numeric value for non-string kinds — confirm for every model passed in.
	ids := make([]string, 0, len(data))
	for _, item := range data {
		ids = append(ids, reflect.ValueOf(item).FieldByName("ID").String())
	}

	var deletedRows int64
	err := db.Transaction(func(tx *gorm.DB) error {
		stale := tx.Where("id NOT IN ?", ids).Delete(new(T))
		if stale.Error != nil {
			// Abort instead of upserting on top of a failed delete.
			return fmt.Errorf("deleting stale records: %w", stale.Error)
		}
		// Capture the count now; the upsert result below has its own
		// RowsAffected.
		deletedRows = stale.RowsAffected

		if upsert := tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data); upsert.Error != nil {
			return fmt.Errorf("upserting records: %w", upsert.Error)
		}
		return nil
	})
	if err != nil {
		slog.Error("Failed to sync table",
			"error", err,
			"type", typeName,
		)
		return
	}

	slog.Info("Table sync complete",
		"upserted", len(data),
		"deleted", deletedRows,
	)
}
File: internal/pkg/apiclient/auth.go
// Line 17-45: Get token
// GetToken requests an access token from the OpenID Connect token endpoint
// using the password grant and returns the token from the decoded JSON
// response.
//
// The request is bound to ctx. An error is returned when the request cannot be
// built or sent, or when the response body cannot be decoded.
//
// NOTE(review): the endpoint URL, client secret, username and password are
// hard-coded below and cfg is unused. Secrets in source are a security risk —
// move them into cfg or the environment and rotate the exposed values.
func GetToken(ctx context.Context, cfg *config.Config, client *http.Client) (string, error) {
	const tokenURL = "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token"

	slog.Debug("Requesting authentication token",
		"url", tokenURL,
	)

	formData := url.Values{}
	formData.Set("grant_type", "password")
	formData.Set("client_id", "Fastapi")
	formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC")
	formData.Set("username", "core")
	formData.Set("password", "C0r3_us3r_Cr3d3nt14ls")
	formData.Set("audience", "Fastapi")

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(formData.Encode()))
	if err != nil {
		slog.Error("Failed to create auth request",
			"error", err,
		)
		return "", err
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

	res, err := client.Do(req)
	if err != nil {
		slog.Error("Failed to send auth request",
			"error", err,
		)
		return "", err
	}
	// The body must be closed so the underlying connection can be reused.
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		// The body is still decoded below; this only makes a rejected login
		// visible, since an error payload may decode into an empty token.
		slog.Warn("Auth endpoint returned unexpected status",
			"status_code", res.StatusCode,
		)
	}

	var j response
	if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
		slog.Error("Failed to decode auth response",
			"error", err,
			"status_code", res.StatusCode,
		)
		return "", err
	}

	slog.Info("Successfully obtained authentication token",
		"token_length", len(j.Token),
	)
	return j.Token, nil
}
File: internal/pkg/service/beacon_service.go
// Line 17-67: Location to beacon service
// LocationToBeaconService persists a location update for one beacon.
//
// It resolves the gateway whose MAC matches msg.Location, publishes a
// "Restricted zone" alert on writer when the beacon has a non-empty zone list
// that does not contain that gateway, stores a track record, and updates the
// tracker's location and distance. Messages with an empty ID, failed zone
// lookups and unknown gateways end processing early; every failure is logged.
func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
	if msg.ID == "" {
		slog.Warn("Received location message with empty ID")
		return
	}

	slog.Debug("Processing location message",
		"beacon_id", msg.ID,
		"location", msg.Location,
		"distance", msg.Distance,
	)

	var zones []model.TrackerZones
	if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil {
		slog.Error("Failed to fetch tracker zones",
			"error", err,
			"beacon_id", msg.ID,
		)
		return
	}

	slog.Debug("Fetched tracker zones",
		"beacon_id", msg.ID,
		"zones_count", len(zones),
	)

	var allowedZones []string
	for _, z := range zones {
		allowedZones = append(allowedZones, z.ZoneList...)
	}

	var gw model.Gateway
	mac := formatMac(msg.Location)
	if err := db.Select("id").Where("mac = ?", mac).First(&gw).Error; err != nil {
		slog.Warn("Gateway not found for location",
			"mac", mac,
			"beacon_id", msg.ID,
			"error", err,
		)
		return
	}

	slog.Debug("Found gateway for location",
		"gateway_id", gw.ID,
		"mac", mac,
	)

	if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) {
		alert := model.Alert{
			ID:    msg.ID,
			Type:  "Restricted zone",
			Value: gw.ID,
		}

		slog.Warn("Beacon in restricted zone",
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
			"allowed_zones", len(allowedZones),
		)

		eMsg, err := json.Marshal(alert)
		if err != nil {
			slog.Error("Failed to marshal alert",
				"error", err,
				"beacon_id", msg.ID,
			)
		} else {
			// Named kafkaMsg so it does not shadow the msg parameter: the log
			// calls below need msg.ID, which kafka.Message does not have.
			kafkaMsg := kafka.Message{Value: eMsg}
			if err := writer.WriteMessages(ctx, kafkaMsg); err != nil {
				slog.Error("Failed to send alert to Kafka",
					"error", err,
					"beacon_id", msg.ID,
					"alert_type", "Restricted zone",
				)
			} else {
				slog.Info("Sent restricted zone alert",
					"beacon_id", msg.ID,
					"gateway_id", gw.ID,
				)
			}
		}
	}

	// Save track
	// NOTE(review): the gateway is loaded with Select("id"), so gw.MAC is
	// presumably empty here, and UUID reuses the beacon ID for every track —
	// confirm both are intended.
	if err := db.Create(&model.Tracks{
		UUID:       msg.ID,
		Timestamp:  time.Now(),
		Gateway:    gw.ID,
		GatewayMac: gw.MAC,
		Tracker:    msg.ID,
	}).Error; err != nil {
		slog.Error("Failed to save track",
			"error", err,
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
		)
		return
	}

	slog.Debug("Saved track record",
		"beacon_id", msg.ID,
		"gateway_id", gw.ID,
	)

	// Update tracker
	if err := db.Updates(&model.Tracker{
		ID:       msg.ID,
		Location: gw.ID,
		Distance: msg.Distance,
	}).Error; err != nil {
		slog.Error("Failed to update tracker location",
			"error", err,
			"beacon_id", msg.ID,
		)
	} else {
		slog.Info("Updated tracker location",
			"beacon_id", msg.ID,
			"gateway_id", gw.ID,
			"distance", msg.Distance,
		)
	}
}
Add these fields to your configuration structure:
// internal/pkg/config/config.go
// Config gains three logging fields; each carries json, env and default tags.
type Config struct {
// ... existing fields ...
// Logging configuration
// LogLevel is the minimum log level name; tagged default "info".
LogLevel string `json:"log_level" env:"LOG_LEVEL" default:"info"`
// LogFormat is the log output format name; tagged default "json".
LogFormat string `json:"log_format" env:"LOG_FORMAT" default:"json"`
// LogAddSource toggles source locations in log records; tagged default "false".
LogAddSource bool `json:"log_add_source" env:"LOG_ADD_SOURCE" default:"false"`
}
All errors across services (slog's JSON handler writes the level in upper case, e.g. "ERROR"):
jq 'select(.level == "ERROR")' *.log
Beacon processing by ID:
jq 'select(.beacon_id == "beacon-123")' bridge.log decoder.log location.log
Kafka message flow:
jq 'select(.topic != null)' bridge.log decoder.log location.log server.log
Performance analysis:
jq 'select(.duration != null)' *.log
{"time":"2026-01-20T10:15:30.123456Z","level":"INFO","msg":"Processing beacon","service":"bridge","beacon_id":"beacon-001","rssi":-65,"hostname":"gateway-1"}
{"time":"2026-01-20T10:15:30.234567Z","level":"ERROR","msg":"Failed to write to Kafka","service":"bridge","error":"connection timeout","beacon_id":"beacon-001","topic":"rawbeacons"}
time=2026-01-20T10:15:30.123Z level=INFO msg="Processing beacon" service=bridge beacon_id=beacon-001 rssi=-65 hostname=gateway-1
time=2026-01-20T10:15:30.234Z level=ERROR msg="Failed to write to Kafka" service=bridge error="connection timeout" beacon_id=beacon-001 topic=rawbeacons
Use slog.Debug for high-frequency logs so that they can be filtered out in production by raising the log level.