package main import ( "context" "encoding/json" "fmt" "net/http" "strings" "time" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/redis/go-redis/v9" "github.com/segmentio/kafka-go" ) func main() { HttpServer("0.0.0.0:1902") } func HttpServer(addr string) { headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) // Kafka writer that relays messages writer := kafkaWriter("kafka:9092", "apibeacons") defer writer.Close() r := mux.NewRouter() client := redis.NewClient(&redis.Options{ Addr: "valkey:6379", Password: "", }) // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } // All the functions should do is just relay messages to the decoder through Kafka func kafkaWriter(kafkaURL, topic string) *kafka.Writer { return &kafka.Writer{ Addr: kafka.TCP(kafkaURL), Topic: topic, Balancer: &kafka.LeastBytes{}, BatchSize: 100, BatchTimeout: 10 * time.Millisecond, } } func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) { valueStr, err := json.Marshal(&value) if err != nil { fmt.Println("error in encoding: ", err) } msg := kafka.Message{ Value: valueStr, } err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in sending kafka message: ") } } func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) beaconId := vars["beacon_id"] apiUpdate := model.ApiUpdate{ Method: "DELETE", ID: beaconId, } sendKafkaMessage(writer, &apiUpdate) w.Write([]byte("ok")) } } func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) var inBeacon model.Beacon err := decoder.Decode(&inBeacon) if err != nil { http.Error(w, err.Error(), 400) return } if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) { http.Error(w, "name and beacon_id cannot be blank", 400) return } apiUpdate := model.ApiUpdate{ Method: "POST", Beacon: inBeacon, } sendKafkaMessage(writer, &apiUpdate) w.Write([]byte("ok")) } } func beaconsListHandler(client *redis.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { beaconsList, err := client.Get(context.Background(), "beaconsList").Result() if err == redis.Nil { fmt.Println("no beacons list, starting empty") http.Error(w, "list is empty", 500) } else if err != nil { http.Error(w, "Internal server error", 500) panic(err) } else { w.Write([]byte(beaconsList)) } } }