diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 216d893..5a66253 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -137,6 +137,7 @@ func main() { // define kafka readers apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api") alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert") + mqttReader := appState.AddKafkaReader(cfg.KafkaURL, "mqtt", "bridge-mqtt") // define kafka writer writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons") @@ -145,10 +146,12 @@ func main() { chApi := make(chan model.ApiUpdate, 200) chAlert := make(chan model.Alert, 200) + chMqtt := make(chan []model.Tracker, 200) - wg.Add(2) + wg.Add(3) go kafkaclient.Consume(apiReader, chApi, ctx, &wg) go kafkaclient.Consume(alertReader, chAlert, ctx, &wg) + go kafkaclient.Consume(mqttReader, chMqtt, ctx, &wg) opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883)) @@ -199,6 +202,13 @@ eventloop: continue } client.Publish("/alerts", 0, true, p) + case msg := <-chMqtt: + fmt.Printf("trackers: %+v\n", msg) + p, err := json.Marshal(msg) + if err != nil { + continue + } + client.Publish("/trackers", 0, true, p) } } diff --git a/cmd/location/main.go b/cmd/location/main.go index 05a0e39..d346a71 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -66,6 +66,7 @@ eventLoop: break eventLoop case <-locTicker.C: settings := appState.GetSettings() + fmt.Printf("Settings: %+v\n", settings) switch settings.CurrentAlgorithm { case "filter": getLikelyLocations(appState, writer) @@ -75,6 +76,7 @@ eventLoop: case msg := <-chRaw: assignBeaconToList(msg, appState) case msg := <-chSettings: + fmt.Printf("settings msg: %+v\n", msg) appState.UpdateSettings(msg) } } @@ -92,7 +94,6 @@ func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { settings := appState.GetSettingsValue() for _, beacon := range beacons { - fmt.Println("id: ", beacon.ID) // Shrinking the model because other properties have nothing to do with the location r := model.HTTPLocation{ Method: "Standard", diff --git a/cmd/server/main.go b/cmd/server/main.go index 85deb0d..7c3ae30 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -26,6 +26,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" + "github.com/segmentio/kafka-go" "gorm.io/gorm" ) @@ -68,6 +69,7 @@ func main() { settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert") parserWriter := appState.AddKafkaWriter(cfg.KafkaURL, "parser") + mqttWriter := appState.AddKafkaWriter(cfg.KafkaURL, "mqtt") slog.Info("Kafka writers topics: apibeacons, settings initialized") configFile, err := os.Open("/app/cmd/server/config.json") @@ -97,7 +99,7 @@ func main() { } } - if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil { + if err := apiclient.UpdateDB(db, ctx, cfg, writer, appState); err != nil { fmt.Printf("Error in getting token: %v\n", err) } @@ -142,6 +144,8 @@ func main() { r.HandleFunc("/reslevis/settings", controller.SettingsUpdateController(db, settingsWriter, ctx)).Methods("PATCH") r.HandleFunc("/reslevis/settings", controller.SettingsListController(db)).Methods("GET") + beaconTicker := time.NewTicker(2 * time.Second) + wsHandler := http.HandlerFunc(serveWs(db, ctx)) restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r) mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -179,6 +183,20 @@ eventLoop: fmt.Printf("Error in saving decoder event for beacon: %s\n", id) continue } + case <-beaconTicker.C: + var list []model.Tracker + db.Find(&list) + eMsg, err := json.Marshal(list) + if err != nil { + fmt.Printf("Error in marshaling trackers list: %v\n", err) + continue + } + + msg := kafka.Message{ + Value: eMsg, + } + + mqttWriter.WriteMessages(ctx, msg) } } diff --git a/internal/pkg/apiclient/updatedb.go b/internal/pkg/apiclient/updatedb.go index 9a9dbc5..5a2d536 100644 --- a/internal/pkg/apiclient/updatedb.go +++ b/internal/pkg/apiclient/updatedb.go @@ -7,6 +7,7 @@ import ( "net/http" "reflect" + "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/model" @@ -15,7 +16,7 @@ import ( "gorm.io/gorm/clause" ) -func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer) error { +func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } @@ -57,6 +58,13 @@ func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafk syncTable(db, trackerZones) } + var settings model.Settings + db.First(&settings) + if settings.ID == 0 { + fmt.Println("settings are empty") + db.Create(appState.GetSettings()) + } + return nil } diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index 7acc8b9..ee2c7db 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -2,6 +2,7 @@ package appcontext import ( "fmt" + "maps" "strings" "time" @@ -32,6 +33,7 @@ func NewAppState() *AppState { Results: make(map[string]model.HTTPResult), }, settings: model.Settings{ + ID: 1, CurrentAlgorithm: "filter", // possible values filter or AI LocationConfidence: 4, LastSeenThreshold: 15, @@ -268,9 +270,7 @@ func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon { defer m.latestList.Lock.RUnlock() beacons := make(map[string]model.Beacon) - for id, beacon := range m.latestList.LatestList { - beacons[id] = beacon - } + maps.Copy(beacons, m.latestList.LatestList) return beacons } diff --git a/internal/pkg/controller/settings_controller.go b/internal/pkg/controller/settings_controller.go index 3a2e4de..9e70988 100644 --- a/internal/pkg/controller/settings_controller.go +++ b/internal/pkg/controller/settings_controller.go @@ -3,6 +3,7 @@ package controller import ( "context" "encoding/json" + "fmt" "net/http" "github.com/AFASystems/presence/internal/pkg/model" @@ -32,6 +33,8 @@ func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Con return } + fmt.Printf("updates: %+v\n", updates) + if err := db.Model(&model.Settings{}).Where("id = ?", 1).Updates(updates).Error; err != nil { http.Error(w, err.Error(), 500) return @@ -46,6 +49,8 @@ func SettingsUpdateController(db *gorm.DB, writer *kafka.Writer, ctx context.Con Value: eMsg, } + fmt.Printf("Kafka message: %+v\n", eMsg) + writer.WriteMessages(ctx, msg) w.Write([]byte("Settings updated")) diff --git a/internal/pkg/model/model.md b/internal/pkg/model/model.md deleted file mode 100644 index 3caa8c5..0000000 --- a/internal/pkg/model/model.md +++ /dev/null @@ -1,3 +0,0 @@ -# MODELS - -This file includes type definitions for aggregate struct types \ No newline at end of file diff --git a/internal/pkg/model/settings.go b/internal/pkg/model/settings.go index 622b9c6..161ff4b 100644 --- a/internal/pkg/model/settings.go +++ b/internal/pkg/model/settings.go @@ -1,12 +1,13 @@ package model type Settings struct { - CurrentAlgorithm string - LocationConfidence int64 - LastSeenThreshold int64 - BeaconMetricSize int - HASendInterval int - HASendChangesOnly bool - RSSIEnforceThreshold bool - RSSIMinThreshold int64 + ID int `gorm:"primaryKey"` // this is always 1 + CurrentAlgorithm string `json:"current_algorithm" mapstructure:"current_algorithm"` + LocationConfidence int64 `json:"location_confidence" mapstructure:"location_confidence"` + LastSeenThreshold int64 `json:"last_seen_threshold" mapstructure:"last_seen_threshold"` + BeaconMetricSize int `json:"beacon_metric_size" mapstructure:"beacon_metric_size"` + HASendInterval int `json:"HA_send_interval" mapstructure:"HA_send_interval"` + HASendChangesOnly bool `json:"HA_send_changes_only" mapstructure:"HA_send_changes_only"` + RSSIEnforceThreshold bool `json:"RSSI_enforce_threshold" mapstructure:"RSSI_enforce_threshold"` + RSSIMinThreshold int64 `json:"RSSI_min_threshold" mapstructure:"RSSI_min_threshold"` } diff --git a/pkg/README.md b/pkg/README.md deleted file mode 100644 index 0d829ba..0000000 --- a/pkg/README.md +++ /dev/null @@ -1,112 +0,0 @@ -# `/pkg` - -Library code that's ok to use by external applications (e.g., `/pkg/mypubliclib`). Other projects will import these libraries expecting them to work, so think twice before you put something here :-) Note that the `internal` directory is a better way to ensure your private packages are not importable because it's enforced by Go. The `/pkg` directory is still a good way to explicitly communicate that the code in that directory is safe for use by others. The [`I'll take pkg over internal`](https://travisjeffery.com/b/2019/11/i-ll-take-pkg-over-internal/) blog post by Travis Jeffery provides a good overview of the `pkg` and `internal` directories and when it might make sense to use them. - -It's also a way to group Go code in one place when your root directory contains lots of non-Go components and directories making it easier to run various Go tools (as mentioned in these talks: [`Best Practices for Industrial Programming`](https://www.youtube.com/watch?v=PTE4VJIdHPg) from GopherCon EU 2018, [GopherCon 2018: Kat Zien - How Do You Structure Your Go Apps](https://www.youtube.com/watch?v=oL6JBUk6tj0) and [GoLab 2018 - Massimiliano Pippi - Project layout patterns in Go](https://www.youtube.com/watch?v=3gQa1LWwuzk)). - -Note that this is not a universally accepted pattern and for every popular repo that uses it you can find 10 that don't. It's up to you to decide if you want to use this pattern or not. Regardless of whether or not it's a good pattern more people will know what you mean than not. It might a bit confusing for some of the new Go devs, but it's a pretty simple confusion to resolve and that's one of the goals for this project layout repo. - -Ok not to use it if your app project is really small and where an extra level of nesting doesn't add much value (unless you really want to). Think about it when it's getting big enough and your root directory gets pretty busy (especially if you have a lot of non-Go app components). - -The `pkg` directory origins: The old Go source code used to use `pkg` for its packages and then various Go projects in the community started copying the pattern (see [`this`](https://twitter.com/bradfitz/status/1039512487538970624) Brad Fitzpatrick's tweet for more context). - - -Examples: - -* https://github.com/containerd/containerd/tree/main/pkg -* https://github.com/slimtoolkit/slim/tree/master/pkg -* https://github.com/telepresenceio/telepresence/tree/release/v2/pkg -* https://github.com/jaegertracing/jaeger/tree/master/pkg -* https://github.com/istio/istio/tree/master/pkg -* https://github.com/GoogleContainerTools/kaniko/tree/master/pkg -* https://github.com/google/gvisor/tree/master/pkg -* https://github.com/google/syzkaller/tree/master/pkg -* https://github.com/perkeep/perkeep/tree/master/pkg -* https://github.com/heptio/ark/tree/master/pkg -* https://github.com/argoproj/argo-workflows/tree/master/pkg -* https://github.com/argoproj/argo-cd/tree/master/pkg -* https://github.com/heptio/sonobuoy/tree/master/pkg -* https://github.com/helm/helm/tree/master/pkg -* https://github.com/k3s-io/k3s/tree/master/pkg -* https://github.com/kubernetes/kubernetes/tree/master/pkg -* https://github.com/kubernetes/kops/tree/master/pkg -* https://github.com/moby/moby/tree/master/pkg -* https://github.com/grafana/grafana/tree/master/pkg -* https://github.com/influxdata/influxdb/tree/master/pkg -* https://github.com/cockroachdb/cockroach/tree/master/pkg -* https://github.com/derekparker/delve/tree/master/pkg -* https://github.com/etcd-io/etcd/tree/master/pkg -* https://github.com/oklog/oklog/tree/master/pkg -* https://github.com/flynn/flynn/tree/master/pkg -* https://github.com/jesseduffield/lazygit/tree/master/pkg -* https://github.com/gopasspw/gopass/tree/master/pkg -* https://github.com/sosedoff/pgweb/tree/master/pkg -* https://github.com/GoogleContainerTools/skaffold/tree/master/pkg -* https://github.com/knative/serving/tree/master/pkg -* https://github.com/grafana/loki/tree/master/pkg -* https://github.com/bloomberg/goldpinger/tree/master/pkg -* https://github.com/Ne0nd0g/merlin/tree/master/pkg -* https://github.com/jenkins-x/jx/tree/master/pkg -* https://github.com/DataDog/datadog-agent/tree/master/pkg -* https://github.com/dapr/dapr/tree/master/pkg -* https://github.com/cortexproject/cortex/tree/master/pkg -* https://github.com/dexidp/dex/tree/master/pkg -* https://github.com/pusher/oauth2_proxy/tree/master/pkg -* https://github.com/pdfcpu/pdfcpu/tree/master/pkg -* https://github.com/weaveworks/kured/tree/master/pkg -* https://github.com/weaveworks/footloose/tree/master/pkg -* https://github.com/weaveworks/ignite/tree/master/pkg -* https://github.com/tmrts/boilr/tree/master/pkg -* https://github.com/kata-containers/runtime/tree/master/pkg -* https://github.com/okteto/okteto/tree/master/pkg -* https://github.com/solo-io/squash/tree/master/pkg -* https://github.com/google/exposure-notifications-server/tree/main/pkg -* https://github.com/spiffe/spire/tree/main/pkg -* https://github.com/rook/rook/tree/master/pkg -* https://github.com/buildpacks/pack/tree/main/pkg -* https://github.com/cilium/cilium/tree/main/pkg -* https://github.com/containernetworking/cni/tree/main/pkg -* https://github.com/crossplane/crossplane/tree/master/pkg -* https://github.com/dragonflyoss/Dragonfly2/tree/main/pkg -* https://github.com/kubeedge/kubeedge/tree/master/pkg -* https://github.com/kubevela/kubevela/tree/master/pkg -* https://github.com/kubevirt/kubevirt/tree/main/pkg -* https://github.com/kyverno/kyverno/tree/main/pkg -* https://github.com/thanos-io/thanos/tree/main/pkg -* https://github.com/cri-o/cri-o/tree/main/pkg -* https://github.com/fluxcd/flux2/tree/main/pkg -* https://github.com/kedacore/keda/tree/main/pkg -* https://github.com/linkerd/linkerd2/tree/main/pkg -* https://github.com/opencost/opencost/tree/develop/pkg -* https://github.com/antrea-io/antrea/tree/main/pkg -* https://github.com/karmada-io/karmada/tree/master/pkg -* https://github.com/kuberhealthy/kuberhealthy/tree/master/pkg -* https://github.com/submariner-io/submariner/tree/devel/pkg -* https://github.com/trickstercache/trickster/tree/main/pkg -* https://github.com/tellerops/teller/tree/master/pkg -* https://github.com/OpenFunction/OpenFunction/tree/main/pkg -* https://github.com/external-secrets/external-secrets/tree/main/pkg -* https://github.com/ko-build/ko/tree/main/pkg -* https://github.com/lima-vm/lima/tree/master/pkg -* https://github.com/clastix/capsule/tree/master/pkg -* https://github.com/carvel-dev/ytt/tree/develop/pkg -* https://github.com/clusternet/clusternet/tree/main/pkg -* https://github.com/fluid-cloudnative/fluid/tree/master/pkg -* https://github.com/inspektor-gadget/inspektor-gadget/tree/main/pkg -* https://github.com/sustainable-computing-io/kepler/tree/main/pkg -* https://github.com/GoogleContainerTools/kpt/tree/main/pkg -* https://github.com/guacsec/guac/tree/main/pkg -* https://github.com/kubeovn/kube-ovn/tree/master/pkg -* https://github.com/kube-vip/kube-vip/tree/main/pkg -* https://github.com/kubescape/kubescape/tree/master/pkg -* https://github.com/kudobuilder/kudo/tree/main/pkg -* https://github.com/kumahq/kuma/tree/master/pkg -* https://github.com/kubereboot/kured/tree/main/pkg -* https://github.com/nocalhost/nocalhost/tree/main/pkg -* https://github.com/openelb/openelb/tree/master/pkg -* https://github.com/openfga/openfga/tree/main/pkg -* https://github.com/openyurtio/openyurt/tree/master/pkg -* https://github.com/getporter/porter/tree/main/pkg -* https://github.com/sealerio/sealer/tree/main/pkg -* https://github.com/werf/werf/tree/main/pkg - diff --git a/pkg/_your_public_lib_/.keep b/pkg/_your_public_lib_/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/settingsApi.sh b/scripts/settingsApi.sh new file mode 100755 index 0000000..58a25bf --- /dev/null +++ b/scripts/settingsApi.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +curl -X PATCH "http://localhost:1902/reslevis/settings" \ + -H "Content-Type: application/json" \ + -d '{ + "current_algorithm": "ai", + "last_seen_threshold": 310, + "beacon_metric_size": 100, + "HA_send_interval": 60, + "HA_send_changes_only": true, + "RSSI_enforce_threshold": false, + "RSSI_min_threshold": -80 + }'