package mqttclient import ( "fmt" "log" "time" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/persistence" "github.com/boltdb/bolt" "github.com/yosssi/gmq/mqtt/client" ) func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, ctx *model.AppContext) chan<- model.Incoming_json { ch := make(chan model.Incoming_json, 2000) persistence.CreateBucketIfNotExists(db) ticker := time.NewTicker(updateInterval) go runProcessor(ticker, cl, ch, ctx) return ch } func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incoming_json, ctx *model.AppContext) { for { select { case <-ticker.C: getLikelyLocations(&ctx.Settings.Settings, ctx, cl) case incoming := <-ch: ProcessIncoming(incoming, cl, ctx) } } } func ProcessIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model.AppContext) { defer func() { if err := recover(); err != nil { log.Println("work failed:", err) } }() incoming = IncomingBeaconFilter(incoming) id := GetBeaconID(incoming) now := time.Now().Unix() beacons := &ctx.Beacons beacons.Lock.Lock() defer beacons.Lock.Unlock() latestList := &ctx.LatestList settings := &ctx.Settings.Settings beacon, ok := beacons.Beacons[id] if !ok { updateLatestList(incoming, now, latestList) return } fmt.Println("updating beacon data") updateBeaconData(&beacon, incoming, now, cl, settings) beacons.Beacons[beacon.Beacon_id] = beacon }