Commits vergleichen

...

Autor SHA1 Nachricht Datum
  Blaz Smehov 55138a0f6c chore: change to logging in file vor 1 Tag
  Blaz Smehov 8c14b77ba8 fix: websocket implementation not working because of the CORS headers vor 2 Tagen
  Blaz Smehov faa941e4bd fix: adding graceful shutdown of the api server vor 2 Tagen
  Blaz Smehov d69952a36d chore: add settings topic to init script vor 2 Tagen
  Blaz Smehov d284f48496 feat: add websocket and dockerfile to build location vor 3 Tagen
  Blaz Smehov 979563eec4 feat: settings controller, chore: refactor api structure vor 3 Tagen
  Blaz Smehov 3bc3fbce5e feat: remove beacon from lookup logic vor 4 Tagen
  Blaz Smehov e14d8e0d97 graceful shutdown of redis client vor 4 Tagen
  Blaz Smehov 28d70bea24 feat: gracefully close kafka connections on docker sigterm signal vor 4 Tagen
  Blaz Smehov d2300aa400 feat: implement basic API, reimplement Redis and some methods for quick insert (persistence in case of crash) using hash maps vor 1 Woche
  Blaz Smehov c401ba3af3 chore: add additional logs vor 1 Woche
  Blaz Smehov cfab61e665 chore: add documentation vor 1 Woche
  Blaz Smehov 6db62dde15 feat: reimplement redis support, using hash maps for quick insert and read vor 1 Woche
  Blaz Smehov 02a72f6853 feat: add hash and toJson methods vor 1 Woche
  Blaz Smehov d75ad867b0 chore: refactor redis persistence to support all keys and types vor 1 Woche
  Blaz Smehov 53fa6cc127 chore: remove redundant files vor 1 Woche
  Blaz Smehov 8e3125d3f8 chore: refactor and move code for beacon logic vor 1 Woche
  Blaz Smehov 5cc3b5f1d2 chore: refactor distance related code for locations algorithm vor 1 Woche
  Blaz Smehov f1e1b8d606 chore: refactor locations algorithm and decoder to use new appState vor 1 Woche
  Blaz Smehov 63a79099af chore: remove redundant files vor 1 Woche
  Blaz Smehov ccaef532d3 chore: rename to appState vor 1 Woche
  Blaz Smehov 7313636441 chore: refactor application context logic vor 1 Woche
  Blaz Smehov c7e02b373a fix: missing beacons init in context, remove redundant properties of httpRes type vor 1 Woche
  Blaz Smehov f3d83a778c fix: locking mechanism for beacons list vor 1 Woche
  Blaz Smehov 4b872a5cc0 feat: locations algorithm, chore: remove unused files vor 1 Woche
  Blaz Smehov ba93c59a8a feat: restructure of decoder and bridge vor 1 Woche
  Blaz Smehov a7af74e7db chore: update docker compose with kafdrop and automatic topic creation vor 1 Woche
  Blaz Smehov 8dd27e74d1 feat: basis for decoding different beacon types vor 2 Wochen
  Blaz Smehov 24c2a06ed4 chore: remove emqx from docker compose vor 3 Wochen
50 geänderte Dateien mit 12877 neuen und 3155 gelöschten Zeilen
  1. +3
    -1
      .gitignore
  2. +391
    -166
      README.md
  3. +113
    -0
      build/docker-compose.yaml
  4. +0
    -99
      build/docker-compose.yml
  5. +26
    -0
      build/init-scripts/create_topic.sh
  6. +17
    -0
      build/package/Dockerfile.location
  7. +2
    -2
      cmd/bridge/main.go
  8. +93
    -181
      cmd/decoder/main.go
  9. +224
    -47
      cmd/location/main.go
  10. +0
    -0
      cmd/presenSe/.keep
  11. +0
    -188
      cmd/presenSe/presense.go
  12. +127
    -260
      cmd/server/main.go
  13. +3238
    -0
      cmd/testbench/debug.txt
  14. +86
    -0
      cmd/testbench/main.go
  15. +3488
    -0
      cmd/testbench/save.txt
  16. +131
    -0
      cmd/valkey-testbench/main.go
  17. +748
    -0
      docs/API.md
  18. +1039
    -0
      docs/DEPLOYMENT.md
  19. +22
    -21
      internal/pkg/bridge/mqtthandler/mqtthandler.go
  20. +291
    -0
      internal/pkg/common/appcontext/context.go
  21. +129
    -0
      internal/pkg/common/utils/beacons.go
  22. +38
    -0
      internal/pkg/common/utils/distance.go
  23. +4
    -3
      internal/pkg/config/config.go
  24. +122
    -0
      internal/pkg/controller/beacons_controller.go
  25. +79
    -0
      internal/pkg/controller/settings_controller.go
  26. +0
    -371
      internal/pkg/httpserver/server.go
  27. +0
    -3
      internal/pkg/httpserver/server.md
  28. +20
    -12
      internal/pkg/kafkaclient/consumer.go
  29. +3
    -0
      internal/pkg/kafkaclient/reader.go
  30. +5
    -0
      internal/pkg/kafkaclient/writer.go
  31. +47
    -0
      internal/pkg/model/type_methods.go
  32. +105
    -101
      internal/pkg/model/types.go
  33. +0
    -128
      internal/pkg/mqttclient/beacon.go
  34. +0
    -35
      internal/pkg/mqttclient/fillter.go
  35. +0
    -165
      internal/pkg/mqttclient/location.go
  36. +0
    -62
      internal/pkg/mqttclient/processor.go
  37. +0
    -18
      internal/pkg/persistence/buckets.go
  38. +0
    -39
      internal/pkg/persistence/load.go
  39. +0
    -50
      internal/pkg/persistence/persist.go
  40. +20
    -64
      internal/pkg/redis/redis.go
  41. +72
    -0
      internal/pkg/service/beacon_service.go
  42. +3
    -29
      scripts/testAPI.sh
  43. +125
    -4
      test/README.md
  44. +560
    -0
      test/beacons_test.go
  45. +294
    -0
      test/distance_test.go
  46. +0
    -160
      test/httpserver_test/httpserver_test.go
  47. +0
    -46
      test/mqtt_test/mqtt_test.go
  48. +568
    -0
      test/mqtthandler_test.go
  49. +0
    -900
      test/node-red-integration-tests/apitest.json
  50. +644
    -0
      test/typeMethods_test.go

+ 3
- 1
.gitignore Datei anzeigen

@@ -24,4 +24,6 @@ cmd/presenSe/presence.db
# Dependency directories (remove the comment below to include it)
vendor/
volumes/node-red/
main
main

*.sh

+ 391
- 166
README.md Datei anzeigen

@@ -1,196 +1,421 @@
# Standard Go Project Layout
# AFA Systems Presence Detection

A comprehensive **Bluetooth Low Energy (BLE) presence detection system** that tracks beacon devices in real-time, calculates locations based on signal strength, and integrates with popular IoT platforms like Home Assistant and Node-RED.

## 🏗️ System Architecture

The system follows a **microservices architecture** with Apache Kafka as the central message bus:

```
┌─────────────┐ MQTT ┌─────────────┐ Kafka ┌─────────────┐
│ MQTT │ ────────► │ Bridge │ ────────► │ Decoder │
│ Gateway │ │ Service │ │ Service │
└─────────────┘ └─────────────┘ └─────────────┘
┌─────────────┐ WebSocket ┌─────────────┐ Kafka ┌─────────────┐
│ Web UI │ ◄────────── │ Server │ ◄───────── │ Location │
│ Dashboard │ │ Service │ │ Algorithm │
└─────────────┘ └─────────────┘ └─────────────┘
```

### Core Components

#### 🔌 **Bridge Service** (`cmd/bridge/`)
- **Purpose**: MQTT to Kafka message gateway
- **Function**: Subscribes to `publish_out/#` MQTT topics and forwards messages to Kafka `rawbeacons` topic
- **Features**: Configurable MQTT connection, automatic reconnection, error handling

#### 🔍 **Decoder Service** (`cmd/decoder/`)
- **Purpose**: Processes raw BLE beacon advertisements
- **Supported Formats**: Ingics, Eddystone, Minew B7
- **Functions**:
- Battery level monitoring
- Fall detection events
- Button press detection
- Device telemetry extraction
- **Output**: Processed events to `alertbeacons` Kafka topic

#### 📍 **Location Algorithm** (`cmd/location/`)
- **Purpose**: Calculates device locations based on RSSI and proximity
- **Features**:
- Weighted location calculation using RSSI values
- Confidence scoring system
- Location change notifications
- Configurable thresholds and parameters
- **Output**: Location events to `locevents` Kafka topic

#### 🌐 **Server Service** (`cmd/server/`)
- **Purpose**: HTTP API and WebSocket server
- **Features**:
- RESTful API for beacon management (CRUD operations)
- Real-time WebSocket communication
- Settings management interface
- CORS-enabled web interface
- Home Assistant integration endpoints

#### 📊 **Supporting Services**
- **Kafka 3.9.0**: Central message bus with automatic topic creation
- **Kafdrop**: Web-based Kafka monitoring and management UI
- **Node-RED**: IoT workflow automation with pre-configured flows
- **Redis**: Real-time data caching and WebSocket session management
- **BoltDB**: Embedded database for persistent storage

## 🚀 Quick Start

### Prerequisites

- Docker and Docker Compose
- Go 1.24+ (for local development)
- MQTT broker (compatible with BLE gateways)

### Installation

1. **Clone the repository**:
```bash
git clone https://github.com/AFASystems/presence.git
cd presence
```

2. **Start the system**:
```bash
cd build
docker-compose up -d
```

3. **Verify services**:
- **Web Interface**: http://localhost:8080
- **Kafdrop (Kafka UI)**: http://localhost:9000
- **Node-RED**: http://localhost:1880

### Configuration

Set the following environment variables:

```bash
# Web Server
HTTP_HOST_PATH=":8080"
HTTP_WS_HOST_PATH=":8081"

# MQTT Configuration
MQTT_HOST="tcp://mqtt-broker:1883"
MQTT_USERNAME="your_username"
MQTT_PASSWORD="your_password"

# Kafka Configuration
KAFKA_URL="kafka:29092"

# Database
DB_PATH="./volumes/presence.db"
```

## 📡 Supported Beacon Types

### Ingics Beacons
- Battery level monitoring
- Event detection (fall detection, button presses)
- Signal strength tracking

### Eddystone Beacons
- Eddystone-UID protocol support
- Battery telemetry (Eddystone-TLM)
- Proximity detection

### Minew B7 Beacons
- Vendor-specific format decoding
- Button counter tracking
- Battery status monitoring
- Multiple button modes

## 🔧 Configuration Options

### System Settings (`SettingsVal`)

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `location_confidence` | int64 | 80 | Minimum confidence level for location changes |
| `last_seen_threshold` | int64 | 300 | Time (seconds) before beacon considered offline |
| `beacon_metrics_size` | int | 10 | Number of RSSI measurements to keep for averaging |
| `ha_send_interval` | int64 | 60 | Home Assistant update interval (seconds) |
| `ha_send_changes_only` | bool | true | Send updates only on location changes |
| `rssi_min_threshold` | int64 | -90 | Minimum RSSI value for beacon detection |
| `enforce_rssi_threshold` | bool | true | Filter out weak signals |

### API Endpoints

#### Beacon Management
- `GET /api/beacons` - List all registered beacons
- `POST /api/beacons` - Register a new beacon
- `PUT /api/beacons/{id}` - Update beacon information
- `DELETE /api/beacons/{id}` - Remove a beacon

#### Settings
- `GET /api/settings` - Get current system settings
- `POST /api/settings` - Update system settings

#### WebSocket
- `/ws/broadcast` - Real-time beacon updates and notifications

## 🏠 Home Assistant Integration

### MQTT Auto-Discovery

The system automatically generates MQTT messages for Home Assistant:

```yaml
# Example device tracker configuration
device_tracker:
- platform: mqtt
devices:
beacon_001: "Presence/Beacons/beacon_001"
```

### Battery Monitoring

```yaml
# Battery level sensor
sensor:
- platform: mqtt
name: "Beacon Battery"
state_topic: "Presence/Beacons/beacon_001/battery"
unit_of_measurement: "%"
device_class: battery
```

### Button Detection

```yaml
# Button press sensor
binary_sensor:
- platform: mqtt
name: "Beacon Button"
state_topic: "Presence/Beacons/beacon_001/button"
device_class: "button"
```

## 📊 Data Models

### Core Entities

#### Beacon
```go
type Beacon struct {
Name string `json:"name"`
ID string `json:"beacon_id"`
BeaconType string `json:"beacon_type"`
BeaconLocation string `json:"beacon_location"`
LastSeen int64 `json:"last_seen"`
Distance float64 `json:"distance"`
LocationConfidence int64 `json:"location_confidence"`
HSButtonCounter int64 `json:"hs_button_counter"`
HSBattery int64 `json:"hs_button_battery"`
// ... additional fields
}
```

#### BeaconAdvertisement
```go
type BeaconAdvertisement struct {
Hostname string `json:"hostname"`
MAC string `json:"mac"`
RSSI int64 `json:"rssi"`
Data string `json:"data"`
BeaconType string `json:"beacon_type"`
UUID string `json:"uuid"`
Major string `json:"major"`
Minor string `json:"minor"`
// ... additional fields
}
```

#### LocationChange
```go
type LocationChange struct {
Method string `json:"method"`
BeaconRef Beacon `json:"beacon_info"`
Name string `json:"name"`
PreviousLocation string `json:"previous_location"`
NewLocation string `json:"new_location"`
Timestamp int64 `json:"timestamp"`
}
```

## 🐳 Docker Services

### Available Services

| Service | Image | Ports | Description |
|---------|-------|-------|-------------|
| kafka | apache/kafka:3.9.0 | 9092, 9093 | Apache Kafka message broker |
| kafdrop | obsidiandynamics/kafdrop | 9000 | Kafka monitoring UI |
| presence-bridge | local/build | - | MQTT to Kafka bridge |
| presence-decoder | local/build | - | BLE beacon decoder |
| presence-location | local/build | - | Location calculation service |
| presence-server | local/build | 8080, 8081 | HTTP API and WebSocket server |

### Volumes

- `./volumes/presence.db` - BoltDB database file
- `./volumes/node-red/` - Node-RED configuration and flows
- `./volumes/kafka-data/` - Kafka persistent data

## 🔧 Development

### Local Development Setup

1. **Install dependencies**:
```bash
go mod download
```

2. **Run individual services**:
```bash
# Run bridge service
go run cmd/bridge/main.go

# Run decoder service
go run cmd/decoder/main.go

# Run location algorithm
go run cmd/location/main.go

# Run API server
go run cmd/server/main.go
```

3. **Run tests**:
```bash
go test ./...
```

### Building Docker Images

```bash
# Build all services
docker build -t presence-system .

# Build individual service
docker build -f build/package/Dockerfile.bridge -t presence-bridge .
```

### Project Structure

```
/
├── cmd/ # Main application entry points
│ ├── server/ # HTTP API & WebSocket server
│ ├── bridge/ # MQTT to Kafka bridge
│ ├── decoder/ # BLE beacon decoder
│ ├── location/ # Location calculation algorithm
│ └── testbench/ # Testing/development utilities
├── internal/ # Private application code
│ ├── app/ # Application components
│ └── pkg/ # Shared internal packages
│ ├── model/ # Data structures and types
│ ├── kafkaclient/ # Kafka producer/consumer
│ ├── config/ # Configuration management
│ ├── persistence/ # BoltDB operations
│ ├── redis/ # Redis client
│ └── bridge/ # MQTT bridge logic
├── build/ # Build artifacts and Docker configs
│ ├── package/ # Dockerfile for each component
│ └── docker-compose.yaml # Complete system deployment
├── web/ # Web interface files
├── volumes/ # Persistent data and configurations
├── scripts/ # Utility scripts
└── docs/ # Documentation
```

## Overview
## 📈 Monitoring and Debugging

This is a basic layout for Go application projects. Note that it's basic in terms of content because it's focusing only on the general layout and not what you have inside. It's also basic because it's very high level and it doesn't go into great details in terms of how you can structure your project even further. For example, it doesn't try to cover the project structure you'd have with something like Clean Architecture.
### Kafka Topics

