diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 6a1782b..0bcfb22 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -5,7 +5,7 @@ import ( "github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler" "github.com/AFASystems/presence/internal/pkg/config" - "github.com/AFASystems/presence/internal/pkg/kafka" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" ) @@ -35,7 +35,7 @@ func main() { fmt.Println("Successfuly connected to MQTT broker") - writer := kafka.KafkaWriter(cfg.KafkaURL, "rawbeacons") + writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons") defer writer.Close() err = cli.Subscribe(&client.SubscribeOptions{ diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index 8aee360..e864793 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -8,7 +8,7 @@ import ( "time" "github.com/AFASystems/presence/internal/pkg/config" - "github.com/AFASystems/presence/internal/pkg/kafka" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/mqttclient" presenseredis "github.com/AFASystems/presence/internal/pkg/redis" @@ -36,26 +36,36 @@ func main() { LatestList: model.LatestBeaconsList{ LatestList: make(map[string]model.Beacon), }, + Settings: model.Settings{ + Location_confidence: 4, + Last_seen_threshold: 15, + Beacon_metrics_size: 30, + HA_send_interval: 5, + HA_send_changes_only: false, + }, } cfg := config.Load() // Kafka writer idk why yet - writer := kafka.KafkaWriter(cfg.KafkaURL, "beacons") + writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons") + defer writer.Close() // Kafka reader for Raw MQTT beacons - rawReader := kafka.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID") + rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID") defer rawReader.Close() // Kafka reader for API server updates - apiReader := kafka.KafkaReader(cfg.KafkaURL, "apibeacons", "someID") + apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID") defer apiReader.Close() // Kafka reader for latest list updates - latestReader := kafka.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID") + latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID") defer latestReader.Close() - defer writer.Close() + // Kafka reader for settings updates + settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID") + defer settingsReader.Close() ctx := context.Background() @@ -71,14 +81,19 @@ func main() { latestList := presenseredis.LoadLatestList(client, ctx) appCtx.LatestList.LatestList = latestList + settings := presenseredis.LoadSettings(client, ctx) + appCtx.Settings = settings + // declare channel for collecting Kafka messages chRaw := make(chan model.Incoming_json, 2000) chApi := make(chan model.ApiUpdate, 2000) chLatest := make(chan model.Incoming_json, 2000) + chSettings := make(chan model.Settings, 10) - go kafka.Consume(rawReader, chRaw) - go kafka.Consume(apiReader, chApi) - go kafka.Consume(latestReader, chLatest) + go kafkaclient.Consume(rawReader, chRaw) + go kafkaclient.Consume(apiReader, chApi) + go kafkaclient.Consume(latestReader, chLatest) + go kafkaclient.Consume(settingsReader, chSettings) go func() { // Syncing Redis cache every 1s with 2 lists: beacons, latest list @@ -112,6 +127,8 @@ func main() { appCtx.Beacons.Lock.Unlock() case msg := <-chLatest: fmt.Println("latest msg: ", msg) + case msg := <-chSettings: + fmt.Println("settings channel: ", msg) } } } diff --git a/cmd/server/main.go b/cmd/server/main.go index f52b806..52e913e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,8 +6,8 @@ import ( "fmt" "net/http" "strings" - "time" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -25,9 +25,12 @@ func HttpServer(addr string) { methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) // Kafka writer that relays messages - writer := kafkaWriter("kafka:9092", "apibeacons") + writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons") defer writer.Close() + settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") + defer settingsWriter.Close() + r := mux.NewRouter() client := redis.NewClient(&redis.Options{ @@ -40,25 +43,20 @@ func HttpServer(addr string) { r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") - http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) -} -// All the functions should do is just relay messages to the decoder through Kafka + r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") + r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") -func kafkaWriter(kafkaURL, topic string) *kafka.Writer { - return &kafka.Writer{ - Addr: kafka.TCP(kafkaURL), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - BatchSize: 100, - BatchTimeout: 10 * time.Millisecond, - } + http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } -func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) { +// This looks wrong, should handle error somehow + +func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool { valueStr, err := json.Marshal(&value) if err != nil { fmt.Println("error in encoding: ", err) + return false } msg := kafka.Message{ Value: valueStr, @@ -67,7 +65,10 @@ func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) { err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in sending kafka message: ") + return false } + + return true } func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { @@ -79,7 +80,13 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { ID: beaconId, } - sendKafkaMessage(writer, &apiUpdate) + flag := sendKafkaMessage(writer, &apiUpdate) + if !flag { + fmt.Println("error in sending Kafka message") + http.Error(w, "Error in sending kafka message", 500) + return + } + w.Write([]byte("ok")) } } @@ -105,7 +112,12 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { Beacon: inBeacon, } - sendKafkaMessage(writer, &apiUpdate) + flag := sendKafkaMessage(writer, &apiUpdate) + if !flag { + fmt.Println("error in sending Kafka message") + http.Error(w, "Error in sending kafka message", 500) + return + } w.Write([]byte("ok")) } @@ -125,3 +137,63 @@ func beaconsListHandler(client *redis.Client) http.HandlerFunc { } } } + +func settingsListHandler(client *redis.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + settings, err := client.Get(context.Background(), "settings").Result() + if err == redis.Nil { + fmt.Println("no settings persisted, starting empty") + http.Error(w, "list is empty", 500) + } else if err != nil { + http.Error(w, "Internal server error", 500) + panic(err) + } else { + w.Write([]byte(settings)) + } + } +} + +func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + var inSettings model.Settings + if err := decoder.Decode(&inSettings); err != nil { + http.Error(w, err.Error(), 400) + fmt.Println("Error in decoding Settings body: ", err) + return + } + + if !settingsCheck(inSettings) { + http.Error(w, "values must be greater than 0", 400) + fmt.Println("settings values must be greater than 0") + return + } + + valueStr, err := json.Marshal(&inSettings) + if err != nil { + http.Error(w, "Error in encoding settings", 500) + fmt.Println("Error in encoding settings: ", err) + return + } + + msg := kafka.Message{ + Value: valueStr, + } + + if err := writer.WriteMessages(context.Background(), msg); err != nil { + fmt.Println("error in sending Kafka message") + http.Error(w, "Error in sending kafka message", 500) + return + } + + w.Write([]byte("ok")) + } +} + +func settingsCheck(settings model.Settings) bool { + if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 { + return false + } + + return true +} diff --git a/internal/pkg/redis/redis.go b/internal/pkg/redis/redis.go index c8e4c63..fb7e906 100644 --- a/internal/pkg/redis/redis.go +++ b/internal/pkg/redis/redis.go @@ -39,6 +39,21 @@ func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model. return latestMap } +func LoadSettings(client *redis.Client, ctx context.Context) model.Settings { + redisSettings, err := client.Get(ctx, "settings").Result() + var settings model.Settings + + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + fmt.Println("no connection to redis") + } else { + json.Unmarshal([]byte(redisSettings), &settings) + } + + return settings +} + func SaveBeaconsList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) { appCtx.Beacons.Lock.Lock() data, _ := json.Marshal(appCtx.Beacons.Beacons)