Ver a proveniência

feat: basis for decoding different beacon types

chore/restructure-decoder
Blaz Smehov há 2 semanas
ascendente
cometimento
8dd27e74d1
11 ficheiros alterados com 10467 adições e 225 eliminações
  1. +70
    -87
      build/docker-compose.yml
  2. +77
    -99
      cmd/decoder/main.go
  3. +3488
    -0
      cmd/decoder/save.txt
  4. +9
    -31
      cmd/server/main.go
  5. +3238
    -0
      cmd/testbench/debug.txt
  6. +86
    -0
      cmd/testbench/main.go
  7. +3488
    -0
      cmd/testbench/save.txt
  8. +7
    -5
      internal/pkg/bridge/mqtthandler/mqtthandler.go
  9. +1
    -2
      internal/pkg/config/config.go
  10. +2
    -0
      internal/pkg/kafkaclient/writer.go
  11. +1
    -1
      scripts/testAPI.sh

+ 70
- 87
build/docker-compose.yml Ver ficheiro

@@ -1,99 +1,82 @@
services:
# emqx:
# image: emqx/emqx:5.8.8
# container_name: emqx
# environment:
# - EMQX_DASHBOARD__DEFAULT_USERNAME=user
# - EMQX_DASHBOARD__DEFAULT_PASSWORD=pass
# ports:
# - "127.0.0.1:1883:1883"
# healthcheck:
# test: ["CMD", "curl", "-f", "http://localhost:18083/api/v5/status"]
# interval: 10s
# timeout: 5s
# retries: 10
# start_period: 20s

kafka:
image: apache/kafka:3.9.0
container_name: kafka
command:
- sh
- -c
- |
CLUSTER_ID=$$(/opt/kafka/bin/kafka-storage.sh random-uuid)
/opt/kafka/bin/kafka-storage.sh format --config /opt/kafka/config/kraft/server.properties --cluster-id $$CLUSTER_ID
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties &
pid=$!

# wait until Kafka is actually alive
until /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1; do
sleep 1
done

# create topic
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --if-not-exists --topic rawbeacons \
--partitions 1 --replication-factor 1

# mark ready
touch /tmp/ready

wait $pid
healthcheck:
test: ["CMD-SHELL", "nc -z localhost 9092"]
interval: 10s
timeout: 5s
retries: 10
environment:
- KAFKA_NODE_ID=1
- KAFKA_PROCESS_ROLES=broker,controller
- KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_LOG_DIRS=/tmp/kraft-combined-logs
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
test: ["CMD-SHELL", "[ -f /tmp/ready ]"]
interval: 3s
timeout: 2s
retries: 20
ports:
- "127.0.0.1:9092:9092"

valkey:
image: valkey/valkey:9.0.0
container_name: valkey
ports:
- "127.0.0.1:6379:6379"

node-red:
image: nodered/node-red:latest-22
container_name: node-red
ports:
- "127.0.0.1:1880:1880"
volumes:
- "../volumes/node-red:/data"
# presense-decoder:
# build:
# context: ../
# dockerfile: build/package/Dockerfile.decoder
# image: presense-decoder
# container_name: presense-decoder
# environment:
# - REDIS_URL=valkey:6379
# - KAFKA_URL=kafka:9092
# depends_on:
# kafka:
# condition: service_healthy
# restart: always

presense-decoder:
build:
context: ../
dockerfile: build/package/Dockerfile.decoder
network: host
image: presense-decoder
container_name: presense-decoder
environment:
- REDIS_URL=valkey:6379
- KAFKA_URL=kafka:9092
depends_on:
- kafka
- valkey
restart: always
# presense-server:
# build:
# context: ../
# dockerfile: build/package/Dockerfile.server
# image: presense-server
# container_name: presense-server
# environment:
# - REDIS_URL=valkey:6379
# - KAFKA_URL=kafka:9092
# depends_on:
# kafka:
# condition: service_healthy

presense-server:
build:
context: ../
dockerfile: build/package/Dockerfile.server
network: host
image: presense-server
container_name: presense-server
environment:
- REDIS_URL=valkey:6379
- KAFKA_URL=kafka:9092
depends_on:
- kafka
# - emqx
ports:
- "127.0.0.1:1902:1902"
restart: always
# ports:
# - "127.0.0.1:1902:1902"
# restart: always