This is **`NOT an official standard defined by the core Go dev team`**. This is a set of common historical and emerging project layout patterns in the Go ecosystem. Some of these patterns are more popular than others. It also has a number of small enhancements along with several supporting directories common to any large enough real world application. Note that the **core Go team provides a great set of general guidelines about structuring Go projects** and what it means for your project when it's imported and when it's installed. See the [`Organizing a Go module`](https://go.dev/doc/modules/layout) page in the official Go docs for more details. It includes the `internal` and `cmd` directory patterns (described below) and other useful information.
- `rawbeacons` - Raw BLE beacon advertisements
- `alertbeacons` - Processed beacon events (battery, buttons)
- `locevents` - Location change notifications

**`If you are trying to learn Go or if you are building a PoC or a simple project for yourself this project layout is an overkill. Start with something really simple instead (a single `main.go` file and `go.mod` is more than enough).`** As your project grows keep in mind that it'll be important to make sure your code is well structured otherwise you'll end up with a messy code with lots of hidden dependencies and global state. When you have more people working on the project you'll need even more structure. That's when it's important to introduce a common way to manage packages/libraries. When you have an open source project or when you know other projects import the code from your project repository that's when it's important to have private (aka `internal`) packages and code. Clone the repository, keep what you need and delete everything else! Just because it's there it doesn't mean you have to use it all. None of these patterns are used in every single project. Even the `vendor` pattern is not universal.
### Health Checks

With Go 1.14 [`Go Modules`](https://go.dev/wiki/Modules) are finally ready for production. Use [`Go Modules`](https://blog.golang.org/using-go-modules) unless you have a specific reason not to use them and if you do then you don’t need to worry about $GOPATH and where you put your project. The basic `go.mod` file in the repo assumes your project is hosted on GitHub, but it's not a requirement. The module path can be anything though the first module path component should have a dot in its name (the current version of Go doesn't enforce it anymore, but if you are using slightly older versions don't be surprised if your builds fail without it). See Issues [`37554`](https://github.com/golang/go/issues/37554) and [`32819`](https://github.com/golang/go/issues/32819) if you want to know more about it.
- **Kafka**: Topic creation and broker connectivity
- **Services**: Automatic restart on failure
- **Database**: BoltDB integrity checks

This project layout is intentionally generic and it doesn't try to impose a specific Go package structure.
### Logs

This is a community effort. Open an issue if you see a new pattern or if you think one of the existing patterns needs to be updated.
```bash
# View service logs
docker-compose logs -f [service-name]

If you need help with naming, formatting and style start by running [`gofmt`](https://golang.org/cmd/gofmt/) and [`staticcheck`](https://github.com/dominikh/go-tools/tree/master/cmd/staticcheck). The previous standard linter, golint, is now deprecated and not maintained; use of a maintained linter such as staticcheck is recommended. Also make sure to read these Go code style guidelines and recommendations:
* https://talks.golang.org/2014/names.slide
* https://golang.org/doc/effective_go.html#names
* https://blog.golang.org/package-names
* https://go.dev/wiki/CodeReviewComments
* [Style guideline for Go packages](https://rakyll.org/style-packages) (rakyll/JBD)
# View all logs
docker-compose logs -f
```

See [`Go Project Layout`](https://medium.com/golang-learn/go-project-layout-e5213cdcfaa2) for additional background information.
## 🔌 Integrations

More about naming and organizing packages as well as other code structure recommendations:
* [GopherCon EU 2018: Peter Bourgon - Best Practices for Industrial Programming](https://www.youtube.com/watch?v=PTE4VJIdHPg)
* [GopherCon Russia 2018: Ashley McNamara + Brian Ketelsen - Go best practices.](https://www.youtube.com/watch?v=MzTcsI6tn-0)
* [GopherCon 2017: Edward Muller - Go Anti-Patterns](https://www.youtube.com/watch?v=ltqV6pDKZD8)
* [GopherCon 2018: Kat Zien - How Do You Structure Your Go Apps](https://www.youtube.com/watch?v=oL6JBUk6tj0)
### Node-RED Flows

A Chinese post about Package-Oriented-Design guidelines and Architecture layer
* [面向包的设计和架构分层](https://github.com/danceyoung/paper-code/blob/master/package-oriented-design/packageorienteddesign.md)
Pre-configured Node-RED flows are available in `/volumes/node-red/`:

## Go Directories
- **Beacon monitoring dashboards**
- **Location-based automations**
- **Battery level alerts**
- **Notification systems**

### `/cmd`
### MQTT Topics

Main applications for this project.
System publishes to the following MQTT topics:

The directory name for each application should match the name of the executable you want to have (e.g., `/cmd/myapp`).
- `Presence/Beacons/{beacon_id}/location` - Current beacon location
- `Presence/Beacons/{beacon_id}/battery` - Battery level
- `Presence/Beacons/{beacon_id}/button` - Button press events
- `Presence/Beacons/{beacon_id}/distance` - Distance from nearest gateway

Don't put a lot of code in the application directory. If you think the code can be imported and used in other projects, then it should live in the `/pkg` directory. If the code is not reusable or if you don't want others to reuse it, put that code in the `/internal` directory. You'll be surprised what others will do, so be explicit about your intentions!
## 🤝 Contributing

It's common to have a small `main` function that imports and invokes the code from the `/internal` and `/pkg` directories and nothing else.
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

See the [`/cmd`](cmd/README.md) directory for examples.
### Development Guidelines

### `/internal`
- Follow Go conventions and best practices
- Write comprehensive tests for new features
- Update documentation for API changes
- Use meaningful commit messages

Private application and library code. This is the code you don't want others importing in their applications or libraries. Note that this layout pattern is enforced by the Go compiler itself. See the Go 1.4 [`release notes`](https://golang.org/doc/go1.4#internalpackages) for more details. Note that you are not limited to the top level `internal` directory. You can have more than one `internal` directory at any level of your project tree.
## 📄 License

You can optionally add a bit of extra structure to your internal packages to separate your shared and non-shared internal code. It's not required (especially for smaller projects), but it's nice to have visual clues showing the intended package use. Your actual application code can go in the `/internal/app` directory (e.g., `/internal/app/myapp`) and the code shared by those apps in the `/internal/pkg` directory (e.g., `/internal/pkg/myprivlib`).
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

You use internal directories to make packages private. If you put a package inside an internal directory, then other packages can’t import it unless they share a common ancestor. And it’s the only directory named in Go’s documentation and has special compiler treatment.
## 🆘 Support

### `/pkg`
- **Issues**: [GitHub Issues](https://github.com/AFASystems/presence/issues)
- **Documentation**: [Project Wiki](https://github.com/AFASystems/presence/wiki)
- **Discussions**: [GitHub Discussions](https://github.com/AFASystems/presence/discussions)

Library code that's ok to use by external applications (e.g., `/pkg/mypubliclib`). Other projects will import these libraries expecting them to work, so think twice before you put something here :-) Note that the `internal` directory is a better way to ensure your private packages are not importable because it's enforced by Go. The `/pkg` directory is still a good way to explicitly communicate that the code in that directory is safe for use by others. The [`I'll take pkg over internal`](https://travisjeffery.com/b/2019/11/i-ll-take-pkg-over-internal/) blog post by Travis Jeffery provides a good overview of the `pkg` and `internal` directories and when it might make sense to use them.
## 🔮 Roadmap

It's also a way to group Go code in one place when your root directory contains lots of non-Go components and directories making it easier to run various Go tools (as mentioned in these talks: [`Best Practices for Industrial Programming`](https://www.youtube.com/watch?v=PTE4VJIdHPg) from GopherCon EU 2018, [GopherCon 2018: Kat Zien - How Do You Structure Your Go Apps](https://www.youtube.com/watch?v=oL6JBUk6tj0) and [GoLab 2018 - Massimiliano Pippi - Project layout patterns in Go](https://www.youtube.com/watch?v=3gQa1LWwuzk)).
### Upcoming Features

See the [`/pkg`](pkg/README.md) directory if you want to see which popular Go repos use this project layout pattern. This is a common layout pattern, but it's not universally accepted and some in the Go community don't recommend it.
- [ ] Enhanced location algorithms with machine learning
- [ ] Support for additional beacon types (iBeacon, AltBeacon)
- [ ] Mobile application for beacon management
- [ ] Advanced analytics and reporting dashboard
- [ ] Multi-tenant architecture
- [ ] Cloud deployment templates

It's ok not to use it if your app project is really small and where an extra level of nesting doesn't add much value (unless you really want to :-)). Think about it when it's getting big enough and your root directory gets pretty busy (especially if you have a lot of non-Go app components).
### Current Development Status

The `pkg` directory origins: The old Go source code used to use `pkg` for its packages and then various Go projects in the community started copying the pattern (see [`this`](https://twitter.com/bradfitz/status/1039512487538970624) Brad Fitzpatrick's tweet for more context).
- ✅ **Bridge Service**: Complete and stable
- ✅ **Decoder Service**: Core functionality implemented
- ✅ **Location Algorithm**: Basic algorithm functional
- ✅ **Server Service**: API and WebSocket implementation
- 🚧 **Web Interface**: Basic UI, enhancements in progress
- 🚧 **Documentation**: Comprehensive documentation being created
- 📋 **Testing**: Automated tests being expanded

### `/vendor`
---

Application dependencies (managed manually or by your favorite dependency management tool like the new built-in [`Go Modules`](https://go.dev/wiki/Modules) feature). The `go mod vendor` command will create the `/vendor` directory for you. Note that you might need to add the `-mod=vendor` flag to your `go build` command if you are not using Go 1.14 where it's on by default.

Don't commit your application dependencies if you are building a library.

Note that since [`1.13`](https://golang.org/doc/go1.13#modules) Go also enabled the module proxy feature (using [`https://proxy.golang.org`](https://proxy.golang.org) as their module proxy server by default). Read more about it [`here`](https://blog.golang.org/module-mirror-launch) to see if it fits all of your requirements and constraints. If it does, then you won't need the `vendor` directory at all.

## Service Application Directories

### `/api`

OpenAPI/Swagger specs, JSON schema files, protocol definition files.

See the [`/api`](api/README.md) directory for examples.

## Web Application Directories

### `/web`

Web application specific components: static web assets, server side templates and SPAs.

## Common Application Directories

### `/configs`

Configuration file templates or default configs.

Put your `confd` or `consul-template` template files here.

### `/init`

System init (systemd, upstart, sysv) and process manager/supervisor (runit, supervisord) configs.

### `/scripts`

Scripts to perform various build, install, analysis, etc operations.

These scripts keep the root level Makefile small and simple (e.g., [`https://github.com/hashicorp/terraform/blob/main/Makefile`](https://github.com/hashicorp/terraform/blob/main/Makefile)).

See the [`/scripts`](scripts/README.md) directory for examples.

### `/build`

Packaging and Continuous Integration.

Put your cloud (AMI), container (Docker), OS (deb, rpm, pkg) package configurations and scripts in the `/build/package` directory.

Put your CI (travis, circle, drone) configurations and scripts in the `/build/ci` directory. Note that some of the CI tools (e.g., Travis CI) are very picky about the location of their config files. Try putting the config files in the `/build/ci` directory linking them to the location where the CI tools expect them (when possible).

### `/deployments`

IaaS, PaaS, system and container orchestration deployment configurations and templates (docker-compose, kubernetes/helm, terraform). Note that in some repos (especially apps deployed with kubernetes) this directory is called `/deploy`.

### `/test`

Additional external test apps and test data. Feel free to structure the `/test` directory anyway you want. For bigger projects it makes sense to have a data subdirectory. For example, you can have `/test/data` or `/test/testdata` if you need Go to ignore what's in that directory. Note that Go will also ignore directories or files that begin with "." or "_", so you have more flexibility in terms of how you name your test data directory.

See the [`/test`](test/README.md) directory for examples.

## Other Directories

### `/docs`

Design and user documents (in addition to your godoc generated documentation).

See the [`/docs`](docs/README.md) directory for examples.

### `/tools`

Supporting tools for this project. Note that these tools can import code from the `/pkg` and `/internal` directories.

See the [`/tools`](tools/README.md) directory for examples.

### `/examples`

Examples for your applications and/or public libraries.

See the [`/examples`](examples/README.md) directory for examples.

### `/third_party`

External helper tools, forked code and other 3rd party utilities (e.g., Swagger UI).

### `/githooks`

Git hooks.

### `/assets`

Other assets to go along with your repository (images, logos, etc).

### `/website`

This is the place to put your project's website data if you are not using GitHub pages.

See the [`/website`](website/README.md) directory for examples.

## Directories You Shouldn't Have

### `/src`

Some Go projects do have a `src` folder, but it usually happens when the devs came from the Java world where it's a common pattern. If you can help yourself try not to adopt this Java pattern. You really don't want your Go code or Go projects to look like Java :-)

Don't confuse the project level `/src` directory with the `/src` directory Go uses for its workspaces as described in [`How to Write Go Code`](https://golang.org/doc/code.html). The `$GOPATH` environment variable points to your (current) workspace (by default it points to `$HOME/go` on non-windows systems). This workspace includes the top level `/pkg`, `/bin` and `/src` directories. Your actual project ends up being a sub-directory under `/src`, so if you have the `/src` directory in your project the project path will look like this: `/some/path/to/workspace/src/your_project/src/your_code.go`. Note that with Go 1.11 it's possible to have your project outside of your `GOPATH`, but it still doesn't mean it's a good idea to use this layout pattern.


## Badges

* [Go Report Card](https://goreportcard.com/) - It will scan your code with `gofmt`, `go vet`, `gocyclo`, `golint`, `ineffassign`, `license` and `misspell`. Replace `github.com/golang-standards/project-layout` with your project reference.

[![Go Report Card](https://goreportcard.com/badge/github.com/golang-standards/project-layout?style=flat-square)](https://goreportcard.com/report/github.com/golang-standards/project-layout)

* ~~[GoDoc](http://godoc.org) - It will provide online version of your GoDoc generated documentation. Change the link to point to your project.~~

[![Go Doc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square)](http://godoc.org/github.com/golang-standards/project-layout)

* [Pkg.go.dev](https://pkg.go.dev) - Pkg.go.dev is a new destination for Go discovery & docs. You can create a badge using the [badge generation tool](https://pkg.go.dev/badge).

[![PkgGoDev](https://pkg.go.dev/badge/github.com/golang-standards/project-layout)](https://pkg.go.dev/github.com/golang-standards/project-layout)

* Release - It will show the latest release number for your project. Change the github link to point to your project.

[![Release](https://img.shields.io/github/release/golang-standards/project-layout.svg?style=flat-square)](https://github.com/golang-standards/project-layout/releases/latest)

## Notes

A more opinionated project template with sample/reusable configs, scripts and code is a WIP.
**AFA Systems Presence Detection** - Real-time BLE beacon tracking and location intelligence for modern IoT environments.

+ 113
- 0
build/docker-compose.yaml Datei anzeigen

@@ -0,0 +1,113 @@
version: "2"
services:
kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "127.0.0.1:9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
depends_on:
- "kafka"
kafka:
image: apache/kafka:3.9.0
restart: "no"
ports:
# - "127.0.0.1:2181:2181"
- "127.0.0.1:9092:9092"
- "127.0.0.1:9093:9093"
healthcheck: # <-- ADD THIS BLOCK
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
interval: 10s
timeout: 5s
retries: 10
start_period: 20s
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092,CONTROLLER://127.0.0.1:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3

kafka-init:
image: apache/kafka:3.9.0
command: [ "/bin/bash", "-c", "chmod +x /tmp/create_topic.sh && /tmp/create_topic.sh"]
depends_on:
kafka:
condition: service_healthy
volumes:
- ./init-scripts/create_topic.sh:/tmp/create_topic.sh
environment:
- TOPIC_NAMES=topic1,topic2,topic3
valkey:
image: valkey/valkey:9.0.0
container_name: valkey
ports:
- "127.0.0.1:6379:6379"

presense-decoder:
build:
context: ../
dockerfile: build/package/Dockerfile.decoder
image: presense-decoder
container_name: presense-decoder
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always
presense-server:
build:
context: ../
dockerfile: build/package/Dockerfile.server
image: presense-server
container_name: presense-server
environment:
- VALKEY_URL=valkey:6379
- KAFKA_URL=kafka:29092
ports:
- "127.0.0.1:1902:1902"
depends_on:
- kafka-init
- valkey
restart: always

presense-bridge:
build:
context: ../
dockerfile: build/package/Dockerfile.bridge
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:29092
- MQTT_HOST=192.168.1.101:1883
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
depends_on:
- kafka-init
restart: always

presense-location:
build:
context: ../
dockerfile: build/package/Dockerfile.location
image: presense-location
container_name: presense-location
environment:
- KAFKA_URL=kafka:29092
depends_on:
- kafka-init
restart: always



+ 0
- 99
build/docker-compose.yml Datei anzeigen

@@ -1,99 +0,0 @@
services:
emqx:
image: emqx/emqx:5.8.8
container_name: emqx
environment:
- EMQX_DASHBOARD__DEFAULT_USERNAME=user
- EMQX_DASHBOARD__DEFAULT_PASSWORD=pass
ports:
- "127.0.0.1:1883:1883"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:18083/api/v5/status"]
interval: 10s
timeout: 5s
retries: 10
start_period: 20s

kafka:
image: apache/kafka:3.9.0
container_name: kafka
healthcheck:
test: ["CMD-SHELL", "nc -z localhost 9092"]
interval: 10s
timeout: 5s
retries: 10
environment:
- KAFKA_NODE_ID=1
- KAFKA_PROCESS_ROLES=broker,controller
- KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_LOG_DIRS=/tmp/kraft-combined-logs
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
ports:
- "127.0.0.1:9092:9092"

valkey:
image: valkey/valkey:9.0.0
container_name: valkey
ports:
- "127.0.0.1:6379:6379"

node-red:
image: nodered/node-red:latest-22
container_name: node-red
ports:
- "127.0.0.1:1880:1880"
volumes:
- "../volumes/node-red:/data"

presense-decoder:
build:
context: ../
dockerfile: build/package/Dockerfile.decoder
network: host
image: presense-decoder
container_name: presense-decoder
environment:
- REDIS_URL=valkey:6379
- KAFKA_URL=kafka:9092
depends_on:
- kafka
- valkey
restart: always

presense-server:
build:
context: ../
dockerfile: build/package/Dockerfile.server
network: host
image: presense-server
container_name: presense-server
environment:
- REDIS_URL=valkey:6379
- KAFKA_URL=kafka:9092
depends_on:
- kafka
- emqx
ports:
- "127.0.0.1:1902:1902"
restart: always

presense-bridge:
build:
context: ../
dockerfile: build/package/Dockerfile.bridge
network: host
image: presense-bridge
container_name: presense-bridge
environment:
- KAFKA_URL=kafka:9092
- MQTT_HOST=192.168.1.101:1883
- MQTT_USERNAME=user
- MQTT_PASSWORD=pass
depends_on:
kafka:
condition: service_healthy
restart: always

+ 26
- 0
build/init-scripts/create_topic.sh Datei anzeigen

@@ -0,0 +1,26 @@
#!/bin/bash

# create topic rawbeacons
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic rawbeacons \
--partitions 1 --replication-factor 1

# create topic apibeacons
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic apibeacons \
--partitions 1 --replication-factor 1

# create topic alertBeacons
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic alertbeacons \
--partitions 1 --replication-factor 1

# create topic locevents
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic locevents \
--partitions 1 --replication-factor 1

# create topic settings
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \
--create --if-not-exists --topic settings \
--partitions 1 --replication-factor 1

+ 17
- 0
build/package/Dockerfile.location Datei anzeigen

@@ -0,0 +1,17 @@
# syntax=docker/dockerfile:1

FROM golang:1.24.0 AS builder
WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o location ./cmd/location

FROM alpine:latest
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /app/location .

ENTRYPOINT ["./location"]

+ 2
- 2
cmd/bridge/main.go Datei anzeigen

@@ -29,11 +29,11 @@ func main() {
})

if err != nil {
fmt.Println("Could not connect to MQTT broker")
fmt.Printf("Could not connect to MQTT broker, addr: %s", cfg.MQTTHost)
panic(err)
}

fmt.Println("Successfuly connected to MQTT broker")
fmt.Printf("Successfuly connected to MQTT broker, addr: %s", cfg.MQTTHost)

writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons")
defer writer.Close()


+ 93
- 181
cmd/decoder/main.go Datei anzeigen

@@ -1,231 +1,143 @@
package main

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"strconv"
"time"

"io"
"log"
"log/slog"
"os"
"os/signal"
"strings"
"sync"
"syscall"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/common/utils"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
presenseredis "github.com/AFASystems/presence/internal/pkg/redis"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

// Move Kafka topics, Redis keys, intervals to env config
// Replace hardcoded IPs with env vars
// avoid defers -> lock and unlock right before and after usage
// Distance formula uses twos_comp incorrectly should parse signed int not hex string
// Use buffered log instead of fmt.Println ???
// Limit metrics slice size with ring buffer ??
// handle go routine exit signals with context.WithCancel() ??

// Make internal package for Kafka and Redis
// Make internal package for processor:
// Helper functions: twos_comp, getBeaconId
var wg sync.WaitGroup

func main() {
// Load global context to init beacons and latest list
appCtx := model.AppContext{
Beacons: model.BeaconsList{
Beacons: make(map[string]model.Beacon),
},
LatestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon),
},
Settings: model.Settings{
Settings: model.SettingsVal{
Location_confidence: 4,
Last_seen_threshold: 15,
Beacon_metrics_size: 30,
HA_send_interval: 5,
HA_send_changes_only: false,
},
},
}

appState := appcontext.NewAppState()
cfg := config.Load()

// Kafka writer idk why yet
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons")
defer writer.Close()

// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
defer rawReader.Close()

// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
defer apiReader.Close()

// Kafka reader for latest list updates
latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
defer latestReader.Close()

// Kafka reader for settings updates
settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID")
defer settingsReader.Close()

ctx := context.Background()

// Init Redis Client
client := redis.NewClient(&redis.Options{
Addr: cfg.RedisURL,
Password: "",
})

beaconsList := presenseredis.LoadBeaconsList(client, ctx)
appCtx.Beacons.Beacons = beaconsList
// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

latestList := presenseredis.LoadLatestList(client, ctx)
appCtx.LatestList.LatestList = latestList
// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

settings := presenseredis.LoadSettings(client, ctx)
appCtx.Settings.Settings = settings
rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")

// declare channel for collecting Kafka messages
chRaw := make(chan model.Incoming_json, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chLatest := make(chan model.Incoming_json, 2000)
chSettings := make(chan model.SettingsVal, 10)
alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")

go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
go kafkaclient.Consume(latestReader, chLatest)
go kafkaclient.Consume(settingsReader, chSettings)
slog.Info("Decoder initialized, subscribed to Kafka topics")

go func() {
// Syncing Redis cache every 1s with 2 lists: beacons, latest list
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 200)

for range ticker.C {
presenseredis.SaveBeaconsList(&appCtx, client, ctx)
presenseredis.SaveLatestList(&appCtx, client, ctx)
presenseredis.SaveSettings(&appCtx, client, ctx)
}
}()
wg.Add(2)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)

eventloop:
for {
select {
case <-ctx.Done():
break eventloop
case msg := <-chRaw:
processIncoming(msg, &appCtx)
processIncoming(msg, appState, alertWriter)
case msg := <-chApi:
switch msg.Method {
case "POST":
appCtx.Beacons.Lock.Lock()
appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
id := msg.Beacon.ID
appState.AddBeaconToLookup(id)
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
case "DELETE":
_, exists := appCtx.Beacons.Beacons[msg.ID]
if exists {
appCtx.Beacons.Lock.Lock()
delete(appCtx.Beacons.Beacons, msg.ID)
}
default:
fmt.Println("unknown method: ", msg.Method)
id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
}
appCtx.Beacons.Lock.Unlock()
case msg := <-chLatest:
fmt.Println("latest msg: ", msg)
case msg := <-chSettings:
appCtx.Settings.Lock.Lock()
appCtx.Settings.Settings = msg
fmt.Println("settings channel: ", msg)
appCtx.Settings.Lock.Unlock()
}
}
}

func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) {
defer func() {
if err := recover(); err != nil {
fmt.Println("work failed:", err)
}
}()

fmt.Println("message came")

incoming = mqttclient.IncomingBeaconFilter(incoming)
id := mqttclient.GetBeaconID(incoming)
now := time.Now().Unix()
slog.Info("broken out of the main event loop")
wg.Wait()

beacons := &ctx.Beacons

beacons.Lock.Lock()
defer beacons.Lock.Unlock()

latestList := &ctx.LatestList

latestList.Lock.Lock()
defer latestList.Lock.Unlock()
slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}

beacon, exists := beacons.Beacons[id]
if !exists {
x, exists := latestList.LatestList[id]
if exists {
x.Last_seen = now
x.Incoming_JSON = incoming
x.Distance = getBeaconDistance(incoming)
latestList.LatestList[id] = x
} else {
latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)}
}
// Move this to seperate routine?
for k, v := range latestList.LatestList {
if (now - v.Last_seen) > 10 {
delete(latestList.LatestList, k)
}
}
func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) {
id := adv.MAC
ok := appState.BeaconExists(id)
if !ok {
return
}

updateBeacon(&beacon, incoming)
beacons.Beacons[id] = beacon
err := decodeBeacon(adv, appState, writer)
if err != nil {
eMsg := fmt.Sprintf("Error in decoding: %v", err)
fmt.Println(eMsg)
return
}
}

func getBeaconDistance(incoming model.Incoming_json) float64 {
rssi := incoming.RSSI
power := incoming.TX_power
distance := 100.0
func decodeBeacon(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) error {
beacon := strings.TrimSpace(adv.Data)
id := adv.MAC
if beacon == "" {
return nil
}

ratio := float64(rssi) * (1.0 / float64(twos_comp(power)))
if ratio < 1.0 {
distance = math.Pow(ratio, 10)
} else {
distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
b, err := hex.DecodeString(beacon)
if err != nil {
return err
}
return distance
}

func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) {
now := time.Now().Unix()
b = utils.RemoveFlagBytes(b)

indeces := utils.ParseADFast(b)
event := utils.LoopADStructures(b, indeces, id)

beacon.Incoming_JSON = incoming
beacon.Last_seen = now
beacon.Beacon_type = incoming.Beacon_type
beacon.HB_ButtonCounter = incoming.HB_ButtonCounter
beacon.HB_Battery = incoming.HB_Battery
beacon.HB_RandomNonce = incoming.HB_RandomNonce
beacon.HB_ButtonMode = incoming.HB_ButtonMode
if event.ID == "" {
return nil
}

if beacon.Beacon_metrics == nil {
beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now
prevEvent, ok := appState.GetBeaconEvent(id)
appState.UpdateBeaconEvent(id, event)
if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
return nil
}

metric := model.BeaconMetric{}
metric.Distance = getBeaconDistance(incoming)
metric.Timestamp = now
metric.Rssi = int64(incoming.RSSI)
metric.Location = incoming.Hostname
beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric)
eMsg, err := event.ToJSON()
if err != nil {
return err
}

// Leave the HB button implementation for now
}
if err := writer.WriteMessages(context.Background(), kafka.Message{Value: eMsg}); err != nil {
return err
}

func twos_comp(inp string) int64 {
i, _ := strconv.ParseInt("0x"+inp, 0, 64)
return i - 256
return nil
}

+ 224
- 47
cmd/location/main.go Datei anzeigen

@@ -4,68 +4,245 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/common/utils"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func main() {
ctx := context.Background()
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
}
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()

func getLikelyLocations(client *redis.Client, ctx context.Context) {
beaconsList, err := client.Get(ctx, "beaconsList").Result()
var beacons = make(map[string]model.Beacon)
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
json.Unmarshal([]byte(beaconsList), &beacons)
// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

// Define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
settingsReader := appState.AddKafkaReader(cfg.KafkaURL, "settings", "gid-settings-loc")

writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")

slog.Info("Locations algorithm initialized, subscribed to Kafka topics")

locTicker := time.NewTicker(1 * time.Second)
defer locTicker.Stop()

chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 2000)
chSettings := make(chan model.SettingsVal, 5)

wg.Add(3)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
go kafkaclient.Consume(settingsReader, chSettings, ctx, &wg)

eventLoop:
for {
select {
case <-ctx.Done():
break eventLoop
case <-locTicker.C:
getLikelyLocations(appState, writer)
case msg := <-chRaw:
assignBeaconToList(msg, appState)
case msg := <-chApi:
switch msg.Method {
case "POST":
id := msg.Beacon.ID
lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
slog.Info(lMsg)
appState.AddBeaconToLookup(id)
case "DELETE":
id := msg.Beacon.ID
appState.RemoveBeaconFromLookup(id)
lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
slog.Info(lMsg)
}
case msg := <-chSettings:
appState.UpdateSettings(msg)
}
}

for id, beacon := range beacons {
if len(beacon.Beacon_metrics) == 0 {
slog.Info("broken out of the main event loop")
wg.Wait()

slog.Info("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}

func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
beacons := appState.GetAllBeacons()
settings := appState.GetSettingsValue()

for _, beacon := range beacons {
// Shrinking the model because other properties have nothing to do with the location
r := model.HTTPLocation{
Method: "Standard",
Distance: 999,
ID: beacon.ID,
Location: "",
LastSeen: 999,
}

mSize := len(beacon.BeaconMetrics)

if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold {
slog.Warn("beacon is too old")
continue
}

if isExpired(&beacon, settings) {
handleExpiredBeacon(&beacon, cl, ctx)
locList := make(map[string]float64)
seenW := 1.5
rssiW := 0.75
for _, metric := range beacon.BeaconMetrics {
res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0)))
locList[metric.Location] += res
}

bestLocName := ""
maxScore := 0.0
for locName, score := range locList {
if score > maxScore {
maxScore = score
bestLocName = locName
}
}

if bestLocName == beacon.PreviousLocation {
beacon.LocationConfidence++
} else {
beacon.LocationConfidence = 0
}

r.Distance = beacon.BeaconMetrics[mSize-1].Distance
r.Location = bestLocName
r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp

if beacon.LocationConfidence == settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName {
beacon.LocationConfidence = 0

// Why do I need this if I am sending entire structure anyways? who knows
js, err := json.Marshal(model.LocationChange{
Method: "LocationChange",
BeaconRef: beacon,
Name: beacon.Name,
PreviousLocation: beacon.PreviousConfidentLocation,
NewLocation: bestLocName,
Timestamp: time.Now().Unix(),
})

if err != nil {
eMsg := fmt.Sprintf("Error in marshaling: %v", err)
slog.Error(eMsg)
beacon.PreviousConfidentLocation = bestLocName
beacon.PreviousLocation = bestLocName
appState.UpdateBeacon(beacon.ID, beacon)
continue
}

msg := kafka.Message{
Value: js,
}

err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in sending Kafka message")
}
}

beacon.PreviousLocation = bestLocName
appState.UpdateBeacon(beacon.ID, beacon)

js, err := json.Marshal(r)
if err != nil {
eMsg := fmt.Sprintf("Error in marshaling location: %v", err)
slog.Error(eMsg)
continue
}

best := calculateBestLocation(&beacon)
updateBeaconState(&beacon, best, settings, ctx, cl)
msg := kafka.Message{
Value: js,
}

appendHTTPResult(ctx, beacon, best)
ctx.Beacons.Beacons[id] = beacon
err = writer.WriteMessages(context.Background(), msg)
if err != nil {
eMsg := fmt.Sprintf("Error in sending Kafka message: %v", err)
slog.Error(eMsg)
}
}
}

// get likely locations:
/*
1. Locks the http_results list
2. inits list to empty struct type -> TODO: what is this list used for
3. loops through beacons list -> should be locked?
4. check for beacon metrics -> how do you get beacon metrics, I guess it has an array of timestamps
5. check for threshold value in the settings
5.1. check for property expired location
5.2. if location is not expired -> mark it as expired, generate message and send to all clients,
if clients do not respond close the connection
6. Init best location with type Best_location{} -> what is this type
7. make locations list -> key: string, val: float64
7.1 set weight for seen and rssi
7.2 loop over metrics of the beacon -> some alogirthm based on location value

I think the algorithm is recording names of different gateways and their rssi's and then from
that it checks gateway name and makes decisions based on calculated values

7.3 writes result in best location and updates list location history with this name if the list
is longer than 10 elements it removes the first element


*/
func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
id := adv.MAC
ok := appState.BeaconExists(id)
now := time.Now().Unix()

if !ok {
appState.UpdateLatestBeacon(id, model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: utils.CalculateDistance(adv)})
return
}

settings := appState.GetSettingsValue()

if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
slog.Info("Settings returns")
return
}

beacon, ok := appState.GetBeacon(id)
if !ok {
beacon = model.Beacon{
ID: id,
}
}

beacon.IncomingJSON = adv
beacon.LastSeen = now

if beacon.BeaconMetrics == nil {
beacon.BeaconMetrics = make([]model.BeaconMetric, 0, settings.BeaconMetricSize)
}

metric := model.BeaconMetric{
Distance: utils.CalculateDistance(adv),
Timestamp: now,
RSSI: int64(adv.RSSI),
Location: adv.Hostname,
}

if len(beacon.BeaconMetrics) >= settings.BeaconMetricSize {
copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:])
beacon.BeaconMetrics[settings.BeaconMetricSize-1] = metric
} else {
beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric)
}

appState.UpdateBeacon(id, beacon)
}

+ 0
- 0
cmd/presenSe/.keep Datei anzeigen


+ 0
- 188
cmd/presenSe/presense.go Datei anzeigen

@@ -1,188 +0,0 @@
package main

import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/httpserver"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
"github.com/AFASystems/presence/internal/pkg/persistence"
"github.com/boltdb/bolt"
"github.com/gorilla/websocket"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)

func main() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt)
cfg := config.Load()

fmt.Println("hello world")

db, err := bolt.Open("presence.db", 0644, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()

model.Db = db

cli := client.New(&client.Options{
ErrorHandler: func(err error) {
fmt.Println(err)
},
})

defer cli.Terminate()

fmt.Println("host: ", cfg.MQTTHost, " Client ID: ", cfg.MQTTClientID, "user: ", cfg.MQTTUser)

err = cli.Connect(&client.ConnectOptions{
Network: "tcp",
Address: cfg.MQTTHost,
ClientID: []byte(cfg.MQTTClientID),
UserName: []byte(cfg.MQTTUser),
Password: []byte(cfg.MQTTPass),
})

if err != nil {
fmt.Println("Error comes from here")
panic(err)
}

ctx := &model.AppContext{
HTTPResults: model.HTTPResultsList{
HTTPResults: model.HTTPLocationsList{Beacons: []model.HTTPLocation{}},
},
Beacons: model.BeaconsList{
Beacons: make(map[string]model.Beacon),
},
ButtonsList: make(map[string]model.Button),
Settings: model.Settings{
Location_confidence: 4,
Last_seen_threshold: 15,
Beacon_metrics_size: 30,
HA_send_interval: 5,
HA_send_changes_only: false,
},
Clients: make(map[*websocket.Conn]bool),
Broadcast: make(chan model.Message, 100),
Locations: model.LocationsList{Locations: make(map[string]model.Location)},
LatestList: model.LatestBeaconsList{LatestList: make(map[string]model.Beacon)},
}

persistence.LoadState(model.Db, ctx)
incomingChan := mqttclient.IncomingMQTTProcessor(1*time.Second, cli, model.Db, ctx)

err = cli.Subscribe(&client.SubscribeOptions{
SubReqs: []*client.SubReq{
&client.SubReq{
TopicFilter: []byte("publish_out/#"),
QoS: mqtt.QoS0,
Handler: func(topicName, message []byte) {
msgStr := string(message)
t := strings.Split(string(topicName), "/")
hostname := t[1]
fmt.Println("hostname: ", hostname)

if strings.HasPrefix(msgStr, "[") {
var readings []model.RawReading
err := json.Unmarshal(message, &readings)
if err != nil {
log.Printf("Error parsing JSON: %v", err)
return
}

for _, reading := range readings {
if reading.Type == "Gateway" {
continue
}
incoming := model.Incoming_json{
Hostname: hostname,
MAC: reading.MAC,
RSSI: int64(reading.RSSI),
Data: reading.RawData,
HB_ButtonCounter: parseButtonState(reading.RawData),
}
incomingChan <- incoming
}
} else {
s := strings.Split(string(message), ",")
if len(s) < 6 {
log.Printf("Messaggio CSV non valido: %s", msgStr)
return
}

rawdata := s[4]
buttonCounter := parseButtonState(rawdata)
if buttonCounter > 0 {
incoming := model.Incoming_json{}
i, _ := strconv.ParseInt(s[3], 10, 64)
incoming.Hostname = hostname
incoming.Beacon_type = "hb_button"
incoming.MAC = s[1]
incoming.RSSI = i
incoming.Data = rawdata
incoming.HB_ButtonCounter = buttonCounter

read_line := strings.TrimRight(string(s[5]), "\r\n")
it, err33 := strconv.Atoi(read_line)
if err33 != nil {
fmt.Println(it)
fmt.Println(err33)
os.Exit(2)
}
incomingChan <- incoming
}
}
},
},
},
})
if err != nil {
panic(err)
}

fmt.Println("CONNECTED TO MQTT")
fmt.Println("\n ")
fmt.Println("Visit http://" + cfg.HTTPAddr + " on your browser to see the web interface")
fmt.Println("\n ")

go httpserver.StartHTTPServer(cfg.HTTPAddr, ctx)

<-sigc

if err := cli.Disconnect(); err != nil {
panic(err)
}
}

func parseButtonState(raw string) int64 {
raw = strings.ToUpper(raw)

if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 {
buttonField := raw[34:38]
if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil {
return buttonValue
}
}

if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 {
counterField := raw[22:24]
buttonState, err := strconv.ParseInt(counterField, 16, 64)
if err == nil {
return buttonState
}
}

return 0
}

+ 127
- 260
cmd/server/main.go Datei anzeigen

@@ -4,307 +4,187 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/config"
"github.com/AFASystems/presence/internal/pkg/controller"
"github.com/AFASystems/presence/internal/pkg/kafkaclient"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/service"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

func main() {
HttpServer("0.0.0.0:1902")
}

func HttpServer(addr string) {
headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

// Kafka writer that relays messages
writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons")
defer writer.Close()

settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings")
defer settingsWriter.Close()

// Define if maybe ws writer should have more topics
wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages")
defer wsWriter.Close()

r := mux.NewRouter()

client := redis.NewClient(&redis.Options{
Addr: "valkey:6379",
Password: "",
})

// declare WS clients list | do I need it though? or will locations worker send message
// to kafka and then only this service (server) is being used for communication with the clients
clients := make(map[*websocket.Conn]bool)

// Declare broadcast channel
broadcast := make(chan model.Message)
var _ io.Writer = (*os.File)(nil)

// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
var wg sync.WaitGroup

r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")

// Handler for WS messages
// No point in having seperate route for each message type, better to handle different message types in one connection
r.HandleFunc("/ws/api/beacons", serveWs(client))
r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))

http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
}
func main() {
cfg := config.Load()
appState := appcontext.NewAppState()

// Probably define value as interface and then reuse this writer in all of the functions
func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
valueStr, err := json.Marshal(&value)
// Create log file
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
fmt.Println("error in encoding: ", err)
return false
}
msg := kafka.Message{
Value: valueStr,
log.Fatalf("Failed to open log file: %v\n", err)
}
// shell and log file multiwriter
w := io.MultiWriter(os.Stderr, logFile)
logger := slog.New(slog.NewJSONHandler(w, nil))
slog.SetDefault(logger)

err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in sending kafka message: ")
return false
}
// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

return true
}
headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
beaconId := vars["beacon_id"]
apiUpdate := model.ApiUpdate{
Method: "DELETE",
ID: beaconId,
}
writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
slog.Info("Kafka writers topics: apibeacons, settings initialized")

flag := sendKafkaMessage(writer, &apiUpdate)
if !flag {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}
locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
slog.Info("Kafka readers topics: locevents, alertbeacons initialized")

w.Write([]byte("ok"))
}
}
client := appState.AddValkeyClient(cfg.ValkeyURL)
slog.Info("Valkey DB client created")

func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon
err := decoder.Decode(&inBeacon)
chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500)

if err != nil {
http.Error(w, err.Error(), 400)
return
}
wg.Add(2)
go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
}
r := mux.NewRouter()

apiUpdate := model.ApiUpdate{
Method: "POST",
Beacon: inBeacon,
}
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST")
r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT")

flag := sendKafkaMessage(writer, &apiUpdate)
if !flag {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")

wsHandler := http.HandlerFunc(serveWs(appState, ctx))
restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
wsHandler.ServeHTTP(w, r)
return
}

w.Write([]byte("ok"))
}
}

func beaconsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
beaconsList, err := client.Get(context.Background(), "beaconsList").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(beaconsList))
}
}
}
restApiHandler.ServeHTTP(w, r)
})

func settingsListHandler(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
settings, err := client.Get(context.Background(), "settings").Result()
if err == redis.Nil {
fmt.Println("no settings persisted, starting empty")
http.Error(w, "list is empty", 500)
} else if err != nil {
http.Error(w, "Internal server error", 500)
panic(err)
} else {
w.Write([]byte(settings))
}
server := http.Server{
Addr: cfg.HTTPAddr,
Handler: mainHandler,
}
}

func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inSettings model.SettingsVal
if err := decoder.Decode(&inSettings); err != nil {
http.Error(w, err.Error(), 400)
fmt.Println("Error in decoding Settings body: ", err)
return
}
go server.ListenAndServe()

if !settingsCheck(inSettings) {
http.Error(w, "values must be greater than 0", 400)
fmt.Println("settings values must be greater than 0")
return
}

valueStr, err := json.Marshal(&inSettings)
if err != nil {
http.Error(w, "Error in encoding settings", 500)
fmt.Println("Error in encoding settings: ", err)
return
eventLoop:
for {
select {
case <-ctx.Done():
break eventLoop
case msg := <-chLoc:
if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
slog.Error(eMsg)
}
case msg := <-chEvents:
if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
slog.Error(eMsg)
}
}
}

msg := kafka.Message{
Value: valueStr,
}
if err := server.Shutdown(context.Background()); err != nil {
eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
slog.Error(eMsg)
}

if err := writer.WriteMessages(context.Background(), msg); err != nil {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}
slog.Info("API SERVER: \n")
slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
wg.Wait()

w.Write([]byte("ok"))
}
}
slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()

func settingsCheck(settings model.SettingsVal) bool {
if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 {
return false
}
slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
appState.CleanValkeyClient()

return true
slog.Info("API server shutting down")
logFile.Close()
}

func serveWs(client *redis.Client) http.HandlerFunc {
func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
slog.Error(eMsg)
}
return
}
go writer(ws, client)
reader(ws)
wg.Add(2)
go writer(ws, appstate, ctx)
go reader(ws, ctx)
}
}

func writer(ws *websocket.Conn, client *redis.Client) {
pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
beaconTicker := time.NewTicker(2 * time.Second)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
wg.Done()
}()
for {
select {
case <-ctx.Done():
slog.Info("WebSocket writer received shutdown signal.")
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return
case <-beaconTicker.C:
httpresults, err := client.Get(context.Background(), "httpresults").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
return
}
beacons := appstate.GetAllBeacons()
js, err := json.Marshal(beacons)
if err != nil {
js = []byte("error")
}
case <-pingTicker.C:

ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
return
}
}
}
}

func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
}
return
}

go latestBeaconWriter(ws, client)
reader(ws)
}
}

// This and writer can be refactored in one function
func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
beaconTicker := time.NewTicker(2 * time.Second)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
}()
for {
select {
case <-beaconTicker.C:
latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
return
}
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
@@ -314,37 +194,24 @@ func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
}
}

func reader(ws *websocket.Conn) {
defer ws.Close()
func reader(ws *websocket.Conn, ctx context.Context) {
defer func() {
ws.Close()
wg.Done()
}()
ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
}
}
}

func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
}
defer ws.Close()
clients[ws] = true

for {
var msg model.Message
err := ws.ReadJSON(&msg)
select {
case <-ctx.Done():
slog.Info("closing ws reader")
return
default:
_, _, err := ws.ReadMessage()
if err != nil {
log.Printf("error: %v", err)
delete(clients, ws)
break
return
}
broadcast <- msg
}
}
}

+ 3238
- 0
cmd/testbench/debug.txt
Datei-Diff unterdrückt, da er zu groß ist
Datei anzeigen


+ 86
- 0
cmd/testbench/main.go Datei anzeigen

@@ -0,0 +1,86 @@
package main

import (
"bufio"
"encoding/hex"
"fmt"
"log"
"os"
"strings"
)

func main() {
file, err := os.Open("save.txt")
if err != nil {
log.Fatalf("Failed to open file: %s", err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
decodeBeacon(line)
}
}

func decodeBeacon(beacon string) {
beacon = strings.TrimSpace(beacon)
if beacon == "" {
return
}

// convert to bytes for faster operations
b, err := hex.DecodeString(beacon)
if err != nil {
fmt.Println("invalid line: ", beacon)
return
}

// remove flag bytes - they hold no structural information
if len(b) > 1 && b[1] == 0x01 {
l := int(b[0])
if 1+l <= len(b) {
b = b[1+l:]
}
}

adBlockIndeces := parseADFast(b)
for _, r := range adBlockIndeces {
ad := b[r[0]:r[1]]
if len(ad) >= 4 &&
ad[1] == 0x16 &&
ad[2] == 0xAA &&
ad[3] == 0xFE {
// fmt.Println("Eddystone:", hex.EncodeToString(b))
return
}
if len(ad) >= 7 &&
ad[1] == 0xFF &&
ad[2] == 0x4C && ad[3] == 0x00 &&
ad[4] == 0x02 && ad[5] == 0x15 {
// fmt.Println("iBeacon:", hex.EncodeToString(b))
return
}

}

fmt.Println(hex.EncodeToString(b))
}

func parseADFast(b []byte) [][2]int {
var res [][2]int
i := 0

for i < len(b) {
l := int(b[i])
if l == 0 || i+1+l > len(b) {
break
}

res = append(res, [2]int{i, i + 1 + l})

i += 1 + l
}

return res
}

+ 3488
- 0
cmd/testbench/save.txt
Datei-Diff unterdrückt, da er zu groß ist
Datei anzeigen


+ 131
- 0
cmd/valkey-testbench/main.go Datei anzeigen

@@ -0,0 +1,131 @@
package main

import (
"context"
"encoding/json"
"fmt"
"reflect"

"github.com/redis/go-redis/v9"
)

type Per struct {
Name string `json:"name"`
Age int `json:"age"`
}

type Beacon struct {
ID string `json:"id"` // Use JSON tags to ensure correct field names
Type string `json:"type"`
Temp int `json:"temp"`
Name string `json:"name"`
}

func ConvertStructToMap(obj any) (map[string]any, error) {
// 1. Marshal the struct into a JSON byte slice
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}

// 2. Unmarshal the JSON byte slice into the map
var result map[string]any
err = json.Unmarshal(data, &result)
if err != nil {
return nil, err
}

return result, nil
}

// func main() { ... }
// client.HSet(ctx, "beacon:123", resultMap).Err()

func main() {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})

ctx := context.Background()

err := client.Set(ctx, "testkey", "hello world", 0).Err()
if err != nil {
fmt.Println("Ok")
}

val, err := client.Get(ctx, "testkey").Result()
if err != nil {
fmt.Println("Ok")
}
fmt.Println(val)

err = client.SAdd(ctx, "myset", "b-1").Err()
if err != nil {
fmt.Println(err)
}

res, err := client.SMembers(ctx, "myset").Result()
if err != nil {
fmt.Println(err)
}
fmt.Println("res1: ", res)

err = client.SAdd(ctx, "myset", "b-2").Err()
if err != nil {
fmt.Println(err)
}

res, err = client.SMembers(ctx, "myset").Result()
if err != nil {
fmt.Println(err)
}
fmt.Println("res1: ", res)

err = client.SAdd(ctx, "myset", "b-1").Err()
if err != nil {
fmt.Println(err)
}

res, err = client.SMembers(ctx, "myset").Result()
if err != nil {
fmt.Println(err)
}
fmt.Println("res1: ", res)
fmt.Println("type: ", reflect.TypeOf(res))

// b := Beacon{
// ID: "hello",
// Type: "node",
// Temp: 10,
// Name: "Peter",
// }

// per := Per{
// Name: "Janez",
// Age: 10,
// }

// bEncoded, err := ConvertStructToMap(b)
// if err != nil {
// fmt.Print("error\n")
// }

// perEncoded, err := ConvertStructToMap(per)
// if err != nil {
// fmt.Print("error\n")
// }

// err = client.HSet(ctx, "myhash", bEncoded).Err()
// fmt.Println(err)

// res, _ := client.HGetAll(ctx, "myhash").Result()
// fmt.Println(res)

// err = client.HSet(ctx, "myhash", perEncoded).Err()
// fmt.Println(err)

// res, _ = client.HGetAll(ctx, "myhash").Result()
// fmt.Println(res)

}

+ 748
- 0
docs/API.md Datei anzeigen

@@ -0,0 +1,748 @@
# API Documentation

## Overview

The AFA Systems Presence Detection API provides RESTful endpoints for managing beacons, settings, and real-time WebSocket communication for live updates.

## Base URL

```
http://localhost:8080
```

## Authentication

Currently, the API does not implement authentication. This should be added for production deployments.

## REST API Endpoints

### Beacon Management

#### Get All Beacons
Retrieves a list of all registered beacons with their current status and location information.

```http
GET /api/beacons
```

**Response:**
```json
{
"beacons": [
{
"name": "Conference Room Beacon",
"beacon_id": "beacon_001",
"beacon_type": "ingics",
"beacon_location": "conference_room",
"last_seen": 1703078400,
"distance": 2.5,
"location_confidence": 85,
"hs_button_counter": 42,
"hs_button_battery": 85,
"hs_button_random": "abc123",
"hs_button_mode": "normal"
}
]
}
```

#### Create Beacon
Registers a new beacon in the system.

```http
POST /api/beacons
Content-Type: application/json
```

**Request Body:**
```json
{
"name": "Meeting Room Beacon",
"beacon_id": "beacon_002",
"beacon_type": "eddystone",
"beacon_location": "meeting_room",
"hs_button_counter": 0,
"hs_button_battery": 100
}
```

**Response:**
```json
{
"message": "Beacon created successfully",
"beacon": {
"name": "Meeting Room Beacon",
"beacon_id": "beacon_002",
"beacon_type": "eddystone",
"beacon_location": "meeting_room",
"last_seen": 0,
"distance": 0,
"location_confidence": 0,
"hs_button_counter": 0,
"hs_button_battery": 100
}
}
```

#### Update Beacon
Updates an existing beacon's information.

```http
PUT /api/beacons/{id}
Content-Type: application/json
```

**Path Parameters:**
- `id` (string): The beacon ID to update

**Request Body:**
```json
{
"name": "Updated Conference Room Beacon",
"beacon_location": "main_conference",
"location_confidence": 90
}
```

**Response:**
```json
{
"message": "Beacon updated successfully",
"beacon": {
"name": "Updated Conference Room Beacon",
"beacon_id": "beacon_001",
"beacon_type": "ingics",
"beacon_location": "main_conference",
"last_seen": 1703078400,
"distance": 2.5,
"location_confidence": 90,
"hs_button_counter": 42,
"hs_button_battery": 85
}
}
```

#### Delete Beacon
Removes a beacon from the system.

```http
DELETE /api/beacons/{id}
```

**Path Parameters:**
- `id` (string): The beacon ID to delete

**Response:**
```json
{
"message": "Beacon deleted successfully"
}
```

### Settings Management

#### Get System Settings
Retrieves current system configuration settings.

```http
GET /api/settings
```

**Response:**
```json
{
"settings": {
"location_confidence": 80,
"last_seen_threshold": 300,
"beacon_metrics_size": 10,
"ha_send_interval": 60,
"ha_send_changes_only": true,
"rssi_min_threshold": -90,
"enforce_rssi_threshold": true
}
}
```

#### Update System Settings
Updates system configuration settings.

```http
POST /api/settings
Content-Type: application/json
```

**Request Body:**
```json
{
"location_confidence": 85,
"last_seen_threshold": 600,
"beacon_metrics_size": 15,
"ha_send_interval": 30,
"rssi_min_threshold": -85
}
```

**Response:**
```json
{
"message": "Settings updated successfully",
"settings": {
"location_confidence": 85,
"last_seen_threshold": 600,
"beacon_metrics_size": 15,
"ha_send_interval": 30,
"ha_send_changes_only": true,
"rssi_min_threshold": -85,
"enforce_rssi_threshold": true
}
}
```

### Location Information

#### Get Beacon Locations
Retrieves current location information for all beacons.

```http
GET /api/locations
```

**Response:**
```json
{
"beacons": [
{
"method": "location_update",
"previous_confident_location": "reception",
"distance": 3.2,
"id": "beacon_001",
"location": "conference_room",
"last_seen": 1703078450
},
{
"method": "location_update",
"previous_confident_location": "office_a",
"distance": 1.8,
"id": "beacon_002",
"location": "meeting_room",
"last_seen": 1703078440
}
]
}
```

#### Get Specific Beacon Location
Retrieves location information for a specific beacon.

```http
GET /api/locations/{id}
```

**Path Parameters:**
- `id` (string): The beacon ID

**Response:**
```json
{
"method": "location_update",
"previous_confident_location": "reception",
"distance": 3.2,
"id": "beacon_001",
"location": "conference_room",
"last_seen": 1703078450
}
```

### Health Check

#### System Health
Check if the API server is running and basic systems are operational.

```http
GET /api/health
```

**Response:**
```json
{
"status": "healthy",
"timestamp": "2024-12-20T10:30:00Z",
"services": {
"database": "connected",
"kafka": "connected",
"redis": "connected"
}
}
```

## WebSocket API

### WebSocket Connection
Connect to the WebSocket endpoint for real-time updates.

```
ws://localhost:8080/ws/broadcast
```

### WebSocket Message Format

#### Beacon Update Notification
```json
{
"type": "beacon_update",
"data": {
"method": "location_update",
"beacon_info": {
"name": "Conference Room Beacon",
"beacon_id": "beacon_001",
"beacon_type": "ingics",
"distance": 2.5
},
"name": "conference_room",
"beacon_name": "Conference Room Beacon",
"previous_location": "reception",
"new_location": "conference_room",
"timestamp": 1703078450
}
}
```

#### Button Press Event
```json
{
"type": "button_event",
"data": {
"beacon_id": "beacon_001",
"button_counter": 43,
"button_mode": "normal",
"timestamp": 1703078460
}
}
```

#### Battery Alert
```json
{
"type": "battery_alert",
"data": {
"beacon_id": "beacon_002",
"battery_level": 15,
"alert_level": "warning",
"timestamp": 1703078470
}
}
```

#### Fall Detection Event
```json
{
"type": "fall_detection",
"data": {
"beacon_id": "beacon_001",
"event_type": "fall_detected",
"confidence": 92,
"timestamp": 1703078480
}
}
```

#### System Status Update
```json
{
"type": "system_status",
"data": {
"active_beacons": 12,
"total_locations": 8,
"kafka_status": "connected",
"redis_status": "connected",
"timestamp": 1703078490
}
}
```

## Data Models

### Beacon Model
```typescript
interface Beacon {
name: string;
beacon_id: string;
beacon_type: "ingics" | "eddystone" | "minew_b7" | "ibeacon";
beacon_location: string;
last_seen: number; // Unix timestamp
distance: number; // Distance in meters
previous_location?: string;
previous_confident_location?: string;
expired_location?: string;
location_confidence: number; // 0-100
location_history: string[];
beacon_metrics: BeaconMetric[];

// Handshake/Button specific fields
hs_button_counter: number;
hs_button_prev: number;
hs_button_battery: number;
hs_button_random: string;
hs_button_mode: string;
}
```

### BeaconMetric Model
```typescript
interface BeaconMetric {
location: string;
distance: number;
rssi: number;
timestamp: number;
}
```

### Settings Model
```typescript
interface Settings {
location_confidence: number; // Minimum confidence level (0-100)
last_seen_threshold: number; // Seconds before beacon considered offline
beacon_metrics_size: number; // Number of RSSI measurements to keep
ha_send_interval: number; // Home Assistant update interval (seconds)
ha_send_changes_only: boolean; // Only send updates on changes
rssi_min_threshold: number; // Minimum RSSI for detection
enforce_rssi_threshold: boolean; // Filter weak signals
}
```

### LocationChange Model
```typescript
interface LocationChange {
method: string; // "location_update" | "beacon_added" | "beacon_removed"
beacon_ref: Beacon; // Complete beacon information
name: string; // Beacon name
beacon_name: string; // Beacon name (duplicate)
previous_location: string; // Previous location
new_location: string; // New location
timestamp: number; // Unix timestamp
}
```

## Error Responses

### Standard Error Format
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid request data",
"details": {
"field": "beacon_id",
"reason": "Beacon ID is required"
}
}
}
```

### Common Error Codes

| Code | HTTP Status | Description |
|------|-------------|-------------|
| `VALIDATION_ERROR` | 400 | Request data validation failed |
| `NOT_FOUND` | 404 | Resource not found |
| `CONFLICT` | 409 | Resource already exists |
| `INTERNAL_ERROR` | 500 | Internal server error |
| `SERVICE_UNAVAILABLE` | 503 | Required service is unavailable |

### Validation Error Example
```http
POST /api/beacons
Content-Type: application/json
```

**Invalid Request:**
```json
{
"name": "",
"beacon_type": "invalid_type"
}
```

**Response:**
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid request data",
"details": {
"name": "Name cannot be empty",
"beacon_type": "Invalid beacon type. Must be one of: ingics, eddystone, minew_b7, ibeacon"
}
}
}
```

