ソースを参照

chore: several fixes, including token, kafka backoff, timeouts, using context with timeout, adding volumes to postgres and kafka, scripts for creating and restoring sql dump

master
Blaz Smehov 2週間前
コミット
f87452fb3a
27個のファイルの変更354行の追加218行の削除
  1. +0
    -138
      CODE_GRADE_AND_REFACTOR.md
  2. +210
    -0
      PRODUCTION_READINESS_REPORT.md
  3. バイナリ
      bridge
  4. +16
    -0
      build/docker-compose.db.yml
  5. +12
    -7
      build/docker-compose.dev.yml
  6. +8
    -2
      build/docker-compose.yaml
  7. バイナリ
      decoder
  8. +14
    -1
      internal/pkg/apiclient/data.go
  9. +1
    -0
      internal/pkg/apiclient/updatedb.go
  10. +1
    -0
      internal/pkg/apiclient/utils.go
  11. +3
    -7
      internal/pkg/bridge/handler.go
  12. +3
    -2
      internal/pkg/controller/settings_controller.go
  13. +4
    -6
      internal/pkg/controller/trackers_controller.go
  14. +4
    -8
      internal/pkg/decoder/process.go
  15. +3
    -5
      internal/pkg/kafkaclient/consumer.go
  16. +11
    -7
      internal/pkg/kafkaclient/manager.go
  17. +16
    -0
      internal/pkg/kafkaclient/write.go
  18. +7
    -11
      internal/pkg/location/filter.go
  19. +14
    -3
      internal/pkg/location/inference.go
  20. +11
    -10
      internal/pkg/service/beacon_service.go
  21. +3
    -2
      internal/pkg/service/parser_service.go
  22. バイナリ
      location
  23. +9
    -9
      scripts/build/build.sh
  24. +1
    -0
      scripts/db/dump.sh
  25. バイナリ
      scripts/db/dump.sql
  26. +3
    -0
      scripts/db/restore.sh
  27. バイナリ
      server

+ 0
- 138
CODE_GRADE_AND_REFACTOR.md ファイルの表示

@@ -1,138 +0,0 @@
# Code Grade & Production Readiness Report (Updated)

## Overall grade: **7.0 / 10**

The codebase has been refactored into a clear app/service layout with thin `cmd` entrypoints, shared `internal/pkg` libraries, health/readiness endpoints, structured middleware, and addressed reliability/security items. It is suitable for development and staging; production use still requires CORS restriction, optional metrics/tracing, and (if desired) request validation and OpenAPI.

---

## 1. What’s working well

| Area | Notes |
|------|--------|
| **Structure** | `cmd/<service>/main.go` is thin (~25 lines); `internal/app/*` holds per-service composition; `internal/pkg` has api (response, middleware, handler), location, bridge, decoder, config, kafkaclient, logger, model, controller, service, database, apiclient, appcontext. |
| **Concurrency** | Channels, `sync.WaitGroup`, and `AppState` with RWMutex; event loops live in app layer, not in main. |
| **Shutdown** | `signal.NotifyContext` + app `Run`/`Shutdown`; Kafka and MQTT cleanup in app. |
| **Kafka** | `KafkaManager`, generic `Consume[T]`, graceful close. |
| **Observability** | `/health` and `/ready` (DB ping); middleware: logging, recovery, request ID, CORS; logging to file with fallback to stderr if file open fails. |
| **Reliability** | No panics in library code for logger (fallback to stderr); MQTT connect returns error; server init returns error; `WriteMessages` errors checked in parser service and settings controller. |
| **Security** | TLS skip verify is configurable via `TLS_INSECURE_SKIP_VERIFY` (default false). |
| **Testing** | Unit tests for appcontext, utils, model, controller, service, config; integration tests for bridge/decoder. |
| **Dependencies** | Modern stack (slog, segmentio/kafka-go, gorilla/mux, gorm). |

---

## 2. Fixes applied since last report

### 2.1 Startup and library behavior

- **Bridge:** MQTT connect failure no longer panics; `internal/pkg/bridge/mqtt.go` returns error from `NewMQTTClient`, `cmd/bridge/main.go` exits with `log.Fatalf` on error.
- **Server:** DB and config init live in `internal/app/server`; `New`/`Init` return errors; `cmd/server/main.go` uses `log.Fatalf` on error (no panic in library).
- **Logger:** `CreateLogger` no longer uses `log.Fatalf`; on log file open failure it returns a logger that writes only to stderr and a no-op cleanup.

### 2.2 Ignored errors

- **parser_service.go:** `writer.WriteMessages(ctx, msg)` return value is checked and propagated.
- **settings_controller.go:** `writer.WriteMessages` error is checked; on failure returns 500 and logs; response sets `Content-Type: application/json`.
- **database:** Unused global `var DB *gorm.DB` removed.

### 2.3 Security and configuration

- **TLS:** `config.Config` has `TLSInsecureSkipVerify bool` (env `TLS_INSECURE_SKIP_VERIFY`, default false). Used in `apiclient.UpdateDB` and in location inference (`NewDefaultInferencer(cfg.TLSInsecureSkipVerify)`).
- **CORS:** Not changed (origin policy left to operator; middleware supports configurable origins).

### 2.4 Observability