presense-bridge:
build:
context: ../
dockerfile: build/package/Dockerfile.bridge
network: host
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:9092
- MQTT_HOST=192.168.1.101:1883
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
depends_on:
kafka:
condition: service_healthy
restart: always
# presense-bridge:
# build:
# context: ../
# dockerfile: build/package/Dockerfile.bridge
# image: presense-bridge
# container_name: presense-bridge
# environment:
# - KAFKA_URL=kafka:9092
# - MQTT_HOST=192.168.1.101:1883
# - MQTT_USERNAME=user
# - MQTT_PASSWORD=pass
# depends_on:
# kafka:
# condition: service_healthy
# restart: always

+ 77
- 99
cmd/decoder/main.go Ver ficheiro

@@ -1,7 +1,7 @@
package main

import (
"context"
"encoding/hex"
"fmt"
"math"
"strconv"
@@ -11,8 +11,6 @@ import (
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
presenseredis "github.com/AFASystems/presence/internal/pkg/redis"
"github.com/redis/go-redis/v9"
)

func main() {
@@ -35,94 +33,65 @@ func main() {
},
}

cfg := config.Load()
fmt.Println("init")

// Kafka writer idk why yet
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons")
defer writer.Close()
cfg := config.Load()

// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
defer rawReader.Close()

// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
defer apiReader.Close()

// Kafka reader for latest list updates
latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
defer latestReader.Close()

// Kafka reader for settings updates
settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID")
defer settingsReader.Close()
// apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
// defer apiReader.Close()

ctx := context.Background()
// // Kafka reader for latest list updates
// latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "gid-latest")
// defer latestReader.Close()

// Init Redis Client
client := redis.NewClient(&redis.Options{
Addr: cfg.RedisURL,
Password: "",
})

beaconsList := presenseredis.LoadBeaconsList(client, ctx)
appCtx.Beacons.Beacons = beaconsList

latestList := presenseredis.LoadLatestList(client, ctx)
appCtx.LatestList.LatestList = latestList

settings := presenseredis.LoadSettings(client, ctx)
appCtx.Settings.Settings = settings
// // Kafka reader for settings updates
// settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "gid-settings")
// defer settingsReader.Close()

// declare channel for collecting Kafka messages
chRaw := make(chan model.Incoming_json, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chLatest := make(chan model.Incoming_json, 2000)
chSettings := make(chan model.SettingsVal, 10)
// chApi := make(chan model.ApiUpdate, 2000)
// chLatest := make(chan model.Incoming_json, 2000)
// chSettings := make(chan model.SettingsVal, 10)

go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
go kafkaclient.Consume(latestReader, chLatest)
go kafkaclient.Consume(settingsReader, chSettings)

go func() {
// Syncing Redis cache every 1s with 2 lists: beacons, latest list
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
presenseredis.SaveBeaconsList(&appCtx, client, ctx)
presenseredis.SaveLatestList(&appCtx, client, ctx)
presenseredis.SaveSettings(&appCtx, client, ctx)
}
}()
// go kafkaclient.Consume(apiReader, chApi)
// go kafkaclient.Consume(latestReader, chLatest)
// go kafkaclient.Consume(settingsReader, chSettings)

for {
select {
case msg := <-chRaw:
processIncoming(msg, &appCtx)
case msg := <-chApi:
switch msg.Method {
case "POST":
appCtx.Beacons.Lock.Lock()
appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
case "DELETE":
_, exists := appCtx.Beacons.Beacons[msg.ID]
if exists {
appCtx.Beacons.Lock.Lock()
delete(appCtx.Beacons.Beacons, msg.ID)
}
default:
fmt.Println("unknown method: ", msg.Method)
}
appCtx.Beacons.Lock.Unlock()
case msg := <-chLatest:
fmt.Println("latest msg: ", msg)
case msg := <-chSettings:
appCtx.Settings.Lock.Lock()
appCtx.Settings.Settings = msg
fmt.Println("settings channel: ", msg)
appCtx.Settings.Lock.Unlock()
// case msg := <-chApi:
// switch msg.Method {
// case "POST":
// fmt.Println("Incoming POST")
// appCtx.Beacons.Lock.Lock()
// appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
// case "DELETE":
// fmt.Println("Incoming delete")
// _, exists := appCtx.Beacons.Beacons[msg.ID]
// if exists {
// appCtx.Beacons.Lock.Lock()
// delete(appCtx.Beacons.Beacons, msg.ID)
// }
// default:
// fmt.Println("unknown method: ", msg.Method)
// }
// appCtx.Beacons.Lock.Unlock()
// case msg := <-chLatest:
// fmt.Println("latest msg: ", msg)
// case msg := <-chSettings:
// appCtx.Settings.Lock.Lock()
// appCtx.Settings.Settings = msg
// fmt.Println("settings channel: ", msg)
// appCtx.Settings.Lock.Unlock()
}
}
}
@@ -134,46 +103,57 @@ func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) {
}
}()

