瀏覽代碼

chore: refactor, extending decoder func

chore/proposed-structure
Blaz Smehov 1 月之前
父節點
當前提交
f8aa11f5fd
共有 11 個檔案被更改,包括 341 行新增300 行删除
  1. +6
    -16
      cmd/bridge/main.go
  2. +95
    -90
      cmd/decoder/main.go
  3. +71
    -0
      cmd/location/main.go
  4. +0
    -87
      copy_files/_main.go
  5. +0
    -107
      copy_files/_maincopy.go
  6. +4
    -0
      internal/pkg/config/config.go
  7. +27
    -0
      internal/pkg/kafka/consumer.go
  8. +18
    -0
      internal/pkg/kafka/reader.go
  9. +17
    -0
      internal/pkg/kafka/writer.go
  10. +62
    -0
      internal/pkg/redis/redis.go
  11. +41
    -0
      scripts/testAPI.sh

+ 6
- 16
cmd/bridge/main.go 查看文件

@@ -1,13 +1,13 @@
package main

import (
"time"
"fmt"

"github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafka"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
"github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler"
"github.com/segmentio/kafka-go"
)

func main() {
@@ -33,15 +33,15 @@ func main() {
panic(err)
}

writer := kafkaWriter("127.0.0.1:9092", "rawbeacons")
writer := kafka.KafkaWriter("127.0.0.1:9092", "rawbeacons")
defer writer.Close()

err = cli.Subscribe(&client.SubscribeOptions{
SubReqs: []*client.SubReq{
&client.SubReq{
{
TopicFilter: []byte("publish_out/#"),
QoS: mqtt.QoS0,
Handler: func(topicName, message[]byte) {
Handler: func(topicName, message []byte) {
mqtthandler.MqttHandler(writer, topicName, message)
},
},
@@ -53,13 +53,3 @@ func main() {

select {}
}

func kafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}
}

+ 95
- 90
cmd/decoder/main.go 查看文件

@@ -2,17 +2,31 @@ package main

import (
"context"
"encoding/json"
"fmt"
"strings"
"math"
"strconv"
"time"

"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafka"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
presenseredis "github.com/AFASystems/presence/internal/pkg/redis"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

// Move Kafka topics, Redis keys, intervals to env config
// Replace hardcoded IPs with env vars
// avoid defers -> lock and unlock right before and after usage
// Distance formula uses twos_comp incorrectly should parse signed int not hex string
// Use buffered log instead of fmt.Println ???
// Limit metrics slice size with ring buffer ??
// handle go routine exit signals with context.WithCancel() ??

// Make internal package for Kafka and Redis
// Make internal package for processor:
// Helper functions: twos_comp, getBeaconId

func main() {
// Load global context to init beacons and latest list
appCtx := model.AppContext{
@@ -24,19 +38,21 @@ func main() {
},
}

cfg := config.Load()

// Kafka writer idk why yet
writer := kafkaWriter("127.0.0.1:9092", "beacons")
writer := kafka.KafkaWriter(cfg.KafkaURL, "beacons")

// Kafka reader for Raw MQTT beacons
rawReader := kafkaReader("127.0.0.1:9092", "rawbeacons", "someID")
rawReader := kafka.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
defer rawReader.Close()

// Kafka reader for API server updates
apiReader := kafkaReader("127.0.0.1:9092", "apibeacons", "someID")
apiReader := kafka.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
defer apiReader.Close()

// Kafka reader for latest list updates
latestReader := kafkaReader("127.0.0.1:9092", "latestbeacons", "someID")
latestReader := kafka.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
defer latestReader.Close()

defer writer.Close()
@@ -45,61 +61,33 @@ func main() {

// Init Redis Client
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Addr: cfg.RedisURL,
Password: "",
})

// Initialize list values from Redis
beaconsList, err := client.Get(ctx, "beaconsList").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
json.Unmarshal([]byte(beaconsList), &appCtx.Beacons.Beacons)
}
beaconsList := presenseredis.LoadBeaconsList(client, ctx)
appCtx.Beacons.Beacons = beaconsList

// Initialize list values from Redis
latestList, err := client.Get(ctx, "latestList").Result()
if err == redis.Nil {
fmt.Println("no latest list, starting empty")
} else if err != nil {
panic(err)
} else {
json.Unmarshal([]byte(latestList), &appCtx.LatestList.LatestList)
}
latestList := presenseredis.LoadLatestList(client, ctx)
appCtx.LatestList.LatestList = latestList

// declare channel for collecting Kafka messages
chRaw := make(chan model.Incoming_json, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chLatest := make(chan model.Incoming_json, 2000)

go consume(rawReader, chRaw)
go consume(apiReader, chApi)
go consume(latestReader, chLatest)
go kafka.Consume(rawReader, chRaw)
go kafka.Consume(apiReader, chApi)
go kafka.Consume(latestReader, chLatest)

go func() {
// Syncing Redis cache every 1s with 2 lists: beacons, latest list
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
appCtx.Beacons.Lock.Lock()
data, _ := json.Marshal(appCtx.Beacons.Beacons)
appCtx.Beacons.Lock.Unlock()

err := client.Set(ctx, "beaconsList", data, 0).Err()
if err != nil {
fmt.Println("error saving to redis:", err)
}

appCtx.LatestList.Lock.Lock()
ldata, _ := json.Marshal(appCtx.LatestList.LatestList)
appCtx.LatestList.Lock.Unlock()

err = client.Set(ctx, "latestList", ldata, 0).Err()
if err != nil {
fmt.Println("error saving latest list:", err)
}
presenseredis.SaveBeaconsList(&appCtx, client, ctx)
presenseredis.SaveLatestList(&appCtx, client, ctx)
}
}()

@@ -110,7 +98,6 @@ func main() {
case msg := <-chApi:
switch msg.Method {
case "POST":
fmt.Println("method POST")
appCtx.Beacons.Lock.Lock()
appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
case "DELETE":
@@ -119,7 +106,6 @@ func main() {
appCtx.Beacons.Lock.Lock()
delete(appCtx.Beacons.Beacons, msg.ID)
}
fmt.Println("method DELETE")
default:
fmt.Println("unknown method: ", msg.Method)
}
@@ -130,45 +116,6 @@ func main() {
}
}

func kafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}
}

func kafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1,
MaxBytes: 10e6,
})
}