- **Health/readiness:** Server exposes `/health` (liveness) and `/ready` (DB ping) via `internal/pkg/api/handler/health.go`.
- **Middleware:** Recovery (panic → 500), logging (method, path, status, duration), request ID (`X-Request-ID`), CORS.

### 2.5 Code quality

- **Bridge:** MQTT topic parsing uses `strings.SplitN(topic, "/", 2)` to avoid panic; CSV branch validates and logs (no writer usage yet).
- **Location:** Magic numbers moved to named constants in `internal/pkg/location/filter.go` (e.g. `SeenWeight`, `RSSIWeight`, `DefaultDistance`).
- **Duplication:** Bootstrap removed; each service uses `internal/app/<service>` for init, run, and shutdown.

---

## 3. Remaining / known limitations

### 3.1 Config and env

- **`getEnvPanic`** in `config` still panics on missing required env. To avoid panics in library, consider a `LoadServerSafe` (or similar) that returns `(*Config, error)` and use it only from `main` with explicit exit. Not changed in this pass.

### 3.2 Security

- **CORS:** Defaults remain permissive (e.g. `*`). Restrict to known frontend origins when deploying (e.g. via env or config).
- **Secrets:** Still loaded from env only; ensure no secrets in logs; consider a secret manager for production.

### 3.3 API and validation

- No OpenAPI/Swagger; no formal request/response contracts.
- Many handlers still use `http.Error` or `w.Write` without a single response helper; `api/response` exists for new/consistent endpoints.
- No request body validation (e.g. go-playground/validator); no idempotency keys.

### 3.4 Resilience and operations

- Kafka consumer: on `ReadMessage`/unmarshal error, logs and continues; no dead-letter or backoff yet.
- DB: no documented pool tuning; readiness only checks DB ping.
- No metrics (Prometheus/OpenTelemetry). No distributed tracing.

---

## 4. Grade breakdown (updated)

| Criterion | Score | Comment |
|---------------------|-------|--------|
| Architecture | 8/10 | Clear app layer, thin main, pkg separation; handlers still take concrete DB/writer (can be abstracted later). |
| Reliability | 7/10 | No panics in logger/bridge init; WriteMessages errors handled; health/ready; logger fallback. |
| Security | 6/10 | TLS skip verify configurable (default off); CORS still broad; secrets in env. |
| Observability | 7/10 | Health/ready, request logging, request ID, recovery; no metrics/tracing. |
| API design | 6/10 | Response helpers and middleware in place; many handlers still ad-hoc; no spec/validation. |
| Testing | 6/10 | Good unit coverage; more integration/E2E would help. |
| Code quality | 8/10 | Clear structure, constants for magic numbers, dead code removed, duplication reduced. |
| Production readiness | 6/10 | Health/ready and error handling in place; CORS, metrics, and validation still to do. |

**Average ≈ 6.75; grade 7.0/10** – Refactor and applied fixes significantly improve structure, reliability, and observability; remaining work is mostly CORS, validation, and metrics/tracing.

---

## 5. Checklist (updated)

### 5.1 Reliability

- [x] Remove panics / `log.Fatalf` from library where possible (logger fallback; bridge returns error).
- [x] Check and handle `WriteMessages` in parser service and settings controller.
- [x] Add `/health` and `/ready` on server.
- [ ] Document or add Kafka consumer retry/backoff and dead-letter if needed.
- [x] Make TLS skip verify configurable; default false.

### 5.2 Observability

- [x] Structured logging and request ID middleware.
- [ ] Add metrics (e.g. Prometheus) and optional tracing.

### 5.3 API and validation

- [ ] OpenAPI spec and validation.
- [ ] Consistent use of `api/response` and JSON error body across handlers.
- [ ] Restrict CORS to specific origins (operator-defined).

### 5.4 Operations

- [ ] Document env vars and deployment topology.
- [ ] Configurable timeouts; rate limiting if required.

### 5.5 Code and structure

- [x] Bridge topic parsing and CSV branch behavior clarified.
- [x] Unused `database.DB` global removed.
- [x] Location magic numbers moved to constants.
- [x] App layer and api/middleware/response in place.

---

## 6. Summary

- **Grade: 7.0/10** – Refactor and targeted fixes improve structure, reliability, and observability. Server has health/ready, middleware, and no panics in logger/bridge init; TLS skip verify is configurable; WriteMessages and logger errors are handled.
- **Still to do for production:** Restrict CORS, add metrics (and optionally tracing), validate requests and adopt consistent API responses, and document operations. Config loading can be made panic-free by adding safe loaders that return errors.
- **Not changed by design:** CORS policy left for operator to configure (e.g. via env or config).

+ 210
- 0
PRODUCTION_READINESS_REPORT.md ファイルの表示

@@ -0,0 +1,210 @@
# Production Readiness Report — Microservices (server, bridge, decoder, location)

**Scope:** `cmd/server`, `cmd/bridge`, `cmd/decoder`, `cmd/location` and all packages they import.
**Date:** 2025-03-05.

---

## Overall grade: **5.5 / 10**

The codebase has a **solid structure** and **consistent patterns** across the four services, but **security**, **reliability**, and **operational hardening** are not yet at production level. With the changes suggested below, it can be brought to a 7–8/10 for a production deployment.

---

## 1. Summary by dimension

