package main import ( "context" "encoding/json" "fmt" "math" "strconv" "time" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) func main() { // Load global context to init beacons and latest list appCtx := model.AppContext{ Settings: model.Settings{ Settings: model.SettingsVal{ LocationConfidence: 4, LastSeenThreshold: 15, BeaconMetricSize: 30, HASendInterval: 5, HASendChangesOnly: false, RSSIEnforceThreshold: true, RSSIMinThreshold: 100, }, }, BeaconsLookup: make(map[string]struct{}), LatestList: model.LatestBeaconsList{ LatestList: make(map[string]model.Beacon), }, } cfg := config.Load() // Kafka reader for Raw MQTT beacons rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") defer rawReader.Close() // Kafka reader for API server updates apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") defer apiReader.Close() writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents") defer writer.Close() fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") locTicker := time.NewTicker(1 * time.Second) defer locTicker.Stop() chRaw := make(chan model.BeaconAdvertisement, 2000) chApi := make(chan model.ApiUpdate, 2000) go kafkaclient.Consume(rawReader, chRaw) go kafkaclient.Consume(apiReader, chApi) for { select { case <-locTicker.C: getLikelyLocations(&appCtx, writer) case msg := <-chRaw: assignBeaconToList(msg, &appCtx) case msg := <-chApi: switch msg.Method { case "POST": id := msg.Beacon.ID appCtx.BeaconsLookup[id] = struct{}{} case "DELETE": fmt.Println("Incoming delete message") } } } } func getLikelyLocations(ctx *model.AppContext, writer *kafka.Writer) { fmt.Println("get likely locations called") ctx.Beacons.Lock.Lock() beacons := ctx.Beacons.Beacons ctx.Beacons.Lock.Unlock() for _, beacon := range beacons { fmt.Printf("beacon: %+v", beacon) // Shrinking the model because other properties have nothing to do with the location r := model.HTTPLocation{ Method: "Standard", Distance: 999, Name: beacon.Name, ID: beacon.ID, Location: "", LastSeen: 999, } mSize := len(beacon.BeaconMetrics) if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > ctx.Settings.Settings.LastSeenThreshold { continue } locList := make(map[string]float64) seenW := 1.5 rssiW := 0.75 for _, metric := range beacon.BeaconMetrics { res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0))) locList[metric.Location] += res } bestLocName := "" maxScore := 0.0 for locName, score := range locList { if score > maxScore { maxScore = score bestLocName = locName } } if bestLocName == beacon.PreviousLocation { beacon.LocationConfidence++ } else { beacon.LocationConfidence = 0 } r.Distance = beacon.BeaconMetrics[mSize-1].Distance r.Location = bestLocName r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp if beacon.LocationConfidence == ctx.Settings.Settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName { beacon.LocationConfidence = 0 // Who do I need this if I am sending entire structure anyways? who knows js, err := json.Marshal(model.LocationChange{ Method: "LocationChange", BeaconRef: beacon, Name: beacon.Name, PreviousLocation: beacon.PreviousConfidentLocation, NewLocation: bestLocName, Timestamp: time.Now().Unix(), }) if err != nil { beacon.PreviousConfidentLocation = bestLocName beacon.PreviousLocation = bestLocName ctx.Beacons.Lock.Lock() ctx.Beacons.Beacons[beacon.ID] = beacon ctx.Beacons.Lock.Unlock() continue } msg := kafka.Message{ Value: js, } err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in sending Kafka message") } } beacon.PreviousLocation = bestLocName ctx.Beacons.Lock.Lock() ctx.Beacons.Beacons[beacon.ID] = beacon ctx.Beacons.Lock.Unlock() js, err := json.Marshal(r) if err != nil { continue } msg := kafka.Message{ Value: js, } err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in sending Kafka message") } } } func assignBeaconToList(adv model.BeaconAdvertisement, ctx *model.AppContext) { id := adv.MAC _, ok := ctx.BeaconsLookup[id] now := time.Now().Unix() if !ok { // handle removing from the list somewhere else, probably at the point where this is being used which is nowhere in the original code ctx.LatestList.Lock.Lock() ctx.LatestList.LatestList[id] = model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: getBeaconDistance(adv)} ctx.LatestList.Lock.Unlock() return } fmt.Println("Beacon exists: ", id) if ctx.Settings.Settings.RSSIEnforceThreshold && (int64(adv.RSSI) < ctx.Settings.Settings.RSSIMinThreshold) { return } ctx.Beacons.Lock.Lock() beacon := ctx.Beacons.Beacons[id] ctx.Beacons.Lock.Unlock() beacon.IncomingJSON = adv beacon.LastSeen = now beacon.BeaconType = adv.BeaconType beacon.HSButtonCounter = adv.HSButtonCounter beacon.HSBattery = adv.HSBatteryLevel beacon.HSRandomNonce = adv.HSRandomNonce beacon.HSButtonMode = adv.HSButtonMode if beacon.BeaconMetrics == nil { beacon.BeaconMetrics = make([]model.BeaconMetric, 0, ctx.Settings.Settings.BeaconMetricSize) } metric := model.BeaconMetric{ Distance: getBeaconDistance(adv), Timestamp: now, RSSI: int64(adv.RSSI), Location: adv.Hostname, } if len(beacon.BeaconMetrics) >= ctx.Settings.Settings.BeaconMetricSize { copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:]) beacon.BeaconMetrics[ctx.Settings.Settings.BeaconMetricSize-1] = metric } else { beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric) } ctx.Beacons.Lock.Lock() ctx.Beacons.Beacons[id] = beacon ctx.Beacons.Lock.Unlock() } func getBeaconDistance(adv model.BeaconAdvertisement) float64 { ratio := float64(adv.RSSI) * (1.0 / float64(twosComp(adv.TXPower))) distance := 100.0 if ratio < 1.0 { distance = math.Pow(ratio, 10) } else { distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 } return distance } func twosComp(inp string) int64 { i, _ := strconv.ParseInt("0x"+inp, 0, 64) return i - 256 }