func consume[T any](r *kafka.Reader, ch chan<- T) {
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Println("error reading message:", err)
continue
}

var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}

ch <- data
}
}

func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) {
defer func() {
if err := recover(); err != nil {
@@ -192,10 +139,68 @@ func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) {

beacon, exists := beacons.Beacons[id]
if !exists {
fmt.Println("beacon does not yet exist")
fmt.Println("time now: ", now)
x, exists := latestList.LatestList[id]
if exists {
x.Last_seen = now
x.Incoming_JSON = incoming
x.Distance = getBeaconDistance(incoming)
latestList.LatestList[id] = x
} else {
latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)}
}
// Move this to seperate routine?
for k, v := range latestList.LatestList {
if (now - v.Last_seen) > 10 {
delete(latestList.LatestList, k)
}
}
return
}

fmt.Println("Beacon does exist: ", beacon)
updateBeacon(&beacon, incoming)
beacons.Beacons[id] = beacon
}

func getBeaconDistance(incoming model.Incoming_json) float64 {
rssi := incoming.RSSI
power := incoming.TX_power
distance := 100.0

ratio := float64(rssi) * (1.0 / float64(twos_comp(power)))
if ratio < 1.0 {
distance = math.Pow(ratio, 10)
} else {
distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
}
return distance
}

func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) {
now := time.Now().Unix()

beacon.Incoming_JSON = incoming
beacon.Last_seen = now
beacon.Beacon_type = incoming.Beacon_type
beacon.HB_ButtonCounter = incoming.HB_ButtonCounter
beacon.HB_Battery = incoming.HB_Battery
beacon.HB_RandomNonce = incoming.HB_RandomNonce
beacon.HB_ButtonMode = incoming.HB_ButtonMode

if beacon.Beacon_metrics == nil {
beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now
}

metric := model.BeaconMetric{}
metric.Distance = getBeaconDistance(incoming)
metric.Timestamp = now
metric.Rssi = int64(incoming.RSSI)
metric.Location = incoming.Hostname
beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric)