| Dimension | Grade | Notes |
| --------------------------- | ----- | ------------------------------------------------------------------------------------------------- |
| **Structure & readability** | 7/10 | Clear app lifecycle (New/Run/Shutdown), good package layout, some naming/duplication issues. |
| **Reliability** | 5/10 | Graceful shutdown and Kafka cleanup are good; missing retries, commit semantics, and DB/timeouts. |
| **Security** | 3/10 | No API auth, TLS/DB and client TLS weakened, CORS permissive, no rate limiting. |
| **Observability** | 6/10 | slog, request ID, logging middleware, health/ready; no metrics/tracing. |
| **Correctness** | 6/10 | Path vs body ID bugs in update endpoints, `context.Background()` in hot paths. |

---

## 2. What’s in good shape

- **Unified app pattern:** All four services use the same lifecycle: `New(cfg)` → optional `Init(ctx)` (server only) → `Run(ctx)` → `Shutdown()`, with `signal.NotifyContext` for graceful shutdown.
- **Graceful shutdown:** HTTP server shutdown, Kafka readers/writers and MQTT disconnect are explicitly closed; `sync.WaitGroup` used for consumer goroutines.
- **Structured logging:** `slog` with JSON handler and file + stderr; request logging with method, path, status, duration, bytes.
- **HTTP middleware:** Recovery (panic → 500), request ID, CORS, logging applied in a clear chain.
- **Health endpoints:** `/health` (liveness) and `/ready` (DB ping) for the server.
- **Kafka usage:** Centralized `KafkaManager` with RWMutex, separate readers/writers, group IDs per service.
- **Shared state:** `AppState` in `common/appcontext` is thread-safe (RWMutex) and used consistently.
- **Config:** Env-based config with service-specific loaders (`LoadServer`, `LoadBridge`, etc.) and `getEnvPanic` for required vars.
- **API responses:** Centralized `response.JSON`, `Error`, `BadRequest`, `InternalError`, `NotFound` with consistent JSON shape.
- **OpenAPI:** Routes reference `api/openapi.yaml` (OpenAPI 3.0), which helps readability and contract clarity.

---

## 3. Critical issues

### 3.1 Security

- **No authentication or authorization on the HTTP API**
All server routes (`/reslevis/*`, `/configs/beacons`, etc.) are unauthenticated. Anyone who can reach the server can read/update/delete gateways, zones, trackers, parser configs, settings, alerts, and tracks.

- **Database connection uses `sslmode=disable`**
In `internal/pkg/database/database.go`, DSN is built with `sslmode=disable`. In production, DB connections should use TLS and `sslmode=verify-full` (or equivalent) with CA verification.

- **TLS verification disabled for outbound HTTP**
- `internal/pkg/apiclient/updatedb.go`: `TLSClientConfig: &tls.Config{InsecureSkipVerify: true}`.
- `internal/pkg/location/inference.go`: same, and **`NewDefaultInferencer(skipTLSVerify bool)` ignores the parameter** and always uses `InsecureSkipVerify: true`.

- **CORS defaults to `*`**
In `internal/pkg/api/middleware/cors.go`, when `origins` is nil/empty, `origins = []string{"*"}`. Production should restrict origins to known front-end origins.

- **Logger file mode `0666`**
In `internal/pkg/logger/logger.go`, `os.OpenFile(..., 0666)` makes the log file world-readable and -writable. Prefer `0600` or `0640`.

- **No rate limiting or request body size limits**
No protection against abuse or large-body DoS; consider middleware for max body size and rate limiting (per IP or per key).

**Recommendations:**

- Add authentication/authorization middleware (e.g. JWT or API key validation) for all non-health API routes; keep `/health` and optionally `/ready` public.
- Make DB TLS configurable via env (e.g. `DB_SSLMODE`, `DB_SSLROOTCERT`) and use `sslmode=verify-full` in production.
- Use `cfg.TLSInsecureSkipVerify` (or equivalent) for all outbound HTTP clients; fix `NewDefaultInferencer` to respect the parameter.
- Configure CORS with explicit allowed origins (and optionally credentials) from config.
- Set log file mode to `0600` (or `0640` if a group needs read).
- Add middleware to limit request body size (e.g. `http.MaxBytesReader`) and consider rate limiting for API routes.

---

### 3.2 Reliability

- **Kafka consumer: decode errors and commit semantics**
In `internal/pkg/kafkaclient/consumer.go`, when `json.Unmarshal` fails, the code logs and `continue`s without committing. Depending on reader config, this can cause repeated redelivery of bad messages or ambiguous semantics. Production should either skip and commit, or send to a dead-letter path and commit.

Answer: because readers are using consumer groups messages are auto commited, meaning bad unmarshal still commits as the message was technically read

- **No retries on Kafka produce**
Event loops (server, bridge, decoder, location) call `WriteMessages` once; transient Kafka errors are not retried. Consider retry with backoff (and optional circuit breaker) for critical topics.

Answer: the Writer object is already holding the default configuration for timeout, backoff and retries, but I still added some extra configurations

- **Database: no explicit pool or timeouts**
`database.Connect` uses GORM defaults. For production, set `MaxOpenConns`, `MaxIdleConns`, and connection/timeout settings (e.g. `SetConnMaxLifetime`) on the underlying `*sql.DB`.