## Rate Limiting

Currently, the API does not implement rate limiting. Consider implementing rate limiting for production deployments:

- Suggested limits: 100 requests per minute per IP address
- WebSocket connections: Maximum 50 concurrent connections
- Consider authentication-based rate limiting

## CORS Configuration

The API server is configured with CORS enabled for development. Production deployments should restrict CORS origins to specific domains.

## Integration Examples

### JavaScript/TypeScript Client

```typescript
class PresenceAPIClient {
private baseURL: string;

constructor(baseURL: string = 'http://localhost:8080') {
this.baseURL = baseURL;
}

async getBeacons(): Promise<Beacon[]> {
const response = await fetch(`${this.baseURL}/api/beacons`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
return data.beacons;
}

async createBeacon(beacon: Partial<Beacon>): Promise<Beacon> {
const response = await fetch(`${this.baseURL}/api/beacons`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(beacon),
});

if (!response.ok) {
const error = await response.json();
throw new Error(error.error?.message || 'Failed to create beacon');
}

const data = await response.json();
return data.beacon;
}

connectWebSocket(onMessage: (message: any) => void): WebSocket {
const ws = new WebSocket(`${this.baseURL.replace('http', 'ws')}/ws/broadcast`);

ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
onMessage(message);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};

ws.onerror = (error) => {
console.error('WebSocket error:', error);
};

ws.onclose = () => {
console.log('WebSocket connection closed');
};

return ws;
}
}

// Usage example
const client = new PresenceAPIClient();

// Get all beacons
const beacons = await client.getBeacons();
console.log('Active beacons:', beacons);

// Create a new beacon
const newBeacon = await client.createBeacon({
name: 'Test Beacon',
beacon_id: 'test_beacon_001',
beacon_type: 'eddystone',
beacon_location: 'test_room'
});

// Connect to WebSocket for real-time updates
const ws = client.connectWebSocket((message) => {
switch (message.type) {
case 'beacon_update':
console.log('Beacon location updated:', message.data);
break;
case 'button_event':
console.log('Button pressed:', message.data);
break;
case 'battery_alert':
console.log('Low battery warning:', message.data);
break;
}
});
```

### Python Client

```python
import requests
import websocket
import json
from typing import List, Dict, Any

class PresenceAPIClient:
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url
self.ws_url = base_url.replace("http", "ws")

def get_beacons(self) -> List[Dict[str, Any]]:
"""Get all registered beacons."""
response = requests.get(f"{self.base_url}/api/beacons")
response.raise_for_status()
data = response.json()
return data["beacons"]

def create_beacon(self, beacon_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new beacon."""
response = requests.post(
f"{self.base_url}/api/beacons",
json=beacon_data
)
response.raise_for_status()
data = response.json()
return data["beacon"]

def update_beacon(self, beacon_id: str, beacon_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing beacon."""
response = requests.put(
f"{self.base_url}/api/beacons/{beacon_id}",
json=beacon_data
)
response.raise_for_status()
data = response.json()
return data["beacon"]

def delete_beacon(self, beacon_id: str) -> None:
"""Delete a beacon."""
response = requests.delete(f"{self.base_url}/api/beacons/{beacon_id}")
response.raise_for_status()

def get_settings(self) -> Dict[str, Any]:
"""Get system settings."""
response = requests.get(f"{self.base_url}/api/settings")
response.raise_for_status()
return response.json()["settings"]

def update_settings(self, settings_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update system settings."""
response = requests.post(
f"{self.base_url}/api/settings",
json=settings_data
)
response.raise_for_status()
return response.json()["settings"]

# Usage example
client = PresenceAPIClient()

# Get all beacons
beacons = client.get_beacons()
print(f"Found {len(beacons)} beacons")

# Create a new beacon
new_beacon = client.create_beacon({
"name": "Python Test Beacon",
"beacon_id": "python_test_001",
"beacon_type": "eddystone",
"beacon_location": "python_room"
})
print(f"Created beacon: {new_beacon['name']}")

# Update settings
settings = client.update_settings({
"location_confidence": 85,
"ha_send_interval": 30
})
print(f"Updated settings: {settings}")
```

## Testing

### Unit Testing Example (Go)

```go
package api_test

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)

func TestGetBeacons(t *testing.T) {
// Setup test server
router := setupTestRouter()

req, _ := http.NewRequest("GET", "/api/beacons", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)

assert.Equal(t, http.StatusOK, w.Code)

var response map[string]interface{}
err := json.Unmarshal(w.Body.Bytes(), &response)
assert.NoError(t, err)
assert.Contains(t, response, "beacons")
}

func TestCreateBeacon(t *testing.T) {
router := setupTestRouter()

beaconData := map[string]interface{}{
"name": "Test Beacon",
"beacon_id": "test_001",
"beacon_type": "eddystone",
"beacon_location": "test_room",
}

jsonData, _ := json.Marshal(beaconData)
req, _ := http.NewRequest("POST", "/api/beacons", bytes.NewBuffer(jsonData))
req.Header.Set("Content-Type", "application/json")

w := httptest.NewRecorder()
router.ServeHTTP(w, req)

assert.Equal(t, http.StatusCreated, w.Code)

var response map[string]interface{}
err := json.Unmarshal(w.Body.Bytes(), &response)
assert.NoError(t, err)
assert.Contains(t, response, "beacon")
}
```

## Security Considerations

For production deployments, consider implementing:

1. **Authentication**: JWT tokens or API key authentication
2. **Authorization**: Role-based access control (RBAC)
3. **Rate Limiting**: Prevent API abuse
4. **Input Validation**: Comprehensive input sanitization
5. **HTTPS**: TLS encryption for all API communications
6. **CORS**: Restrict origins to trusted domains
7. **Logging**: Comprehensive audit logging
8. **Security Headers**: Implement security HTTP headers

## API Versioning

The current API is version 1. Future versions will be:

- Version 1: `/api/v1/...` (current, implied)
- Version 2: `/api/v2/...` (future breaking changes)

Backward compatibility will be maintained within major versions.

+ 1039
- 0
docs/DEPLOYMENT.md
Datei-Diff unterdrückt, da er zu groß ist
Datei anzeigen


+ 22
- 21
internal/pkg/bridge/mqtthandler/mqtthandler.go Datei anzeigen

@@ -1,13 +1,13 @@
package mqtthandler

import (
"fmt"
"context"
"encoding/json"
"strings"
"fmt"
"log"
"strconv"
"os"
"context"
"strconv"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
@@ -30,17 +30,18 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
if reading.Type == "Gateway" {
continue
}
incoming := model.Incoming_json{
Hostname: hostname,
MAC: reading.MAC,
RSSI: int64(reading.RSSI),
Data: reading.RawData,
HB_ButtonCounter: parseButtonState(reading.RawData),
adv := model.BeaconAdvertisement{
Hostname: hostname,
MAC: reading.MAC,
RSSI: int64(reading.RSSI),
Data: reading.RawData,
HSButtonCounter: parseButtonState(reading.RawData),
}

encodedMsg, err := json.Marshal(incoming)
encodedMsg, err := json.Marshal(adv)
if err != nil {
fmt.Println("Error in marshaling: ", err)
break
}

msg := kafka.Message{
@@ -49,9 +50,9 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
err = writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println("Error in writing to Kafka: ", err)
time.Sleep(1 * time.Second)
break
}

fmt.Println("message sent: ", time.Now())
}
} else {
s := strings.Split(string(message), ",")
@@ -63,14 +64,14 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
rawdata := s[4]
buttonCounter := parseButtonState(rawdata)
if buttonCounter > 0 {
incoming := model.Incoming_json{}
adv := model.BeaconAdvertisement{}
i, _ := strconv.ParseInt(s[3], 10, 64)
incoming.Hostname = hostname
incoming.Beacon_type = "hb_button"
incoming.MAC = s[1]
incoming.RSSI = i
incoming.Data = rawdata
incoming.HB_ButtonCounter = buttonCounter
adv.Hostname = hostname
adv.BeaconType = "hb_button"
adv.MAC = s[1]
adv.RSSI = i
adv.Data = rawdata
adv.HSButtonCounter = buttonCounter

read_line := strings.TrimRight(string(s[5]), "\r\n")
it, err33 := strconv.Atoi(read_line)
@@ -102,4 +103,4 @@ func parseButtonState(raw string) int64 {
}

return 0
}
}

+ 291
- 0
internal/pkg/common/appcontext/context.go Datei anzeigen

@@ -0,0 +1,291 @@
package appcontext

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

// AppState provides centralized access to application state
type AppState struct {
beacons model.BeaconsList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]struct{}
latestList model.LatestBeaconsList
kafkaReadersList model.KafkaReadersList
kafkaWritersList model.KafkaWritersList
valkeyDB *redis.Client
}

// NewAppState creates a new application context AppState with default values
func NewAppState() *AppState {
return &AppState{
beacons: model.BeaconsList{
Beacons: make(map[string]model.Beacon),
},
settings: model.Settings{
Settings: model.SettingsVal{
LocationConfidence: 4,
LastSeenThreshold: 15,
BeaconMetricSize: 30,
HASendInterval: 5,
HASendChangesOnly: false,
RSSIEnforceThreshold: false,
RSSIMinThreshold: 100,
},
},
beaconEvents: model.BeaconEventList{
Beacons: make(map[string]model.BeaconEvent),
},
beaconsLookup: make(map[string]struct{}),
latestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon),
},
kafkaReadersList: model.KafkaReadersList{
KafkaReaders: make([]*kafka.Reader, 0),
},
kafkaWritersList: model.KafkaWritersList{
KafkaWriters: make([]*kafka.Writer, 0),
},
}
}

func (m *AppState) AddValkeyClient(url string) *redis.Client {
valkeyDB := redis.NewClient(&redis.Options{
Addr: url,
Password: "",
})

m.valkeyDB = valkeyDB
return valkeyDB
}

func (m *AppState) CleanValkeyClient() {
fmt.Println("shutdown of valkey client starts")
if err := m.valkeyDB.Close(); err != nil {
fmt.Println("Error in shuting down valkey client")
}

fmt.Println("Succesfully shutting down valkey client")
}

func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaUrl),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}

m.kafkaWritersList.KafkaWritersLock.Lock()
m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
m.kafkaWritersList.KafkaWritersLock.Unlock()

return kafkaWriter
}

func (m *AppState) CleanKafkaWriters() {
fmt.Println("shutdown of kafka readers starts")
for _, r := range m.kafkaWritersList.KafkaWriters {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka writer %v", err)
}
}

fmt.Println("Kafka writers graceful shutdown complete")
}

func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaUrl, ",")
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1,
MaxBytes: 10e6,
})

m.kafkaReadersList.KafkaReadersLock.Lock()
m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
m.kafkaReadersList.KafkaReadersLock.Unlock()

return kafkaReader
}

func (m *AppState) CleanKafkaReaders() {
for _, r := range m.kafkaReadersList.KafkaReaders {
if err := r.Close(); err != nil {
fmt.Printf("Error in closing kafka reader %v", err)
}
}

fmt.Println("Kafka readers graceful shutdown complete")
}

// GetBeacons returns thread-safe access to beacons list
func (m *AppState) GetBeacons() *model.BeaconsList {
return &m.beacons
}

// GetSettings returns thread-safe access to settings
func (m *AppState) GetSettings() *model.Settings {
return &m.settings
}

// GetBeaconEvents returns thread-safe access to beacon events
func (m *AppState) GetBeaconEvents() *model.BeaconEventList {
return &m.beaconEvents
}

// GetBeaconsLookup returns thread-safe access to beacon lookup map
func (m *AppState) GetBeaconsLookup() map[string]struct{} {
return m.beaconsLookup
}

// GetLatestList returns thread-safe access to latest beacons list
func (m *AppState) GetLatestList() *model.LatestBeaconsList {
return &m.latestList
}

// AddBeaconToLookup adds a beacon ID to the lookup map
func (m *AppState) AddBeaconToLookup(id string) {
m.beaconsLookup[id] = struct{}{}
}

// RemoveBeaconFromLookup removes a beacon ID from the lookup map
func (m *AppState) RemoveBeaconFromLookup(id string) {
delete(m.beaconsLookup, id)
}

func (m *AppState) RemoveBeacon(id string) {
m.beacons.Lock.Lock()
delete(m.beacons.Beacons, id)
m.beacons.Lock.Unlock()
}

// BeaconExists checks if a beacon exists in the lookup
func (m *AppState) BeaconExists(id string) bool {
_, exists := m.beaconsLookup[id]
return exists
}

// GetBeacon returns a beacon by ID (thread-safe)
func (m *AppState) GetBeacon(id string) (model.Beacon, bool) {
m.beacons.Lock.RLock()
defer m.beacons.Lock.RUnlock()

beacon, exists := m.beacons.Beacons[id]
return beacon, exists
}

// UpdateBeacon updates a beacon in the list (thread-safe)
func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) {
m.beacons.Lock.Lock()
defer m.beacons.Lock.Unlock()

m.beacons.Beacons[id] = beacon
}

// GetBeaconEvent returns a beacon event by ID (thread-safe)
func (m *AppState) GetBeaconEvent(id string) (model.BeaconEvent, bool) {
m.beaconEvents.Lock.RLock()
defer m.beaconEvents.Lock.RUnlock()

event, exists := m.beaconEvents.Beacons[id]
return event, exists
}

// UpdateBeaconEvent updates a beacon event in the list (thread-safe)
func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
m.beaconEvents.Lock.Lock()
defer m.beaconEvents.Lock.Unlock()

m.beaconEvents.Beacons[id] = event
}

// GetLatestBeacon returns the latest beacon by ID (thread-safe)
func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
m.latestList.Lock.RLock()
defer m.latestList.Lock.RUnlock()

beacon, exists := m.latestList.LatestList[id]
return beacon, exists
}

// UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
m.latestList.Lock.Lock()
defer m.latestList.Lock.Unlock()

m.latestList.LatestList[id] = beacon
}

// GetAllBeacons returns a copy of all beacons
func (m *AppState) GetAllBeacons() map[string]model.Beacon {
m.beacons.Lock.RLock()
defer m.beacons.Lock.RUnlock()

beacons := make(map[string]model.Beacon)
for id, beacon := range m.beacons.Beacons {
beacons[id] = beacon
}
return beacons
}

// GetAllLatestBeacons returns a copy of all latest beacons
func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
m.latestList.Lock.RLock()
defer m.latestList.Lock.RUnlock()

beacons := make(map[string]model.Beacon)
for id, beacon := range m.latestList.LatestList {
beacons[id] = beacon
}
return beacons
}

// GetBeaconCount returns the number of tracked beacons
func (m *AppState) GetBeaconCount() int {
m.beacons.Lock.RLock()
defer m.beacons.Lock.RUnlock()

return len(m.beacons.Beacons)
}

// GetSettingsValue returns current settings as a value
func (m *AppState) GetSettingsValue() model.SettingsVal {
m.settings.Lock.RLock()
defer m.settings.Lock.RUnlock()

return m.settings.Settings
}

// UpdateSettings updates the system settings (thread-safe)
func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {
m.settings.Lock.Lock()
defer m.settings.Lock.Unlock()

m.settings.Settings = newSettings
}

func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) {
d, err := json.Marshal(m.GetSettingsValue())
if err != nil {
fmt.Printf("Error in marshalling settings: %v", err)
return
}

if err := client.Set(ctx, "settings", d, 0).Err(); err != nil {
fmt.Printf("Error in persisting settings: %v", err)
}
}

+ 129
- 0
internal/pkg/common/utils/beacons.go Datei anzeigen

@@ -0,0 +1,129 @@
package utils

import (
"encoding/binary"
"fmt"

"github.com/AFASystems/presence/internal/pkg/model"
)

// ParseADFast efficiently parses Advertising Data structures
// Returns slice of [startIndex, endIndex] pairs for each AD structure
func ParseADFast(b []byte) [][2]int {
var res [][2]int
i := 0

for i < len(b) {
l := int(b[i])
if l == 0 || i+1+l > len(b) {
break
}

res = append(res, [2]int{i, i + 1 + l})

i += 1 + l
}

return res
}

// RemoveFlagBytes removes Bluetooth advertising flag bytes if present
// Some beacons include flag bytes as the first AD structure
func RemoveFlagBytes(b []byte) []byte {
if len(b) > 1 && b[1] == 0x01 {
l := int(b[0])
if 1+l <= len(b) {
return b[1+l:]
}
}
return b
}

// Generate event based on the Beacon type
func LoopADStructures(b []byte, i [][2]int, id string) model.BeaconEvent {
be := model.BeaconEvent{}
for _, r := range i {
ad := b[r[0]:r[1]]
if !isValidADStructure(ad) {
break
}
if checkIngics(ad) {
be = parseIngicsState(ad)
be.ID = id
be.Name = id
break
} else if checkEddystoneTLM(ad) {
be = parseEddystoneState(ad)
be.ID = id
be.Name = id
break
} else if checkMinewB7(ad) {
fmt.Println("Minew B7 vendor format")
break
}
}

return be
}

// IsValidADStructure validates if an AD structure is well-formed
func isValidADStructure(data []byte) bool {
if len(data) < 2 {
return false
}

length := int(data[0])
return length > 0 && int(length)+1 <= len(data)
}

func checkIngics(ad []byte) bool {
if len(ad) >= 6 &&
ad[1] == 0xFF &&
ad[2] == 0x59 &&
ad[3] == 0x00 &&
ad[4] == 0x80 &&
ad[5] == 0xBC {
return true
}

return false
}

func parseIngicsState(ad []byte) model.BeaconEvent {
return model.BeaconEvent{
Battery: uint32(binary.LittleEndian.Uint16(ad[6:8])),
Event: int(ad[8]),
Type: "Ingics",
}
}

func checkEddystoneTLM(ad []byte) bool {
if len(ad) >= 4 &&
ad[1] == 0x16 &&
ad[2] == 0xAA &&
ad[3] == 0xFE &&
ad[4] == 0x20 {
return true
}

return false
}

func parseEddystoneState(ad []byte) model.BeaconEvent {
return model.BeaconEvent{
Battery: uint32(binary.BigEndian.Uint16(ad[6:8])),
Type: "Eddystone",
}
}

// I dont think this is always true, but for testing is ok
func checkMinewB7(ad []byte) bool {
if len(ad) >= 4 &&
ad[1] == 0x16 &&
ad[2] == 0xE1 &&
ad[3] == 0xFF {
return true
}

return false
}

+ 38
- 0
internal/pkg/common/utils/distance.go Datei anzeigen

@@ -0,0 +1,38 @@
package utils

import (
"math"
"strconv"

"github.com/AFASystems/presence/internal/pkg/model"
)

func CalculateDistance(adv model.BeaconAdvertisement) float64 {
rssi := adv.RSSI
power := adv.TXPower
ratio := float64(rssi) * (1.0 / float64(twosComp(power)))
distance := 100.0
if ratio < 1.0 {
distance = math.Pow(ratio, 10)
} else {
distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
}
return distance
}

// TwosComp converts a two's complement hexadecimal string to int64
func twosComp(inp string) int64 {
i, _ := strconv.ParseInt("0x"+inp, 0, 64)
return i - 256
}

// ValidateRSSI validates if RSSI value is within reasonable bounds
func ValidateRSSI(rssi int64) bool {
return rssi >= -120 && rssi <= 0
}

// ValidateTXPower validates if TX power is within reasonable bounds
func ValidateTXPower(txPower string) bool {
power := twosComp(txPower)
return power >= -128 && power <= 127
}

+ 4
- 3
internal/pkg/config/config.go Datei anzeigen

@@ -12,6 +12,7 @@ type Config struct {
DBPath string
KafkaURL string
RedisURL string
ValkeyURL string
}

// getEnv returns env var value or a default if not set.
@@ -24,14 +25,14 @@ func getEnv(key, def string) string {

func Load() *Config {
return &Config{
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"),
HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"),
WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"),
MQTTHost: getEnv("MQTT_HOST", "127.0.0.1:11883"),
MQTTHost: getEnv("MQTT_HOST", "192.168.1.101:1883"),
MQTTUser: getEnv("MQTT_USERNAME", "user"),
MQTTPass: getEnv("MQTT_PASSWORD", "pass"),
MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"),
DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
RedisURL: getEnv("REDIS_URL", "127.0.0.1:6379"),
ValkeyURL: getEnv("VALKEY_URL", "127.0.0.1:6379"),
}
}

+ 122
- 0
internal/pkg/controller/beacons_controller.go Datei anzeigen

@@ -0,0 +1,122 @@
package controller

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/gorilla/mux"
"github.com/segmentio/kafka-go"
)

func BeaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id := vars["beacon_id"]
beacon, ok := appstate.GetBeacon(id)
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"})
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(beacon)
}
}

func BeaconsListController(appstate *appcontext.AppState) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
beacons := appstate.GetAllBeacons()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(beacons)
}
}

// Probably define value as interface and then reuse this writer in all of the functions
func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error {
valueStr, err := json.Marshal(&value)
if err != nil {
fmt.Println("error in encoding: ", err)
return err
}
msg := kafka.Message{
Value: valueStr,
}

if err := writer.WriteMessages(ctx, msg); err != nil {
fmt.Println("Error in sending kafka message: ", err)
return err
}

return nil
}

func BeaconsDeleteController(writer *kafka.Writer, ctx context.Context, appstate *appcontext.AppState) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
beaconId := vars["beacon_id"]
apiUpdate := model.ApiUpdate{
Method: "DELETE",
ID: beaconId,
}

fmt.Printf("Sending DELETE beacon id: %s message\n", beaconId)

if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
fmt.Println("error in sending Kafka DELETE message")
http.Error(w, "Error in sending kafka message", 500)
return
}

// If message is succesfully sent delete the beacon from the list
appstate.RemoveBeacon(beaconId)
w.Write([]byte("ok"))
}
}

func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon
err := decoder.Decode(&inBeacon)
fmt.Printf("hello world\n")

if err != nil {
http.Error(w, err.Error(), 400)
return
}
fmt.Printf("hello world\n")
fmt.Printf("in beacon: %+v\n", inBeacon)

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
}

fmt.Printf("Adding new print here also\n")
// fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID)

apiUpdate := model.ApiUpdate{
Method: "POST",
Beacon: inBeacon,
}

fmt.Printf("message: %+v\n", apiUpdate)

if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
fmt.Println("error in sending Kafka POST message")
http.Error(w, "Error in sending kafka message", 500)
return
}

w.Write([]byte("ok"))
}
}

+ 79
- 0
internal/pkg/controller/settings_controller.go Datei anzeigen

@@ -0,0 +1,79 @@
package controller

import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

func SettingsEditController(writer *kafka.Writer, appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inSettings model.SettingsVal
if err := decoder.Decode(&inSettings); err != nil {
http.Error(w, err.Error(), 400)
fmt.Println("Error in decoding Settings body: ", err)
return
}

if !settingsCheck(inSettings) {
http.Error(w, "values must be greater than 0", 400)
fmt.Println("settings values must be greater than 0")
return
}

valueStr, err := json.Marshal(&inSettings)
if err != nil {
http.Error(w, "Error in encoding settings", 500)
fmt.Println("Error in encoding settings: ", err)
return
}

msg := kafka.Message{
Value: valueStr,
}

if err := writer.WriteMessages(ctx, msg); err != nil {
fmt.Println("error in sending Kafka message")
http.Error(w, "Error in sending kafka message", 500)
return
}

// if all is OK persist settings
appstate.UpdateSettings(inSettings)
appstate.PersistSettings(client, ctx)

w.Write([]byte("ok"))
}
}

func settingsCheck(settings model.SettingsVal) bool {
if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 {
return false
}

return true
}

func SettingsListController(appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
v, err := client.Get(ctx, "settings").Result()
if err == redis.Nil {
msg := "No list found for key settings, starting empty"
fmt.Println(msg)
http.Error(w, "msg", 500)
} else if err != nil {
msg := fmt.Sprintf("Error in connecting to Redis: %v, key: settings returning empty map\n", err)
fmt.Println(msg)
http.Error(w, msg, 500)
} else {
w.Write([]byte(v))
}
}
}

+ 0
- 371
internal/pkg/httpserver/server.go Datei anzeigen

@@ -1,371 +0,0 @@
package httpserver

import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/persistence"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)

const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
beaconPeriod = 2 * time.Second
)

// Init store in main or anywhere else and pass it to all initializer functions
// called in main, then with controllers or handlers use wrapper that takes entire store
// allocates only the properties that need to be passed into the controller

