package main import ( "context" "encoding/json" "fmt" "net/http" "strings" "time" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/redis/go-redis/v9" "github.com/segmentio/kafka-go" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type Message struct { Type string `json:"type"` Data interface{} `json:"data"` } func main() { HttpServer("0.0.0.0:1902") } func HttpServer(addr string) { headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) // Kafka writer that relays messages writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons") defer writer.Close() settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") defer settingsWriter.Close() // Define if maybe ws writer should have more topics wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages") defer wsWriter.Close() r := mux.NewRouter() client := redis.NewClient(&redis.Options{ Addr: "valkey:6379", Password: "", }) // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") // Handler for WS messages // No point in having seperate route for each message type, better to handle different message types in one connection r.HandleFunc("/ws/api", handleWSApi(wsWriter)) http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } // Probably define value as interface and then reuse this writer in all of the functions func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool { valueStr, err := json.Marshal(&value) if err != nil { fmt.Println("error in encoding: ", err) return false } msg := kafka.Message{ Value: valueStr, } err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in sending kafka message: ") return false } return true } func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) beaconId := vars["beacon_id"] apiUpdate := model.ApiUpdate{ Method: "DELETE", ID: beaconId, } flag := sendKafkaMessage(writer, &apiUpdate) if !flag { fmt.Println("error in sending Kafka message") http.Error(w, "Error in sending kafka message", 500) return } w.Write([]byte("ok")) } } func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) var inBeacon model.Beacon err := decoder.Decode(&inBeacon) if err != nil { http.Error(w, err.Error(), 400) return } if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) { http.Error(w, "name and beacon_id cannot be blank", 400) return } apiUpdate := model.ApiUpdate{ Method: "POST", Beacon: inBeacon, } flag := sendKafkaMessage(writer, &apiUpdate) if !flag { fmt.Println("error in sending Kafka message") http.Error(w, "Error in sending kafka message", 500) return } w.Write([]byte("ok")) } } func beaconsListHandler(client *redis.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { beaconsList, err := client.Get(context.Background(), "beaconsList").Result() if err == redis.Nil { fmt.Println("no beacons list, starting empty") http.Error(w, "list is empty", 500) } else if err != nil { http.Error(w, "Internal server error", 500) panic(err) } else { w.Write([]byte(beaconsList)) } } } func settingsListHandler(client *redis.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { settings, err := client.Get(context.Background(), "settings").Result() if err == redis.Nil { fmt.Println("no settings persisted, starting empty") http.Error(w, "list is empty", 500) } else if err != nil { http.Error(w, "Internal server error", 500) panic(err) } else { w.Write([]byte(settings)) } } } func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) var inSettings model.SettingsVal if err := decoder.Decode(&inSettings); err != nil { http.Error(w, err.Error(), 400) fmt.Println("Error in decoding Settings body: ", err) return } if !settingsCheck(inSettings) { http.Error(w, "values must be greater than 0", 400) fmt.Println("settings values must be greater than 0") return } valueStr, err := json.Marshal(&inSettings) if err != nil { http.Error(w, "Error in encoding settings", 500) fmt.Println("Error in encoding settings: ", err) return } msg := kafka.Message{ Value: valueStr, } if err := writer.WriteMessages(context.Background(), msg); err != nil { fmt.Println("error in sending Kafka message") http.Error(w, "Error in sending kafka message", 500) return } w.Write([]byte("ok")) } } func settingsCheck(settings model.SettingsVal) bool { if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 { return false } return true } func wsWriter(ws *websocket.Conn) { pingTicker := time.NewTicker(30 * time.Second) beaconTicker := time.NewTicker(2 * time.Second) defer func() { pingTicker.Stop() beaconTicker.Stop() ws.Close() }() for { select { case <-beaconTicker.C: // What exactly is HTTP ... something to be used in locations // http_results_lock.RLock() // js, err := json.Marshal(http_results) // http_results_lock.RUnlock() // if err != nil { // js = []byte("error") // } ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.TextMessage, []byte{1}); err != nil { return } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return } } } } func handleWSApi(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { fmt.Println("Error in upgrading connection: ", err) } return } defer ws.Close() go wsWriter(ws) for { _, msgBytes, err := ws.ReadMessage() if err != nil { fmt.Println("Connection closed: ", err) break } // What are the WS messages even? msg := kafka.Message{ Value: msgBytes, } fmt.Println("recieved WS message: ", msgBytes) if err := writer.WriteMessages(context.Background(), msg); err != nil { fmt.Println("Error in sending WS Kafka message: ", err) break } } } }