package main import ( "context" "fmt" "math" "strconv" "time" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/mqttclient" presenseredis "github.com/AFASystems/presence/internal/pkg/redis" "github.com/redis/go-redis/v9" ) // Move Kafka topics, Redis keys, intervals to env config // Replace hardcoded IPs with env vars // avoid defers -> lock and unlock right before and after usage // Distance formula uses twos_comp incorrectly should parse signed int not hex string // Use buffered log instead of fmt.Println ??? // Limit metrics slice size with ring buffer ?? // handle go routine exit signals with context.WithCancel() ?? // Make internal package for Kafka and Redis // Make internal package for processor: // Helper functions: twos_comp, getBeaconId func main() { // Load global context to init beacons and latest list appCtx := model.AppContext{ Beacons: model.BeaconsList{ Beacons: make(map[string]model.Beacon), }, LatestList: model.LatestBeaconsList{ LatestList: make(map[string]model.Beacon), }, Settings: model.Settings{ Settings: model.SettingsVal{ Location_confidence: 4, Last_seen_threshold: 15, Beacon_metrics_size: 30, HA_send_interval: 5, HA_send_changes_only: false, }, }, } cfg := config.Load() // Kafka writer idk why yet writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons") defer writer.Close() // Kafka reader for Raw MQTT beacons rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID") defer rawReader.Close() // Kafka reader for API server updates apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID") defer apiReader.Close() // Kafka reader for latest list updates latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID") defer latestReader.Close() // Kafka reader for settings updates settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID") defer settingsReader.Close() ctx := context.Background() // Init Redis Client client := redis.NewClient(&redis.Options{ Addr: cfg.RedisURL, Password: "", }) beaconsList := presenseredis.LoadBeaconsList(client, ctx) appCtx.Beacons.Beacons = beaconsList latestList := presenseredis.LoadLatestList(client, ctx) appCtx.LatestList.LatestList = latestList settings := presenseredis.LoadSettings(client, ctx) appCtx.Settings.Settings = settings // declare channel for collecting Kafka messages chRaw := make(chan model.Incoming_json, 2000) chApi := make(chan model.ApiUpdate, 2000) chLatest := make(chan model.Incoming_json, 2000) chSettings := make(chan model.SettingsVal, 10) go kafkaclient.Consume(rawReader, chRaw) go kafkaclient.Consume(apiReader, chApi) go kafkaclient.Consume(latestReader, chLatest) go kafkaclient.Consume(settingsReader, chSettings) go func() { // Syncing Redis cache every 1s with 2 lists: beacons, latest list ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { presenseredis.SaveBeaconsList(&appCtx, client, ctx) presenseredis.SaveLatestList(&appCtx, client, ctx) presenseredis.SaveSettings(&appCtx, client, ctx) } }() for { select { case msg := <-chRaw: processIncoming(msg, &appCtx) case msg := <-chApi: switch msg.Method { case "POST": appCtx.Beacons.Lock.Lock() appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon case "DELETE": _, exists := appCtx.Beacons.Beacons[msg.ID] if exists { appCtx.Beacons.Lock.Lock() delete(appCtx.Beacons.Beacons, msg.ID) } default: fmt.Println("unknown method: ", msg.Method) } appCtx.Beacons.Lock.Unlock() case msg := <-chLatest: fmt.Println("latest msg: ", msg) case msg := <-chSettings: appCtx.Settings.Lock.Lock() appCtx.Settings.Settings = msg fmt.Println("settings channel: ", msg) appCtx.Settings.Lock.Unlock() } } } func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) { defer func() { if err := recover(); err != nil { fmt.Println("work failed:", err) } }() fmt.Println("message came") incoming = mqttclient.IncomingBeaconFilter(incoming) id := mqttclient.GetBeaconID(incoming) now := time.Now().Unix() beacons := &ctx.Beacons beacons.Lock.Lock() defer beacons.Lock.Unlock() latestList := &ctx.LatestList latestList.Lock.Lock() defer latestList.Lock.Unlock() beacon, exists := beacons.Beacons[id] if !exists { x, exists := latestList.LatestList[id] if exists { x.Last_seen = now x.Incoming_JSON = incoming x.Distance = getBeaconDistance(incoming) latestList.LatestList[id] = x } else { latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)} } // Move this to seperate routine? for k, v := range latestList.LatestList { if (now - v.Last_seen) > 10 { delete(latestList.LatestList, k) } } return } updateBeacon(&beacon, incoming) beacons.Beacons[id] = beacon } func getBeaconDistance(incoming model.Incoming_json) float64 { rssi := incoming.RSSI power := incoming.TX_power distance := 100.0 ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) if ratio < 1.0 { distance = math.Pow(ratio, 10) } else { distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 } return distance } func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) { now := time.Now().Unix() beacon.Incoming_JSON = incoming beacon.Last_seen = now beacon.Beacon_type = incoming.Beacon_type beacon.HB_ButtonCounter = incoming.HB_ButtonCounter beacon.HB_Battery = incoming.HB_Battery beacon.HB_RandomNonce = incoming.HB_RandomNonce beacon.HB_ButtonMode = incoming.HB_ButtonMode if beacon.Beacon_metrics == nil { beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now } metric := model.BeaconMetric{} metric.Distance = getBeaconDistance(incoming) metric.Timestamp = now metric.Rssi = int64(incoming.RSSI) metric.Location = incoming.Hostname beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric) // Leave the HB button implementation for now } func twos_comp(inp string) int64 { i, _ := strconv.ParseInt("0x"+inp, 0, 64) return i - 256 }