// Leave the HB button implementation for now
}

func twos_comp(inp string) int64 {
i, _ := strconv.ParseInt("0x"+inp, 0, 64)
return i - 256
}

+ 71
- 0
cmd/location/main.go 查看文件

@@ -0,0 +1,71 @@
package main

import (
"context"
"encoding/json"
"fmt"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
)

func main() {
ctx := context.Background()
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
}

func getLikelyLocations(client *redis.Client, ctx context.Context) {
beaconsList, err := client.Get(ctx, "beaconsList").Result()
var beacons = make(map[string]model.Beacon)
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
json.Unmarshal([]byte(beaconsList), &beacons)
}

for id, beacon := range beacons {
if len(beacon.Beacon_metrics) == 0 {
continue
}

if isExpired(&beacon, settings) {
handleExpiredBeacon(&beacon, cl, ctx)
continue
}

best := calculateBestLocation(&beacon)
updateBeaconState(&beacon, best, settings, ctx, cl)

appendHTTPResult(ctx, beacon, best)
ctx.Beacons.Beacons[id] = beacon
}
}

// get likely locations:
/*
1. Locks the http_results list
2. inits list to empty struct type -> TODO: what is this list used for
3. loops through beacons list -> should be locked?
4. check for beacon metrics -> how do you get beacon metrics, I guess it has an array of timestamps
5. check for threshold value in the settings
5.1. check for property expired location
5.2. if location is not expired -> mark it as expired, generate message and send to all clients,
if clients do not respond close the connection
6. Init best location with type Best_location{} -> what is this type
7. make locations list -> key: string, val: float64
7.1 set weight for seen and rssi
7.2 loop over metrics of the beacon -> some alogirthm based on location value

I think the algorithm is recording names of different gateways and their rssi's and then from
that it checks gateway name and makes decisions based on calculated values

7.3 writes result in best location and updates list location history with this name if the list
is longer than 10 elements it removes the first element


*/

+ 0
- 87
copy_files/_main.go 查看文件

