package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os/signal" "sync" "syscall" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/service" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } var wg sync.WaitGroup func main() { cfg := config.Load() appState := appcontext.NewAppState() // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") client := appState.AddValkeyClient(cfg.ValkeyURL) // Need Lua script to pull all of the beacons in one go on init chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) wg.Add(2) go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) r := mux.NewRouter() // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST") r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT") r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx)) http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) eventLoop: for { select { case <-ctx.Done(): break eventLoop case msg := <-chLoc: if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { fmt.Printf("Error in writing location change to beacon: %v\n", err) } case msg := <-chEvents: if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { fmt.Printf("Error in writing event change to beacon: %v\n", err) } } } fmt.Println("broken out of the main event loop") wg.Wait() fmt.Println("All go routines have stopped, Beggining to close Kafka connections") appState.CleanKafkaReaders() appState.CleanKafkaWriters() fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") appState.CleanValkeyClient() } func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { log.Println(err) } return } wg.Add(1) go writer(ws, appstate, ctx) reader(ws) } } func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) { pingTicker := time.NewTicker((60 * 9) / 10 * time.Second) beaconTicker := time.NewTicker(2 * time.Second) defer func() { pingTicker.Stop() beaconTicker.Stop() ws.Close() wg.Done() }() for { select { case <-ctx.Done(): log.Println("WebSocket writer received shutdown signal.") ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return case <-beaconTicker.C: beacons := appstate.GetAllBeacons() js, err := json.Marshal(beacons) if err != nil { js = []byte("error") } ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.TextMessage, js); err != nil { return } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return } } } } func reader(ws *websocket.Conn) { defer ws.Close() ws.SetReadLimit(512) ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) for { _, _, err := ws.ReadMessage() if err != nil { break } } }