func StartHTTPServer(addr string, ctx *model.AppContext) {
headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

// Set up HTTP server
r := mux.NewRouter()
r.HandleFunc("/api/results", resultsHandler(&ctx.HTTPResults))

r.HandleFunc("/api/beacons/{beacon_id}", BeaconsDeleteHandler(&ctx.Beacons, ctx.ButtonsList)).Methods("DELETE")
r.HandleFunc("/api/beacons", BeaconsListHandler(&ctx.Beacons)).Methods("GET")
r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("POST") //since beacons are hashmap, just have put and post be same thing. it'll either add or modify that entry
r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("PUT")

r.HandleFunc("/api/latest-beacons", latestBeaconsListHandler(&ctx.LatestList)).Methods("GET")

r.HandleFunc("/api/settings", SettingsListHandler(&ctx.Settings)).Methods("GET")
r.HandleFunc("/api/settings", SettingsEditHandler(&ctx.Settings)).Methods("POST")

r.PathPrefix("/js/").Handler(http.StripPrefix("/js/", http.FileServer(http.Dir("static_html/js/"))))
r.PathPrefix("/css/").Handler(http.StripPrefix("/css/", http.FileServer(http.Dir("static_html/css/"))))
r.PathPrefix("/img/").Handler(http.StripPrefix("/img/", http.FileServer(http.Dir("static_html/img/"))))
r.PathPrefix("/").Handler(http.FileServer(http.Dir("static_html/")))

http.Handle("/", r)

mxWS := mux.NewRouter()
mxWS.HandleFunc("/ws/api/beacons", serveWs(&ctx.HTTPResults))
mxWS.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(&ctx.LatestList))
mxWS.HandleFunc("/ws/broadcast", handleConnections(ctx.Clients, &ctx.Broadcast))
http.Handle("/ws/", mxWS)

go handleMessages(ctx.Clients, &ctx.Broadcast)

http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))

}

func resultsHandler(httpResults *model.HTTPResultsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
httpResults.HTTPResultsLock.Lock()
defer httpResults.HTTPResultsLock.Unlock()
js, err := json.Marshal(httpResults.HTTPResults)

if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write(js)
}
}

func BeaconsListHandler(beacons *model.BeaconsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
beacons.Lock.RLock()
js, err := json.Marshal(beacons.Beacons)
beacons.Lock.RUnlock()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write(js)
}
}

func BeaconsAddHandler(beacons *model.BeaconsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inBeacon model.Beacon
err := decoder.Decode(&inBeacon)

if err != nil {
http.Error(w, err.Error(), 400)
return
}

if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
http.Error(w, "name and beacon_id cannot be blank", 400)
return
}

beacons.Beacons[inBeacon.Beacon_id] = inBeacon

err = persistence.PersistBeacons(beacons)

if err != nil {
http.Error(w, "trouble persisting beacons list, create bucket", 500)
return
}

w.Write([]byte("ok"))
}
}

func BeaconsDeleteHandler(beacons *model.BeaconsList, buttonsList map[string]model.Button) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
fmt.Println("route param: ", vars)
beaconId := vars["beacon_id"]
_, ok := beacons.Beacons[beaconId]
if !ok {
http.Error(w, "no beacon with the specified id", 400) // change the status code
return
}
delete(beacons.Beacons, beaconId)

_, ok = buttonsList[beaconId]
if ok {
delete(buttonsList, beaconId)
}

err := persistence.PersistBeacons(beacons)
if err != nil {
http.Error(w, "trouble persisting beacons list, create bucket", 500)
return
}

w.Write([]byte("ok"))
}
}

func latestBeaconsListHandler(latestList *model.LatestBeaconsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
latestList.Lock.RLock()
var la = make([]model.Beacon, 0)
for _, b := range latestList.LatestList {
la = append(la, b)
}
latestList.Lock.RUnlock()
js, err := json.Marshal(la)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write(js)
}
}

func SettingsListHandler(settings *model.Settings) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
js, err := json.Marshal(settings)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write(js)
}
}

func SettingsEditHandler(settings *model.Settings) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inSettings model.Settings
err := decoder.Decode(&inSettings)
if err != nil {
http.Error(w, err.Error(), 400)
return
}

//make sure values are > 0
if (inSettings.Location_confidence <= 0) ||
(inSettings.Last_seen_threshold <= 0) ||
(inSettings.HA_send_interval <= 0) {
http.Error(w, "values must be greater than 0", 400)
return
}

*settings = inSettings

err = persistence.PersistSettings(settings)
if err != nil {
http.Error(w, "trouble persisting settings, create bucket", 500)
return
}

w.Write([]byte("ok"))
}
}

func reader(ws *websocket.Conn) {
defer ws.Close()
ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
}
}
}

func writer(ws *websocket.Conn, httpResult *model.HTTPResultsList) {
pingTicker := time.NewTicker(pingPeriod)
beaconTicker := time.NewTicker(beaconPeriod)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
}()
for {
select {
case <-beaconTicker.C:
httpResult.HTTPResultsLock.Lock()
defer httpResult.HTTPResultsLock.Unlock()
js, err := json.Marshal(httpResult.HTTPResults)

if err != nil {
js = []byte("error")
}

ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
return
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

func serveWs(httpResult *model.HTTPResultsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
}
return
}

go writer(ws, httpResult)
reader(ws)
}
}

func latestBeaconWriter(ws *websocket.Conn, latestBeaconsList map[string]model.Beacon, lock *sync.RWMutex) {
pingTicker := time.NewTicker(pingPeriod)
beaconTicker := time.NewTicker(beaconPeriod)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
}()
for {
select {
case <-beaconTicker.C:

lock.RLock()
var la = make([]model.Beacon, 0)
for _, b := range latestBeaconsList {
la = append(la, b)
}
lock.RUnlock()
js, err := json.Marshal(la)

if err != nil {
js = []byte("error")
}

ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
return
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

func serveLatestBeaconsWs(latestList *model.LatestBeaconsList) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
}
return
}

go latestBeaconWriter(ws, latestList.LatestList, &latestList.Lock)
reader(ws)
}
}

func handleConnections(clients map[*websocket.Conn]bool, broadcast *chan model.Message) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
}

defer ws.Close()

clients[ws] = true

for {
var msg model.Message
err := ws.ReadJSON(&msg)
if err != nil {
log.Printf("error: %v", err)
delete(clients, ws)
break
}
*broadcast <- msg
}
}
}

func handleMessages(clients map[*websocket.Conn]bool, broadcast *chan model.Message) {
for {
msg := <-*broadcast
for client := range clients {
err := client.WriteJSON(msg)
if err != nil {
log.Printf("error: %v", err)
client.Close()
delete(clients, client)
}
}
}
}

+ 0
- 3
internal/pkg/httpserver/server.md Datei anzeigen

@@ -1,3 +0,0 @@
# Server

TODO: refactor to structure: router -> controller -> service, possibly use swagger or any other package to define structure of the API server

+ 20
- 12
internal/pkg/kafkaclient/consumer.go Datei anzeigen

@@ -4,24 +4,32 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/segmentio/kafka-go"
)

func Consume[T any](r *kafka.Reader, ch chan<- T) {
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Println("error reading message:", err)
continue
}
select {
case <-ctx.Done():
fmt.Println("consumer closed")
return
default:
msg, err := r.ReadMessage(ctx)
if err != nil {
fmt.Println("error reading message:", err)
continue
}

var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}
var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}

ch <- data
ch <- data
}
}
}

+ 3
- 0
internal/pkg/kafkaclient/reader.go Datei anzeigen

@@ -6,6 +6,9 @@ import (
"github.com/segmentio/kafka-go"
)

// Create Kafka reader
//
// Deprecated: Use context manager object instead
func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{


+ 5
- 0
internal/pkg/kafkaclient/writer.go Datei anzeigen

@@ -6,11 +6,16 @@ import (
"github.com/segmentio/kafka-go"
)

// Create Kafka writer
//
// Deprecated: Use context manager object instead
func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: false,
RequiredAcks: kafka.RequireAll,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}


+ 47
- 0
internal/pkg/model/type_methods.go Datei anzeigen

@@ -0,0 +1,47 @@
package model

import (
"crypto/sha256"
"encoding/json"
"fmt"
)

func (b *BeaconEvent) Hash() []byte {
rBatt := (b.Battery / 10) * 10
c := fmt.Sprintf("%d%d%s%s%s", rBatt, b.Event, b.ID, b.Name, b.Type)
h := sha256.New()
h.Write([]byte(c))
bs := h.Sum(nil)

return bs
}

func (b BeaconEvent) ToJSON() ([]byte, error) {
eData, err := json.Marshal(b)
if err != nil {
return nil, err
}
return eData, nil
}

func convertStructToMap(obj any) (map[string]any, error) {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}

var result map[string]any
if err := json.Unmarshal(data, &result); err != nil {
return nil, err
}

return result, nil
}

func (loc HTTPLocation) RedisHashable() (map[string]any, error) {
return convertStructToMap(loc)
}

func (be BeaconEvent) RedisHashable() (map[string]any, error) {
return convertStructToMap(be)
}

+ 105
- 101
internal/pkg/model/types.go Datei anzeigen

@@ -3,17 +3,18 @@ package model
import (
"sync"

"github.com/boltdb/bolt"
"github.com/gorilla/websocket"
"github.com/segmentio/kafka-go"
)

// Settings defines configuration parameters for presence detection behavior.
type SettingsVal struct {
Location_confidence int64 `json:"location_confidence"`
Last_seen_threshold int64 `json:"last_seen_threshold"`
Beacon_metrics_size int `json:"beacon_metrics_size"`
HA_send_interval int64 `json:"ha_send_interval"`
HA_send_changes_only bool `json:"ha_send_changes_only"`
LocationConfidence int64 `json:"location_confidence"`
LastSeenThreshold int64 `json:"last_seen_threshold"`
BeaconMetricSize int `json:"beacon_metrics_size"`
HASendInterval int64 `json:"ha_send_interval"`
HASendChangesOnly bool `json:"ha_send_changes_only"`
RSSIMinThreshold int64 `json:"rssi_min_threshold"`
RSSIEnforceThreshold bool `json:"enforce_rssi_threshold"`
}

type Settings struct {
@@ -21,126 +22,129 @@ type Settings struct {
Lock sync.RWMutex
}

// Incoming_json represents the JSON payload received from beacon messages.
type Incoming_json struct {
Hostname string `json:"hostname"`
MAC string `json:"mac"`
RSSI int64 `json:"rssi"`
Is_scan_response string `json:"is_scan_response"`
Ttype string `json:"type"`
Data string `json:"data"`
Beacon_type string `json:"beacon_type"`
UUID string `json:"uuid"`
Major string `json:"major"`
Minor string `json:"minor"`
TX_power string `json:"tx_power"`
Namespace string `json:"namespace"`
Instance_id string `json:"instance_id"`
// button stuff
HB_ButtonCounter int64 `json:"hb_button_counter"`
HB_ButtonCounter_Prev int64 `json:"hb_button_counter_prev"`
HB_Battery int64 `json:"hb_button_battery"`
HB_RandomNonce string `json:"hb_button_random"`
HB_ButtonMode string `json:"hb_button_mode"`
// BeaconAdvertisement represents the JSON payload received from beacon advertisements.
type BeaconAdvertisement struct {
Hostname string `json:"hostname"`
MAC string `json:"mac"`
RSSI int64 `json:"rssi"`
ScanResponse string `json:"is_scan_response"`
Type string `json:"type"`
Data string `json:"data"`
BeaconType string `json:"beacon_type"`
UUID string `json:"uuid"`
Major string `json:"major"`
Minor string `json:"minor"`
TXPower string `json:"tx_power"`
NamespaceID string `json:"namespace"`
InstanceID string `json:"instance_id"`
HSButtonCounter int64 `json:"hb_button_counter"`
HSButtonPrev int64 `json:"hb_button_counter_prev"`
HSBatteryLevel int64 `json:"hb_button_battery"`
HSRandomNonce string `json:"hb_button_random"`
HSButtonMode string `json:"hb_button_mode"`
}

// Advertisement describes a generic beacon advertisement payload.
type Advertisement struct {
ttype string
content string
seen int64
Type string
Content string
Seen int64
}

// BeaconMetric stores signal and distance data for a beacon.
type BeaconMetric struct {
Location string
Distance float64
Rssi int64
RSSI int64
Timestamp int64
}

// Location defines a physical location and synchronization control.
type Location struct {
name string
lock sync.RWMutex
Name string
Lock sync.RWMutex
}

// BestLocation represents the most probable location of a beacon.
type BestLocation struct {
Distance float64
Name string
Last_seen int64
Distance float64
Name string
LastSeen int64
}

// HTTPLocation describes a beacon's state as served over HTTP.
type HTTPLocation struct {
Previous_confident_location string `json:"previous_confident_location"`
Distance float64 `json:"distance"`
Name string `json:"name"`
Beacon_name string `json:"beacon_name"`
Beacon_id string `json:"beacon_id"`
Beacon_type string `json:"beacon_type"`
HB_Battery int64 `json:"hb_button_battery"`
HB_ButtonMode string `json:"hb_button_mode"`
HB_ButtonCounter int64 `json:"hb_button_counter"`
Location string `json:"location"`
Last_seen int64 `json:"last_seen"`
Method string `json:"method"`
PreviousConfidentLocation string `json:"previous_confident_location"`
Distance float64 `json:"distance"`
ID string `json:"id"`
Location string `json:"location"`
LastSeen int64 `json:"last_seen"`
}

// LocationChange defines a change event for a beacon's detected location.
type LocationChange struct {
Beacon_ref Beacon `json:"beacon_info"`
Name string `json:"name"`
Beacon_name string `json:"beacon_name"`
Previous_location string `json:"previous_location"`
New_location string `json:"new_location"`
Timestamp int64 `json:"timestamp"`
Method string `json:"method"`
BeaconRef Beacon `json:"beacon_info"`
Name string `json:"name"`
BeaconName string `json:"beacon_name"`
PreviousLocation string `json:"previous_location"`
NewLocation string `json:"new_location"`
Timestamp int64 `json:"timestamp"`
}

// HAMessage represents a Home Assistant integration payload.
type HAMessage struct {
Beacon_id string `json:"id"`
Beacon_name string `json:"name"`
Distance float64 `json:"distance"`
ID string `json:"id"`
BeaconName string `json:"name"`
Distance float64 `json:"distance"`
}

// Beacon holds all relevant information about a tracked beacon device.
type Beacon struct {
Name string `json:"name"`
Beacon_id string `json:"beacon_id"`
Beacon_type string `json:"beacon_type"`
Beacon_location string `json:"beacon_location"`
Last_seen int64 `json:"last_seen"`
Incoming_JSON Incoming_json `json:"incoming_json"`
Distance float64 `json:"distance"`
Previous_location string
Previous_confident_location string
Expired_location string
Location_confidence int64
Location_history []string
Beacon_metrics []BeaconMetric

HB_ButtonCounter int64 `json:"hb_button_counter"`
HB_ButtonCounter_Prev int64 `json:"hb_button_counter_prev"`
HB_Battery int64 `json:"hb_button_battery"`
HB_RandomNonce string `json:"hb_button_random"`
HB_ButtonMode string `json:"hb_button_mode"`
Name string `json:"name"`
ID string `json:"beacon_id"`
BeaconType string `json:"beacon_type"`
BeaconLocation string `json:"beacon_location"`
LastSeen int64 `json:"last_seen"`
IncomingJSON BeaconAdvertisement `json:"incoming_json"`
Distance float64 `json:"distance"`
PreviousLocation string
PreviousConfidentLocation string
ExpiredLocation string
LocationConfidence int64
LocationHistory []string
BeaconMetrics []BeaconMetric
Location string `json:"location"`
HSButtonCounter int64 `json:"hs_button_counter"`
HSButtonPrev int64 `json:"hs_button_counter_prev"`
HSBattery int64 `json:"hs_button_battery"`
HSRandomNonce string `json:"hs_button_random"`
HSButtonMode string `json:"hs_button_mode"`
Event int `json:"beacon_event"`
}

type BeaconEvent struct {
Name string
ID string
Type string
Battery uint32
Event int
}

// Button represents a hardware button beacon device.
type Button struct {
Name string `json:"name"`
Button_id string `json:"button_id"`
Button_type string `json:"button_type"`
Button_location string `json:"button_location"`
Incoming_JSON Incoming_json `json:"incoming_json"`
Distance float64 `json:"distance"`
Last_seen int64 `json:"last_seen"`

HB_ButtonCounter int64 `json:"hb_button_counter"`
HB_Battery int64 `json:"hb_button_battery"`
HB_RandomNonce string `json:"hb_button_random"`
HB_ButtonMode string `json:"hb_button_mode"`
Name string `json:"name"`
ButtonID string `json:"button_id"`
ButtonType string `json:"button_type"`
ButtonLocation string `json:"button_location"`
IncomingJSON BeaconAdvertisement `json:"incoming_json"`
Distance float64 `json:"distance"`
LastSeen int64 `json:"last_seen"`
HSButtonCounter int64 `json:"hs_button_counter"`
HSBattery int64 `json:"hs_button_battery"`
HSRandomNonce string `json:"hs_button_random"`
HSButtonMode string `json:"hs_button_mode"`
}

// BeaconsList holds all known beacons and their synchronization lock.
@@ -149,6 +153,11 @@ type BeaconsList struct {
Lock sync.RWMutex
}

type BeaconEventList struct {
Beacons map[string]BeaconEvent
Lock sync.RWMutex
}

// LocationsList holds all known locations with concurrency protection.
type LocationsList struct {
Locations map[string]Location
@@ -185,26 +194,21 @@ type HTTPResultsList struct {
HTTPResults HTTPLocationsList
}

type AppContext struct {
HTTPResults HTTPResultsList
Beacons BeaconsList
ButtonsList map[string]Button
Settings Settings
Broadcast chan Message
Locations LocationsList
LatestList LatestBeaconsList
Clients map[*websocket.Conn]bool
}

type ApiUpdate struct {
Method string
Beacon Beacon
ID string
}

var World = []byte("presence")
type KafkaReadersList struct {
KafkaReadersLock sync.RWMutex
KafkaReaders []*kafka.Reader
}

var Db *bolt.DB
type KafkaWritersList struct {
KafkaWritersLock sync.RWMutex
KafkaWriters []*kafka.Writer
}

var HTTPHostPathPtr *string
var HTTPWSHostPathPtr *string

+ 0
- 128
internal/pkg/mqttclient/beacon.go Datei anzeigen

@@ -1,128 +0,0 @@
package mqttclient

import (
"bytes"
"encoding/json"
"fmt"
"log"
"math"
"os/exec"
"strconv"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)

func GetBeaconID(incoming model.Incoming_json) string {
unique_id := fmt.Sprintf("%s", incoming.MAC)
return unique_id
}

func updateLatestList(incoming model.Incoming_json, now int64, latestList *model.LatestBeaconsList) {
latestList.Lock.Lock()
defer latestList.Lock.Unlock()

b := model.Beacon{
Beacon_id: GetBeaconID(incoming),
Beacon_type: incoming.Beacon_type,
Last_seen: now,
Incoming_JSON: incoming,
Beacon_location: incoming.Hostname,
Distance: getBeaconDistance(incoming),
}

latestList.LatestList[b.Beacon_id] = b

for id, v := range latestList.LatestList {
if now-v.Last_seen > 10 {
delete(latestList.LatestList, id)
}
}
}

func updateBeaconData(beacon *model.Beacon, incoming model.Incoming_json, now int64, cl *client.Client, settings *model.SettingsVal) {
beacon.Incoming_JSON = incoming
beacon.Last_seen = now
beacon.Beacon_type = incoming.Beacon_type
beacon.HB_ButtonCounter = incoming.HB_ButtonCounter
beacon.HB_Battery = incoming.HB_Battery
beacon.HB_RandomNonce = incoming.HB_RandomNonce
beacon.HB_ButtonMode = incoming.HB_ButtonMode

m := model.BeaconMetric{
Distance: getBeaconDistance(incoming),
Timestamp: now,
Rssi: int64(incoming.RSSI),
Location: incoming.Hostname,
}

beacon.Beacon_metrics = append(beacon.Beacon_metrics, m)
if len(beacon.Beacon_metrics) > settings.Beacon_metrics_size {
beacon.Beacon_metrics = beacon.Beacon_metrics[1:]
}

if beacon.HB_ButtonCounter_Prev != beacon.HB_ButtonCounter {
beacon.HB_ButtonCounter_Prev = incoming.HB_ButtonCounter
sendButtonPressed(*beacon, cl)
}
}

func sendButtonPressed(beacon model.Beacon, cl *client.Client) {
btn_msg, err := json.Marshal(beacon)
if err != nil {
panic(err)
}

err = cl.Publish(&client.PublishOptions{
QoS: mqtt.QoS1,
TopicName: []byte("afa-systems/presence/button/" + beacon.Beacon_id),
Message: btn_msg,
})
if err != nil {
panic(err)
}
s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --st=%d", beacon.Beacon_id, beacon.Incoming_JSON.Hostname, beacon.HB_ButtonCounter)
err, out, errout := Shellout(s)
if err != nil {
log.Printf("error: %v\n", err)
}
fmt.Println("--- stdout ---")
fmt.Println(out)
fmt.Println("--- stderr ---")
fmt.Println(errout)
}

func getBeaconDistance(incoming model.Incoming_json) float64 {
distance := 1000.0
distance = getiBeaconDistance(incoming.RSSI, incoming.TX_power)

return distance
}

func getiBeaconDistance(rssi int64, power string) float64 {
ratio := float64(rssi) * (1.0 / float64(twos_comp(power)))
distance := 100.0
if ratio < 1.0 {
distance = math.Pow(ratio, 10)
} else {
distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
}
return distance
}

func twos_comp(inp string) int64 {
i, _ := strconv.ParseInt("0x"+inp, 0, 64)

return i - 256
}

func Shellout(command string) (error, string, string) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("bash", "-c", command)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
return err, stdout.String(), stderr.String()
}

+ 0
- 35
internal/pkg/mqttclient/fillter.go Datei anzeigen

@@ -1,35 +0,0 @@
package mqttclient

import (
"fmt"
"strconv"
"strings"

"github.com/AFASystems/presence/internal/pkg/model"
)

func IncomingBeaconFilter(incoming model.Incoming_json) model.Incoming_json {
out_json := incoming
if incoming.Beacon_type == "hb_button" {
raw_data := incoming.Data
hb_button_prefix_str := fmt.Sprintf("02010612FF5900")
if strings.HasPrefix(raw_data, hb_button_prefix_str) {
out_json.Namespace = "ddddeeeeeeffff5544ff"
counter_str := fmt.Sprintf("0x%s", raw_data[22:24])
counter, _ := strconv.ParseInt(counter_str, 0, 64)
out_json.HB_ButtonCounter = counter

battery_str := fmt.Sprintf("0x%s%s", raw_data[20:22], raw_data[18:20])

battery, _ := strconv.ParseInt(battery_str, 0, 64)
out_json.HB_Battery = battery

out_json.TX_power = fmt.Sprintf("0x%s", "4")

out_json.Beacon_type = "hb_button"
out_json.HB_ButtonMode = "presence_button"
}
}

return out_json
}

+ 0
- 165
internal/pkg/mqttclient/location.go Datei anzeigen

@@ -1,165 +0,0 @@
package mqttclient

import (
"encoding/json"
"log"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/persistence"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)

func getLikelyLocations(settings *model.SettingsVal, ctx *model.AppContext, cl *client.Client) {
ctx.HTTPResults.HTTPResultsLock.Lock()
defer ctx.HTTPResults.HTTPResultsLock.Unlock()
ctx.HTTPResults.HTTPResults = model.HTTPLocationsList{Beacons: []model.HTTPLocation{}}

shouldPersist := false

for id, beacon := range ctx.Beacons.Beacons {
if len(beacon.Beacon_metrics) == 0 {
continue
}

if isExpired(&beacon, settings) {
handleExpiredBeacon(&beacon, cl, ctx)
continue
}

best := calculateBestLocation(&beacon)
updateBeaconState(&beacon, best, settings, ctx, cl)

appendHTTPResult(ctx, beacon, best)
ctx.Beacons.Beacons[id] = beacon
shouldPersist = true
}

if shouldPersist {
persistence.PersistBeacons(&ctx.Beacons)
}
}

func isExpired(b *model.Beacon, s *model.SettingsVal) bool {
return time.Now().Unix()-b.Beacon_metrics[len(b.Beacon_metrics)-1].Timestamp > s.Last_seen_threshold
}

func handleExpiredBeacon(b *model.Beacon, cl *client.Client, ctx *model.AppContext) {
if b.Expired_location == "expired" {
return
}
b.Expired_location = "expired"
msg := model.Message{
Email: b.Previous_confident_location,
Username: b.Name,
Message: "expired",
}
data, _ := json.Marshal(msg)
log.Println(string(data))
ctx.Broadcast <- msg
}

func calculateBestLocation(b *model.Beacon) model.BestLocation {
locScores := map[string]float64{}
for _, m := range b.Beacon_metrics {
score := 1.5 + 0.75*(1.0-(float64(m.Rssi)/-100.0))
locScores[m.Location] += score
}
bestName, bestScore := "", 0.0
for name, score := range locScores {
if score > bestScore {
bestName, bestScore = name, score
}
}
last := b.Beacon_metrics[len(b.Beacon_metrics)-1]
return model.BestLocation{Name: bestName, Distance: last.Distance, Last_seen: last.Timestamp}
}

func updateBeaconState(b *model.Beacon, best model.BestLocation, s *model.SettingsVal, ctx *model.AppContext, cl *client.Client) {
updateLocationHistory(b, best.Name)
updateConfidence(b, best.Name, s)

if locationChanged(b, best, s) {
publishLocationChange(b, best, cl)
b.Location_confidence = 0
b.Previous_confident_location = best.Name
}
}

func updateLocationHistory(b *model.Beacon, loc string) {
b.Location_history = append(b.Location_history, loc)
if len(b.Location_history) > 10 {
b.Location_history = b.Location_history[1:]
}
}

func updateConfidence(b *model.Beacon, loc string, s *model.SettingsVal) {
counts := map[string]int{}
for _, l := range b.Location_history {
counts[l]++
}

maxCount, mostCommon := 0, ""
for l, c := range counts {
if c > maxCount {
maxCount, mostCommon = c, l
}
}

if maxCount >= 7 {
if mostCommon == b.Previous_confident_location {
b.Location_confidence++
} else {
b.Location_confidence = 1
b.Previous_confident_location = mostCommon
}
}
}

func locationChanged(b *model.Beacon, best model.BestLocation, s *model.SettingsVal) bool {
return (b.Location_confidence == s.Location_confidence &&
b.Previous_confident_location != best.Name) ||
b.Expired_location == "expired"
}

func publishLocationChange(b *model.Beacon, best model.BestLocation, cl *client.Client) {
location := best.Name
if b.Expired_location == "expired" {
location = "expired"
}

js, err := json.Marshal(model.LocationChange{
Beacon_ref: *b,
Name: b.Name,
Previous_location: b.Previous_confident_location,
New_location: location,
Timestamp: time.Now().Unix(),
})
if err != nil {
return
}

err = cl.Publish(&client.PublishOptions{
QoS: mqtt.QoS1,
TopicName: []byte("afa-systems/presence/changes"),
Message: js,
})
if err != nil {
log.Printf("mqtt publish error: %v", err)
}
}

func appendHTTPResult(ctx *model.AppContext, b model.Beacon, best model.BestLocation) {
ctx.HTTPResults.HTTPResultsLock.Lock()
defer ctx.HTTPResults.HTTPResultsLock.Unlock()

r := model.HTTPLocation{
Name: b.Name,
Beacon_id: b.Beacon_id,
Location: best.Name,
Distance: best.Distance,
Last_seen: best.Last_seen,
}
ctx.HTTPResults.HTTPResults.Beacons = append(ctx.HTTPResults.HTTPResults.Beacons, r)
}

+ 0
- 62
internal/pkg/mqttclient/processor.go Datei anzeigen

@@ -1,62 +0,0 @@
package mqttclient

import (
"fmt"
"log"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/persistence"
"github.com/boltdb/bolt"
"github.com/yosssi/gmq/mqtt/client"
)

func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, ctx *model.AppContext) chan<- model.Incoming_json {
ch := make(chan model.Incoming_json, 2000)
persistence.CreateBucketIfNotExists(db)

ticker := time.NewTicker(updateInterval)
go runProcessor(ticker, cl, ch, ctx)

return ch
}

func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incoming_json, ctx *model.AppContext) {
for {
select {
case <-ticker.C:
getLikelyLocations(&ctx.Settings.Settings, ctx, cl)
case incoming := <-ch:
ProcessIncoming(incoming, cl, ctx)
}
}
}

func ProcessIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model.AppContext) {
defer func() {
if err := recover(); err != nil {
log.Println("work failed:", err)
}
}()

incoming = IncomingBeaconFilter(incoming)
id := GetBeaconID(incoming)
now := time.Now().Unix()

beacons := &ctx.Beacons

beacons.Lock.Lock()
defer beacons.Lock.Unlock()

latestList := &ctx.LatestList
settings := &ctx.Settings.Settings

beacon, ok := beacons.Beacons[id]
if !ok {
updateLatestList(incoming, now, latestList)
return
}
fmt.Println("updating beacon data")
updateBeaconData(&beacon, incoming, now, cl, settings)
beacons.Beacons[beacon.Beacon_id] = beacon
}