- **UpdateDB and Init: failures only logged**
In `internal/app/server/app.go`, `apiclient.UpdateDB` errors are only logged; Init continues. Consider failing Init (or marking “degraded”) if sync is required for correct operation, or add retries/backoff.

- **Use of `context.Background()` in async paths**
e.g. `internal/pkg/bridge/handler.go` and `internal/pkg/decoder/process.go` use `context.Background()` for Kafka writes. Prefer passing the request/event context (or a derived timeout) so shutdown and timeouts propagate.

**Recommendations:**

- Define a clear policy for Kafka consumer errors (skip+commit vs DLQ); avoid silent continue without commit unless intended.
- Add retry (with backoff) for Kafka produce in critical paths; consider a small wrapper or helper.
- Configure DB pool and timeouts in `database.Connect` (and optionally make them configurable via config).
- Decide whether UpdateDB is mandatory for startup; if yes, fail Init on error or retry; if no, document and consider a “degraded” readiness state.
- Pass context from the caller (or a timeout context) into Kafka write calls instead of `context.Background()`.

---

### 3.3 Correctness and consistency

- **Update endpoints: path `id` vs body `id`**
- **GatewayUpdateController** (`internal/pkg/controller/gateways_controller.go`): Uses `mux.Vars(r)["id"]` only to check existence with `First(..., "id = ?", id)`, then decodes body into `gateway` and calls `Save(&gateway)`. The updated record is identified by `gateway.ID` from the body, not the path. A client can send a different ID in the body and update another resource.
- **ZoneUpdateController**: Route is `updateZone` (no `{id}` in path); uses `zone.ID` from body only. If the API contract expects path-based ID, this is inconsistent.
Recommendation: For update-by-id, use the path parameter as the single source of truth: load by path `id`, decode body into a DTO or partial struct, then update only allowed fields for that id (e.g. selective updates or merge then update by path id).

- **TrackerUpdateController**
Uses body `tracker.ID` for lookup and save; route has no `{id}` in path. If other update endpoints use path `{id}`, align behavior and documentation.

**Recommendations:**

- Standardize update semantics: either path `{id}` only (body has no id or it must match path) or document “body id is canonical” and ensure no IDOR.
- Prefer path-based resource identification for updates/deletes and bind body to allowed fields only.

---

## 4. Minor issues and improvements

- **Logging:** Replace `fmt.Println` in `internal/pkg/apiclient/auth.go` and any `internal/pkg/apiclient/updatedb.go` / inference paths with `slog` (or structured logger) so logs are consistent and configurable.
- **Token lifecycle:** `DefaultInferencer` caches token in a struct field with no expiry or refresh; token may be used after expiry. Use token expiry from auth response and refresh when needed.
- **BeaconLookup naming:** In `appcontext`, `BeaconExists(id string)` is used with MAC (e.g. in bridge handler). Rename parameter to `mac` (or the method to `LookupIDByMAC`) to avoid confusion.
- **Bridge/location/decoder:** No `/health` or `/ready` endpoints. For orchestration (e.g. Kubernetes), consider a small HTTP server or at least a process-level health check so the platform can restart unhealthy instances.
- **Dependencies:** `go.mod` is clear; consider auditing indirect deps and keeping them updated (e.g. `go list -m -u all` and Dependabot/Renovate).

---

## 5. Propositions (prioritized)

### P0 — Before production

1. **Add API authentication/authorization**
Protect all non-health routes (e.g. JWT or API key middleware); document required claims/scopes if using JWT.

2. **Enable and verify DB TLS**
Make `sslmode` (and optional root cert) configurable; use `verify-full` (or equivalent) in production.

3. **Respect TLS config for outbound HTTP**
Use config (e.g. `TLSInsecureSkipVerify`) for apiclient and location inferencer; fix `NewDefaultInferencer(skipTLSVerify bool)` to use the parameter.

4. **Fix update controllers**
Use path `id` as source of truth for update (and optionally delete); ensure body cannot override resource id in an unsafe way.

5. **Tighten CORS and log file permissions**
Explicit allowed origins from config; set log file mode to `0600` (or `0640`).

### P1 — High

6. **Kafka consumer error policy**
Define and implement skip+commit or DLQ for bad messages; avoid infinite redelivery of poison messages.

7. **DB connection pool and timeouts**
Set `MaxOpenConns`, `MaxIdleConns`, `ConnMaxLifetime`, and timeouts in `database.Connect`.

8. **Request body size limit**
Middleware (e.g. `http.MaxBytesReader`) for API routes to prevent large-body DoS.

9. **Replace fmt.Println with slog**
In apiclient and any remaining places; ensure response body is closed after read (e.g. `defer res.Body.Close()` in auth/data clients if not already).

### P2 — Medium

10. **Retries for Kafka produce**
Retry with backoff (and optionally circuit breaker) for critical `WriteMessages` calls.

11. **Context propagation**
Pass request/event context (or bounded context) into Kafka writes instead of `context.Background()`.

12. **Token refresh in Inferencer**
Use expiry from auth response and refresh token before inference calls.

13. **Health for bridge/decoder/location**
Add minimal health/readiness (HTTP or signal file) for orchestration and load balancers.

### P3 — Nice to have