@@ -256,31 +256,18 @@ func parseButtonState(raw string) int64 {

func twos_comp(inp string) int64 {
i, _ := strconv.ParseInt("0x"+inp, 0, 64)

return i - 256
}

func getBeaconID(incoming Incoming_json) string {
unique_id := fmt.Sprintf("%s", incoming.MAC)
/*if incoming.Beacon_type == "ibeacon" {
unique_id = fmt.Sprintf("%s_%s_%s", incoming.UUID, incoming.Major, incoming.Minor)
} else if incoming.Beacon_type == "eddystone" {
unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id)
} else if incoming.Beacon_type == "hb_button" {
unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id)
}*/
return unique_id
}

func incomingBeaconFilter(incoming Incoming_json) Incoming_json {
out_json := incoming
if incoming.Beacon_type == "hb_button" {
//do additional checks here to detect if a Habby Bubbles Button
// looks like 020104020a0011ff045600012d3859db59e1000b9453

raw_data := incoming.Data
//company_id := []byte{0x04, 0x56}
//product_id := []byte{0x00, 0x01}
hb_button_prefix_str := fmt.Sprintf("02010612FF5900")
if strings.HasPrefix(raw_data, hb_button_prefix_str) {
out_json.Namespace = "ddddeeeeeeffff5544ff"
@@ -423,14 +410,6 @@ func sendButtonPressed(bcn Beacon, cl *client.Client) {
fmt.Println(out)
fmt.Println("--- stderr ---")
fmt.Println(errout)

// create the file if it doesn't exists with O_CREATE, Set the file up for read write, add the append flag and set the permission
//f, err := os.OpenFile("/data/conf/presence/db.json", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0660)
//if err != nil {
// log.Fatal(err)
//}
// write to file, f.Write()
//f.Write(btn_msg)
}

func getLikelyLocations(settings Settings, locations_list Locations_list, cl *client.Client) {
@@ -447,14 +426,11 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
for _, beacon := range BEACONS.Beacons {

if len(beacon.beacon_metrics) == 0 {
////fmt.Printf("beacon_metrics = 0:\n")
continue
}

if (int64(time.Now().Unix()) - (beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp)) > settings.Last_seen_threshold {
////fmt.Printf("beacon_metrics timestamp = %s %s \n",beacon.Name, beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp )
if beacon.expired_location == "expired" {
//beacon.Location_confidence = - 1
continue
} else {
beacon.expired_location = "expired"
@@ -469,7 +445,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
log.Printf("error: %v", err)

}
// Send the newly received message to the broadcast channel
broadcast <- msg
}
} else {
@@ -479,7 +454,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl

best_location := Best_location{}

// go through its beacon metrics and pick out the location that appears most often
loc_list := make(map[string]float64)
seen_weight := 1.5
rssi_weight := 0.75
@@ -492,8 +466,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
}
loc_list[metric.location] = loc
}
//fmt.Printf("beacon: %s list: %#v\n", beacon.Name, loc_list)
// now go through the list and find the largest, that's the location
best_name := ""
ts := 0.0
for name, times_seen := range loc_list {
@@ -504,16 +476,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
}
/////fmt.Printf("BEST LOCATION FOR %s IS: %s with score: %f\n", beacon.Name, best_name, ts)
best_location = Best_location{name: best_name, distance: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].distance, last_seen: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp}

// //filter, only let this location become best if it was X times in a row
// if best_location.name == beacon.Previous_location {
// beacon.Location_confidence = beacon.Location_confidence + 1
// } else {
// beacon.Location_confidence = 0
// /////fmt.Printf("beacon.Location_confidence %f\n", beacon.Location_confidence)
// }

// Aggiungiamo il nuovo best_location allo storico
beacon.Location_history = append(beacon.Location_history, best_location.name)
if len(beacon.Location_history) > 10 {
beacon.Location_history = beacon.Location_history[1:] // manteniamo solo gli ultimi 10
@@ -534,7 +496,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
}
}

// Applichiamo un filtro: consideriamo il cambio solo se almeno 7 su 10 votano per una location
if max_count >= 7 {
beacon.Previous_location = most_common_location
if most_common_location == beacon.Previous_confident_location {
@@ -558,11 +519,7 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
r.Location = best_location.name
r.Last_seen = best_location.last_seen

////fmt.Printf("beacon.Location_confidence %s, settings.Location_confidence %s, beacon.Previous_confident_location %s: best_location.name %s\n",beacon.Location_confidence, settings.Location_confidence, beacon.Previous_confident_location, best_location.name)

if (beacon.Location_confidence == settings.Location_confidence && beacon.Previous_confident_location != best_location.name) || beacon.expired_location == "expired" {
// location has changed, send an mqtt message

should_persist = true
fmt.Printf("detected a change!!! %#v\n\n", beacon)
if beacon.Previous_confident_location == "expired" && beacon.expired_location == "" {
@@ -577,7 +534,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
log.Printf("error: %v", err)

}
// Send the newly received message to the broadcast channel
broadcast <- msg
}
beacon.Location_confidence = 0
@@ -646,10 +602,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl
}
}

/////fmt.Printf("\n\n%s is most likely in %s with average distance %f \n\n", beacon.Name, best_location.name, best_location.distance)
////beacon.logger.Printf("Log content: user id %v \n", beacon.Name)
// publish this to a topic
// Publish a message.
err := cl.Publish(&client.PublishOptions{
QoS: mqtt.QoS0,
TopicName: []byte("afa-systems/presence"),
@@ -661,10 +613,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl

}

/*for _, button := range Buttons_list {
http_results.Buttons = append(http_results.Buttons, button)
}*/

if should_persist {
persistBeacons()
}
@@ -737,38 +685,10 @@ func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *
log.Fatal(err)
}

//debug list them out

/*fmt.Println("Database beacons:")
for _, beacon := range BEACONS.Beacons {
fmt.Println("Database has known beacon: " + beacon.Beacon_id + " " + beacon.Name)
dog := new(user)
//createUser( beacon.Name, true)

//user1 := createUser( beacon.Name, true)
//doSomething(beacon, "hello")
//

userFIle := &lumberjack.Logger{
Filename: "/data/presence/presence/beacon_log_" + beacon.Name + ".log",
MaxSize: 250, // mb
MaxBackups: 5,
MaxAge: 10, // in days
}
dog.id = beacon.Name
dog.logger = log.New(userFIle, "User: ", log.Ldate|log.Ltime|log.Lshortfile)
dog.logger.Printf("Log content: user id %v \n", beacon.Name)
logger=append(logger,dog)
}
fmt.Println("leng has %d\n",len(logger))
fmt.Printf("%v", logger)
fmt.Println("Settings has %#v\n", settings)*/
/**/
Latest_beacons_list = make(map[string]Beacon)

Buttons_list = make(map[string]Button)

//create a map of locations, looked up by hostnames
locations_list := Locations_list{}
ls := make(map[string]Location)
locations_list.locations = ls
@@ -793,18 +713,11 @@ func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *
this_beacon_id := getBeaconID(incoming)

now := time.Now().Unix()

///fmt.Println("sawbeacon " + this_beacon_id + " at " + incoming.Hostname)
//logger["FCB8351F5A21"].logger.Printf("Log content: user id \n")
//if this beacon isn't in our search list, add it to the latest_beacons pile.
beacon, ok := BEACONS.Beacons[this_beacon_id]
if !ok {
//should be unique
//if it's already in list, forget it.
latest_list_lock.Lock()
x, ok := Latest_beacons_list[this_beacon_id]
if ok {
//update its timestamp
x.Last_seen = now
x.Incoming_JSON = incoming
x.Distance = getBeaconDistance(incoming)


+ 0
- 107
copy_files/_maincopy.go 查看文件

@@ -1,107 +0,0 @@
package main

import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"time"

//"./utils"

"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)

func main() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt, os.Kill)

incoming_updates_chan := IncomingMQTTProcessor(1*time.Second, cli, db, loggers)

err = cli.Subscribe(&client.SubscribeOptions{
SubReqs: []*client.SubReq{
&client.SubReq{
TopicFilter: []byte("publish_out/#"),
QoS: mqtt.QoS0,
Handler: func(topicName, message []byte) {
msgStr := string(message)
t := strings.Split(string(topicName), "/")
hostname := t[1]

if strings.HasPrefix(msgStr, "[") {
var readings []RawReading
err := json.Unmarshal(message, &readings)
if err != nil {
log.Printf("Errore parsing JSON: %v", err)
return
}

for _, reading := range readings {
if reading.Type == "Gateway" {
continue
}
incoming := Incoming_json{
Hostname: hostname,
MAC: reading.MAC,
RSSI: int64(reading.RSSI),
Data: reading.RawData,
HB_ButtonCounter: parseButtonState(reading.RawData),
}
incoming_updates_chan <- incoming
}
} else {
s := strings.Split(string(message), ",")
if len(s) < 6 {
log.Printf("Messaggio CSV non valido: %s", msgStr)
return
}

rawdata := s[4]
buttonCounter := parseButtonState(rawdata)
if buttonCounter > 0 {
incoming := Incoming_json{}
i, _ := strconv.ParseInt(s[3], 10, 64)
incoming.Hostname = hostname
incoming.Beacon_type = "hb_button"
incoming.MAC = s[1]
incoming.RSSI = i
incoming.Data = rawdata
incoming.HB_ButtonCounter = buttonCounter

read_line := strings.TrimRight(string(s[5]), "\r\n")
it, err33 := strconv.Atoi(read_line)
if err33 != nil {
fmt.Println(it)
fmt.Println(err33)
os.Exit(2)
}
incoming_updates_chan <- incoming
}
}
},
},
},
})
if err != nil {
panic(err)
}

fmt.Println("CONNECTED TO MQTT")
fmt.Println("\n ")
fmt.Println("Visit http://" + *http_host_path_ptr + " on your browser to see the web interface")
fmt.Println("\n ")

go startServer()

// Wait for receiving a signal.
<-sigc

// Disconnect the Network Connection.
if err := cli.Disconnect(); err != nil {
panic(err)
}
}

+ 4
- 0
internal/pkg/config/config.go 查看文件

@@ -10,6 +10,8 @@ type Config struct {
MQTTPass string
MQTTClientID string
DBPath string
KafkaURL string
RedisURL string
}

// getEnv returns env var value or a default if not set.
@@ -29,5 +31,7 @@ func Load() *Config {
MQTTPass: getEnv("MQTT_PASSWORD", "sandbox2024"),
MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"),
DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
RedisURL: getEnv("REDIS_URL", "127.0.0.1:6379"),
}
}

+ 27
- 0
internal/pkg/kafka/consumer.go 查看文件

@@ -0,0 +1,27 @@
package kafka

import (
"context"
"encoding/json"
"fmt"

"github.com/segmentio/kafka-go"
)

func Consume[T any](r *kafka.Reader, ch chan<- T) {
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Println("error reading message:", err)
continue
}

var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}

