瀏覽代碼

feat: boilerplate for settings handler, chore: refactor kafka client package name

chore/proposed-structure
Blaz Smehov 4 週之前
父節點
當前提交
78fad3cbcb
共有 4 個檔案被更改,包括 131 行新增27 行删除
  1. +2
    -2
      cmd/bridge/main.go
  2. +26
    -9
      cmd/decoder/main.go
  3. +88
    -16
      cmd/server/main.go
  4. +15
    -0
      internal/pkg/redis/redis.go

+ 2
- 2
cmd/bridge/main.go 查看文件

@@ -5,7 +5,7 @@ import (

"github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafka"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)
@@ -35,7 +35,7 @@ func main() {

fmt.Println("Successfuly connected to MQTT broker")

writer := kafka.KafkaWriter(cfg.KafkaURL, "rawbeacons")
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons")
defer writer.Close()

err = cli.Subscribe(&client.SubscribeOptions{


+ 26
- 9
cmd/decoder/main.go 查看文件

@@ -8,7 +8,7 @@ import (
"time"

"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafka"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
presenseredis "github.com/AFASystems/presence/internal/pkg/redis"
@@ -36,26 +36,36 @@ func main() {
LatestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon),
},
Settings: model.Settings{
Location_confidence: 4,
Last_seen_threshold: 15,
Beacon_metrics_size: 30,
HA_send_interval: 5,
HA_send_changes_only: false,
},
}

cfg := config.Load()

// Kafka writer idk why yet
writer := kafka.KafkaWriter(cfg.KafkaURL, "beacons")
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons")
defer writer.Close()

// Kafka reader for Raw MQTT beacons
rawReader := kafka.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
defer rawReader.Close()

// Kafka reader for API server updates
apiReader := kafka.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
defer apiReader.Close()

// Kafka reader for latest list updates
latestReader := kafka.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
defer latestReader.Close()

defer writer.Close()
// Kafka reader for settings updates
settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID")
defer settingsReader.Close()

ctx := context.Background()

@@ -71,14 +81,19 @@ func main() {
latestList := presenseredis.LoadLatestList(client, ctx)
appCtx.LatestList.LatestList = latestList

settings := presenseredis.LoadSettings(client, ctx)
appCtx.Settings = settings

// declare channel for collecting Kafka messages
chRaw := make(chan model.Incoming_json, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chLatest := make(chan model.Incoming_json, 2000)
chSettings := make(chan model.Settings, 10)

go kafka.Consume(rawReader, chRaw)
go kafka.Consume(apiReader, chApi)
go kafka.Consume(latestReader, chLatest)
go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
go kafkaclient.Consume(latestReader, chLatest)
go kafkaclient.Consume(settingsReader, chSettings)

go func() {
// Syncing Redis cache every 1s with 2 lists: beacons, latest list
@@ -112,6 +127,8 @@ func main() {
appCtx.Beacons.Lock.Unlock()
case msg := <-chLatest:
fmt.Println("latest msg: ", msg)
case msg := <-chSettings:
fmt.Println("settings channel: ", msg)
}
}
}


+ 88
- 16
cmd/server/main.go 查看文件

@@ -6,8 +6,8 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
@@ -25,9 +25,12 @@ func HttpServer(addr string) {
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

// Kafka writer that relays messages
writer := kafkaWriter("kafka:9092", "apibeacons")
writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons")
defer writer.Close()

settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings")
defer settingsWriter.Close()

r := mux.NewRouter()

client := redis.NewClient(&redis.Options{
@@ -40,25 +43,20 @@ func HttpServer(addr string) {
r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
}

// All the functions should do is just relay messages to the decoder through Kafka
r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")

func kafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}
http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
}

func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) {
// This looks wrong, should handle error somehow

func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
valueStr, err := json.Marshal(&value)
if err != nil {
fmt.Println("error in encoding: ", err)
return false
}
msg := kafka.Message{
Value: valueStr,
@@ -67,7 +65,10 @@ func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) {
err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in sending kafka message: ")
return false
}

return true
}

func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
@@ -79,7 +80,13 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
ID: beaconId,
}

sendKafkaMessage(writer, &apiUpdate)
flag := sendKafkaMessage(writer, &apiUpdate)
if !flag {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}

w.Write([]byte("ok"))
}
}
@@ -105,7 +112,12 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
Beacon: inBeacon,
}

sendKafkaMessage(writer, &apiUpdate)
flag := sendKafkaMessage(writer, &apiUpdate)
if !flag {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}

w.Write([]byte("ok"))
}
@@ -125,3 +137,63 @@ func beaconsListHandler(client *redis.Client) http.HandlerFunc {
}
}
}

func settingsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
settings, err := client.Get(context.Background(), "settings").Result()
if err == redis.Nil {
fmt.Println("no settings persisted, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(settings))
}
}
}

func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inSettings model.Settings
if err := decoder.Decode(&inSettings); err != nil {
http.Error(w, err.Error(), 400)
fmt.Println("Error in decoding Settings body: ", err)
return
}

if !settingsCheck(inSettings) {
http.Error(w, "values must be greater than 0", 400)
fmt.Println("settings values must be greater than 0")
return
}

valueStr, err := json.Marshal(&inSettings)
if err != nil {
http.Error(w, "Error in encoding settings", 500)
fmt.Println("Error in encoding settings: ", err)
return
}

msg := kafka.Message{
Value: valueStr,
}

if err := writer.WriteMessages(context.Background(), msg); err != nil {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}

w.Write([]byte("ok"))
}
}

func settingsCheck(settings model.Settings) bool {
if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 {
return false
}

return true
}

+ 15
- 0
internal/pkg/redis/redis.go 查看文件

@@ -39,6 +39,21 @@ func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model.
return latestMap
}

func LoadSettings(client *redis.Client, ctx context.Context) model.Settings {
redisSettings, err := client.Get(ctx, "settings").Result()
var settings model.Settings

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
fmt.Println("no connection to redis")
} else {
json.Unmarshal([]byte(redisSettings), &settings)
}

return settings
}

func SaveBeaconsList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.Beacons.Lock.Lock()
data, _ := json.Marshal(appCtx.Beacons.Beacons)


Loading…
取消
儲存