+ 0
- 18
internal/pkg/persistence/buckets.go Datei anzeigen

@@ -1,18 +0,0 @@
package persistence

import (
"log"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/boltdb/bolt"
)

func CreateBucketIfNotExists(db *bolt.DB) {
err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(model.World)
return err
})
if err != nil {
log.Fatal(err)
}
}

+ 0
- 39
internal/pkg/persistence/load.go Datei anzeigen

@@ -1,39 +0,0 @@
package persistence

import (
"bytes"
"encoding/gob"
"log"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/boltdb/bolt"
)

func LoadState(db *bolt.DB, ctx *model.AppContext) {
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(model.World)
if bucket == nil {
return nil
}

decode := func(key string, dest interface{}) {
val := bucket.Get([]byte(key))
if val == nil {
return
}
buf := bytes.NewBuffer(val)
if err := gob.NewDecoder(buf).Decode(dest); err != nil {
log.Fatal("decode error: ", err)
}
}

decode("beaconsList", &ctx.Beacons.Beacons)
decode("buttonsList", &ctx.ButtonsList)
decode("settings", &ctx.Settings)
return nil
})

if err != nil {
log.Fatal(err)
}
}

+ 0
- 50
internal/pkg/persistence/persist.go Datei anzeigen

@@ -1,50 +0,0 @@
package persistence

import (
"bytes"
"encoding/gob"
"fmt"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/boltdb/bolt"
)

func PersistBeacons(beacons *model.BeaconsList) error {
buf := &bytes.Buffer{}
enc := gob.NewEncoder(buf)
if err := enc.Encode(beacons.Beacons); err != nil {
fmt.Println("error in encoding: ", err)
return err
}

key := []byte("beacons_list")
err := model.Db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(model.World)
if err != nil {
fmt.Println("error in creating a bucket")
return err
}
return bucket.Put(key, buf.Bytes())
})

return err
}

func PersistSettings(settings *model.Settings) error {
buf := &bytes.Buffer{}
enc := gob.NewEncoder(buf)
if err := enc.Encode(settings); err != nil {
return err
}

key := []byte("settings")
err := model.Db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(model.World)
if err != nil {
return err
}
return bucket.Put(key, buf.Bytes())
})

return err
}

+ 20
- 64
internal/pkg/redis/redis.go Datei anzeigen

@@ -5,84 +5,40 @@ import (
"encoding/json"
"fmt"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
)

func LoadBeaconsList(client *redis.Client, ctx context.Context) map[string]model.Beacon {
beaconsList, err := client.Get(ctx, "beaconsList").Result()
beaconsMap := make(map[string]model.Beacon)
// Get Map from Redis
//
// Deprecated: there is only one map now and we know the type
func LoadRedisMap[K comparable, V any, M map[K]V](client *redis.Client, ctx context.Context, key string) M {
redisValue, err := client.Get(ctx, key).Result()
resMap := make(M)

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
fmt.Printf("No list found for key %s, starting empty\n", key)
} else if err != nil {
fmt.Println("no connection to redis")
fmt.Printf("Error in connecting to Redis: %v, key: %s returning empty map\n", err, key)
} else {
json.Unmarshal([]byte(beaconsList), &beaconsMap)
if err := json.Unmarshal([]byte(redisValue), &resMap); err != nil {
fmt.Printf("Error in unmarshalling JSON for key: %s\n", key)
}
}

return beaconsMap
return resMap
}

func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model.Beacon {
latestList, err := client.Get(ctx, "latestList").Result()
latestMap := make(map[string]model.Beacon)

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
fmt.Println("no connection to redis")
} else {
json.Unmarshal([]byte(latestList), &latestMap)
}

return latestMap
}

func LoadSettings(client *redis.Client, ctx context.Context) model.SettingsVal {
redisSettings, err := client.Get(ctx, "settings").Result()
var settings model.SettingsVal

if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
fmt.Println("no connection to redis")
} else {
json.Unmarshal([]byte(redisSettings), &settings)
}

return settings
}

func SaveBeaconsList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.Beacons.Lock.Lock()
data, _ := json.Marshal(appCtx.Beacons.Beacons)
appCtx.Beacons.Lock.Unlock()

err := client.Set(ctx, "beaconsList", data, 0).Err()
if err != nil {
fmt.Println("error in saving to redis: ", err)
}
}

func SaveLatestList(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.LatestList.Lock.Lock()
data, _ := json.Marshal(appCtx.LatestList.LatestList)
appCtx.LatestList.Lock.Unlock()

err := client.Set(ctx, "latestList", data, 0).Err()
// Set Map in Redis
//
// Deprecated: hashmaps are used now
func SaveRedisMap(client *redis.Client, ctx context.Context, key string, data interface{}) {
eData, err := json.Marshal(data)
if err != nil {
fmt.Println("error in saving to redis: ", err)
fmt.Println("Error in marshalling, key: ", key)
}
}

func SaveSettings(appCtx *model.AppContext, client *redis.Client, ctx context.Context) {
appCtx.Settings.Lock.Lock()
data, _ := json.Marshal(appCtx.Settings.Settings)
appCtx.Settings.Lock.Unlock()

err := client.Set(ctx, "settings", data, 0).Err()
err = client.Set(ctx, key, eData, 0).Err()
if err != nil {
fmt.Println("error in saving to redis: ", err)
fmt.Println("Error in persisting in Redis, key: ", key)
}
}

+ 72
- 0
internal/pkg/service/beacon_service.go Datei anzeigen

@@ -0,0 +1,72 @@
package service

import (
"context"
"fmt"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
)

type RedisHashable interface {
RedisHashable() (map[string]any, error)
model.BeaconEvent | model.HTTPLocation
}

func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error {
key := fmt.Sprintf("beacon:%s", id)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
return err
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
return err
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
return err
}
return nil
}

