From f8aa11f5fd8f432db5303b6c86954099b7682ef7 Mon Sep 17 00:00:00 2001 From: blazSenlab Date: Mon, 3 Nov 2025 13:26:55 +0100 Subject: [PATCH] chore: refactor, extending decoder func --- cmd/bridge/main.go | 22 ++-- cmd/decoder/main.go | 185 +++++++++++++++++---------------- cmd/location/main.go | 71 +++++++++++++ copy_files/_main.go | 87 ---------------- copy_files/_maincopy.go | 107 ------------------- internal/pkg/config/config.go | 4 + internal/pkg/kafka/consumer.go | 27 +++++ internal/pkg/kafka/reader.go | 18 ++++ internal/pkg/kafka/writer.go | 17 +++ internal/pkg/redis/redis.go | 62 +++++++++++ scripts/testAPI.sh | 41 ++++++++ 11 files changed, 341 insertions(+), 300 deletions(-) create mode 100644 cmd/location/main.go delete mode 100644 copy_files/_maincopy.go create mode 100644 internal/pkg/kafka/consumer.go create mode 100644 internal/pkg/kafka/reader.go create mode 100644 internal/pkg/kafka/writer.go create mode 100644 internal/pkg/redis/redis.go create mode 100755 scripts/testAPI.sh diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 5832043..f2f3411 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -1,13 +1,13 @@ package main import ( - "time" "fmt" + + "github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler" "github.com/AFASystems/presence/internal/pkg/config" + "github.com/AFASystems/presence/internal/pkg/kafka" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" - "github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler" - "github.com/segmentio/kafka-go" ) func main() { @@ -33,15 +33,15 @@ func main() { panic(err) } - writer := kafkaWriter("127.0.0.1:9092", "rawbeacons") + writer := kafka.KafkaWriter("127.0.0.1:9092", "rawbeacons") defer writer.Close() err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ - &client.SubReq{ + { TopicFilter: []byte("publish_out/#"), QoS: mqtt.QoS0, - Handler: func(topicName, message[]byte) { + Handler: func(topicName, message []byte) { mqtthandler.MqttHandler(writer, topicName, message) }, }, @@ -53,13 +53,3 @@ func main() { select {} } - -func kafkaWriter(kafkaURL, topic string) *kafka.Writer { - return &kafka.Writer{ - Addr: kafka.TCP(kafkaURL), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - BatchSize: 100, - BatchTimeout: 10 * time.Millisecond, - } -} \ No newline at end of file diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index af052af..8aee360 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -2,17 +2,31 @@ package main import ( "context" - "encoding/json" "fmt" - "strings" + "math" + "strconv" "time" + "github.com/AFASystems/presence/internal/pkg/config" + "github.com/AFASystems/presence/internal/pkg/kafka" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/mqttclient" + presenseredis "github.com/AFASystems/presence/internal/pkg/redis" "github.com/redis/go-redis/v9" - "github.com/segmentio/kafka-go" ) +// Move Kafka topics, Redis keys, intervals to env config +// Replace hardcoded IPs with env vars +// avoid defers -> lock and unlock right before and after usage +// Distance formula uses twos_comp incorrectly should parse signed int not hex string +// Use buffered log instead of fmt.Println ??? +// Limit metrics slice size with ring buffer ?? +// handle go routine exit signals with context.WithCancel() ?? + +// Make internal package for Kafka and Redis +// Make internal package for processor: +// Helper functions: twos_comp, getBeaconId + func main() { // Load global context to init beacons and latest list appCtx := model.AppContext{ @@ -24,19 +38,21 @@ func main() { }, } + cfg := config.Load() + // Kafka writer idk why yet - writer := kafkaWriter("127.0.0.1:9092", "beacons") + writer := kafka.KafkaWriter(cfg.KafkaURL, "beacons") // Kafka reader for Raw MQTT beacons - rawReader := kafkaReader("127.0.0.1:9092", "rawbeacons", "someID") + rawReader := kafka.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID") defer rawReader.Close() // Kafka reader for API server updates - apiReader := kafkaReader("127.0.0.1:9092", "apibeacons", "someID") + apiReader := kafka.KafkaReader(cfg.KafkaURL, "apibeacons", "someID") defer apiReader.Close() // Kafka reader for latest list updates - latestReader := kafkaReader("127.0.0.1:9092", "latestbeacons", "someID") + latestReader := kafka.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID") defer latestReader.Close() defer writer.Close() @@ -45,61 +61,33 @@ func main() { // Init Redis Client client := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", + Addr: cfg.RedisURL, Password: "", }) - // Initialize list values from Redis - beaconsList, err := client.Get(ctx, "beaconsList").Result() - if err == redis.Nil { - fmt.Println("no beacons list, starting empty") - } else if err != nil { - panic(err) - } else { - json.Unmarshal([]byte(beaconsList), &appCtx.Beacons.Beacons) - } + beaconsList := presenseredis.LoadBeaconsList(client, ctx) + appCtx.Beacons.Beacons = beaconsList - // Initialize list values from Redis - latestList, err := client.Get(ctx, "latestList").Result() - if err == redis.Nil { - fmt.Println("no latest list, starting empty") - } else if err != nil { - panic(err) - } else { - json.Unmarshal([]byte(latestList), &appCtx.LatestList.LatestList) - } + latestList := presenseredis.LoadLatestList(client, ctx) + appCtx.LatestList.LatestList = latestList // declare channel for collecting Kafka messages chRaw := make(chan model.Incoming_json, 2000) chApi := make(chan model.ApiUpdate, 2000) chLatest := make(chan model.Incoming_json, 2000) - go consume(rawReader, chRaw) - go consume(apiReader, chApi) - go consume(latestReader, chLatest) + go kafka.Consume(rawReader, chRaw) + go kafka.Consume(apiReader, chApi) + go kafka.Consume(latestReader, chLatest) go func() { + // Syncing Redis cache every 1s with 2 lists: beacons, latest list ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { - appCtx.Beacons.Lock.Lock() - data, _ := json.Marshal(appCtx.Beacons.Beacons) - appCtx.Beacons.Lock.Unlock() - - err := client.Set(ctx, "beaconsList", data, 0).Err() - if err != nil { - fmt.Println("error saving to redis:", err) - } - - appCtx.LatestList.Lock.Lock() - ldata, _ := json.Marshal(appCtx.LatestList.LatestList) - appCtx.LatestList.Lock.Unlock() - - err = client.Set(ctx, "latestList", ldata, 0).Err() - if err != nil { - fmt.Println("error saving latest list:", err) - } + presenseredis.SaveBeaconsList(&appCtx, client, ctx) + presenseredis.SaveLatestList(&appCtx, client, ctx) } }() @@ -110,7 +98,6 @@ func main() { case msg := <-chApi: switch msg.Method { case "POST": - fmt.Println("method POST") appCtx.Beacons.Lock.Lock() appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon case "DELETE": @@ -119,7 +106,6 @@ func main() { appCtx.Beacons.Lock.Lock() delete(appCtx.Beacons.Beacons, msg.ID) } - fmt.Println("method DELETE") default: fmt.Println("unknown method: ", msg.Method) } @@ -130,45 +116,6 @@ func main() { } } -func kafkaWriter(kafkaURL, topic string) *kafka.Writer { - return &kafka.Writer{ - Addr: kafka.TCP(kafkaURL), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - BatchSize: 100, - BatchTimeout: 10 * time.Millisecond, - } -} - -func kafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { - brokers := strings.Split(kafkaURL, ",") - return kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokers, - GroupID: groupID, - Topic: topic, - MinBytes: 1, - MaxBytes: 10e6, - }) -} - -func consume[T any](r *kafka.Reader, ch chan<- T) { - for { - msg, err := r.ReadMessage(context.Background()) - if err != nil { - fmt.Println("error reading message:", err) - continue - } - - var data T - if err := json.Unmarshal(msg.Value, &data); err != nil { - fmt.Println("error decoding:", err) - continue - } - - ch <- data - } -} - func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) { defer func() { if err := recover(); err != nil { @@ -192,10 +139,68 @@ func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) { beacon, exists := beacons.Beacons[id] if !exists { - fmt.Println("beacon does not yet exist") - fmt.Println("time now: ", now) + x, exists := latestList.LatestList[id] + if exists { + x.Last_seen = now + x.Incoming_JSON = incoming + x.Distance = getBeaconDistance(incoming) + latestList.LatestList[id] = x + } else { + latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)} + } + // Move this to seperate routine? + for k, v := range latestList.LatestList { + if (now - v.Last_seen) > 10 { + delete(latestList.LatestList, k) + } + } return } - fmt.Println("Beacon does exist: ", beacon) + updateBeacon(&beacon, incoming) + beacons.Beacons[id] = beacon +} + +func getBeaconDistance(incoming model.Incoming_json) float64 { + rssi := incoming.RSSI + power := incoming.TX_power + distance := 100.0 + + ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) + if ratio < 1.0 { + distance = math.Pow(ratio, 10) + } else { + distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 + } + return distance +} + +func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) { + now := time.Now().Unix() + + beacon.Incoming_JSON = incoming + beacon.Last_seen = now + beacon.Beacon_type = incoming.Beacon_type + beacon.HB_ButtonCounter = incoming.HB_ButtonCounter + beacon.HB_Battery = incoming.HB_Battery + beacon.HB_RandomNonce = incoming.HB_RandomNonce + beacon.HB_ButtonMode = incoming.HB_ButtonMode + + if beacon.Beacon_metrics == nil { + beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now + } + + metric := model.BeaconMetric{} + metric.Distance = getBeaconDistance(incoming) + metric.Timestamp = now + metric.Rssi = int64(incoming.RSSI) + metric.Location = incoming.Hostname + beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric) + + // Leave the HB button implementation for now +} + +func twos_comp(inp string) int64 { + i, _ := strconv.ParseInt("0x"+inp, 0, 64) + return i - 256 } diff --git a/cmd/location/main.go b/cmd/location/main.go new file mode 100644 index 0000000..7c09c95 --- /dev/null +++ b/cmd/location/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/redis/go-redis/v9" +) + +func main() { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + Password: "", + }) +} + +func getLikelyLocations(client *redis.Client, ctx context.Context) { + beaconsList, err := client.Get(ctx, "beaconsList").Result() + var beacons = make(map[string]model.Beacon) + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + panic(err) + } else { + json.Unmarshal([]byte(beaconsList), &beacons) + } + + for id, beacon := range beacons { + if len(beacon.Beacon_metrics) == 0 { + continue + } + + if isExpired(&beacon, settings) { + handleExpiredBeacon(&beacon, cl, ctx) + continue + } + + best := calculateBestLocation(&beacon) + updateBeaconState(&beacon, best, settings, ctx, cl) + + appendHTTPResult(ctx, beacon, best) + ctx.Beacons.Beacons[id] = beacon + } +} + +// get likely locations: +/* +1. Locks the http_results list +2. inits list to empty struct type -> TODO: what is this list used for +3. loops through beacons list -> should be locked? +4. check for beacon metrics -> how do you get beacon metrics, I guess it has an array of timestamps +5. check for threshold value in the settings +5.1. check for property expired location +5.2. if location is not expired -> mark it as expired, generate message and send to all clients, + if clients do not respond close the connection +6. Init best location with type Best_location{} -> what is this type +7. make locations list -> key: string, val: float64 +7.1 set weight for seen and rssi +7.2 loop over metrics of the beacon -> some alogirthm based on location value + +I think the algorithm is recording names of different gateways and their rssi's and then from +that it checks gateway name and makes decisions based on calculated values + +7.3 writes result in best location and updates list location history with this name if the list +is longer than 10 elements it removes the first element + + +*/ diff --git a/copy_files/_main.go b/copy_files/_main.go index 970c94f..ee1af13 100644 --- a/copy_files/_main.go +++ b/copy_files/_main.go @@ -256,31 +256,18 @@ func parseButtonState(raw string) int64 { func twos_comp(inp string) int64 { i, _ := strconv.ParseInt("0x"+inp, 0, 64) - return i - 256 } func getBeaconID(incoming Incoming_json) string { unique_id := fmt.Sprintf("%s", incoming.MAC) - /*if incoming.Beacon_type == "ibeacon" { - unique_id = fmt.Sprintf("%s_%s_%s", incoming.UUID, incoming.Major, incoming.Minor) - } else if incoming.Beacon_type == "eddystone" { - unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id) - } else if incoming.Beacon_type == "hb_button" { - unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id) - }*/ return unique_id } func incomingBeaconFilter(incoming Incoming_json) Incoming_json { out_json := incoming if incoming.Beacon_type == "hb_button" { - //do additional checks here to detect if a Habby Bubbles Button - // looks like 020104020a0011ff045600012d3859db59e1000b9453 - raw_data := incoming.Data - //company_id := []byte{0x04, 0x56} - //product_id := []byte{0x00, 0x01} hb_button_prefix_str := fmt.Sprintf("02010612FF5900") if strings.HasPrefix(raw_data, hb_button_prefix_str) { out_json.Namespace = "ddddeeeeeeffff5544ff" @@ -423,14 +410,6 @@ func sendButtonPressed(bcn Beacon, cl *client.Client) { fmt.Println(out) fmt.Println("--- stderr ---") fmt.Println(errout) - - // create the file if it doesn't exists with O_CREATE, Set the file up for read write, add the append flag and set the permission - //f, err := os.OpenFile("/data/conf/presence/db.json", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0660) - //if err != nil { - // log.Fatal(err) - //} - // write to file, f.Write() - //f.Write(btn_msg) } func getLikelyLocations(settings Settings, locations_list Locations_list, cl *client.Client) { @@ -447,14 +426,11 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl for _, beacon := range BEACONS.Beacons { if len(beacon.beacon_metrics) == 0 { - ////fmt.Printf("beacon_metrics = 0:\n") continue } if (int64(time.Now().Unix()) - (beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp)) > settings.Last_seen_threshold { - ////fmt.Printf("beacon_metrics timestamp = %s %s \n",beacon.Name, beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp ) if beacon.expired_location == "expired" { - //beacon.Location_confidence = - 1 continue } else { beacon.expired_location = "expired" @@ -469,7 +445,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl log.Printf("error: %v", err) } - // Send the newly received message to the broadcast channel broadcast <- msg } } else { @@ -479,7 +454,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl best_location := Best_location{} - // go through its beacon metrics and pick out the location that appears most often loc_list := make(map[string]float64) seen_weight := 1.5 rssi_weight := 0.75 @@ -492,8 +466,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl } loc_list[metric.location] = loc } - //fmt.Printf("beacon: %s list: %#v\n", beacon.Name, loc_list) - // now go through the list and find the largest, that's the location best_name := "" ts := 0.0 for name, times_seen := range loc_list { @@ -504,16 +476,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl } /////fmt.Printf("BEST LOCATION FOR %s IS: %s with score: %f\n", beacon.Name, best_name, ts) best_location = Best_location{name: best_name, distance: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].distance, last_seen: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp} - - // //filter, only let this location become best if it was X times in a row - // if best_location.name == beacon.Previous_location { - // beacon.Location_confidence = beacon.Location_confidence + 1 - // } else { - // beacon.Location_confidence = 0 - // /////fmt.Printf("beacon.Location_confidence %f\n", beacon.Location_confidence) - // } - - // Aggiungiamo il nuovo best_location allo storico beacon.Location_history = append(beacon.Location_history, best_location.name) if len(beacon.Location_history) > 10 { beacon.Location_history = beacon.Location_history[1:] // manteniamo solo gli ultimi 10 @@ -534,7 +496,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl } } - // Applichiamo un filtro: consideriamo il cambio solo se almeno 7 su 10 votano per una location if max_count >= 7 { beacon.Previous_location = most_common_location if most_common_location == beacon.Previous_confident_location { @@ -558,11 +519,7 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl r.Location = best_location.name r.Last_seen = best_location.last_seen - ////fmt.Printf("beacon.Location_confidence %s, settings.Location_confidence %s, beacon.Previous_confident_location %s: best_location.name %s\n",beacon.Location_confidence, settings.Location_confidence, beacon.Previous_confident_location, best_location.name) - if (beacon.Location_confidence == settings.Location_confidence && beacon.Previous_confident_location != best_location.name) || beacon.expired_location == "expired" { - // location has changed, send an mqtt message - should_persist = true fmt.Printf("detected a change!!! %#v\n\n", beacon) if beacon.Previous_confident_location == "expired" && beacon.expired_location == "" { @@ -577,7 +534,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl log.Printf("error: %v", err) } - // Send the newly received message to the broadcast channel broadcast <- msg } beacon.Location_confidence = 0 @@ -646,10 +602,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl } } - /////fmt.Printf("\n\n%s is most likely in %s with average distance %f \n\n", beacon.Name, best_location.name, best_location.distance) - ////beacon.logger.Printf("Log content: user id %v \n", beacon.Name) - // publish this to a topic - // Publish a message. err := cl.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte("afa-systems/presence"), @@ -661,10 +613,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl } - /*for _, button := range Buttons_list { - http_results.Buttons = append(http_results.Buttons, button) - }*/ - if should_persist { persistBeacons() } @@ -737,38 +685,10 @@ func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db * log.Fatal(err) } - //debug list them out - - /*fmt.Println("Database beacons:") - for _, beacon := range BEACONS.Beacons { - fmt.Println("Database has known beacon: " + beacon.Beacon_id + " " + beacon.Name) - dog := new(user) - //createUser( beacon.Name, true) - - //user1 := createUser( beacon.Name, true) - //doSomething(beacon, "hello") - // - - userFIle := &lumberjack.Logger{ - Filename: "/data/presence/presence/beacon_log_" + beacon.Name + ".log", - MaxSize: 250, // mb - MaxBackups: 5, - MaxAge: 10, // in days - } - dog.id = beacon.Name - dog.logger = log.New(userFIle, "User: ", log.Ldate|log.Ltime|log.Lshortfile) - dog.logger.Printf("Log content: user id %v \n", beacon.Name) - logger=append(logger,dog) - } - fmt.Println("leng has %d\n",len(logger)) - fmt.Printf("%v", logger) - fmt.Println("Settings has %#v\n", settings)*/ - /**/ Latest_beacons_list = make(map[string]Beacon) Buttons_list = make(map[string]Button) - //create a map of locations, looked up by hostnames locations_list := Locations_list{} ls := make(map[string]Location) locations_list.locations = ls @@ -793,18 +713,11 @@ func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db * this_beacon_id := getBeaconID(incoming) now := time.Now().Unix() - - ///fmt.Println("sawbeacon " + this_beacon_id + " at " + incoming.Hostname) - //logger["FCB8351F5A21"].logger.Printf("Log content: user id \n") - //if this beacon isn't in our search list, add it to the latest_beacons pile. beacon, ok := BEACONS.Beacons[this_beacon_id] if !ok { - //should be unique - //if it's already in list, forget it. latest_list_lock.Lock() x, ok := Latest_beacons_list[this_beacon_id] if ok { - //update its timestamp x.Last_seen = now x.Incoming_JSON = incoming x.Distance = getBeaconDistance(incoming) diff --git a/copy_files/_maincopy.go b/copy_files/_maincopy.go deleted file mode 100644 index 587a31d..0000000 --- a/copy_files/_maincopy.go +++ /dev/null @@ -1,107 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "os" - "os/signal" - "strconv" - "strings" - "time" - - //"./utils" - - "github.com/yosssi/gmq/mqtt" - "github.com/yosssi/gmq/mqtt/client" -) - -func main() { - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, os.Interrupt, os.Kill) - - incoming_updates_chan := IncomingMQTTProcessor(1*time.Second, cli, db, loggers) - - err = cli.Subscribe(&client.SubscribeOptions{ - SubReqs: []*client.SubReq{ - &client.SubReq{ - TopicFilter: []byte("publish_out/#"), - QoS: mqtt.QoS0, - Handler: func(topicName, message []byte) { - msgStr := string(message) - t := strings.Split(string(topicName), "/") - hostname := t[1] - - if strings.HasPrefix(msgStr, "[") { - var readings []RawReading - err := json.Unmarshal(message, &readings) - if err != nil { - log.Printf("Errore parsing JSON: %v", err) - return - } - - for _, reading := range readings { - if reading.Type == "Gateway" { - continue - } - incoming := Incoming_json{ - Hostname: hostname, - MAC: reading.MAC, - RSSI: int64(reading.RSSI), - Data: reading.RawData, - HB_ButtonCounter: parseButtonState(reading.RawData), - } - incoming_updates_chan <- incoming - } - } else { - s := strings.Split(string(message), ",") - if len(s) < 6 { - log.Printf("Messaggio CSV non valido: %s", msgStr) - return - } - - rawdata := s[4] - buttonCounter := parseButtonState(rawdata) - if buttonCounter > 0 { - incoming := Incoming_json{} - i, _ := strconv.ParseInt(s[3], 10, 64) - incoming.Hostname = hostname - incoming.Beacon_type = "hb_button" - incoming.MAC = s[1] - incoming.RSSI = i - incoming.Data = rawdata - incoming.HB_ButtonCounter = buttonCounter - - read_line := strings.TrimRight(string(s[5]), "\r\n") - it, err33 := strconv.Atoi(read_line) - if err33 != nil { - fmt.Println(it) - fmt.Println(err33) - os.Exit(2) - } - incoming_updates_chan <- incoming - } - } - }, - }, - }, - }) - if err != nil { - panic(err) - } - - fmt.Println("CONNECTED TO MQTT") - fmt.Println("\n ") - fmt.Println("Visit http://" + *http_host_path_ptr + " on your browser to see the web interface") - fmt.Println("\n ") - - go startServer() - - // Wait for receiving a signal. - <-sigc - - // Disconnect the Network Connection. - if err := cli.Disconnect(); err != nil { - panic(err) - } -} diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index b289104..fa8976e 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -10,6 +10,8 @@ type Config struct { MQTTPass string MQTTClientID string DBPath string + KafkaURL string + RedisURL string } // getEnv returns env var value or a default if not set. @@ -29,5 +31,7 @@ func Load() *Config { MQTTPass: getEnv("MQTT_PASSWORD", "sandbox2024"), MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), + KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), + RedisURL: getEnv("REDIS_URL", "127.0.0.1:6379"), } } diff --git a/internal/pkg/kafka/consumer.go b/internal/pkg/kafka/consumer.go new file mode 100644 index 0000000..59e6da9 --- /dev/null +++ b/internal/pkg/kafka/consumer.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/segmentio/kafka-go" +) + +func Consume[T any](r *kafka.Reader, ch chan<- T) { + for { + msg, err := r.ReadMessage(context.Background()) + if err != nil { + fmt.Println("error reading message:", err) + continue + } + + var data T + if err := json.Unmarshal(msg.Value, &data); err != nil { + fmt.Println("error decoding:", err) + continue + } + + ch <- data + } +} diff --git a/internal/pkg/kafka/reader.go b/internal/pkg/kafka/reader.go new file mode 100644 index 0000000..e5e379a --- /dev/null +++ b/internal/pkg/kafka/reader.go @@ -0,0 +1,18 @@ +package kafka + +import ( + "strings" + + "github.com/segmentio/kafka-go" +) + +func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { + brokers := strings.Split(kafkaURL, ",") + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + GroupID: groupID, + Topic: topic, + MinBytes: 1, + MaxBytes: 10e6, + }) +} diff --git a/internal/pkg/kafka/writer.go b/internal/pkg/kafka/writer.go new file mode 100644 index 0000000..a1fa619 --- /dev/null +++ b/internal/pkg/kafka/writer.go @@ -0,0 +1,17 @@ +package kafka + +import ( + "time" + + "github.com/segmentio/kafka-go" +) + +func KafkaWriter(kafkaURL, topic string) *kafka.Writer { + return &kafka.Writer{ + Addr: kafka.TCP(kafkaURL), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + } +} diff --git a/internal/pkg/redis/redis.go b/internal/pkg/redis/redis.go new file mode 100644 index 0000000..c8e4c63 --- /dev/null +++ b/internal/pkg/redis/redis.go @@ -0,0 +1,62 @@ +package presenseredis + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/redis/go-redis/v9" +) + +func LoadBeaconsList(client *redis.Client, ctx context.Context) map[string]model.Beacon { + beaconsList, err := client.Get(ctx, "beaconsList").Result() + beaconsMap := make(map[string]model.Beacon) + + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + fmt.Println("no connection to redis") + } else { + json.Unmarshal([]byte(beaconsList), &beaconsMap) + } + + return beaconsMap +} + +func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model.Beacon { + latestList, err := client.Get(ctx, "latestList").Result() + latestMap := make(map[string]model.Beacon) + + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + fmt.Println("no connection to redis") + } else { + json.Unmarshal([]byte(latestList), &latestMap) + } + + return latestMap +} + +func SaveBeaconsList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) { + appCtx.Beacons.Lock.Lock() + data, _ := json.Marshal(appCtx.Beacons.Beacons) + appCtx.Beacons.Lock.Unlock() + + err := client.Set(ctx, "beaconsList", data, 0).Err() + if err != nil { + fmt.Println("error in saving to redis: ", err) + } +} + +func SaveLatestList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) { + appCtx.LatestList.Lock.Lock() + data, _ := json.Marshal(appCtx.LatestList.LatestList) + appCtx.LatestList.Lock.Unlock() + + err := client.Set(ctx, "latestList", data, 0).Err() + if err != nil { + fmt.Println("error in saving to redis: ", err) + } +} diff --git a/scripts/testAPI.sh b/scripts/testAPI.sh new file mode 100755 index 0000000..056e01e --- /dev/null +++ b/scripts/testAPI.sh @@ -0,0 +1,41 @@ +#!/bin/bash +URL="http://127.0.0.1:1902/api/beacons" +BEACON_ID="C3000057B9F7" + +echo "POST (create)" +curl -s -X POST $URL \ + -H "Content-Type: application/json" \ + -d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1","tx_power":-59,"rssi":-70}' +echo -e "\n" + +sleep 1 + +echo "GET (list after create)" +curl -s -X GET $URL +echo -e "\n" + +sleep 1 + +echo "PUT (update)" +curl -s -X PUT $URL \ + -H "Content-Type: application/json" \ + -d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1-updated","tx_power":-60}' +echo -e "\n" + +sleep 1 + +echo "GET (list after update)" +curl -s -X GET $URL +echo -e "\n" + +sleep 1 + +echo "DELETE" +curl -s -X DELETE "$URL/$BEACON_ID" +echo -e "\n" + +sleep 1 + +echo "GET (list after delete)" +curl -s -X GET $URL +echo -e "\n"