package main import ( "context" "encoding/json" "fmt" "io" "log" "log/slog" "net/http" "os" "os/signal" "strings" "sync" "syscall" "time" "github.com/AFASystems/presence/internal/pkg/apiclient" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/database" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/service" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } var _ io.Writer = (*os.File)(nil) var wg sync.WaitGroup func main() { cfg := config.Load() appState := appcontext.NewAppState() // Create log file logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatalf("Failed to open log file: %v\n", err) } // shell and log file multiwriter w := io.MultiWriter(os.Stderr, logFile) logger := slog.New(slog.NewJSONHandler(w, nil)) slog.SetDefault(logger) // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() db, err := database.Connect(cfg) if err != nil { log.Fatalf("Failed to open database connection: %v\n", err) } headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert") slog.Info("Kafka writers topics: apibeacons, settings initialized") if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil { fmt.Printf("Error in getting token: %v\n", err) } locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") slog.Info("Kafka readers topics: locevents, alertbeacons initialized") chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) wg.Add(2) go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) r := mux.NewRouter() r.HandleFunc("/api/settings", controller.SettingsListController(appState, ctx)).Methods("GET") r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, ctx)).Methods("POST") r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(db)).Methods("GET") r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getZones", controller.ZoneListController(db)).Methods("GET") r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(db)).Methods("GET") r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET") r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST") r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE") r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT") wsHandler := http.HandlerFunc(serveWs(appState, ctx)) restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") { wsHandler.ServeHTTP(w, r) return } restApiHandler.ServeHTTP(w, r) }) server := http.Server{ Addr: cfg.HTTPAddr, Handler: mainHandler, } go server.ListenAndServe() eventLoop: for { select { case <-ctx.Done(): break eventLoop case msg := <-chLoc: service.LocationToBeaconService(msg, db, alertWriter, ctx) case msg := <-chEvents: fmt.Printf("event: %+v\n", msg) id := msg.ID if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil { fmt.Printf("Decoder event for untracked beacon: %s\n", id) continue } if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil { fmt.Printf("Error in saving decoder event for beacon: %s\n", id) continue } } } if err := server.Shutdown(context.Background()); err != nil { eMsg := fmt.Sprintf("could not shutdown: %v\n", err) slog.Error(eMsg) } slog.Info("API SERVER: \n") slog.Warn("broken out of the main event loop and HTTP server shutdown\n") wg.Wait() slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") appState.CleanKafkaReaders() appState.CleanKafkaWriters() slog.Info("All kafka clients shutdown, starting shutdown of valkey client") slog.Info("API server shutting down") logFile.Close() } func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err) slog.Error(eMsg) } return } wg.Add(2) go writer(ws, appstate, ctx) go reader(ws, ctx) } } func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) { pingTicker := time.NewTicker((60 * 9) / 10 * time.Second) beaconTicker := time.NewTicker(2 * time.Second) defer func() { pingTicker.Stop() beaconTicker.Stop() ws.Close() wg.Done() }() for { select { case <-ctx.Done(): slog.Info("WebSocket writer received shutdown signal.") ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return case <-beaconTicker.C: beacons := appstate.GetAllHttpResults() js, err := json.Marshal(beacons) if err != nil { js = []byte("error") } ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.TextMessage, js); err != nil { return } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return } } } } func reader(ws *websocket.Conn, ctx context.Context) { defer func() { ws.Close() wg.Done() }() ws.SetReadLimit(512) ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) for { select { case <-ctx.Done(): slog.Info("closing ws reader") return default: _, _, err := ws.ReadMessage() if err != nil { return } } } }