| Autore | SHA1 | Messaggio | Data |
|---|---|---|---|
|
|
55138a0f6c | chore: change to logging in file | 2 giorni fa |
|
|
8c14b77ba8 | fix: websocket implementation not working because of the CORS headers | 3 giorni fa |
|
|
faa941e4bd | fix: adding graceful shutdown of the api server | 3 giorni fa |
|
|
d69952a36d | chore: add settings topic to init script | 3 giorni fa |
| @@ -54,4 +54,60 @@ services: | |||||
| ports: | ports: | ||||
| - "127.0.0.1:6379:6379" | - "127.0.0.1:6379:6379" | ||||
| presense-decoder: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.decoder | |||||
| image: presense-decoder | |||||
| container_name: presense-decoder | |||||
| environment: | |||||
| - KAFKA_URL=kafka:29092 | |||||
| depends_on: | |||||
| - kafka-init | |||||
| restart: always | |||||
| presense-server: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.server | |||||
| image: presense-server | |||||
| container_name: presense-server | |||||
| environment: | |||||
| - VALKEY_URL=valkey:6379 | |||||
| - KAFKA_URL=kafka:29092 | |||||
| ports: | |||||
| - "127.0.0.1:1902:1902" | |||||
| depends_on: | |||||
| - kafka-init | |||||
| - valkey | |||||
| restart: always | |||||
| presense-bridge: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.bridge | |||||
| image: presense-bridge | |||||
| container_name: presense-bridge | |||||
| environment: | |||||
| - KAFKA_URL=kafka:29092 | |||||
| - MQTT_HOST=192.168.1.101:1883 | |||||
| - MQTT_USERNAME=user | |||||
| - MQTT_PASSWORD=pass | |||||
| depends_on: | |||||
| - kafka-init | |||||
| restart: always | |||||
| presense-location: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.location | |||||
| image: presense-location | |||||
| container_name: presense-location | |||||
| environment: | |||||
| - KAFKA_URL=kafka:29092 | |||||
| depends_on: | |||||
| - kafka-init | |||||
| restart: always | |||||
| @@ -15,7 +15,12 @@ | |||||
| --create --if-not-exists --topic alertbeacons \ | --create --if-not-exists --topic alertbeacons \ | ||||
| --partitions 1 --replication-factor 1 | --partitions 1 --replication-factor 1 | ||||
| # create topic alertBeacons | |||||
| # create topic locevents | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | ||||
| --create --if-not-exists --topic locevents \ | --create --if-not-exists --topic locevents \ | ||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic settings | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic settings \ | |||||
| --partitions 1 --replication-factor 1 | --partitions 1 --replication-factor 1 | ||||
| @@ -5,6 +5,10 @@ import ( | |||||
| "context" | "context" | ||||
| "encoding/hex" | "encoding/hex" | ||||
| "fmt" | "fmt" | ||||
| "io" | |||||
| "log" | |||||
| "log/slog" | |||||
| "os" | |||||
| "os/signal" | "os/signal" | ||||
| "strings" | "strings" | ||||
| "sync" | "sync" | ||||
| @@ -25,6 +29,16 @@ func main() { | |||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| // Create log file | |||||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||||
| if err != nil { | |||||
| log.Fatalf("Failed to open log file: %v\n", err) | |||||
| } | |||||
| // shell and log file multiwriter | |||||
| w := io.MultiWriter(os.Stderr, logFile) | |||||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||||
| slog.SetDefault(logger) | |||||
| // define context | // define context | ||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| @@ -34,7 +48,7 @@ func main() { | |||||
| alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") | alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") | ||||
| fmt.Println("Decoder initialized, subscribed to Kafka topics") | |||||
| slog.Info("Decoder initialized, subscribed to Kafka topics") | |||||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | chRaw := make(chan model.BeaconAdvertisement, 2000) | ||||
| chApi := make(chan model.ApiUpdate, 200) | chApi := make(chan model.ApiUpdate, 200) | ||||
| @@ -55,18 +69,21 @@ eventloop: | |||||
| case "POST": | case "POST": | ||||
| id := msg.Beacon.ID | id := msg.Beacon.ID | ||||
| appState.AddBeaconToLookup(id) | appState.AddBeaconToLookup(id) | ||||
| lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) | |||||
| slog.Info(lMsg) | |||||
| case "DELETE": | case "DELETE": | ||||
| id := msg.Beacon.ID | id := msg.Beacon.ID | ||||
| appState.RemoveBeaconFromLookup(id) | appState.RemoveBeaconFromLookup(id) | ||||
| fmt.Println("Beacon removed from lookup: ", id) | |||||
| lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) | |||||
| slog.Info(lMsg) | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| fmt.Println("broken out of the main event loop") | |||||
| slog.Info("broken out of the main event loop") | |||||
| wg.Wait() | wg.Wait() | ||||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | |||||
| appState.CleanKafkaReaders() | appState.CleanKafkaReaders() | ||||
| appState.CleanKafkaWriters() | appState.CleanKafkaWriters() | ||||
| } | } | ||||
| @@ -80,7 +97,8 @@ func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppStat | |||||
| err := decodeBeacon(adv, appState, writer) | err := decodeBeacon(adv, appState, writer) | ||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("error in decoding") | |||||
| eMsg := fmt.Sprintf("Error in decoding: %v", err) | |||||
| fmt.Println(eMsg) | |||||
| return | return | ||||
| } | } | ||||
| } | } | ||||
| @@ -4,6 +4,10 @@ import ( | |||||
| "context" | "context" | ||||
| "encoding/json" | "encoding/json" | ||||
| "fmt" | "fmt" | ||||
| "io" | |||||
| "log" | |||||
| "log/slog" | |||||
| "os" | |||||
| "os/signal" | "os/signal" | ||||
| "sync" | "sync" | ||||
| "syscall" | "syscall" | ||||
| @@ -24,6 +28,16 @@ func main() { | |||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| // Create log file | |||||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||||
| if err != nil { | |||||
| log.Fatalf("Failed to open log file: %v\n", err) | |||||
| } | |||||
| // shell and log file multiwriter | |||||
| w := io.MultiWriter(os.Stderr, logFile) | |||||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||||
| slog.SetDefault(logger) | |||||
| // Define context | // Define context | ||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| @@ -34,7 +48,7 @@ func main() { | |||||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") | writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") | ||||
| fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") | |||||
| slog.Info("Locations algorithm initialized, subscribed to Kafka topics") | |||||
| locTicker := time.NewTicker(1 * time.Second) | locTicker := time.NewTicker(1 * time.Second) | ||||
| defer locTicker.Stop() | defer locTicker.Stop() | ||||
| @@ -61,22 +75,24 @@ eventLoop: | |||||
| switch msg.Method { | switch msg.Method { | ||||
| case "POST": | case "POST": | ||||
| id := msg.Beacon.ID | id := msg.Beacon.ID | ||||
| fmt.Println("Beacon added to lookup: ", id) | |||||
| lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) | |||||
| slog.Info(lMsg) | |||||
| appState.AddBeaconToLookup(id) | appState.AddBeaconToLookup(id) | ||||
| case "DELETE": | case "DELETE": | ||||
| id := msg.Beacon.ID | id := msg.Beacon.ID | ||||
| appState.RemoveBeaconFromLookup(id) | appState.RemoveBeaconFromLookup(id) | ||||
| fmt.Println("Beacon removed from lookup: ", id) | |||||
| lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) | |||||
| slog.Info(lMsg) | |||||
| } | } | ||||
| case msg := <-chSettings: | case msg := <-chSettings: | ||||
| appState.UpdateSettings(msg) | appState.UpdateSettings(msg) | ||||
| } | } | ||||
| } | } | ||||
| fmt.Println("broken out of the main event loop") | |||||
| slog.Info("broken out of the main event loop") | |||||
| wg.Wait() | wg.Wait() | ||||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections") | |||||
| appState.CleanKafkaReaders() | appState.CleanKafkaReaders() | ||||
| appState.CleanKafkaWriters() | appState.CleanKafkaWriters() | ||||
| } | } | ||||
| @@ -98,7 +114,7 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| mSize := len(beacon.BeaconMetrics) | mSize := len(beacon.BeaconMetrics) | ||||
| if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { | if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { | ||||
| fmt.Println("Beacon is too old") | |||||
| slog.Warn("beacon is too old") | |||||
| continue | continue | ||||
| } | } | ||||
| @@ -133,7 +149,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| beacon.LocationConfidence = 0 | beacon.LocationConfidence = 0 | ||||
| // Why do I need this if I am sending entire structure anyways? who knows | // Why do I need this if I am sending entire structure anyways? who knows | ||||
| fmt.Println("this is called") | |||||
| js, err := json.Marshal(model.LocationChange{ | js, err := json.Marshal(model.LocationChange{ | ||||
| Method: "LocationChange", | Method: "LocationChange", | ||||
| BeaconRef: beacon, | BeaconRef: beacon, | ||||
| @@ -144,7 +159,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("This error happens: ", err) | |||||
| eMsg := fmt.Sprintf("Error in marshaling: %v", err) | |||||
| slog.Error(eMsg) | |||||
| beacon.PreviousConfidentLocation = bestLocName | beacon.PreviousConfidentLocation = bestLocName | ||||
| beacon.PreviousLocation = bestLocName | beacon.PreviousLocation = bestLocName | ||||
| appState.UpdateBeacon(beacon.ID, beacon) | appState.UpdateBeacon(beacon.ID, beacon) | ||||
| @@ -166,7 +182,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| js, err := json.Marshal(r) | js, err := json.Marshal(r) | ||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("Error in marshaling location: ", err) | |||||
| eMsg := fmt.Sprintf("Error in marshaling location: %v", err) | |||||
| slog.Error(eMsg) | |||||
| continue | continue | ||||
| } | } | ||||
| @@ -176,7 +193,8 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| err = writer.WriteMessages(context.Background(), msg) | err = writer.WriteMessages(context.Background(), msg) | ||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("Error in sending Kafka message: ", err) | |||||
| eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err) | |||||
| slog.Error(eMsg) | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| @@ -194,7 +212,7 @@ func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppS | |||||
| settings := appState.GetSettingsValue() | settings := appState.GetSettingsValue() | ||||
| if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { | if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { | ||||
| fmt.Println("Settings returns") | |||||
| slog.Info("Settings returns") | |||||
| return | return | ||||
| } | } | ||||
| @@ -4,9 +4,13 @@ import ( | |||||
| "context" | "context" | ||||
| "encoding/json" | "encoding/json" | ||||
| "fmt" | "fmt" | ||||
| "io" | |||||
| "log" | "log" | ||||
| "log/slog" | |||||
| "net/http" | "net/http" | ||||
| "os" | |||||
| "os/signal" | "os/signal" | ||||
| "strings" | |||||
| "sync" | "sync" | ||||
| "syscall" | "syscall" | ||||
| "time" | "time" | ||||
| @@ -27,12 +31,24 @@ var upgrader = websocket.Upgrader{ | |||||
| WriteBufferSize: 1024, | WriteBufferSize: 1024, | ||||
| } | } | ||||
| var _ io.Writer = (*os.File)(nil) | |||||
| var wg sync.WaitGroup | var wg sync.WaitGroup | ||||
| func main() { | func main() { | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| // Create log file | |||||
| logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) | |||||
| if err != nil { | |||||
| log.Fatalf("Failed to open log file: %v\n", err) | |||||
| } | |||||
| // shell and log file multiwriter | |||||
| w := io.MultiWriter(os.Stderr, logFile) | |||||
| logger := slog.New(slog.NewJSONHandler(w, nil)) | |||||
| slog.SetDefault(logger) | |||||
| // define context | // define context | ||||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||||
| defer stop() | defer stop() | ||||
| @@ -43,12 +59,14 @@ func main() { | |||||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") | writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") | ||||
| settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") | settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") | ||||
| slog.Info("Kafka writers topics: apibeacons, settings initialized") | |||||
| locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") | locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") | ||||
| alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | ||||
| slog.Info("Kafka readers topics: locevents, alertbeacons initialized") | |||||
| client := appState.AddValkeyClient(cfg.ValkeyURL) | client := appState.AddValkeyClient(cfg.ValkeyURL) | ||||
| // Need Lua script to pull all of the beacons in one go on init | |||||
| slog.Info("Valkey DB client created") | |||||
| chLoc := make(chan model.HTTPLocation, 200) | chLoc := make(chan model.HTTPLocation, 200) | ||||
| chEvents := make(chan model.BeaconEvent, 500) | chEvents := make(chan model.BeaconEvent, 500) | ||||
| @@ -59,7 +77,6 @@ func main() { | |||||
| r := mux.NewRouter() | r := mux.NewRouter() | ||||
| // For now just add beacon DELETE / GET / POST / PUT methods | |||||
| r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") | r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE") | ||||
| r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") | r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") | ||||
| r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") | r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") | ||||
| @@ -69,9 +86,23 @@ func main() { | |||||
| r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") | r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") | ||||
| r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") | r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") | ||||
| r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx)) | |||||
| wsHandler := http.HandlerFunc(serveWs(appState, ctx)) | |||||
| restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) | |||||
| mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |||||
| if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") { | |||||
| wsHandler.ServeHTTP(w, r) | |||||
| return | |||||
| } | |||||
| restApiHandler.ServeHTTP(w, r) | |||||
| }) | |||||
| http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | |||||
| server := http.Server{ | |||||
| Addr: cfg.HTTPAddr, | |||||
| Handler: mainHandler, | |||||
| } | |||||
| go server.ListenAndServe() | |||||
| eventLoop: | eventLoop: | ||||
| for { | for { | ||||
| @@ -80,23 +111,35 @@ eventLoop: | |||||
| break eventLoop | break eventLoop | ||||
| case msg := <-chLoc: | case msg := <-chLoc: | ||||
| if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { | if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { | ||||
| fmt.Printf("Error in writing location change to beacon: %v\n", err) | |||||
| eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err) | |||||
| slog.Error(eMsg) | |||||
| } | } | ||||
| case msg := <-chEvents: | case msg := <-chEvents: | ||||
| if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { | if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { | ||||
| fmt.Printf("Error in writing event change to beacon: %v\n", err) | |||||
| eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err) | |||||
| slog.Error(eMsg) | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| fmt.Println("broken out of the main event loop") | |||||
| if err := server.Shutdown(context.Background()); err != nil { | |||||
| eMsg := fmt.Sprintf("could not shutdown: %v\n", err) | |||||
| slog.Error(eMsg) | |||||
| } | |||||
| slog.Info("API SERVER: \n") | |||||
| slog.Warn("broken out of the main event loop and HTTP server shutdown\n") | |||||
| wg.Wait() | wg.Wait() | ||||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||||
| slog.Info("All go routines have stopped, Beggining to close Kafka connections\n") | |||||
| appState.CleanKafkaReaders() | appState.CleanKafkaReaders() | ||||
| appState.CleanKafkaWriters() | appState.CleanKafkaWriters() | ||||
| fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") | |||||
| slog.Info("All kafka clients shutdown, starting shutdown of valkey client") | |||||
| appState.CleanValkeyClient() | appState.CleanValkeyClient() | ||||
| slog.Info("API server shutting down") | |||||
| logFile.Close() | |||||
| } | } | ||||
| func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { | func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc { | ||||
| @@ -104,13 +147,14 @@ func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFun | |||||
| ws, err := upgrader.Upgrade(w, r, nil) | ws, err := upgrader.Upgrade(w, r, nil) | ||||
| if err != nil { | if err != nil { | ||||
| if _, ok := err.(websocket.HandshakeError); !ok { | if _, ok := err.(websocket.HandshakeError); !ok { | ||||
| log.Println(err) | |||||
| eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err) | |||||
| slog.Error(eMsg) | |||||
| } | } | ||||
| return | return | ||||
| } | } | ||||
| wg.Add(1) | |||||
| wg.Add(2) | |||||
| go writer(ws, appstate, ctx) | go writer(ws, appstate, ctx) | ||||
| reader(ws) | |||||
| go reader(ws, ctx) | |||||
| } | } | ||||
| } | } | ||||
| @@ -126,7 +170,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte | |||||
| for { | for { | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| log.Println("WebSocket writer received shutdown signal.") | |||||
| slog.Info("WebSocket writer received shutdown signal.") | |||||
| ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) | ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) | ||||
| ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | ||||
| return | return | ||||
| @@ -150,15 +194,24 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte | |||||
| } | } | ||||
| } | } | ||||
| func reader(ws *websocket.Conn) { | |||||
| defer ws.Close() | |||||
| func reader(ws *websocket.Conn, ctx context.Context) { | |||||
| defer func() { | |||||
| ws.Close() | |||||
| wg.Done() | |||||
| }() | |||||
| ws.SetReadLimit(512) | ws.SetReadLimit(512) | ||||
| ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) | ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)) | ||||
| ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) | ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil }) | ||||
| for { | for { | ||||
| _, _, err := ws.ReadMessage() | |||||
| if err != nil { | |||||
| break | |||||
| select { | |||||
| case <-ctx.Done(): | |||||
| slog.Info("closing ws reader") | |||||
| return | |||||
| default: | |||||
| _, _, err := ws.ReadMessage() | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| @@ -87,24 +87,30 @@ func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.Handle | |||||
| decoder := json.NewDecoder(r.Body) | decoder := json.NewDecoder(r.Body) | ||||
| var inBeacon model.Beacon | var inBeacon model.Beacon | ||||
| err := decoder.Decode(&inBeacon) | err := decoder.Decode(&inBeacon) | ||||
| fmt.Printf("hello world\n") | |||||
| if err != nil { | if err != nil { | ||||
| http.Error(w, err.Error(), 400) | http.Error(w, err.Error(), 400) | ||||
| return | return | ||||
| } | } | ||||
| fmt.Printf("hello world\n") | |||||
| fmt.Printf("in beacon: %+v\n", inBeacon) | |||||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { | if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { | ||||
| http.Error(w, "name and beacon_id cannot be blank", 400) | http.Error(w, "name and beacon_id cannot be blank", 400) | ||||
| return | return | ||||
| } | } | ||||
| fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) | |||||
| fmt.Printf("Adding new print here also\n") | |||||
| // fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) | |||||
| apiUpdate := model.ApiUpdate{ | apiUpdate := model.ApiUpdate{ | ||||
| Method: "POST", | Method: "POST", | ||||
| Beacon: inBeacon, | Beacon: inBeacon, | ||||
| } | } | ||||
| fmt.Printf("message: %+v\n", apiUpdate) | |||||
| if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { | if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { | ||||
| fmt.Println("error in sending Kafka POST message") | fmt.Println("error in sending Kafka POST message") | ||||
| http.Error(w, "Error in sending kafka message", 500) | http.Error(w, "Error in sending kafka message", 500) | ||||
| @@ -10,5 +10,6 @@ echo -e "\n" | |||||
| sleep 1 | sleep 1 | ||||
| echo "GET beacon ID: $BEACON_ID" | |||||
| curl -X GET $URL/$BEACON_ID | |||||
| curl -X GET $URL | |||||
| sleep 1 | |||||