From 0674ca91126a0ec007a0ee6dc792d2fb0b436353 Mon Sep 17 00:00:00 2001 From: blazSenlab Date: Thu, 30 Oct 2025 13:45:23 +0100 Subject: [PATCH] feat: add kafka server connection to decoder, api server (beacons route only) --- cmd/decoder/main.go | 55 +++++++++++++--- cmd/server/main.go | 127 ++++++++++++++++++++++++++++++++++++ internal/pkg/model/types.go | 6 ++ 3 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 cmd/server/main.go diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index a3f6a93..af052af 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -71,19 +71,59 @@ func main() { // declare channel for collecting Kafka messages chRaw := make(chan model.Incoming_json, 2000) - chApi := make(chan model.Incoming_json, 2000) + chApi := make(chan model.ApiUpdate, 2000) chLatest := make(chan model.Incoming_json, 2000) go consume(rawReader, chRaw) go consume(apiReader, chApi) go consume(latestReader, chLatest) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for range ticker.C { + appCtx.Beacons.Lock.Lock() + data, _ := json.Marshal(appCtx.Beacons.Beacons) + appCtx.Beacons.Lock.Unlock() + + err := client.Set(ctx, "beaconsList", data, 0).Err() + if err != nil { + fmt.Println("error saving to redis:", err) + } + + appCtx.LatestList.Lock.Lock() + ldata, _ := json.Marshal(appCtx.LatestList.LatestList) + appCtx.LatestList.Lock.Unlock() + + err = client.Set(ctx, "latestList", ldata, 0).Err() + if err != nil { + fmt.Println("error saving latest list:", err) + } + } + }() + for { select { case msg := <-chRaw: processIncoming(msg, &appCtx) case msg := <-chApi: - fmt.Println("api msg: ", msg) + switch msg.Method { + case "POST": + fmt.Println("method POST") + appCtx.Beacons.Lock.Lock() + appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon + case "DELETE": + _, exists := appCtx.Beacons.Beacons[msg.ID] + if exists { + appCtx.Beacons.Lock.Lock() + delete(appCtx.Beacons.Beacons, msg.ID) + } + fmt.Println("method DELETE") + default: + fmt.Println("unknown method: ", msg.Method) + } + appCtx.Beacons.Lock.Unlock() case msg := <-chLatest: fmt.Println("latest msg: ", msg) } @@ -111,7 +151,7 @@ func kafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { }) } -func consume(r *kafka.Reader, ch chan<- model.Incoming_json) { +func consume[T any](r *kafka.Reader, ch chan<- T) { for { msg, err := r.ReadMessage(context.Background()) if err != nil { @@ -119,14 +159,13 @@ func consume(r *kafka.Reader, ch chan<- model.Incoming_json) { continue } - var incoming model.Incoming_json - - if err := json.Unmarshal(msg.Value, &incoming); err != nil { - fmt.Println("error in decoding string: ", err) + var data T + if err := json.Unmarshal(msg.Value, &data); err != nil { + fmt.Println("error decoding:", err) continue } - ch <- incoming + ch <- data } } diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..912ab5e --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,127 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/gorilla/handlers" + "github.com/gorilla/mux" + "github.com/redis/go-redis/v9" + "github.com/segmentio/kafka-go" +) + +func main() { + HttpServer("127.0.0.1:1902") +} + +func HttpServer(addr string) { + headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) + originsOk := handlers.AllowedOrigins([]string{"*"}) + methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) + + // Kafka writer that relays messages + writer := kafkaWriter("127.0.0.1:9092", "apibeacons") + defer writer.Close() + + r := mux.NewRouter() + + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + Password: "", + }) + + // For now just add beacon DELETE / GET / POST / PUT methods + r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") + r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") + r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") + r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") + http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) +} + +// All the functions should do is just relay messages to the decoder through Kafka + +func kafkaWriter(kafkaURL, topic string) *kafka.Writer { + return &kafka.Writer{ + Addr: kafka.TCP(kafkaURL), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + } +} + +func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) { + valueStr, err := json.Marshal(&value) + if err != nil { + fmt.Println("error in encoding: ", err) + } + msg := kafka.Message{ + Value: valueStr, + } + + err = writer.WriteMessages(context.Background(), msg) + if err != nil { + fmt.Println("Error in sending kafka message: ") + } +} + +func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + beaconId := vars["beacon_id"] + apiUpdate := model.ApiUpdate{ + Method: "DELETE", + ID: beaconId, + } + + sendKafkaMessage(writer, &apiUpdate) + w.Write([]byte("ok")) + } +} + +func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + var inBeacon model.Beacon + err := decoder.Decode(&inBeacon) + + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) { + http.Error(w, "name and beacon_id cannot be blank", 400) + return + } + + apiUpdate := model.ApiUpdate{ + Method: "POST", + Beacon: inBeacon, + } + + sendKafkaMessage(writer, &apiUpdate) + + w.Write([]byte("ok")) + } +} + +func beaconsListHandler(client *redis.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + beaconsList, err := client.Get(context.Background(), "beaconsList").Result() + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + http.Error(w, "list is empty", 500) + } else if err != nil { + http.Error(w, "Internal server error", 500) + panic(err) + } else { + w.Write([]byte(beaconsList)) + } + } +} diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index 604fc95..26a8007 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -193,6 +193,12 @@ type AppContext struct { LatestList LatestBeaconsList } +type ApiUpdate struct { + Method string + Beacon Beacon + ID string +} + var World = []byte("presence") var Db *bolt.DB