Sfoglia il codice sorgente

feat: add kafka server connection to decoder, api server (beacons route only)

chore/proposed-structure
Blaz Smehov 1 mese fa
parent
commit
0674ca9112
3 ha cambiato i file con 180 aggiunte e 8 eliminazioni
  1. +47
    -8
      cmd/decoder/main.go
  2. +127
    -0
      cmd/server/main.go
  3. +6
    -0
      internal/pkg/model/types.go

+ 47
- 8
cmd/decoder/main.go Vedi File

@@ -71,19 +71,59 @@ func main() {

// declare channel for collecting Kafka messages
chRaw := make(chan model.Incoming_json, 2000)
chApi := make(chan model.Incoming_json, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chLatest := make(chan model.Incoming_json, 2000)

go consume(rawReader, chRaw)
go consume(apiReader, chApi)
go consume(latestReader, chLatest)

go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
appCtx.Beacons.Lock.Lock()
data, _ := json.Marshal(appCtx.Beacons.Beacons)
appCtx.Beacons.Lock.Unlock()

err := client.Set(ctx, "beaconsList", data, 0).Err()
if err != nil {
fmt.Println("error saving to redis:", err)
}

appCtx.LatestList.Lock.Lock()
ldata, _ := json.Marshal(appCtx.LatestList.LatestList)
appCtx.LatestList.Lock.Unlock()

err = client.Set(ctx, "latestList", ldata, 0).Err()
if err != nil {
fmt.Println("error saving latest list:", err)
}
}
}()

for {
select {
case msg := <-chRaw:
processIncoming(msg, &appCtx)
case msg := <-chApi:
fmt.Println("api msg: ", msg)
switch msg.Method {
case "POST":
fmt.Println("method POST")
appCtx.Beacons.Lock.Lock()
appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
case "DELETE":
_, exists := appCtx.Beacons.Beacons[msg.ID]
if exists {
appCtx.Beacons.Lock.Lock()
delete(appCtx.Beacons.Beacons, msg.ID)
}
fmt.Println("method DELETE")
default:
fmt.Println("unknown method: ", msg.Method)
}
appCtx.Beacons.Lock.Unlock()
case msg := <-chLatest:
fmt.Println("latest msg: ", msg)
}
@@ -111,7 +151,7 @@ func kafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
})
}

func consume(r *kafka.Reader, ch chan<- model.Incoming_json) {
func consume[T any](r *kafka.Reader, ch chan<- T) {
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
@@ -119,14 +159,13 @@ func consume(r *kafka.Reader, ch chan<- model.Incoming_json) {
continue
}

var incoming model.Incoming_json

if err := json.Unmarshal(msg.Value, &incoming); err != nil {
fmt.Println("error in decoding string: ", err)
var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}

ch <- incoming
ch <- data
}
}



+ 127
- 0
cmd/server/main.go Vedi File

@@ -0,0 +1,127 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

func main() {
HttpServer("127.0.0.1:1902")
}

func HttpServer(addr string) {
headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

// Kafka writer that relays messages
writer := kafkaWriter("127.0.0.1:9092", "apibeacons")
defer writer.Close()

r := mux.NewRouter()

client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})

// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
}

// All the functions should do is just relay messages to the decoder through Kafka

func kafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}
}

func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) {
valueStr, err := json.Marshal(&value)
if err != nil {
fmt.Println("error in encoding: ", err)
}
msg := kafka.Message{
Value: valueStr,
}

err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in sending kafka message: ")
}
}

func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
beaconId := vars["beacon_id"]
apiUpdate := model.ApiUpdate{
Method: "DELETE",
ID: beaconId,
}

sendKafkaMessage(writer, &apiUpdate)
w.Write([]byte("ok"))
}
}

func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon
err := decoder.Decode(&inBeacon)

if err != nil {
http.Error(w, err.Error(), 400)
return
}

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
}

apiUpdate := model.ApiUpdate{
Method: "POST",
Beacon: inBeacon,
}

sendKafkaMessage(writer, &apiUpdate)

w.Write([]byte("ok"))
}
}

func beaconsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
beaconsList, err := client.Get(context.Background(), "beaconsList").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(beaconsList))
}
}
}

+ 6
- 0
internal/pkg/model/types.go Vedi File

@@ -193,6 +193,12 @@ type AppContext struct {
LatestList LatestBeaconsList
}

type ApiUpdate struct {
Method string
Beacon Beacon
ID string
}

var World = []byte("presence")

var Db *bolt.DB


Caricamento…
Annulla
Salva