|
- package main
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "log/slog"
- "net/http"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/AFASystems/presence/internal/pkg/common/appcontext"
- "github.com/AFASystems/presence/internal/pkg/config"
- "github.com/AFASystems/presence/internal/pkg/controller"
- "github.com/AFASystems/presence/internal/pkg/kafkaclient"
- "github.com/AFASystems/presence/internal/pkg/model"
- "github.com/AFASystems/presence/internal/pkg/service"
- "github.com/gorilla/handlers"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- )
-
- var upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- }
-
- var _ io.Writer = (*os.File)(nil)
-
- var wg sync.WaitGroup
-
- func main() {
- cfg := config.Load()
- appState := appcontext.NewAppState()
-
- // Create log file
- logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
- if err != nil {
- log.Fatalf("Failed to open log file: %v\n", err)
- }
- // shell and log file multiwriter
- w := io.MultiWriter(os.Stderr, logFile)
- logger := slog.New(slog.NewJSONHandler(w, nil))
- slog.SetDefault(logger)
-
- // define context
- ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
- defer stop()
-
- headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
- originsOk := handlers.AllowedOrigins([]string{"*"})
- methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
-
- writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
- settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
- slog.Info("Kafka writers topics: apibeacons, settings initialized")
-
- locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
- alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
- slog.Info("Kafka readers topics: locevents, alertbeacons initialized")
-
- client := appState.AddValkeyClient(cfg.ValkeyURL)
- slog.Info("Valkey DB client created")
-
- chLoc := make(chan model.HTTPLocation, 200)
- chEvents := make(chan model.BeaconEvent, 500)
-
- wg.Add(2)
- go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
- go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
-
- r := mux.NewRouter()
-
- r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
- r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
- r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
- r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST")
- r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT")
-
- r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
- r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")
-
- wsHandler := http.HandlerFunc(serveWs(appState, ctx))
- restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
- mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
- wsHandler.ServeHTTP(w, r)
- return
- }
-
- restApiHandler.ServeHTTP(w, r)
- })
-
- server := http.Server{
- Addr: cfg.HTTPAddr,
- Handler: mainHandler,
- }
-
- go server.ListenAndServe()
-
- eventLoop:
- for {
- select {
- case <-ctx.Done():
- break eventLoop
- case msg := <-chLoc:
- if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
- eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
- slog.Error(eMsg)
- }
- case msg := <-chEvents:
- if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
- eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
- slog.Error(eMsg)
- }
- }
- }
-
- if err := server.Shutdown(context.Background()); err != nil {
- eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
- slog.Error(eMsg)
- }
-
- slog.Info("API SERVER: \n")
- slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
- wg.Wait()
-
- slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
- appState.CleanKafkaReaders()
- appState.CleanKafkaWriters()
-
- slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
- appState.CleanValkeyClient()
-
- slog.Info("API server shutting down")
- logFile.Close()
- }
-
- func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- if _, ok := err.(websocket.HandshakeError); !ok {
- eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
- slog.Error(eMsg)
- }
- return
- }
- wg.Add(2)
- go writer(ws, appstate, ctx)
- go reader(ws, ctx)
- }
- }
-
- func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
- pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
- beaconTicker := time.NewTicker(2 * time.Second)
- defer func() {
- pingTicker.Stop()
- beaconTicker.Stop()
- ws.Close()
- wg.Done()
- }()
- for {
- select {
- case <-ctx.Done():
- slog.Info("WebSocket writer received shutdown signal.")
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
- return
- case <-beaconTicker.C:
- beacons := appstate.GetAllBeacons()
- js, err := json.Marshal(beacons)
- if err != nil {
- js = []byte("error")
- }
-
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
- return
- }
- case <-pingTicker.C:
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- return
- }
- }
- }
- }
-
- func reader(ws *websocket.Conn, ctx context.Context) {
- defer func() {
- ws.Close()
- wg.Done()
- }()
- ws.SetReadLimit(512)
- ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
- ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
- for {
- select {
- case <-ctx.Done():
- slog.Info("closing ws reader")
- return
- default:
- _, _, err := ws.ReadMessage()
- if err != nil {
- return
- }
- }
- }
- }
|