ch <- data
}
}

+ 18
- 0
internal/pkg/kafka/reader.go 查看文件

@@ -0,0 +1,18 @@
package kafka

import (
"strings"

"github.com/segmentio/kafka-go"
)

func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1,
MaxBytes: 10e6,
})
}

+ 17
- 0
internal/pkg/kafka/writer.go 查看文件

@@ -0,0 +1,17 @@
package kafka

import (
"time"

"github.com/segmentio/kafka-go"
)

func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}
}

+ 62
- 0
internal/pkg/redis/redis.go 查看文件

@@ -0,0 +1,62 @@
package presenseredis

import (
"context"
"encoding/json"
"fmt"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
)

func LoadBeaconsList(client *redis.Client, ctx context.Context) map[string]model.Beacon {
beaconsList, err := client.Get(ctx, "beaconsList").Result()
beaconsMap := make(map[string]model.Beacon)

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
fmt.Println("no connection to redis")
} else {
json.Unmarshal([]byte(beaconsList), &beaconsMap)
}

return beaconsMap
}

func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model.Beacon {
latestList, err := client.Get(ctx, "latestList").Result()
latestMap := make(map[string]model.Beacon)

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
fmt.Println("no connection to redis")
} else {
json.Unmarshal([]byte(latestList), &latestMap)
}