func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error {
id := msg.ID
beacon, ok := appState.GetBeacon(id)
if !ok {
appState.UpdateBeacon(id, model.Beacon{ID: id, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
} else {
beacon.ID = id
beacon.Location = msg.Location
beacon.Distance = msg.Distance
beacon.LastSeen = msg.LastSeen
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateBeacon(id, beacon)
}
if err := persistBeaconValkey(id, msg, client, ctx); err != nil {
return err
}

return nil
}

func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error {
id := msg.ID
beacon, ok := appState.GetBeacon(id)
if !ok {
appState.UpdateBeacon(id, model.Beacon{ID: id, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
} else {
beacon.ID = id
beacon.BeaconType = msg.Type
beacon.HSBattery = int64(msg.Battery)
beacon.Event = msg.Event
appState.UpdateBeacon(id, beacon)
}
if err := persistBeaconValkey(id, msg, client, ctx); err != nil {
return err
}

return nil
}

+ 3
- 29
scripts/testAPI.sh Datei anzeigen

@@ -1,6 +1,6 @@
#!/bin/bash
URL="http://127.0.0.1:1902/api/beacons"
BEACON_ID="C3000057B9F7"
BEACON_ID="C83F8F17DB35"

echo "POST (create)"
curl -s -X POST $URL \
@@ -10,32 +10,6 @@ echo -e "\n"

sleep 1

echo "GET (list after create)"
curl -s -X GET $URL
echo -e "\n"

sleep 1
curl -X GET $URL

echo "PUT (update)"
curl -s -X PUT $URL \
-H "Content-Type: application/json" \
-d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1-updated","tx_power":-60}'
echo -e "\n"

sleep 1

echo "GET (list after update)"
curl -s -X GET $URL
echo -e "\n"

sleep 1

echo "DELETE"
curl -s -X DELETE "$URL/$BEACON_ID"
echo -e "\n"

sleep 1

echo "GET (list after delete)"
curl -s -X GET $URL
echo -e "\n"
sleep 1

+ 125
- 4
test/README.md Datei anzeigen

@@ -1,9 +1,130 @@
# `/test`
# Unit Tests Documentation

Additional external test apps and test data. Feel free to structure the `/test` directory anyway you want. For bigger projects it makes sense to have a data subdirectory. For example, you can have `/test/data` or `/test/testdata` if you need Go to ignore what's in that directory. Note that Go will also ignore directories or files that begin with "." or "_", so you have more flexibility in terms of how you name your test data directory.
This directory contains comprehensive unit tests for the high-priority internal packages of the AFASystems presence detection system.

Examples:
## Test Coverage

* https://github.com/openshift/origin/tree/master/test (test data is in the `/testdata` subdirectory)
The following files have been thoroughly tested:

1. **`distance_test.go`** - Tests for distance calculation utilities
- `CalculateDistance()` - Distance calculation from RSSI and TX power
- `twosComp()` - Two's complement hex conversion
- `ValidateRSSI()` - RSSI value validation
- `ValidateTXPower()` - TX power validation
- Edge cases and real-world scenarios

2. **`beacons_test.go`** - Tests for beacon parsing utilities
- `ParseADFast()` - Advertising Data structure parsing
- `RemoveFlagBytes()` - Bluetooth flag bytes removal
- `LoopADStructures()` - Beacon type detection and parsing
- `isValidADStructure()` - AD structure validation
- Beacon format support: Ingics, Eddystone TLM, Minew B7

3. **`typeMethods_test.go`** - Tests for model type methods
- `Hash()` - Beacon event hash generation with battery rounding
- `ToJSON()` - JSON marshaling for beacon events
- `convertStructToMap()` - Generic struct-to-map conversion
- `RedisHashable()` - Redis hash map conversion for HTTPLocation and BeaconEvent
- JSON roundtrip integrity tests

4. **`mqtthandler_test.go`** - Tests for MQTT message processing
- `MqttHandler()` - Main MQTT message processing with JSON/CSV input
- `parseButtonState()` - Button counter parsing for different beacon formats
- Kafka writer integration (with mock)
- Hostname extraction from MQTT topics
- Error handling and edge cases

## Running Tests

### Run All Tests
```bash
go test ./test/... -v
```

### Run Specific Test File
```bash
go test ./test/distance_test.go -v
go test ./test/beacons_test.go -v
go test ./test/typeMethods_test.go -v
go test ./test/mqtthandler_test.go -v
```

### Run Tests for Specific Function
```bash
go test ./test/distance_test.go -run TestCalculateDistance -v
go test ./test/beacons_test.go -run TestParseADFast -v
go test ./test/typeMethods_test.go -run TestHash -v
go test ./test/mqtthandler_test.go -run TestMqttHandlerJSONArrayInput -v
```

### Run Benchmarks
```bash
# Run all benchmarks
go test ./test/... -bench=.

# Run specific benchmarks
go test ./test/distance_test.go -bench=BenchmarkCalculateDistance -v
go test ./test/beacons_test.go -bench=BenchmarkParseADFast -v
go test ./test/typeMethods_test.go -bench=BenchmarkHash -v
go test ./test/mqtthandler_test.go -bench=BenchmarkMqttHandlerJSON -v
```

### Run Tests with Coverage Report
```bash
go test ./test/... -cover
go test ./test/... -coverprofile=coverage.out
go tool cover -html=coverage.out -o coverage.html
```

### Run Tests with Race Detection
```bash
go test ./test/... -race -v
```

## Test Organization

Each test file follows Go testing conventions with:
- **Function tests**: Individual function behavior testing
- **Edge case tests**: Boundary conditions and error scenarios
- **Integration tests**: Multi-function workflow testing
- **Benchmark tests**: Performance measurement
- **Table-driven tests**: Multiple test cases with expected results

## Mock Objects

The mqtthandler tests use a `MockKafkaWriter` to simulate Kafka operations without requiring a running Kafka instance. This allows for:

- Deterministic test results
- Failure scenario simulation
- Message content verification
- Performance benchmarking

## Known Limitations

- **CSV Processing**: The original CSV handler in `mqtthandler.go` contains `os.Exit(2)` calls which make it untestable. The test demonstrates the intended structure but cannot fully validate CSV processing due to this design choice.
- **External Dependencies**: Tests use mocks for external systems (Kafka) to ensure tests remain fast and reliable.

## Best Practices Demonstrated

These tests demonstrate several Go testing best practices:

1. **Table-driven tests** for multiple scenarios
2. **Subtests** for logical test grouping
3. **Benchmark tests** for performance measurement
4. **Mock objects** for dependency isolation
5. **Error case testing** for robustness validation
6. **Deterministic testing** with consistent setup and teardown

## Running Tests in CI/CD

For automated testing environments:

```bash
# Standard CI test run
go test ./test/... -race -cover -timeout=30s

# Performance regression testing
go test ./test/... -bench=. -benchmem
```

This comprehensive test suite ensures the reliability and correctness of the core business logic in the AFASystems presence detection system.

+ 560
- 0
test/beacons_test.go Datei anzeigen

@@ -0,0 +1,560 @@
package utils

import (
"testing"

"github.com/AFASystems/presence/internal/pkg/model"
)

func TestParseADFast(t *testing.T) {
tests := []struct {
name string
input []byte
expected [][2]int
}{
{
name: "Empty input",
input: []byte{},
expected: [][2]int{},
},
{
name: "Single AD structure",
input: []byte{0x02, 0x01, 0x06},
expected: [][2]int{{0, 2}},
},
{
name: "Multiple AD structures",
input: []byte{0x02, 0x01, 0x06, 0x03, 0x02, 0x01, 0x02},
expected: [][2]int{{0, 2}, {3, 6}},
},
{
name: "Complex AD structures",
input: []byte{0x02, 0x01, 0x06, 0x1A, 0xFF, 0x4C, 0x00, 0x02, 0x15, 0xE2, 0xC5, 0x6D, 0xB5, 0xDF, 0xFB, 0x48, 0xD2, 0xB0, 0x60, 0xD0, 0xF5, 0xA7, 0x10, 0x96, 0xE0, 0x00, 0x00, 0x00, 0x00, 0xC5},
expected: [][2]int{{0, 2}, {2, 28}},
},
{
name: "Zero length AD structure",
input: []byte{0x00, 0x01, 0x06, 0x03, 0x02, 0x01, 0x02},
expected: [][2]int{{2, 5}},
},
{
name: "AD structure exceeding bounds",
input: []byte{0x05, 0x01, 0x06},
expected: [][2]int{},
},
{
name: "Incomplete AD structure",
input: []byte{0x03, 0x01},
expected: [][2]int{},
},
{
name: "Valid then invalid structure",
input: []byte{0x02, 0x01, 0x06, 0xFF, 0x01, 0x06},
expected: [][2]int{{0, 2}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ParseADFast(tt.input)
if len(result) != len(tt.expected) {
t.Errorf("ParseADFast() length = %v, expected %v", len(result), len(tt.expected))
return
}

for i, r := range result {
if r[0] != tt.expected[i][0] || r[1] != tt.expected[i][1] {
t.Errorf("ParseADFast()[%d] = %v, expected %v", i, r, tt.expected[i])
}
}
})
}
}

func TestRemoveFlagBytes(t *testing.T) {
tests := []struct {
name string
input []byte
expected []byte
}{
{
name: "Empty input",
input: []byte{},
expected: []byte{},
},
{
name: "Single byte input",
input: []byte{0x01},
expected: []byte{0x01},
},
{
name: "No flag bytes",
input: []byte{0x02, 0x01, 0x06, 0x03, 0x02, 0x01},
expected: []byte{0x02, 0x01, 0x06, 0x03, 0x02, 0x01},
},
{
name: "With flag bytes",
input: []byte{0x02, 0x01, 0x06, 0x1A, 0xFF, 0x4C, 0x00, 0x02},
expected: []byte{0x1A, 0xFF, 0x4C, 0x00, 0x02},
},
{
name: "Flag type is 0x01",
input: []byte{0x02, 0x01, 0x06, 0x05, 0x01, 0x02, 0x03, 0x04},
expected: []byte{0x05, 0x01, 0x02, 0x03, 0x04},
},
{
name: "Flag type is not 0x01",
input: []byte{0x02, 0x02, 0x06, 0x05, 0x01, 0x02, 0x03, 0x04},
expected: []byte{0x02, 0x02, 0x06, 0x05, 0x01, 0x02, 0x03, 0x04},
},
{
name: "Length exceeds bounds",
input: []byte{0xFF, 0x01, 0x06},
expected: []byte{0xFF, 0x01, 0x06},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := RemoveFlagBytes(tt.input)
if len(result) != len(tt.expected) {
t.Errorf("RemoveFlagBytes() length = %v, expected %v", len(result), len(tt.expected))
return
}

for i, b := range result {
if b != tt.expected[i] {
t.Errorf("RemoveFlagBytes()[%d] = %v, expected %v", i, b, tt.expected[i])
}
}
})
}
}

func TestIsValidADStructure(t *testing.T) {
tests := []struct {
name string
data []byte
expected bool
}{
{
name: "Empty data",
data: []byte{},
expected: false,
},
{
name: "Single byte",
data: []byte{0x01},
expected: false,
},
{
name: "Valid minimal structure",
data: []byte{0x01, 0x01},
expected: true,
},
{
name: "Valid structure",
data: []byte{0x02, 0x01, 0x06},
expected: true,
},
{
name: "Zero length",
data: []byte{0x00, 0x01, 0x06},
expected: false,
},
{
name: "Length exceeds data",
data: []byte{0x05, 0x01, 0x06},
expected: false,
},
{
name: "Length exactly matches",
data: []byte{0x02, 0x01, 0x06},
expected: true,
},
{
name: "Large valid structure",
data: []byte{0x1F, 0xFF, 0x4C, 0x00, 0x02, 0x15, 0xE2, 0xC5, 0x6D, 0xB5, 0xDF, 0xFB, 0x48, 0xD2, 0xB0, 0x60, 0xD0, 0xF5, 0xA7, 0x10, 0x96, 0xE0, 0x00, 0x00, 0x00, 0x00, 0xC5, 0x01, 0x02, 0x03, 0x04, 0x05},
expected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isValidADStructure(tt.data)
if result != tt.expected {
t.Errorf("isValidADStructure() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestCheckIngics(t *testing.T) {
tests := []struct {
name string
ad []byte
expected bool
}{
{
name: "Valid Ingics beacon",
ad: []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0x12, 0x34, 0x01},
expected: true,
},
{
name: "Invalid - too short",
ad: []byte{0x05, 0xFF, 0x59, 0x00},
expected: false,
},
{
name: "Invalid - wrong manufacturer ID",
ad: []byte{0x08, 0xFF, 0x59, 0x01, 0x80, 0xBC, 0x12, 0x34, 0x01},
expected: false,
},
{
name: "Invalid - wrong type",
ad: []byte{0x08, 0xFE, 0x59, 0x00, 0x80, 0xBC, 0x12, 0x34, 0x01},
expected: false,
},
{
name: "Valid with minimum length",
ad: []byte{0x06, 0xFF, 0x59, 0x00, 0x80, 0xBC},
expected: true,
},
{
name: "Empty data",
ad: []byte{},
expected: false,
},
{
name: "Partial match only",
ad: []byte{0x06, 0xFF, 0x59, 0x00, 0x80},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := checkIngics(tt.ad)
if result != tt.expected {
t.Errorf("checkIngics() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestParseIngicsState(t *testing.T) {
tests := []struct {
name string
ad []byte
expected model.BeaconEvent
}{
{
name: "Valid Ingics data",
ad: []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0x34, 0x12, 0x05},
expected: model.BeaconEvent{
Battery: 0x1234, // 4660 in little endian
Event: 0x05,
Type: "Ingics",
},
},
{
name: "Zero battery",
ad: []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0x00, 0x00, 0x00},
expected: model.BeaconEvent{
Battery: 0,
Event: 0,
Type: "Ingics",
},
},
{
name: "Max battery value",
ad: []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0xFF, 0xFF, 0xFF},
expected: model.BeaconEvent{
Battery: 0xFFFF,
Event: 0xFF,
Type: "Ingics",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := parseIngicsState(tt.ad)
if result.Battery != tt.expected.Battery {
t.Errorf("parseIngicsState() Battery = %v, expected %v", result.Battery, tt.expected.Battery)
}
if result.Event != tt.expected.Event {
t.Errorf("parseIngicsState() Event = %v, expected %v", result.Event, tt.expected.Event)
}
if result.Type != tt.expected.Type {
t.Errorf("parseIngicsState() Type = %v, expected %v", result.Type, tt.expected.Type)
}
})
}
}

func TestCheckEddystoneTLM(t *testing.T) {
tests := []struct {
name string
ad []byte
expected bool
}{
{
name: "Valid Eddystone TLM",
ad: []byte{0x12, 0x16, 0xAA, 0xFE, 0x20, 0x00, 0x01, 0x02, 0x03, 0x04},
expected: true,
},
{
name: "Invalid - too short",
ad: []byte{0x03, 0x16, 0xAA},
expected: false,
},
{
name: "Invalid - wrong type",
ad: []byte{0x12, 0x15, 0xAA, 0xFE, 0x20, 0x00, 0x01, 0x02, 0x03, 0x04},
expected: false,
},
{
name: "Invalid - wrong company ID",
ad: []byte{0x12, 0x16, 0xAA, 0xFF, 0x20, 0x00, 0x01, 0x02, 0x03, 0x04},
expected: false,
},
{
name: "Invalid - wrong TLM type",
ad: []byte{0x12, 0x16, 0xAA, 0xFE, 0x21, 0x00, 0x01, 0x02, 0x03, 0x04},
expected: false,
},
{
name: "Valid with minimum length",
ad: []byte{0x04, 0x16, 0xAA, 0xFE, 0x20},
expected: true,
},
{
name: "Empty data",
ad: []byte{},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := checkEddystoneTLM(tt.ad)
if result != tt.expected {
t.Errorf("checkEddystoneTLM() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestParseEddystoneState(t *testing.T) {
tests := []struct {
name string
ad []byte
expected model.BeaconEvent
}{
{
name: "Valid Eddystone TLM data",
ad: []byte{0x12, 0x16, 0xAA, 0xFE, 0x20, 0x34, 0x12, 0x78, 0x56, 0x00},
expected: model.BeaconEvent{
Battery: 0x1234, // 4660 in big endian (note: different from Ingics)
Type: "Eddystone",
},
},
{
name: "Zero battery",
ad: []byte{0x12, 0x16, 0xAA, 0xFE, 0x20, 0x00, 0x00, 0x78, 0x56, 0x00},
expected: model.BeaconEvent{
Battery: 0,
Type: "Eddystone",
},
},
{
name: "Max battery value",
ad: []byte{0x12, 0x16, 0xAA, 0xFE, 0x20, 0xFF, 0xFF, 0x78, 0x56, 0x00},
expected: model.BeaconEvent{
Battery: 0xFFFF,
Type: "Eddystone",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := parseEddystoneState(tt.ad)
if result.Battery != tt.expected.Battery {
t.Errorf("parseEddystoneState() Battery = %v, expected %v", result.Battery, tt.expected.Battery)
}
if result.Type != tt.expected.Type {
t.Errorf("parseEddystoneState() Type = %v, expected %v", result.Type, tt.expected.Type)
}
})
}
}

func TestCheckMinewB7(t *testing.T) {
tests := []struct {
name string
ad []byte
expected bool
}{
{
name: "Valid Minew B7",
ad: []byte{0x08, 0x16, 0xE1, 0xFF, 0x01, 0x02, 0x03, 0x04},
expected: true,
},
{
name: "Invalid - too short",
ad: []byte{0x03, 0x16, 0xE1},
expected: false,
},
{
name: "Invalid - wrong type",
ad: []byte{0x08, 0x15, 0xE1, 0xFF, 0x01, 0x02, 0x03, 0x04},
expected: false,
},
{
name: "Invalid - wrong company ID",
ad: []byte{0x08, 0x16, 0xE1, 0xFE, 0x01, 0x02, 0x03, 0x04},
expected: false,
},
{
name: "Valid with minimum length",
ad: []byte{0x04, 0x16, 0xE1, 0xFF},
expected: true,
},
{
name: "Empty data",
ad: []byte{},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := checkMinewB7(tt.ad)
if result != tt.expected {
t.Errorf("checkMinewB7() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestLoopADStructures(t *testing.T) {
tests := []struct {
name string
data []byte
ranges [][2]int
id string
expected model.BeaconEvent
}{
{
name: "Ingics beacon found",
data: []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0x34, 0x12, 0x05, 0x02, 0x01, 0x06},
ranges: [][2]int{{0, 8}, {8, 11}},
id: "test-beacon",
expected: model.BeaconEvent{
ID: "test-beacon",
Name: "test-beacon",
Battery: 0x1234,
Event: 0x05,
Type: "Ingics",
},
},
{
name: "Eddystone beacon found",
data: []byte{0x02, 0x01, 0x06, 0x12, 0x16, 0xAA, 0xFE, 0x20, 0x34, 0x12, 0x78, 0x56},
ranges: [][2]int{{0, 2}, {2, 14}},
id: "eddystone-test",
expected: model.BeaconEvent{
ID: "eddystone-test",
Name: "eddystone-test",
Battery: 0x1234,
Type: "Eddystone",
},
},
{
name: "Minew B7 beacon found",
data: []byte{0x08, 0x16, 0xE1, 0xFF, 0x01, 0x02, 0x03, 0x04, 0x02, 0x01, 0x06},
ranges: [][2]int{{0, 8}, {8, 11}},
id: "minew-test",
expected: model.BeaconEvent{
ID: "minew-test",
Name: "minew-test",
Type: "", // Minew B7 returns empty BeaconEvent
},
},
{
name: "No matching beacon type",
data: []byte{0x02, 0x01, 0x06, 0x03, 0x02, 0x01, 0x02},
ranges: [][2]int{{0, 2}, {2, 5}},
id: "unknown-test",
expected: model.BeaconEvent{},
},
{
name: "Invalid AD structure",
data: []byte{0x02, 0x01, 0x06, 0xFF, 0x01, 0x06},
ranges: [][2]int{{0, 2}, {2, 4}},
id: "invalid-test",
expected: model.BeaconEvent{},
},
{
name: "Empty data",
data: []byte{},
ranges: [][2]int{},
id: "empty-test",
expected: model.BeaconEvent{
ID: "empty-test",
Name: "empty-test",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := LoopADStructures(tt.data, tt.ranges, tt.id)

if result.ID != tt.expected.ID {
t.Errorf("LoopADStructures() ID = %v, expected %v", result.ID, tt.expected.ID)
}
if result.Name != tt.expected.Name {
t.Errorf("LoopADStructures() Name = %v, expected %v", result.Name, tt.expected.Name)
}
if result.Type != tt.expected.Type {
t.Errorf("LoopADStructures() Type = %v, expected %v", result.Type, tt.expected.Type)
}
if result.Battery != tt.expected.Battery {
t.Errorf("LoopADStructures() Battery = %v, expected %v", result.Battery, tt.expected.Battery)
}
})
}
}

func TestLoopADStructuresPriority(t *testing.T) {
// Test that Ingics is checked first
data := []byte{0x08, 0xFF, 0x59, 0x00, 0x80, 0xBC, 0x34, 0x12, 0x05, 0x12, 0x16, 0xAA, 0xFE, 0x20, 0x78, 0x56}
ranges := [][2]int{{0, 8}, {8, 15}}

result := LoopADStructures(data, ranges, "priority-test")

// Should detect Ingics first, not Eddystone
if result.Type != "Ingics" {
t.Errorf("LoopADStructures() Type = %v, expected Ingics (priority test)", result.Type)
}
}

// Benchmark tests
func BenchmarkParseADFast(b *testing.B) {
data := []byte{0x02, 0x01, 0x06, 0x1A, 0xFF, 0x4C, 0x00, 0x02, 0x15, 0xE2, 0xC5, 0x6D, 0xB5, 0xDF, 0xFB, 0x48, 0xD2, 0xB0, 0x60, 0xD0, 0xF5, 0xA7, 0x10, 0x96, 0xE0, 0x00, 0x00, 0x00, 0x00, 0xC5}

for i := 0; i < b.N; i++ {
ParseADFast(data)
}
}

func BenchmarkRemoveFlagBytes(b *testing.B) {
data := []byte{0x02, 0x01, 0x06, 0x1A, 0xFF, 0x4C, 0x00, 0x02, 0x15, 0xE2, 0xC5}

for i := 0; i < b.N; i++ {
RemoveFlagBytes(data)
}
}

+ 294
- 0
test/distance_test.go Datei anzeigen

@@ -0,0 +1,294 @@
package test

import (
"testing"

"github.com/AFASystems/presence/internal/pkg/common/utils"
"github.com/AFASystems/presence/internal/pkg/model"
)

func TestCalculateDistance(t *testing.T) {
tests := []struct {
name string
adv model.BeaconAdvertisement
expected float64
}{
{
name: "Strong signal - close distance",
adv: model.BeaconAdvertisement{
RSSI: -30,
TXPower: "59", // 89 in decimal
},
expected: 0.89976, // Close to minimum
},
{
name: "Medium signal",
adv: model.BeaconAdvertisement{
RSSI: -65,
TXPower: "59",
},
expected: 1.5, // Medium distance
},
{
name: "Weak signal - far distance",
adv: model.BeaconAdvertisement{
RSSI: -95,
TXPower: "59",
},
expected: 8.0, // Far distance
},
{
name: "Equal RSSI and TX power",
adv: model.BeaconAdvertisement{
RSSI: -59,
TXPower: "59",
},
expected: 1.0, // Ratio = 1.0
},
{
name: "Very strong signal",
adv: model.BeaconAdvertisement{
RSSI: -10,
TXPower: "59",
},
expected: 0.89976, // Minimum distance
},
{
name: "Negative TX power (two's complement)",
adv: model.BeaconAdvertisement{
RSSI: -70,
TXPower: "C6", // -58 in decimal
},
expected: 1.2, // Medium distance
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := utils.CalculateDistance(tt.adv)
// Allow for small floating point differences
if result < tt.expected*0.9 || result > tt.expected*1.1 {
t.Errorf("CalculateDistance() = %v, expected around %v", result, tt.expected)
}
})
}
}

func TestCalculateDistanceEdgeCases(t *testing.T) {
tests := []struct {
name string
adv model.BeaconAdvertisement
expected float64
}{
{
name: "Zero RSSI",
adv: model.BeaconAdvertisement{
RSSI: 0,
TXPower: "59",
},
expected: 0.0,
},
{
name: "Invalid TX power",
adv: model.BeaconAdvertisement{
RSSI: -50,
TXPower: "XYZ",
},
expected: 0.0, // twosComp returns 0 for invalid input
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := utils.CalculateDistance(tt.adv)
if result != tt.expected {
t.Errorf("CalculateDistance() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestValidateRSSI(t *testing.T) {
tests := []struct {
name string
rssi int64
expected bool
}{
{
name: "Valid RSSI - strong signal",
rssi: -30,
expected: true,
},
{
name: "Valid RSSI - weak signal",
rssi: -100,
expected: true,
},
{
name: "Valid RSSI - boundary low",
rssi: -120,
expected: true,
},
{
name: "Valid RSSI - boundary high",
rssi: 0,
expected: true,
},
{
name: "Invalid RSSI - too strong",
rssi: 10,
expected: false,
},
{
name: "Invalid RSSI - too weak",
rssi: -130,
expected: false,
},
{
name: "Invalid RSSI - just below boundary",
rssi: -121,
expected: false,
},
{
name: "Invalid RSSI - just above boundary",
rssi: 1,
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := utils.ValidateRSSI(tt.rssi)
if result != tt.expected {
t.Errorf("ValidateRSSI() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestValidateTXPower(t *testing.T) {
tests := []struct {
name string
txPower string
expected bool
}{
{
name: "Valid TX power - positive",
txPower: "59",
expected: true,
},
{
name: "Valid TX power - negative",
txPower: "C6",
expected: true,
},
{
name: "Valid TX power - zero",
txPower: "00",
expected: true,
},
{
name: "Valid TX power - max positive",
txPower: "7F",
expected: true,
},
{
name: "Valid TX power - max negative",
txPower: "80",
expected: true,
},
{
name: "Valid TX power - boundary negative",
txPower: "81", // -127
expected: true,
},
{
name: "Invalid TX power string",
txPower: "XYZ",
expected: true, // twosComp returns 0, which is valid
},
{
name: "Empty TX power",
txPower: "",
expected: true, // twosComp returns 0, which is valid
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := utils.ValidateTXPower(tt.txPower)
if result != tt.expected {
t.Errorf("ValidateTXPower() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestCalculateDistanceConsistency(t *testing.T) {
// Test that the function is deterministic
adv := model.BeaconAdvertisement{
RSSI: -65,
TXPower: "59",
}

result1 := utils.CalculateDistance(adv)
result2 := utils.CalculateDistance(adv)

if result1 != result2 {
t.Errorf("CalculateDistance() is not deterministic: %v != %v", result1, result2)
}
}

func TestCalculateDistanceRealWorldScenarios(t *testing.T) {
scenarios := []struct {
name string
rssi int64
txPower string
expectedRange [2]float64 // min, max expected range
}{
{
name: "Beacon very close (1m)",
rssi: -45,
txPower: "59", // 89 decimal
expectedRange: [2]float64{0.5, 1.5},
},
{
name: "Beacon at medium distance (5m)",
rssi: -75,
txPower: "59",
expectedRange: [2]float64{3.0, 8.0},
},
{
name: "Beacon far away (15m)",
rssi: -95,
txPower: "59",
expectedRange: [2]float64{10.0, 25.0},
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
adv := model.BeaconAdvertisement{
RSSI: scenario.rssi,
TXPower: scenario.txPower,
}
result := utils.CalculateDistance(adv)

if result < scenario.expectedRange[0] || result > scenario.expectedRange[1] {
t.Errorf("CalculateDistance() = %v, expected range %v", result, scenario.expectedRange)
}
})
}
}

// Benchmark tests
func BenchmarkCalculateDistance(b *testing.B) {
adv := model.BeaconAdvertisement{
RSSI: -65,
TXPower: "59",
}

for i := 0; i < b.N; i++ {
utils.CalculateDistance(adv)
}
}

+ 0
- 160
test/httpserver_test/httpserver_test.go Datei anzeigen

@@ -1,160 +0,0 @@
package httpservertest_test

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"

"github.com/AFASystems/presence/internal/pkg/httpserver"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/boltdb/bolt"
"github.com/gorilla/mux"
)

// Functions beaconsAddHandler, beaconsListHandler, beaconsDeleteHandler
func TestBeaconCRUD(t *testing.T) {
tmpfile, _ := os.CreateTemp("", "testdb-*.db")
defer os.Remove(tmpfile.Name())

db, err := bolt.Open(tmpfile.Name(), 0600, nil)
if err != nil {
t.Fatal(err)
}
model.Db = db

ctx := model.AppContext{
Beacons: model.BeaconsList{
Beacons: make(map[string]model.Beacon),
Lock: sync.RWMutex{},
},
ButtonsList: make(map[string]model.Button),
}

b := model.Beacon{Name: "B1", Beacon_id: "1"}
body, err := json.Marshal(b)

if err != nil {
t.Fatal(err)
}

req := httptest.NewRequest("POST", "/api/beacons", bytes.NewReader(body))
w := httptest.NewRecorder()

httpserver.BeaconsAddHandler(&ctx.Beacons)(w, req)

if w.Code != http.StatusOK {
t.Fatalf("create failed: %d", w.Code)
}

fmt.Println("--------------------------------------------------------------")

req = httptest.NewRequest("GET", "/api/beacons", nil)
w = httptest.NewRecorder()

httpserver.BeaconsListHandler(&ctx.Beacons)(w, req)

fmt.Println("Status:", w.Code)
fmt.Println("Body:", w.Body.String())

fmt.Println("--------------------------------------------------------------")

newB := model.Beacon{Name: "B2", Beacon_id: "2"}
newBody, err := json.Marshal(newB)

if err != nil {
t.Fatal(err)
}

req = httptest.NewRequest("PUT", "/api/beacons", bytes.NewReader(newBody))
w = httptest.NewRecorder()

httpserver.BeaconsAddHandler(&ctx.Beacons)(w, req)

if w.Code != http.StatusOK {
t.Fatalf("create failed: %d", w.Code)
}

req = httptest.NewRequest("GET", "/api/beacons", nil)
w = httptest.NewRecorder()

httpserver.BeaconsListHandler(&ctx.Beacons)(w, req)

fmt.Println("Status:", w.Code)
fmt.Println("Body:", w.Body.String())

fmt.Println("--------------------------------------------------------------")

req = httptest.NewRequest("DELETE", "/api/beacons/1", nil)
req = mux.SetURLVars(req, map[string]string{"beacon_id": "1"})

w = httptest.NewRecorder()

httpserver.BeaconsDeleteHandler(&ctx.Beacons, ctx.ButtonsList)(w, req)

fmt.Println("Status: ", w.Code)

fmt.Println("--------------------------------------------------------------")

req = httptest.NewRequest("GET", "/api/beacons", nil)
w = httptest.NewRecorder()

httpserver.BeaconsListHandler(&ctx.Beacons)(w, req)

fmt.Println("Status:", w.Code)
fmt.Println("Body:", w.Body.String())

fmt.Println("--------------------------------------------------------------")
}

func TestSettingsCRUD(t *testing.T) {
tmpfile, _ := os.CreateTemp("", "testdb-*.db")
defer os.Remove(tmpfile.Name())

db, err := bolt.Open(tmpfile.Name(), 0600, nil)
if err != nil {
t.Fatal(err)
}
model.Db = db

ctx := model.AppContext{
Settings: model.Settings{},
}

settings := model.Settings{
Location_confidence: 10,
Last_seen_threshold: 10,
Beacon_metrics_size: 10,
HA_send_interval: 10,
HA_send_changes_only: true,
}

body, err := json.Marshal(settings)
if err != nil {
t.Fatal(err)
}

req := httptest.NewRequest("POST", "/api/settings", bytes.NewReader(body))
w := httptest.NewRecorder()

httpserver.SettingsEditHandler(&ctx.Settings)(w, req)

fmt.Println("status: ", w.Code)
if w.Code != http.StatusOK {
t.Fatalf("create failed: %d", w.Code)
}

fmt.Println("--------------------------------------------------------------")

req = httptest.NewRequest("GET", "/api/settings", nil)
w = httptest.NewRecorder()

httpserver.SettingsListHandler(&ctx.Settings)(w, req)

fmt.Println("Status:", w.Code)
fmt.Println("Body:", w.Body.String())
}

+ 0
- 46
test/mqtt_test/mqtt_test.go Datei anzeigen

@@ -1,46 +0,0 @@
package mqtt_test

import (
"os"
"testing"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/AFASystems/presence/internal/pkg/mqttclient"
"github.com/AFASystems/presence/internal/pkg/persistence"
"github.com/boltdb/bolt"
)

func TestIncomingMQTTProcessor(t *testing.T) {
ctx := &model.AppContext{
Beacons: model.BeaconsList{Beacons: make(map[string]model.Beacon)},
Settings: model.Settings{
Last_seen_threshold: 10,
Location_confidence: 3,
},
}

tmpfile, _ := os.CreateTemp("", "testdb-*.db")
defer os.Remove(tmpfile.Name())

db, err := bolt.Open(tmpfile.Name(), 0600, nil)
if err != nil {
t.Fatal(err)
}
model.Db = db

persistence.LoadState(model.Db, ctx)

ch := mqttclient.IncomingMQTTProcessor(20*time.Millisecond, nil, model.Db, ctx)
msg := model.Incoming_json{MAC: "15:02:31", Hostname: "testHost", RSSI: -55}
ch <- msg

time.Sleep(100 * time.Millisecond)

ctx.Beacons.Lock.RLock()
defer ctx.Beacons.Lock.RUnlock()

if len(ctx.LatestList.LatestList) == 0 {
t.Fatal("latest list map to update")
}
}

+ 568
- 0
test/mqtthandler_test.go Datei anzeigen

@@ -0,0 +1,568 @@
package mqtthandler

import (
"context"
"encoding/json"
"testing"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// MockKafkaWriter implements a mock for kafka.Writer interface
type MockKafkaWriter struct {
Messages []kafka.Message
ShouldFail bool
WriteCount int
}

func (m *MockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
m.WriteCount++
if m.ShouldFail {
return &kafka.Error{
Err: &ErrMockWrite{},
Cause: nil,
Context: msgs[0],
}
}
m.Messages = append(m.Messages, msgs...)
return nil
}

// ErrMockWrite is a mock error for testing
type ErrMockWrite struct{}

func (e *ErrMockWrite) Error() string {
return "mock write error"
}

// Mock Kafka Close method (required for Writer interface)
func (m *MockKafkaWriter) Close() error {
return nil
}

func TestMqttHandlerJSONArrayInput(t *testing.T) {
tests := []struct {
name string
topicName []byte
message []byte
expectedMsgs int
shouldFail bool
}{
{
name: "Valid JSON array with multiple readings",
topicName: []byte("presence/gateway-001"),
message: []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"}]`),
expectedMsgs: 1,
shouldFail: false,
},
{
name: "JSON array with multiple beacons",
topicName: []byte("presence/gateway-002"),
message: []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"},{"timestamp":"2023-01-01T00:00:01Z","type":"Beacon","mac":"11:22:33:44:55:66","rssi":-75,"rawData":"0201060303E1FF1200005678"}]`),
expectedMsgs: 2,
shouldFail: false,
},
{
name: "JSON array with gateway reading (should be skipped)",
topicName: []byte("presence/gateway-003"),
message: []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Gateway","mac":"GG:AA:TT:EE:WA:Y","rssi":-20,"rawData":"gateway-data"},{"timestamp":"2023-01-01T00:00:01Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"}]`),
expectedMsgs: 1, // Only beacon should be processed
shouldFail: false,
},
{
name: "JSON array with only gateways (should be skipped)",
topicName: []byte("presence/gateway-004"),
message: []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Gateway","mac":"GG:AA:TT:EE:WA:Y","rssi":-20,"rawData":"gateway-data"}]`),
expectedMsgs: 0, // All gateways should be skipped
shouldFail: false,
},
{
name: "Invalid JSON array",
topicName: []byte("presence/gateway-005"),
message: []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"`),
expectedMsgs: 0,
shouldFail: false, // Should not panic, just log error
},
{
name: "Empty JSON array",
topicName: []byte("presence/gateway-006"),
message: []byte(`[]`),
expectedMsgs: 0,
shouldFail: false,
},
{
name: "JSON array with null readings",
topicName: []byte("presence/gateway-007"),
message: []byte(`[null]`),
expectedMsgs: 0,
shouldFail: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
ShouldFail: tt.shouldFail,
}

// Capture log output (you might want to use a test logger here)
MqttHandler(mockWriter, tt.topicName, tt.message)

if len(mockWriter.Messages) != tt.expectedMsgs {
t.Errorf("MqttHandler() wrote %d messages, expected %d", len(mockWriter.Messages), tt.expectedMsgs)
}

// Verify message content if we expected messages
if tt.expectedMsgs > 0 && len(mockWriter.Messages) > 0 {
for i, msg := range mockWriter.Messages {
var adv model.BeaconAdvertisement
err := json.Unmarshal(msg.Value, &adv)
if err != nil {
t.Errorf("MqttHandler() message %d is not valid BeaconAdvertisement JSON: %v", i, err)
}

// Verify hostname extraction
expectedHostname := "gateway-007" // Extracted from topicName
if adv.Hostname != expectedHostname {
t.Errorf("MqttHandler() hostname = %v, expected %v", adv.Hostname, expectedHostname)
}
}
}
})
}
}

func TestMqttHandlerCSVInput(t *testing.T) {
tests := []struct {
name string
topicName []byte
message []byte
shouldProcess bool
}{
{
name: "Valid CSV format",
topicName: []byte("presence/gateway-001"),
message: []byte("timestamp,AA:BB:CC:DD:EE:FF,-65,0201060303E1FF1200001234,1001,field6\n"),
shouldProcess: true,
},
{
name: "CSV with button data",
topicName: []byte("presence/gateway-002"),
message: []byte("timestamp,AA:BB:CC:DD:EE:FF,-65,02010612FF5901C0012345678,1234,field6\n"),
shouldProcess: true,
},
{
name: "CSV with insufficient fields",
topicName: []byte("presence/gateway-003"),
message: []byte("timestamp,AA:BB:CC:DD:EE:FF,-65,0201060303E1FF1200001234\n"),
shouldProcess: false, // Should log error and return early
},
{
name: "Empty CSV",
topicName: []byte("presence/gateway-004"),
message: []byte(""),
shouldProcess: false,
},
{
name: "CSV with wrong field count",
topicName: []byte("presence/gateway-005"),
message: []byte("field1,field2,field3\n"),
shouldProcess: false,
},
{
name: "CSV with non-numeric RSSI",
topicName: []byte("presence/gateway-006"),
message: []byte("timestamp,AA:BB:CC:DD:EE:FF,invalid,0201060303E1FF1200001234,1001,field6\n"),
shouldProcess: false, // Should fail on ParseInt
},
{
name: "CSV with non-numeric field6",
topicName: []byte("presence/gateway-007"),
message: []byte("timestamp,AA:BB:CC:DD:EE:FF,-65,0201060303E1FF1200001234,1001,invalid\n"),
shouldProcess: false, // Should fail on Atoi
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
ShouldFail: false,
}

// Note: The CSV handler in the original code has an os.Exit(2) which makes it untestable
// This test will fail due to os.Exit, but demonstrates the intended test structure
// In a real scenario, you'd want to refactor the code to avoid os.Exit
defer func() {
if r := recover(); r != nil {
// Expected due to os.Exit in original code
if tt.shouldProcess {
t.Errorf("MqttHandler() should not panic for valid CSV input: %v", r)
}
}
}()

// This will panic due to os.Exit(2) in the original code when field6 is invalid
// In a real refactor, you'd replace os.Exit with error return
if !tt.shouldProcess && string(tt.message) == "timestamp,AA:BB:CC:DD:EE:FF,-65,0201060303E1FF1200001234,1001,invalid\n" {
// Skip the case that will definitely panic
return
}

MqttHandler(mockWriter, tt.topicName, tt.message)

// CSV processing doesn't write to Kafka in the current implementation
if len(mockWriter.Messages) != 0 {
t.Errorf("MqttHandler() CSV processing should not write to Kafka, but wrote %d messages", len(mockWriter.Messages))
}
})
}
}

func TestParseButtonState(t *testing.T) {
tests := []struct {
name string
raw string
expected int64
}{
{
name: "Ingics button format - minimal length",
raw: "0201060303E1FF12",
expected: 0, // Too short for button field
},
{
name: "Ingics button format - exact length",
raw: "0201060303E1FF123456",
expected: 0x3456, // 13398 in decimal
},
{
name: "Ingics button format - longer",
raw: "0201060303E1FF12000012345678AB",
expected: 0x78AB, // 30891 in decimal
},
{
name: "Ingics button format - zero button",
raw: "0201060303E1FF1200000000",
expected: 0,
},
{
name: "Ingics button format - max button",
raw: "0201060303E1FF12FFFFFFFF",
expected: 0xFFFF, // 65535 in decimal
},
{
name: "Minew button format - minimal length",
raw: "02010612FF590",
expected: 0, // Too short for counter field
},
{
name: "Minew button format - exact length",
raw: "02010612FF590112",
expected: 0x12, // 18 in decimal
},
{
name: "Minew button format - longer",
raw: "02010612FF5901C0012345678",
expected: 0x78, // 120 in decimal
},
{
name: "Minew button format - zero counter",
raw: "02010612FF5901C000",
expected: 0,
},
{
name: "Minew button format - max counter",
raw: "02010612FF5901C0FF",
expected: 0xFF, // 255 in decimal
},
{
name: "Invalid prefix",
raw: "0201060303E1FE120000123456",
expected: 0,
},
{
name: "Invalid hex characters",
raw: "0201060303E1FF12ZZZZ",
expected: 0,
},
{
name: "Empty string",
raw: "",
expected: 0,
},
{
name: "Single character",
raw: "0",
expected: 0,
},
{
name: "Non-hex characters mixed",
raw: "0201060303E1FF12GHIJ",
expected: 0,
},
{
name: "Lowercase hex",
raw: "0201060303e1ff120000123456",
expected: 0, // Should be converted to uppercase
},
{
name: "Mixed case hex",
raw: "0201060303e1FF120000123456",
expected: 0x3456, // Should work after case conversion
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := parseButtonState(tt.raw)
if result != tt.expected {
t.Errorf("parseButtonState() = %v, expected %v", result, tt.expected)
}
})
}
}

func TestParseButtonStateEdgeCases(t *testing.T) {
// Test that Ingics format is checked before Minew format
ingicsRaw := "0201060303E1FF123456"
minewRaw := "02010612FF590112"

ingicsResult := parseButtonState(ingicsRaw)
minewResult := parseButtonState(minewRaw)

// Both should work, but Ingics should use bytes 34:38, Minew should use bytes 22:24
if ingicsResult != 0x3456 {
t.Errorf("parseButtonState() Ingics format failed: got %v, want %v", ingicsResult, 0x3456)
}

if minewResult != 0x12 {
t.Errorf("parseButtonState() Minew format failed: got %v, want %v", minewResult, 0x12)
}

// Test with overlapping patterns (unlikely but good to test)
overlapRaw := "0201060303E1FF122FF590112"
overlapResult := parseButtonState(overlapRaw)
// Should match Ingics pattern and use bytes 34:38
expectedOverlap := int64(0) // There are no bytes 34:38 in this string
if overlapResult != expectedOverlap {
t.Errorf("parseButtonState() overlap case: got %v, want %v", overlapResult, expectedOverlap)
}
}

func TestHostnameExtraction(t *testing.T) {
tests := []struct {
name string
topicName []byte
expectedHost string
}{
{
name: "Simple topic",
topicName: []byte("presence/gateway-001"),
expectedHost: "gateway-001",
},
{
name: "Topic with multiple segments",
topicName: []byte("home/office/floor3/gateway-A123"),
expectedHost: "home",
},
{
name: "Topic with numbers only",
topicName: []byte("12345"),
expectedHost: "12345",
},
{
name: "Single segment topic",
topicName: []byte("singlegateway"),
expectedHost: "singlegateway",
},
{
name: "Topic with empty segments",
topicName: []byte("//gateway//001//"),
expectedHost: "", // First non-empty segment after split
},
{
name: "Empty topic",
topicName: []byte(""),
expectedHost: "",
},
{
name: "Topic with special characters",
topicName: []byte("presence/gateway-with-dashes_and_underscores"),
expectedHost: "presence",
},
{
name: "Topic starting with slash",
topicName: []byte("/presence/gateway-001"),
expectedHost: "", // First segment is empty
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
}

// Create a simple JSON message that will be processed
message := []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"}]`)

MqttHandler(mockWriter, tt.topicName, message)

if len(mockWriter.Messages) > 0 {
var adv model.BeaconAdvertisement
err := json.Unmarshal(mockWriter.Messages[0].Value, &adv)
if err != nil {
t.Errorf("Failed to unmarshal Kafka message: %v", err)
return
}

if adv.Hostname != tt.expectedHost {
t.Errorf("Hostname extraction = %v, expected %v", adv.Hostname, tt.expectedHost)
}
}
})
}
}

func TestKafkaWriteFailure(t *testing.T) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
ShouldFail: true,
}

topicName := []byte("presence/test-gateway")
message := []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"}]`)

// This should handle the write error gracefully (it sleeps for 1 second)
MqttHandler(mockWriter, topicName, message)

// No messages should have been written successfully
if len(mockWriter.Messages) != 0 {
t.Errorf("Expected 0 messages on write failure, got %d", len(mockWriter.Messages))
}

// Should have attempted to write
if mockWriter.WriteCount != 1 {
t.Errorf("Expected 1 write attempt, got %d", mockWriter.WriteCount)
}
}

func TestMessageMarshaling(t *testing.T) {
tests := []struct {
name string
reading model.RawReading
}{
{
name: "Standard beacon reading",
reading: model.RawReading{
Timestamp: "2023-01-01T00:00:00Z",
Type: "Beacon",
MAC: "AA:BB:CC:DD:EE:FF",
RSSI: -65,
RawData: "0201060303E1FF1200001234",
},
},
{
name: "Beacon with special characters in MAC",
reading: model.RawReading{
Timestamp: "2023-01-01T00:00:00Z",
Type: "Beacon",
MAC: "AA:BB:CC:DD:EE:FF",
RSSI: -75,
RawData: "02010612FF5901C0012345678",
},
},
{
name: "Beacon with extreme RSSI values",
reading: model.RawReading{
Timestamp: "2023-01-01T00:00:00Z",
Type: "Beacon",
MAC: "11:22:33:44:55:66",
RSSI: -120,
RawData: "0201060303E1FF120000ABCD",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
}

// Create JSON array with our test reading
readings := []model.RawReading{tt.reading}
message, err := json.Marshal(readings)
if err != nil {
t.Fatalf("Failed to marshal test reading: %v", err)
}

topicName := []byte("presence/test-gateway")
MqttHandler(mockWriter, topicName, message)

if len(mockWriter.Messages) != 1 {
t.Errorf("Expected 1 message, got %d", len(mockWriter.Messages))
return
}

// Verify the message can be unmarshaled back to BeaconAdvertisement
var adv model.BeaconAdvertisement
err = json.Unmarshal(mockWriter.Messages[0].Value, &adv)
if err != nil {
t.Errorf("Failed to unmarshal Kafka message: %v", err)
return
}

// Verify fields match the original reading
if adv.MAC != tt.reading.MAC {
t.Errorf("MAC mismatch: got %v, want %v", adv.MAC, tt.reading.MAC)
}
if adv.RSSI != int64(tt.reading.RSSI) {
t.Errorf("RSSI mismatch: got %v, want %v", adv.RSSI, tt.reading.RSSI)
}
if adv.Data != tt.reading.RawData {
t.Errorf("Data mismatch: got %v, want %v", adv.Data, tt.reading.RawData)
}
})
}
}

// Benchmark tests
func BenchmarkParseButtonState(b *testing.B) {
raw := "0201060303E1FF12000012345678AB"
for i := 0; i < b.N; i++ {
parseButtonState(raw)
}
}

func BenchmarkMqttHandlerJSON(b *testing.B) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
}

topicName := []byte("presence/benchmark-gateway")
message := []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"}]`)

b.ResetTimer()
for i := 0; i < b.N; i++ {
MqttHandler(mockWriter, topicName, message)
mockWriter.Messages = mockWriter.Messages[:0] // Reset messages
}
}

func BenchmarkMqttHandlerMultipleBeacons(b *testing.B) {
mockWriter := &MockKafkaWriter{
Messages: make([]kafka.Message, 0),
}

topicName := []byte("presence/benchmark-gateway")
message := []byte(`[{"timestamp":"2023-01-01T00:00:00Z","type":"Beacon","mac":"AA:BB:CC:DD:EE:FF","rssi":-65,"rawData":"0201060303E1FF1200001234"},{"timestamp":"2023-01-01T00:00:01Z","type":"Beacon","mac":"11:22:33:44:55:66","rssi":-75,"rawData":"02010612FF5901C0012345678"}]`)

b.ResetTimer()
for i := 0; i < b.N; i++ {
MqttHandler(mockWriter, topicName, message)
mockWriter.Messages = mockWriter.Messages[:0] // Reset messages
}
}

+ 0
- 900
test/node-red-integration-tests/apitest.json
Datei-Diff unterdrückt, da er zu groß ist
Datei anzeigen


+ 644
- 0
test/typeMethods_test.go Datei anzeigen

@@ -0,0 +1,644 @@
package model

import (
"testing"
)

func TestBeaconEventHash(t *testing.T) {
tests := []struct {
name string
be BeaconEvent
expected []byte
}{
{
name: "Basic beacon event",
be: BeaconEvent{
ID: "beacon-1",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1000,
Event: 1,
},
expected: nil, // We'll test that it produces a consistent hash
},
{
name: "Same beacon with different battery should produce same hash",
be: BeaconEvent{
ID: "beacon-1",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1009, // 1000 + 9, should round to 1000
Event: 1,
},
expected: nil,
},
{
name: "Different ID should produce different hash",
be: BeaconEvent{
ID: "beacon-2",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1000,
Event: 1,
},
expected: nil,
},
{
name: "Different event should produce different hash",
be: BeaconEvent{
ID: "beacon-1",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1000,
Event: 2,
},
expected: nil,
},
{
name: "Zero values",
be: BeaconEvent{
ID: "",
Name: "",
Type: "",
Battery: 0,
Event: 0,
},
expected: nil,
},
{
name: "Special characters",
be: BeaconEvent{
ID: "beacon!@#$%^&*()",
Name: "Test\nBeacon\tWith\tTabs",
Type: "Special-Type_123",
Battery: 1000,
Event: 1,
},
expected: nil,
},
}

// Test that Hash produces consistent results
hashes := make([][]byte, len(tests))

for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
hashes[i] = tt.be.Hash()

// Hash should always be 32 bytes for SHA256
if len(hashes[i]) != 32 {
t.Errorf("Hash() length = %v, expected 32", len(hashes[i]))
}

// Hash should not be empty unless all fields are empty
if len(hashes[i]) == 0 && (tt.be.ID != "" || tt.be.Name != "" || tt.be.Type != "") {
t.Errorf("Hash() should not be empty for non-empty beacon event")
}
})
}

// Test that same input produces same hash
be1 := BeaconEvent{
ID: "test-beacon",
Name: "Test",
Type: "Ingics",
Battery: 1000,
Event: 1,
}

hash1 := be1.Hash()
hash2 := be1.Hash()

if string(hash1) != string(hash2) {
t.Errorf("Hash() should be deterministic: %v != %v", hash1, hash2)
}

// Test battery rounding
beBattery1 := BeaconEvent{
ID: "test-beacon",
Name: "Test",
Type: "Ingics",
Battery: 1005, // Should round to 1000
Event: 1,
}

beBattery2 := BeaconEvent{
ID: "test-beacon",
Name: "Test",
Type: "Ingics",
Battery: 1000,
Event: 1,
}

hashBattery1 := beBattery1.Hash()
hashBattery2 := beBattery2.Hash()

if string(hashBattery1) != string(hashBattery2) {
t.Errorf("Hash() with battery rounding should be same: %v != %v", hashBattery1, hashBattery2)
}
}

func TestBeaconEventToJSON(t *testing.T) {
tests := []struct {
name string
be BeaconEvent
expectedError bool
}{
{
name: "Valid beacon event",
be: BeaconEvent{
ID: "beacon-1",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1000,
Event: 1,
},
expectedError: false,
},
{
name: "Empty beacon event",
be: BeaconEvent{},
expectedError: false,
},
{
name: "Beacon with special characters",
be: BeaconEvent{
ID: "beacon-with-special-chars!@#$%",
Name: "Name with unicode: 测试",
Type: "Type-With-Dashes_and_underscores",
Battery: 12345,
Event: 255,
},
expectedError: false,
},
{
name: "Beacon with maximum values",
be: BeaconEvent{
ID: "max-beacon",
Name: "Maximum Values Test",
Type: "MaxType",
Battery: 0xFFFFFFFF, // Max uint32
Event: 2147483647, // Max int32
},
expectedError: false,
},
{
name: "Zero values",
be: BeaconEvent{
ID: "",
Name: "",
Type: "",
Battery: 0,
Event: 0,
},
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := tt.be.ToJSON()

if (err != nil) != tt.expectedError {
t.Errorf("ToJSON() error = %v, expectedError %v", err, tt.expectedError)
return
}

if !tt.expectedError {
// Result should not be nil
if result == nil {
t.Error("ToJSON() result should not be nil")
return
}

// Result should not be empty JSON
if string(result) == "null" {
t.Error("ToJSON() result should not be 'null'")
}

// Basic JSON validation - should start and end with braces for object
if len(result) > 0 && result[0] != '{' && result[0] != '[' {
t.Errorf("ToJSON() result should be valid JSON, got: %s", string(result))
}
}
})
}
}

func TestConvertStructToMap(t *testing.T) {
tests := []struct {
name string
input any
expectedError bool
}{
{
name: "Valid BeaconEvent",
input: BeaconEvent{ID: "test", Type: "Ingics"},
expectedError: false,
},
{
name: "Valid HTTPLocation",
input: HTTPLocation{Method: "POST", ID: "test"},
expectedError: false,
},
{
name: "Valid struct",
input: struct{ Name string }{Name: "test"},
expectedError: false,
},
{
name: "Nil input",
input: nil,
expectedError: false,
},
{
name: "String input",
input: "test string",
expectedError: false,
},
{
name: "Map input",
input: map[string]any{"test": "value"},
expectedError: false,
},
{
name: "Slice input",
input: []string{"test1", "test2"},
expectedError: false,
},
{
name: "Complex struct with nested structures",
input: struct {
SimpleField string
Nested struct {
InnerField int
}
SliceField []string
MapField map[string]any
}{
SimpleField: "test",
Nested: struct{ InnerField int }{InnerField: 123},
SliceField: []string{"a", "b", "c"},
MapField: map[string]any{"key": "value"},
},
expectedError: false,
},
{
name: "Struct with channel field",
input: struct{ Ch chan int }{Ch: make(chan int)},
expectedError: true, // Channels cannot be marshaled to JSON
},
{
name: "Struct with function field",
input: struct{ Func func() }{Func: func() {}},
expectedError: true, // Functions cannot be marshaled to JSON
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := convertStructToMap(tt.input)

if (err != nil) != tt.expectedError {
t.Errorf("convertStructToMap() error = %v, expectedError %v", err, tt.expectedError)
return
}

if !tt.expectedError {
// Result should be a map
if result == nil && tt.input != nil {
t.Error("convertStructToMap() result should not be nil for non-nil input")
}

// For valid inputs, result should be a map
if tt.input != nil {
if _, ok := result.(map[string]any); !ok && result != nil {
t.Errorf("convertStructToMap() result should be a map[string]any, got %T", result)
}
}
}
})
}
}

func TestHTTPLocationRedisHashable(t *testing.T) {
tests := []struct {
name string
location HTTPLocation
expectedError bool
}{
{
name: "Valid location",
location: HTTPLocation{
Method: "POST",
PreviousConfidentLocation: "room1",
Distance: 5.5,
ID: "beacon-123",
Location: "room2",
LastSeen: 1634567890,
},
expectedError: false,
},
{
name: "Minimal location",
location: HTTPLocation{
Method: "GET",
ID: "beacon-1",
},
expectedError: false,
},
{
name: "Zero values",
location: HTTPLocation{},
expectedError: false,
},
{
name: "Location with special characters",
location: HTTPLocation{
Method: "CUSTOM",
ID: "beacon-with-special-chars!@#$%",
Location: "Room-with-unicode: 测试",
Distance: -123.456, // Negative distance
},
expectedError: false,
},
{
name: "Maximum values",
location: HTTPLocation{
Method: "MAX",
PreviousConfidentLocation: "max-room",
Distance: 9223372036854775807, // Max int64 as float64
ID: "max-beacon-id-12345678901234567890",
Location: "max-location-name",
LastSeen: 9223372036854775807, // Max int64
},
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := tt.location.RedisHashable()

if (err != nil) != tt.expectedError {
t.Errorf("HTTPLocation.RedisHashable() error = %v, expectedError %v", err, tt.expectedError)
return
}

if !tt.expectedError {
// Result should be a map
if result == nil {
t.Error("HTTPLocation.RedisHashable() result should not be nil")
return
}

resultMap, ok := result.(map[string]any)
if !ok {
t.Errorf("HTTPLocation.RedisHashable() result should be a map[string]any, got %T", result)
return
}

// Check that expected fields are present
expectedFields := []string{"method", "previous_confident_location", "distance", "id", "location", "last_seen"}
for _, field := range expectedFields {
if _, exists := resultMap[field]; !exists {
t.Errorf("HTTPLocation.RedisHashable() missing expected field: %s", field)
}
}

// Check JSON tags are respected
if _, exists := resultMap["Method"]; exists {
t.Error("HTTPLocation.RedisHashable() should use JSON field names, not struct field names")
}

if _, exists := resultMap["method"]; !exists {
t.Error("HTTPLocation.RedisHashable() should contain 'method' field (JSON tag)")
}
}
})
}
}

func TestBeaconEventRedisHashable(t *testing.T) {
tests := []struct {
name string
be BeaconEvent
expectedError bool
}{
{
name: "Valid beacon event",
be: BeaconEvent{
ID: "beacon-123",
Name: "Test Beacon",
Type: "Ingics",
Battery: 1000,
Event: 1,
},
expectedError: false,
},
{
name: "Minimal beacon event",
be: BeaconEvent{
ID: "test",
},
expectedError: false,
},
{
name: "Zero values",
be: BeaconEvent{},
expectedError: false,
},
{
name: "Beacon event with special characters",
be: BeaconEvent{
ID: "beacon-!@#$%^&*()",
Name: "Name with unicode: 测试",
Type: "Special-Type_123",
Battery: 12345,
Event: 255,
},
expectedError: false,
},
{
name: "Maximum values",
be: BeaconEvent{
ID: "max-beacon-id",
Name: "Maximum Values Test",
Type: "MaxType",
Battery: 0xFFFFFFFF, // Max uint32
Event: 2147483647, // Max int32
},
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := tt.be.RedisHashable()

if (err != nil) != tt.expectedError {
t.Errorf("BeaconEvent.RedisHashable() error = %v, expectedError %v", err, tt.expectedError)
return
}

if !tt.expectedError {
// Result should be a map
if result == nil {
t.Error("BeaconEvent.RedisHashable() result should not be nil")
return
}

resultMap, ok := result.(map[string]any)
if !ok {
t.Errorf("BeaconEvent.RedisHashable() result should be a map[string]any, got %T", result)
return
}

// Check that expected fields are present
expectedFields := []string{"name", "id", "type", "battery", "event"}
for _, field := range expectedFields {
if _, exists := resultMap[field]; !exists {
t.Errorf("BeaconEvent.RedisHashable() missing expected field: %s", field)
}
}

// Check JSON tags are respected (BeaconEvent fields are not tagged with JSON, so field names should be lowercase)
if _, exists := resultMap["Name"]; exists {
t.Error("BeaconEvent.RedisHashable() should use lowercase field names")
}

if _, exists := resultMap["name"]; !exists {
t.Error("BeaconEvent.RedisHashable() should contain 'name' field")
}
}
})
}
}

func TestHashConsistencyWithBatteryRounding(t *testing.T) {
// Test that Hash() is consistent with battery rounding
testCases := []struct {
battery1 uint32
battery2 uint32
shouldMatch bool
}{
{1000, 1009, true}, // Same rounding range
{1000, 1010, false}, // Different rounding range
{0, 9, true}, // Zero range
{100, 104, true}, // Same range (100-109 rounds to 100)
{100, 110, false}, // Different ranges
{4294967295, 4294967289, true}, // Max value range
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("BatteryRoundCase_%d", i), func(t *testing.T) {
be1 := BeaconEvent{
ID: "test-beacon",
Name: "Test",
Type: "Ingics",
Battery: tc.battery1,
Event: 1,
}

be2 := BeaconEvent{
ID: "test-beacon",
Name: "Test",
Type: "Ingics",
Battery: tc.battery2,
Event: 1,
}

hash1 := be1.Hash()
hash2 := be2.Hash()

hashesMatch := string(hash1) == string(hash2)

if hashesMatch != tc.shouldMatch {
t.Errorf("Hash consistency mismatch: battery1=%d, battery2=%d, hashesMatch=%v, shouldMatch=%v",
tc.battery1, tc.battery2, hashesMatch, tc.shouldMatch)
}
})
}
}

func TestJSONMarshalUnmarshalRoundtrip(t *testing.T) {
original := BeaconEvent{
ID: "roundtrip-test",
Name: "Roundtrip Test",
Type: "TestType",
Battery: 12345,
Event: 42,
}

// Test that ToJSON produces valid JSON that can be unmarshaled back
jsonData, err := original.ToJSON()
if err != nil {
t.Fatalf("ToJSON() error: %v", err)
}

var unmarshaled BeaconEvent
err = json.Unmarshal(jsonData, &unmarshaled)
if err != nil {
t.Fatalf("json.Unmarshal() error: %v", err)
}

// Verify roundtrip integrity
if unmarshaled.ID != original.ID {
t.Errorf("Roundtrip ID mismatch: got %v, want %v", unmarshaled.ID, original.ID)
}
if unmarshaled.Name != original.Name {
t.Errorf("Roundtrip Name mismatch: got %v, want %v", unmarshaled.Name, original.Name)
}
if unmarshaled.Type != original.Type {
t.Errorf("Roundtrip Type mismatch: got %v, want %v", unmarshaled.Type, original.Type)
}
if unmarshaled.Battery != original.Battery {
t.Errorf("Roundtrip Battery mismatch: got %v, want %v", unmarshaled.Battery, original.Battery)
}
if unmarshaled.Event != original.Event {
t.Errorf("Roundtrip Event mismatch: got %v, want %v", unmarshaled.Event, original.Event)
}
}

// Benchmark tests
func BenchmarkHash(b *testing.B) {
be := BeaconEvent{
ID: "benchmark-beacon",
Name: "Benchmark Test",
Type: "Ingics",
Battery: 1000,
Event: 1,
}

for i := 0; i < b.N; i++ {
be.Hash()
}
}

func BenchmarkToJSON(b *testing.B) {
be := BeaconEvent{
ID: "benchmark-beacon",
Name: "Benchmark Test",
Type: "Ingics",
Battery: 1000,
Event: 1,
}

for i := 0; i < b.N; i++ {
be.ToJSON()
}
}

func BenchmarkConvertStructToMap(b *testing.B) {
be := BeaconEvent{
ID: "benchmark-beacon",
Name: "Benchmark Test",
Type: "Ingics",
Battery: 1000,
Event: 1,
}

for i := 0; i < b.N; i++ {
convertStructToMap(be)
}
}

Laden…
Abbrechen
Speichern