diff --git a/CODE_GRADE_AND_REFACTOR.md b/CODE_GRADE_AND_REFACTOR.md deleted file mode 100644 index 8f58b4a..0000000 --- a/CODE_GRADE_AND_REFACTOR.md +++ /dev/null @@ -1,138 +0,0 @@ -# Code Grade & Production Readiness Report (Updated) - -## Overall grade: **7.0 / 10** - -The codebase has been refactored into a clear app/service layout with thin `cmd` entrypoints, shared `internal/pkg` libraries, health/readiness endpoints, structured middleware, and addressed reliability/security items. It is suitable for development and staging; production use still requires CORS restriction, optional metrics/tracing, and (if desired) request validation and OpenAPI. - ---- - -## 1. What’s working well - -| Area | Notes | -|------|--------| -| **Structure** | `cmd//main.go` is thin (~25 lines); `internal/app/*` holds per-service composition; `internal/pkg` has api (response, middleware, handler), location, bridge, decoder, config, kafkaclient, logger, model, controller, service, database, apiclient, appcontext. | -| **Concurrency** | Channels, `sync.WaitGroup`, and `AppState` with RWMutex; event loops live in app layer, not in main. | -| **Shutdown** | `signal.NotifyContext` + app `Run`/`Shutdown`; Kafka and MQTT cleanup in app. | -| **Kafka** | `KafkaManager`, generic `Consume[T]`, graceful close. | -| **Observability** | `/health` and `/ready` (DB ping); middleware: logging, recovery, request ID, CORS; logging to file with fallback to stderr if file open fails. | -| **Reliability** | No panics in library code for logger (fallback to stderr); MQTT connect returns error; server init returns error; `WriteMessages` errors checked in parser service and settings controller. | -| **Security** | TLS skip verify is configurable via `TLS_INSECURE_SKIP_VERIFY` (default false). | -| **Testing** | Unit tests for appcontext, utils, model, controller, service, config; integration tests for bridge/decoder. | -| **Dependencies** | Modern stack (slog, segmentio/kafka-go, gorilla/mux, gorm). 
| - ---- - -## 2. Fixes applied since last report - -### 2.1 Startup and library behavior - -- **Bridge:** MQTT connect failure no longer panics; `internal/pkg/bridge/mqtt.go` returns error from `NewMQTTClient`, `cmd/bridge/main.go` exits with `log.Fatalf` on error. -- **Server:** DB and config init live in `internal/app/server`; `New`/`Init` return errors; `cmd/server/main.go` uses `log.Fatalf` on error (no panic in library). -- **Logger:** `CreateLogger` no longer uses `log.Fatalf`; on log file open failure it returns a logger that writes only to stderr and a no-op cleanup. - -### 2.2 Ignored errors - -- **parser_service.go:** `writer.WriteMessages(ctx, msg)` return value is checked and propagated. -- **settings_controller.go:** `writer.WriteMessages` error is checked; on failure returns 500 and logs; response sets `Content-Type: application/json`. -- **database:** Unused global `var DB *gorm.DB` removed. - -### 2.3 Security and configuration - -- **TLS:** `config.Config` has `TLSInsecureSkipVerify bool` (env `TLS_INSECURE_SKIP_VERIFY`, default false). Used in `apiclient.UpdateDB` and in location inference (`NewDefaultInferencer(cfg.TLSInsecureSkipVerify)`). -- **CORS:** Not changed (origin policy left to operator; middleware supports configurable origins). - -### 2.4 Observability - -- **Health/readiness:** Server exposes `/health` (liveness) and `/ready` (DB ping) via `internal/pkg/api/handler/health.go`. -- **Middleware:** Recovery (panic → 500), logging (method, path, status, duration), request ID (`X-Request-ID`), CORS. - -### 2.5 Code quality - -- **Bridge:** MQTT topic parsing uses `strings.SplitN(topic, "/", 2)` to avoid panic; CSV branch validates and logs (no writer usage yet). -- **Location:** Magic numbers moved to named constants in `internal/pkg/location/filter.go` (e.g. `SeenWeight`, `RSSIWeight`, `DefaultDistance`). -- **Duplication:** Bootstrap removed; each service uses `internal/app/` for init, run, and shutdown. - ---- - -## 3. 
Remaining / known limitations - -### 3.1 Config and env - -- **`getEnvPanic`** in `config` still panics on missing required env. To avoid panics in library, consider a `LoadServerSafe` (or similar) that returns `(*Config, error)` and use it only from `main` with explicit exit. Not changed in this pass. - -### 3.2 Security - -- **CORS:** Defaults remain permissive (e.g. `*`). Restrict to known frontend origins when deploying (e.g. via env or config). -- **Secrets:** Still loaded from env only; ensure no secrets in logs; consider a secret manager for production. - -### 3.3 API and validation - -- No OpenAPI/Swagger; no formal request/response contracts. -- Many handlers still use `http.Error` or `w.Write` without a single response helper; `api/response` exists for new/consistent endpoints. -- No request body validation (e.g. go-playground/validator); no idempotency keys. - -### 3.4 Resilience and operations - -- Kafka consumer: on `ReadMessage`/unmarshal error, logs and continues; no dead-letter or backoff yet. -- DB: no documented pool tuning; readiness only checks DB ping. -- No metrics (Prometheus/OpenTelemetry). No distributed tracing. - ---- - -## 4. Grade breakdown (updated) - -| Criterion | Score | Comment | -|---------------------|-------|--------| -| Architecture | 8/10 | Clear app layer, thin main, pkg separation; handlers still take concrete DB/writer (can be abstracted later). | -| Reliability | 7/10 | No panics in logger/bridge init; WriteMessages errors handled; health/ready; logger fallback. | -| Security | 6/10 | TLS skip verify configurable (default off); CORS still broad; secrets in env. | -| Observability | 7/10 | Health/ready, request logging, request ID, recovery; no metrics/tracing. | -| API design | 6/10 | Response helpers and middleware in place; many handlers still ad-hoc; no spec/validation. | -| Testing | 6/10 | Good unit coverage; more integration/E2E would help. 
| -| Code quality | 8/10 | Clear structure, constants for magic numbers, dead code removed, duplication reduced. | -| Production readiness | 6/10 | Health/ready and error handling in place; CORS, metrics, and validation still to do. | - -**Average ≈ 6.75; grade 7.0/10** – Refactor and applied fixes significantly improve structure, reliability, and observability; remaining work is mostly CORS, validation, and metrics/tracing. - ---- - -## 5. Checklist (updated) - -### 5.1 Reliability - -- [x] Remove panics / `log.Fatalf` from library where possible (logger fallback; bridge returns error). -- [x] Check and handle `WriteMessages` in parser service and settings controller. -- [x] Add `/health` and `/ready` on server. -- [ ] Document or add Kafka consumer retry/backoff and dead-letter if needed. -- [x] Make TLS skip verify configurable; default false. - -### 5.2 Observability - -- [x] Structured logging and request ID middleware. -- [ ] Add metrics (e.g. Prometheus) and optional tracing. - -### 5.3 API and validation - -- [ ] OpenAPI spec and validation. -- [ ] Consistent use of `api/response` and JSON error body across handlers. -- [ ] Restrict CORS to specific origins (operator-defined). - -### 5.4 Operations - -- [ ] Document env vars and deployment topology. -- [ ] Configurable timeouts; rate limiting if required. - -### 5.5 Code and structure - -- [x] Bridge topic parsing and CSV branch behavior clarified. -- [x] Unused `database.DB` global removed. -- [x] Location magic numbers moved to constants. -- [x] App layer and api/middleware/response in place. - ---- - -## 6. Summary - -- **Grade: 7.0/10** – Refactor and targeted fixes improve structure, reliability, and observability. Server has health/ready, middleware, and no panics in logger/bridge init; TLS skip verify is configurable; WriteMessages and logger errors are handled. 
-- **Still to do for production:** Restrict CORS, add metrics (and optionally tracing), validate requests and adopt consistent API responses, and document operations. Config loading can be made panic-free by adding safe loaders that return errors. -- **Not changed by design:** CORS policy left for operator to configure (e.g. via env or config). diff --git a/PRODUCTION_READINESS_REPORT.md b/PRODUCTION_READINESS_REPORT.md new file mode 100644 index 0000000..e05d1d5 --- /dev/null +++ b/PRODUCTION_READINESS_REPORT.md @@ -0,0 +1,210 @@ +# Production Readiness Report — Microservices (server, bridge, decoder, location) + +**Scope:** `cmd/server`, `cmd/bridge`, `cmd/decoder`, `cmd/location` and all packages they import. +**Date:** 2025-03-05. + +--- + +## Overall grade: **5.5 / 10** + +The codebase has a **solid structure** and **consistent patterns** across the four services, but **security**, **reliability**, and **operational hardening** are not yet at production level. With the changes suggested below, it can be brought to a 7–8/10 for a production deployment. + +--- + +## 1. Summary by dimension + +| Dimension | Grade | Notes | +| --------------------------- | ----- | ------------------------------------------------------------------------------------------------- | +| **Structure & readability** | 7/10 | Clear app lifecycle (New/Run/Shutdown), good package layout, some naming/duplication issues. | +| **Reliability** | 5/10 | Graceful shutdown and Kafka cleanup are good; missing retries, commit semantics, and DB/timeouts. | +| **Security** | 3/10 | No API auth, TLS/DB and client TLS weakened, CORS permissive, no rate limiting. | +| **Observability** | 6/10 | slog, request ID, logging middleware, health/ready; no metrics/tracing. | +| **Correctness** | 6/10 | Path vs body ID bugs in update endpoints, `context.Background()` in hot paths. | + +--- + +## 2. 
What’s in good shape + +- **Unified app pattern:** All four services use the same lifecycle: `New(cfg)` → optional `Init(ctx)` (server only) → `Run(ctx)` → `Shutdown()`, with `signal.NotifyContext` for graceful shutdown. +- **Graceful shutdown:** HTTP server shutdown, Kafka readers/writers and MQTT disconnect are explicitly closed; `sync.WaitGroup` used for consumer goroutines. +- **Structured logging:** `slog` with JSON handler and file + stderr; request logging with method, path, status, duration, bytes. +- **HTTP middleware:** Recovery (panic → 500), request ID, CORS, logging applied in a clear chain. +- **Health endpoints:** `/health` (liveness) and `/ready` (DB ping) for the server. +- **Kafka usage:** Centralized `KafkaManager` with RWMutex, separate readers/writers, group IDs per service. +- **Shared state:** `AppState` in `common/appcontext` is thread-safe (RWMutex) and used consistently. +- **Config:** Env-based config with service-specific loaders (`LoadServer`, `LoadBridge`, etc.) and `getEnvPanic` for required vars. +- **API responses:** Centralized `response.JSON`, `Error`, `BadRequest`, `InternalError`, `NotFound` with consistent JSON shape. +- **OpenAPI:** Routes reference `api/openapi.yaml` (OpenAPI 3.0), which helps readability and contract clarity. + +--- + +## 3. Critical issues + +### 3.1 Security + +- **No authentication or authorization on the HTTP API** + All server routes (`/reslevis/*`, `/configs/beacons`, etc.) are unauthenticated. Anyone who can reach the server can read/update/delete gateways, zones, trackers, parser configs, settings, alerts, and tracks. + +- **Database connection uses `sslmode=disable`** + In `internal/pkg/database/database.go`, DSN is built with `sslmode=disable`. In production, DB connections should use TLS and `sslmode=verify-full` (or equivalent) with CA verification. 
+ +- **TLS verification disabled for outbound HTTP** + - `internal/pkg/apiclient/updatedb.go`: `TLSClientConfig: &tls.Config{InsecureSkipVerify: true}`. + - `internal/pkg/location/inference.go`: same, and **`NewDefaultInferencer(skipTLSVerify bool)` ignores the parameter** and always uses `InsecureSkipVerify: true`. + +- **CORS defaults to `*`** + In `internal/pkg/api/middleware/cors.go`, when `origins` is nil/empty, `origins = []string{"*"}`. Production should restrict origins to known front-end origins. + +- **Logger file mode `0666`** + In `internal/pkg/logger/logger.go`, `os.OpenFile(..., 0666)` makes the log file world-readable and -writable. Prefer `0600` or `0640`. + +- **No rate limiting or request body size limits** + No protection against abuse or large-body DoS; consider middleware for max body size and rate limiting (per IP or per key). + +**Recommendations:** + +- Add authentication/authorization middleware (e.g. JWT or API key validation) for all non-health API routes; keep `/health` and optionally `/ready` public. +- Make DB TLS configurable via env (e.g. `DB_SSLMODE`, `DB_SSLROOTCERT`) and use `sslmode=verify-full` in production. +- Use `cfg.TLSInsecureSkipVerify` (or equivalent) for all outbound HTTP clients; fix `NewDefaultInferencer` to respect the parameter. +- Configure CORS with explicit allowed origins (and optionally credentials) from config. +- Set log file mode to `0600` (or `0640` if a group needs read). +- Add middleware to limit request body size (e.g. `http.MaxBytesReader`) and consider rate limiting for API routes. + +--- + +### 3.2 Reliability + +- **Kafka consumer: decode errors and commit semantics** + In `internal/pkg/kafkaclient/consumer.go`, when `json.Unmarshal` fails, the code logs and `continue`s without committing. Depending on reader config, this can cause repeated redelivery of bad messages or ambiguous semantics. Production should either skip and commit, or send to a dead-letter path and commit. 
Answer: because the readers use consumer groups, messages are auto-committed, so a message whose unmarshal fails is still committed once it has been read + +- **No retries on Kafka produce** + Event loops (server, bridge, decoder, location) call `WriteMessages` once; transient Kafka errors are not retried. Consider retry with backoff (and optional circuit breaker) for critical topics. + + Answer: the Writer already holds default timeout, backoff, and retry settings, but I still added some explicit configuration on top + +- **Database: no explicit pool or timeouts** + `database.Connect` uses GORM defaults. For production, set `MaxOpenConns`, `MaxIdleConns`, and connection/timeout settings (e.g. `SetConnMaxLifetime`) on the underlying `*sql.DB`. + +- **UpdateDB and Init: failures only logged** + In `internal/app/server/app.go`, `apiclient.UpdateDB` errors are only logged; Init continues. Consider failing Init (or marking “degraded”) if sync is required for correct operation, or add retries/backoff. + +- **Use of `context.Background()` in async paths** + e.g. `internal/pkg/bridge/handler.go` and `internal/pkg/decoder/process.go` use `context.Background()` for Kafka writes. Prefer passing the request/event context (or a derived timeout) so shutdown and timeouts propagate. + +**Recommendations:** + +- Define a clear policy for Kafka consumer errors (skip+commit vs DLQ); avoid silent continue without commit unless intended. +- Add retry (with backoff) for Kafka produce in critical paths; consider a small wrapper or helper. +- Configure DB pool and timeouts in `database.Connect` (and optionally make them configurable via config). +- Decide whether UpdateDB is mandatory for startup; if yes, fail Init on error or retry; if no, document and consider a “degraded” readiness state. +- Pass context from the caller (or a timeout context) into Kafka write calls instead of `context.Background()`. 
+ +--- + +### 3.3 Correctness and consistency + +- **Update endpoints: path `id` vs body `id`** + - **GatewayUpdateController** (`internal/pkg/controller/gateways_controller.go`): Uses `mux.Vars(r)["id"]` only to check existence with `First(..., "id = ?", id)`, then decodes body into `gateway` and calls `Save(&gateway)`. The updated record is identified by `gateway.ID` from the body, not the path. A client can send a different ID in the body and update another resource. + - **ZoneUpdateController**: Route is `updateZone` (no `{id}` in path); uses `zone.ID` from body only. If the API contract expects path-based ID, this is inconsistent. + Recommendation: For update-by-id, use the path parameter as the single source of truth: load by path `id`, decode body into a DTO or partial struct, then update only allowed fields for that id (e.g. selective updates or merge then update by path id). + +- **TrackerUpdateController** + Uses body `tracker.ID` for lookup and save; route has no `{id}` in path. If other update endpoints use path `{id}`, align behavior and documentation. + +**Recommendations:** + +- Standardize update semantics: either path `{id}` only (body has no id or it must match path) or document “body id is canonical” and ensure no IDOR. +- Prefer path-based resource identification for updates/deletes and bind body to allowed fields only. + +--- + +## 4. Minor issues and improvements + +- **Logging:** Replace `fmt.Println` in `internal/pkg/apiclient/auth.go` and any `internal/pkg/apiclient/updatedb.go` / inference paths with `slog` (or structured logger) so logs are consistent and configurable. +- **Token lifecycle:** `DefaultInferencer` caches token in a struct field with no expiry or refresh; token may be used after expiry. Use token expiry from auth response and refresh when needed. +- **BeaconLookup naming:** In `appcontext`, `BeaconExists(id string)` is used with MAC (e.g. in bridge handler). 
Rename parameter to `mac` (or the method to `LookupIDByMAC`) to avoid confusion. +- **Bridge/location/decoder:** No `/health` or `/ready` endpoints. For orchestration (e.g. Kubernetes), consider a small HTTP server or at least a process-level health check so the platform can restart unhealthy instances. +- **Dependencies:** `go.mod` is clear; consider auditing indirect deps and keeping them updated (e.g. `go list -m -u all` and Dependabot/Renovate). + +--- + +## 5. Propositions (prioritized) + +### P0 — Before production + +1. **Add API authentication/authorization** + Protect all non-health routes (e.g. JWT or API key middleware); document required claims/scopes if using JWT. + +2. **Enable and verify DB TLS** + Make `sslmode` (and optional root cert) configurable; use `verify-full` (or equivalent) in production. + +3. **Respect TLS config for outbound HTTP** + Use config (e.g. `TLSInsecureSkipVerify`) for apiclient and location inferencer; fix `NewDefaultInferencer(skipTLSVerify bool)` to use the parameter. + +4. **Fix update controllers** + Use path `id` as source of truth for update (and optionally delete); ensure body cannot override resource id in an unsafe way. + +5. **Tighten CORS and log file permissions** + Explicit allowed origins from config; set log file mode to `0600` (or `0640`). + +### P1 — High + +6. **Kafka consumer error policy** + Define and implement skip+commit or DLQ for bad messages; avoid infinite redelivery of poison messages. + +7. **DB connection pool and timeouts** + Set `MaxOpenConns`, `MaxIdleConns`, `ConnMaxLifetime`, and timeouts in `database.Connect`. + +8. **Request body size limit** + Middleware (e.g. `http.MaxBytesReader`) for API routes to prevent large-body DoS. + +9. **Replace fmt.Println with slog** + In apiclient and any remaining places; ensure response body is closed after read (e.g. `defer res.Body.Close()` in auth/data clients if not already). + +### P2 — Medium + +10. 
**Retries for Kafka produce** + Retry with backoff (and optionally circuit breaker) for critical `WriteMessages` calls. + +11. **Context propagation** + Pass request/event context (or bounded context) into Kafka writes instead of `context.Background()`. + +12. **Token refresh in Inferencer** + Use expiry from auth response and refresh token before inference calls. + +13. **Health for bridge/decoder/location** + Add minimal health/readiness (HTTP or signal file) for orchestration and load balancers. + +### P3 — Nice to have + +14. **Metrics and tracing** + Add metrics (e.g. request duration, Kafka lag, error counts) and optional distributed tracing (e.g. OTel). + +15. **Rate limiting** + Per-IP or per-token rate limiting on API routes. + +16. **Structured validation** + Use a validator (e.g. go-playground/validator) for request bodies and path params (IDs, limits). + +17. **Documentation** + Short runbooks for deploy, config env vars, and dependency on Kafka/DB/MQTT and external auth/API. + +--- + +## 6. 
Files reviewed (representative) + +- **Entrypoints:** `cmd/server/main.go`, `cmd/bridge/main.go`, `cmd/decoder/main.go`, `cmd/location/main.go` +- **Apps:** `internal/app/server/*`, `internal/app/bridge/app.go`, `internal/app/decoder/app.go`, `internal/app/location/app.go` +- **Config:** `internal/pkg/config/config.go` +- **Infra:** `internal/pkg/database/database.go`, `internal/pkg/kafkaclient/manager.go`, `internal/pkg/kafkaclient/consumer.go`, `internal/pkg/logger/logger.go` +- **API:** `internal/pkg/api/handler/health.go`, `internal/pkg/api/middleware/*`, `internal/pkg/api/response/response.go`, `internal/app/server/routes.go` +- **Controllers:** `internal/pkg/controller/*.go` (gateways, trackers, zone, parser, settings, alerts, tracks, trackerzones) +- **Services:** `internal/pkg/service/beacon_service.go`, `internal/pkg/apiclient/*.go`, `internal/pkg/bridge/mqtt.go`, `internal/pkg/bridge/handler.go`, `internal/pkg/decoder/process.go`, `internal/pkg/location/inference.go`, `internal/pkg/common/appcontext/context.go` + +--- + +## 7. Conclusion + +The microservices are **well-structured and readable**, with **consistent lifecycle and shutdown**. The main gaps are **security (no API auth, weak TLS usage)** and **reliability (Kafka and DB tuning, retries, context usage)**. Addressing the **P0 and P1** items above would bring the system much closer to production grade (around **7–8/10**); adding **P2/P3** would further improve operability and resilience. 
diff --git a/bridge b/bridge new file mode 100755 index 0000000..f04f2dd Binary files /dev/null and b/bridge differ diff --git a/build/docker-compose.db.yml b/build/docker-compose.db.yml new file mode 100644 index 0000000..d7b9b35 --- /dev/null +++ b/build/docker-compose.db.yml @@ -0,0 +1,16 @@ +version: "2" +services: + postgres: + image: postgres:18 + container_name: postgres + restart: always + ports: + - "127.0.0.1:5433:5432" + env_file: + - ./env/db.env + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 5s + timeout: 5s + retries: 5 + start_period: 30s \ No newline at end of file diff --git a/build/docker-compose.dev.yml b/build/docker-compose.dev.yml index a7501b7..7decc20 100644 --- a/build/docker-compose.dev.yml +++ b/build/docker-compose.dev.yml @@ -1,7 +1,7 @@ version: "2" services: db: - image: postgres + image: postgres:18 container_name: db restart: always ports: @@ -14,6 +14,8 @@ services: timeout: 5s retries: 5 start_period: 30s + volumes: + - pgdata:/var/lib/postgresql/data kafdrop: image: obsidiandynamics/kafdrop @@ -39,6 +41,8 @@ services: timeout: 5s retries: 10 start_period: 20s + volumes: + - kafkadata:/var/lib/kafka/data kafka-init: image: apache/kafka:3.9.0 @@ -73,7 +77,7 @@ services: restart: always volumes: - ../:/app - command: air --build.cmd "go build -buildvcs=false -o /tmp/decoder ./cmd/decoder" --build.bin "/tmp/decoder" + command: air --build.cmd "go build -buildvcs=false -o ./decoder ./cmd/decoder" --build.bin "./decoder" presense-server: build: @@ -95,7 +99,7 @@ services: restart: always volumes: - ../:/app - command: air --build.cmd "go build -buildvcs=false -o /tmp/server ./cmd/server" --build.bin "/tmp/server" + command: air --build.cmd "go build -buildvcs=false -o ./server ./cmd/server" --build.bin "./server" presense-bridge: build: @@ -113,7 +117,7 @@ services: restart: always volumes: - ../:/app - command: air --build.cmd "go build -buildvcs=false -o /tmp/bridge ./cmd/bridge" --build.bin "/tmp/bridge" + 
command: air --build.cmd "go build -buildvcs=false -o ./bridge ./cmd/bridge" --build.bin "./bridge" presense-location: build: @@ -131,7 +135,8 @@ services: restart: always volumes: - ../:/app - command: air --build.cmd "go build -buildvcs=false -o /tmp/location ./cmd/location" --build.bin "/tmp/location" + command: air --build.cmd "go build -buildvcs=false -o ./location ./cmd/location" --build.bin "./location" - - +volumes: + pgdata: + kafkadata: \ No newline at end of file diff --git a/build/docker-compose.yaml b/build/docker-compose.yaml index bbffc31..9fc7990 100644 --- a/build/docker-compose.yaml +++ b/build/docker-compose.yaml @@ -1,6 +1,6 @@ services: db: - image: postgres + image: postgres:18 container_name: db restart: always ports: @@ -13,6 +13,8 @@ services: timeout: 5s retries: 5 start_period: 30s + volumes: + - pgdata:/var/lib/postgresql/data kafdrop: image: obsidiandynamics/kafdrop @@ -37,6 +39,8 @@ services: timeout: 5s retries: 10 start_period: 20s + volumes: + - kafkadata:/var/lib/kafka/data kafka-init: image: apache/kafka:3.9.0 @@ -108,4 +112,6 @@ services: restart: always - +volumes: + pgdata: + kafkadata: diff --git a/decoder b/decoder new file mode 100755 index 0000000..c8b10c3 Binary files /dev/null and b/decoder differ diff --git a/internal/pkg/apiclient/data.go b/internal/pkg/apiclient/data.go index 3fc119a..2553afb 100644 --- a/internal/pkg/apiclient/data.go +++ b/internal/pkg/apiclient/data.go @@ -3,6 +3,7 @@ package apiclient import ( "encoding/json" "fmt" + "io" "net/http" "github.com/AFASystems/presence/internal/pkg/config" @@ -16,6 +17,13 @@ func GetTrackers(token string, client *http.Client, cfg *config.Config) ([]model return []model.Tracker{}, err } + bodyBytes, err := io.ReadAll(res.Body) + if err != nil { + fmt.Printf("error read body: %+v\n", err) + return []model.Tracker{}, err + } + fmt.Printf("body: %s\n", string(bodyBytes)) + var i []model.Tracker - err = json.NewDecoder(res.Body).Decode(&i) + err = json.Unmarshal(bodyBytes, &i) if err != nil { @@ -74,7 +82,6 @@ func 
GetZones(token string, client *http.Client, cfg *config.Config) ([]model.Zo func InferPosition(token string, client *http.Client, cfg *config.Config) (model.PositionResponse, error) { url := fmt.Sprintf("%s/ble-ai/infer", cfg.APIBaseURL) - fmt.Printf("url: %s\n", url) req, err := http.NewRequest("GET", url, nil) if err != nil { fmt.Printf("error new request: %+v\n", err) @@ -89,6 +96,12 @@ func InferPosition(token string, client *http.Client, cfg *config.Config) (model return model.PositionResponse{}, err } + fmt.Printf("res.status: %s\n", res.Status) + if res.StatusCode != 200 { + fmt.Printf("error status code: %d\n", res.StatusCode) + return model.PositionResponse{}, fmt.Errorf("status code: %d", res.StatusCode) + } + var i model.PositionResponse err = json.NewDecoder(res.Body).Decode(&i) if err != nil { diff --git a/internal/pkg/apiclient/updatedb.go b/internal/pkg/apiclient/updatedb.go index 8239164..ce4bae4 100644 --- a/internal/pkg/apiclient/updatedb.go +++ b/internal/pkg/apiclient/updatedb.go @@ -29,6 +29,7 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk } if trackers, err := GetTrackers(token, client, cfg); err == nil { + fmt.Printf("trackers: %+v\n", trackers) syncTable(db, trackers) if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil { msg := fmt.Sprintf("Error in sending delete all from lookup message: %v", err) diff --git a/internal/pkg/apiclient/utils.go b/internal/pkg/apiclient/utils.go index 52d6bf0..9bfdd1e 100644 --- a/internal/pkg/apiclient/utils.go +++ b/internal/pkg/apiclient/utils.go @@ -14,6 +14,7 @@ func setHeader(req *http.Request, token string) { func getRequest(token, route string, client *http.Client, cfg *config.Config) (*http.Response, error) { url := fmt.Sprintf("%s/reslevis/%s", cfg.APIBaseURL, route) + fmt.Printf("url: %s\n", url) req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err diff --git 
a/internal/pkg/bridge/handler.go b/internal/pkg/bridge/handler.go index 4c61055..88d167a 100644 --- a/internal/pkg/bridge/handler.go +++ b/internal/pkg/bridge/handler.go @@ -7,15 +7,11 @@ import ( "strings" "time" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) -// RawBeaconWriter writes beacon advertisements to the rawbeacons topic. -type RawBeaconWriter interface { - WriteMessages(ctx context.Context, msgs ...kafka.Message) error -} - // BeaconLookup provides MAC->ID lookup (e.g. AppState). type BeaconLookup interface { BeaconExists(mac string) (id string, ok bool) @@ -24,7 +20,7 @@ type BeaconLookup interface { // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". 
-func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer RawBeaconWriter) { +func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer *kafka.Writer) { parts := strings.SplitN(topic, "/", 2) hostname := "" if len(parts) >= 2 { @@ -58,7 +54,7 @@ func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer slog.Error("marshaling beacon advertisement", "err", err) break } - if err := writer.WriteMessages(context.Background(), kafka.Message{Value: encoded}); err != nil { + if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: encoded}); err != nil { slog.Error("writing to Kafka", "err", err) time.Sleep(1 * time.Second) break diff --git a/internal/pkg/controller/settings_controller.go b/internal/pkg/controller/settings_controller.go index 1ec2d0b..148e839 100644 --- a/internal/pkg/controller/settings_controller.go +++ b/internal/pkg/controller/settings_controller.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/AFASystems/presence/internal/pkg/api/response" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" "gorm.io/gorm" @@ -45,8 +46,8 @@ func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, context context } kafkaMsg := kafka.Message{Value: eMsg} - if err := writer.WriteMessages(context, kafkaMsg); err != nil { - slog.Error("writing settings to Kafka", "err", err) + if err := kafkaclient.Write(context, writer, kafkaMsg); err != nil { + slog.Error("error writing settings to Kafka", "error", err) response.InternalError(w, "failed to publish settings update", err) return } diff --git a/internal/pkg/controller/trackers_controller.go b/internal/pkg/controller/trackers_controller.go index 6de84f2..37265e4 100644 --- a/internal/pkg/controller/trackers_controller.go +++ b/internal/pkg/controller/trackers_controller.go @@ -3,11 +3,11 @@ package controller import ( "context" 
"encoding/json" - "fmt" "log/slog" "net/http" "github.com/AFASystems/presence/internal/pkg/api/response" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/mux" "github.com/segmentio/kafka-go" @@ -17,17 +17,15 @@ import ( func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, context context.Context) error { valueStr, err := json.Marshal(&value) if err != nil { - msg := fmt.Sprintf("error in encoding: %v", err) - slog.Error(msg) + slog.Error("error encoding", "error", err) return err } msg := kafka.Message{ Value: valueStr, } - if err := writer.WriteMessages(context, msg); err != nil { - msg := fmt.Sprintf("Error in sending kafka message: %v", err) - slog.Error(msg) + if err := kafkaclient.Write(context, writer, msg); err != nil { + slog.Error("error sending kafka message", "error", err) return err } diff --git a/internal/pkg/decoder/process.go b/internal/pkg/decoder/process.go index cdf1c97..cd19bee 100644 --- a/internal/pkg/decoder/process.go +++ b/internal/pkg/decoder/process.go @@ -10,24 +10,20 @@ import ( "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/common/utils" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) -// AlertWriter writes decoded beacon events (e.g. to alertbeacons topic). -type AlertWriter interface { - WriteMessages(ctx context.Context, msgs ...kafka.Message) error -} - // ProcessIncoming decodes a beacon advertisement and writes the event to the writer if it changed. 
-func ProcessIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer AlertWriter, registry *model.ParserRegistry) { +func ProcessIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, registry *model.ParserRegistry) { if err := DecodeBeacon(adv, appState, writer, registry); err != nil { slog.Error("decoding beacon", "err", err, "id", adv.ID) } } // DecodeBeacon hex-decodes the payload, runs the parser registry, dedupes by event hash, and writes to writer. -func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer AlertWriter, registry *model.ParserRegistry) error { +func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, registry *model.ParserRegistry) error { beacon := strings.TrimSpace(adv.Data) id := adv.ID if beacon == "" { @@ -63,7 +59,7 @@ func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, return err } - if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil { + if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: eMsg}); err != nil { return fmt.Errorf("write alert: %w", err) } diff --git a/internal/pkg/kafkaclient/consumer.go b/internal/pkg/kafkaclient/consumer.go index bd37721..8a541f1 100644 --- a/internal/pkg/kafkaclient/consumer.go +++ b/internal/pkg/kafkaclient/consumer.go @@ -3,7 +3,6 @@ package kafkaclient import ( "context" "encoding/json" - "fmt" "log/slog" "sync" @@ -21,15 +20,14 @@ func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync. 
default: msg, err := r.ReadMessage(ctx) if err != nil { - msg := fmt.Sprintf("error reading message: %v", err) - slog.Error(msg) + slog.Error("error reading message", "error", err) continue } var data T if err := json.Unmarshal(msg.Value, &data); err != nil { - msg := fmt.Sprintf("error decoding: %v", err) - slog.Error(msg) + slog.Error("error decoding", "error", err) continue } diff --git a/internal/pkg/kafkaclient/manager.go b/internal/pkg/kafkaclient/manager.go index a904e0b..d68399f 100644 --- a/internal/pkg/kafkaclient/manager.go +++ b/internal/pkg/kafkaclient/manager.go @@ -38,13 +38,17 @@ func InitKafkaManager() *KafkaManager { func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) { kafkaWriter := &kafka.Writer{ - Addr: kafka.TCP(kafkaUrl), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - Async: false, - RequiredAcks: kafka.RequireAll, - BatchSize: 100, - BatchTimeout: 10 * time.Millisecond, + Addr: kafka.TCP(kafkaUrl), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + RequiredAcks: kafka.RequireAll, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + MaxAttempts: 5, + WriteBackoffMin: 100 * time.Millisecond, + WriteBackoffMax: 1 * time.Second, + WriteTimeout: 5 * time.Second, } m.kafkaWritersMap.KafkaWritersLock.Lock() diff --git a/internal/pkg/kafkaclient/write.go b/internal/pkg/kafkaclient/write.go new file mode 100644 index 0000000..a1104b5 --- /dev/null +++ b/internal/pkg/kafkaclient/write.go @@ -0,0 +1,16 @@ +package kafkaclient + +import ( + "context" + "time" + + "github.com/segmentio/kafka-go" +) + +func Write(ctx context.Context, writer *kafka.Writer, message kafka.Message) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err := writer.WriteMessages(ctx, message) + return err +} diff --git a/internal/pkg/location/filter.go b/internal/pkg/location/filter.go index a963611..a4f956a 100644 --- a/internal/pkg/location/filter.go +++ b/internal/pkg/location/filter.go @@ -7,26 +7,22
@@ import ( "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) // Score weights for location algorithm (configurable via constants). const ( - SeenWeight = 1.5 - RSSIWeight = 0.75 + SeenWeight = 1.5 + RSSIWeight = 0.75 DefaultDistance = 999 DefaultLastSeen = 999 ) -// LocationWriter writes location events (e.g. to Kafka). -type LocationWriter interface { - WriteMessages(ctx context.Context, msgs ...kafka.Message) error -} - // GetLikelyLocations runs the filter algorithm: scores beacons by RSSI and seen count, // updates app state with best location and confidence, and writes HTTPLocation to the writer. -func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) { +func GetLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { ctx := context.Background() beacons := appState.GetAllBeacons() settings := appState.GetSettingsValue() @@ -36,7 +32,7 @@ func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) { Method: "Standard", Distance: DefaultDistance, ID: beacon.ID, - Location: "", + Location: "", LastSeen: DefaultLastSeen, } @@ -89,8 +85,8 @@ func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) { continue } - if err := writer.WriteMessages(ctx, kafka.Message{Value: js}); err != nil { - slog.Error("sending kafka location message", "err", err, "beacon_id", beacon.ID) + if err := kafkaclient.Write(ctx, writer, kafka.Message{Value: js}); err != nil { + slog.Error("error sending kafka location message", "error", err, "beacon_id", beacon.ID) } } } diff --git a/internal/pkg/location/inference.go b/internal/pkg/location/inference.go index 8f0dc24..78b3991 100644 --- a/internal/pkg/location/inference.go +++ b/internal/pkg/location/inference.go @@ -3,7 +3,6 @@ package location import ( "context" "crypto/tls" - "fmt" "net/http" 
"github.com/AFASystems/presence/internal/pkg/apiclient" @@ -35,7 +34,6 @@ func NewDefaultInferencer(skipTLSVerify bool) *DefaultInferencer { // Infer gets a token and calls the inference API. func (d *DefaultInferencer) Infer(ctx context.Context, cfg *config.Config) (model.PositionResponse, error) { if d.Token == "" { - fmt.Printf("getting token\n") token, err := apiclient.GetToken(ctx, cfg, d.Client) if err != nil { return model.PositionResponse{}, err @@ -43,5 +41,18 @@ func (d *DefaultInferencer) Infer(ctx context.Context, cfg *config.Config) (mode d.Token = token } - return apiclient.InferPosition(d.Token, d.Client, cfg) + response, err := apiclient.InferPosition(d.Token, d.Client, cfg) + if err != nil { + token, err := apiclient.GetToken(ctx, cfg, d.Client) + if err != nil { + return model.PositionResponse{}, err + } + d.Token = token + response, err = apiclient.InferPosition(d.Token, d.Client, cfg) + if err != nil { + return model.PositionResponse{}, err + } + } + + return response, nil } diff --git a/internal/pkg/service/beacon_service.go b/internal/pkg/service/beacon_service.go index ce495db..ac07762 100644 --- a/internal/pkg/service/beacon_service.go +++ b/internal/pkg/service/beacon_service.go @@ -10,21 +10,18 @@ import ( "strings" "time" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/google/uuid" "github.com/segmentio/kafka-go" "gorm.io/gorm" ) -// KafkaWriter defines the interface for writing Kafka messages (allows mocking in tests) -type KafkaWriter interface { - WriteMessages(ctx context.Context, msgs ...kafka.Message) error -} - func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) { + fmt.Printf("Finding tracker for MAC: %s, ID: %s\n", msg.MAC, msg.ID) var tracker model.Tracker if msg.MAC != "" { - if err := db.Where("mac = ?", msg.MAC).Find(&tracker).Error; err != nil { + if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", 
""))).Find(&tracker).Error; err != nil { return model.Tracker{}, err } @@ -56,7 +53,7 @@ func findZones(trackerID string, db *gorm.DB) ([]string, error) { return allowedZones, nil } -func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) { +func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) { tracker, err := findTracker(msg, db) if err != nil { msg := fmt.Sprintf("Error in finding tracker: %v", err) @@ -95,7 +92,7 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWr sendAlert(gw.ID, msg.ID, writer, ctx, allowedZones, db) } -func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) { +func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) { tracker, err := findTracker(msg, db) if err != nil { msg := fmt.Sprintf("Error in finding tracker: %v", err) @@ -133,7 +130,7 @@ func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer Kafka sendAlert(gw.ID, tracker.ID, writer, ctx, allowedZones, db) } -func sendAlert(gwId, trackerId string, writer KafkaWriter, ctx context.Context, allowedZones []string, db *gorm.DB) { +func sendAlert(gwId, trackerId string, writer *kafka.Writer, ctx context.Context, allowedZones []string, db *gorm.DB) { if len(allowedZones) != 0 && !slices.Contains(allowedZones, gwId) { alert := model.Alert{ ID: uuid.New().String(), @@ -157,7 +154,11 @@ func sendAlert(gwId, trackerId string, writer KafkaWriter, ctx context.Context, msg := kafka.Message{ Value: eMsg, } - writer.WriteMessages(ctx, msg) + if err := kafkaclient.Write(ctx, writer, msg); err != nil { + msg := fmt.Sprintf("Error in writing message: %v", err) + slog.Error(msg) + return + } } } } diff --git a/internal/pkg/service/parser_service.go b/internal/pkg/service/parser_service.go index 87fcea1..8c0face 100644 --- 
a/internal/pkg/service/parser_service.go +++ b/internal/pkg/service/parser_service.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" + "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) -func SendParserConfig(kp model.KafkaParser, writer KafkaWriter, ctx context.Context) error { +func SendParserConfig(kp model.KafkaParser, writer *kafka.Writer, ctx context.Context) error { eMsg, err := json.Marshal(kp) if err != nil { return err @@ -17,7 +18,7 @@ func SendParserConfig(kp model.KafkaParser, writer *kafka.Writer, ctx context.Cont Value: eMsg, } - if err := writer.WriteMessages(ctx, msg); err != nil { + if err := kafkaclient.Write(ctx, writer, msg); err != nil { return err } return nil diff --git a/location b/location new file mode 100755 index 0000000..a61f4a7 Binary files /dev/null and b/location differ diff --git a/scripts/build/build.sh b/scripts/build/build.sh index fc51de5..6dced23 100755 --- a/scripts/build/build.sh +++ b/scripts/build/build.sh @@ -1,20 +1,20 @@ #!/bin/bash # Build the server -docker build -t afasystemadmin/ble-ai-localizer:server_v1 -f ../../build/package/Dockerfile.server ../../ - +docker build -t afasystemadmin/ble-ai-localizer:server_v1.1 -f ../../build/package/Dockerfile.server ../../ + # Build the location -docker build -t afasystemadmin/ble-ai-localizer:location_v1 -f ../../build/package/Dockerfile.location ../../ +docker build -t afasystemadmin/ble-ai-localizer:location_v1.1 -f ../../build/package/Dockerfile.location ../../ # Build the decoder -docker build -t afasystemadmin/ble-ai-localizer:decoder_v1 -f ../../build/package/Dockerfile.decoder ../../ +docker build -t afasystemadmin/ble-ai-localizer:decoder_v1.1 -f ../../build/package/Dockerfile.decoder ../../ # Build the bridge -docker build -t afasystemadmin/ble-ai-localizer:bridge_v1 -f ../../build/package/Dockerfile.bridge ../../ +docker build -t
afasystemadmin/ble-ai-localizer:bridge_v1.1 -f ../../build/package/Dockerfile.bridge ../../ docker image ls -docker push afasystemadmin/ble-ai-localizer:server_v1 -docker push afasystemadmin/ble-ai-localizer:location_v1 -docker push afasystemadmin/ble-ai-localizer:decoder_v1 -docker push afasystemadmin/ble-ai-localizer:bridge_v1 \ No newline at end of file +docker push afasystemadmin/ble-ai-localizer:server_v1.1 +docker push afasystemadmin/ble-ai-localizer:location_v1.1 +docker push afasystemadmin/ble-ai-localizer:decoder_v1.1 +docker push afasystemadmin/ble-ai-localizer:bridge_v1.1 \ No newline at end of file diff --git a/scripts/db/dump.sh b/scripts/db/dump.sh new file mode 100755 index 0000000..5e01791 --- /dev/null +++ b/scripts/db/dump.sh @@ -0,0 +1 @@ +docker exec db pg_dump -U postgres -F c postgres > dump.sql \ No newline at end of file diff --git a/scripts/db/dump.sql b/scripts/db/dump.sql new file mode 100644 index 0000000..fa69fed Binary files /dev/null and b/scripts/db/dump.sql differ diff --git a/scripts/db/restore.sh b/scripts/db/restore.sh new file mode 100755 index 0000000..4d39c4d --- /dev/null +++ b/scripts/db/restore.sh @@ -0,0 +1,3 @@ +docker cp dump.sql postgres:/tmp/dump.sql + +docker exec -t postgres pg_restore -U postgres -d postgres /tmp/dump.sql \ No newline at end of file diff --git a/server b/server new file mode 100755 index 0000000..eb43498 Binary files /dev/null and b/server differ