diff --git a/CODE_REVIEW_REPORT.md b/CODE_REVIEW_REPORT.md new file mode 100644 index 0000000..63d3d9c --- /dev/null +++ b/CODE_REVIEW_REPORT.md @@ -0,0 +1,288 @@ +# Code Review Report: AFASystems Presence Services + +**Review Date:** February 11, 2025 +**Scope:** Four services (bridge, server, location, decoder) and their internal packages +**Reviewer:** Automated Code Review + +--- + +## Executive Summary + +This report reviews the custom packages used across the four main services of the presence detection system. The codebase demonstrates a coherent architecture with clear separation of concerns, but several reliability issues, unused code paths, and refactoring opportunities were identified. + +**Overall Rating: 6.5/10** + +--- + +## 1. Package Inventory by Service + +### Bridge (`cmd/bridge/main.go`) + +| Package | Purpose | +| -------------------------------- | -------------------------------------------------- | +| `internal/pkg/common/appcontext` | Shared application state (beacon lookup, settings) | +| `internal/pkg/config` | Environment-based configuration | +| `internal/pkg/kafkaclient` | Kafka consumer/producer management | +| `internal/pkg/logger` | Structured logging setup | +| `internal/pkg/model` | Data structures | + +### Server (`cmd/server/main.go`) + +| Package | Purpose | +| -------------------------------- | --------------------------------------------- | +| `internal/pkg/apiclient` | External API authentication and data fetching | +| `internal/pkg/common/appcontext` | Shared application state | +| `internal/pkg/config` | Configuration | +| `internal/pkg/controller` | HTTP handlers for REST API | +| `internal/pkg/database` | PostgreSQL connection (GORM) | +| `internal/pkg/kafkaclient` | Kafka management | +| `internal/pkg/logger` | Logging | +| `internal/pkg/model` | Data structures | +| `internal/pkg/service` | Business logic (location, parser) | + +### Location (`cmd/location/main.go`) + +| Package | Purpose | +| -------------------------------- | ---------------------- | +| `internal/pkg/common/appcontext` | Beacon state, settings | +| `internal/pkg/common/utils` | Distance calculation | +| `internal/pkg/config` | Configuration | +| `internal/pkg/kafkaclient` | Kafka management | +| `internal/pkg/logger` | Logging | +| `internal/pkg/model` | Data structures | + +### Decoder (`cmd/decoder/main.go`) + +| Package | Purpose | +| -------------------------------- | ---------------------------------- | +| `internal/pkg/common/appcontext` | Beacon events state | +| `internal/pkg/common/utils` | AD structure parsing, flag removal | +| `internal/pkg/config` | Configuration | +| `internal/pkg/kafkaclient` | Kafka management | +| `internal/pkg/logger` | Logging | +| `internal/pkg/model` | Data structures, parser registry | + +--- + +## 2. Critical Issues (Must Fix) + +### 2.1 Tracker Delete Method Case Mismatch (Bug) + +Resolved + +### 2.2 Potential Panic in Location Algorithm + +**Location:** `cmd/location/main.go:99-101` + +Resolved + +### 2.3 Hardcoded Config Path + +**Location:** `cmd/server/main.go:60` + +```go +configFile, err := os.Open("/app/cmd/server/config.json") +``` + +This path is Docker-specific and fails in local development or other deployment environments. + +**Fix:** Use configurable path (e.g., `CONFIG_PATH` env var) or relative path based on executable location. + +--- + +## 3. Security Concerns + +### 3.1 Hardcoded Credentials + +**Locations:** + +- `internal/pkg/config/config.go`: Default values include `ClientSecret`, `HTTPPassword` with production-like strings +- `internal/pkg/apiclient/auth.go`: GetToken() hardcodes credentials in formData (lines 21-24) instead of using `cfg.HTTPClientID`, `cfg.ClientSecret`, etc. +- Config struct has `HTTPClientID`, `ClientSecret`, `HTTPUsername`, etc., but `auth.go` ignores them + +**Recommendation:** Wire config values into auth; never commit production credentials. + +### 3.2 TLS Verification Disabled + +**Location:** `internal/pkg/apiclient/updatedb.go:21-22` + +```go +TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +``` + +**Recommendation:** Use proper certificates or make this configurable for dev only. + +--- + +## 4. Unused Code & Directories + +### 4.1 Orphaned Packages (Not Imported) + +Resolved + +### 4.2 Unused Functions / Methods + +Resolved + +### 4.3 Dead Code in bridge/mqtthandler + +**Location:** `internal/pkg/bridge/mqtthandler/mqtthandler.go:75-83` + +Resolved + +### 4.4 Unnecessary Compile-Time Assertion + +**Location:** `cmd/server/main.go:31` + +```go +var _ io.Writer = (*os.File)(nil) +``` + +Redundant; `*os.File` implements `io.Writer`. Safe to remove. + +### 4.5 Unused go.mod Dependencies + +| Package | Notes | +| ------------------------------ | ------------------------------- | +| `github.com/boltdb/bolt` | Not imported in any source file | +| `github.com/yosssi/gmq` | Not imported | +| `github.com/gorilla/websocket` | Not imported | + +**Recommendation:** Run `go mod tidy` after removing dead imports, or explicitly remove if kept for future use. + +--- + +## 5. Reliability & Error Handling + +### 5.1 Kafka Consumer Error Handling + +**Location:** `internal/pkg/kafkaclient/consumer.go:20-23` + +On `ReadMessage` or `Unmarshal` error, the consumer logs and continues. For `context.Canceled` or partition errors, this may cause tight loops. Consider backoff or bounded retries. + +### 5.2 KafkaManager GetReader/GetWriter Lock Usage + +**Location:** `internal/pkg/kafkaclient/manager.go:101-111` + +`GetReader` and `GetWriter` hold the lock for the entire call including return. If the returned pointer is used after the lock is released, that's fine, but the pattern holds the lock longer than necessary. Prefer: + +```go +func (m *KafkaManager) GetReader(topic string) *kafka.Reader { + m.kafkaReadersMap.KafkaReadersLock.RLock() + defer m.kafkaReadersMap.KafkaReadersLock.RUnlock() + return m.kafkaReadersMap.KafkaReaders[topic] +} +``` + +Use `RLock` for read-only access. + +### 5.3 Logger File Handle Leak + +**Location:** `internal/pkg/logger/logger.go` + +The opened file `f` is never closed. For long-running processes this is usually acceptable (log files stay open), but worth documenting. If multiple loggers are created, each holds a file descriptor. + +### 5.4 Silent JSON Unmarshal + +**Location:** `cmd/server/main.go:68` + +```go +json.Unmarshal(b, &configs) +``` + +Error is ignored. Invalid JSON would leave `configs` empty without feedback. + +--- + +## 6. Code Quality & Maintainability + +### 6.1 Inconsistent Logging + +- Mix of `log.Printf`, `fmt.Println`, `fmt.Printf`, and `slog.Info/Error` +- Italian message: "Messaggio CSV non valido" in bridge +- Typo: "Beggining" → "Beginning" (bridge, location, decoder, server) + +### 6.2 Magic Numbers + +- Channel sizes: 200, 500, 2000 without named constants +- RSSI weights in location: `seenW := 1.5`, `rssiW := 0.75` +- Ticker intervals: 1s, 2s without configuration + +### 6.3 Duplication + +- Bridge defines `mqtthandler` inline while `internal/pkg/bridge/mqtthandler` exists with similar logic +- Both use `appcontext.BeaconExists` for lookup; bridge version also sets `adv.ID` from lookup + +### 6.4 Parser ID Inconsistency + +**Decoder** expects `msg.ID` values: `"add"`, `"delete"`, `"update"`. +**ParserDeleteController** sends `ID: "delete"` ✓ +**ParserAddController** sends `ID: "add"` ✓ +**ParserUpdateController** sends `ID: "update"` ✓ + +Decoder’s update case re-registers; add and update are effectively the same. + +--- + +## 7. AppContext Thread Safety + +`AppState` mixes safe and unsafe access: + +- `beaconsLookup` (map) has no mutex; `AddBeaconToLookup`, `RemoveBeaconFromLookup`, `CleanLookup`, `BeaconExists` are not thread-safe +- Bridge goroutines (Kafka consumers + event loop) and MQTT handler may access it concurrently + +**Recommendation:** Protect `beaconsLookup` with `sync.RWMutex` or use `sync.Map`. + +--- + +## 8. Refactoring Suggestions + +1. **Unify config loading:** Support JSON config file path via env; keep env overrides for sensitivity. +2. **Extract constants:** Kafka topic names, channel sizes, ticker intervals. +3. **Consolidate MQTT handling:** Use `internal/pkg/bridge/mqtthandler` and fix it, or remove the package and keep logic in bridge. +4. **API client:** Use config for URLs and credentials; add timeouts to HTTP client. +5. **Controllers:** Add request validation, consistent error responses, and structured error types. +6. **Service layer:** `formatMac` in `beacon_service.go` could move to `internal/pkg/common/utils` for reuse and testing. + +--- + +## 9. Directory Structure Notes + +| Directory | Status | +| ------------------------------------------ | ----------------------------------------------------- | +| `internal/app/_your_app_/` | Placeholder with `.keep`; safe to remove or repurpose | +| `internal/pkg/model/.keep` | Placeholder; low impact | +| `web/app/`, `web/static/`, `web/template/` | Empty except `.keep`; clarify if planned | +| `build/package/` | Contains Dockerfiles; structure is reasonable | + +--- + +## 10. Summary of Recommendations + +| Priority | Action | +| -------- | ------------------------------------------------------------------ | +| **P0** | Fix TrackerDelete `"Delete"` → `"DELETE"` | +| **P0** | Guard empty `BeaconMetrics` in location | +| **P1** | Make config.json path configurable | +| **P1** | Fix `beaconsLookup` concurrency in AppState | +| **P2** | Remove or integrate `internal/pkg/redis` and `internal/pkg/bridge` | +| **P2** | Remove unused functions (ValidateRSSI, EventToBeaconService, etc.) | +| **P2** | Replace hardcoded credentials in apiclient with config | +| **P3** | Unify logging (slog), fix typos, extract constants | +| **P3** | Run `go mod tidy` and drop unused dependencies | + +--- + +## 11. Rating Breakdown + +| Category | Score | Notes | +| ---------------- | ----- | ----------------------------------------------------------------------- | +| Architecture | 7/10 | Clear service boundaries; some shared-state issues | +| Reliability | 5/10 | Critical bugs (case mismatch, panic risk); error handling could improve | +| Security | 4/10 | Hardcoded credentials; disabled TLS verification | +| Maintainability | 6/10 | Duplication, magic numbers, inconsistent logging | +| Code Cleanliness | 5/10 | Unused code, dead packages, redundant assertions | + +**Overall: 6.5/10** + +The system has a solid foundation and sensible separation of concerns. Addressing the critical bugs, security issues, and removing dead code would materially improve reliability and maintainability. diff --git a/Makefile b/Makefile deleted file mode 100644 index f1873ed..0000000 --- a/Makefile +++ /dev/null @@ -1 +0,0 @@ -# note: call scripts from /scripts diff --git a/README.md b/README.md deleted file mode 100644 index 64e0a8f..0000000 --- a/README.md +++ /dev/null @@ -1,421 +0,0 @@ -# AFA Systems Presence Detection - -A comprehensive **Bluetooth Low Energy (BLE) presence detection system** that tracks beacon devices in real-time, calculates locations based on signal strength, and integrates with popular IoT platforms like Home Assistant and Node-RED. - -## 🏗️ System Architecture - -The system follows a **microservices architecture** with Apache Kafka as the central message bus: - -``` -┌─────────────┐ MQTT ┌─────────────┐ Kafka ┌─────────────┐ -│ MQTT │ ────────► │ Bridge │ ────────► │ Decoder │ -│ Gateway │ │ Service │ │ Service │ -└─────────────┘ └─────────────┘ └─────────────┘ - │ - ▼ -┌─────────────┐ WebSocket ┌─────────────┐ Kafka ┌─────────────┐ -│ Web UI │ ◄────────── │ Server │ ◄───────── │ Location │ -│ Dashboard │ │ Service │ │ Algorithm │ -└─────────────┘ └─────────────┘ └─────────────┘ -``` - -### Core Components - -#### 🔌 **Bridge Service** (`cmd/bridge/`) -- **Purpose**: MQTT to Kafka message gateway -- **Function**: Subscribes to `publish_out/#` MQTT topics and forwards messages to Kafka `rawbeacons` topic -- **Features**: Configurable MQTT connection, automatic reconnection, error handling - -#### 🔍 **Decoder Service** (`cmd/decoder/`) -- **Purpose**: Processes raw BLE beacon advertisements -- **Supported Formats**: Ingics, Eddystone, Minew B7 -- **Functions**: - - Battery level monitoring - - Fall detection events - - Button press detection - - Device telemetry extraction -- **Output**: Processed events to `alertbeacons` Kafka topic - -#### 📍 **Location Algorithm** (`cmd/location/`) -- **Purpose**: Calculates device locations based on RSSI and proximity -- **Features**: - - Weighted location calculation using RSSI values - - Confidence scoring system - - Location change notifications - - Configurable thresholds and parameters -- **Output**: Location events to `locevents` Kafka topic - -#### 🌐 **Server Service** (`cmd/server/`) -- **Purpose**: HTTP API and WebSocket server -- **Features**: - - RESTful API for beacon management (CRUD operations) - - Real-time WebSocket communication - - Settings management interface - - CORS-enabled web interface - - Home Assistant integration endpoints - -#### 📊 **Supporting Services** -- **Kafka 3.9.0**: Central message bus with automatic topic creation -- **Kafdrop**: Web-based Kafka monitoring and management UI -- **Node-RED**: IoT workflow automation with pre-configured flows -- **Redis**: Real-time data caching and WebSocket session management -- **BoltDB**: Embedded database for persistent storage - -## 🚀 Quick Start - -### Prerequisites - -- Docker and Docker Compose -- Go 1.24+ (for local development) -- MQTT broker (compatible with BLE gateways) - -### Installation - -1. **Clone the repository**: - ```bash - git clone https://github.com/AFASystems/presence.git - cd presence - ``` - -2. **Start the system**: - ```bash - cd build - docker-compose up -d - ``` - -3. **Verify services**: - - **Web Interface**: http://localhost:8080 - - **Kafdrop (Kafka UI)**: http://localhost:9000 - - **Node-RED**: http://localhost:1880 - -### Configuration - -Set the following environment variables: - -```bash -# Web Server -HTTP_HOST_PATH=":8080" -HTTP_WS_HOST_PATH=":8081" - -# MQTT Configuration -MQTT_HOST="tcp://mqtt-broker:1883" -MQTT_USERNAME="your_username" -MQTT_PASSWORD="your_password" - -# Kafka Configuration -KAFKA_URL="kafka:29092" - -# Database -DB_PATH="./volumes/presence.db" -``` - -## 📡 Supported Beacon Types - -### Ingics Beacons -- Battery level monitoring -- Event detection (fall detection, button presses) -- Signal strength tracking - -### Eddystone Beacons -- Eddystone-UID protocol support -- Battery telemetry (Eddystone-TLM) -- Proximity detection - -### Minew B7 Beacons -- Vendor-specific format decoding -- Button counter tracking -- Battery status monitoring -- Multiple button modes - -## 🔧 Configuration Options - -### System Settings (`SettingsVal`) - -| Parameter | Type | Default | Description | -|-----------|------|---------|-------------| -| `location_confidence` | int64 | 80 | Minimum confidence level for location changes | -| `last_seen_threshold` | int64 | 300 | Time (seconds) before beacon considered offline | -| `beacon_metrics_size` | int | 10 | Number of RSSI measurements to keep for averaging | -| `ha_send_interval` | int64 | 60 | Home Assistant update interval (seconds) | -| `ha_send_changes_only` | bool | true | Send updates only on location changes | -| `rssi_min_threshold` | int64 | -90 | Minimum RSSI value for beacon detection | -| `enforce_rssi_threshold` | bool | true | Filter out weak signals | - -### API Endpoints - -#### Beacon Management -- `GET /api/beacons` - List all registered beacons -- `POST /api/beacons` - Register a new beacon -- `PUT /api/beacons/{id}` - Update beacon information -- `DELETE /api/beacons/{id}` - Remove a beacon - -#### Settings -- `GET /api/settings` - Get current system settings -- `POST /api/settings` - Update system settings - -#### WebSocket -- `/ws/broadcast` - Real-time beacon updates and notifications - -## 🏠 Home Assistant Integration - -### MQTT Auto-Discovery - -The system automatically generates MQTT messages for Home Assistant: - -```yaml -# Example device tracker configuration -device_tracker: - - platform: mqtt - devices: - beacon_001: "Presence/Beacons/beacon_001" -``` - -### Battery Monitoring - -```yaml -# Battery level sensor -sensor: - - platform: mqtt - name: "Beacon Battery" - state_topic: "Presence/Beacons/beacon_001/battery" - unit_of_measurement: "%" - device_class: battery -``` - -### Button Detection - -```yaml -# Button press sensor -binary_sensor: - - platform: mqtt - name: "Beacon Button" - state_topic: "Presence/Beacons/beacon_001/button" - device_class: "button" -``` - -## 📊 Data Models - -### Core Entities - -#### Beacon -```go -type Beacon struct { - Name string `json:"name"` - ID string `json:"beacon_id"` - BeaconType string `json:"beacon_type"` - BeaconLocation string `json:"beacon_location"` - LastSeen int64 `json:"last_seen"` - Distance float64 `json:"distance"` - LocationConfidence int64 `json:"location_confidence"` - HSButtonCounter int64 `json:"hs_button_counter"` - HSBattery int64 `json:"hs_button_battery"` - // ... additional fields -} -``` - -#### BeaconAdvertisement -```go -type BeaconAdvertisement struct { - Hostname string `json:"hostname"` - MAC string `json:"mac"` - RSSI int64 `json:"rssi"` - Data string `json:"data"` - BeaconType string `json:"beacon_type"` - UUID string `json:"uuid"` - Major string `json:"major"` - Minor string `json:"minor"` - // ... additional fields -} -``` - -#### LocationChange -```go -type LocationChange struct { - Method string `json:"method"` - BeaconRef Beacon `json:"beacon_info"` - Name string `json:"name"` - PreviousLocation string `json:"previous_location"` - NewLocation string `json:"new_location"` - Timestamp int64 `json:"timestamp"` -} -``` - -## 🐳 Docker Services - -### Available Services - -| Service | Image | Ports | Description | -|---------|-------|-------|-------------| -| kafka | apache/kafka:3.9.0 | 9092, 9093 | Apache Kafka message broker | -| kafdrop | obsidiandynamics/kafdrop | 9000 | Kafka monitoring UI | -| presence-bridge | local/build | - | MQTT to Kafka bridge | -| presence-decoder | local/build | - | BLE beacon decoder | -| presence-location | local/build | - | Location calculation service | -| presence-server | local/build | 8080, 8081 | HTTP API and WebSocket server | - -### Volumes - -- `./volumes/presence.db` - BoltDB database file -- `./volumes/node-red/` - Node-RED configuration and flows -- `./volumes/kafka-data/` - Kafka persistent data - -## 🔧 Development - -### Local Development Setup - -1. **Install dependencies**: - ```bash - go mod download - ``` - -2. **Run individual services**: - ```bash - # Run bridge service - go run cmd/bridge/main.go - - # Run decoder service - go run cmd/decoder/main.go - - # Run location algorithm - go run cmd/location/main.go - - # Run API server - go run cmd/server/main.go - ``` - -3. **Run tests**: - ```bash - go test ./... - ``` - -### Building Docker Images - -```bash -# Build all services -docker build -t presence-system . - -# Build individual service -docker build -f build/package/Dockerfile.bridge -t presence-bridge . -``` - -### Project Structure - -``` -/ -├── cmd/ # Main application entry points -│ ├── server/ # HTTP API & WebSocket server -│ ├── bridge/ # MQTT to Kafka bridge -│ ├── decoder/ # BLE beacon decoder -│ ├── location/ # Location calculation algorithm -│ └── testbench/ # Testing/development utilities -├── internal/ # Private application code -│ ├── app/ # Application components -│ └── pkg/ # Shared internal packages -│ ├── model/ # Data structures and types -│ ├── kafkaclient/ # Kafka producer/consumer -│ ├── config/ # Configuration management -│ ├── persistence/ # BoltDB operations -│ ├── redis/ # Redis client -│ └── bridge/ # MQTT bridge logic -├── build/ # Build artifacts and Docker configs -│ ├── package/ # Dockerfile for each component -│ └── docker-compose.yaml # Complete system deployment -├── web/ # Web interface files -├── volumes/ # Persistent data and configurations -├── scripts/ # Utility scripts -└── docs/ # Documentation -``` - -## 📈 Monitoring and Debugging - -### Kafka Topics - -- `rawbeacons` - Raw BLE beacon advertisements -- `alertbeacons` - Processed beacon events (battery, buttons) -- `locevents` - Location change notifications - -### Health Checks - -- **Kafka**: Topic creation and broker connectivity -- **Services**: Automatic restart on failure -- **Database**: BoltDB integrity checks - -### Logs - -```bash -# View service logs -docker-compose logs -f [service-name] - -# View all logs -docker-compose logs -f -``` - -## 🔌 Integrations - -### Node-RED Flows - -Pre-configured Node-RED flows are available in `/volumes/node-red/`: - -- **Beacon monitoring dashboards** -- **Location-based automations** -- **Battery level alerts** -- **Notification systems** - -### MQTT Topics - -System publishes to the following MQTT topics: - -- `Presence/Beacons/{beacon_id}/location` - Current beacon location -- `Presence/Beacons/{beacon_id}/battery` - Battery level -- `Presence/Beacons/{beacon_id}/button` - Button press events -- `Presence/Beacons/{beacon_id}/distance` - Distance from nearest gateway - -## 🤝 Contributing - -1. Fork the repository -2. Create a feature branch (`git checkout -b feature/amazing-feature`) -3. Commit your changes (`git commit -m 'Add amazing feature'`) -4. Push to the branch (`git push origin feature/amazing-feature`) -5. Open a Pull Request - -### Development Guidelines - -- Follow Go conventions and best practices -- Write comprehensive tests for new features -- Update documentation for API changes -- Use meaningful commit messages - -## 📄 License - -This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. - -## 🆘 Support - -- **Issues**: [GitHub Issues](https://github.com/AFASystems/presence/issues) -- **Documentation**: [Project Wiki](https://github.com/AFASystems/presence/wiki) -- **Discussions**: [GitHub Discussions](https://github.com/AFASystems/presence/discussions) - -## 🔮 Roadmap - -### Upcoming Features - -- [ ] Enhanced location algorithms with machine learning -- [ ] Support for additional beacon types (iBeacon, AltBeacon) -- [ ] Mobile application for beacon management -- [ ] Advanced analytics and reporting dashboard -- [ ] Multi-tenant architecture -- [ ] Cloud deployment templates - -### Current Development Status - -- ✅ **Bridge Service**: Complete and stable -- ✅ **Decoder Service**: Core functionality implemented -- ✅ **Location Algorithm**: Basic algorithm functional -- ✅ **Server Service**: API and WebSocket implementation -- 🚧 **Web Interface**: Basic UI, enhancements in progress -- 🚧 **Documentation**: Comprehensive documentation being created -- 📋 **Testing**: Automated tests being expanded - ---- - -**AFA Systems Presence Detection** - Real-time BLE beacon tracking and location intelligence for modern IoT environments. \ No newline at end of file diff --git a/SCORE.md b/SCORE.md deleted file mode 100644 index 83627e1..0000000 --- a/SCORE.md +++ /dev/null @@ -1,415 +0,0 @@ -# Code Review: AFASystems Presence Detection System - -**Date**: 2026-01-15 -**Reviewer**: Claude Code -**Project**: BLE Beacon Presence Detection System - ---- - -## Overall Assessment - -Your system is a well-structured microservices architecture for BLE beacon presence detection. The code is functional but has several areas that need refactoring for production readiness, maintainability, and robustness. - -**Code Quality Score**: 6.5/10 - -- ✅ Good architecture and separation -- ✅ Thread-safe concurrent access -- ❌ No testing -- ❌ Poor error handling -- ❌ Security concerns -- ❌ Code duplication - ---- - -## 🔴 Critical Issues (Fix Immediately) - -### 1. Hardcoded Credentials in Config -**Location**: [internal/pkg/config/config.go:46-49](internal/pkg/config/config.go#L46) -**Risk**: Security vulnerability - default credentials exposed in source code - -```go -ClientSecret: getEnv("ClientSecret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC"), -HTTPPassword: getEnv("HTTPPassword", "C0r3_us3r_Cr3d3nt14ls"), -``` - -**Fix**: Remove default credentials, require explicit environment configuration: -```go -ClientSecret: getEnvOrFatal("ClientSecret"), // Helper that panics if not set -HTTPPassword: getEnvOrFatal("HTTPPassword"), -``` - -### 2. Global Database Variable -**Location**: [internal/pkg/database/database.go:12](internal/pkg/database/database.go#L12) -```go -var DB *gorm.DB // ❌ Global variable -``` - -**Issues**: -- Hard to test -- Implicit dependencies -- Cannot have multiple DB connections - -**Fix**: Return `*gorm.DB` from `Connect()` and inject it into services. - -### 3. Missing Error Context -Errors are logged but lose context: -```go -fmt.Println("Error in sending Kafka message:", err) // ❌ No context -``` - -**Fix**: Use structured logging: -```go -slog.Error("failed to send kafka message", - "topic", topic, - "beacon_id", id, - "error", err) -``` - -### 4. Unsafe Map Access -**Location**: [cmd/bridge/main.go:28](cmd/bridge/main.go#L28) -```go -hostname := strings.Split(topic, "/")[1] // ❌ Panic if index doesn't exist -``` - -**Fix**: Validate before accessing: -```go -parts := strings.Split(topic, "/") -if len(parts) < 2 { - slog.Warn("invalid topic format", "topic", topic) - return -} -hostname := parts[1] -``` - ---- - -## 🟡 High Priority Refactoring (Fix Soon) - -### 5. Code Duplication Across Services -**Affected Files**: All 4 main files - -**Pattern**: Logging setup (lines 124-131 in all services) -```go -logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) -w := io.MultiWriter(os.Stderr, logFile) -logger := slog.New(slog.NewJSONHandler(w, nil)) -slog.SetDefault(logger) -``` - -**Refactor**: Create `internal/pkg/common/logger/logger.go`: -```go -func SetupLogger(filename string) *slog.Logger { - logFile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Failed to open log file: %v\n", err) - } - w := io.MultiWriter(os.Stderr, logFile) - logger := slog.New(slog.NewJSONHandler(w, nil)) - slog.SetDefault(logger) - return logger -} -``` - -### 6. Inconsistent Error Handling -Mixed error handling patterns across codebase: - -**In controller**: [internal/pkg/controller/trackers_controller.go:48](internal/pkg/controller/trackers_controller.go#L48) -```go -if err != nil { - fmt.Println("error in sending Kafka POST message") - http.Error(w, "Error in sending kafka message", 500) - return -} -``` - -**In main**: [cmd/decoder/main.go:97](cmd/decoder/main.go#L97) -```go -if err != nil { - eMsg := fmt.Sprintf("Error in decoding: %v", err) - fmt.Println(eMsg) - return -} -``` - -**Fix**: Use consistent error handling with structured responses. - -### 7. Missing Configuration Validation -**Location**: [internal/pkg/config/config.go](internal/pkg/config/config.go) - -Config doesn't validate required fields: -```go -func Load() *Config { - return &Config{ - // No validation - } -} -``` - -**Fix**: Add validation: -```go -func (c *Config) Validate() error { - if c.DBHost == "" { - return errors.New("DBHost is required") - } - if c.KafkaURL == "" { - return errors.New("KafkaURL is required") - } - // ... other validations - return nil -} -``` - -### 8. Potential Memory Inefficiency -**Location**: [cmd/location/main.go:217-222](cmd/location/main.go#L217) -```go -if len(beacon.BeaconMetrics) >= settings.BeaconMetricSize { - copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:]) - beacon.BeaconMetrics[settings.BeaconMetricSize-1] = metric -} else { - beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric) -} -``` - -**Issue**: This logic is correct but could be optimized with a circular buffer. - ---- - -## 🟢 Medium Priority Improvements (Plan Refactor) - -### 9. Tight Coupling in Controllers -Controllers directly use `*gorm.DB` instead of repository pattern: - -```go -func TrackerAdd(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http.HandlerFunc -``` - -**Refactor**: Introduce repository interface: -```go -type TrackerRepository interface { - Create(tracker *model.Tracker) error - Find(id string) (*model.Tracker, error) - // ... -} - -func TrackerAdd(repo TrackerRepository, writer *kafka.Writer, ctx context.Context) http.HandlerFunc -``` - -### 10. Poor Separation of Concerns -**Location**: [internal/pkg/common/appcontext/context.go](internal/pkg/common/appcontext/context.go) - -The AppState mixes state management with Kafka client creation (lines 60-76). - -**Refactor**: Separate concerns: -```go -// internal/pkg/infrastructure/kafka/pool.go -type KafkaPool struct { - writers []*kafka.Writer - readers []*kafka.Reader -} - -// internal/pkg/domain/appstate/state.go -type AppState struct { - beacons *model.BeaconsList - settings *model.Settings - kafkaPool *kafka.KafkaPool // Composed, not owned -} -``` - -### 11. Magic Numbers -**Location**: [cmd/location/main.go:107-108](cmd/location/main.go#L107) -```go -seenW := 1.5 // What does this mean? -rssiW := 0.75 // What does this mean? -``` - -**Fix**: Extract to named constants in settings: -```go -type SettingsVal struct { - LocationConfidence int64 `json:"location_confidence"` - SeenWeight float64 `json:"seen_weight"` // 1.5 - RSSIWeight float64 `json:"rssi_weight"` // 0.75 - // ... -} -``` - -### 12. Inefficient JSON Marshaling -**Location**: [cmd/server/main.go:206-207](cmd/server/main.go#L206) -```go -js, err := json.Marshal(list) -if err != nil { - js = []byte("error") // ❌ Invalid JSON! -} -``` - -**Fix**: Return proper error response: -```go -if err != nil { - http.Error(w, "Failed to marshal trackers", http.StatusInternalServerError) - return -} -``` - ---- - -## 🔵 Low Priority / Technical Debt - -### 13. Zero Test Coverage -The codebase has **no test files**. This is critical for production systems. - -**Recommendation**: Add unit tests for: -- All business logic in `service/` package -- Controller handlers -- Kafka message processing -- Beacon parsing logic - -**Target**: 70%+ code coverage - -### 14. Missing Context Timeouts -**Location**: [cmd/bridge/main.go:68](cmd/bridge/main.go#L68) -```go -err = writer.WriteMessages(context.Background(), msg) -``` - -**Fix**: Use timeouts: -```go -ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) -defer cancel() -err = writer.WriteMessages(ctx, msg) -``` - -### 15. Inefficient String Concatenation -**Location**: [cmd/bridge/main.go:182](cmd/bridge/main.go#L182) -```go -lMsg := fmt.Sprintf("Beacon added to lookup: %s", id) -slog.Info(lMsg) -``` - -**Fix**: Direct logging: -```go -slog.Info("beacon added to lookup", "id", id) -``` - -### 16. Dead Code -**Location**: [cmd/bridge/main.go:76-103](cmd/bridge/main.go#L76) - -Large block of commented code should be removed. - -### 17. Incomplete Graceful Shutdown -**Location**: [cmd/bridge/main.go:212](cmd/bridge/main.go#L212) - -The MQTT client disconnects with timeout but doesn't wait for pending messages: -```go -client.Disconnect(250) // Only waits 250ms -``` - -### 18. No Health Checks -Services don't expose health endpoints for orchestration systems (Kubernetes, etc.). - ---- - -## 📊 Architecture Recommendations - -### 1. Implement Dependency Injection -Instead of passing `db`, `writer`, `ctx` to controllers, create a service container: - -```go -type Services struct { - DB *gorm.DB - KafkaWriter *kafka.Writer - AppState *appcontext.AppState -} - -func (s *Services) TrackerAddController() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // Use s.DB, s.KafkaWriter - } -} -``` - -### 2. Add Observability -- **Structured logging** with request IDs -- **Metrics** (Prometheus) for: - - Kafka message throughput - - Beacon processing latency - - Database query performance -- **Distributed tracing** (OpenTelemetry) - -### 3. Implement Circuit Breakers -For external API calls in `apiclient` package to handle failures gracefully. - -### 4. Add Message Validation -Validate Kafka messages before processing: -```go -func (adv *BeaconAdvertisement) Validate() error { - if adv.ID == "" { - return errors.New("beacon ID is required") - } - if adv.RSSI < -100 || adv.RSSI > 0 { - return errors.New("invalid RSSI value") - } - return nil -} -``` - ---- - -## 🎯 Refactoring Priority Order - -| Priority | Category | Actions | -|----------|----------|---------| -| 1 | 🔴 Security | Remove hardcoded credentials from config | -| 2 | 🔴 Stability | Fix unsafe map/array access | -| 3 | 🔴 Testing | Add unit tests (aim for 70%+ coverage) | -| 4 | 🟡 Error Handling | Implement structured error handling | -| 5 | 🟡 Logging | Standardize to structured logging throughout | -| 6 | 🟡 Code Quality | Extract duplicated code to shared packages | -| 7 | 🟢 Architecture | Implement dependency injection gradually | -| 8 | 🔵 Performance | Optimize hot paths (beacon processing) | - ---- - -## 📈 Metrics Summary - -| Category | Count | Status | -|----------|-------|--------| -| 🔴 Critical Issues | 4 | Fix Immediately | -| 🟡 High Priority | 4 | Fix Soon | -| 🟢 Medium Priority | 4 | Plan Refactor | -| 🔵 Low Priority | 6 | Technical Debt | - -**Total Issues Identified**: 18 - ---- - -## System Architecture Overview - -The system consists of 4 microservices: - -1. **Bridge** ([cmd/bridge/main.go](cmd/bridge/main.go)) - MQTT to Kafka bridge -2. **Decoder** ([cmd/decoder/main.go](cmd/decoder/main.go)) - BLE beacon decoder -3. **Location** ([cmd/location/main.go](cmd/location/main.go)) - Location calculation service -4. **Server** ([cmd/server/main.go](cmd/server/main.go)) - HTTP API & WebSocket server - -### Communication Flow -``` -MQTT Gateway → Bridge (Kafka) → Decoder (Kafka) → Location (Kafka) → Server (Kafka) - ↓ ↑ - External API ←─────────────────────────────────────────────── -``` - -### Technology Stack -- **Language**: Go 1.24.0 -- **Message Broker**: Apache Kafka -- **Database**: PostgreSQL with GORM -- **Cache**: Redis (valkey) -- **MQTT**: Eclipse Paho -- **HTTP**: Gorilla Mux + WebSocket -- **Deployment**: Docker Compose - ---- - -## Conclusion - -The codebase demonstrates solid understanding of microservices architecture with good separation of concerns. The concurrent access patterns using `sync.RWMutex` are well-implemented. However, the system needs significant hardening before production deployment, particularly in areas of security, testing, and error handling. - -Focus on addressing critical security issues first, then build out test coverage to ensure reliability as you refactor other areas of the codebase. diff --git a/api/README.md b/api/README.md deleted file mode 100644 index b7184f8..0000000 --- a/api/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# `/api` - -OpenAPI/Swagger specs, JSON schema files, protocol definition files. - -Examples: - -* https://github.com/kubernetes/kubernetes/tree/master/api -* https://github.com/moby/moby/tree/master/api diff --git a/api/openapi.yml b/api/openapi.yml deleted file mode 100644 index 6e770a2..0000000 --- a/api/openapi.yml +++ /dev/null @@ -1,50 +0,0 @@ -openapi: 3.0.0 -info: - title: Beacon Parser API - version: 1.0.0 -paths: - /configs/beacons: - get: - summary: Retrieve beacon parsing configurations - responses: - '200': - description: A list of beacon protocol definitions - content: - application/json: - schema: - type: array - items: - $ref: '#/components/schemas/BeaconConfig' - -components: - schemas: - BeaconConfig: - type: object - properties: - name: - type: string - example: "Ingics" - min: - type: integer - max: - type: integer - pattern: - type: array - items: - type: string - example: ["0xFF", "0x59"] - configs: - type: object - additionalProperties: - $ref: '#/components/schemas/FieldMapping' - - FieldMapping: - type: object - properties: - offset: - type: integer - length: - type: integer - order: - type: string - enum: [littleendian, bigendian] \ No newline at end of file diff --git a/assets/README.md b/assets/README.md deleted file mode 100644 index 231c571..0000000 --- a/assets/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `/assets` - -Other assets to go along with your repository (images, logos, etc). diff --git a/cmd/location/main.go b/cmd/location/main.go index 47ab6ae..b98ecbd 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -98,6 +98,10 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { mSize := len(beacon.BeaconMetrics) + if mSize == 0 { + continue + } + if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold { slog.Warn("beacon is too old") continue diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index ab44e5c..0000000 --- a/examples/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# `/examples` - -Examples for your applications and/or public libraries. - -Examples: - -* https://github.com/nats-io/nats.go/tree/master/examples -* https://github.com/docker-slim/docker-slim/tree/master/examples -* https://github.com/hashicorp/packer/tree/master/examples diff --git a/githooks/README.md b/githooks/README.md deleted file mode 100644 index 741899a..0000000 --- a/githooks/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `/githooks` - -Git hooks. diff --git a/go.mod b/go.mod index e65b589..6e7b47f 100644 --- a/go.mod +++ b/go.mod @@ -5,37 +5,30 @@ go 1.24.0 toolchain go1.24.9 require ( - github.com/boltdb/bolt v1.3.1 + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/google/uuid v1.6.0 github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.1 - github.com/gorilla/websocket v1.5.3 - github.com/redis/go-redis/v9 v9.16.0 + github.com/mitchellh/mapstructure v1.5.0 github.com/segmentio/kafka-go v0.4.49 - github.com/yosssi/gmq v0.0.1 + gorm.io/driver/postgres v1.6.0 + gorm.io/gorm v1.31.1 ) require ( - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/eclipse/paho.golang v0.23.0 // indirect - github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgx/v5 v5.6.0 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/joho/godotenv v1.5.1 // indirect github.com/klauspost/compress v1.15.9 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/stretchr/testify v1.11.1 // indirect golang.org/x/crypto v0.42.0 // indirect golang.org/x/net v0.44.0 // indirect golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.29.0 // indirect - gorm.io/driver/postgres v1.6.0 // indirect - gorm.io/gorm v1.31.1 // indirect ) diff --git a/go.sum b/go.sum index 90a1be9..6d8f87e 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,6 @@ -github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= -github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= -github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/eclipse/paho.golang v0.23.0 h1:KHgl2wz6EJo7cMBmkuhpt7C576vP+kpPv7jjvSyR6Mk= -github.com/eclipse/paho.golang v0.23.0/go.mod h1:nQRhTkoZv8EAiNs5UU0/WdQIx2NrnWUpL9nsGJTQN04= github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= @@ -37,8 +25,6 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= -github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= -github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -47,38 +33,25 @@ github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= -github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/yosssi/gmq v0.0.1 h1:GhlDVaAQoi3Mvjul/qJXXGfL4JBeE0GQwbWp3eIsja8= -github.com/yosssi/gmq v0.0.1/go.mod h1:mReykazh0U1JabvuWh1PEbzzJftqOQWsjr0Lwg5jL1Y= golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= -golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/init/README.md b/init/README.md deleted file mode 100644 index 1544dec..0000000 --- a/init/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `/init` - -System init (systemd, upstart, sysv) and process manager/supervisor (runit, supervisord) configs. diff --git a/internal/pkg/bridge/mqtthandler/mqtthandler.go b/internal/pkg/bridge/mqtthandler/mqtthandler.go deleted file mode 100644 index fa45728..0000000 --- a/internal/pkg/bridge/mqtthandler/mqtthandler.go +++ /dev/null @@ -1,106 +0,0 @@ -package mqtthandler - -import ( - "context" - "encoding/json" - "fmt" - "log" - "os" - "strconv" - "strings" - "time" - - "github.com/AFASystems/presence/internal/pkg/model" - "github.com/segmentio/kafka-go" -) - -func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) { - hostname := strings.Split(string(topicName), "/")[1] - msgStr := string(message) - - if strings.HasPrefix(msgStr, "[") { - var readings []model.RawReading - err := json.Unmarshal(message, &readings) - if err != nil { - log.Printf("Error parsing JSON: %v", err) - return - } - - for _, reading := range readings { - if reading.Type == "Gateway" { - continue - } - adv := model.BeaconAdvertisement{ - Hostname: hostname, - MAC: reading.MAC, - RSSI: int64(reading.RSSI), - Data: reading.RawData, - HSButtonCounter: parseButtonState(reading.RawData), - } - - encodedMsg, err := json.Marshal(adv) - if err != nil { - fmt.Println("Error in marshaling: ", err) - break - } - - msg := kafka.Message{ - Value: encodedMsg, - } - err = writer.WriteMessages(context.Background(), msg) - if err != nil { - fmt.Println("Error in writing to Kafka: ", err) - time.Sleep(1 * time.Second) - break - } - } - } else { - s := strings.Split(string(message), ",") - if len(s) < 6 { - log.Printf("Messaggio CSV non valido: %s", msgStr) - return - } - - rawdata := s[4] - buttonCounter := parseButtonState(rawdata) - if buttonCounter > 0 { - adv := model.BeaconAdvertisement{} - i, _ := strconv.ParseInt(s[3], 10, 64) - adv.Hostname = hostname - adv.BeaconType = "hb_button" - adv.MAC = s[1] - adv.RSSI = i - adv.Data = rawdata - adv.HSButtonCounter = buttonCounter - - read_line := strings.TrimRight(string(s[5]), "\r\n") - it, err33 := strconv.Atoi(read_line) - if err33 != nil { - fmt.Println(it) - fmt.Println(err33) - os.Exit(2) - } - } - } -} - -func parseButtonState(raw string) int64 { - raw = strings.ToUpper(raw) - - if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 { - buttonField := raw[34:38] - if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil { - return buttonValue - } - } - - if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 { - counterField := raw[22:24] - buttonState, err := strconv.ParseInt(counterField, 16, 64) - if err == nil { - return buttonState - } - } - - return 0 -} diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index 053ee54..2174e54 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -104,23 +104,6 @@ func (m *AppState) GetBeacon(id string) (model.Beacon, bool) { return beacon, exists } -// GetHTTPResult returns a beacon from HTTP results by ID (thread-safe) -func (m *AppState) GetHTTPResult(id string) (model.HTTPResult, bool) { - m.httpResults.Lock.RLock() - defer m.httpResults.Lock.RUnlock() - - beacon, exists := m.httpResults.Results[id] - return beacon, exists -} - -// UpdateHTTPResult updates a beacon in the list (thread-safe) -func (m *AppState) UpdateHTTPResult(id string, beacon model.HTTPResult) { - m.httpResults.Lock.Lock() - defer m.httpResults.Lock.Unlock() - - m.httpResults.Results[id] = beacon -} - // UpdateBeacon updates a beacon in the list (thread-safe) func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) { m.beacons.Lock.Lock() @@ -158,18 +141,6 @@ func (m *AppState) GetAllBeacons() map[string]model.Beacon { return beacons } -// GetAllHttpResults returns a copy of all beacons -func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult { - m.httpResults.Lock.RLock() - defer m.httpResults.Lock.RUnlock() - - beacons := make(map[string]model.HTTPResult) - for id, beacon := range m.httpResults.Results { - beacons[id] = beacon - } - return beacons -} - // GetBeaconCount returns the number of tracked beacons func (m *AppState) GetBeaconCount() int { m.beacons.Lock.RLock() diff --git a/internal/pkg/common/utils/distance.go b/internal/pkg/common/utils/distance.go index ba691d9..a1a7eb4 100644 --- a/internal/pkg/common/utils/distance.go +++ b/internal/pkg/common/utils/distance.go @@ -25,14 +25,3 @@ func twosComp(inp string) int64 { i, _ := strconv.ParseInt("0x"+inp, 0, 64) return i - 256 } - -// ValidateRSSI validates if RSSI value is within reasonable bounds -func ValidateRSSI(rssi int64) bool { - return rssi >= -120 && rssi <= 0 -} - -// ValidateTXPower validates if TX power is within reasonable bounds -func ValidateTXPower(txPower string) bool { - power := twosComp(txPower) - return power >= -128 && power <= 127 -} diff --git a/internal/pkg/controller/trackers_controller.go b/internal/pkg/controller/trackers_controller.go index 6e39b0a..8f845c7 100644 --- a/internal/pkg/controller/trackers_controller.go +++ b/internal/pkg/controller/trackers_controller.go @@ -100,7 +100,7 @@ func TrackerDelete(db *gorm.DB, writer *kafka.Writer, ctx context.Context) http. } apiUpdate := model.ApiUpdate{ - Method: "Delete", + Method: "DELETE", ID: tracker.ID, } diff --git a/internal/pkg/model/type_methods.go b/internal/pkg/model/type_methods.go index df85427..e4589bb 100644 --- a/internal/pkg/model/type_methods.go +++ b/internal/pkg/model/type_methods.go @@ -23,25 +23,3 @@ func (b BeaconEvent) ToJSON() ([]byte, error) { } return eData, nil } - -func convertStructToMap(obj any) (map[string]any, error) { - data, err := json.Marshal(obj) - if err != nil { - return nil, err - } - - var result map[string]any - if err := json.Unmarshal(data, &result); err != nil { - return nil, err - } - - return result, nil -} - -func (loc HTTPLocation) RedisHashable() (map[string]any, error) { - return convertStructToMap(loc) -} - -func (be BeaconEvent) RedisHashable() (map[string]any, error) { - return convertStructToMap(be) -} diff --git a/internal/pkg/redis/redis.go b/internal/pkg/redis/redis.go deleted file mode 100644 index 9e87fa2..0000000 --- a/internal/pkg/redis/redis.go +++ /dev/null @@ -1,44 +0,0 @@ -package presenseredis - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/redis/go-redis/v9" -) - -// Get Map from Redis -// -// Deprecated: there is only one map now and we know the type -func LoadRedisMap[K comparable, V any, M map[K]V](client *redis.Client, ctx context.Context, key string) M { - redisValue, err := client.Get(ctx, key).Result() - resMap := make(M) - - if err == redis.Nil { - fmt.Printf("No list found for key %s, starting empty\n", key) - } else if err != nil { - fmt.Printf("Error in connecting to Redis: %v, key: %s returning empty map\n", err, key) - } else { - if err := json.Unmarshal([]byte(redisValue), &resMap); err != nil { - fmt.Printf("Error in unmarshalling JSON for key: %s\n", key) - } - } - - return resMap -} - -// Set Map in Redis -// -// Deprecated: hashmaps are used now -func SaveRedisMap(client *redis.Client, ctx context.Context, key string, data interface{}) { - eData, err := json.Marshal(data) - if err != nil { - fmt.Println("Error in marshalling, key: ", key) - } - - err = client.Set(ctx, key, eData, 0).Err() - if err != nil { - fmt.Println("Error in persisting in Redis, key: ", key) - } -} diff --git a/internal/pkg/service/beacon_service.go b/internal/pkg/service/beacon_service.go index a022e1f..d4011fd 100644 --- a/internal/pkg/service/beacon_service.go +++ b/internal/pkg/service/beacon_service.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" "gorm.io/gorm" @@ -72,22 +71,6 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka. } } -func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, ctx context.Context) error { - id := msg.ID - beacon, ok := appState.GetHTTPResult(id) - if !ok { - appState.UpdateHTTPResult(id, model.HTTPResult{ID: id, BeaconType: msg.Type, Battery: int64(msg.Battery), Event: msg.Event}) - } else { - beacon.ID = id - beacon.BeaconType = msg.Type - beacon.Battery = int64(msg.Battery) - beacon.Event = msg.Event - appState.UpdateHTTPResult(id, beacon) - } - - return nil -} - func formatMac(MAC string) string { var res strings.Builder for i := 0; i < len(MAC); i += 2 { diff --git a/logging.md b/logging.md deleted file mode 100644 index f5935d8..0000000 --- a/logging.md +++ /dev/null @@ -1,1467 +0,0 @@ -# Logging Standards and Guidelines - -## Overview -This document defines comprehensive logging standards for the AFASystems Presence services using Go's structured logging (`log/slog`). The goal is to provide consistent, searchable, and actionable logs across all services. - -## Logger Configuration - -### Enhanced Logger Package -**File:** [internal/pkg/logger/logger.go](internal/pkg/logger/logger.go) - -Update the logger to support both JSON and text formats with configurable log levels: - -```go -package logger - -import ( - "io" - "log" - "log/slog" - "os" - "path/filepath" -) - -type LogConfig struct { - Filename string - Level string - Format string // "json" or "text" - AddSource bool -} - -func CreateLogger(config LogConfig) *slog.Logger { - // Ensure log directory exists - dir := filepath.Dir(config.Filename) - if err := os.MkdirAll(dir, 0755); err != nil { - log.Fatalf("Failed to create log directory: %v\n", err) - } - - f, err := os.OpenFile(config.Filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Failed to open log file: %v\n", err) - } - - // Multi-writer: stderr + file - w := io.MultiWriter(os.Stderr, f) - - // Parse log level - var level slog.Level - switch config.Level { - case "debug": - level = slog.LevelDebug - case "info": - level = slog.LevelInfo - case "warn": - level = slog.LevelWarn - case "error": - level = slog.LevelError - default: - level = slog.LevelInfo - } - - // Create handler options - opts := &slog.HandlerOptions{ - Level: level, - AddSource: config.AddSource, - } - - // Choose handler based on format - var handler slog.Handler - if config.Format == "json" { - handler = slog.NewJSONHandler(w, opts) - } else { - handler = slog.NewTextHandler(w, opts) - } - - logger := slog.New(handler) - return logger -} - -// Convenience function for backward compatibility -func CreateLoggerSimple(fname string) *slog.Logger { - return CreateLogger(LogConfig{ - Filename: fname, - Level: "info", - Format: "json", - AddSource: false, - }) -} -``` - -### Usage in Services - -```go -// Replace: slog.SetDefault(logger.CreateLogger("bridge.log")) -// With: -slog.SetDefault(logger.CreateLogger(logger.LogConfig{ - Filename: "bridge.log", - Level: cfg.LogLevel, // Add to config - Format: cfg.LogFormat, // Add to config - AddSource: cfg.LogAddSource, // Add to config -})) -``` - ---- - -## Structured Logging Best Practices - -### Log Levels - -| Level | Usage | Examples | -|-------|-------|----------| -| **DEBUG** | Detailed debugging information | Message payloads, internal state changes | -| **INFO** | General informational messages | Service started, connection established, normal operations | -| **WARN** | Unexpected but recoverable situations | Retry attempts, fallback to defaults, degraded performance | -| **ERROR** | Error events that might still allow the application to continue | Failed operations, API errors, parsing failures | -| **FATAL** | Critical errors requiring immediate shutdown | Cannot connect to essential services, invalid configuration | - -### Structured Fields - -Always use structured attributes instead of formatting strings: - -```go -// ❌ BAD -slog.Info(fmt.Sprintf("Processing beacon %s with RSSI %d", id, rssi)) - -// ✅ GOOD -slog.Info("Processing beacon", - "id", id, - "rssi", rssi, - "hostname", hostname, -) -``` - -### Standard Field Names - -| Field Name | Type | Description | -|------------|------|-------------| -| `service` | string | Service name (bridge, decoder, location, server) | -| `component` | string | Component within service (mqtt, kafka, http) | -| `topic` | string | Kafka topic or MQTT topic | -| `beacon_id` | string | Beacon/tracker identifier | -| `gateway_id` | string | Gateway identifier | -| `mac` | string | MAC address | -| `rssi` | int64 | Received Signal Strength Indicator | -| `duration` | string/int | Duration of operation | -| `error` | error | Error object (for error-level logs) | -| `count` | int | Count of items processed | -| `status` | string | Status of operation (success, failed, retry) | - ---- - -## Service-Specific Logging Guidelines - -### 1. Bridge Service -**File:** [cmd/bridge/main.go](cmd/bridge/main.go) - -#### Initialization - -```go -// Line 100-116: Replace basic info logs -slog.Info("Initializing bridge service", - "kafka_url", cfg.KafkaURL, - "mqtt_host", cfg.MQTTHost, - "reader_topics", []string{"apibeacons", "alert", "mqtt"}, - "writer_topics", []string{"rawbeacons"}, -) - -slog.Info("Kafka manager initialized", - "readers_count", len(readerTopics), - "writers_count", len(writerTopics), -) - -// Line 127-144: MQTT connection -slog.Info("Connecting to MQTT broker", - "broker", fmt.Sprintf("%s:%d", cfg.MQTTHost, 1883), - "client_id", "go_mqtt_client", -) - -if token := client.Connect(); token.Wait() && token.Error() != nil { - slog.Error("Failed to connect to MQTT broker", - "error", token.Error(), - "broker", cfg.MQTTHost, - ) - panic(token.Error()) -} - -slog.Info("Successfully connected to MQTT broker", - "broker", cfg.MQTTHost, -) - -// Line 146-202: MQTT subscription -slog.Info("Subscribed to MQTT topic", - "topic", topic, -) -``` - -#### MQTT Message Handler - -```go -// Line 26-83: Replace with structured logging -func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) { - hostname := strings.Split(topic, "/")[1] - - slog.Debug("Received MQTT message", - "topic", topic, - "hostname", hostname, - "message_length", len(message), - ) - - msgStr := string(message) - - if strings.HasPrefix(msgStr, "[") { - var readings []model.RawReading - if err := json.Unmarshal(message, &readings); err != nil { - slog.Error("Failed to parse JSON message", - "error", err, - "topic", topic, - "message", msgStr, - ) - return - } - - slog.Debug("Parsed JSON readings", - "count", len(readings), - "topic", topic, - ) - - processed := 0 - skipped := 0 - for _, reading := range readings { - if reading.Type == "Gateway" { - skipped++ - continue - } - - val, ok := appState.BeaconExists(reading.MAC) - if !ok { - slog.Debug("Skipping unregistered beacon", - "mac", reading.MAC, - "hostname", hostname, - ) - skipped++ - continue - } - - adv := model.BeaconAdvertisement{ - ID: val, - Hostname: hostname, - MAC: reading.MAC, - RSSI: int64(reading.RSSI), - Data: reading.RawData, - } - - encodedMsg, err := json.Marshal(adv) - if err != nil { - slog.Error("Failed to marshal beacon advertisement", - "error", err, - "beacon_id", val, - ) - continue - } - - msg := kafka.Message{Value: encodedMsg} - - if err := writer.WriteMessages(context.Background(), msg); err != nil { - slog.Error("Failed to write to Kafka", - "error", err, - "beacon_id", val, - "topic", "rawbeacons", - ) - time.Sleep(1 * time.Second) - break - } - processed++ - } - - slog.Info("Processed MQTT readings", - "processed", processed, - "skipped", skipped, - "hostname", hostname, - ) - } else { - slog.Warn("Received non-JSON message format", - "message", msgStr, - "topic", topic, - ) - } -} -``` - -#### Event Loop - -```go -// Line 148-184: Event loop logging -case msg := <-chApi: - slog.Debug("Received API update from Kafka", - "method", msg.Method, - "id", msg.ID, - "mac", msg.MAC, - ) - - switch msg.Method { - case "POST": - id := msg.ID - appState.AddBeaconToLookup(msg.MAC, id) - slog.Info("Beacon added to lookup", - "beacon_id", id, - "mac", msg.MAC, - ) - case "DELETE": - id := msg.MAC - if id == "all" { - appState.CleanLookup() - slog.Info("Cleared all beacons from lookup") - continue - } - appState.RemoveBeaconFromLookup(id) - slog.Info("Beacon removed from lookup", - "beacon_id", id, - ) - } - -case msg := <-chAlert: - slog.Debug("Received alert from Kafka", - "alert_id", msg.ID, - "type", msg.Type, - ) - // ... processing ... - -case msg := <-chMqtt: - slog.Debug("Received tracker list request", - "trackers_count", len(msg), - ) - // ... processing ... -``` - -#### Shutdown - -```go -// Line 186-195: Graceful shutdown -slog.Info("Shutting down bridge service", - "reason", "context_done", -) - -wg.Wait() - -slog.Info("All goroutines stopped, closing Kafka connections") -kafkaManager.CleanKafkaReaders() -kafkaManager.CleanKafkaWriters() - -client.Disconnect(250) -slog.Info("Disconnected from MQTT broker") - -slog.Info("Bridge service shutdown complete") -``` - ---- - -### 2. Decoder Service -**File:** [cmd/decoder/main.go](cmd/decoder/main.go) - -#### Initialization - -```go -// Line 42-48: Service initialization -slog.Info("Initializing decoder service", - "kafka_url", cfg.KafkaURL, - "reader_topics", []string{"rawbeacons", "parser"}, - "writer_topics", []string{"alertbeacons"}, -) - -slog.Info("Decoder service initialized", - "parsers_loaded", len(parserRegistry.ParserList), -) -``` - -#### Beacon Processing - -```go -// Line 86-93: Process incoming beacon -func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) { - slog.Debug("Processing beacon advertisement", - "beacon_id", adv.ID, - "hostname", adv.Hostname, - "rssi", adv.RSSI, - "data_length", len(adv.Data), - ) - - if err := decodeBeacon(adv, appState, writer, parserRegistry); err != nil { - slog.Error("Failed to decode beacon", - "error", err, - "beacon_id", adv.ID, - "hostname", adv.Hostname, - ) - } -} - -// Line 95-136: Decode beacon -func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, parserRegistry *model.ParserRegistry) error { - beacon := strings.TrimSpace(adv.Data) - id := adv.ID - - if beacon == "" { - slog.Debug("Skipping beacon with empty data", - "beacon_id", id, - ) - return nil - } - - b, err := hex.DecodeString(beacon) - if err != nil { - slog.Warn("Failed to decode hex string", - "error", err, - "beacon_id", id, - "data", beacon, - ) - return err - } - - slog.Debug("Decoded beacon data", - "beacon_id", id, - "bytes_length", len(b), - ) - - // ... parsing logic ... - - if event.ID == "" { - slog.Debug("Skipping event with empty ID", - "beacon_id", id, - ) - return nil - } - - prevEvent, ok := appState.GetBeaconEvent(id) - appState.UpdateBeaconEvent(id, event) - - if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) { - slog.Debug("Skipping duplicate event", - "beacon_id", id, - "event_type", event.Type, - ) - return nil - } - - slog.Info("Decoded beacon event", - "beacon_id", id, - "event_type", event.Type, - "event", event.Event, - "battery", event.Battery, - "temperature", event.Temperature, - ) - - eMsg, err := event.ToJSON() - if err != nil { - slog.Error("Failed to marshal event to JSON", - "error", err, - "beacon_id", id, - ) - return err - } - - if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil { - slog.Error("Failed to write event to Kafka", - "error", err, - "beacon_id", id, - "topic", "alertbeacons", - ) - return err - } - - slog.Debug("Successfully sent event to Kafka", - "beacon_id", id, - "topic", "alertbeacons", - ) - - return nil -} -``` - -#### Parser Management - -```go -// Line 64-74: Parser configuration -case msg := <-chParser: - slog.Info("Received parser configuration update", - "action", msg.ID, - "parser_name", msg.Name, - ) - - switch msg.ID { - case "add": - config := msg.Config - parserRegistry.Register(config.Name, config) - slog.Info("Registered new parser", - "parser_name", config.Name, - "type", config.Type, - ) - case "delete": - parserRegistry.Unregister(msg.Name) - slog.Info("Unregistered parser", - "parser_name", msg.Name, - ) - case "update": - config := msg.Config - parserRegistry.Register(config.Name, config) - slog.Info("Updated parser configuration", - "parser_name", config.Name, - "type", config.Type, - ) - } -``` - ---- - -### 3. Location Service -**File:** [cmd/location/main.go](cmd/location/main.go) - -#### Initialization - -```go -// Line 37-47: Service initialization -slog.Info("Initializing location service", - "kafka_url", cfg.KafkaURL, - "reader_topics", []string{"rawbeacons", "settings"}, - "writer_topics", []string{"locevents"}, - "algorithm", "filter", -) -``` - -#### Location Algorithm - -```go -// Line 60-68: Algorithm execution -case <-locTicker.C: - settings := appState.GetSettings() - slog.Debug("Running location algorithm", - "algorithm", settings.CurrentAlgorithm, - "beacon_count", len(appState.GetAllBeacons()), - ) - - switch settings.CurrentAlgorithm { - case "filter": - getLikelyLocations(appState, kafkaManager.GetWriter("locevents")) - case "ai": - slog.Warn("AI algorithm not yet implemented") - } -``` - -#### Location Processing - -```go -// Line 85-157: Location calculation -func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { - beacons := appState.GetAllBeacons() - settings := appState.GetSettingsValue() - - slog.Debug("Calculating likely locations", - "beacon_count", len(beacons), - "algorithm", "filter", - ) - - processed := 0 - skipped := 0 - updated := 0 - - for _, beacon := range beacons { - mSize := len(beacon.BeaconMetrics) - if mSize == 0 { - skipped++ - continue - } - - // Check if beacon is too old - if (int64(time.Now().Unix()) - beacon.BeaconMetrics[mSize-1].Timestamp) > settings.LastSeenThreshold { - slog.Debug("Beacon data too old", - "beacon_id", beacon.ID, - "last_seen", beacon.BeaconMetrics[mSize-1].Timestamp, - "threshold", settings.LastSeenThreshold, - ) - skipped++ - continue - } - - // ... location calculation logic ... - - locationChanged := bestLocName != beacon.PreviousLocation - - if locationChanged { - slog.Info("Beacon location changed", - "beacon_id", beacon.ID, - "previous_location", beacon.PreviousLocation, - "new_location", bestLocName, - "confidence", beacon.LocationConfidence, - "distance", r.Distance, - ) - updated++ - } - - appState.UpdateBeacon(beacon.ID, beacon) - processed++ - - // ... send to Kafka ... - } - - slog.Info("Location calculation complete", - "processed", processed, - "updated", updated, - "skipped", skipped, - ) -} -``` - -#### Beacon Assignment - -```go -// Line 159-199: Assign beacon to list -func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) { - id := adv.ID - now := time.Now().Unix() - settings := appState.GetSettingsValue() - - slog.Debug("Assigning beacon to tracking list", - "beacon_id", id, - "hostname", adv.Hostname, - "rssi", adv.RSSI, - ) - - if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) { - slog.Debug("Beacon RSSI below threshold, skipping", - "beacon_id", id, - "rssi", adv.RSSI, - "threshold", settings.RSSIMinThreshold, - ) - return - } - - beacon, ok := appState.GetBeacon(id) - if !ok { - slog.Debug("Creating new beacon entry", - "beacon_id", id, - ) - beacon = model.Beacon{ID: id} - } - - // ... update beacon metrics ... - - slog.Debug("Updated beacon metrics", - "beacon_id", id, - "metrics_count", len(beacon.BeaconMetrics), - "distance", metric.Distance, - "location", metric.Location, - ) - - appState.UpdateBeacon(id, beacon) -} -``` - -#### Settings Update - -```go -case msg := <-chSettings: - slog.Info("Received settings update", - "settings", msg, - ) - appState.UpdateSettings(msg) - slog.Info("Settings updated successfully", - "current_algorithm", appState.GetSettings().CurrentAlgorithm, - ) -``` - ---- - -### 4. Server Service -**File:** [cmd/server/main.go](cmd/server/main.go) - -#### Initialization - -```go -// Line 46-49: Database connection -db, err := database.Connect(cfg) -if err != nil { - slog.Error("Failed to connect to database", - "error", err, - "connection_string", cfg.DBConnectionString, - ) - log.Fatalf("Failed to open database connection: %v\n", err) -} - -slog.Info("Successfully connected to database") - -// Line 60-89: Configuration loading -configFile, err := os.Open("/app/cmd/server/config.json") -if err != nil { - slog.Error("Failed to open configuration file", - "error", err, - "file_path", "/app/cmd/server/config.json", - ) - panic(err) -} - -slog.Info("Loading parser configurations", - "file_path", "/app/cmd/server/config.json", -) - -var configs []model.Config -if err := json.Unmarshal(b, &configs); err != nil { - slog.Error("Failed to parse configuration file", - "error", err, - ) - panic(err) -} - -slog.Info("Loaded parser configurations", - "count", len(configs), -) - -// Persist configs -for _, config := range configs { - if err := db.Create(&config).Error; err != nil { - slog.Warn("Failed to persist config to database", - "error", err, - "config_name", config.Name, - ) - } -} - -// Send to Kafka -for _, config := range configs { - kp := model.KafkaParser{ - ID: "add", - Config: config, - } - - if err := service.SendParserConfig(kp, kafkaManager.GetWriter("parser"), ctx); err != nil { - slog.Error("Failed to send parser config to Kafka", - "error", err, - "parser_name", config.Name, - ) - } else { - slog.Info("Sent parser config to Kafka", - "parser_name", config.Name, - "type", config.Type, - ) - } -} - -// Line 87-89: API client initialization -if err := apiclient.UpdateDB(db, ctx, cfg, kafkaManager.GetWriter("apibeacons"), appState); err != nil { - slog.Error("Failed to initialize API client", - "error", err, - ) - fmt.Printf("Error in getting token: %v\n", err) -} - -slog.Info("API client initialized successfully") - -// Line 136-147: HTTP server -slog.Info("Starting HTTP server", - "address", cfg.HTTPAddr, - "cors_origins", "*", -) - -go server.ListenAndServe() - -slog.Info("HTTP server started", - "address", cfg.HTTPAddr, -) -``` - -#### Event Loop - -```go -// Line 153-181: Event loop -case msg := <-chLoc: - slog.Debug("Received location event", - "beacon_id", msg.ID, - "location", msg.Location, - "distance", msg.Distance, - ) - service.LocationToBeaconService(msg, db, kafkaManager.GetWriter("alert"), ctx) - -case msg := <-chEvents: - slog.Debug("Received beacon event", - "beacon_id", msg.ID, - "event_type", msg.Type, - "event", msg.Event, - ) - - id := msg.ID - if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil { - slog.Warn("Received event for untracked beacon", - "beacon_id", id, - "error", err, - ) - continue - } - - if err := db.Updates(&model.Tracker{ - ID: id, - Battery: msg.Battery, - Temperature: msg.Temperature, - }).Error; err != nil { - slog.Error("Failed to update tracker metrics", - "error", err, - "beacon_id", id, - ) - } else { - slog.Info("Updated tracker metrics", - "beacon_id", id, - "battery", msg.Battery, - "temperature", msg.Temperature, - ) - } - -case <-beaconTicker.C: - slog.Debug("Refreshing tracker list for MQTT") - var list []model.Tracker - db.Find(&list) - - eMsg, err := json.Marshal(list) - if err != nil { - slog.Error("Failed to marshal tracker list", - "error", err, - "count", len(list), - ) - continue - } - - msg := kafka.Message{Value: eMsg} - if err := kafkaManager.GetWriter("mqtt").WriteMessages(ctx, msg); err != nil { - slog.Error("Failed to send tracker list to MQTT", - "error", err, - "count", len(list), - ) - } else { - slog.Debug("Sent tracker list to MQTT", - "count", len(list), - ) - } -``` - -#### Shutdown - -```go -// Line 184-199: Graceful shutdown -if err := server.Shutdown(context.Background()); err != nil { - slog.Error("Failed to shutdown HTTP server", - "error", err, - ) -} - -slog.Info("HTTP server stopped") - -wg.Wait() - -slog.Info("All goroutines stopped, closing Kafka connections") -kafkaManager.CleanKafkaReaders() -kafkaManager.CleanKafkaWriters() - -slog.Info("All services shutdown complete") -``` - ---- - -## Supporting Packages - -### Kafka Client Package -**File:** [internal/pkg/kafkaclient/consumer.go](internal/pkg/kafkaclient/consumer.go) - -```go -// Line 12-35: Consumer function -func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - - slog.Info("Starting Kafka consumer", - "topic", r.Config().Topic, - "group_id", r.Config().GroupID, - ) - - messageCount := 0 - errorCount := 0 - - for { - select { - case <-ctx.Done(): - slog.Info("Kafka consumer shutting down", - "topic", r.Config().Topic, - "messages_processed", messageCount, - "errors", errorCount, - ) - return - default: - msg, err := r.ReadMessage(ctx) - if err != nil { - errorCount++ - slog.Error("Error reading Kafka message", - "error", err, - "topic", r.Config().Topic, - "error_count", errorCount, - ) - continue - } - - var data T - if err := json.Unmarshal(msg.Value, &data); err != nil { - errorCount++ - slog.Error("Error decoding Kafka message", - "error", err, - "topic", r.Config().Topic, - "message_length", len(msg.Value), - ) - continue - } - - messageCount++ - if messageCount%100 == 0 { - slog.Debug("Kafka consumer progress", - "topic", r.Config().Topic, - "messages_processed", messageCount, - ) - } - - ch <- data - } - } -} -``` - -**File:** [internal/pkg/kafkaclient/manager.go](internal/pkg/kafkaclient/manager.go) - -```go -// Line 38-52: Add Kafka writer -func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) { - slog.Debug("Creating Kafka writer", - "topic", topic, - "brokers", kafkaUrl, - ) - - kafkaWriter := &kafka.Writer{ - Addr: kafka.TCP(kafkaUrl), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - Async: false, - RequiredAcks: kafka.RequireAll, - BatchSize: 100, - BatchTimeout: 10 * time.Millisecond, - } - - m.kafkaWritersMap.KafkaWritersLock.Lock() - m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter - m.kafkaWritersMap.KafkaWritersLock.Unlock() - - slog.Info("Kafka writer created", - "topic", topic, - ) -} - -// Line 66-79: Add Kafka reader -func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) { - slog.Debug("Creating Kafka reader", - "topic", topic, - "group_id", groupID, - "brokers", kafkaUrl, - ) - - brokers := strings.Split(kafkaUrl, ",") - kafkaReader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokers, - GroupID: groupID, - Topic: topic, - MinBytes: 1, - MaxBytes: 10e6, - }) - - m.kafkaReadersMap.KafkaReadersLock.Lock() - m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader - m.kafkaReadersMap.KafkaReadersLock.Unlock() - - slog.Info("Kafka reader created", - "topic", topic, - "group_id", groupID, - ) -} - -// Line 54-64: Clean writers -func (m *KafkaManager) CleanKafkaWriters() { - slog.Info("Shutting down Kafka writers", - "count", len(m.kafkaWritersMap.KafkaWriters), - ) - - m.kafkaWritersMap.KafkaWritersLock.Lock() - for topic, r := range m.kafkaWritersMap.KafkaWriters { - if err := r.Close(); err != nil { - slog.Error("Error closing Kafka writer", - "error", err, - "topic", topic, - ) - } else { - slog.Info("Kafka writer closed", - "topic", topic, - ) - } - } - m.kafkaWritersMap.KafkaWritersLock.Unlock() - - slog.Info("Kafka writers shutdown complete") -} - -// Line 81-90: Clean readers -func (m *KafkaManager) CleanKafkaReaders() { - slog.Info("Shutting down Kafka readers", - "count", len(m.kafkaReadersMap.KafkaReaders), - ) - - m.kafkaReadersMap.KafkaReadersLock.Lock() - for topic, r := range m.kafkaReadersMap.KafkaReaders { - if err := r.Close(); err != nil { - slog.Error("Error closing Kafka reader", - "error", err, - "topic", topic, - ) - } else { - slog.Info("Kafka reader closed", - "topic", topic, - ) - } - } - m.kafkaReadersMap.KafkaReadersLock.Unlock() - - slog.Info("Kafka readers shutdown complete") -} -``` - -### API Client Package -**File:** [internal/pkg/apiclient/updatedb.go](internal/pkg/apiclient/updatedb.go) - -```go -// Line 19-74: Update database -func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error { - slog.Info("Initializing database from API", - "api_url", cfg.APIURL, - ) - - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - - token, err := GetToken(ctx, cfg, client) - if err != nil { - slog.Error("Failed to get authentication token", - "error", err, - ) - return err - } - - slog.Info("Successfully authenticated with API") - - // Sync trackers - if trackers, err := GetTrackers(token, client); err == nil { - slog.Info("Fetched trackers from API", - "count", len(trackers), - ) - syncTable(db, trackers) - - if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil { - slog.Error("Failed to send clear lookup message", - "error", err, - ) - } - - for _, v := range trackers { - apiUpdate := model.ApiUpdate{ - Method: "POST", - ID: v.ID, - MAC: v.MAC, - } - - if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil { - slog.Error("Failed to send tracker registration", - "error", err, - "tracker_id", v.ID, - ) - } else { - slog.Debug("Sent tracker registration", - "tracker_id", v.ID, - "mac", v.MAC, - ) - } - } - } else { - slog.Error("Failed to fetch trackers from API", - "error", err, - ) - } - - // Sync gateways - if gateways, err := GetGateways(token, client); err == nil { - slog.Info("Fetched gateways from API", - "count", len(gateways), - ) - syncTable(db, gateways) - } else { - slog.Error("Failed to fetch gateways from API", - "error", err, - ) - } - - // Sync zones - if zones, err := GetZones(token, client); err == nil { - slog.Info("Fetched zones from API", - "count", len(zones), - ) - syncTable(db, zones) - } else { - slog.Error("Failed to fetch zones from API", - "error", err, - ) - } - - // Sync tracker zones - if trackerZones, err := GetTrackerZones(token, client); err == nil { - slog.Info("Fetched tracker zones from API", - "count", len(trackerZones), - ) - syncTable(db, trackerZones) - } else { - slog.Error("Failed to fetch tracker zones from API", - "error", err, - ) - } - - // Initialize settings - var settings model.Settings - db.First(&settings) - if settings.ID == 0 { - slog.Info("Initializing default settings") - db.Create(appState.GetSettings()) - } - - slog.Info("Database sync complete") - return nil -} - -// Line 76-91: Sync table -func syncTable[T any](db *gorm.DB, data []T) { - if len(data) == 0 { - slog.Debug("No data to sync", - "type", fmt.Sprintf("%T", data), - ) - return - } - - slog.Debug("Syncing table", - "count", len(data), - "type", fmt.Sprintf("%T", data), - ) - - var ids []string - for _, item := range data { - v := reflect.ValueOf(item).FieldByName("ID").String() - ids = append(ids, v) - } - - db.Transaction(func(tx *gorm.DB) error { - result := tx.Where("id NOT IN ?", ids).Delete(new(T)) - if result.Error != nil { - slog.Error("Failed to delete stale records", - "error", result.Error, - "deleted_count", result.RowsAffected, - ) - } - - result = tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data) - if result.Error != nil { - slog.Error("Failed to upsert records", - "error", result.Error, - ) - return result.Error - } - - slog.Info("Table sync complete", - "upserted", len(data), - "deleted", result.RowsAffected, - ) - return nil - }) -} -``` - -**File:** [internal/pkg/apiclient/auth.go](internal/pkg/apiclient/auth.go) - -```go -// Line 17-45: Get token -func GetToken(ctx context.Context, cfg *config.Config, client *http.Client) (string, error) { - slog.Debug("Requesting authentication token", - "url", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", - ) - - formData := url.Values{} - formData.Set("grant_type", "password") - formData.Set("client_id", "Fastapi") - formData.Set("client_secret", "wojuoB7Z5xhlPFrF2lIxJSSdVHCApEgC") - formData.Set("username", "core") - formData.Set("password", "C0r3_us3r_Cr3d3nt14ls") - formData.Set("audience", "Fastapi") - - req, err := http.NewRequest("POST", "https://10.251.0.30:10002/realms/API.Server.local/protocol/openid-connect/token", strings.NewReader(formData.Encode())) - if err != nil { - slog.Error("Failed to create auth request", - "error", err, - ) - return "", err - } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - - req = req.WithContext(ctx) - res, err := client.Do(req) - if err != nil { - slog.Error("Failed to send auth request", - "error", err, - ) - return "", err - } - - var j response - if err := json.NewDecoder(res.Body).Decode(&j); err != nil { - slog.Error("Failed to decode auth response", - "error", err, - "status_code", res.StatusCode, - ) - return "", err - } - - slog.Info("Successfully obtained authentication token", - "token_length", len(j.Token), - ) - - return j.Token, nil -} -``` - -### Service Package -**File:** [internal/pkg/service/beacon_service.go](internal/pkg/service/beacon_service.go) - -```go -// Line 17-67: Location to beacon service -func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) { - if msg.ID == "" { - slog.Warn("Received location message with empty ID") - return - } - - slog.Debug("Processing location message", - "beacon_id", msg.ID, - "location", msg.Location, - "distance", msg.Distance, - ) - - var zones []model.TrackerZones - if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil { - slog.Error("Failed to fetch tracker zones", - "error", err, - "beacon_id", msg.ID, - ) - return - } - - slog.Debug("Fetched tracker zones", - "beacon_id", msg.ID, - "zones_count", len(zones), - ) - - var allowedZones []string - for _, z := range zones { - allowedZones = append(allowedZones, z.ZoneList...) - } - - var gw model.Gateway - mac := formatMac(msg.Location) - if err := db.Select("id").Where("mac = ?", mac).First(&gw).Error; err != nil { - slog.Warn("Gateway not found for location", - "mac", mac, - "beacon_id", msg.ID, - "error", err, - ) - return - } - - slog.Debug("Found gateway for location", - "gateway_id", gw.ID, - "mac", mac, - ) - - if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) { - alert := model.Alert{ - ID: msg.ID, - Type: "Restricted zone", - Value: gw.ID, - } - - slog.Warn("Beacon in restricted zone", - "beacon_id", msg.ID, - "gateway_id", gw.ID, - "allowed_zones", len(allowedZones), - ) - - eMsg, err := json.Marshal(alert) - if err != nil { - slog.Error("Failed to marshal alert", - "error", err, - "beacon_id", msg.ID, - ) - } else { - msg := kafka.Message{Value: eMsg} - if err := writer.WriteMessages(ctx, msg); err != nil { - slog.Error("Failed to send alert to Kafka", - "error", err, - "beacon_id", msg.ID, - "alert_type", "Restricted zone", - ) - } else { - slog.Info("Sent restricted zone alert", - "beacon_id", msg.ID, - "gateway_id", gw.ID, - ) - } - } - } - - // Save track - if err := db.Create(&model.Tracks{ - UUID: msg.ID, - Timestamp: time.Now(), - Gateway: gw.ID, - GatewayMac: gw.MAC, - Tracker: msg.ID, - }).Error; err != nil { - slog.Error("Failed to save track", - "error", err, - "beacon_id", msg.ID, - "gateway_id", gw.ID, - ) - return - } - - slog.Debug("Saved track record", - "beacon_id", msg.ID, - "gateway_id", gw.ID, - ) - - // Update tracker - if err := db.Updates(&model.Tracker{ - ID: msg.ID, - Location: gw.ID, - Distance: msg.Distance, - }).Error; err != nil { - slog.Error("Failed to update tracker location", - "error", err, - "beacon_id", msg.ID, - ) - } else { - slog.Info("Updated tracker location", - "beacon_id", msg.ID, - "gateway_id", gw.ID, - "distance", msg.Distance, - ) - } -} -``` - ---- - -## Configuration Updates - -Add these fields to your configuration structure: - -```go -// internal/pkg/config/config.go -type Config struct { - // ... existing fields ... - - // Logging configuration - LogLevel string `json:"log_level" env:"LOG_LEVEL" default:"info"` - LogFormat string `json:"log_format" env:"LOG_FORMAT" default:"json"` - LogAddSource bool `json:"log_add_source" env:"LOG_ADD_SOURCE" default:"false"` -} -``` - ---- - -## Log Aggregation and Monitoring - -### Recommended Log Queries - -**All errors across services:** -```bash -jq 'select(.level == "error")' *.log -``` - -**Beacon processing by ID:** -```bash -jq 'select(.beacon_id == "beacon-123")' bridge.log decoder.log location.log -``` - -**Kafka message flow:** -```bash -jq 'select(.topic != null)' bridge.log decoder.log location.log server.log -``` - -**Performance analysis:** -```bash -jq 'select(.duration != null)' *.log -``` - -### Metrics to Track - -1. **Message throughput** - Messages processed per second per service -2. **Error rates** - Errors per 1000 messages by type -3. **Processing latency** - Time from ingestion to processing -4. **Kafka lag** - Consumer lag per topic/group -5. **Connection failures** - MQTT/Kafka reconnection frequency - ---- - -## Migration Checklist - -- [ ] Update `internal/pkg/logger/logger.go` with new configuration options -- [ ] Add logging configuration fields to `internal/pkg/config/config.go` -- [ ] Replace `fmt.Println`, `fmt.Printf`, `log.Printf` with `slog` calls in: - - [ ] `cmd/bridge/main.go` - - [ ] `cmd/decoder/main.go` - - [ ] `cmd/location/main.go` - - [ ] `cmd/server/main.go` - - [ ] `internal/pkg/kafkaclient/consumer.go` - - [ ] `internal/pkg/kafkaclient/manager.go` - - [ ] `internal/pkg/apiclient/updatedb.go` - - [ ] `internal/pkg/apiclient/auth.go` - - [ ] `internal/pkg/service/beacon_service.go` -- [ ] Test all services with new logging -- [ ] Verify log rotation and disk space management -- [ ] Set up log aggregation/monitoring dashboard - ---- - -## Example Log Output - -### JSON Format (Production) -```json -{"time":"2026-01-20T10:15:30.123456Z","level":"INFO","msg":"Processing beacon","service":"bridge","beacon_id":"beacon-001","rssi":-65,"hostname":"gateway-1"} -{"time":"2026-01-20T10:15:30.234567Z","level":"ERROR","msg":"Failed to write to Kafka","service":"bridge","error":"connection timeout","beacon_id":"beacon-001","topic":"rawbeacons"} -``` - -### Text Format (Development) -``` -time=2026-01-20T10:15:30.123Z level=INFO msg="Processing beacon" service=bridge beacon_id=beacon-001 rssi=-65 hostname=gateway-1 -time=2026-01-20T10:15:30.234Z level=ERROR msg="Failed to write to Kafka" service=bridge error="connection timeout" beacon_id=beacon-001 topic=rawbeacons -``` - ---- - -## Notes - -1. **Security**: Be careful not to log sensitive information (passwords, tokens, personal data) -2. **Performance**: Use `slog.Debug` for high-frequency logs in production -3. **Disk space**: Implement log rotation to prevent disk exhaustion -4. **Testing**: Add unit tests to verify logging behavior where critical -5. **Context propagation**: Consider adding request/context IDs for tracing diff --git a/refactor.md b/refactor.md deleted file mode 100644 index a337830..0000000 --- a/refactor.md +++ /dev/null @@ -1,676 +0,0 @@ -# Refactoring Plan for AFASystems Presence Detection System - -**Date:** 2026-01-16 -**Total Codebase:** ~3,391 lines of Go code across 4 services - -## Executive Summary - -After analyzing the codebase across the 4 main services (`bridge`, `decoder`, `location`, `server`), I've identified significant code duplication, inconsistent patterns, and maintenance challenges. This document outlines a structured refactoring approach to improve maintainability, reduce duplication, and establish clear architectural patterns. - ---- - -## Critical Issues Identified - -### 1. **Massive Code Duplication** (Priority: HIGH) - -#### Problem: Identical Boilerplate in All Services -All 4 services (`bridge/main.go:118-131`, `decoder/main.go:36-44`, `location/main.go:31-39`, `server/main.go:45-53`) contain **identical** code for: -- Log file creation -- Multi-writer setup (stderr + file) -- Logger initialization with JSON handler -- Context setup with signal handling - -**Impact:** Any change to logging or signal handling requires updating 4 files. - -**Duplication Factor:** ~60 lines × 4 services = 240 lines of duplicated code - -#### Problem: Kafka Consumer Pattern Duplication -Each service manually creates channels, adds to waitgroups, and starts consumers in the same pattern: -```go -chRaw := make(chan model.BeaconAdvertisement, 2000) -wg.Add(1) -go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) -``` - -This pattern appears in `bridge/main.go:147-154`, `decoder/main.go:57-62`, `location/main.go:55-60`, `server/main.go:110-115`. - ---- - -### 2. **Dead Code** (Priority: MEDIUM) - -#### Problem: Commented-out Code in bridge/main.go:76-103 -83 lines of commented CSV parsing code remain in the codebase. This: -- Reduces readability -- Creates confusion about what functionality is active -- Should be removed or moved to version control history - -#### Problem: Unused Variables -In `bridge/main.go:38`: -```go -var wg sync.WaitGroup -``` -This package-level variable is used but would be better as a struct field in a service object. - ---- - -### 3. **Inconsistent Error Handling** (Priority: HIGH) - -#### Problem: Mixed Error Handling Patterns -Across services, there are at least 3 different error handling patterns: - -1. **Silent continuation** (`bridge/main.go:35-37`): - ```go - if err != nil { - log.Printf("Error parsing JSON: %v", err) - return // or continue - } - ``` - -2. **Panic on error** (`bridge/main.go:169-171`): - ```go - if token := client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - ``` - -3. **Fatal termination** (`server/main.go:60-62`): - ```go - if err != nil { - log.Fatalf("Failed to open database connection: %v\n", err) - } - ``` - -**Impact:** Inconsistent behavior makes debugging difficult and error handling unpredictable. - ---- - -### 4. **Monolithic main() Functions** (Priority: HIGH) - -#### Problem: Single Large Function Does Everything -All main functions are doing too much: -- **bridge/main.go:118-224** (106 lines): Setup, MQTT connection, event loop, Kafka handling, shutdown -- **server/main.go:41-219** (178 lines): DB setup, Kafka setup, HTTP server, WebSocket, event loop, shutdown -- **decoder/main.go:27-91** (64 lines): Kafka setup, parser registry, event loop, processing -- **location/main.go:26-90** (64 lines): Kafka setup, ticker management, event loop, location algorithm - -**Impact:** Hard to test, hard to reason about, high cyclomatic complexity. - ---- - -### 5. **Lack of Abstraction for Common Patterns** (Priority: MEDIUM) - -#### Problem: No Service Lifecycle Management -Each service manually: -1. Creates logger -2. Sets up signal context -3. Creates Kafka readers/writers -4. Starts consumers -5. Runs event loop -6. Handles shutdown -7. Closes Kafka connections - -This is a perfect candidate for an abstraction. - ---- - -### 6. **Hardcoded Configuration** (Priority: MEDIUM) - -#### Problem: Hardcoded Paths and Values -- `server/main.go:75`: Hardcoded config file path `"/app/cmd/server/config.json"` -- `bridge/main.go:227`: Hardcoded MQTT topic `"publish_out/#"` -- `server/main.go:238`: Hardcoded ping ticker calculation `(60 * 9) / 10 * time.Second` -- `server/main.go:147`: Hardcoded beacon ticker `2 * time.Second` - -**Impact:** Difficult to configure without code changes. - ---- - -### 7. **Missing TODO Resolution** (Priority: LOW) - -#### Outstanding TODO -`internal/pkg/model/parser.go:74`: -```go -// TODO: change this to be dynamic, maybe event is interface with no predefined properties -``` - -This should be addressed to make the parser more flexible. - ---- - -### 8. **Inefficient Memory Usage** (Priority: LOW) - -#### Problem: Unbounded Map Growth Potential -In `location/main.go:113-119`: -```go -locList := make(map[string]float64) -for _, metric := range beacon.BeaconMetrics { - res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0))) - locList[metric.Location] += res -} -``` - -If `BeaconMetrics` grows unbounded, this could become a performance issue. However, current implementation limits this via `BeaconMetricSize` setting. - ---- - -## Refactoring Recommendations - -### Phase 1: Create Common Infrastructure (Immediate) - -#### 1.1 Create Service Lifecycle Framework - -**File:** `internal/pkg/server/service.go` -```go -package server - -import ( - "context" - "io" - "log" - "log/slog" - "os" - "os/signal" - "sync" - "syscall" -) - -type Service struct { - name string - cfg Config - logger *slog.Logger - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - kafkaMgr *KafkaManager -} - -func NewService(name string, cfg Config) (*Service, error) { - // Initialize logger - // Setup signal handling - // Create Kafka manager -} - -func (s *Service) Logger() *slog.Logger { - return s.logger -} - -func (s *Service) Context() context.Context { - return s.ctx -} - -func (s *Service) WaitGroup() *sync.WaitGroup { - return &s.wg -} - -func (s *Service) Start() { - // Start event loop -} - -func (s *Service) Shutdown() { - // Handle graceful shutdown -} -``` - -**Benefits:** -- Single place for lifecycle management -- Consistent startup/shutdown across all services -- Easier testing with mock dependencies - -#### 1.2 Extract Logger Initialization - -**File:** `internal/pkg/server/logger.go` -```go -package server - -import ( - "io" - "log" - "log/slog" - "os" -) - -func InitLogger(logPath string) (*slog.Logger, io.Closer, error) { - logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - return nil, nil, err - } - - w := io.MultiWriter(os.Stderr, logFile) - logger := slog.New(slog.NewJSONHandler(w, nil)) - slog.SetDefault(logger) - - return logger, logFile, nil -} -``` - -**Benefits:** -- Reusable across all services -- Consistent logging format -- Easier to change logging strategy - -#### 1.3 Create Kafka Manager - -**File:** `internal/pkg/server/kafka.go` -```go -package server - -import ( - "context" - "sync" - - "github.com/AFASystems/presence/internal/pkg/kafkaclient" - "github.com/AFASystems/presence/internal/pkg/model" - "github.com/segmentio/kafka-go" -) - -type KafkaManager struct { - readers []*kafka.Reader - writers []*kafka.Writer - lock sync.RWMutex -} - -func (km *KafkaManager) CreateReader(url, topic, groupID string) *kafka.Reader -func (km *KafkaManager) CreateWriter(url, topic string) *kafka.Writer -func (km *KafkaManager) StartConsumer[T any](reader *kafka.Reader, ch chan<- T, ctx context.Context) -func (km *KafkaManager) Close() -``` - -**Benefits:** -- Centralized Kafka lifecycle management -- Type-safe consumer creation -- Automatic cleanup on shutdown - ---- - -### Phase 2: Refactor Individual Services (Short-term) - -#### 2.1 Bridge Service Refactoring - -**Current Issues:** -- Large monolithic main (106 lines) -- MQTT handler mixed with Kafka logic -- Commented dead code - -**Refactored Structure:** -``` -cmd/bridge/ -├── main.go (50 lines - just setup) -├── service.go (BridgeService struct) -├── mqtthandler/ -│ ├── handler.go (MQTT message handling) -│ └── parser.go (Parse MQTT messages) -└── kafkaevents/ - └── handlers.go (Kafka event handlers) -``` - -**Actions:** -1. Remove dead code (lines 76-103) -2. Extract MQTT handling to separate package -3. Create BridgeService struct with lifecycle methods -4. Use common Service framework from Phase 1 - -#### 2.2 Decoder Service Refactoring - -**Current Issues:** -- Processing logic mixed with event loop -- Parser registry embedded in main - -**Refactored Structure:** -``` -cmd/decoder/ -├── main.go (30 lines - just setup) -├── service.go (DecoderService struct) -├── processor/ -│ ├── beacon.go (Beacon decoding logic) -│ └── registry.go (Parser registry management) -└── kafkaevents/ - └── handlers.go (Kafka event handlers) -``` - -**Actions:** -1. Extract `decodeBeacon` logic to processor package -2. Create Processor interface for different beacon types -3. Separate parser registry into its own file - -#### 2.3 Location Service Refactoring - -**Current Issues:** -- Location algorithm embedded in event loop -- No abstraction for different algorithms - -**Refactored Structure:** -``` -cmd/location/ -├── main.go (30 lines - just setup) -├── service.go (LocationService struct) -├── algorithms/ -│ ├── interface.go (LocationAlgorithm interface) -│ ├── filter.go (Current filter algorithm) -│ └── ai.go (Future AI algorithm) -└── beacon/ - └── tracker.go (Beacon tracking logic) -``` - -**Actions:** -1. Define LocationAlgorithm interface -2. Move filter algorithm to separate file -3. Add factory pattern for algorithm selection -4. Extract beacon tracking logic - -#### 2.4 Server Service Refactoring - -**Current Issues:** -- Largest main function (178 lines) -- Mixed concerns: HTTP, WebSocket, Kafka, Database -- Deeply nested handler setup - -**Refactored Structure:** -``` -cmd/server/ -├── main.go (40 lines - just setup) -├── service.go (ServerService struct) -├── http/ -│ ├── server.go (HTTP server setup) -│ ├── routes.go (Route registration) -│ └── middleware.go (CORS, logging, etc.) -├── websocket/ -│ ├── handler.go (WebSocket upgrade) -│ ├── writer.go (WebSocket write logic) -│ └── reader.go (WebSocket read logic) -└── kafkaevents/ - └── handlers.go (Kafka event handlers) -``` - -**Actions:** -1. Extract HTTP server to separate package -2. Move WebSocket logic to dedicated package -3. Create route registration table -4. Separate Kafka event handlers - ---- - -### Phase 3: Standardize Error Handling (Medium-term) - -#### 3.1 Define Error Handling Policy - -**File:** `internal/pkg/errors/errors.go` -```go -package errors - -import ( - "fmt" - "log/slog" -) - -// Wrap wraps an error with context -func Wrap(err error, message string) error { - return fmt.Errorf("%s: %w", message, err) -} - -// LogAndReturn logs an error and returns it -func LogAndReturn(err error, message string) error { - slog.Error(message, "error", err) - return fmt.Errorf("%s: %w", message, err) -} - -// Must panics if err is not nil (for initialization only) -func Must(err error, message string) { - if err != nil { - panic(fmt.Sprintf("%s: %v", message, err)) - } -} -``` - -**Policy:** -- Use `LogAndReturn` for recoverable errors in event loops -- Use `Must` for initialization failures that prevent startup -- Use `Wrap` to add context to errors before returning -- Never use silent log-and-continue without explicit comments - ---- - -### Phase 4: Configuration Management (Medium-term) - -#### 4.1 Centralize Configuration - -**File:** `internal/pkg/config/bridge.go` (one per service) -```go -package config - -type BridgeConfig struct { - // Kafka settings - KafkaURL string - - // MQTT settings - MQTTUrl string - MQTTPort int - MQTTTopics []string - MQTTClientID string - - // Logging - LogPath string - - // Channels - ChannelBuffer int -} - -func LoadBridge() (*BridgeConfig, error) { - cfg := Load() // Load base config - - return &BridgeConfig{ - KafkaURL: cfg.KafkaURL, - MQTTUrl: cfg.MQTTHost, - MQTTPort: 1883, - MQTTTopics: []string{"publish_out/#"}, - MQTTClientID: "go_mqtt_client", - LogPath: "server.log", - ChannelBuffer: 200, - }, nil -} -``` - -**Benefits:** -- No more hardcoded values -- Easy to add environment variable overrides -- Clear configuration schema per service -- Easier testing with different configs - ---- - -### Phase 5: Testing Infrastructure (Long-term) - -#### 5.1 Add Interface Definitions - -Create interfaces for all external dependencies: -- `MQTTClient` interface -- `KafkaReader` interface -- `KafkaWriter` interface -- `Database` interface - -**Benefits:** -- Easy to mock for testing -- Clear contracts between components -- Better documentation - -#### 5.2 Add Unit Tests - -Target coverage: 70%+ - -**Priority:** -1. Business logic (location algorithms, beacon parsing) -2. Service lifecycle (startup, shutdown) -3. Error handling paths -4. Kafka message processing - ---- - -## Specific Code Improvements - -### Remove Dead Code - -**File:** `cmd/bridge/main.go:76-103` -- **Action:** Delete the 83 lines of commented CSV code -- **Reason:** Dead code, maintained in git history if needed - -### Fix Package-Level Variables - -**File:** `cmd/bridge/main.go:25` -- **Current:** `var wg sync.WaitGroup` -- **Action:** Move to BridgeService struct field -- **Reason:** Avoid global state, enable multiple service instances - -### Resolve TODO - -**File:** `internal/pkg/model/parser.go:74` -- **Current:** Hardcoded beacon event structure -- **Action:** Make BeaconEvent use flexible map or interface -- **Reason:** Support different beacon types without struct changes - -### Improve Channel Buffering - -**Current:** Random channel buffer sizes (200, 500, 2000) -- **Action:** Define constant or configuration value -- **File:** `internal/pkg/config/constants.go` -```go -const ( - DefaultChannelBuffer = 200 - LargeChannelBuffer = 2000 -) -``` - -### Add Context Timeouts - -**Current:** Some operations have no timeout -**Examples:** -- `bridge/main.go:69`: Kafka write has no timeout -- `bridge/main.go:158`: MQTT connection has no explicit timeout - -**Action:** Add timeouts to all I/O operations -```go -ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) -defer cancel() -err = writer.WriteMessages(ctx, msg) -``` - ---- - -## Implementation Priority - -### Week 1: Foundation -1. ✅ Create service lifecycle framework -2. ✅ Extract logger initialization -3. ✅ Remove dead code from bridge - -### Week 2-3: Service Refactoring -1. ✅ Refactor bridge service -2. ✅ Refactor decoder service -3. ✅ Refactor location service -4. ✅ Refactor server service - -### Week 4: Error Handling & Config -1. ✅ Standardize error handling -2. ✅ Centralize configuration -3. ✅ Add configuration validation - -### Week 5+: Testing & Documentation -1. ✅ Add unit tests for core logic -2. ✅ Add integration tests -3. ✅ Update documentation -4. ✅ Create architecture diagrams - ---- - -## Success Metrics - -### Code Quality -- **Before:** 240 lines of duplicated code -- **After:** < 50 lines of shared infrastructure -- **Reduction:** 80% reduction in duplication - -### Maintainability -- **Before:** Changes require updating 4 files -- **After:** Changes to shared code update once -- **Impact:** Faster development, fewer bugs - -### Testing -- **Before:** No unit tests (based on provided files) -- **After:** 70%+ code coverage -- **Impact:** Catches regressions early - -### File Sizes -- **Before:** main.go files 106-178 lines -- **After:** main.go files < 50 lines -- **Impact:** Easier to understand, better separation of concerns - ---- - -## Migration Strategy - -### Incremental Refactoring -1. **DO NOT** rewrite everything at once -2. Extract common code without changing behavior -3. Add tests before refactoring -4. Run existing tests after each change -5. Use feature flags for major changes - -### Backward Compatibility -- Keep Kafka topic names unchanged -- Keep API endpoints unchanged -- Keep database schema unchanged -- Allow old and new code to coexist during migration - -### Testing During Migration -1. Run existing services in parallel -2. Compare outputs -3. Load test with production-like traffic -4. Monitor for differences -5. Gradual rollout - ---- - -## Additional Recommendations - -### Documentation -1. Add godoc comments to all exported functions -2. Create architecture diagrams showing data flow -3. Document Kafka message formats -4. Add runbook for common operations - -### Monitoring -1. Add Prometheus metrics -2. Add structured logging with correlation IDs -3. Add health check endpoints -4. Add performance tracing - -### Development Workflow -1. Add pre-commit hooks -2. Add linting (golangci-lint) -3. Add formatting checks (gofmt, goimports) -4. Add dependency scanning - ---- - -## Conclusion - -The current codebase suffers from significant duplication and lacks clear architectural boundaries. By implementing this refactoring plan incrementally, you can: - -1. **Reduce duplication by 80%** through shared infrastructure -2. **Improve maintainability** through consistent patterns -3. **Enable testing** through proper abstractions -4. **Reduce bugs** through standardized error handling -5. **Accelerate development** through clearer structure - -The key is to refactor **incrementally** while maintaining backward compatibility and adding tests at each step. - ---- - -## Next Steps - -1. **Review this document** with your team -2. **Prioritize phases** based on your pain points -3. **Create tracking issues** for each phase -4. **Start with Phase 1** (common infrastructure) -5. **Measure success** using the metrics above - -**Recommended First Step:** Begin with Phase 1.1 (Service Lifecycle Framework) as it provides the foundation for all other refactoring work. - diff --git a/third_party/README.md b/third_party/README.md deleted file mode 100644 index cc530ca..0000000 --- a/third_party/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `/third_party` - -External helper tools, forked code and other 3rd party utilities (e.g., Swagger UI). diff --git a/tools/README.md b/tools/README.md deleted file mode 100644 index f22a58e..0000000 --- a/tools/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# `/tools` - -Supporting tools for this project. Note that these tools can import code from the `/pkg` and `/internal` directories. - -Examples: - -* https://github.com/istio/istio/tree/master/tools -* https://github.com/openshift/origin/tree/master/tools -* https://github.com/dapr/dapr/tree/master/tools diff --git a/web/README.md b/web/README.md deleted file mode 100644 index 020b468..0000000 --- a/web/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `/web` - -Web application specific components: static web assets, server side templates and SPAs. diff --git a/web/app/.keep b/web/app/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/web/static/.keep b/web/static/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/web/template/.keep b/web/template/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/website/README.md b/website/README.md deleted file mode 100644 index 3f36699..0000000 --- a/website/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# `/website` - -This is the place to put your project's website data if you are not using GitHub pages. - -Examples: - -* https://github.com/hashicorp/vault/tree/master/website -* https://github.com/perkeep/perkeep/tree/master/website