From f27512f183ee20a82461a71d041353af1450bec6 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Thu, 15 Jan 2026 14:09:49 +0100 Subject: [PATCH] feat: add api endpoint for parser registry CRUD operations --- SCORE.md | 415 +++++++++++++++++++ build/package/Dockerfile.bridge | 2 - cmd/decoder/main.go | 33 +- cmd/{decoder => server}/config.json | 0 cmd/server/main.go | 33 ++ internal/pkg/common/utils/beacons.go | 4 +- internal/pkg/controller/parser_controller.go | 112 +++++ internal/pkg/database/database.go | 2 +- internal/pkg/model/parser.go | 31 +- internal/pkg/service/parser_service.go | 23 + 10 files changed, 624 insertions(+), 31 deletions(-) create mode 100644 SCORE.md rename cmd/{decoder => server}/config.json (100%) create mode 100644 internal/pkg/controller/parser_controller.go create mode 100644 internal/pkg/service/parser_service.go diff --git a/SCORE.md b/SCORE.md new file mode 100644 index 0000000..83627e1 --- /dev/null +++ b/SCORE.md @@ -0,0 +1,415 @@ +# Code Review: AFASystems Presence Detection System + +**Date**: 2026-01-15 +**Reviewer**: Claude Code +**Project**: BLE Beacon Presence Detection System + +--- + +## Overall Assessment + +Your system is a well-structured microservices architecture for BLE beacon presence detection. The code is functional but has several areas that need refactoring for production readiness, maintainability, and robustness. + +**Code Quality Score**: 6.5/10 + +- ✅ Good architecture and separation +- ✅ Thread-safe concurrent access +- ❌ No testing +- ❌ Poor error handling +- ❌ Security concerns +- ❌ Code duplication + +--- + +## 🔴 Critical Issues (Fix Immediately) + +### 1. Hardcoded Credentials in Config +**Location**: [internal/pkg/config/config.go:46-49](internal/pkg/config/config.go#L46) +**Risk**: Security vulnerability - default credentials exposed in source code + +```go +ClientSecret: getEnv("ClientSecret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC"), +HTTPPassword: getEnv("HTTPPassword", "C0r3_us3r_Cr3d3nt14ls"), +``` + +**Fix**: Remove default credentials, require explicit environment configuration: +```go +ClientSecret: getEnvOrFatal("ClientSecret"), // Helper that panics if not set +HTTPPassword: getEnvOrFatal("HTTPPassword"), +``` + +### 2. Global Database Variable +**Location**: [internal/pkg/database/database.go:12](internal/pkg/database/database.go#L12) +```go +var DB *gorm.DB // ❌ Global variable +``` + +**Issues**: +- Hard to test +- Implicit dependencies +- Cannot have multiple DB connections + +**Fix**: Return `*gorm.DB` from `Connect()` and inject it into services. + +### 3. Missing Error Context +Errors are logged but lose context: +```go +fmt.Println("Error in sending Kafka message:", err) // ❌ No context +``` + +**Fix**: Use structured logging: +```go +slog.Error("failed to send kafka message", + "topic", topic, + "beacon_id", id, + "error", err) +``` + +### 4. Unsafe Map Access +**Location**: [cmd/bridge/main.go:28](cmd/bridge/main.go#L28) +```go +hostname := strings.Split(topic, "/")[1] // ❌ Panic if index doesn't exist +``` + +**Fix**: Validate before accessing: +```go +parts := strings.Split(topic, "/") +if len(parts) < 2 { + slog.Warn("invalid topic format", "topic", topic) + return +} +hostname := parts[1] +``` + +--- + +## 🟡 High Priority Refactoring (Fix Soon) + +### 5. Code Duplication Across Services +**Affected Files**: All 4 main files + +**Pattern**: Logging setup (lines 124-131 in all services) +```go +logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) +w := io.MultiWriter(os.Stderr, logFile) +logger := slog.New(slog.NewJSONHandler(w, nil)) +slog.SetDefault(logger) +``` + +**Refactor**: Create `internal/pkg/common/logger/logger.go`: +```go +func SetupLogger(filename string) *slog.Logger { + logFile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v\n", err) + } + w := io.MultiWriter(os.Stderr, logFile) + logger := slog.New(slog.NewJSONHandler(w, nil)) + slog.SetDefault(logger) + return logger +} +``` + +### 6. Inconsistent Error Handling +Mixed error handling patterns across codebase: + +**In controller**: [internal/pkg/controller/trackers_controller.go:48](internal/pkg/controller/trackers_controller.go#L48) +```go +if err != nil { + fmt.Println("error in sending Kafka POST message") + http.Error(w, "Error in sending kafka message", 500) + return +} +``` + +**In main**: [cmd/decoder/main.go:97](cmd/decoder/main.go#L97) +```go +if err != nil { + eMsg := fmt.Sprintf("Error in decoding: %v", err) + fmt.Println(eMsg) + return +} +``` + +**Fix**: Use consistent error handling with structured responses. + +### 7. Missing Configuration Validation +**Location**: [internal/pkg/config/config.go](internal/pkg/config/config.go) + +Config doesn't validate required fields: +```go +func Load() *Config { + return &Config{ + // No validation + } +} +``` + +**Fix**: Add validation: +```go +func (c *Config) Validate() error { + if c.DBHost == "" { + return errors.New("DBHost is required") + } + if c.KafkaURL == "" { + return errors.New("KafkaURL is required") + } + // ... other validations + return nil +} +``` + +### 8. Potential Memory Inefficiency +**Location**: [cmd/location/main.go:217-222](cmd/location/main.go#L217) +```go +if len(beacon.BeaconMetrics) >= settings.BeaconMetricSize { + copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:]) + beacon.BeaconMetrics[settings.BeaconMetricSize-1] = metric +} else { + beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric) +} +``` + +**Issue**: This logic is correct but could be optimized with a circular buffer. + +--- + +## 🟢 Medium Priority Improvements (Plan Refactor) + +### 9. Tight Coupling in Controllers +Controllers directly use `*gorm.DB` instead of repository pattern: + +```go +func TrackerAdd(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc +``` + +**Refactor**: Introduce repository interface: +```go +type TrackerRepository interface { + Create(tracker *model.Tracker) error + Find(id string) (*model.Tracker, error) + // ... +} + +func TrackerAdd(repo TrackerRepository, writer *kafka.Writer, ctx context.Context) http.HandlerFunc +``` + +### 10. Poor Separation of Concerns +**Location**: [internal/pkg/common/appcontext/context.go](internal/pkg/common/appcontext/context.go) + +The AppState mixes state management with Kafka client creation (lines 60-76). + +**Refactor**: Separate concerns: +```go +// internal/pkg/infrastructure/kafka/pool.go +type KafkaPool struct { + writers []*kafka.Writer + readers []*kafka.Reader +} + +// internal/pkg/domain/appstate/state.go +type AppState struct { + beacons *model.BeaconsList + settings *model.Settings + kafkaPool *kafka.KafkaPool // Composed, not owned +} +``` + +### 11. Magic Numbers +**Location**: [cmd/location/main.go:107-108](cmd/location/main.go#L107) +```go +seenW := 1.5 // What does this mean? +rssiW := 0.75 // What does this mean? +``` + +**Fix**: Extract to named constants in settings: +```go +type SettingsVal struct { + LocationConfidence int64 `json:"location_confidence"` + SeenWeight float64 `json:"seen_weight"` // 1.5 + RSSIWeight float64 `json:"rssi_weight"` // 0.75 + // ... +} +``` + +### 12. Inefficient JSON Marshaling +**Location**: [cmd/server/main.go:206-207](cmd/server/main.go#L206) +```go +js, err := json.Marshal(list) +if err != nil { + js = []byte("error") // ❌ Invalid JSON! +} +``` + +**Fix**: Return proper error response: +```go +if err != nil { + http.Error(w, "Failed to marshal trackers", http.StatusInternalServerError) + return +} +``` + +--- + +## 🔵 Low Priority / Technical Debt + +### 13. Zero Test Coverage +The codebase has **no test files**. This is critical for production systems. + +**Recommendation**: Add unit tests for: +- All business logic in `service/` package +- Controller handlers +- Kafka message processing +- Beacon parsing logic + +**Target**: 70%+ code coverage + +### 14. Missing Context Timeouts +**Location**: [cmd/bridge/main.go:68](cmd/bridge/main.go#L68) +```go +err = writer.WriteMessages(context.Background(), msg) +``` + +**Fix**: Use timeouts: +```go +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() +err = writer.WriteMessages(ctx, msg) +``` + +### 15. Inefficient String Concatenation +**Location**: [cmd/bridge/main.go:182](cmd/bridge/main.go#L182) +```go +lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) +slog.Info(lMsg) +``` + +**Fix**: Direct logging: +```go +slog.Info("beacon added to lookup", "id", id) +``` + +### 16. Dead Code +**Location**: [cmd/bridge/main.go:76-103](cmd/bridge/main.go#L76) + +Large block of commented code should be removed. + +### 17. Incomplete Graceful Shutdown +**Location**: [cmd/bridge/main.go:212](cmd/bridge/main.go#L212) + +The MQTT client disconnects with timeout but doesn't wait for pending messages: +```go +client.Disconnect(250) // Only waits 250ms +``` + +### 18. No Health Checks +Services don't expose health endpoints for orchestration systems (Kubernetes, etc.). + +--- + +## 📊 Architecture Recommendations + +### 1. Implement Dependency Injection +Instead of passing `db`, `writer`, `ctx` to controllers, create a service container: + +```go +type Services struct { + DB *gorm.DB + KafkaWriter *kafka.Writer + AppState *appcontext.AppState +} + +func (s *Services) TrackerAddController() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Use s.DB, s.KafkaWriter + } +} +``` + +### 2. Add Observability +- **Structured logging** with request IDs +- **Metrics** (Prometheus) for: + - Kafka message throughput + - Beacon processing latency + - Database query performance +- **Distributed tracing** (OpenTelemetry) + +### 3. Implement Circuit Breakers +For external API calls in `apiclient` package to handle failures gracefully. + +### 4. Add Message Validation +Validate Kafka messages before processing: +```go +func (adv *BeaconAdvertisement) Validate() error { + if adv.ID == "" { + return errors.New("beacon ID is required") + } + if adv.RSSI < -100 || adv.RSSI > 0 { + return errors.New("invalid RSSI value") + } + return nil +} +``` + +--- + +## 🎯 Refactoring Priority Order + +| Priority | Category | Actions | +|----------|----------|---------| +| 1 | 🔴 Security | Remove hardcoded credentials from config | +| 2 | 🔴 Stability | Fix unsafe map/array access | +| 3 | 🔴 Testing | Add unit tests (aim for 70%+ coverage) | +| 4 | 🟡 Error Handling | Implement structured error handling | +| 5 | 🟡 Logging | Standardize to structured logging throughout | +| 6 | 🟡 Code Quality | Extract duplicated code to shared packages | +| 7 | 🟢 Architecture | Implement dependency injection gradually | +| 8 | 🔵 Performance | Optimize hot paths (beacon processing) | + +--- + +## 📈 Metrics Summary + +| Category | Count | Status | +|----------|-------|--------| +| 🔴 Critical Issues | 4 | Fix Immediately | +| 🟡 High Priority | 4 | Fix Soon | +| 🟢 Medium Priority | 4 | Plan Refactor | +| 🔵 Low Priority | 6 | Technical Debt | + +**Total Issues Identified**: 18 + +--- + +## System Architecture Overview + +The system consists of 4 microservices: + +1. **Bridge** ([cmd/bridge/main.go](cmd/bridge/main.go)) - MQTT to Kafka bridge +2. **Decoder** ([cmd/decoder/main.go](cmd/decoder/main.go)) - BLE beacon decoder +3. **Location** ([cmd/location/main.go](cmd/location/main.go)) - Location calculation service +4. **Server** ([cmd/server/main.go](cmd/server/main.go)) - HTTP API & WebSocket server + +### Communication Flow +``` +MQTT Gateway → Bridge (Kafka) → Decoder (Kafka) → Location (Kafka) → Server (Kafka) + ↓ ↑ + External API ←─────────────────────────────────────────────── +``` + +### Technology Stack +- **Language**: Go 1.24.0 +- **Message Broker**: Apache Kafka +- **Database**: PostgreSQL with GORM +- **Cache**: Redis (valkey) +- **MQTT**: Eclipse Paho +- **HTTP**: Gorilla Mux + WebSocket +- **Deployment**: Docker Compose + +--- + +## Conclusion + +The codebase demonstrates solid understanding of microservices architecture with good separation of concerns. The concurrent access patterns using `sync.RWMutex` are well-implemented. However, the system needs significant hardening before production deployment, particularly in areas of security, testing, and error handling. + +Focus on addressing critical security issues first, then build out test coverage to ensure reliability as you refactor other areas of the codebase. diff --git a/build/package/Dockerfile.bridge b/build/package/Dockerfile.bridge index 85d4c35..3f067bc 100644 --- a/build/package/Dockerfile.bridge +++ b/build/package/Dockerfile.bridge @@ -1,5 +1,3 @@ -# syntax=docker/dockerfile:1 - FROM golang:1.24.0 AS builder WORKDIR /app diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index 6aad8fb..bf221bc 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" "fmt" "io" "log" @@ -31,21 +30,7 @@ func main() { cfg := config.Load() parserRegistry := model.ParserRegistry{ - ParserList: make([]model.BeaconParser, 0), - } - - configFile, err := os.Open("/app/cmd/decoder/config.json") - if err != nil { - panic(err) - } - - b, _ := io.ReadAll(configFile) - - var configs []model.Config - json.Unmarshal(b, &configs) - - for _, config := range configs { - parserRegistry.Register(config.Name, config) + ParserList: make(map[string]model.BeaconParser), } // Create log file @@ -63,15 +48,18 @@ func main() { defer stop() rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") + parserReader := appState.AddKafkaReader(cfg.KafkaURL, "parser", "gid-parser") alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") slog.Info("Decoder initialized, subscribed to Kafka topics") chRaw := make(chan model.BeaconAdvertisement, 2000) + chParser := make(chan model.KafkaParser, 200) - wg.Add(2) + wg.Add(3) go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) + go kafkaclient.Consume(parserReader, chParser, ctx, &wg) eventloop: for { @@ -80,6 +68,17 @@ eventloop: break eventloop case msg := <-chRaw: processIncoming(msg, appState, alertWriter, &parserRegistry) + case msg := <-chParser: + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(msg.Name) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + } } } diff --git a/cmd/decoder/config.json b/cmd/server/config.json similarity index 100% rename from cmd/decoder/config.json rename to cmd/server/config.json diff --git a/cmd/server/main.go b/cmd/server/main.go index d0aec8a..3887d88 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -67,8 +67,36 @@ func main() { writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert") + parserWriter := appState.AddKafkaWriter(cfg.KafkaURL, "parser") slog.Info("Kafka writers topics: apibeacons, settings initialized") + configFile, err := os.Open("/app/cmd/server/config.json") + if err != nil { + panic(err) + } + + b, _ := io.ReadAll(configFile) + + var configs []model.Config + json.Unmarshal(b, &configs) + + for _, config := range configs { + // persist read configs in database + db.Create(&config) + } + + db.Find(&configs) + for _, config := range configs { + kp := model.KafkaParser{ + ID: "add", + Config: config, + } + + if err := service.SendParserConfig(kp, parserWriter, ctx); err != nil { + fmt.Printf("Unable to send parser config to kafka broker %v\n", err) + } + } + if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil { fmt.Printf("Error in getting token: %v\n", err) } @@ -109,6 +137,11 @@ func main() { r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE") r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT") + r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET") + r.HandleFunc("/configs/beacons", controller.ParserAddController(db, parserWriter, ctx)).Methods("POST") + r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, parserWriter, ctx)).Methods("PUT") + r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, parserWriter, ctx)).Methods("DELETE") + wsHandler := http.HandlerFunc(serveWs(db, ctx)) restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/pkg/common/utils/beacons.go b/internal/pkg/common/utils/beacons.go index 9fe5682..ce021e4 100644 --- a/internal/pkg/common/utils/beacons.go +++ b/internal/pkg/common/utils/beacons.go @@ -44,9 +44,9 @@ func LoopADStructures(b []byte, i [][2]int, id string, parserRegistry *model.Par if !isValidADStructure(ad) { break } - for _, parser := range parserRegistry.ParserList { + for name, parser := range parserRegistry.ParserList { if parser.CanParse(ad) { - event, ok := parser.Parse(ad) + event, ok := parser.Parse(name, ad) if ok { event.ID = id event.Name = id diff --git a/internal/pkg/controller/parser_controller.go b/internal/pkg/controller/parser_controller.go new file mode 100644 index 0000000..51b0462 --- /dev/null +++ b/internal/pkg/controller/parser_controller.go @@ -0,0 +1,112 @@ +package controller + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/service" + "github.com/gorilla/mux" + "github.com/segmentio/kafka-go" + "gorm.io/gorm" +) + +func ParserAddController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + var config model.Config + + if err := decoder.Decode(&config); err != nil { + http.Error(w, err.Error(), 400) + return + } + + db.Create(&config) + + kp := model.KafkaParser{ + ID: "add", + Config: config, + } + + if err := service.SendParserConfig(kp, writer, ctx); err != nil { + http.Error(w, "Unable to send parser config to kafka broker", 400) + fmt.Printf("Unable to send parser config to kafka broker %v\n", err) + return + } + + w.Write([]byte("ok")) + } +} + +func ParserListController(db *gorm.DB) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var configs []model.Config + db.Find(&configs) + res, err := json.Marshal(configs) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + w.Write(res) + } +} + +func ParserDeleteController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + if res := db.Delete(&model.Config{}, "name = ?", id); res.RowsAffected == 0 { + http.Error(w, "no parser config with such name found", 400) + return + } + + kp := model.KafkaParser{ + ID: "delete", + Name: id, + } + + if err := service.SendParserConfig(kp, writer, ctx); err != nil { + http.Error(w, "Unable to send parser config to kafka broker", 400) + fmt.Printf("Unable to send parser config to kafka broker %v\n", err) + return + } + + w.Write([]byte("ok")) + } +} + +func ParserUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + if err := db.First(&model.Config{}, "name = ?", id).Error; err != nil { + http.Error(w, err.Error(), 400) + return + } + + decoder := json.NewDecoder(r.Body) + var config model.Config + + if err := decoder.Decode(&config); err != nil { + http.Error(w, err.Error(), 400) + return + } + + kp := model.KafkaParser{ + ID: "update", + Name: config.Name, + Config: config, + } + + db.Save(&config) + if err := service.SendParserConfig(kp, writer, ctx); err != nil { + http.Error(w, "Unable to send parser config to kafka broker", 400) + fmt.Printf("Unable to send parser config to kafka broker %v\n", err) + return + } + + w.Write([]byte("ok")) + } +} diff --git a/internal/pkg/database/database.go b/internal/pkg/database/database.go index ef406b7..4c27e47 100644 --- a/internal/pkg/database/database.go +++ b/internal/pkg/database/database.go @@ -26,7 +26,7 @@ func Connect(cfg *config.Config) (*gorm.DB, error) { return nil, err } - if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}); err != nil { + if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}); err != nil { return nil, err } diff --git a/internal/pkg/model/parser.go b/internal/pkg/model/parser.go index e118e4a..e6156f8 100644 --- a/internal/pkg/model/parser.go +++ b/internal/pkg/model/parser.go @@ -14,22 +14,27 @@ type ParserConfig struct { } type BeaconParser struct { - Name string CanParse func([]byte) bool configs map[string]ParserConfig } type ParserRegistry struct { - ParserList []BeaconParser + ParserList map[string]BeaconParser rw sync.RWMutex } type Config struct { - Name string `json:"name"` + Name string `json:"name" gorm:"primaryKey"` Min int `json:"min"` Max int `json:"max"` - Pattern []string `json:"pattern"` - Configs map[string]ParserConfig `json:"configs"` + Pattern []string `json:"pattern" gorm:"serializer:json"` + Configs map[string]ParserConfig `json:"configs" gorm:"serializer:json"` +} + +type KafkaParser struct { + ID string + Name string + Config Config } func (pc ParserConfig) GetOrder() binary.ByteOrder { @@ -45,7 +50,6 @@ func (p *ParserRegistry) Register(name string, c Config) { defer p.rw.Unlock() b := BeaconParser{ - Name: name, CanParse: func(ad []byte) bool { if len(ad) < 2 { return false @@ -55,12 +59,21 @@ func (p *ParserRegistry) Register(name string, c Config) { configs: c.Configs, } - p.ParserList = append(p.ParserList, b) + p.ParserList[name] = b +} + +func (p *ParserRegistry) Unregister(name string) { + p.rw.Lock() + delete(p.ParserList, name) + p.rw.Unlock() } -func (b *BeaconParser) Parse(ad []byte) (BeaconEvent, bool) { +// TODO: change this to be dynamic, maybe event is interface with no predefined properties +// or types + +func (b *BeaconParser) Parse(name string, ad []byte) (BeaconEvent, bool) { flag := false - event := BeaconEvent{Type: b.Name} + event := BeaconEvent{Type: name} if cfg, ok := b.configs["battery"]; ok { event.Battery = uint32(b.extract(ad, cfg)) flag = true diff --git a/internal/pkg/service/parser_service.go b/internal/pkg/service/parser_service.go new file mode 100644 index 0000000..2d65102 --- /dev/null +++ b/internal/pkg/service/parser_service.go @@ -0,0 +1,23 @@ +package service + +import ( + "context" + "encoding/json" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" +) + +func SendParserConfig(kp model.KafkaParser, writer *kafka.Writer, ctx context.Context) error { + eMsg, err := json.Marshal(kp) + if err != nil { + return err + } + msg := kafka.Message{ + Value: eMsg, + } + + writer.WriteMessages(ctx, msg) + + return nil +}