package main import ( "context" "encoding/json" "fmt" "io" "log" "log/slog" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/AFASystems/presence/internal/pkg/apiclient" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/database" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/logger" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/service" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/segmentio/kafka-go" ) var _ io.Writer = (*os.File)(nil) var wg sync.WaitGroup func main() { cfg := config.Load() appState := appcontext.NewAppState() kafkaManager := kafkaclient.InitKafkaManager() // Set logger -> terminal and log file slog.SetDefault(logger.CreateLogger("server.log")) // define context ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() db, err := database.Connect(cfg) if err != nil { log.Fatalf("Failed to open database connection: %v\n", err) } headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) writerTopics := []string{"apibeacons", "alert", "mqtt", "settings", "parser"} kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) slog.Info("Kafka writers topics: apibeacons, settings initialized") configFile, err := os.Open("/app/cmd/server/config.json") if err != nil { panic(err) } b, _ := io.ReadAll(configFile) var configs []model.Config json.Unmarshal(b, &configs) for _, config := range configs { // persist read configs in database db.Create(&config) } db.Find(&configs) for _, config := range configs { kp := model.KafkaParser{ ID: "add", Config: config, } if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil { fmt.Printf("Unable to send parser config to kafka broker %v\n", err) } } if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil { fmt.Printf("Error in getting token: %v\n", err) } readerTopics := []string{"locevents", "alertbeacons"} kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "server", readerTopics) slog.Info("Kafka readers topics: locevents, alertbeacons initialized") chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) wg.Add(2) go kafkaclient.Consume(kafkaManager.GetReader("locevents"), chLoc, ctx, &wg) go kafkaclient.Consume(kafkaManager.GetReader("alertbeacons"), chEvents, ctx, &wg) r := mux.NewRouter() r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(db)).Methods("GET") r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getZones", controller.ZoneListController(db)).Methods("GET") r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(db)).Methods("GET") r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(db)).Methods("POST") r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(db)).Methods("DELETE") r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT") r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET") r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("POST") r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, kafkaManager.GetWriter("apibeacons"), ctx)).Methods("DELETE") r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT") r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET") r.HandleFunc("/configs/beacons", controller.ParserAddController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("POST") r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("PUT") r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, kafkaManager.GetWriter("parser"), ctx)).Methods("DELETE") r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, kafkaManager.GetWriter("settings"), ctx)).Methods("PATCH") r.HandleFunc("/reslevis/settings", controller.SettingsListController(db)).Methods("GET") r.HandleFunc("/reslevis/getTracks/{id}", controller.TracksListController(db)).Methods("GET") beaconTicker := time.NewTicker(2 * time.Second) restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { restApiHandler.ServeHTTP(w, r) }) server := http.Server{ Addr: cfg.HTTPAddr, Handler: mainHandler, } go server.ListenAndServe() eventLoop: for { select { case <-ctx.Done(): break eventLoop case msg := <-chLoc: service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx) case msg := <-chEvents: fmt.Printf("event: %+v\n", msg) id := msg.ID if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil { fmt.Printf("Decoder event for untracked beacon: %s\n", id) continue } if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil { fmt.Printf("Error in saving decoder event for beacon: %s\n", id) continue } case <-beaconTicker.C: var list []model.Tracker db.Find(&list) eMsg, err := json.Marshal(list) if err != nil { fmt.Printf("Error in marshaling trackers list: %v\n", err) continue } msg := kafka.Message{ Value: eMsg, } kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg) } } if err := server.Shutdown(context.Background()); err != nil { eMsg := fmt.Sprintf("could not shutdown: %v\n", err) slog.Error(eMsg) } slog.Info("API SERVER: \n") slog.Warn("broken out of the main event loop and HTTP server shutdown\n") wg.Wait() slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") kafkaManager.CleanKafkaReaders() kafkaManager.CleanKafkaWriters() slog.Info("All kafka clients shutdown, starting shutdown of valkey client") slog.Info("API server shutting down") }