diff --git a/build/docker-compose.yaml b/build/docker-compose.yaml index 8b81a93..03baec3 100644 --- a/build/docker-compose.yaml +++ b/build/docker-compose.yaml @@ -47,4 +47,11 @@ services: - ./init-scripts/create_topic.sh:/tmp/create_topic.sh environment: - TOPIC_NAMES=topic1,topic2,topic3 + + valkey: + image: valkey/valkey:9.0.0 + container_name: valkey + ports: + - "127.0.0.1:6379:6379" + diff --git a/cmd/location/main.go b/cmd/location/main.go index 8089abb..dd13066 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -60,7 +60,6 @@ func main() { } func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { - fmt.Println("get likely locations called") beacons := appState.GetAllBeacons() settings := appState.GetSettingsValue() diff --git a/cmd/server/main.go b/cmd/server/main.go index cb2482a..c374ba2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/handlers" @@ -27,20 +28,71 @@ func main() { } func HttpServer(addr string) { + cfg := config.Load() + headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) // Kafka writer that relays messages - writer := kafkaclient.KafkaWriter("127.0.0.1:9092", "apibeacons") + writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons") defer writer.Close() - settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") + settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings") defer settingsWriter.Close() - // Define if maybe ws writer should have more topics - wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages") - defer wsWriter.Close() + // Kafka reader for Raw MQTT beacons + locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv") + defer locationReader.Close() + + // Kafka reader for API server updates + alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") + defer alertsReader.Close() + + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + Password: "", + }) + + ctx := context.Background() + + // Separate channel for location change? + chLoc := make(chan model.HTTPLocation, 200) + chEvents := make(chan model.BeaconEvent, 500) + + go kafkaclient.Consume(locationReader, chLoc) + go kafkaclient.Consume(alertsReader, chEvents) + + go func() { + for { + select { + case msg := <-chLoc: + key := fmt.Sprintf("beacon:%s", msg.ID) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + continue + } + err = client.HSet(ctx, key, hashM).Err() + if err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + continue + } + case msg := <-chEvents: + key := fmt.Sprintf("beacon:%s", msg.ID) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + continue + } + err = client.HSet(ctx, key, hashM).Err() + if err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + continue + } + } + } + }() r := mux.NewRouter() @@ -53,7 +105,7 @@ func HttpServer(addr string) { // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") - // r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") + r.HandleFunc("/api/beacons/{beacon_id}", beaconsListHandler(ctx, client)).Methods("GET") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") @@ -69,6 +121,30 @@ func HttpServer(addr string) { http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } +func beaconsListHandler(ctx context.Context, client *redis.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["beacon_id"] + key := fmt.Sprintf("beacon:%s", id) + beacon, err := client.HGetAll(ctx, key).Result() + if err != nil { + res := fmt.Sprintf("Error in getting beacon data (key: %s), error: %v", key, err) + fmt.Println(res) + http.Error(w, res, 500) + } + + fmt.Printf("%+v", beacon) + rData, err := json.Marshal(beacon) + if err != nil { + res := fmt.Sprintf("Error in marshaling beacon data (key: %s), error: %v", key, err) + fmt.Println(res) + http.Error(w, res, 500) + } + + w.Write(rData) + } +} + // Probably define value as interface and then reuse this writer in all of the functions func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool { valueStr, err := json.Marshal(&value) @@ -145,15 +221,6 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { } } -// func beaconsListHandler(client *redis.Client) http.HandlerFunc { -// return func(w http.ResponseWriter, r *http.Request) { -// } -// } - -// func settingsListHandler(client *redis.Client) http.HandlerFunc { -// return func(w http.ResponseWriter, r *http.Request) {} -// } - func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) diff --git a/cmd/valkey-testbench/main.go b/cmd/valkey-testbench/main.go new file mode 100644 index 0000000..b08f7c9 --- /dev/null +++ b/cmd/valkey-testbench/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/redis/go-redis/v9" +) + +type Per struct { + Name string `json:"name"` + Age int `json:"age"` +} + +type Beacon struct { + ID string `json:"id"` // Use JSON tags to ensure correct field names + Type string `json:"type"` + Temp int `json:"temp"` + Name string `json:"name"` +} + +func ConvertStructToMap(obj any) (map[string]any, error) { + // 1. Marshal the struct into a JSON byte slice + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + // 2. Unmarshal the JSON byte slice into the map + var result map[string]any + err = json.Unmarshal(data, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +// func main() { ... } +// client.HSet(ctx, "beacon:123", resultMap).Err() + +func main() { + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + Password: "", + }) + + ctx := context.Background() + + err := client.Set(ctx, "testkey", "hello world", 0).Err() + if err != nil { + fmt.Println("Ok") + } + + val, err := client.Get(ctx, "testkey").Result() + if err != nil { + fmt.Println("Ok") + } + fmt.Println(val) + + b := Beacon{ + ID: "hello", + Type: "node", + Temp: 10, + Name: "Peter", + } + + per := Per{ + Name: "Janez", + Age: 10, + } + + bEncoded, err := ConvertStructToMap(b) + if err != nil { + fmt.Print("error\n") + } + + perEncoded, err := ConvertStructToMap(per) + if err != nil { + fmt.Print("error\n") + } + + err = client.HSet(ctx, "myhash", bEncoded).Err() + fmt.Println(err) + + res, _ := client.HGetAll(ctx, "myhash").Result() + fmt.Println(res) + + err = client.HSet(ctx, "myhash", perEncoded).Err() + fmt.Println(err) + + res, _ = client.HGetAll(ctx, "myhash").Result() + fmt.Println(res) +} diff --git a/internal/pkg/model/typeMethods.go b/internal/pkg/model/typeMethods.go index e4589bb..b6e54cd 100644 --- a/internal/pkg/model/typeMethods.go +++ b/internal/pkg/model/typeMethods.go @@ -23,3 +23,25 @@ func (b BeaconEvent) ToJSON() ([]byte, error) { } return eData, nil } + +func convertStructToMap(obj any) (map[string]any, error) { + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + return nil, err + } + + return result, nil +} + +func (loc *HTTPLocation) RedisHashable() (map[string]any, error) { + return convertStructToMap(*loc) +} + +func (be *BeaconEvent) RedisHashable() (map[string]any, error) { + return convertStructToMap(*be) +} diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index e151535..1fed8a2 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -2,8 +2,6 @@ package model import ( "sync" - - "github.com/boltdb/bolt" ) // Settings defines configuration parameters for presence detection behavior. @@ -120,6 +118,7 @@ type Beacon struct { HSBattery int64 `json:"hs_button_battery"` HSRandomNonce string `json:"hs_button_random"` HSButtonMode string `json:"hs_button_mode"` + Event int `json:"beacon_event"` } type BeaconEvent struct { @@ -198,9 +197,5 @@ type ApiUpdate struct { ID string } -var World = []byte("presence") - -var Db *bolt.DB - var HTTPHostPathPtr *string var HTTPWSHostPathPtr *string diff --git a/scripts/testAPI.sh b/scripts/testAPI.sh index cdb64c5..8cfe711 100755 --- a/scripts/testAPI.sh +++ b/scripts/testAPI.sh @@ -10,6 +10,5 @@ echo -e "\n" sleep 1 -echo "DELETE" -curl -s -X DELETE "$URL/$BEACON_ID" -echo -e "\n" +echo "GET beacon ID: $BEACON_ID" +curl -X GET $URL/$BEACON_ID \ No newline at end of file