| @@ -24,4 +24,6 @@ cmd/presenSe/presence.db | |||||
| # Dependency directories (remove the comment below to include it) | # Dependency directories (remove the comment below to include it) | ||||
| vendor/ | vendor/ | ||||
| volumes/node-red/ | volumes/node-red/ | ||||
| main | |||||
| main | |||||
| *.sh | |||||
| @@ -1,196 +1,23 @@ | |||||
| # Standard Go Project Layout | |||||
| # Project Overview | |||||
| ## Overview | |||||
| ## Bridge | |||||
| This is a basic layout for Go application projects. Note that it's basic in terms of content because it's focusing only on the general layout and not what you have inside. It's also basic because it's very high level and it doesn't go into great details in terms of how you can structure your project even further. For example, it doesn't try to cover the project structure you'd have with something like Clean Architecture. | |||||
| Used for sending messages between MQTT broker and Kafka ... Initial config is done | |||||
| This is **`NOT an official standard defined by the core Go dev team`**. This is a set of common historical and emerging project layout patterns in the Go ecosystem. Some of these patterns are more popular than others. It also has a number of small enhancements along with several supporting directories common to any large enough real world application. Note that the **core Go team provides a great set of general guidelines about structuring Go projects** and what it means for your project when it's imported and when it's installed. See the [`Organizing a Go module`](https://go.dev/doc/modules/layout) page in the official Go docs for more details. It includes the `internal` and `cmd` directory patterns (described below) and other useful information. | |||||
| ## Decoder | |||||
| **`If you are trying to learn Go or if you are building a PoC or a simple project for yourself this project layout is an overkill. Start with something really simple instead (a single `main.go` file and `go.mod` is more than enough).`** As your project grows keep in mind that it'll be important to make sure your code is well structured otherwise you'll end up with a messy code with lots of hidden dependencies and global state. When you have more people working on the project you'll need even more structure. That's when it's important to introduce a common way to manage packages/libraries. When you have an open source project or when you know other projects import the code from your project repository that's when it's important to have private (aka `internal`) packages and code. Clone the repository, keep what you need and delete everything else! Just because it's there it doesn't mean you have to use it all. None of these patterns are used in every single project. Even the `vendor` pattern is not universal. | |||||
| Decoding BLE beacons -> generating notifications (batery, fall detection) | |||||
| With Go 1.14 [`Go Modules`](https://go.dev/wiki/Modules) are finally ready for production. Use [`Go Modules`](https://blog.golang.org/using-go-modules) unless you have a specific reason not to use them and if you do then you don’t need to worry about $GOPATH and where you put your project. The basic `go.mod` file in the repo assumes your project is hosted on GitHub, but it's not a requirement. The module path can be anything though the first module path component should have a dot in its name (the current version of Go doesn't enforce it anymore, but if you are using slightly older versions don't be surprised if your builds fail without it). See Issues [`37554`](https://github.com/golang/go/issues/37554) and [`32819`](https://github.com/golang/go/issues/32819) if you want to know more about it. | |||||
| Still needs to be reimplemented | |||||
| This project layout is intentionally generic and it doesn't try to impose a specific Go package structure. | |||||
| ## Locations algorithm | |||||
| This is a community effort. Open an issue if you see a new pattern or if you think one of the existing patterns needs to be updated. | |||||
| Calculating location -> generating notifications | |||||
| If you need help with naming, formatting and style start by running [`gofmt`](https://golang.org/cmd/gofmt/) and [`staticcheck`](https://github.com/dominikh/go-tools/tree/master/cmd/staticcheck). The previous standard linter, golint, is now deprecated and not maintained; use of a maintained linter such as staticcheck is recommended. Also make sure to read these Go code style guidelines and recommendations: | |||||
| * https://talks.golang.org/2014/names.slide | |||||
| * https://golang.org/doc/effective_go.html#names | |||||
| * https://blog.golang.org/package-names | |||||
| * https://go.dev/wiki/CodeReviewComments | |||||
| * [Style guideline for Go packages](https://rakyll.org/style-packages) (rakyll/JBD) | |||||
| still needs to be implemented | |||||
| See [`Go Project Layout`](https://medium.com/golang-learn/go-project-layout-e5213cdcfaa2) for additional background information. | |||||
| ## Server | |||||
| More about naming and organizing packages as well as other code structure recommendations: | |||||
| * [GopherCon EU 2018: Peter Bourgon - Best Practices for Industrial Programming](https://www.youtube.com/watch?v=PTE4VJIdHPg) | |||||
| * [GopherCon Russia 2018: Ashley McNamara + Brian Ketelsen - Go best practices.](https://www.youtube.com/watch?v=MzTcsI6tn-0) | |||||
| * [GopherCon 2017: Edward Muller - Go Anti-Patterns](https://www.youtube.com/watch?v=ltqV6pDKZD8) | |||||
| * [GopherCon 2018: Kat Zien - How Do You Structure Your Go Apps](https://www.youtube.com/watch?v=oL6JBUk6tj0) | |||||
| Publishing to front end and notifications | |||||
| A Chinese post about Package-Oriented-Design guidelines and Architecture layer | |||||
| * [面向包的设计和架构分层](https://github.com/danceyoung/paper-code/blob/master/package-oriented-design/packageorienteddesign.md) | |||||
| ## Go Directories | |||||
| ### `/cmd` | |||||
| Main applications for this project. | |||||
| The directory name for each application should match the name of the executable you want to have (e.g., `/cmd/myapp`). | |||||
| Don't put a lot of code in the application directory. If you think the code can be imported and used in other projects, then it should live in the `/pkg` directory. If the code is not reusable or if you don't want others to reuse it, put that code in the `/internal` directory. You'll be surprised what others will do, so be explicit about your intentions! | |||||
| It's common to have a small `main` function that imports and invokes the code from the `/internal` and `/pkg` directories and nothing else. | |||||
| See the [`/cmd`](cmd/README.md) directory for examples. | |||||
| ### `/internal` | |||||
| Private application and library code. This is the code you don't want others importing in their applications or libraries. Note that this layout pattern is enforced by the Go compiler itself. See the Go 1.4 [`release notes`](https://golang.org/doc/go1.4#internalpackages) for more details. Note that you are not limited to the top level `internal` directory. You can have more than one `internal` directory at any level of your project tree. | |||||
| You can optionally add a bit of extra structure to your internal packages to separate your shared and non-shared internal code. It's not required (especially for smaller projects), but it's nice to have visual clues showing the intended package use. Your actual application code can go in the `/internal/app` directory (e.g., `/internal/app/myapp`) and the code shared by those apps in the `/internal/pkg` directory (e.g., `/internal/pkg/myprivlib`). | |||||
| You use internal directories to make packages private. If you put a package inside an internal directory, then other packages can’t import it unless they share a common ancestor. And it’s the only directory named in Go’s documentation and has special compiler treatment. | |||||
| ### `/pkg` | |||||
| Library code that's ok to use by external applications (e.g., `/pkg/mypubliclib`). Other projects will import these libraries expecting them to work, so think twice before you put something here :-) Note that the `internal` directory is a better way to ensure your private packages are not importable because it's enforced by Go. The `/pkg` directory is still a good way to explicitly communicate that the code in that directory is safe for use by others. The [`I'll take pkg over internal`](https://travisjeffery.com/b/2019/11/i-ll-take-pkg-over-internal/) blog post by Travis Jeffery provides a good overview of the `pkg` and `internal` directories and when it might make sense to use them. | |||||
| It's also a way to group Go code in one place when your root directory contains lots of non-Go components and directories making it easier to run various Go tools (as mentioned in these talks: [`Best Practices for Industrial Programming`](https://www.youtube.com/watch?v=PTE4VJIdHPg) from GopherCon EU 2018, [GopherCon 2018: Kat Zien - How Do You Structure Your Go Apps](https://www.youtube.com/watch?v=oL6JBUk6tj0) and [GoLab 2018 - Massimiliano Pippi - Project layout patterns in Go](https://www.youtube.com/watch?v=3gQa1LWwuzk)). | |||||
| See the [`/pkg`](pkg/README.md) directory if you want to see which popular Go repos use this project layout pattern. This is a common layout pattern, but it's not universally accepted and some in the Go community don't recommend it. | |||||
| It's ok not to use it if your app project is really small and where an extra level of nesting doesn't add much value (unless you really want to :-)). Think about it when it's getting big enough and your root directory gets pretty busy (especially if you have a lot of non-Go app components). | |||||
| The `pkg` directory origins: The old Go source code used to use `pkg` for its packages and then various Go projects in the community started copying the pattern (see [`this`](https://twitter.com/bradfitz/status/1039512487538970624) Brad Fitzpatrick's tweet for more context). | |||||
| ### `/vendor` | |||||
| Application dependencies (managed manually or by your favorite dependency management tool like the new built-in [`Go Modules`](https://go.dev/wiki/Modules) feature). The `go mod vendor` command will create the `/vendor` directory for you. Note that you might need to add the `-mod=vendor` flag to your `go build` command if you are not using Go 1.14 where it's on by default. | |||||
| Don't commit your application dependencies if you are building a library. | |||||
| Note that since [`1.13`](https://golang.org/doc/go1.13#modules) Go also enabled the module proxy feature (using [`https://proxy.golang.org`](https://proxy.golang.org) as their module proxy server by default). Read more about it [`here`](https://blog.golang.org/module-mirror-launch) to see if it fits all of your requirements and constraints. If it does, then you won't need the `vendor` directory at all. | |||||
| ## Service Application Directories | |||||
| ### `/api` | |||||
| OpenAPI/Swagger specs, JSON schema files, protocol definition files. | |||||
| See the [`/api`](api/README.md) directory for examples. | |||||
| ## Web Application Directories | |||||
| ### `/web` | |||||
| Web application specific components: static web assets, server side templates and SPAs. | |||||
| ## Common Application Directories | |||||
| ### `/configs` | |||||
| Configuration file templates or default configs. | |||||
| Put your `confd` or `consul-template` template files here. | |||||
| ### `/init` | |||||
| System init (systemd, upstart, sysv) and process manager/supervisor (runit, supervisord) configs. | |||||
| ### `/scripts` | |||||
| Scripts to perform various build, install, analysis, etc operations. | |||||
| These scripts keep the root level Makefile small and simple (e.g., [`https://github.com/hashicorp/terraform/blob/main/Makefile`](https://github.com/hashicorp/terraform/blob/main/Makefile)). | |||||
| See the [`/scripts`](scripts/README.md) directory for examples. | |||||
| ### `/build` | |||||
| Packaging and Continuous Integration. | |||||
| Put your cloud (AMI), container (Docker), OS (deb, rpm, pkg) package configurations and scripts in the `/build/package` directory. | |||||
| Put your CI (travis, circle, drone) configurations and scripts in the `/build/ci` directory. Note that some of the CI tools (e.g., Travis CI) are very picky about the location of their config files. Try putting the config files in the `/build/ci` directory linking them to the location where the CI tools expect them (when possible). | |||||
| ### `/deployments` | |||||
| IaaS, PaaS, system and container orchestration deployment configurations and templates (docker-compose, kubernetes/helm, terraform). Note that in some repos (especially apps deployed with kubernetes) this directory is called `/deploy`. | |||||
| ### `/test` | |||||
| Additional external test apps and test data. Feel free to structure the `/test` directory anyway you want. For bigger projects it makes sense to have a data subdirectory. For example, you can have `/test/data` or `/test/testdata` if you need Go to ignore what's in that directory. Note that Go will also ignore directories or files that begin with "." or "_", so you have more flexibility in terms of how you name your test data directory. | |||||
| See the [`/test`](test/README.md) directory for examples. | |||||
| ## Other Directories | |||||
| ### `/docs` | |||||
| Design and user documents (in addition to your godoc generated documentation). | |||||
| See the [`/docs`](docs/README.md) directory for examples. | |||||
| ### `/tools` | |||||
| Supporting tools for this project. Note that these tools can import code from the `/pkg` and `/internal` directories. | |||||
| See the [`/tools`](tools/README.md) directory for examples. | |||||
| ### `/examples` | |||||
| Examples for your applications and/or public libraries. | |||||
| See the [`/examples`](examples/README.md) directory for examples. | |||||
| ### `/third_party` | |||||
| External helper tools, forked code and other 3rd party utilities (e.g., Swagger UI). | |||||
| ### `/githooks` | |||||
| Git hooks. | |||||
| ### `/assets` | |||||
| Other assets to go along with your repository (images, logos, etc). | |||||
| ### `/website` | |||||
| This is the place to put your project's website data if you are not using GitHub pages. | |||||
| See the [`/website`](website/README.md) directory for examples. | |||||
| ## Directories You Shouldn't Have | |||||
| ### `/src` | |||||
| Some Go projects do have a `src` folder, but it usually happens when the devs came from the Java world where it's a common pattern. If you can help yourself try not to adopt this Java pattern. You really don't want your Go code or Go projects to look like Java :-) | |||||
| Don't confuse the project level `/src` directory with the `/src` directory Go uses for its workspaces as described in [`How to Write Go Code`](https://golang.org/doc/code.html). The `$GOPATH` environment variable points to your (current) workspace (by default it points to `$HOME/go` on non-windows systems). This workspace includes the top level `/pkg`, `/bin` and `/src` directories. Your actual project ends up being a sub-directory under `/src`, so if you have the `/src` directory in your project the project path will look like this: `/some/path/to/workspace/src/your_project/src/your_code.go`. Note that with Go 1.11 it's possible to have your project outside of your `GOPATH`, but it still doesn't mean it's a good idea to use this layout pattern. | |||||
| ## Badges | |||||
| * [Go Report Card](https://goreportcard.com/) - It will scan your code with `gofmt`, `go vet`, `gocyclo`, `golint`, `ineffassign`, `license` and `misspell`. Replace `github.com/golang-standards/project-layout` with your project reference. | |||||
| [](https://goreportcard.com/report/github.com/golang-standards/project-layout) | |||||
| * ~~[GoDoc](http://godoc.org) - It will provide online version of your GoDoc generated documentation. Change the link to point to your project.~~ | |||||
| [](http://godoc.org/github.com/golang-standards/project-layout) | |||||
| * [Pkg.go.dev](https://pkg.go.dev) - Pkg.go.dev is a new destination for Go discovery & docs. You can create a badge using the [badge generation tool](https://pkg.go.dev/badge). | |||||
| [](https://pkg.go.dev/github.com/golang-standards/project-layout) | |||||
| * Release - It will show the latest release number for your project. Change the github link to point to your project. | |||||
| [](https://github.com/golang-standards/project-layout/releases/latest) | |||||
| ## Notes | |||||
| A more opinionated project template with sample/reusable configs, scripts and code is a WIP. | |||||
| Still needs to be reimplemented | |||||
| @@ -0,0 +1,50 @@ | |||||
| version: "2" | |||||
| services: | |||||
| kafdrop: | |||||
| image: obsidiandynamics/kafdrop | |||||
| restart: "no" | |||||
| ports: | |||||
| - "127.0.0.1:9000:9000" | |||||
| environment: | |||||
| KAFKA_BROKERCONNECT: "kafka:29092" | |||||
| depends_on: | |||||
| - "kafka" | |||||
| kafka: | |||||
| image: apache/kafka:3.9.0 | |||||
| restart: "no" | |||||
| ports: | |||||
| # - "127.0.0.1:2181:2181" | |||||
| - "127.0.0.1:9092:9092" | |||||
| - "127.0.0.1:9093:9093" | |||||
| healthcheck: # <-- ADD THIS BLOCK | |||||
| test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"] | |||||
| interval: 10s | |||||
| timeout: 5s | |||||
| retries: 10 | |||||
| start_period: 20s | |||||
| environment: | |||||
| KAFKA_NODE_ID: 1 | |||||
| KAFKA_PROCESS_ROLES: broker,controller | |||||
| KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092,CONTROLLER://127.0.0.1:9093 | |||||
| KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 | |||||
| KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER | |||||
| KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT | |||||
| KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 | |||||
| KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL | |||||
| KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | |||||
| KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | |||||
| KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 | |||||
| KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | |||||
| KAFKA_NUM_PARTITIONS: 3 | |||||
| kafka-init: | |||||
| image: apache/kafka:3.9.0 | |||||
| command: [ "/bin/bash", "-c", "chmod +x /tmp/create_topic.sh && /tmp/create_topic.sh"] | |||||
| depends_on: | |||||
| kafka: | |||||
| condition: service_healthy | |||||
| volumes: | |||||
| - ./init-scripts/create_topic.sh:/tmp/create_topic.sh | |||||
| environment: | |||||
| - TOPIC_NAMES=topic1,topic2,topic3 | |||||
| @@ -1,99 +0,0 @@ | |||||
| services: | |||||
| emqx: | |||||
| image: emqx/emqx:5.8.8 | |||||
| container_name: emqx | |||||
| environment: | |||||
| - EMQX_DASHBOARD__DEFAULT_USERNAME=user | |||||
| - EMQX_DASHBOARD__DEFAULT_PASSWORD=pass | |||||
| ports: | |||||
| - "127.0.0.1:1883:1883" | |||||
| healthcheck: | |||||
| test: ["CMD", "curl", "-f", "http://localhost:18083/api/v5/status"] | |||||
| interval: 10s | |||||
| timeout: 5s | |||||
| retries: 10 | |||||
| start_period: 20s | |||||
| kafka: | |||||
| image: apache/kafka:3.9.0 | |||||
| container_name: kafka | |||||
| healthcheck: | |||||
| test: ["CMD-SHELL", "nc -z localhost 9092"] | |||||
| interval: 10s | |||||
| timeout: 5s | |||||
| retries: 10 | |||||
| environment: | |||||
| - KAFKA_NODE_ID=1 | |||||
| - KAFKA_PROCESS_ROLES=broker,controller | |||||
| - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 | |||||
| - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 | |||||
| - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER | |||||
| - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 | |||||
| - KAFKA_LOG_DIRS=/tmp/kraft-combined-logs | |||||
| - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 | |||||
| - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true | |||||
| ports: | |||||
| - "127.0.0.1:9092:9092" | |||||
| valkey: | |||||
| image: valkey/valkey:9.0.0 | |||||
| container_name: valkey | |||||
| ports: | |||||
| - "127.0.0.1:6379:6379" | |||||
| node-red: | |||||
| image: nodered/node-red:latest-22 | |||||
| container_name: node-red | |||||
| ports: | |||||
| - "127.0.0.1:1880:1880" | |||||
| volumes: | |||||
| - "../volumes/node-red:/data" | |||||
| presense-decoder: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.decoder | |||||
| network: host | |||||
| image: presense-decoder | |||||
| container_name: presense-decoder | |||||
| environment: | |||||
| - REDIS_URL=valkey:6379 | |||||
| - KAFKA_URL=kafka:9092 | |||||
| depends_on: | |||||
| - kafka | |||||
| - valkey | |||||
| restart: always | |||||
| presense-server: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.server | |||||
| network: host | |||||
| image: presense-server | |||||
| container_name: presense-server | |||||
| environment: | |||||
| - REDIS_URL=valkey:6379 | |||||
| - KAFKA_URL=kafka:9092 | |||||
| depends_on: | |||||
| - kafka | |||||
| - emqx | |||||
| ports: | |||||
| - "127.0.0.1:1902:1902" | |||||
| restart: always | |||||
| presense-bridge: | |||||
| build: | |||||
| context: ../ | |||||
| dockerfile: build/package/Dockerfile.bridge | |||||
| network: host | |||||
| image: presense-bridge | |||||
| container_name: presense-bridge | |||||
| environment: | |||||
| - KAFKA_URL=kafka:9092 | |||||
| - MQTT_HOST=192.168.1.101:1883 | |||||
| - MQTT_USERNAME=user | |||||
| - MQTT_PASSWORD=pass | |||||
| depends_on: | |||||
| kafka: | |||||
| condition: service_healthy | |||||
| restart: always | |||||
| @@ -0,0 +1,21 @@ | |||||
| #!/bin/bash | |||||
| # create topic rawbeacons | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic rawbeacons \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic apibeacons | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic apibeacons \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic alertBeacons | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic alertbeacons \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic alertBeacons | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic locevents \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| @@ -0,0 +1,32 @@ | |||||
| Ingics iBS01G | |||||
| id: C83F8F17DB35 | |||||
| 020106 12 FF590080BC240100FFFFFFFF000000000000 | |||||
| id: C83F8F17DB35 | |||||
| 020106 12 FF590080BC240100FFFFFFFF000000000000 | |||||
| type 0x16 - service Data | |||||
| Minew B7 | |||||
| id: C300003947C4 | |||||
| 020106 0303E1FF 1216E1FFA1031AFFFEFEFB0000C447390000C3 | |||||
| id: C300003947C4 | |||||
| 0201061AFF4C000215FDA50693A4E24FB1AFCFC6EB0764782500000000EC - iBeacon | |||||
| id: C300003947C4 | |||||
| 020106 0303AAFE 1516AAFE00E800112233445566778899ABCDE7280002 - eddystone | |||||
| id: C300003947C4 | |||||
| 0201060303E1FF1216E1FFA1031AFFFEFEFB0000C447390000C3 | |||||
| id: C300003947C4 | |||||
| 0201060303E1FF0E16E1FFA1081AC447390000C34237 | |||||
| Minew MWB01 | |||||
| id: C7AE561E38B7 | |||||
| 02010617FF0001000000000000000000005F0700006F4C0000640003095336 | |||||
| id: C7AE561E38B7 | |||||
| 02010617FF00020000FF0000FF0000FF0006001D005200000B200803095336 | |||||
| Minew MWC01 | |||||
| id: E01F9A7A47D2 | |||||
| 02010617FF00020000FF0000FF0000FF0006001D005200000A242D03095332 | |||||
| id: E01F9A7A47D2 | |||||
| 02010617FF000100000000000000000000780700006F4C0000640003095332 | |||||
| @@ -1,231 +1,223 @@ | |||||
| package main | package main | ||||
| import ( | import ( | ||||
| "bytes" | |||||
| "context" | "context" | ||||
| "encoding/binary" | |||||
| "encoding/hex" | |||||
| "encoding/json" | |||||
| "fmt" | "fmt" | ||||
| "math" | |||||
| "strconv" | |||||
| "time" | |||||
| "strings" | |||||
| "github.com/AFASystems/presence/internal/pkg/config" | "github.com/AFASystems/presence/internal/pkg/config" | ||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| "github.com/AFASystems/presence/internal/pkg/mqttclient" | |||||
| presenseredis "github.com/AFASystems/presence/internal/pkg/redis" | |||||
| "github.com/redis/go-redis/v9" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| // Move Kafka topics, Redis keys, intervals to env config | |||||
| // Replace hardcoded IPs with env vars | |||||
| // avoid defers -> lock and unlock right before and after usage | |||||
| // Distance formula uses twos_comp incorrectly should parse signed int not hex string | |||||
| // Use buffered log instead of fmt.Println ??? | |||||
| // Limit metrics slice size with ring buffer ?? | |||||
| // handle go routine exit signals with context.WithCancel() ?? | |||||
| // Make internal package for Kafka and Redis | |||||
| // Make internal package for processor: | |||||
| // Helper functions: twos_comp, getBeaconId | |||||
| func main() { | func main() { | ||||
| // Load global context to init beacons and latest list | // Load global context to init beacons and latest list | ||||
| appCtx := model.AppContext{ | appCtx := model.AppContext{ | ||||
| Beacons: model.BeaconsList{ | Beacons: model.BeaconsList{ | ||||
| Beacons: make(map[string]model.Beacon), | Beacons: make(map[string]model.Beacon), | ||||
| }, | }, | ||||
| LatestList: model.LatestBeaconsList{ | |||||
| LatestList: make(map[string]model.Beacon), | |||||
| }, | |||||
| Settings: model.Settings{ | Settings: model.Settings{ | ||||
| Settings: model.SettingsVal{ | Settings: model.SettingsVal{ | ||||
| Location_confidence: 4, | |||||
| Last_seen_threshold: 15, | |||||
| Beacon_metrics_size: 30, | |||||
| HA_send_interval: 5, | |||||
| HA_send_changes_only: false, | |||||
| LocationConfidence: 4, | |||||
| LastSeenThreshold: 15, | |||||
| BeaconMetricSize: 30, | |||||
| HASendInterval: 5, | |||||
| HASendChangesOnly: false, | |||||
| }, | }, | ||||
| }, | }, | ||||
| BeaconEvents: model.BeaconEventList{ | |||||
| Beacons: make(map[string]model.BeaconEvent), | |||||
| }, | |||||
| BeaconsLookup: make(map[string]struct{}), | |||||
| } | } | ||||
| cfg := config.Load() | cfg := config.Load() | ||||
| // Kafka writer idk why yet | |||||
| writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons") | |||||
| defer writer.Close() | |||||
| // Kafka reader for Raw MQTT beacons | // Kafka reader for Raw MQTT beacons | ||||
| rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID") | |||||
| rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") | |||||
| defer rawReader.Close() | defer rawReader.Close() | ||||
| // Kafka reader for API server updates | // Kafka reader for API server updates | ||||
| apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID") | |||||
| apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api") | |||||
| defer apiReader.Close() | defer apiReader.Close() | ||||
| // Kafka reader for latest list updates | |||||
| latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID") | |||||
| defer latestReader.Close() | |||||
| // Kafka reader for settings updates | |||||
| settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID") | |||||
| defer settingsReader.Close() | |||||
| ctx := context.Background() | |||||
| // Init Redis Client | |||||
| client := redis.NewClient(&redis.Options{ | |||||
| Addr: cfg.RedisURL, | |||||
| Password: "", | |||||
| }) | |||||
| beaconsList := presenseredis.LoadBeaconsList(client, ctx) | |||||
| appCtx.Beacons.Beacons = beaconsList | |||||
| latestList := presenseredis.LoadLatestList(client, ctx) | |||||
| appCtx.LatestList.LatestList = latestList | |||||
| settings := presenseredis.LoadSettings(client, ctx) | |||||
| appCtx.Settings.Settings = settings | |||||
| alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons") | |||||
| defer alertWriter.Close() | |||||
| // declare channel for collecting Kafka messages | |||||
| chRaw := make(chan model.Incoming_json, 2000) | |||||
| fmt.Println("Decoder initialized, subscribed to Kafka topics") | |||||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||||
| chApi := make(chan model.ApiUpdate, 2000) | chApi := make(chan model.ApiUpdate, 2000) | ||||
| chLatest := make(chan model.Incoming_json, 2000) | |||||
| chSettings := make(chan model.SettingsVal, 10) | |||||
| go kafkaclient.Consume(rawReader, chRaw) | go kafkaclient.Consume(rawReader, chRaw) | ||||
| go kafkaclient.Consume(apiReader, chApi) | go kafkaclient.Consume(apiReader, chApi) | ||||
| go kafkaclient.Consume(latestReader, chLatest) | |||||
| go kafkaclient.Consume(settingsReader, chSettings) | |||||
| go func() { | |||||
| // Syncing Redis cache every 1s with 2 lists: beacons, latest list | |||||
| ticker := time.NewTicker(1 * time.Second) | |||||
| defer ticker.Stop() | |||||
| for range ticker.C { | |||||
| presenseredis.SaveBeaconsList(&appCtx, client, ctx) | |||||
| presenseredis.SaveLatestList(&appCtx, client, ctx) | |||||
| presenseredis.SaveSettings(&appCtx, client, ctx) | |||||
| } | |||||
| }() | |||||
| for { | for { | ||||
| select { | select { | ||||
| case msg := <-chRaw: | case msg := <-chRaw: | ||||
| processIncoming(msg, &appCtx) | |||||
| processIncoming(msg, &appCtx, alertWriter) | |||||
| case msg := <-chApi: | case msg := <-chApi: | ||||
| switch msg.Method { | switch msg.Method { | ||||
| case "POST": | case "POST": | ||||
| appCtx.Beacons.Lock.Lock() | |||||
| appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon | |||||
| id := msg.Beacon.ID | |||||
| appCtx.BeaconsLookup[id] = struct{}{} | |||||
| case "DELETE": | case "DELETE": | ||||
| _, exists := appCtx.Beacons.Beacons[msg.ID] | |||||
| if exists { | |||||
| appCtx.Beacons.Lock.Lock() | |||||
| delete(appCtx.Beacons.Beacons, msg.ID) | |||||
| } | |||||
| default: | |||||
| fmt.Println("unknown method: ", msg.Method) | |||||
| fmt.Println("Incoming delete message") | |||||
| } | } | ||||
| appCtx.Beacons.Lock.Unlock() | |||||
| case msg := <-chLatest: | |||||
| fmt.Println("latest msg: ", msg) | |||||
| case msg := <-chSettings: | |||||
| appCtx.Settings.Lock.Lock() | |||||
| appCtx.Settings.Settings = msg | |||||
| fmt.Println("settings channel: ", msg) | |||||
| appCtx.Settings.Lock.Unlock() | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) { | |||||
| defer func() { | |||||
| if err := recover(); err != nil { | |||||
| fmt.Println("work failed:", err) | |||||
| } | |||||
| }() | |||||
| func processIncoming(adv model.BeaconAdvertisement, ctx *model.AppContext, writer *kafka.Writer) { | |||||
| id := adv.MAC | |||||
| _, ok := ctx.BeaconsLookup[id] | |||||
| if !ok { | |||||
| return | |||||
| } | |||||
| fmt.Println("message came") | |||||
| err := decodeBeacon(adv, ctx, writer) | |||||
| if err != nil { | |||||
| fmt.Println("error in decoding") | |||||
| return | |||||
| } | |||||
| } | |||||
| incoming = mqttclient.IncomingBeaconFilter(incoming) | |||||
| id := mqttclient.GetBeaconID(incoming) | |||||
| now := time.Now().Unix() | |||||
| func decodeBeacon(adv model.BeaconAdvertisement, ctx *model.AppContext, writer *kafka.Writer) error { | |||||
| beacon := strings.TrimSpace(adv.Data) | |||||
| id := adv.MAC | |||||
| if beacon == "" { | |||||
| return nil // How to return error?, do I even need to return error | |||||
| } | |||||
| beacons := &ctx.Beacons | |||||
| b, err := hex.DecodeString(beacon) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| beacons.Lock.Lock() | |||||
| defer beacons.Lock.Unlock() | |||||
| // check for flag byte, if first AD structure is flag bytes, remove it | |||||
| if len(b) > 1 && b[1] == 0x01 { | |||||
| l := int(b[0]) // length of AD structure | |||||
| if 1+l <= len(b) { | |||||
| b = b[1+l:] | |||||
| } | |||||
| } | |||||
| latestList := &ctx.LatestList | |||||
| adStructureIndeces := ParseADFast(b) | |||||
| event := model.BeaconEvent{} | |||||
| for _, r := range adStructureIndeces { | |||||
| ad := b[r[0]:r[1]] | |||||
| if checkIngics(ad) { | |||||
| event = parseIngicsState(ad) | |||||
| event.ID = id | |||||
| event.Name = id | |||||
| break | |||||
| } else if checkEddystoneTLM(ad) { | |||||
| event = parseEddystoneState(ad) | |||||
| event.ID = id | |||||
| event.Name = id | |||||
| break | |||||
| } else if checkMinewB7(ad) { | |||||
| fmt.Println("Minew B7 vendor format") | |||||
| break | |||||
| } | |||||
| } | |||||
| latestList.Lock.Lock() | |||||
| defer latestList.Lock.Unlock() | |||||
| if event.ID != "" { | |||||
| prevEvent, ok := ctx.BeaconEvents.Beacons[id] | |||||
| ctx.BeaconEvents.Beacons[id] = event | |||||
| if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) { | |||||
| return nil | |||||
| } | |||||
| beacon, exists := beacons.Beacons[id] | |||||
| if !exists { | |||||
| x, exists := latestList.LatestList[id] | |||||
| if exists { | |||||
| x.Last_seen = now | |||||
| x.Incoming_JSON = incoming | |||||
| x.Distance = getBeaconDistance(incoming) | |||||
| latestList.LatestList[id] = x | |||||
| } else { | |||||
| latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)} | |||||
| eMsg, err := json.Marshal(event) | |||||
| if err != nil { | |||||
| return err | |||||
| } | } | ||||
| // Move this to seperate routine? | |||||
| for k, v := range latestList.LatestList { | |||||
| if (now - v.Last_seen) > 10 { | |||||
| delete(latestList.LatestList, k) | |||||
| } | |||||
| err = writer.WriteMessages(context.Background(), kafka.Message{ | |||||
| Value: eMsg, | |||||
| }) | |||||
| if err != nil { | |||||
| return err | |||||
| } | } | ||||
| return | |||||
| fmt.Println("Message sent") | |||||
| } | } | ||||
| updateBeacon(&beacon, incoming) | |||||
| beacons.Beacons[id] = beacon | |||||
| return nil | |||||
| } | } | ||||
| func getBeaconDistance(incoming model.Incoming_json) float64 { | |||||
| rssi := incoming.RSSI | |||||
| power := incoming.TX_power | |||||
| distance := 100.0 | |||||
| func checkIngics(ad []byte) bool { | |||||
| if len(ad) >= 6 && | |||||
| ad[1] == 0xFF && | |||||
| ad[2] == 0x59 && | |||||
| ad[3] == 0x00 && | |||||
| ad[4] == 0x80 && | |||||
| ad[5] == 0xBC { | |||||
| return true | |||||
| } | |||||
| return false | |||||
| } | |||||
| ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) | |||||
| if ratio < 1.0 { | |||||
| distance = math.Pow(ratio, 10) | |||||
| } else { | |||||
| distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 | |||||
| func parseIngicsState(ad []byte) model.BeaconEvent { | |||||
| return model.BeaconEvent{ | |||||
| Battery: uint32(binary.LittleEndian.Uint16(ad[6:8])), | |||||
| Event: int(ad[8]), | |||||
| Type: "Ingics", | |||||
| } | } | ||||
| return distance | |||||
| } | } | ||||
| func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) { | |||||
| now := time.Now().Unix() | |||||
| func checkEddystoneTLM(ad []byte) bool { | |||||
| if len(ad) >= 4 && | |||||
| ad[1] == 0x16 && | |||||
| ad[2] == 0xAA && | |||||
| ad[3] == 0xFE && | |||||
| ad[4] == 0x20 { | |||||
| return true | |||||
| } | |||||
| beacon.Incoming_JSON = incoming | |||||
| beacon.Last_seen = now | |||||
| beacon.Beacon_type = incoming.Beacon_type | |||||
| beacon.HB_ButtonCounter = incoming.HB_ButtonCounter | |||||
| beacon.HB_Battery = incoming.HB_Battery | |||||
| beacon.HB_RandomNonce = incoming.HB_RandomNonce | |||||
| beacon.HB_ButtonMode = incoming.HB_ButtonMode | |||||
| return false | |||||
| } | |||||
| if beacon.Beacon_metrics == nil { | |||||
| beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now | |||||
| func parseEddystoneState(ad []byte) model.BeaconEvent { | |||||
| return model.BeaconEvent{ | |||||
| Battery: uint32(binary.BigEndian.Uint16(ad[6:8])), | |||||
| Type: "Eddystone", | |||||
| } | } | ||||
| } | |||||
| metric := model.BeaconMetric{} | |||||
| metric.Distance = getBeaconDistance(incoming) | |||||
| metric.Timestamp = now | |||||
| metric.Rssi = int64(incoming.RSSI) | |||||
| metric.Location = incoming.Hostname | |||||
| beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric) | |||||
| // I dont think this is always true, but for testing is ok | |||||
| func checkMinewB7(ad []byte) bool { | |||||
| if len(ad) >= 4 && | |||||
| ad[1] == 0x16 && | |||||
| ad[2] == 0xE1 && | |||||
| ad[3] == 0xFF { | |||||
| return true | |||||
| } | |||||
| // Leave the HB button implementation for now | |||||
| return false | |||||
| } | } | ||||
| func twos_comp(inp string) int64 { | |||||
| i, _ := strconv.ParseInt("0x"+inp, 0, 64) | |||||
| return i - 256 | |||||
| func ParseADFast(b []byte) [][2]int { | |||||
| var res [][2]int | |||||
| i := 0 | |||||
| for i < len(b) { | |||||
| l := int(b[i]) | |||||
| if l == 0 || i+1+l > len(b) { | |||||
| break | |||||
| } | |||||
| res = append(res, [2]int{i, i + 1 + l}) | |||||
| i += 1 + l | |||||
| } | |||||
| return res | |||||
| } | } | ||||
| @@ -4,68 +4,248 @@ import ( | |||||
| "context" | "context" | ||||
| "encoding/json" | "encoding/json" | ||||
| "fmt" | "fmt" | ||||
| "math" | |||||
| "strconv" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/config" | |||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| "github.com/redis/go-redis/v9" | |||||
| "github.com/segmentio/kafka-go" | |||||
| ) | ) | ||||
| func main() { | func main() { | ||||
| ctx := context.Background() | |||||
| client := redis.NewClient(&redis.Options{ | |||||
| Addr: "127.0.0.1:6379", | |||||
| Password: "", | |||||
| }) | |||||
| } | |||||
| // Load global context to init beacons and latest list | |||||
| appCtx := model.AppContext{ | |||||
| Settings: model.Settings{ | |||||
| Settings: model.SettingsVal{ | |||||
| LocationConfidence: 4, | |||||
| LastSeenThreshold: 15, | |||||
| BeaconMetricSize: 30, | |||||
| HASendInterval: 5, | |||||
| HASendChangesOnly: false, | |||||
| RSSIEnforceThreshold: false, | |||||
| RSSIMinThreshold: 100, | |||||
| }, | |||||
| }, | |||||
| BeaconsLookup: make(map[string]struct{}), | |||||
| LatestList: model.LatestBeaconsList{ | |||||
| LatestList: make(map[string]model.Beacon), | |||||
| }, | |||||
| Beacons: model.BeaconsList{ | |||||
| Beacons: make(map[string]model.Beacon), | |||||
| }, | |||||
| } | |||||
| func getLikelyLocations(client *redis.Client, ctx context.Context) { | |||||
| beaconsList, err := client.Get(ctx, "beaconsList").Result() | |||||
| var beacons = make(map[string]model.Beacon) | |||||
| if err == redis.Nil { | |||||
| fmt.Println("no beacons list, starting empty") | |||||
| } else if err != nil { | |||||
| panic(err) | |||||
| } else { | |||||
| json.Unmarshal([]byte(beaconsList), &beacons) | |||||
| cfg := config.Load() | |||||
| // Kafka reader for Raw MQTT beacons | |||||
| rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") | |||||
| defer rawReader.Close() | |||||
| // Kafka reader for API server updates | |||||
| apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") | |||||
| defer apiReader.Close() | |||||
| writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents") | |||||
| defer writer.Close() | |||||
| fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") | |||||
| locTicker := time.NewTicker(1 * time.Second) | |||||
| defer locTicker.Stop() | |||||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||||
| chApi := make(chan model.ApiUpdate, 2000) | |||||
| go kafkaclient.Consume(rawReader, chRaw) | |||||
| go kafkaclient.Consume(apiReader, chApi) | |||||
| for { | |||||
| select { | |||||
| case <-locTicker.C: | |||||
| getLikelyLocations(&appCtx, writer) | |||||
| case msg := <-chRaw: | |||||
| assignBeaconToList(msg, &appCtx) | |||||
| case msg := <-chApi: | |||||
| switch msg.Method { | |||||
| case "POST": | |||||
| id := msg.Beacon.ID | |||||
| appCtx.BeaconsLookup[id] = struct{}{} | |||||
| case "DELETE": | |||||
| fmt.Println("Incoming delete message") | |||||
| } | |||||
| } | |||||
| } | } | ||||
| } | |||||
| func getLikelyLocations(ctx *model.AppContext, writer *kafka.Writer) { | |||||
| fmt.Println("get likely locations called") | |||||
| ctx.Beacons.Lock.Lock() | |||||
| beacons := ctx.Beacons.Beacons | |||||
| ctx.Beacons.Lock.Unlock() | |||||
| for id, beacon := range beacons { | |||||
| if len(beacon.Beacon_metrics) == 0 { | |||||
| for _, beacon := range beacons { | |||||
| // Shrinking the model because other properties have nothing to do with the location | |||||
| r := model.HTTPLocation{ | |||||
| Method: "Standard", | |||||
| Distance: 999, | |||||
| ID: beacon.ID, | |||||
| Location: "", | |||||
| LastSeen: 999, | |||||
| } | |||||
| mSize := len(beacon.BeaconMetrics) | |||||
| if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > ctx.Settings.Settings.LastSeenThreshold { | |||||
| continue | continue | ||||
| } | } | ||||
| if isExpired(&beacon, settings) { | |||||
| handleExpiredBeacon(&beacon, cl, ctx) | |||||
| locList := make(map[string]float64) | |||||
| seenW := 1.5 | |||||
| rssiW := 0.75 | |||||
| for _, metric := range beacon.BeaconMetrics { | |||||
| res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0))) | |||||
| locList[metric.Location] += res | |||||
| } | |||||
| bestLocName := "" | |||||
| maxScore := 0.0 | |||||
| for locName, score := range locList { | |||||
| if score > maxScore { | |||||
| maxScore = score | |||||
| bestLocName = locName | |||||
| } | |||||
| } | |||||
| if bestLocName == beacon.PreviousLocation { | |||||
| beacon.LocationConfidence++ | |||||
| } else { | |||||
| beacon.LocationConfidence = 0 | |||||
| } | |||||
| r.Distance = beacon.BeaconMetrics[mSize-1].Distance | |||||
| r.Location = bestLocName | |||||
| r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp | |||||
| if beacon.LocationConfidence == ctx.Settings.Settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName { | |||||
| beacon.LocationConfidence = 0 | |||||
| // Who do I need this if I am sending entire structure anyways? who knows | |||||
| js, err := json.Marshal(model.LocationChange{ | |||||
| Method: "LocationChange", | |||||
| BeaconRef: beacon, | |||||
| Name: beacon.Name, | |||||
| PreviousLocation: beacon.PreviousConfidentLocation, | |||||
| NewLocation: bestLocName, | |||||
| Timestamp: time.Now().Unix(), | |||||
| }) | |||||
| if err != nil { | |||||
| beacon.PreviousConfidentLocation = bestLocName | |||||
| beacon.PreviousLocation = bestLocName | |||||
| ctx.Beacons.Lock.Lock() | |||||
| ctx.Beacons.Beacons[beacon.ID] = beacon | |||||
| ctx.Beacons.Lock.Unlock() | |||||
| continue | |||||
| } | |||||
| msg := kafka.Message{ | |||||
| Value: js, | |||||
| } | |||||
| err = writer.WriteMessages(context.Background(), msg) | |||||
| if err != nil { | |||||
| fmt.Println("Error in sending Kafka message") | |||||
| } | |||||
| } | |||||
| beacon.PreviousLocation = bestLocName | |||||
| ctx.Beacons.Lock.Lock() | |||||
| ctx.Beacons.Beacons[beacon.ID] = beacon | |||||
| ctx.Beacons.Lock.Unlock() | |||||
| js, err := json.Marshal(r) | |||||
| if err != nil { | |||||
| continue | continue | ||||
| } | } | ||||
| best := calculateBestLocation(&beacon) | |||||
| updateBeaconState(&beacon, best, settings, ctx, cl) | |||||
| msg := kafka.Message{ | |||||
| Value: js, | |||||
| } | |||||
| err = writer.WriteMessages(context.Background(), msg) | |||||
| if err != nil { | |||||
| fmt.Println("Error in sending Kafka message") | |||||
| } | |||||
| } | |||||
| } | |||||
| func assignBeaconToList(adv model.BeaconAdvertisement, ctx *model.AppContext) { | |||||
| id := adv.MAC | |||||
| _, ok := ctx.BeaconsLookup[id] | |||||
| now := time.Now().Unix() | |||||
| if !ok { | |||||
| ctx.LatestList.Lock.Lock() | |||||
| ctx.LatestList.LatestList[id] = model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: getBeaconDistance(adv)} | |||||
| ctx.LatestList.Lock.Unlock() | |||||
| return | |||||
| } | |||||
| fmt.Println("RSSI: ", adv.RSSI) | |||||
| if ctx.Settings.Settings.RSSIEnforceThreshold && (int64(adv.RSSI) < ctx.Settings.Settings.RSSIMinThreshold) { | |||||
| return | |||||
| } | |||||
| ctx.Beacons.Lock.Lock() | |||||
| beacon, ok := ctx.Beacons.Beacons[id] | |||||
| if !ok { | |||||
| beacon = model.Beacon{ | |||||
| ID: id, | |||||
| } | |||||
| } | |||||
| ctx.Beacons.Lock.Unlock() | |||||
| beacon.IncomingJSON = adv | |||||
| beacon.LastSeen = now | |||||
| if beacon.BeaconMetrics == nil { | |||||
| beacon.BeaconMetrics = make([]model.BeaconMetric, 0, ctx.Settings.Settings.BeaconMetricSize) | |||||
| } | |||||
| metric := model.BeaconMetric{ | |||||
| Distance: getBeaconDistance(adv), | |||||
| Timestamp: now, | |||||
| RSSI: int64(adv.RSSI), | |||||
| Location: adv.Hostname, | |||||
| } | |||||
| if len(beacon.BeaconMetrics) >= ctx.Settings.Settings.BeaconMetricSize { | |||||
| copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:]) | |||||
| beacon.BeaconMetrics[ctx.Settings.Settings.BeaconMetricSize-1] = metric | |||||
| } else { | |||||
| beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric) | |||||
| } | |||||
| appendHTTPResult(ctx, beacon, best) | |||||
| ctx.Beacons.Beacons[id] = beacon | |||||
| ctx.Beacons.Lock.Lock() | |||||
| ctx.Beacons.Beacons[id] = beacon | |||||
| ctx.Beacons.Lock.Unlock() | |||||
| } | |||||
| func getBeaconDistance(adv model.BeaconAdvertisement) float64 { | |||||
| ratio := float64(adv.RSSI) * (1.0 / float64(twosComp(adv.TXPower))) | |||||
| distance := 100.0 | |||||
| if ratio < 1.0 { | |||||
| distance = math.Pow(ratio, 10) | |||||
| } else { | |||||
| distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 | |||||
| } | } | ||||
| return distance | |||||
| } | } | ||||
| // get likely locations: | |||||
| /* | |||||
| 1. Locks the http_results list | |||||
| 2. inits list to empty struct type -> TODO: what is this list used for | |||||
| 3. loops through beacons list -> should be locked? | |||||
| 4. check for beacon metrics -> how do you get beacon metrics, I guess it has an array of timestamps | |||||
| 5. check for threshold value in the settings | |||||
| 5.1. check for property expired location | |||||
| 5.2. if location is not expired -> mark it as expired, generate message and send to all clients, | |||||
| if clients do not respond close the connection | |||||
| 6. Init best location with type Best_location{} -> what is this type | |||||
| 7. make locations list -> key: string, val: float64 | |||||
| 7.1 set weight for seen and rssi | |||||
| 7.2 loop over metrics of the beacon -> some alogirthm based on location value | |||||
| I think the algorithm is recording names of different gateways and their rssi's and then from | |||||
| that it checks gateway name and makes decisions based on calculated values | |||||
| 7.3 writes result in best location and updates list location history with this name if the list | |||||
| is longer than 10 elements it removes the first element | |||||
| */ | |||||
| func twosComp(inp string) int64 { | |||||
| i, _ := strconv.ParseInt("0x"+inp, 0, 64) | |||||
| return i - 256 | |||||
| } | |||||
| @@ -1,188 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "log" | |||||
| "os" | |||||
| "os/signal" | |||||
| "strconv" | |||||
| "strings" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/config" | |||||
| "github.com/AFASystems/presence/internal/pkg/httpserver" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/AFASystems/presence/internal/pkg/mqttclient" | |||||
| "github.com/AFASystems/presence/internal/pkg/persistence" | |||||
| "github.com/boltdb/bolt" | |||||
| "github.com/gorilla/websocket" | |||||
| "github.com/yosssi/gmq/mqtt" | |||||
| "github.com/yosssi/gmq/mqtt/client" | |||||
| ) | |||||
| func main() { | |||||
| sigc := make(chan os.Signal, 1) | |||||
| signal.Notify(sigc, os.Interrupt) | |||||
| cfg := config.Load() | |||||
| fmt.Println("hello world") | |||||
| db, err := bolt.Open("presence.db", 0644, nil) | |||||
| if err != nil { | |||||
| log.Fatal(err) | |||||
| } | |||||
| defer db.Close() | |||||
| model.Db = db | |||||
| cli := client.New(&client.Options{ | |||||
| ErrorHandler: func(err error) { | |||||
| fmt.Println(err) | |||||
| }, | |||||
| }) | |||||
| defer cli.Terminate() | |||||
| fmt.Println("host: ", cfg.MQTTHost, " Client ID: ", cfg.MQTTClientID, "user: ", cfg.MQTTUser) | |||||
| err = cli.Connect(&client.ConnectOptions{ | |||||
| Network: "tcp", | |||||
| Address: cfg.MQTTHost, | |||||
| ClientID: []byte(cfg.MQTTClientID), | |||||
| UserName: []byte(cfg.MQTTUser), | |||||
| Password: []byte(cfg.MQTTPass), | |||||
| }) | |||||
| if err != nil { | |||||
| fmt.Println("Error comes from here") | |||||
| panic(err) | |||||
| } | |||||
| ctx := &model.AppContext{ | |||||
| HTTPResults: model.HTTPResultsList{ | |||||
| HTTPResults: model.HTTPLocationsList{Beacons: []model.HTTPLocation{}}, | |||||
| }, | |||||
| Beacons: model.BeaconsList{ | |||||
| Beacons: make(map[string]model.Beacon), | |||||
| }, | |||||
| ButtonsList: make(map[string]model.Button), | |||||
| Settings: model.Settings{ | |||||
| Location_confidence: 4, | |||||
| Last_seen_threshold: 15, | |||||
| Beacon_metrics_size: 30, | |||||
| HA_send_interval: 5, | |||||
| HA_send_changes_only: false, | |||||
| }, | |||||
| Clients: make(map[*websocket.Conn]bool), | |||||
| Broadcast: make(chan model.Message, 100), | |||||
| Locations: model.LocationsList{Locations: make(map[string]model.Location)}, | |||||
| LatestList: model.LatestBeaconsList{LatestList: make(map[string]model.Beacon)}, | |||||
| } | |||||
| persistence.LoadState(model.Db, ctx) | |||||
| incomingChan := mqttclient.IncomingMQTTProcessor(1*time.Second, cli, model.Db, ctx) | |||||
| err = cli.Subscribe(&client.SubscribeOptions{ | |||||
| SubReqs: []*client.SubReq{ | |||||
| &client.SubReq{ | |||||
| TopicFilter: []byte("publish_out/#"), | |||||
| QoS: mqtt.QoS0, | |||||
| Handler: func(topicName, message []byte) { | |||||
| msgStr := string(message) | |||||
| t := strings.Split(string(topicName), "/") | |||||
| hostname := t[1] | |||||
| fmt.Println("hostname: ", hostname) | |||||
| if strings.HasPrefix(msgStr, "[") { | |||||
| var readings []model.RawReading | |||||
| err := json.Unmarshal(message, &readings) | |||||
| if err != nil { | |||||
| log.Printf("Error parsing JSON: %v", err) | |||||
| return | |||||
| } | |||||
| for _, reading := range readings { | |||||
| if reading.Type == "Gateway" { | |||||
| continue | |||||
| } | |||||
| incoming := model.Incoming_json{ | |||||
| Hostname: hostname, | |||||
| MAC: reading.MAC, | |||||
| RSSI: int64(reading.RSSI), | |||||
| Data: reading.RawData, | |||||
| HB_ButtonCounter: parseButtonState(reading.RawData), | |||||
| } | |||||
| incomingChan <- incoming | |||||
| } | |||||
| } else { | |||||
| s := strings.Split(string(message), ",") | |||||
| if len(s) < 6 { | |||||
| log.Printf("Messaggio CSV non valido: %s", msgStr) | |||||
| return | |||||
| } | |||||
| rawdata := s[4] | |||||
| buttonCounter := parseButtonState(rawdata) | |||||
| if buttonCounter > 0 { | |||||
| incoming := model.Incoming_json{} | |||||
| i, _ := strconv.ParseInt(s[3], 10, 64) | |||||
| incoming.Hostname = hostname | |||||
| incoming.Beacon_type = "hb_button" | |||||
| incoming.MAC = s[1] | |||||
| incoming.RSSI = i | |||||
| incoming.Data = rawdata | |||||
| incoming.HB_ButtonCounter = buttonCounter | |||||
| read_line := strings.TrimRight(string(s[5]), "\r\n") | |||||
| it, err33 := strconv.Atoi(read_line) | |||||
| if err33 != nil { | |||||
| fmt.Println(it) | |||||
| fmt.Println(err33) | |||||
| os.Exit(2) | |||||
| } | |||||
| incomingChan <- incoming | |||||
| } | |||||
| } | |||||
| }, | |||||
| }, | |||||
| }, | |||||
| }) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| fmt.Println("CONNECTED TO MQTT") | |||||
| fmt.Println("\n ") | |||||
| fmt.Println("Visit http://" + cfg.HTTPAddr + " on your browser to see the web interface") | |||||
| fmt.Println("\n ") | |||||
| go httpserver.StartHTTPServer(cfg.HTTPAddr, ctx) | |||||
| <-sigc | |||||
| if err := cli.Disconnect(); err != nil { | |||||
| panic(err) | |||||
| } | |||||
| } | |||||
| func parseButtonState(raw string) int64 { | |||||
| raw = strings.ToUpper(raw) | |||||
| if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 { | |||||
| buttonField := raw[34:38] | |||||
| if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil { | |||||
| return buttonValue | |||||
| } | |||||
| } | |||||
| if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 { | |||||
| counterField := raw[22:24] | |||||
| buttonState, err := strconv.ParseInt(counterField, 16, 64) | |||||
| if err == nil { | |||||
| return buttonState | |||||
| } | |||||
| } | |||||
| return 0 | |||||
| } | |||||
| @@ -32,7 +32,7 @@ func HttpServer(addr string) { | |||||
| methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | ||||
| // Kafka writer that relays messages | // Kafka writer that relays messages | ||||
| writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons") | |||||
| writer := kafkaclient.KafkaWriter("127.0.0.1:9092", "apibeacons") | |||||
| defer writer.Close() | defer writer.Close() | ||||
| settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") | settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") | ||||
| @@ -44,11 +44,6 @@ func HttpServer(addr string) { | |||||
| r := mux.NewRouter() | r := mux.NewRouter() | ||||
| client := redis.NewClient(&redis.Options{ | |||||
| Addr: "valkey:6379", | |||||
| Password: "", | |||||
| }) | |||||
| // declare WS clients list | do I need it though? or will locations worker send message | // declare WS clients list | do I need it though? or will locations worker send message | ||||
| // to kafka and then only this service (server) is being used for communication with the clients | // to kafka and then only this service (server) is being used for communication with the clients | ||||
| clients := make(map[*websocket.Conn]bool) | clients := make(map[*websocket.Conn]bool) | ||||
| @@ -58,17 +53,17 @@ func HttpServer(addr string) { | |||||
| // For now just add beacon DELETE / GET / POST / PUT methods | // For now just add beacon DELETE / GET / POST / PUT methods | ||||
| r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") | r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") | ||||
| r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") | |||||
| // r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") | |||||
| r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") | r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST") | ||||
| r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") | r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT") | ||||
| r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") | |||||
| // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") | |||||
| r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") | r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") | ||||
| // Handler for WS messages | // Handler for WS messages | ||||
| // No point in having seperate route for each message type, better to handle different message types in one connection | // No point in having seperate route for each message type, better to handle different message types in one connection | ||||
| r.HandleFunc("/ws/api/beacons", serveWs(client)) | |||||
| r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) | |||||
| // r.HandleFunc("/ws/api/beacons", serveWs(client)) | |||||
| // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) | |||||
| r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) | r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) | ||||
| http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | ||||
| @@ -103,6 +98,8 @@ func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc { | |||||
| ID: beaconId, | ID: beaconId, | ||||
| } | } | ||||
| fmt.Println("Sending DELETE message") | |||||
| flag := sendKafkaMessage(writer, &apiUpdate) | flag := sendKafkaMessage(writer, &apiUpdate) | ||||
| if !flag { | if !flag { | ||||
| fmt.Println("error in sending Kafka message") | fmt.Println("error in sending Kafka message") | ||||
| @@ -125,7 +122,9 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { | |||||
| return | return | ||||
| } | } | ||||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) { | |||||
| fmt.Println("sending POST message") | |||||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { | |||||
| http.Error(w, "name and beacon_id cannot be blank", 400) | http.Error(w, "name and beacon_id cannot be blank", 400) | ||||
| return | return | ||||
| } | } | ||||
| @@ -146,35 +145,14 @@ func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc { | |||||
| } | } | ||||
| } | } | ||||
| func beaconsListHandler(client *redis.Client) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| beaconsList, err := client.Get(context.Background(), "beaconsList").Result() | |||||
| if err == redis.Nil { | |||||
| fmt.Println("no beacons list, starting empty") | |||||
| http.Error(w, "list is empty", 500) | |||||
| } else if err != nil { | |||||
| http.Error(w, "Internal server error", 500) | |||||
| panic(err) | |||||
| } else { | |||||
| w.Write([]byte(beaconsList)) | |||||
| } | |||||
| } | |||||
| } | |||||
| // func beaconsListHandler(client *redis.Client) http.HandlerFunc { | |||||
| // return func(w http.ResponseWriter, r *http.Request) { | |||||
| // } | |||||
| // } | |||||
| func settingsListHandler(client *redis.Client) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| settings, err := client.Get(context.Background(), "settings").Result() | |||||
| if err == redis.Nil { | |||||
| fmt.Println("no settings persisted, starting empty") | |||||
| http.Error(w, "list is empty", 500) | |||||
| } else if err != nil { | |||||
| http.Error(w, "Internal server error", 500) | |||||
| panic(err) | |||||
| } else { | |||||
| w.Write([]byte(settings)) | |||||
| } | |||||
| } | |||||
| } | |||||
| // func settingsListHandler(client *redis.Client) http.HandlerFunc { | |||||
| // return func(w http.ResponseWriter, r *http.Request) {} | |||||
| // } | |||||
| func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { | func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { | ||||
| return func(w http.ResponseWriter, r *http.Request) { | return func(w http.ResponseWriter, r *http.Request) { | ||||
| @@ -214,7 +192,7 @@ func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { | |||||
| } | } | ||||
| func settingsCheck(settings model.SettingsVal) bool { | func settingsCheck(settings model.SettingsVal) bool { | ||||
| if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 { | |||||
| if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 { | |||||
| return false | return false | ||||
| } | } | ||||
| @@ -0,0 +1,86 @@ | |||||
| package main | |||||
| import ( | |||||
| "bufio" | |||||
| "encoding/hex" | |||||
| "fmt" | |||||
| "log" | |||||
| "os" | |||||
| "strings" | |||||
| ) | |||||
| func main() { | |||||
| file, err := os.Open("save.txt") | |||||
| if err != nil { | |||||
| log.Fatalf("Failed to open file: %s", err) | |||||
| } | |||||
| defer file.Close() | |||||
| scanner := bufio.NewScanner(file) | |||||
| for scanner.Scan() { | |||||
| line := scanner.Text() | |||||
| decodeBeacon(line) | |||||
| } | |||||
| } | |||||
| func decodeBeacon(beacon string) { | |||||
| beacon = strings.TrimSpace(beacon) | |||||
| if beacon == "" { | |||||
| return | |||||
| } | |||||
| // convert to bytes for faster operations | |||||
| b, err := hex.DecodeString(beacon) | |||||
| if err != nil { | |||||
| fmt.Println("invalid line: ", beacon) | |||||
| return | |||||
| } | |||||
| // remove flag bytes - they hold no structural information | |||||
| if len(b) > 1 && b[1] == 0x01 { | |||||
| l := int(b[0]) | |||||
| if 1+l <= len(b) { | |||||
| b = b[1+l:] | |||||
| } | |||||
| } | |||||
| adBlockIndeces := parseADFast(b) | |||||
| for _, r := range adBlockIndeces { | |||||
| ad := b[r[0]:r[1]] | |||||
| if len(ad) >= 4 && | |||||
| ad[1] == 0x16 && | |||||
| ad[2] == 0xAA && | |||||
| ad[3] == 0xFE { | |||||
| // fmt.Println("Eddystone:", hex.EncodeToString(b)) | |||||
| return | |||||
| } | |||||
| if len(ad) >= 7 && | |||||
| ad[1] == 0xFF && | |||||
| ad[2] == 0x4C && ad[3] == 0x00 && | |||||
| ad[4] == 0x02 && ad[5] == 0x15 { | |||||
| // fmt.Println("iBeacon:", hex.EncodeToString(b)) | |||||
| return | |||||
| } | |||||
| } | |||||
| fmt.Println(hex.EncodeToString(b)) | |||||
| } | |||||
| func parseADFast(b []byte) [][2]int { | |||||
| var res [][2]int | |||||
| i := 0 | |||||
| for i < len(b) { | |||||
| l := int(b[i]) | |||||
| if l == 0 || i+1+l > len(b) { | |||||
| break | |||||
| } | |||||
| res = append(res, [2]int{i, i + 1 + l}) | |||||
| i += 1 + l | |||||
| } | |||||
| return res | |||||
| } | |||||
| @@ -1,13 +1,13 @@ | |||||
| package mqtthandler | package mqtthandler | ||||
| import ( | import ( | ||||
| "fmt" | |||||
| "context" | |||||
| "encoding/json" | "encoding/json" | ||||
| "strings" | |||||
| "fmt" | |||||
| "log" | "log" | ||||
| "strconv" | |||||
| "os" | "os" | ||||
| "context" | |||||
| "strconv" | |||||
| "strings" | |||||
| "time" | "time" | ||||
| "github.com/AFASystems/presence/internal/pkg/model" | "github.com/AFASystems/presence/internal/pkg/model" | ||||
| @@ -30,17 +30,18 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) { | |||||
| if reading.Type == "Gateway" { | if reading.Type == "Gateway" { | ||||
| continue | continue | ||||
| } | } | ||||
| incoming := model.Incoming_json{ | |||||
| Hostname: hostname, | |||||
| MAC: reading.MAC, | |||||
| RSSI: int64(reading.RSSI), | |||||
| Data: reading.RawData, | |||||
| HB_ButtonCounter: parseButtonState(reading.RawData), | |||||
| adv := model.BeaconAdvertisement{ | |||||
| Hostname: hostname, | |||||
| MAC: reading.MAC, | |||||
| RSSI: int64(reading.RSSI), | |||||
| Data: reading.RawData, | |||||
| HSButtonCounter: parseButtonState(reading.RawData), | |||||
| } | } | ||||
| encodedMsg, err := json.Marshal(incoming) | |||||
| encodedMsg, err := json.Marshal(adv) | |||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("Error in marshaling: ", err) | fmt.Println("Error in marshaling: ", err) | ||||
| break | |||||
| } | } | ||||
| msg := kafka.Message{ | msg := kafka.Message{ | ||||
| @@ -49,6 +50,8 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) { | |||||
| err = writer.WriteMessages(context.Background(), msg) | err = writer.WriteMessages(context.Background(), msg) | ||||
| if err != nil { | if err != nil { | ||||
| fmt.Println("Error in writing to Kafka: ", err) | fmt.Println("Error in writing to Kafka: ", err) | ||||
| time.Sleep(1 * time.Second) | |||||
| break | |||||
| } | } | ||||
| fmt.Println("message sent: ", time.Now()) | fmt.Println("message sent: ", time.Now()) | ||||
| @@ -63,14 +66,14 @@ func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) { | |||||
| rawdata := s[4] | rawdata := s[4] | ||||
| buttonCounter := parseButtonState(rawdata) | buttonCounter := parseButtonState(rawdata) | ||||
| if buttonCounter > 0 { | if buttonCounter > 0 { | ||||
| incoming := model.Incoming_json{} | |||||
| adv := model.BeaconAdvertisement{} | |||||
| i, _ := strconv.ParseInt(s[3], 10, 64) | i, _ := strconv.ParseInt(s[3], 10, 64) | ||||
| incoming.Hostname = hostname | |||||
| incoming.Beacon_type = "hb_button" | |||||
| incoming.MAC = s[1] | |||||
| incoming.RSSI = i | |||||
| incoming.Data = rawdata | |||||
| incoming.HB_ButtonCounter = buttonCounter | |||||
| adv.Hostname = hostname | |||||
| adv.BeaconType = "hb_button" | |||||
| adv.MAC = s[1] | |||||
| adv.RSSI = i | |||||
| adv.Data = rawdata | |||||
| adv.HSButtonCounter = buttonCounter | |||||
| read_line := strings.TrimRight(string(s[5]), "\r\n") | read_line := strings.TrimRight(string(s[5]), "\r\n") | ||||
| it, err33 := strconv.Atoi(read_line) | it, err33 := strconv.Atoi(read_line) | ||||
| @@ -102,4 +105,4 @@ func parseButtonState(raw string) int64 { | |||||
| } | } | ||||
| return 0 | return 0 | ||||
| } | |||||
| } | |||||
| @@ -0,0 +1,181 @@ | |||||
| package appcontext | |||||
| import ( | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| // Manager provides centralized access to application state | |||||
| type Manager struct { | |||||
| beacons model.BeaconsList | |||||
| settings model.Settings | |||||
| beaconEvents model.BeaconEventList | |||||
| beaconsLookup map[string]struct{} | |||||
| latestList model.LatestBeaconsList | |||||
| } | |||||
| // NewManager creates a new application context manager with default values | |||||
| func NewManager() *Manager { | |||||
| return &Manager{ | |||||
| beacons: model.BeaconsList{ | |||||
| Beacons: make(map[string]model.Beacon), | |||||
| }, | |||||
| settings: model.Settings{ | |||||
| Settings: model.SettingsVal{ | |||||
| LocationConfidence: 4, | |||||
| LastSeenThreshold: 15, | |||||
| BeaconMetricSize: 30, | |||||
| HASendInterval: 5, | |||||
| HASendChangesOnly: false, | |||||
| RSSIEnforceThreshold: false, | |||||
| RSSIMinThreshold: 100, | |||||
| }, | |||||
| }, | |||||
| beaconEvents: model.BeaconEventList{ | |||||
| Beacons: make(map[string]model.BeaconEvent), | |||||
| }, | |||||
| beaconsLookup: make(map[string]struct{}), | |||||
| latestList: model.LatestBeaconsList{ | |||||
| LatestList: make(map[string]model.Beacon), | |||||
| }, | |||||
| } | |||||
| } | |||||
| // GetBeacons returns thread-safe access to beacons list | |||||
| func (m *Manager) GetBeacons() *model.BeaconsList { | |||||
| return &m.beacons | |||||
| } | |||||
| // GetSettings returns thread-safe access to settings | |||||
| func (m *Manager) GetSettings() *model.Settings { | |||||
| return &m.settings | |||||
| } | |||||
| // GetBeaconEvents returns thread-safe access to beacon events | |||||
| func (m *Manager) GetBeaconEvents() *model.BeaconEventList { | |||||
| return &m.beaconEvents | |||||
| } | |||||
| // GetBeaconsLookup returns thread-safe access to beacon lookup map | |||||
| func (m *Manager) GetBeaconsLookup() map[string]struct{} { | |||||
| return m.beaconsLookup | |||||
| } | |||||
| // GetLatestList returns thread-safe access to latest beacons list | |||||
| func (m *Manager) GetLatestList() *model.LatestBeaconsList { | |||||
| return &m.latestList | |||||
| } | |||||
| // AddBeaconToLookup adds a beacon ID to the lookup map | |||||
| func (m *Manager) AddBeaconToLookup(id string) { | |||||
| m.beaconsLookup[id] = struct{}{} | |||||
| } | |||||
| // RemoveBeaconFromLookup removes a beacon ID from the lookup map | |||||
| func (m *Manager) RemoveBeaconFromLookup(id string) { | |||||
| delete(m.beaconsLookup, id) | |||||
| } | |||||
| // BeaconExists checks if a beacon exists in the lookup | |||||
| func (m *Manager) BeaconExists(id string) bool { | |||||
| _, exists := m.beaconsLookup[id] | |||||
| return exists | |||||
| } | |||||
| // GetBeacon returns a beacon by ID (thread-safe) | |||||
| func (m *Manager) GetBeacon(id string) (model.Beacon, bool) { | |||||
| m.beacons.Lock.RLock() | |||||
| defer m.beacons.Lock.RUnlock() | |||||
| beacon, exists := m.beacons.Beacons[id] | |||||
| return beacon, exists | |||||
| } | |||||
| // UpdateBeacon updates a beacon in the list (thread-safe) | |||||
| func (m *Manager) UpdateBeacon(id string, beacon model.Beacon) { | |||||
| m.beacons.Lock.Lock() | |||||
| defer m.beacons.Lock.Unlock() | |||||
| m.beacons.Beacons[id] = beacon | |||||
| } | |||||
| // GetBeaconEvent returns a beacon event by ID (thread-safe) | |||||
| func (m *Manager) GetBeaconEvent(id string) (model.BeaconEvent, bool) { | |||||
| m.beaconEvents.Lock.RLock() | |||||
| defer m.beaconEvents.Lock.RUnlock() | |||||
| event, exists := m.beaconEvents.Beacons[id] | |||||
| return event, exists | |||||
| } | |||||
| // UpdateBeaconEvent updates a beacon event in the list (thread-safe) | |||||
| func (m *Manager) UpdateBeaconEvent(id string, event model.BeaconEvent) { | |||||
| m.beaconEvents.Lock.Lock() | |||||
| defer m.beaconEvents.Lock.Unlock() | |||||
| m.beaconEvents.Beacons[id] = event | |||||
| } | |||||
| // GetLatestBeacon returns the latest beacon by ID (thread-safe) | |||||
| func (m *Manager) GetLatestBeacon(id string) (model.Beacon, bool) { | |||||
| m.latestList.Lock.RLock() | |||||
| defer m.latestList.Lock.RUnlock() | |||||
| beacon, exists := m.latestList.LatestList[id] | |||||
| return beacon, exists | |||||
| } | |||||
| // UpdateLatestBeacon updates the latest beacon in the list (thread-safe) | |||||
| func (m *Manager) UpdateLatestBeacon(id string, beacon model.Beacon) { | |||||
| m.latestList.Lock.Lock() | |||||
| defer m.latestList.Lock.Unlock() | |||||
| m.latestList.LatestList[id] = beacon | |||||
| } | |||||
| // GetAllBeacons returns a copy of all beacons | |||||
| func (m *Manager) GetAllBeacons() map[string]model.Beacon { | |||||
| m.beacons.Lock.RLock() | |||||
| defer m.beacons.Lock.RUnlock() | |||||
| beacons := make(map[string]model.Beacon) | |||||
| for id, beacon := range m.beacons.Beacons { | |||||
| beacons[id] = beacon | |||||
| } | |||||
| return beacons | |||||
| } | |||||
| // GetAllLatestBeacons returns a copy of all latest beacons | |||||
| func (m *Manager) GetAllLatestBeacons() map[string]model.Beacon { | |||||
| m.latestList.Lock.RLock() | |||||
| defer m.latestList.Lock.RUnlock() | |||||
| beacons := make(map[string]model.Beacon) | |||||
| for id, beacon := range m.latestList.LatestList { | |||||
| beacons[id] = beacon | |||||
| } | |||||
| return beacons | |||||
| } | |||||
| // GetBeaconCount returns the number of tracked beacons | |||||
| func (m *Manager) GetBeaconCount() int { | |||||
| m.beacons.Lock.RLock() | |||||
| defer m.beacons.Lock.RUnlock() | |||||
| return len(m.beacons.Beacons) | |||||
| } | |||||
| // GetSettingsValue returns current settings as a value | |||||
| func (m *Manager) GetSettingsValue() model.SettingsVal { | |||||
| m.settings.Lock.RLock() | |||||
| defer m.settings.Lock.RUnlock() | |||||
| return m.settings.Settings | |||||
| } | |||||
| // UpdateSettings updates the system settings (thread-safe) | |||||
| func (m *Manager) UpdateSettings(newSettings model.SettingsVal) { | |||||
| m.settings.Lock.Lock() | |||||
| defer m.settings.Lock.Unlock() | |||||
| m.settings.Settings = newSettings | |||||
| } | |||||
| @@ -26,12 +26,11 @@ func Load() *Config { | |||||
| return &Config{ | return &Config{ | ||||
| HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"), | HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"), | ||||
| WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"), | WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"), | ||||
| MQTTHost: getEnv("MQTT_HOST", "127.0.0.1:11883"), | |||||
| MQTTHost: getEnv("MQTT_HOST", "192.168.1.101:1883"), | |||||
| MQTTUser: getEnv("MQTT_USERNAME", "user"), | MQTTUser: getEnv("MQTT_USERNAME", "user"), | ||||
| MQTTPass: getEnv("MQTT_PASSWORD", "pass"), | MQTTPass: getEnv("MQTT_PASSWORD", "pass"), | ||||
| MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), | MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), | ||||
| DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), | DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), | ||||
| KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), | KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), | ||||
| RedisURL: getEnv("REDIS_URL", "127.0.0.1:6379"), | |||||
| } | } | ||||
| } | } | ||||
| @@ -1,371 +0,0 @@ | |||||
| package httpserver | |||||
| import ( | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "log" | |||||
| "net/http" | |||||
| "strings" | |||||
| "sync" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/AFASystems/presence/internal/pkg/persistence" | |||||
| "github.com/gorilla/handlers" | |||||
| "github.com/gorilla/mux" | |||||
| "github.com/gorilla/websocket" | |||||
| ) | |||||
| var ( | |||||
| upgrader = websocket.Upgrader{ | |||||
| ReadBufferSize: 1024, | |||||
| WriteBufferSize: 1024, | |||||
| CheckOrigin: func(r *http.Request) bool { | |||||
| return true | |||||
| }, | |||||
| } | |||||
| ) | |||||
| const ( | |||||
| writeWait = 10 * time.Second | |||||
| pongWait = 60 * time.Second | |||||
| pingPeriod = (pongWait * 9) / 10 | |||||
| beaconPeriod = 2 * time.Second | |||||
| ) | |||||
| // Init store in main or anywhere else and pass it to all initializer functions | |||||
| // called in main, then with controllers or handlers use wrapper that takes entire store | |||||
| // allocates only the properties that need to be passed into the controller | |||||
| func StartHTTPServer(addr string, ctx *model.AppContext) { | |||||
| headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) | |||||
| originsOk := handlers.AllowedOrigins([]string{"*"}) | |||||
| methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | |||||
| // Set up HTTP server | |||||
| r := mux.NewRouter() | |||||
| r.HandleFunc("/api/results", resultsHandler(&ctx.HTTPResults)) | |||||
| r.HandleFunc("/api/beacons/{beacon_id}", BeaconsDeleteHandler(&ctx.Beacons, ctx.ButtonsList)).Methods("DELETE") | |||||
| r.HandleFunc("/api/beacons", BeaconsListHandler(&ctx.Beacons)).Methods("GET") | |||||
| r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("POST") //since beacons are hashmap, just have put and post be same thing. it'll either add or modify that entry | |||||
| r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("PUT") | |||||
| r.HandleFunc("/api/latest-beacons", latestBeaconsListHandler(&ctx.LatestList)).Methods("GET") | |||||
| r.HandleFunc("/api/settings", SettingsListHandler(&ctx.Settings)).Methods("GET") | |||||
| r.HandleFunc("/api/settings", SettingsEditHandler(&ctx.Settings)).Methods("POST") | |||||
| r.PathPrefix("/js/").Handler(http.StripPrefix("/js/", http.FileServer(http.Dir("static_html/js/")))) | |||||
| r.PathPrefix("/css/").Handler(http.StripPrefix("/css/", http.FileServer(http.Dir("static_html/css/")))) | |||||
| r.PathPrefix("/img/").Handler(http.StripPrefix("/img/", http.FileServer(http.Dir("static_html/img/")))) | |||||
| r.PathPrefix("/").Handler(http.FileServer(http.Dir("static_html/"))) | |||||
| http.Handle("/", r) | |||||
| mxWS := mux.NewRouter() | |||||
| mxWS.HandleFunc("/ws/api/beacons", serveWs(&ctx.HTTPResults)) | |||||
| mxWS.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(&ctx.LatestList)) | |||||
| mxWS.HandleFunc("/ws/broadcast", handleConnections(ctx.Clients, &ctx.Broadcast)) | |||||
| http.Handle("/ws/", mxWS) | |||||
| go handleMessages(ctx.Clients, &ctx.Broadcast) | |||||
| http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | |||||
| } | |||||
| func resultsHandler(httpResults *model.HTTPResultsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| httpResults.HTTPResultsLock.Lock() | |||||
| defer httpResults.HTTPResultsLock.Unlock() | |||||
| js, err := json.Marshal(httpResults.HTTPResults) | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), http.StatusInternalServerError) | |||||
| return | |||||
| } | |||||
| w.Write(js) | |||||
| } | |||||
| } | |||||
| func BeaconsListHandler(beacons *model.BeaconsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| beacons.Lock.RLock() | |||||
| js, err := json.Marshal(beacons.Beacons) | |||||
| beacons.Lock.RUnlock() | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), http.StatusInternalServerError) | |||||
| return | |||||
| } | |||||
| w.Write(js) | |||||
| } | |||||
| } | |||||
| func BeaconsAddHandler(beacons *model.BeaconsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| decoder := json.NewDecoder(r.Body) | |||||
| var inBeacon model.Beacon | |||||
| err := decoder.Decode(&inBeacon) | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), 400) | |||||
| return | |||||
| } | |||||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) { | |||||
| http.Error(w, "name and beacon_id cannot be blank", 400) | |||||
| return | |||||
| } | |||||
| beacons.Beacons[inBeacon.Beacon_id] = inBeacon | |||||
| err = persistence.PersistBeacons(beacons) | |||||
| if err != nil { | |||||
| http.Error(w, "trouble persisting beacons list, create bucket", 500) | |||||
| return | |||||
| } | |||||
| w.Write([]byte("ok")) | |||||
| } | |||||
| } | |||||
| func BeaconsDeleteHandler(beacons *model.BeaconsList, buttonsList map[string]model.Button) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| vars := mux.Vars(r) | |||||
| fmt.Println("route param: ", vars) | |||||
| beaconId := vars["beacon_id"] | |||||
| _, ok := beacons.Beacons[beaconId] | |||||
| if !ok { | |||||
| http.Error(w, "no beacon with the specified id", 400) // change the status code | |||||
| return | |||||
| } | |||||
| delete(beacons.Beacons, beaconId) | |||||
| _, ok = buttonsList[beaconId] | |||||
| if ok { | |||||
| delete(buttonsList, beaconId) | |||||
| } | |||||
| err := persistence.PersistBeacons(beacons) | |||||
| if err != nil { | |||||
| http.Error(w, "trouble persisting beacons list, create bucket", 500) | |||||
| return | |||||
| } | |||||
| w.Write([]byte("ok")) | |||||
| } | |||||
| } | |||||
| func latestBeaconsListHandler(latestList *model.LatestBeaconsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| latestList.Lock.RLock() | |||||
| var la = make([]model.Beacon, 0) | |||||
| for _, b := range latestList.LatestList { | |||||
| la = append(la, b) | |||||
| } | |||||
| latestList.Lock.RUnlock() | |||||
| js, err := json.Marshal(la) | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), http.StatusInternalServerError) | |||||
| return | |||||
| } | |||||
| w.Write(js) | |||||
| } | |||||
| } | |||||
| func SettingsListHandler(settings *model.Settings) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| js, err := json.Marshal(settings) | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), http.StatusInternalServerError) | |||||
| return | |||||
| } | |||||
| w.Write(js) | |||||
| } | |||||
| } | |||||
| func SettingsEditHandler(settings *model.Settings) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| decoder := json.NewDecoder(r.Body) | |||||
| var inSettings model.Settings | |||||
| err := decoder.Decode(&inSettings) | |||||
| if err != nil { | |||||
| http.Error(w, err.Error(), 400) | |||||
| return | |||||
| } | |||||
| //make sure values are > 0 | |||||
| if (inSettings.Location_confidence <= 0) || | |||||
| (inSettings.Last_seen_threshold <= 0) || | |||||
| (inSettings.HA_send_interval <= 0) { | |||||
| http.Error(w, "values must be greater than 0", 400) | |||||
| return | |||||
| } | |||||
| *settings = inSettings | |||||
| err = persistence.PersistSettings(settings) | |||||
| if err != nil { | |||||
| http.Error(w, "trouble persisting settings, create bucket", 500) | |||||
| return | |||||
| } | |||||
| w.Write([]byte("ok")) | |||||
| } | |||||
| } | |||||
| func reader(ws *websocket.Conn) { | |||||
| defer ws.Close() | |||||
| ws.SetReadLimit(512) | |||||
| ws.SetReadDeadline(time.Now().Add(pongWait)) | |||||
| ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil }) | |||||
| for { | |||||
| _, _, err := ws.ReadMessage() | |||||
| if err != nil { | |||||
| break | |||||
| } | |||||
| } | |||||
| } | |||||
| func writer(ws *websocket.Conn, httpResult *model.HTTPResultsList) { | |||||
| pingTicker := time.NewTicker(pingPeriod) | |||||
| beaconTicker := time.NewTicker(beaconPeriod) | |||||
| defer func() { | |||||
| pingTicker.Stop() | |||||
| beaconTicker.Stop() | |||||
| ws.Close() | |||||
| }() | |||||
| for { | |||||
| select { | |||||
| case <-beaconTicker.C: | |||||
| httpResult.HTTPResultsLock.Lock() | |||||
| defer httpResult.HTTPResultsLock.Unlock() | |||||
| js, err := json.Marshal(httpResult.HTTPResults) | |||||
| if err != nil { | |||||
| js = []byte("error") | |||||
| } | |||||
| ws.SetWriteDeadline(time.Now().Add(writeWait)) | |||||
| if err := ws.WriteMessage(websocket.TextMessage, js); err != nil { | |||||
| return | |||||
| } | |||||
| case <-pingTicker.C: | |||||
| ws.SetWriteDeadline(time.Now().Add(writeWait)) | |||||
| if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| func serveWs(httpResult *model.HTTPResultsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| ws, err := upgrader.Upgrade(w, r, nil) | |||||
| if err != nil { | |||||
| if _, ok := err.(websocket.HandshakeError); !ok { | |||||
| log.Println(err) | |||||
| } | |||||
| return | |||||
| } | |||||
| go writer(ws, httpResult) | |||||
| reader(ws) | |||||
| } | |||||
| } | |||||
| func latestBeaconWriter(ws *websocket.Conn, latestBeaconsList map[string]model.Beacon, lock *sync.RWMutex) { | |||||
| pingTicker := time.NewTicker(pingPeriod) | |||||
| beaconTicker := time.NewTicker(beaconPeriod) | |||||
| defer func() { | |||||
| pingTicker.Stop() | |||||
| beaconTicker.Stop() | |||||
| ws.Close() | |||||
| }() | |||||
| for { | |||||
| select { | |||||
| case <-beaconTicker.C: | |||||
| lock.RLock() | |||||
| var la = make([]model.Beacon, 0) | |||||
| for _, b := range latestBeaconsList { | |||||
| la = append(la, b) | |||||
| } | |||||
| lock.RUnlock() | |||||
| js, err := json.Marshal(la) | |||||
| if err != nil { | |||||
| js = []byte("error") | |||||
| } | |||||
| ws.SetWriteDeadline(time.Now().Add(writeWait)) | |||||
| if err := ws.WriteMessage(websocket.TextMessage, js); err != nil { | |||||
| return | |||||
| } | |||||
| case <-pingTicker.C: | |||||
| ws.SetWriteDeadline(time.Now().Add(writeWait)) | |||||
| if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| func serveLatestBeaconsWs(latestList *model.LatestBeaconsList) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| ws, err := upgrader.Upgrade(w, r, nil) | |||||
| if err != nil { | |||||
| if _, ok := err.(websocket.HandshakeError); !ok { | |||||
| log.Println(err) | |||||
| } | |||||
| return | |||||
| } | |||||
| go latestBeaconWriter(ws, latestList.LatestList, &latestList.Lock) | |||||
| reader(ws) | |||||
| } | |||||
| } | |||||
| func handleConnections(clients map[*websocket.Conn]bool, broadcast *chan model.Message) http.HandlerFunc { | |||||
| return func(w http.ResponseWriter, r *http.Request) { | |||||
| ws, err := upgrader.Upgrade(w, r, nil) | |||||
| if err != nil { | |||||
| log.Fatal(err) | |||||
| } | |||||
| defer ws.Close() | |||||
| clients[ws] = true | |||||
| for { | |||||
| var msg model.Message | |||||
| err := ws.ReadJSON(&msg) | |||||
| if err != nil { | |||||
| log.Printf("error: %v", err) | |||||
| delete(clients, ws) | |||||
| break | |||||
| } | |||||
| *broadcast <- msg | |||||
| } | |||||
| } | |||||
| } | |||||
| func handleMessages(clients map[*websocket.Conn]bool, broadcast *chan model.Message) { | |||||
| for { | |||||
| msg := <-*broadcast | |||||
| for client := range clients { | |||||
| err := client.WriteJSON(msg) | |||||
| if err != nil { | |||||
| log.Printf("error: %v", err) | |||||
| client.Close() | |||||
| delete(clients, client) | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| @@ -1,3 +0,0 @@ | |||||
| # Server | |||||
| TODO: refactor to structure: router -> controller -> service, possibly use swagger or any other package to define structure of the API server | |||||
| @@ -11,6 +11,8 @@ func KafkaWriter(kafkaURL, topic string) *kafka.Writer { | |||||
| Addr: kafka.TCP(kafkaURL), | Addr: kafka.TCP(kafkaURL), | ||||
| Topic: topic, | Topic: topic, | ||||
| Balancer: &kafka.LeastBytes{}, | Balancer: &kafka.LeastBytes{}, | ||||
| Async: false, | |||||
| RequiredAcks: kafka.RequireAll, | |||||
| BatchSize: 100, | BatchSize: 100, | ||||
| BatchTimeout: 10 * time.Millisecond, | BatchTimeout: 10 * time.Millisecond, | ||||
| } | } | ||||
| @@ -0,0 +1,16 @@ | |||||
| package model | |||||
| import ( | |||||
| "crypto/sha256" | |||||
| "fmt" | |||||
| ) | |||||
| func (b BeaconEvent) Hash() []byte { | |||||
| rBatt := (b.Battery / 10) * 10 | |||||
| c := fmt.Sprintf("%d%d%s%s%s", rBatt, b.Event, b.ID, b.Name, b.Type) | |||||
| h := sha256.New() | |||||
| h.Write([]byte(c)) | |||||
| bs := h.Sum(nil) | |||||
| return bs | |||||
| } | |||||
| @@ -4,16 +4,17 @@ import ( | |||||
| "sync" | "sync" | ||||
| "github.com/boltdb/bolt" | "github.com/boltdb/bolt" | ||||
| "github.com/gorilla/websocket" | |||||
| ) | ) | ||||
| // Settings defines configuration parameters for presence detection behavior. | // Settings defines configuration parameters for presence detection behavior. | ||||
| type SettingsVal struct { | type SettingsVal struct { | ||||
| Location_confidence int64 `json:"location_confidence"` | |||||
| Last_seen_threshold int64 `json:"last_seen_threshold"` | |||||
| Beacon_metrics_size int `json:"beacon_metrics_size"` | |||||
| HA_send_interval int64 `json:"ha_send_interval"` | |||||
| HA_send_changes_only bool `json:"ha_send_changes_only"` | |||||
| LocationConfidence int64 `json:"location_confidence"` | |||||
| LastSeenThreshold int64 `json:"last_seen_threshold"` | |||||
| BeaconMetricSize int `json:"beacon_metrics_size"` | |||||
| HASendInterval int64 `json:"ha_send_interval"` | |||||
| HASendChangesOnly bool `json:"ha_send_changes_only"` | |||||
| RSSIMinThreshold int64 `json:"rssi_min_threshold"` | |||||
| RSSIEnforceThreshold bool `json:"enforce_rssi_threshold"` | |||||
| } | } | ||||
| type Settings struct { | type Settings struct { | ||||
| @@ -21,126 +22,127 @@ type Settings struct { | |||||
| Lock sync.RWMutex | Lock sync.RWMutex | ||||
| } | } | ||||
| // Incoming_json represents the JSON payload received from beacon messages. | |||||
| type Incoming_json struct { | |||||
| Hostname string `json:"hostname"` | |||||
| MAC string `json:"mac"` | |||||
| RSSI int64 `json:"rssi"` | |||||
| Is_scan_response string `json:"is_scan_response"` | |||||
| Ttype string `json:"type"` | |||||
| Data string `json:"data"` | |||||
| Beacon_type string `json:"beacon_type"` | |||||
| UUID string `json:"uuid"` | |||||
| Major string `json:"major"` | |||||
| Minor string `json:"minor"` | |||||
| TX_power string `json:"tx_power"` | |||||
| Namespace string `json:"namespace"` | |||||
| Instance_id string `json:"instance_id"` | |||||
| // button stuff | |||||
| HB_ButtonCounter int64 `json:"hb_button_counter"` | |||||
| HB_ButtonCounter_Prev int64 `json:"hb_button_counter_prev"` | |||||
| HB_Battery int64 `json:"hb_button_battery"` | |||||
| HB_RandomNonce string `json:"hb_button_random"` | |||||
| HB_ButtonMode string `json:"hb_button_mode"` | |||||
| // BeaconAdvertisement represents the JSON payload received from beacon advertisements. | |||||
| type BeaconAdvertisement struct { | |||||
| Hostname string `json:"hostname"` | |||||
| MAC string `json:"mac"` | |||||
| RSSI int64 `json:"rssi"` | |||||
| ScanResponse string `json:"is_scan_response"` | |||||
| Type string `json:"type"` | |||||
| Data string `json:"data"` | |||||
| BeaconType string `json:"beacon_type"` | |||||
| UUID string `json:"uuid"` | |||||
| Major string `json:"major"` | |||||
| Minor string `json:"minor"` | |||||
| TXPower string `json:"tx_power"` | |||||
| NamespaceID string `json:"namespace"` | |||||
| InstanceID string `json:"instance_id"` | |||||
| HSButtonCounter int64 `json:"hb_button_counter"` | |||||
| HSButtonPrev int64 `json:"hb_button_counter_prev"` | |||||
| HSBatteryLevel int64 `json:"hb_button_battery"` | |||||
| HSRandomNonce string `json:"hb_button_random"` | |||||
| HSButtonMode string `json:"hb_button_mode"` | |||||
| } | } | ||||
| // Advertisement describes a generic beacon advertisement payload. | // Advertisement describes a generic beacon advertisement payload. | ||||
| type Advertisement struct { | type Advertisement struct { | ||||
| ttype string | |||||
| content string | |||||
| seen int64 | |||||
| Type string | |||||
| Content string | |||||
| Seen int64 | |||||
| } | } | ||||
| // BeaconMetric stores signal and distance data for a beacon. | // BeaconMetric stores signal and distance data for a beacon. | ||||
| type BeaconMetric struct { | type BeaconMetric struct { | ||||
| Location string | Location string | ||||
| Distance float64 | Distance float64 | ||||
| Rssi int64 | |||||
| RSSI int64 | |||||
| Timestamp int64 | Timestamp int64 | ||||
| } | } | ||||
| // Location defines a physical location and synchronization control. | // Location defines a physical location and synchronization control. | ||||
| type Location struct { | type Location struct { | ||||
| name string | |||||
| lock sync.RWMutex | |||||
| Name string | |||||
| Lock sync.RWMutex | |||||
| } | } | ||||
| // BestLocation represents the most probable location of a beacon. | // BestLocation represents the most probable location of a beacon. | ||||
| type BestLocation struct { | type BestLocation struct { | ||||
| Distance float64 | |||||
| Name string | |||||
| Last_seen int64 | |||||
| Distance float64 | |||||
| Name string | |||||
| LastSeen int64 | |||||
| } | } | ||||
| // HTTPLocation describes a beacon's state as served over HTTP. | // HTTPLocation describes a beacon's state as served over HTTP. | ||||
| type HTTPLocation struct { | type HTTPLocation struct { | ||||
| Previous_confident_location string `json:"previous_confident_location"` | |||||
| Distance float64 `json:"distance"` | |||||
| Name string `json:"name"` | |||||
| Beacon_name string `json:"beacon_name"` | |||||
| Beacon_id string `json:"beacon_id"` | |||||
| Beacon_type string `json:"beacon_type"` | |||||
| HB_Battery int64 `json:"hb_button_battery"` | |||||
| HB_ButtonMode string `json:"hb_button_mode"` | |||||
| HB_ButtonCounter int64 `json:"hb_button_counter"` | |||||
| Location string `json:"location"` | |||||
| Last_seen int64 `json:"last_seen"` | |||||
| Method string `json:"method"` | |||||
| PreviousConfidentLocation string `json:"previous_confident_location"` | |||||
| Distance float64 `json:"distance"` | |||||
| ID string `json:"id"` | |||||
| Location string `json:"location"` | |||||
| LastSeen int64 `json:"last_seen"` | |||||
| } | } | ||||
| // LocationChange defines a change event for a beacon's detected location. | // LocationChange defines a change event for a beacon's detected location. | ||||
| type LocationChange struct { | type LocationChange struct { | ||||
| Beacon_ref Beacon `json:"beacon_info"` | |||||
| Name string `json:"name"` | |||||
| Beacon_name string `json:"beacon_name"` | |||||
| Previous_location string `json:"previous_location"` | |||||
| New_location string `json:"new_location"` | |||||
| Timestamp int64 `json:"timestamp"` | |||||
| Method string `json:"method"` | |||||
| BeaconRef Beacon `json:"beacon_info"` | |||||
| Name string `json:"name"` | |||||
| BeaconName string `json:"beacon_name"` | |||||
| PreviousLocation string `json:"previous_location"` | |||||
| NewLocation string `json:"new_location"` | |||||
| Timestamp int64 `json:"timestamp"` | |||||
| } | } | ||||
| // HAMessage represents a Home Assistant integration payload. | // HAMessage represents a Home Assistant integration payload. | ||||
| type HAMessage struct { | type HAMessage struct { | ||||
| Beacon_id string `json:"id"` | |||||
| Beacon_name string `json:"name"` | |||||
| Distance float64 `json:"distance"` | |||||
| ID string `json:"id"` | |||||
| BeaconName string `json:"name"` | |||||
| Distance float64 `json:"distance"` | |||||
| } | } | ||||
| // Beacon holds all relevant information about a tracked beacon device. | // Beacon holds all relevant information about a tracked beacon device. | ||||
| type Beacon struct { | type Beacon struct { | ||||
| Name string `json:"name"` | |||||
| Beacon_id string `json:"beacon_id"` | |||||
| Beacon_type string `json:"beacon_type"` | |||||
| Beacon_location string `json:"beacon_location"` | |||||
| Last_seen int64 `json:"last_seen"` | |||||
| Incoming_JSON Incoming_json `json:"incoming_json"` | |||||
| Distance float64 `json:"distance"` | |||||
| Previous_location string | |||||
| Previous_confident_location string | |||||
| Expired_location string | |||||
| Location_confidence int64 | |||||
| Location_history []string | |||||
| Beacon_metrics []BeaconMetric | |||||
| HB_ButtonCounter int64 `json:"hb_button_counter"` | |||||
| HB_ButtonCounter_Prev int64 `json:"hb_button_counter_prev"` | |||||
| HB_Battery int64 `json:"hb_button_battery"` | |||||
| HB_RandomNonce string `json:"hb_button_random"` | |||||
| HB_ButtonMode string `json:"hb_button_mode"` | |||||
| Name string `json:"name"` | |||||
| ID string `json:"beacon_id"` | |||||
| BeaconType string `json:"beacon_type"` | |||||
| BeaconLocation string `json:"beacon_location"` | |||||
| LastSeen int64 `json:"last_seen"` | |||||
| IncomingJSON BeaconAdvertisement `json:"incoming_json"` | |||||
| Distance float64 `json:"distance"` | |||||
| PreviousLocation string | |||||
| PreviousConfidentLocation string | |||||
| ExpiredLocation string | |||||
| LocationConfidence int64 | |||||
| LocationHistory []string | |||||
| BeaconMetrics []BeaconMetric | |||||
| HSButtonCounter int64 `json:"hs_button_counter"` | |||||
| HSButtonPrev int64 `json:"hs_button_counter_prev"` | |||||
| HSBattery int64 `json:"hs_button_battery"` | |||||
| HSRandomNonce string `json:"hs_button_random"` | |||||
| HSButtonMode string `json:"hs_button_mode"` | |||||
| } | |||||
| type BeaconEvent struct { | |||||
| Name string | |||||
| ID string | |||||
| Type string | |||||
| Battery uint32 | |||||
| Event int | |||||
| } | } | ||||
| // Button represents a hardware button beacon device. | // Button represents a hardware button beacon device. | ||||
| type Button struct { | type Button struct { | ||||
| Name string `json:"name"` | |||||
| Button_id string `json:"button_id"` | |||||
| Button_type string `json:"button_type"` | |||||
| Button_location string `json:"button_location"` | |||||
| Incoming_JSON Incoming_json `json:"incoming_json"` | |||||
| Distance float64 `json:"distance"` | |||||
| Last_seen int64 `json:"last_seen"` | |||||
| HB_ButtonCounter int64 `json:"hb_button_counter"` | |||||
| HB_Battery int64 `json:"hb_button_battery"` | |||||
| HB_RandomNonce string `json:"hb_button_random"` | |||||
| HB_ButtonMode string `json:"hb_button_mode"` | |||||
| Name string `json:"name"` | |||||
| ButtonID string `json:"button_id"` | |||||
| ButtonType string `json:"button_type"` | |||||
| ButtonLocation string `json:"button_location"` | |||||
| IncomingJSON BeaconAdvertisement `json:"incoming_json"` | |||||
| Distance float64 `json:"distance"` | |||||
| LastSeen int64 `json:"last_seen"` | |||||
| HSButtonCounter int64 `json:"hs_button_counter"` | |||||
| HSBattery int64 `json:"hs_button_battery"` | |||||
| HSRandomNonce string `json:"hs_button_random"` | |||||
| HSButtonMode string `json:"hs_button_mode"` | |||||
| } | } | ||||
| // BeaconsList holds all known beacons and their synchronization lock. | // BeaconsList holds all known beacons and their synchronization lock. | ||||
| @@ -149,6 +151,11 @@ type BeaconsList struct { | |||||
| Lock sync.RWMutex | Lock sync.RWMutex | ||||
| } | } | ||||
| type BeaconEventList struct { | |||||
| Beacons map[string]BeaconEvent | |||||
| Lock sync.RWMutex | |||||
| } | |||||
| // LocationsList holds all known locations with concurrency protection. | // LocationsList holds all known locations with concurrency protection. | ||||
| type LocationsList struct { | type LocationsList struct { | ||||
| Locations map[string]Location | Locations map[string]Location | ||||
| @@ -185,17 +192,6 @@ type HTTPResultsList struct { | |||||
| HTTPResults HTTPLocationsList | HTTPResults HTTPLocationsList | ||||
| } | } | ||||
| type AppContext struct { | |||||
| HTTPResults HTTPResultsList | |||||
| Beacons BeaconsList | |||||
| ButtonsList map[string]Button | |||||
| Settings Settings | |||||
| Broadcast chan Message | |||||
| Locations LocationsList | |||||
| LatestList LatestBeaconsList | |||||
| Clients map[*websocket.Conn]bool | |||||
| } | |||||
| type ApiUpdate struct { | type ApiUpdate struct { | ||||
| Method string | Method string | ||||
| Beacon Beacon | Beacon Beacon | ||||
| @@ -1,128 +0,0 @@ | |||||
| package mqttclient | |||||
| import ( | |||||
| "bytes" | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "log" | |||||
| "math" | |||||
| "os/exec" | |||||
| "strconv" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/yosssi/gmq/mqtt" | |||||
| "github.com/yosssi/gmq/mqtt/client" | |||||
| ) | |||||
| func GetBeaconID(incoming model.Incoming_json) string { | |||||
| unique_id := fmt.Sprintf("%s", incoming.MAC) | |||||
| return unique_id | |||||
| } | |||||
| func updateLatestList(incoming model.Incoming_json, now int64, latestList *model.LatestBeaconsList) { | |||||
| latestList.Lock.Lock() | |||||
| defer latestList.Lock.Unlock() | |||||
| b := model.Beacon{ | |||||
| Beacon_id: GetBeaconID(incoming), | |||||
| Beacon_type: incoming.Beacon_type, | |||||
| Last_seen: now, | |||||
| Incoming_JSON: incoming, | |||||
| Beacon_location: incoming.Hostname, | |||||
| Distance: getBeaconDistance(incoming), | |||||
| } | |||||
| latestList.LatestList[b.Beacon_id] = b | |||||
| for id, v := range latestList.LatestList { | |||||
| if now-v.Last_seen > 10 { | |||||
| delete(latestList.LatestList, id) | |||||
| } | |||||
| } | |||||
| } | |||||
| func updateBeaconData(beacon *model.Beacon, incoming model.Incoming_json, now int64, cl *client.Client, settings *model.SettingsVal) { | |||||
| beacon.Incoming_JSON = incoming | |||||
| beacon.Last_seen = now | |||||
| beacon.Beacon_type = incoming.Beacon_type | |||||
| beacon.HB_ButtonCounter = incoming.HB_ButtonCounter | |||||
| beacon.HB_Battery = incoming.HB_Battery | |||||
| beacon.HB_RandomNonce = incoming.HB_RandomNonce | |||||
| beacon.HB_ButtonMode = incoming.HB_ButtonMode | |||||
| m := model.BeaconMetric{ | |||||
| Distance: getBeaconDistance(incoming), | |||||
| Timestamp: now, | |||||
| Rssi: int64(incoming.RSSI), | |||||
| Location: incoming.Hostname, | |||||
| } | |||||
| beacon.Beacon_metrics = append(beacon.Beacon_metrics, m) | |||||
| if len(beacon.Beacon_metrics) > settings.Beacon_metrics_size { | |||||
| beacon.Beacon_metrics = beacon.Beacon_metrics[1:] | |||||
| } | |||||
| if beacon.HB_ButtonCounter_Prev != beacon.HB_ButtonCounter { | |||||
| beacon.HB_ButtonCounter_Prev = incoming.HB_ButtonCounter | |||||
| sendButtonPressed(*beacon, cl) | |||||
| } | |||||
| } | |||||
| func sendButtonPressed(beacon model.Beacon, cl *client.Client) { | |||||
| btn_msg, err := json.Marshal(beacon) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| err = cl.Publish(&client.PublishOptions{ | |||||
| QoS: mqtt.QoS1, | |||||
| TopicName: []byte("afa-systems/presence/button/" + beacon.Beacon_id), | |||||
| Message: btn_msg, | |||||
| }) | |||||
| if err != nil { | |||||
| panic(err) | |||||
| } | |||||
| s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --st=%d", beacon.Beacon_id, beacon.Incoming_JSON.Hostname, beacon.HB_ButtonCounter) | |||||
| err, out, errout := Shellout(s) | |||||
| if err != nil { | |||||
| log.Printf("error: %v\n", err) | |||||
| } | |||||
| fmt.Println("--- stdout ---") | |||||
| fmt.Println(out) | |||||
| fmt.Println("--- stderr ---") | |||||
| fmt.Println(errout) | |||||
| } | |||||
| func getBeaconDistance(incoming model.Incoming_json) float64 { | |||||
| distance := 1000.0 | |||||
| distance = getiBeaconDistance(incoming.RSSI, incoming.TX_power) | |||||
| return distance | |||||
| } | |||||
| func getiBeaconDistance(rssi int64, power string) float64 { | |||||
| ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) | |||||
| distance := 100.0 | |||||
| if ratio < 1.0 { | |||||
| distance = math.Pow(ratio, 10) | |||||
| } else { | |||||
| distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 | |||||
| } | |||||
| return distance | |||||
| } | |||||
| func twos_comp(inp string) int64 { | |||||
| i, _ := strconv.ParseInt("0x"+inp, 0, 64) | |||||
| return i - 256 | |||||
| } | |||||
| func Shellout(command string) (error, string, string) { | |||||
| var stdout bytes.Buffer | |||||
| var stderr bytes.Buffer | |||||
| cmd := exec.Command("bash", "-c", command) | |||||
| cmd.Stdout = &stdout | |||||
| cmd.Stderr = &stderr | |||||
| err := cmd.Run() | |||||
| return err, stdout.String(), stderr.String() | |||||
| } | |||||
| @@ -1,35 +0,0 @@ | |||||
| package mqttclient | |||||
| import ( | |||||
| "fmt" | |||||
| "strconv" | |||||
| "strings" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| ) | |||||
| func IncomingBeaconFilter(incoming model.Incoming_json) model.Incoming_json { | |||||
| out_json := incoming | |||||
| if incoming.Beacon_type == "hb_button" { | |||||
| raw_data := incoming.Data | |||||
| hb_button_prefix_str := fmt.Sprintf("02010612FF5900") | |||||
| if strings.HasPrefix(raw_data, hb_button_prefix_str) { | |||||
| out_json.Namespace = "ddddeeeeeeffff5544ff" | |||||
| counter_str := fmt.Sprintf("0x%s", raw_data[22:24]) | |||||
| counter, _ := strconv.ParseInt(counter_str, 0, 64) | |||||
| out_json.HB_ButtonCounter = counter | |||||
| battery_str := fmt.Sprintf("0x%s%s", raw_data[20:22], raw_data[18:20]) | |||||
| battery, _ := strconv.ParseInt(battery_str, 0, 64) | |||||
| out_json.HB_Battery = battery | |||||
| out_json.TX_power = fmt.Sprintf("0x%s", "4") | |||||
| out_json.Beacon_type = "hb_button" | |||||
| out_json.HB_ButtonMode = "presence_button" | |||||
| } | |||||
| } | |||||
| return out_json | |||||
| } | |||||
| @@ -1,165 +0,0 @@ | |||||
| package mqttclient | |||||
| import ( | |||||
| "encoding/json" | |||||
| "log" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/AFASystems/presence/internal/pkg/persistence" | |||||
| "github.com/yosssi/gmq/mqtt" | |||||
| "github.com/yosssi/gmq/mqtt/client" | |||||
| ) | |||||
| func getLikelyLocations(settings *model.SettingsVal, ctx *model.AppContext, cl *client.Client) { | |||||
| ctx.HTTPResults.HTTPResultsLock.Lock() | |||||
| defer ctx.HTTPResults.HTTPResultsLock.Unlock() | |||||
| ctx.HTTPResults.HTTPResults = model.HTTPLocationsList{Beacons: []model.HTTPLocation{}} | |||||
| shouldPersist := false | |||||
| for id, beacon := range ctx.Beacons.Beacons { | |||||
| if len(beacon.Beacon_metrics) == 0 { | |||||
| continue | |||||
| } | |||||
| if isExpired(&beacon, settings) { | |||||
| handleExpiredBeacon(&beacon, cl, ctx) | |||||
| continue | |||||
| } | |||||
| best := calculateBestLocation(&beacon) | |||||
| updateBeaconState(&beacon, best, settings, ctx, cl) | |||||
| appendHTTPResult(ctx, beacon, best) | |||||
| ctx.Beacons.Beacons[id] = beacon | |||||
| shouldPersist = true | |||||
| } | |||||
| if shouldPersist { | |||||
| persistence.PersistBeacons(&ctx.Beacons) | |||||
| } | |||||
| } | |||||
| func isExpired(b *model.Beacon, s *model.SettingsVal) bool { | |||||
| return time.Now().Unix()-b.Beacon_metrics[len(b.Beacon_metrics)-1].Timestamp > s.Last_seen_threshold | |||||
| } | |||||
| func handleExpiredBeacon(b *model.Beacon, cl *client.Client, ctx *model.AppContext) { | |||||
| if b.Expired_location == "expired" { | |||||
| return | |||||
| } | |||||
| b.Expired_location = "expired" | |||||
| msg := model.Message{ | |||||
| Email: b.Previous_confident_location, | |||||
| Username: b.Name, | |||||
| Message: "expired", | |||||
| } | |||||
| data, _ := json.Marshal(msg) | |||||
| log.Println(string(data)) | |||||
| ctx.Broadcast <- msg | |||||
| } | |||||
| func calculateBestLocation(b *model.Beacon) model.BestLocation { | |||||
| locScores := map[string]float64{} | |||||
| for _, m := range b.Beacon_metrics { | |||||
| score := 1.5 + 0.75*(1.0-(float64(m.Rssi)/-100.0)) | |||||
| locScores[m.Location] += score | |||||
| } | |||||
| bestName, bestScore := "", 0.0 | |||||
| for name, score := range locScores { | |||||
| if score > bestScore { | |||||
| bestName, bestScore = name, score | |||||
| } | |||||
| } | |||||
| last := b.Beacon_metrics[len(b.Beacon_metrics)-1] | |||||
| return model.BestLocation{Name: bestName, Distance: last.Distance, Last_seen: last.Timestamp} | |||||
| } | |||||
| func updateBeaconState(b *model.Beacon, best model.BestLocation, s *model.SettingsVal, ctx *model.AppContext, cl *client.Client) { | |||||
| updateLocationHistory(b, best.Name) | |||||
| updateConfidence(b, best.Name, s) | |||||
| if locationChanged(b, best, s) { | |||||
| publishLocationChange(b, best, cl) | |||||
| b.Location_confidence = 0 | |||||
| b.Previous_confident_location = best.Name | |||||
| } | |||||
| } | |||||
| func updateLocationHistory(b *model.Beacon, loc string) { | |||||
| b.Location_history = append(b.Location_history, loc) | |||||
| if len(b.Location_history) > 10 { | |||||
| b.Location_history = b.Location_history[1:] | |||||
| } | |||||
| } | |||||
| func updateConfidence(b *model.Beacon, loc string, s *model.SettingsVal) { | |||||
| counts := map[string]int{} | |||||
| for _, l := range b.Location_history { | |||||
| counts[l]++ | |||||
| } | |||||
| maxCount, mostCommon := 0, "" | |||||
| for l, c := range counts { | |||||
| if c > maxCount { | |||||
| maxCount, mostCommon = c, l | |||||
| } | |||||
| } | |||||
| if maxCount >= 7 { | |||||
| if mostCommon == b.Previous_confident_location { | |||||
| b.Location_confidence++ | |||||
| } else { | |||||
| b.Location_confidence = 1 | |||||
| b.Previous_confident_location = mostCommon | |||||
| } | |||||
| } | |||||
| } | |||||
| func locationChanged(b *model.Beacon, best model.BestLocation, s *model.SettingsVal) bool { | |||||
| return (b.Location_confidence == s.Location_confidence && | |||||
| b.Previous_confident_location != best.Name) || | |||||
| b.Expired_location == "expired" | |||||
| } | |||||
| func publishLocationChange(b *model.Beacon, best model.BestLocation, cl *client.Client) { | |||||
| location := best.Name | |||||
| if b.Expired_location == "expired" { | |||||
| location = "expired" | |||||
| } | |||||
| js, err := json.Marshal(model.LocationChange{ | |||||
| Beacon_ref: *b, | |||||
| Name: b.Name, | |||||
| Previous_location: b.Previous_confident_location, | |||||
| New_location: location, | |||||
| Timestamp: time.Now().Unix(), | |||||
| }) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| err = cl.Publish(&client.PublishOptions{ | |||||
| QoS: mqtt.QoS1, | |||||
| TopicName: []byte("afa-systems/presence/changes"), | |||||
| Message: js, | |||||
| }) | |||||
| if err != nil { | |||||
| log.Printf("mqtt publish error: %v", err) | |||||
| } | |||||
| } | |||||
| func appendHTTPResult(ctx *model.AppContext, b model.Beacon, best model.BestLocation) { | |||||
| ctx.HTTPResults.HTTPResultsLock.Lock() | |||||
| defer ctx.HTTPResults.HTTPResultsLock.Unlock() | |||||
| r := model.HTTPLocation{ | |||||
| Name: b.Name, | |||||
| Beacon_id: b.Beacon_id, | |||||
| Location: best.Name, | |||||
| Distance: best.Distance, | |||||
| Last_seen: best.Last_seen, | |||||
| } | |||||
| ctx.HTTPResults.HTTPResults.Beacons = append(ctx.HTTPResults.HTTPResults.Beacons, r) | |||||
| } | |||||
| @@ -1,62 +0,0 @@ | |||||
| package mqttclient | |||||
| import ( | |||||
| "fmt" | |||||
| "log" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/AFASystems/presence/internal/pkg/persistence" | |||||
| "github.com/boltdb/bolt" | |||||
| "github.com/yosssi/gmq/mqtt/client" | |||||
| ) | |||||
| func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, ctx *model.AppContext) chan<- model.Incoming_json { | |||||
| ch := make(chan model.Incoming_json, 2000) | |||||
| persistence.CreateBucketIfNotExists(db) | |||||
| ticker := time.NewTicker(updateInterval) | |||||
| go runProcessor(ticker, cl, ch, ctx) | |||||
| return ch | |||||
| } | |||||
| func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incoming_json, ctx *model.AppContext) { | |||||
| for { | |||||
| select { | |||||
| case <-ticker.C: | |||||
| getLikelyLocations(&ctx.Settings.Settings, ctx, cl) | |||||
| case incoming := <-ch: | |||||
| ProcessIncoming(incoming, cl, ctx) | |||||
| } | |||||
| } | |||||
| } | |||||
| func ProcessIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model.AppContext) { | |||||
| defer func() { | |||||
| if err := recover(); err != nil { | |||||
| log.Println("work failed:", err) | |||||
| } | |||||
| }() | |||||
| incoming = IncomingBeaconFilter(incoming) | |||||
| id := GetBeaconID(incoming) | |||||
| now := time.Now().Unix() | |||||
| beacons := &ctx.Beacons | |||||
| beacons.Lock.Lock() | |||||
| defer beacons.Lock.Unlock() | |||||
| latestList := &ctx.LatestList | |||||
| settings := &ctx.Settings.Settings | |||||
| beacon, ok := beacons.Beacons[id] | |||||
| if !ok { | |||||
| updateLatestList(incoming, now, latestList) | |||||
| return | |||||
| } | |||||
| fmt.Println("updating beacon data") | |||||
| updateBeaconData(&beacon, incoming, now, cl, settings) | |||||
| beacons.Beacons[beacon.Beacon_id] = beacon | |||||
| } | |||||
| @@ -1,6 +1,6 @@ | |||||
| #!/bin/bash | #!/bin/bash | ||||
| URL="http://127.0.0.1:1902/api/beacons" | URL="http://127.0.0.1:1902/api/beacons" | ||||
| BEACON_ID="C3000057B9F7" | |||||
| BEACON_ID="C83F8F17DB35" | |||||
| echo "POST (create)" | echo "POST (create)" | ||||
| curl -s -X POST $URL \ | curl -s -X POST $URL \ | ||||
| @@ -10,32 +10,6 @@ echo -e "\n" | |||||
| sleep 1 | sleep 1 | ||||
| echo "GET (list after create)" | |||||
| curl -s -X GET $URL | |||||
| echo -e "\n" | |||||
| sleep 1 | |||||
| echo "PUT (update)" | |||||
| curl -s -X PUT $URL \ | |||||
| -H "Content-Type: application/json" \ | |||||
| -d '{"Beacon_id":"'"$BEACON_ID"'","Name":"Beacon1-updated","tx_power":-60}' | |||||
| echo -e "\n" | |||||
| sleep 1 | |||||
| echo "GET (list after update)" | |||||
| curl -s -X GET $URL | |||||
| echo -e "\n" | |||||
| sleep 1 | |||||
| echo "DELETE" | echo "DELETE" | ||||
| curl -s -X DELETE "$URL/$BEACON_ID" | curl -s -X DELETE "$URL/$BEACON_ID" | ||||
| echo -e "\n" | echo -e "\n" | ||||
| sleep 1 | |||||
| echo "GET (list after delete)" | |||||
| curl -s -X GET $URL | |||||
| echo -e "\n" | |||||
| @@ -1,9 +0,0 @@ | |||||
| # `/test` | |||||
| Additional external test apps and test data. Feel free to structure the `/test` directory anyway you want. For bigger projects it makes sense to have a data subdirectory. For example, you can have `/test/data` or `/test/testdata` if you need Go to ignore what's in that directory. Note that Go will also ignore directories or files that begin with "." or "_", so you have more flexibility in terms of how you name your test data directory. | |||||
| Examples: | |||||
| * https://github.com/openshift/origin/tree/master/test (test data is in the `/testdata` subdirectory) | |||||
| @@ -1,160 +0,0 @@ | |||||
| package httpservertest_test | |||||
| import ( | |||||
| "bytes" | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "net/http" | |||||
| "net/http/httptest" | |||||
| "os" | |||||
| "sync" | |||||
| "testing" | |||||
| "github.com/AFASystems/presence/internal/pkg/httpserver" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/boltdb/bolt" | |||||
| "github.com/gorilla/mux" | |||||
| ) | |||||
| // Functions beaconsAddHandler, beaconsListHandler, beaconsDeleteHandler | |||||
| func TestBeaconCRUD(t *testing.T) { | |||||
| tmpfile, _ := os.CreateTemp("", "testdb-*.db") | |||||
| defer os.Remove(tmpfile.Name()) | |||||
| db, err := bolt.Open(tmpfile.Name(), 0600, nil) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| model.Db = db | |||||
| ctx := model.AppContext{ | |||||
| Beacons: model.BeaconsList{ | |||||
| Beacons: make(map[string]model.Beacon), | |||||
| Lock: sync.RWMutex{}, | |||||
| }, | |||||
| ButtonsList: make(map[string]model.Button), | |||||
| } | |||||
| b := model.Beacon{Name: "B1", Beacon_id: "1"} | |||||
| body, err := json.Marshal(b) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| req := httptest.NewRequest("POST", "/api/beacons", bytes.NewReader(body)) | |||||
| w := httptest.NewRecorder() | |||||
| httpserver.BeaconsAddHandler(&ctx.Beacons)(w, req) | |||||
| if w.Code != http.StatusOK { | |||||
| t.Fatalf("create failed: %d", w.Code) | |||||
| } | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| req = httptest.NewRequest("GET", "/api/beacons", nil) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.BeaconsListHandler(&ctx.Beacons)(w, req) | |||||
| fmt.Println("Status:", w.Code) | |||||
| fmt.Println("Body:", w.Body.String()) | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| newB := model.Beacon{Name: "B2", Beacon_id: "2"} | |||||
| newBody, err := json.Marshal(newB) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| req = httptest.NewRequest("PUT", "/api/beacons", bytes.NewReader(newBody)) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.BeaconsAddHandler(&ctx.Beacons)(w, req) | |||||
| if w.Code != http.StatusOK { | |||||
| t.Fatalf("create failed: %d", w.Code) | |||||
| } | |||||
| req = httptest.NewRequest("GET", "/api/beacons", nil) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.BeaconsListHandler(&ctx.Beacons)(w, req) | |||||
| fmt.Println("Status:", w.Code) | |||||
| fmt.Println("Body:", w.Body.String()) | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| req = httptest.NewRequest("DELETE", "/api/beacons/1", nil) | |||||
| req = mux.SetURLVars(req, map[string]string{"beacon_id": "1"}) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.BeaconsDeleteHandler(&ctx.Beacons, ctx.ButtonsList)(w, req) | |||||
| fmt.Println("Status: ", w.Code) | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| req = httptest.NewRequest("GET", "/api/beacons", nil) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.BeaconsListHandler(&ctx.Beacons)(w, req) | |||||
| fmt.Println("Status:", w.Code) | |||||
| fmt.Println("Body:", w.Body.String()) | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| } | |||||
| func TestSettingsCRUD(t *testing.T) { | |||||
| tmpfile, _ := os.CreateTemp("", "testdb-*.db") | |||||
| defer os.Remove(tmpfile.Name()) | |||||
| db, err := bolt.Open(tmpfile.Name(), 0600, nil) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| model.Db = db | |||||
| ctx := model.AppContext{ | |||||
| Settings: model.Settings{}, | |||||
| } | |||||
| settings := model.Settings{ | |||||
| Location_confidence: 10, | |||||
| Last_seen_threshold: 10, | |||||
| Beacon_metrics_size: 10, | |||||
| HA_send_interval: 10, | |||||
| HA_send_changes_only: true, | |||||
| } | |||||
| body, err := json.Marshal(settings) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| req := httptest.NewRequest("POST", "/api/settings", bytes.NewReader(body)) | |||||
| w := httptest.NewRecorder() | |||||
| httpserver.SettingsEditHandler(&ctx.Settings)(w, req) | |||||
| fmt.Println("status: ", w.Code) | |||||
| if w.Code != http.StatusOK { | |||||
| t.Fatalf("create failed: %d", w.Code) | |||||
| } | |||||
| fmt.Println("--------------------------------------------------------------") | |||||
| req = httptest.NewRequest("GET", "/api/settings", nil) | |||||
| w = httptest.NewRecorder() | |||||
| httpserver.SettingsListHandler(&ctx.Settings)(w, req) | |||||
| fmt.Println("Status:", w.Code) | |||||
| fmt.Println("Body:", w.Body.String()) | |||||
| } | |||||
| @@ -1,46 +0,0 @@ | |||||
| package mqtt_test | |||||
| import ( | |||||
| "os" | |||||
| "testing" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/model" | |||||
| "github.com/AFASystems/presence/internal/pkg/mqttclient" | |||||
| "github.com/AFASystems/presence/internal/pkg/persistence" | |||||
| "github.com/boltdb/bolt" | |||||
| ) | |||||
| func TestIncomingMQTTProcessor(t *testing.T) { | |||||
| ctx := &model.AppContext{ | |||||
| Beacons: model.BeaconsList{Beacons: make(map[string]model.Beacon)}, | |||||
| Settings: model.Settings{ | |||||
| Last_seen_threshold: 10, | |||||
| Location_confidence: 3, | |||||
| }, | |||||
| } | |||||
| tmpfile, _ := os.CreateTemp("", "testdb-*.db") | |||||
| defer os.Remove(tmpfile.Name()) | |||||
| db, err := bolt.Open(tmpfile.Name(), 0600, nil) | |||||
| if err != nil { | |||||
| t.Fatal(err) | |||||
| } | |||||
| model.Db = db | |||||
| persistence.LoadState(model.Db, ctx) | |||||
| ch := mqttclient.IncomingMQTTProcessor(20*time.Millisecond, nil, model.Db, ctx) | |||||
| msg := model.Incoming_json{MAC: "15:02:31", Hostname: "testHost", RSSI: -55} | |||||
| ch <- msg | |||||
| time.Sleep(100 * time.Millisecond) | |||||
| ctx.Beacons.Lock.RLock() | |||||
| defer ctx.Beacons.Lock.RUnlock() | |||||
| if len(ctx.LatestList.LatestList) == 0 { | |||||
| t.Fatal("latest list map to update") | |||||
| } | |||||
| } | |||||