package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os/signal" "strings" "sync" "syscall" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/service" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } var wg sync.WaitGroup func main() { cfg := config.Load() appState := appcontext.NewAppState() // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") client := appState.AddValkeyClient(cfg.ValkeyURL) fmt.Println("Init of kafka writers and readers done") chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) wg.Add(2) go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) r := mux.NewRouter() fmt.Println("new print") fmt.Println("new print") // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST") r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT") r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") wsHandler := http.HandlerFunc(serveWs(appState, ctx)) restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") { wsHandler.ServeHTTP(w, r) return } restApiHandler.ServeHTTP(w, r) }) server := http.Server{ Addr: cfg.HTTPAddr, Handler: mainHandler, } go server.ListenAndServe() eventLoop: for { select { case <-ctx.Done(): break eventLoop case msg := <-chLoc: if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { fmt.Printf("Error in writing location change to beacon: %v\n", err) } case msg := <-chEvents: if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { fmt.Printf("Error in writing event change to beacon: %v\n", err) } } } if err := server.Shutdown(context.Background()); err != nil { fmt.Printf("could not shutdown: %v\n", err) } fmt.Println("broken out of the main event loop and HTTP server shutdown") wg.Wait() fmt.Println("All go routines have stopped, Beggining to close Kafka connections") appState.CleanKafkaReaders() appState.CleanKafkaWriters() fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") appState.CleanValkeyClient() } func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { log.Println(err) } return } wg.Add(2) go writer(ws, appstate, ctx) go reader(ws, ctx) } } func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) { pingTicker := time.NewTicker((60 * 9) / 10 * time.Second) beaconTicker := time.NewTicker(2 * time.Second) defer func() { pingTicker.Stop() beaconTicker.Stop() ws.Close() wg.Done() }() for { select { case <-ctx.Done(): log.Println("WebSocket writer received shutdown signal.") ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return case <-beaconTicker.C: beacons := appstate.GetAllBeacons() js, err := json.Marshal(beacons) if err != nil { js = []byte("error") } ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.TextMessage, js); err != nil { return } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return } } } } func reader(ws *websocket.Conn, ctx context.Context) { defer func() { ws.Close() wg.Done() }() ws.SetReadLimit(512) ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) for { fmt.Println("this is called") select { case <-ctx.Done(): fmt.Println("closing websocket reader") return default: _, _, err := ws.ReadMessage() if err != nil { return } } } }