|
- package mqtt_client
-
- import (
- "log"
- "time"
-
- "github.com/AFASystems/presence/internal/pkg/model"
- "github.com/AFASystems/presence/internal/pkg/persistence"
- "github.com/boltdb/bolt"
- "github.com/yosssi/gmq/mqtt/client"
- )
-
- func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, ctx *model.AppContext) chan<- model.Incoming_json {
- ch := make(chan model.Incoming_json, 2000)
- persistence.CreateBucketIfNotExists(db)
-
- ticker := time.NewTicker(updateInterval)
- go runProcessor(ticker, cl, ch, ctx)
-
- return ch
- }
-
- func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incoming_json, ctx *model.AppContext) {
- for {
- select {
- case <-ticker.C:
- getLikelyLocations(&ctx.Settings, ctx, cl)
- case incoming := <-ch:
- processIncoming(incoming, cl, ctx)
- }
- }
- }
-
- func processIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model.AppContext) {
- defer func() {
- if err := recover(); err != nil {
- log.Println("work failed:", err)
- }
- }()
-
- incoming = incomingBeaconFilter(incoming) // DONE
- id := getBeaconID(incoming) // DONE
- now := time.Now().Unix()
-
- beacons := &ctx.Beacons
-
- beacons.Lock.Lock()
- defer beacons.Lock.Unlock()
-
- latestList := &ctx.LatestList
- settings := &ctx.Settings
-
- beacon, ok := beacons.Beacons[id]
- if !ok {
- updateLatestList(incoming, now, latestList) // DONE
- return
- }
-
- updateBeaconData(&beacon, incoming, now, cl, settings)
- beacons.Beacons[beacon.Beacon_id] = beacon
- }
|