14. **Metrics and tracing**
Add metrics (e.g. request duration, Kafka lag, error counts) and optional distributed tracing (e.g. OTel).

15. **Rate limiting**
Per-IP or per-token rate limiting on API routes.

16. **Structured validation**
Use a validator (e.g. go-playground/validator) for request bodies and path params (IDs, limits).

17. **Documentation**
Short runbooks for deploy, config env vars, and dependency on Kafka/DB/MQTT and external auth/API.

---

## 6. Files reviewed (representative)

- **Entrypoints:** `cmd/server/main.go`, `cmd/bridge/main.go`, `cmd/decoder/main.go`, `cmd/location/main.go`
- **Apps:** `internal/app/server/*`, `internal/app/bridge/app.go`, `internal/app/decoder/app.go`, `internal/app/location/app.go`
- **Config:** `internal/pkg/config/config.go`
- **Infra:** `internal/pkg/database/database.go`, `internal/pkg/kafkaclient/manager.go`, `internal/pkg/kafkaclient/consumer.go`, `internal/pkg/logger/logger.go`
- **API:** `internal/pkg/api/handler/health.go`, `internal/pkg/api/middleware/*`, `internal/pkg/api/response/response.go`, `internal/app/server/routes.go`
- **Controllers:** `internal/pkg/controller/*.go` (gateways, trackers, zone, parser, settings, alerts, tracks, trackerzones)
- **Services:** `internal/pkg/service/beacon_service.go`, `internal/pkg/apiclient/*.go`, `internal/pkg/bridge/mqtt.go`, `internal/pkg/bridge/handler.go`, `internal/pkg/decoder/process.go`, `internal/pkg/location/inference.go`, `internal/pkg/common/appcontext/context.go`

---

## 7. Conclusion

The microservices are **well-structured and readable**, with **consistent lifecycle and shutdown**. The main gaps are **security (no API auth, weak TLS usage)** and **reliability (Kafka and DB tuning, retries, context usage)**. Addressing the **P0 and P1** items above would bring the system much closer to production grade (around **7–8/10**); adding **P2/P3** would further improve operability and resilience.

バイナリ
bridge ファイルの表示


+ 16
- 0
build/docker-compose.db.yml ファイルの表示

@@ -0,0 +1,16 @@
version: "2"
services:
postgres:
image: postgres:18
container_name: postgres
restart: always
ports:
- "127.0.0.1:5433:5432"
env_file:
- ./env/db.env
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
start_period: 30s

+ 12
- 7
build/docker-compose.dev.yml ファイルの表示

@@ -1,7 +1,7 @@
version: "2"
services:
db:
image: postgres
image: postgres:18
container_name: db
restart: always
ports:
@@ -14,6 +14,8 @@ services:
timeout: 5s
retries: 5
start_period: 30s
volumes:
- pgdata:/var/postgresql/data

kafdrop:
image: obsidiandynamics/kafdrop
@@ -39,6 +41,8 @@ services:
timeout: 5s
retries: 10
start_period: 20s
volumes:
- kafkadata:/var/lib/kafka/data

kafka-init:
image: apache/kafka:3.9.0
@@ -73,7 +77,7 @@ services:
restart: always
volumes:
- ../:/app
command: air --build.cmd "go build -buildvcs=false -o /tmp/decoder ./cmd/decoder" --build.bin "/tmp/decoder"
command: air --build.cmd "go build -buildvcs=false -o ./decoder ./cmd/decoder" --build.bin "./decoder"
presense-server:
build:
@@ -95,7 +99,7 @@ services:
restart: always
volumes:
- ../:/app
command: air --build.cmd "go build -buildvcs=false -o /tmp/server ./cmd/server" --build.bin "/tmp/server"
command: air --build.cmd "go build -buildvcs=false -o ./server ./cmd/server" --build.bin "./server"

presense-bridge:
build:
@@ -113,7 +117,7 @@ services:
restart: always
volumes:
- ../:/app
command: air --build.cmd "go build -buildvcs=false -o /tmp/bridge ./cmd/bridge" --build.bin "/tmp/bridge"
command: air --build.cmd "go build -buildvcs=false -o ./bridge ./cmd/bridge" --build.bin "./bridge"

presense-location:
build:
@@ -131,7 +135,8 @@ services:
restart: always
volumes:
- ../:/app
command: air --build.cmd "go build -buildvcs=false -o /tmp/location ./cmd/location" --build.bin "/tmp/location"
command: air --build.cmd "go build -buildvcs=false -o ./location ./cmd/location" --build.bin "./location"


volumes:
pgdata:
kafkadata:

+ 8
- 2
build/docker-compose.yaml ファイルの表示

@@ -1,6 +1,6 @@
services:
db:
image: postgres
image: postgres:18
container_name: db
restart: always
ports:
@@ -13,6 +13,8 @@ services:
timeout: 5s
retries: 5
start_period: 30s
volumes:
- pgdata:/var/postgresql/data

kafdrop:
image: obsidiandynamics/kafdrop
@@ -37,6 +39,8 @@ services:
timeout: 5s
retries: 10
start_period: 20s
volumes:
- kafkadata:/var/lib/kafka/data

kafka-init:
image: apache/kafka:3.9.0
@@ -108,4 +112,6 @@ services:
restart: always


volumes:
pgdata:
kafkadata:

