diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index b5d805e..f951b5a 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -56,7 +56,9 @@ eventloop: id := msg.Beacon.ID appState.AddBeaconToLookup(id) case "DELETE": - fmt.Println("Incoming delete message") + id := msg.Beacon.ID + appState.RemoveBeaconFromLookup(id) + fmt.Println("Beacon removed from lookup: ", id) } } } diff --git a/cmd/location/main.go b/cmd/location/main.go index 69669d5..ebd8195 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -61,7 +61,9 @@ eventLoop: fmt.Println("Beacon added to lookup: ", id) appState.AddBeaconToLookup(id) case "DELETE": - fmt.Println("Incoming delete message") + id := msg.Beacon.ID + appState.RemoveBeaconFromLookup(id) + fmt.Println("Beacon removed from lookup: ", id) } } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 3105fc5..5444e6e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -59,29 +59,16 @@ func main() { r := mux.NewRouter() - // declare WS clients list | do I need it though? or will locations worker send message - // to kafka and then only this service (server) is being used for communication with the clients - clients := make(map[*websocket.Conn]bool) - - // Declare broadcast channel - broadcast := make(chan model.Message) - // For now just add beacon DELETE / GET / POST / PUT methods - r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") - r.HandleFunc("/api/beacons", beaconsListHandler(appState)).Methods("GET") - r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleHandler(appState)).Methods("GET") - r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") - r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") + r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer, ctx)).Methods("DELETE") + r.HandleFunc("/api/beacons", beaconsListController(appState)).Methods("GET") + r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleController(appState)).Methods("GET") + r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("POST") + r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("PUT") // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") - // Handler for WS messages - // No point in having seperate route for each message type, better to handle different message types in one connection - // r.HandleFunc("/ws/api/beacons", serveWs(client)) - // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) - r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) - http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r)) eventLoop: @@ -101,18 +88,8 @@ eventLoop: beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation appState.UpdateBeacon(msg.ID, beacon) } - key := fmt.Sprintf("beacon:%s", msg.ID) - hashM, err := msg.RedisHashable() - if err != nil { - fmt.Println("Error in converting location into hashmap for Redis insert: ", err) - continue - } - if err := client.HSet(ctx, key, hashM).Err(); err != nil { - fmt.Println("Error in persisting set in Redis key: ", key) - continue - } - if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { - fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil { + fmt.Printf("Error in persisting change location beacon: %v", err) } case msg := <-chEvents: beacon, ok := appState.GetBeacon(msg.ID) @@ -125,18 +102,8 @@ eventLoop: beacon.Event = msg.Event appState.UpdateBeacon(msg.ID, beacon) } - key := fmt.Sprintf("beacon:%s", msg.ID) - hashM, err := msg.RedisHashable() - if err != nil { - fmt.Println("Error in converting location into hashmap for Redis insert: ", err) - continue - } - if err := client.HSet(ctx, key, hashM).Err(); err != nil { - fmt.Println("Error in persisting set in Redis key: ", key) - continue - } - if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { - fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil { + fmt.Printf("Error in persisting change event beacon: %v", err) } } } @@ -151,7 +118,31 @@ eventLoop: appState.CleanValkeyClient() } -func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { +type RedisHashable interface { + RedisHashable() (map[string]interface{}, error) + model.BeaconEvent | model.HTTPLocation +} + +func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error { + key := fmt.Sprintf("beacon:%s", id) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + return err + } + if err := client.HSet(ctx, key, hashM).Err(); err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + return err + } + if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { + fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + return err + } + return nil +} + +// Makes no sense to define service as there is no actual business logic +func beaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["beacon_id"] @@ -169,7 +160,7 @@ func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { } } -func beaconsListHandler(appstate *appcontext.AppState) http.HandlerFunc { +func beaconsListController(appstate *appcontext.AppState) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { beacons := appstate.GetAllBeacons() w.Header().Set("Content-Type", "application/json") @@ -179,26 +170,25 @@ func beaconsListHandler(appstate *appcontext.AppState) http.HandlerFunc { } // Probably define value as interface and then reuse this writer in all of the functions -func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool { +func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error { valueStr, err := json.Marshal(&value) if err != nil { fmt.Println("error in encoding: ", err) - return false + return err } msg := kafka.Message{ Value: valueStr, } - err = writer.WriteMessages(context.Background(), msg) - if err != nil { - fmt.Println("Error in sending kafka message: ") - return false + if err := writer.WriteMessages(ctx, msg); err != nil { + fmt.Println("Error in sending kafka message: ", err) + return err } - return true + return nil } -func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { +func beaconsDeleteHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) beaconId := vars["beacon_id"] @@ -209,9 +199,8 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { fmt.Println("Sending DELETE message") - flag := sendKafkaMessage(writer, &apiUpdate) - if !flag { - fmt.Println("error in sending Kafka message") + if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { + fmt.Println("error in sending Kafka DELETE message") http.Error(w, "Error in sending kafka message", 500) return } @@ -220,7 +209,7 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { } } -func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { +func beaconsAddHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) var inBeacon model.Beacon @@ -243,9 +232,8 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { Beacon: inBeacon, } - flag := sendKafkaMessage(writer, &apiUpdate) - if !flag { - fmt.Println("error in sending Kafka message") + if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { + fmt.Println("error in sending Kafka POST message") http.Error(w, "Error in sending kafka message", 500) return } diff --git a/internal/pkg/model/typeMethods.go b/internal/pkg/model/typeMethods.go index b6e54cd..df85427 100644 --- a/internal/pkg/model/typeMethods.go +++ b/internal/pkg/model/typeMethods.go @@ -38,10 +38,10 @@ func convertStructToMap(obj any) (map[string]any, error) { return result, nil } -func (loc *HTTPLocation) RedisHashable() (map[string]any, error) { - return convertStructToMap(*loc) +func (loc HTTPLocation) RedisHashable() (map[string]any, error) { + return convertStructToMap(loc) } -func (be *BeaconEvent) RedisHashable() (map[string]any, error) { - return convertStructToMap(*be) +func (be BeaconEvent) RedisHashable() (map[string]any, error) { + return convertStructToMap(be) }