fmt.Println("message came")

incoming = mqttclient.IncomingBeaconFilter(incoming)
// Get ID
id := mqttclient.GetBeaconID(incoming)
now := time.Now().Unix()
fmt.Println(incoming.Data)

beacons := &ctx.Beacons

beacons.Lock.Lock()
defer beacons.Lock.Unlock()

latestList := &ctx.LatestList

latestList.Lock.Lock()
defer latestList.Lock.Unlock()
incoming = mqttclient.IncomingBeaconFilter(incoming)

beacon, exists := beacons.Beacons[id]
if !exists {
x, exists := latestList.LatestList[id]
if exists {
x.Last_seen = now
x.Incoming_JSON = incoming
x.Distance = getBeaconDistance(incoming)
latestList.LatestList[id] = x
} else {
latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)}
}
// Move this to seperate routine?
for k, v := range latestList.LatestList {
if (now - v.Last_seen) > 10 {
delete(latestList.LatestList, k)
}
}
return
}

fmt.Printf("%+v\n", beacon)

updateBeacon(&beacon, incoming)
beacons.Beacons[id] = beacon
}

func processBeacon(hexStr string) {
b, _ := hex.DecodeString(hexStr)

if len(b) > 2 && b[0] == 0x02 && b[1] == 0x01 {
b = b[2+int(b[0]):]
}

ads := ParseADFast(b)
_ = ads
}

func ParseADFast(b []byte) [][2]int {
var res [][2]int
i := 0

for i < len(b) {
l := int(b[i])
if l == 0 || i+1+l > len(b) {
break
}

res = append(res, [2]int{i, i + 1 + l})

i += 1 + l
}

return res
}

func getBeaconDistance(incoming model.Incoming_json) float64 {
rssi := incoming.RSSI
power := incoming.TX_power
@@ -200,7 +180,7 @@ func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) {
beacon.HB_ButtonMode = incoming.HB_ButtonMode

if beacon.Beacon_metrics == nil {
beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now
beacon.Beacon_metrics = make([]model.BeaconMetric, 10)
}

metric := model.BeaconMetric{}
@@ -209,8 +189,6 @@ func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) {
metric.Rssi = int64(incoming.RSSI)
metric.Location = incoming.Hostname
beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric)

// Leave the HB button implementation for now
}

func twos_comp(inp string) int64 {


+ 3488
- 0
cmd/decoder/save.txt
A apresentação das diferenças no ficheiro foi suprimida por ser demasiado grande
Ver ficheiro


+ 9
- 31
cmd/server/main.go Ver ficheiro

@@ -44,11 +44,6 @@ func HttpServer(addr string) {

r := mux.NewRouter()

client := redis.NewClient(&redis.Options{
Addr: "valkey:6379",
Password: "",
})

// declare WS clients list | do I need it though? or will locations worker send message
// to kafka and then only this service (server) is being used for communication with the clients
clients := make(map[*websocket.Conn]bool)
@@ -58,17 +53,17 @@ func HttpServer(addr string) {

// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
// r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")

r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
// r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")

// Handler for WS messages
// No point in having seperate route for each message type, better to handle different message types in one connection
r.HandleFunc("/ws/api/beacons", serveWs(client))
r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
// r.HandleFunc("/ws/api/beacons", serveWs(client))
// r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))

http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
@@ -103,6 +98,8 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
ID: beaconId,
}

fmt.Println("Sending DELETE message")

flag := sendKafkaMessage(writer, &apiUpdate)
if !flag {
fmt.Println("error in sending Kafka message")
@@ -125,6 +122,8 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
return
}

fmt.Println("sending POST message")

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
@@ -148,32 +147,11 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {

func beaconsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
beaconsList, err := client.Get(context.Background(), "beaconsList").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(beaconsList))
}
}
}

func settingsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
settings, err := client.Get(context.Background(), "settings").Result()
if err == redis.Nil {
fmt.Println("no settings persisted, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(settings))
}
}
return func(w http.ResponseWriter, r *http.Request) {}
}

func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {


+ 3238
- 0
cmd/testbench/debug.txt
A apresentação das diferenças no ficheiro foi suprimida por ser demasiado grande
Ver ficheiro


+ 86
- 0
cmd/testbench/main.go Ver ficheiro

@@ -0,0 +1,86 @@
package main

import (
"bufio"
"encoding/hex"
"fmt"
"log"
"os"
"strings"
)

func main() {
file, err := os.Open("save.txt")
if err != nil {
log.Fatalf("Failed to open file: %s", err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
decodeBeacon(line)
}
}

func decodeBeacon(beacon string) {
beacon = strings.TrimSpace(beacon)
if beacon == "" {
return
}

// convert to bytes for faster operations
b, err := hex.DecodeString(beacon)
if err != nil {
fmt.Println("invalid line: ", beacon)
return
}

// remove flag bytes - they hold no structural information
if len(b) > 1 && b[1] == 0x01 {
l := int(b[0])
if 1+l <= len(b) {
b = b[1+l:]
}
}

adBlockIndeces := parseADFast(b)
for _, r := range adBlockIndeces {
ad := b[r[0]:r[1]]
if len(ad) >= 4 &&
ad[1] == 0x16 &&
ad[2] == 0xAA &&
ad[3] == 0xFE {
// fmt.Println("Eddystone:", hex.EncodeToString(b))
return
}
if len(ad) >= 7 &&
ad[1] == 0xFF &&
ad[2] == 0x4C && ad[3] == 0x00 &&
ad[4] == 0x02 && ad[5] == 0x15 {
// fmt.Println("iBeacon:", hex.EncodeToString(b))
return
}

}

fmt.Println(hex.EncodeToString(b))
}

func parseADFast(b []byte) [][2]int {
var res [][2]int
i := 0

for i < len(b) {
l := int(b[i])
if l == 0 || i+1+l > len(b) {
break
}

res = append(res, [2]int{i, i + 1 + l})

i += 1 + l
}

return res
}

+ 3488
- 0
cmd/testbench/save.txt
A apresentação das diferenças no ficheiro foi suprimida por ser demasiado grande
Ver ficheiro


+ 7
- 5
internal/pkg/bridge/mqtthandler/mqtthandler.go Ver ficheiro

@@ -1,13 +1,13 @@
package mqtthandler

import (
"fmt"
"context"
"encoding/json"
"strings"
"fmt"
"log"
"strconv"
"os"
"context"
"strconv"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
@@ -49,6 +49,8 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in writing to Kafka: ", err)
time.Sleep(1 * time.Second)
break
}

fmt.Println("message sent: ", time.Now())
@@ -102,4 +104,4 @@ func parseButtonState(raw string) int64 {
}

return 0
}
}

+ 1
- 2
internal/pkg/config/config.go Ver ficheiro

@@ -26,12 +26,11 @@ func Load() *Config {
return &Config{
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"),
WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"),
MQTTHost: getEnv("MQTT_HOST", "127.0.0.1:11883"),
MQTTHost: getEnv("MQTT_HOST", "192.168.1.101:1883"),
MQTTUser: getEnv("MQTT_USERNAME", "user"),
MQTTPass: getEnv("MQTT_PASSWORD", "pass"),
MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"),
DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
RedisURL: getEnv("REDIS_URL", "127.0.0.1:6379"),
}
}

+ 2
- 0
internal/pkg/kafkaclient/writer.go Ver ficheiro

@@ -11,6 +11,8 @@ func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}


+ 1
- 1
scripts/testAPI.sh Ver ficheiro

@@ -1,6 +1,6 @@
#!/bin/bash
URL="http://127.0.0.1:1902/api/beacons"
BEACON_ID="C3000057B9F7"
BEACON_ID="E017085443A7"

echo "POST (create)"
curl -s -X POST $URL \


Carregando…
Cancelar
Guardar