diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index d9d80c0..81c4d89 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -5,6 +5,10 @@ import ( "context" "encoding/hex" "fmt" + "io" + "log" + "log/slog" + "os" "os/signal" "strings" "sync" @@ -25,6 +29,16 @@ func main() { appState := appcontext.NewAppState() cfg := config.Load() + // Create log file + logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v\n", err) + } + // shell and log file multiwriter + w := io.MultiWriter(os.Stderr, logFile) + logger := slog.New(slog.NewJSONHandler(w, nil)) + slog.SetDefault(logger) + // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() @@ -34,7 +48,7 @@ func main() { alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") - fmt.Println("Decoder initialized, subscribed to Kafka topics") + slog.Info("Decoder initialized, subscribed to Kafka topics") chRaw := make(chan model.BeaconAdvertisement, 2000) chApi := make(chan model.ApiUpdate, 200) @@ -55,18 +69,21 @@ eventloop: case "POST": id := msg.Beacon.ID appState.AddBeaconToLookup(id) + lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) + slog.Info(lMsg) case "DELETE": id := msg.Beacon.ID appState.RemoveBeaconFromLookup(id) - fmt.Println("Beacon removed from lookup: ", id) + lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) + slog.Info(lMsg) } } } - fmt.Println("broken out of the main event loop") + slog.Info("broken out of the main event loop") wg.Wait() - fmt.Println("All go routines have stopped, Beggining to close Kafka connections") + slog.Info("All go routines have stopped, Beggining to close Kafka connections") appState.CleanKafkaReaders() appState.CleanKafkaWriters() } @@ -80,7 +97,8 @@ func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppStat err := decodeBeacon(adv, appState, writer) if err != nil { - fmt.Println("error in decoding") + eMsg := fmt.Sprintf("Error in decoding: %v", err) + fmt.Println(eMsg) return } } diff --git a/cmd/location/main.go b/cmd/location/main.go index 47cebf3..53842da 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -4,6 +4,10 @@ import ( "context" "encoding/json" "fmt" + "io" + "log" + "log/slog" + "os" "os/signal" "sync" "syscall" @@ -24,6 +28,16 @@ func main() { appState := appcontext.NewAppState() cfg := config.Load() + // Create log file + logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v\n", err) + } + // shell and log file multiwriter + w := io.MultiWriter(os.Stderr, logFile) + logger := slog.New(slog.NewJSONHandler(w, nil)) + slog.SetDefault(logger) + // Define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() @@ -34,7 +48,7 @@ func main() { writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") - fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") + slog.Info("Locations algorithm initialized, subscribed to Kafka topics") locTicker := time.NewTicker(1 * time.Second) defer locTicker.Stop() @@ -61,22 +75,24 @@ eventLoop: switch msg.Method { case "POST": id := msg.Beacon.ID - fmt.Println("Beacon added to lookup: ", id) + lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) + slog.Info(lMsg) appState.AddBeaconToLookup(id) case "DELETE": id := msg.Beacon.ID appState.RemoveBeaconFromLookup(id) - fmt.Println("Beacon removed from lookup: ", id) + lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) + slog.Info(lMsg) } case msg := <-chSettings: appState.UpdateSettings(msg) } } - fmt.Println("broken out of the main event loop") + slog.Info("broken out of the main event loop") wg.Wait() - fmt.Println("All go routines have stopped, Beggining to close Kafka connections") + slog.Info("All go routines have stopped, Beggining to close Kafka connections") appState.CleanKafkaReaders() appState.CleanKafkaWriters() } @@ -98,7 +114,7 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { mSize := len(beacon.BeaconMetrics) if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { - fmt.Println("Beacon is too old") + slog.Warn("beacon is too old") continue } @@ -133,7 +149,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { beacon.LocationConfidence = 0 // Why do I need this if I am sending entire structure anyways? who knows - fmt.Println("this is called") js, err := json.Marshal(model.LocationChange{ Method: "LocationChange", BeaconRef: beacon, @@ -144,7 +159,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { }) if err != nil { - fmt.Println("This error happens: ", err) + eMsg := fmt.Sprintf("Error in marshaling: %v", err) + slog.Error(eMsg) beacon.PreviousConfidentLocation = bestLocName beacon.PreviousLocation = bestLocName appState.UpdateBeacon(beacon.ID, beacon) @@ -166,7 +182,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { js, err := json.Marshal(r) if err != nil { - fmt.Println("Error in marshaling location: ", err) + eMsg := fmt.Sprintf("Error in marshaling location: %v", err) + slog.Error(eMsg) continue } @@ -176,7 +193,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { err = writer.WriteMessages(context.Background(), msg) if err != nil { - fmt.Println("Error in sending Kafka message: ", err) + eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err) + slog.Error(eMsg) } } } @@ -194,7 +212,7 @@ func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppS settings := appState.GetSettingsValue() if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { - fmt.Println("Settings returns") + slog.Info("Settings returns") return } diff --git a/cmd/server/main.go b/cmd/server/main.go index 58199b2..c845b84 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -4,8 +4,11 @@ import ( "context" "encoding/json" "fmt" + "io" "log" + "log/slog" "net/http" + "os" "os/signal" "strings" "sync" @@ -28,12 +31,24 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } +var _ io.Writer = (*os.File)(nil) + var wg sync.WaitGroup func main() { cfg := config.Load() appState := appcontext.NewAppState() + // Create log file + logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v\n", err) + } + // shell and log file multiwriter + w := io.MultiWriter(os.Stderr, logFile) + logger := slog.New(slog.NewJSONHandler(w, nil)) + slog.SetDefault(logger) + // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() @@ -44,12 +59,14 @@ func main() { writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") + slog.Info("Kafka writers topics: apibeacons, settings initialized") locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") + slog.Info("Kafka readers topics: locevents, alertbeacons initialized") client := appState.AddValkeyClient(cfg.ValkeyURL) - fmt.Println("Init of kafka writers and readers done") + slog.Info("Valkey DB client created") chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) @@ -60,9 +77,6 @@ func main() { r := mux.NewRouter() - fmt.Println("new print") - fmt.Println("new print") - // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") @@ -97,27 +111,35 @@ eventLoop: break eventLoop case msg := <-chLoc: if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { - fmt.Printf("Error in writing location change to beacon: %v\n", err) + eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err) + slog.Error(eMsg) } case msg := <-chEvents: if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { - fmt.Printf("Error in writing event change to beacon: %v\n", err) + eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err) + slog.Error(eMsg) } } } if err := server.Shutdown(context.Background()); err != nil { - fmt.Printf("could not shutdown: %v\n", err) + eMsg := fmt.Sprintf("could not shutdown: %v\n", err) + slog.Error(eMsg) } - fmt.Println("broken out of the main event loop and HTTP server shutdown") + slog.Info("API SERVER: \n") + slog.Warn("broken out of the main event loop and HTTP server shutdown\n") wg.Wait() - fmt.Println("All go routines have stopped, Beggining to close Kafka connections") + slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") appState.CleanKafkaReaders() appState.CleanKafkaWriters() - fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") + + slog.Info("All kafka clients shutdown, starting shutdown of valkey client") appState.CleanValkeyClient() + + slog.Info("API server shutting down") + logFile.Close() } func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { @@ -125,7 +147,8 @@ func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFun ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { - log.Println(err) + eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err) + slog.Error(eMsg) } return } @@ -147,7 +170,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte for { select { case <-ctx.Done(): - log.Println("WebSocket writer received shutdown signal.") + slog.Info("WebSocket writer received shutdown signal.") ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return @@ -180,10 +203,9 @@ func reader(ws *websocket.Conn, ctx context.Context) { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) for { - fmt.Println("this is called") select { case <-ctx.Done(): - fmt.Println("closing websocket reader") + slog.Info("closing ws reader") return default: _, _, err := ws.ReadMessage() diff --git a/internal/pkg/model/typeMethods.go b/internal/pkg/model/type_methods.go similarity index 100% rename from internal/pkg/model/typeMethods.go rename to internal/pkg/model/type_methods.go