Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 
 

128 wiersze
3.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "strings"
  8. "time"
  9. "github.com/AFASystems/presence/internal/pkg/model"
  10. "github.com/gorilla/handlers"
  11. "github.com/gorilla/mux"
  12. "github.com/redis/go-redis/v9"
  13. "github.com/segmentio/kafka-go"
  14. )
  15. func main() {
  16. HttpServer("0.0.0.0:1902")
  17. }
  18. func HttpServer(addr string) {
  19. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  20. originsOk := handlers.AllowedOrigins([]string{"*"})
  21. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  22. // Kafka writer that relays messages
  23. writer := kafkaWriter("kafka:9092", "apibeacons")
  24. defer writer.Close()
  25. r := mux.NewRouter()
  26. client := redis.NewClient(&redis.Options{
  27. Addr: "kafka:6379",
  28. Password: "",
  29. })
  30. // For now just add beacon DELETE / GET / POST / PUT methods
  31. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
  32. r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
  33. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
  34. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
  35. http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
  36. }
  37. // All the functions should do is just relay messages to the decoder through Kafka
  38. func kafkaWriter(kafkaURL, topic string) *kafka.Writer {
  39. return &kafka.Writer{
  40. Addr: kafka.TCP(kafkaURL),
  41. Topic: topic,
  42. Balancer: &kafka.LeastBytes{},
  43. BatchSize: 100,
  44. BatchTimeout: 10 * time.Millisecond,
  45. }
  46. }
  47. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) {
  48. valueStr, err := json.Marshal(&value)
  49. if err != nil {
  50. fmt.Println("error in encoding: ", err)
  51. }
  52. msg := kafka.Message{
  53. Value: valueStr,
  54. }
  55. err = writer.WriteMessages(context.Background(), msg)
  56. if err != nil {
  57. fmt.Println("Error in sending kafka message: ")
  58. }
  59. }
  60. func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
  61. return func(w http.ResponseWriter, r *http.Request) {
  62. vars := mux.Vars(r)
  63. beaconId := vars["beacon_id"]
  64. apiUpdate := model.ApiUpdate{
  65. Method: "DELETE",
  66. ID: beaconId,
  67. }
  68. sendKafkaMessage(writer, &apiUpdate)
  69. w.Write([]byte("ok"))
  70. }
  71. }
  72. func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
  73. return func(w http.ResponseWriter, r *http.Request) {
  74. decoder := json.NewDecoder(r.Body)
  75. var inBeacon model.Beacon
  76. err := decoder.Decode(&inBeacon)
  77. if err != nil {
  78. http.Error(w, err.Error(), 400)
  79. return
  80. }
  81. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
  82. http.Error(w, "name and beacon_id cannot be blank", 400)
  83. return
  84. }
  85. apiUpdate := model.ApiUpdate{
  86. Method: "POST",
  87. Beacon: inBeacon,
  88. }
  89. sendKafkaMessage(writer, &apiUpdate)
  90. w.Write([]byte("ok"))
  91. }
  92. }
  93. func beaconsListHandler(client *redis.Client) http.HandlerFunc {
  94. return func(w http.ResponseWriter, r *http.Request) {
  95. beaconsList, err := client.Get(context.Background(), "beaconsList").Result()
  96. if err == redis.Nil {
  97. fmt.Println("no beacons list, starting empty")
  98. http.Error(w, "list is empty", 500)
  99. } else if err != nil {
  100. http.Error(w, "Internal server error", 500)
  101. panic(err)
  102. } else {
  103. w.Write([]byte(beaconsList))
  104. }
  105. }
  106. }