| Autor | SHA1 | Zpráva | Datum |
|---|---|---|---|
|
|
55138a0f6c | chore: change to logging in file | před 1 dnem |
|
|
8c14b77ba8 | fix: websocket implementation not working because of the CORS headers | před 2 dny |
|
|
faa941e4bd | fix: adding graceful shutdown of the api server | před 2 dny |
|
|
d69952a36d | chore: add settings topic to init script | před 2 dny |
| @@ -54,4 +54,60 @@ services: | |||
| ports: | |||
| - "127.0.0.1:6379:6379" | |||
| presense-decoder: | |||
| build: | |||
| context: ../ | |||
| dockerfile: build/package/Dockerfile.decoder | |||
| image: presense-decoder | |||
| container_name: presense-decoder | |||
| environment: | |||
| - KAFKA_URL=kafka:29092 | |||
| depends_on: | |||
| - kafka-init | |||
| restart: always | |||
| presense-server: | |||
| build: | |||
| context: ../ | |||
| dockerfile: build/package/Dockerfile.server | |||
| image: presense-server | |||
| container_name: presense-server | |||
| environment: | |||
| - VALKEY_URL=valkey:6379 | |||
| - KAFKA_URL=kafka:29092 | |||
| ports: | |||
| - "127.0.0.1:1902:1902" | |||
| depends_on: | |||
| - kafka-init | |||
| - valkey | |||
| restart: always | |||
| presense-bridge: | |||
| build: | |||
| context: ../ | |||
| dockerfile: build/package/Dockerfile.bridge | |||
| image: presense-bridge | |||
| container_name: presense-bridge | |||
| environment: | |||
| - KAFKA_URL=kafka:29092 | |||
| - MQTT_HOST=192.168.1.101:1883 | |||
| - MQTT_USERNAME=user | |||
| - MQTT_PASSWORD=pass | |||
| depends_on: | |||
| - kafka-init | |||
| restart: always | |||
| presense-location: | |||
| build: | |||
| context: ../ | |||
| dockerfile: build/package/Dockerfile.location | |||
| image: presense-location | |||
| container_name: presense-location | |||
| environment: | |||
| - KAFKA_URL=kafka:29092 | |||
| depends_on: | |||
| - kafka-init | |||
| restart: always | |||
| @@ -15,7 +15,12 @@ | |||
| --create --if-not-exists --topic alertbeacons \ | |||
| --partitions 1 --replication-factor 1 | |||
| # create topic alertBeacons | |||
| # create topic locevents | |||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||
| --create --if-not-exists --topic locevents \ | |||
| --partitions 1 --replication-factor 1 | |||
| # create topic settings | |||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||
| --create --if-not-exists --topic settings \ | |||
| --partitions 1 --replication-factor 1 | |||
| @@ -5,6 +5,10 @@ import ( | |||
| "context" | |||
| "encoding/hex" | |||
| "fmt" | |||
| "io" | |||
| "log" | |||
| "log/slog" | |||
| "os" | |||
| "os/signal" | |||
| "strings" | |||
| "sync" | |||
| @@ -25,6 +29,16 @@ func main() { | |||
| appState := appcontext.NewAppState() | |||
| cfg := config.Load() | |||
| // Create log file | |||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||
| if err != nil { | |||
| log.Fatalf("Failed to open log file: %v\n", err) | |||
| } | |||
| // shell and log file multiwriter | |||
| w := io.MultiWriter(os.Stderr, logFile) | |||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||
| slog.SetDefault(logger) | |||
| // define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| @@ -34,7 +48,7 @@ func main() { | |||
| alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") | |||
| fmt.Println("Decoder initialized, subscribed to Kafka topics") | |||
| slog.Info("Decoder initialized, subscribed to Kafka topics") | |||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||
| chApi := make(chan model.ApiUpdate, 200) | |||
| @@ -55,18 +69,21 @@ eventloop: | |||
| case "POST": | |||
| id := msg.Beacon.ID | |||
| appState.AddBeaconToLookup(id) | |||
| lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) | |||
| slog.Info(lMsg) | |||
| case "DELETE": | |||
| id := msg.Beacon.ID | |||
| appState.RemoveBeaconFromLookup(id) | |||
| fmt.Println("Beacon removed from lookup: ", id) | |||
| lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) | |||
| slog.Info(lMsg) | |||
| } | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| slog.Info("broken out of the main event loop") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| } | |||
| @@ -80,7 +97,8 @@ func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppStat | |||
| err := decodeBeacon(adv, appState, writer) | |||
| if err != nil { | |||
| fmt.Println("error in decoding") | |||
| eMsg := fmt.Sprintf("Error in decoding: %v", err) | |||
| fmt.Println(eMsg) | |||
| return | |||
| } | |||
| } | |||
| @@ -4,6 +4,10 @@ import ( | |||
| "context" | |||
| "encoding/json" | |||
| "fmt" | |||
| "io" | |||
| "log" | |||
| "log/slog" | |||
| "os" | |||
| "os/signal" | |||
| "sync" | |||
| "syscall" | |||
| @@ -24,6 +28,16 @@ func main() { | |||
| appState := appcontext.NewAppState() | |||
| cfg := config.Load() | |||
| // Create log file | |||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||
| if err != nil { | |||
| log.Fatalf("Failed to open log file: %v\n", err) | |||
| } | |||
| // shell and log file multiwriter | |||
| w := io.MultiWriter(os.Stderr, logFile) | |||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||
| slog.SetDefault(logger) | |||
| // Define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| @@ -34,7 +48,7 @@ func main() { | |||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") | |||
| fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") | |||
| slog.Info("Locations algorithm initialized, subscribed to Kafka topics") | |||
| locTicker := time.NewTicker(1 * time.Second) | |||
| defer locTicker.Stop() | |||
| @@ -61,22 +75,24 @@ eventLoop: | |||
| switch msg.Method { | |||
| case "POST": | |||
| id := msg.Beacon.ID | |||
| fmt.Println("Beacon added to lookup: ", id) | |||
| lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) | |||
| slog.Info(lMsg) | |||
| appState.AddBeaconToLookup(id) | |||
| case "DELETE": | |||
| id := msg.Beacon.ID | |||
| appState.RemoveBeaconFromLookup(id) | |||
| fmt.Println("Beacon removed from lookup: ", id) | |||
| lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) | |||
| slog.Info(lMsg) | |||
| } | |||
| case msg := <-chSettings: | |||
| appState.UpdateSettings(msg) | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| slog.Info("broken out of the main event loop") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| } | |||
| @@ -98,7 +114,7 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| mSize := len(beacon.BeaconMetrics) | |||
| if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { | |||
| fmt.Println("Beacon is too old") | |||
| slog.Warn("beacon is too old") | |||
| continue | |||
| } | |||
| @@ -133,7 +149,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| beacon.LocationConfidence = 0 | |||
| // Why do I need this if I am sending entire structure anyways? who knows | |||
| fmt.Println("this is called") | |||
| js, err := json.Marshal(model.LocationChange{ | |||
| Method: "LocationChange", | |||
| BeaconRef: beacon, | |||
| @@ -144,7 +159,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| }) | |||
| if err != nil { | |||
| fmt.Println("This error happens: ", err) | |||
| eMsg := fmt.Sprintf("Error in marshaling: %v", err) | |||
| slog.Error(eMsg) | |||
| beacon.PreviousConfidentLocation = bestLocName | |||
| beacon.PreviousLocation = bestLocName | |||
| appState.UpdateBeacon(beacon.ID, beacon) | |||
| @@ -166,7 +182,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| js, err := json.Marshal(r) | |||
| if err != nil { | |||
| fmt.Println("Error in marshaling location: ", err) | |||
| eMsg := fmt.Sprintf("Error in marshaling location: %v", err) | |||
| slog.Error(eMsg) | |||
| continue | |||
| } | |||
| @@ -176,7 +193,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| err = writer.WriteMessages(context.Background(), msg) | |||
| if err != nil { | |||
| fmt.Println("Error in sending Kafka message: ", err) | |||
| eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err) | |||
| slog.Error(eMsg) | |||
| } | |||
| } | |||
| } | |||
| @@ -194,7 +212,7 @@ func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppS | |||
| settings := appState.GetSettingsValue() | |||
| if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { | |||
| fmt.Println("Settings returns") | |||
| slog.Info("Settings returns") | |||
| return | |||
| } | |||
| @@ -4,9 +4,13 @@ import ( | |||
| "context" | |||
| "encoding/json" | |||
| "fmt" | |||
| "io" | |||
| "log" | |||
| "log/slog" | |||
| "net/http" | |||
| "os" | |||
| "os/signal" | |||
| "strings" | |||
| "sync" | |||
| "syscall" | |||
| "time" | |||
| @@ -27,12 +31,24 @@ var upgrader = websocket.Upgrader{ | |||
| WriteBufferSize: 1024, | |||
| } | |||
| var _ io.Writer = (*os.File)(nil) | |||
| var wg sync.WaitGroup | |||
| func main() { | |||
| cfg := config.Load() | |||
| appState := appcontext.NewAppState() | |||
| // Create log file | |||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||
| if err != nil { | |||
| log.Fatalf("Failed to open log file: %v\n", err) | |||
| } | |||
| // shell and log file multiwriter | |||
| w := io.MultiWriter(os.Stderr, logFile) | |||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||
| slog.SetDefault(logger) | |||
| // define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| @@ -43,12 +59,14 @@ func main() { | |||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") | |||
| settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") | |||
| slog.Info("Kafka writers topics: apibeacons, settings initialized") | |||
| locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") | |||
| alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | |||
| slog.Info("Kafka readers topics: locevents, alertbeacons initialized") | |||
| client := appState.AddValkeyClient(cfg.ValkeyURL) | |||
| // Need Lua script to pull all of the beacons in one go on init | |||
| slog.Info("Valkey DB client created") | |||
| chLoc := make(chan model.HTTPLocation, 200) | |||
| chEvents := make(chan model.BeaconEvent, 500) | |||
| @@ -59,7 +77,6 @@ func main() { | |||
| r := mux.NewRouter() | |||
| // For now just add beacon DELETE / GET / POST / PUT methods | |||
| r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") | |||
| r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") | |||
| r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") | |||
| @@ -69,9 +86,23 @@ func main() { | |||
| r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") | |||
| r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") | |||
| r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx)) | |||
| wsHandler := http.HandlerFunc(serveWs(appState, ctx)) | |||
| restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) | |||
| mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |||
| if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") { | |||
| wsHandler.ServeHTTP(w, r) | |||
| return | |||
| } | |||
| restApiHandler.ServeHTTP(w, r) | |||
| }) | |||
| http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | |||
| server := http.Server{ | |||
| Addr: cfg.HTTPAddr, | |||
| Handler: mainHandler, | |||
| } | |||
| go server.ListenAndServe() | |||
| eventLoop: | |||
| for { | |||
| @@ -80,23 +111,35 @@ eventLoop: | |||
| break eventLoop | |||
| case msg := <-chLoc: | |||
| if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { | |||
| fmt.Printf("Error in writing location change to beacon: %v\n", err) | |||
| eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err) | |||
| slog.Error(eMsg) | |||
| } | |||
| case msg := <-chEvents: | |||
| if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { | |||
| fmt.Printf("Error in writing event change to beacon: %v\n", err) | |||
| eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err) | |||
| slog.Error(eMsg) | |||
| } | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| if err := server.Shutdown(context.Background()); err != nil { | |||
| eMsg := fmt.Sprintf("could not shutdown: %v\n", err) | |||
| slog.Error(eMsg) | |||
| } | |||
| slog.Info("API SERVER: \n") | |||
| slog.Warn("broken out of the main event loop and HTTP server shutdown\n") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") | |||
| slog.Info("All kafka clients shutdown, starting shutdown of valkey client") | |||
| appState.CleanValkeyClient() | |||
| slog.Info("API server shutting down") | |||
| logFile.Close() | |||
| } | |||
| func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { | |||
| @@ -104,13 +147,14 @@ func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFun | |||
| ws, err := upgrader.Upgrade(w, r, nil) | |||
| if err != nil { | |||
| if _, ok := err.(websocket.HandshakeError); !ok { | |||
| log.Println(err) | |||
| eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err) | |||
| slog.Error(eMsg) | |||
| } | |||
| return | |||
| } | |||
| wg.Add(1) | |||
| wg.Add(2) | |||
| go writer(ws, appstate, ctx) | |||
| reader(ws) | |||
| go reader(ws, ctx) | |||
| } | |||
| } | |||
| @@ -126,7 +170,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte | |||
| for { | |||
| select { | |||
| case <-ctx.Done(): | |||
| log.Println("WebSocket writer received shutdown signal.") | |||
| slog.Info("WebSocket writer received shutdown signal.") | |||
| ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) | |||
| ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | |||
| return | |||
| @@ -150,15 +194,24 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte | |||
| } | |||
| } | |||
| func reader(ws *websocket.Conn) { | |||
| defer ws.Close() | |||
| func reader(ws *websocket.Conn, ctx context.Context) { | |||
| defer func() { | |||
| ws.Close() | |||
| wg.Done() | |||
| }() | |||
| ws.SetReadLimit(512) | |||
| ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) | |||
| ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) | |||
| for { | |||
| _, _, err := ws.ReadMessage() | |||
| if err != nil { | |||
| break | |||
| select { | |||
| case <-ctx.Done(): | |||
| slog.Info("closing ws reader") | |||
| return | |||
| default: | |||
| _, _, err := ws.ReadMessage() | |||
| if err != nil { | |||
| return | |||
| } | |||
| } | |||
| } | |||
| } | |||
| @@ -87,24 +87,30 @@ func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.Handle | |||
| decoder := json.NewDecoder(r.Body) | |||
| var inBeacon model.Beacon | |||
| err := decoder.Decode(&inBeacon) | |||
| fmt.Printf("hello world\n") | |||
| if err != nil { | |||
| http.Error(w, err.Error(), 400) | |||
| return | |||
| } | |||
| fmt.Printf("hello world\n") | |||
| fmt.Printf("in beacon: %+v\n", inBeacon) | |||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { | |||
| http.Error(w, "name and beacon_id cannot be blank", 400) | |||
| return | |||
| } | |||
| fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) | |||
| fmt.Printf("Adding new print here also\n") | |||
| // fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) | |||
| apiUpdate := model.ApiUpdate{ | |||
| Method: "POST", | |||
| Beacon: inBeacon, | |||
| } | |||
| fmt.Printf("message: %+v\n", apiUpdate) | |||
| if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { | |||
| fmt.Println("error in sending Kafka POST message") | |||
| http.Error(w, "Error in sending kafka message", 500) | |||
| @@ -10,5 +10,6 @@ echo -e "\n" | |||
| sleep 1 | |||
| echo "GET beacon ID: $BEACON_ID" | |||
| curl -X GET $URL/$BEACON_ID | |||
| curl -X GET $URL | |||
| sleep 1 | |||