return latestMap
}

func SaveBeaconsList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.Beacons.Lock.Lock()
data, _ := json.Marshal(appCtx.Beacons.Beacons)
appCtx.Beacons.Lock.Unlock()

err := client.Set(ctx, "beaconsList", data, 0).Err()
if err != nil {
fmt.Println("error in saving to redis: ", err)
}
}

func SaveLatestList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.LatestList.Lock.Lock()
data, _ := json.Marshal(appCtx.LatestList.LatestList)
appCtx.LatestList.Lock.Unlock()

err := client.Set(ctx, "latestList", data, 0).Err()
if err != nil {
fmt.Println("error in saving to redis: ", err)
}
}

+ 41
- 0
scripts/testAPI.sh 查看文件

@@ -0,0 +1,41 @@
#!/bin/bash
URL="http://127.0.0.1:1902/api/beacons"
BEACON_ID="C3000057B9F7"

echo "POST (create)"
curl -s -X POST $URL \
-H "Content-Type: application/json" \
-d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1","tx_power":-59,"rssi":-70}'
echo -e "\n"

sleep 1

echo "GET (list after create)"
curl -s -X GET $URL
echo -e "\n"

sleep 1

echo "PUT (update)"
curl -s -X PUT $URL \
-H "Content-Type: application/json" \
-d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1-updated","tx_power":-60}'
echo -e "\n"

sleep 1

echo "GET (list after update)"
curl -s -X GET $URL
echo -e "\n"

sleep 1

echo "DELETE"
curl -s -X DELETE "$URL/$BEACON_ID"
echo -e "\n"

sleep 1

echo "GET (list after delete)"
curl -s -X GET $URL
echo -e "\n"

Loading…
取消
儲存