バイナリ
decoder ファイルの表示


+ 14
- 1
internal/pkg/apiclient/data.go ファイルの表示

@@ -3,6 +3,7 @@ package apiclient
import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/AFASystems/presence/internal/pkg/config"
@@ -16,6 +17,13 @@ func GetTrackers(token string, client *http.Client, cfg *config.Config) ([]model
return []model.Tracker{}, err
}

bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
fmt.Printf("error read body: %+v\n", err)
return []model.Tracker{}, err
}
fmt.Printf("body: %s\n", string(bodyBytes))

var i []model.Tracker
err = json.NewDecoder(res.Body).Decode(&i)
if err != nil {
@@ -74,7 +82,6 @@ func GetZones(token string, client *http.Client, cfg *config.Config) ([]model.Zo

func InferPosition(token string, client *http.Client, cfg *config.Config) (model.PositionResponse, error) {
url := fmt.Sprintf("%s/ble-ai/infer", cfg.APIBaseURL)
fmt.Printf("url: %s\n", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fmt.Printf("error new request: %+v\n", err)
@@ -89,6 +96,12 @@ func InferPosition(token string, client *http.Client, cfg *config.Config) (model
return model.PositionResponse{}, err
}

fmt.Printf("res.status: %s\n", res.Status)
if res.StatusCode != 200 {
fmt.Printf("error status code: %d\n", res.StatusCode)
return model.PositionResponse{}, fmt.Errorf("status code: %d", res.StatusCode)
}

var i model.PositionResponse
err = json.NewDecoder(res.Body).Decode(&i)
if err != nil {


+ 1
- 0
internal/pkg/apiclient/updatedb.go ファイルの表示

@@ -29,6 +29,7 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk
}

if trackers, err := GetTrackers(token, client, cfg); err == nil {
fmt.Printf("trackers: %+v\n", trackers)
syncTable(db, trackers)
if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
msg := fmt.Sprintf("Error in sending delete all from lookup message: %v", err)


+ 1
- 0
internal/pkg/apiclient/utils.go ファイルの表示

@@ -14,6 +14,7 @@ func setHeader(req *http.Request, token string) {

func getRequest(token, route string, client *http.Client, cfg *config.Config) (*http.Response, error) {
url := fmt.Sprintf("%s/reslevis/%s", cfg.APIBaseURL, route)
fmt.Printf("url: %s\n", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err


+ 3
- 7
internal/pkg/bridge/handler.go ファイルの表示

@@ -7,15 +7,11 @@ import (
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// RawBeaconWriter writes beacon advertisements to the rawbeacons topic.
type RawBeaconWriter interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

// BeaconLookup provides MAC->ID lookup (e.g. AppState).
type BeaconLookup interface {
BeaconExists(mac string) (id string, ok bool)
@@ -24,7 +20,7 @@ type BeaconLookup interface {
// HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV.
// For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup.
// Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/".
func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer RawBeaconWriter) {
func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer *kafka.Writer) {
parts := strings.SplitN(topic, "/", 2)
hostname := ""
if len(parts) >= 2 {
@@ -58,7 +54,7 @@ func HandleMQTTMessage(topic string, payload []byte, lookup BeaconLookup, writer
slog.Error("marshaling beacon advertisement", "err", err)
break
}
if err := writer.WriteMessages(context.Background(), kafka.Message{Value: encoded}); err != nil {
if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: encoded}); err != nil {
slog.Error("writing to Kafka", "err", err)
time.Sleep(1 * time.Second)
break


+ 3
- 2
internal/pkg/controller/settings_controller.go ファイルの表示

@@ -7,6 +7,7 @@ import (
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
"gorm.io/gorm"
@@ -45,8 +46,8 @@ func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, context context
}

kafkaMsg := kafka.Message{Value: eMsg}
if err := writer.WriteMessages(context, kafkaMsg); err != nil {
slog.Error("writing settings to Kafka", "err", err)
if err := kafkaclient.Write(context, writer, kafkaMsg); err != nil {
slog.Error("error writing settings to Kafka", "error", err)
response.InternalError(w, "failed to publish settings update", err)
return
}


+ 4
- 6
internal/pkg/controller/trackers_controller.go ファイルの表示

@@ -3,11 +3,11 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/AFASystems/presence/internal/pkg/api/response"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"github.com/segmentio/kafka-go"
@@ -17,17 +17,15 @@ import (
func SendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, context context.Context) error {
valueStr, err := json.Marshal(&value)
if err != nil {
msg := fmt.Sprintf("error in encoding: %v", err)
slog.Error(msg)
slog.Error("error encoding", "error", err)
return err
}
msg := kafka.Message{
Value: valueStr,
}

if err := writer.WriteMessages(context, msg); err != nil {
msg := fmt.Sprintf("Error in sending kafka message: %v", err)
slog.Error(msg)
if err := kafkaclient.Write(context, writer, msg); err != nil {
slog.Error("error sending kafka message", "error", err)
return err
}



+ 4
- 8
internal/pkg/decoder/process.go ファイルの表示

@@ -10,24 +10,20 @@ import (

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/common/utils"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// AlertWriter writes decoded beacon events (e.g. to alertbeacons topic).
type AlertWriter interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

// ProcessIncoming decodes a beacon advertisement and writes the event to the writer if it changed.
func ProcessIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer AlertWriter, registry *model.ParserRegistry) {
func ProcessIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, registry *model.ParserRegistry) {
if err := DecodeBeacon(adv, appState, writer, registry); err != nil {
slog.Error("decoding beacon", "err", err, "id", adv.ID)
}
}

// DecodeBeacon hex-decodes the payload, runs the parser registry, dedupes by event hash, and writes to writer.
func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer AlertWriter, registry *model.ParserRegistry) error {
func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer, registry *model.ParserRegistry) error {
beacon := strings.TrimSpace(adv.Data)
id := adv.ID
if beacon == "" {
@@ -63,7 +59,7 @@ func DecodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState,
return err
}

if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: eMsg}); err != nil {
return fmt.Errorf("write alert: %w", err)
}



+ 3
- 5
internal/pkg/kafkaclient/consumer.go ファイルの表示

@@ -3,7 +3,6 @@ package kafkaclient
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"

@@ -21,15 +20,14 @@ func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.
default:
msg, err := r.ReadMessage(ctx)
if err != nil {
msg := fmt.Sprintf("error reading message: %v", err)
slog.Error(msg)
slog.Error("error reading message", "error", err)
continue
}

var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
msg := fmt.Sprintf("error decoding: %v", err)
slog.Error(msg)
slog.Error("error decoding", "error", err)
continue
}



+ 11
- 7
internal/pkg/kafkaclient/manager.go ファイルの表示

@@ -38,13 +38,17 @@ func InitKafkaManager() *KafkaManager {

func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaUrl),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
Addr: kafka.TCP(kafkaUrl),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
MaxAttempts: 5,
WriteBackoffMin: 100 * time.Millisecond,
WriteBackoffMax: 1 * time.Second,
WriteTimeout: 5 * time.Second,
}

m.kafkaWritersMap.KafkaWritersLock.Lock()


+ 16
- 0
internal/pkg/kafkaclient/write.go ファイルの表示

@@ -0,0 +1,16 @@
package kafkaclient

import (
"context"
"time"

"github.com/segmentio/kafka-go"
)

func Write(ctx context.Context, writer *kafka.Writer, message kafka.Message) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err := writer.WriteMessages(ctx, message)
return err
}

+ 7
- 11
internal/pkg/location/filter.go ファイルの表示

@@ -7,26 +7,22 @@ import (
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// Score weights for location algorithm (configurable via constants).
const (
SeenWeight = 1.5
RSSIWeight = 0.75
SeenWeight = 1.5
RSSIWeight = 0.75
DefaultDistance = 999
DefaultLastSeen = 999
)

// LocationWriter writes location events (e.g. to Kafka).
type LocationWriter interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

// GetLikelyLocations runs the filter algorithm: scores beacons by RSSI and seen count,
// updates app state with best location and confidence, and writes HTTPLocation to the writer.
func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) {
func GetLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
ctx := context.Background()
beacons := appState.GetAllBeacons()
settings := appState.GetSettingsValue()
@@ -36,7 +32,7 @@ func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) {
Method: "Standard",
Distance: DefaultDistance,
ID: beacon.ID,
Location: "",
Location: "",
LastSeen: DefaultLastSeen,
}

@@ -89,8 +85,8 @@ func GetLikelyLocations(appState *appcontext.AppState, writer LocationWriter) {
continue
}

if err := writer.WriteMessages(ctx, kafka.Message{Value: js}); err != nil {
slog.Error("sending kafka location message", "err", err, "beacon_id", beacon.ID)
if err := kafkaclient.Write(ctx, writer, kafka.Message{Value: js}); err != nil {
slog.Error("error sending kafka location message", "error", err, "beacon_id", beacon.ID)
}
}
}

+ 14
- 3
internal/pkg/location/inference.go ファイルの表示

@@ -3,7 +3,6 @@ package location
import (
"context"
"crypto/tls"
"fmt"
"net/http"

"github.com/AFASystems/presence/internal/pkg/apiclient"
@@ -35,7 +34,6 @@ func NewDefaultInferencer(skipTLSVerify bool) *DefaultInferencer {
// Infer gets a token and calls the inference API.
func (d *DefaultInferencer) Infer(ctx context.Context, cfg *config.Config) (model.PositionResponse, error) {
if d.Token == "" {
fmt.Printf("getting token\n")
token, err := apiclient.GetToken(ctx, cfg, d.Client)
if err != nil {
return model.PositionResponse{}, err
@@ -43,5 +41,18 @@ func (d *DefaultInferencer) Infer(ctx context.Context, cfg *config.Config) (mode
d.Token = token
}

return apiclient.InferPosition(d.Token, d.Client, cfg)
response, err := apiclient.InferPosition(d.Token, d.Client, cfg)
if err != nil {
token, err := apiclient.GetToken(ctx, cfg, d.Client)
if err != nil {
return model.PositionResponse{}, err
}
d.Token = token
response, err = apiclient.InferPosition(d.Token, d.Client, cfg)
if err != nil {
return model.PositionResponse{}, err
}
}

return response, nil
}

+ 11
- 10
internal/pkg/service/beacon_service.go ファイルの表示

@@ -10,21 +10,18 @@ import (
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"gorm.io/gorm"
)

// KafkaWriter defines the interface for writing Kafka messages (allows mocking in tests)
type KafkaWriter interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) {
fmt.Printf("Finding tracker for MAC: %s, ID: %s\n", msg.MAC, msg.ID)
var tracker model.Tracker
if msg.MAC != "" {
if err := db.Where("mac = ?", msg.MAC).Find(&tracker).Error; err != nil {
if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", ""))).Find(&tracker).Error; err != nil {
return model.Tracker{}, err
}

@@ -56,7 +53,7 @@ func findZones(trackerID string, db *gorm.DB) ([]string, error) {
return allowedZones, nil
}

func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) {
func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
tracker, err := findTracker(msg, db)
if err != nil {
msg := fmt.Sprintf("Error in finding tracker: %v", err)
@@ -95,7 +92,7 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer KafkaWr
sendAlert(gw.ID, msg.ID, writer, ctx, allowedZones, db)
}

func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer KafkaWriter, ctx context.Context) {
func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
tracker, err := findTracker(msg, db)
if err != nil {
msg := fmt.Sprintf("Error in finding tracker: %v", err)
@@ -133,7 +130,7 @@ func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer Kafka
sendAlert(gw.ID, tracker.ID, writer, ctx, allowedZones, db)
}

func sendAlert(gwId, trackerId string, writer KafkaWriter, ctx context.Context, allowedZones []string, db *gorm.DB) {
func sendAlert(gwId, trackerId string, writer *kafka.Writer, ctx context.Context, allowedZones []string, db *gorm.DB) {
if len(allowedZones) != 0 && !slices.Contains(allowedZones, gwId) {
alert := model.Alert{
ID: uuid.New().String(),
@@ -157,7 +154,11 @@ func sendAlert(gwId, trackerId string, writer KafkaWriter, ctx context.Context,
msg := kafka.Message{
Value: eMsg,
}
writer.WriteMessages(ctx, msg)
if err := kafkaclient.Write(ctx, writer, msg); err != nil {
msg := fmt.Sprintf("Error in writing message: %v", err)
slog.Error(msg)
return
}
}
}
}


+ 3
- 2
internal/pkg/service/parser_service.go ファイルの表示

@@ -4,11 +4,12 @@ import (
"context"
"encoding/json"

"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

func SendParserConfig(kp model.KafkaParser, writer KafkaWriter, ctx context.Context) error {
func SendParserConfig(kp model.KafkaParser, writer *kafka.Writer, ctx context.Context) error {
eMsg, err := json.Marshal(kp)
if err != nil {
return err
@@ -17,7 +18,7 @@ func SendParserConfig(kp model.KafkaParser, writer KafkaWriter, ctx context.Cont
Value: eMsg,
}

if err := writer.WriteMessages(ctx, msg); err != nil {
if err := kafkaclient.Write(ctx, writer, msg); err != nil {
return err
}
return nil


バイナリ
location ファイルの表示


+ 9
- 9
scripts/build/build.sh ファイルの表示

@@ -1,20 +1,20 @@
#!/bin/bash

# Build the server
docker build -t afasystemadmin/ble-ai-localizer:server_v1 -f ../../build/package/Dockerfile.server ../../
docker build -t afasystemadmin/ble-ai-localizer:server_v1.1 -f ../../build/package/Dockerfile.server ../../
¸
# Build the location
docker build -t afasystemadmin/ble-ai-localizer:location_v1 -f ../../build/package/Dockerfile.location ../../
docker build -t afasystemadmin/ble-ai-localizer:location_v1.1 -f ../../build/package/Dockerfile.location ../../

# Build the decoder
docker build -t afasystemadmin/ble-ai-localizer:decoder_v1 -f ../../build/package/Dockerfile.decoder ../../
docker build -t afasystemadmin/ble-ai-localizer:decoder_v1.1 -f ../../build/package/Dockerfile.decoder ../../

# Build the bridge
docker build -t afasystemadmin/ble-ai-localizer:bridge_v1 -f ../../build/package/Dockerfile.bridge ../../
docker build -t afasystemadmin/ble-ai-localizer:bridge_v1.1 -f ../../build/package/Dockerfile.bridge ../../

docker image ls

docker push afasystemadmin/ble-ai-localizer:server_v1
docker push afasystemadmin/ble-ai-localizer:location_v1
docker push afasystemadmin/ble-ai-localizer:decoder_v1
docker push afasystemadmin/ble-ai-localizer:bridge_v1
docker push afasystemadmin/ble-ai-localizer:server_v1.1
docker push afasystemadmin/ble-ai-localizer:location_v1.1
docker push afasystemadmin/ble-ai-localizer:decoder_v1.1
docker push afasystemadmin/ble-ai-localizer:bridge_v1.1

+ 1
- 0
scripts/db/dump.sh ファイルの表示

@@ -0,0 +1 @@
docker exec db pg_dump -U postgres -F c postgres > dump.sql

バイナリ
scripts/db/dump.sql ファイルの表示


+ 3
- 0
scripts/db/restore.sh ファイルの表示

@@ -0,0 +1,3 @@
docker cp dump.sql postgres:/tmp/dump.sql

docker exec -t postgres pg_restore -U postgres -d postgres /tmp/dump.sql

バイナリ
server ファイルの表示


読み込み中…
キャンセル
保存