diff --git a/cmd/presenSe/presense.go b/cmd/presenSe/presense.go index e64fcf2..b6cff15 100644 --- a/cmd/presenSe/presense.go +++ b/cmd/presenSe/presense.go @@ -1,7 +1,182 @@ package main -import "fmt" +import ( + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "strconv" + "strings" + "time" + + "github.com/AFASystems/presence/internal/pkg/config" + "github.com/AFASystems/presence/internal/pkg/httpserver" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/mqtt_client" + "github.com/AFASystems/presence/internal/pkg/persistence" + "github.com/boltdb/bolt" + "github.com/gorilla/websocket" + "github.com/yosssi/gmq/mqtt" + "github.com/yosssi/gmq/mqtt/client" +) func main() { - fmt.Println("this is the main file") + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, os.Kill) + cfg := config.Load() + + db, err := bolt.Open("presence.db", 0644, nil) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + model.Db = db + + cli := client.New(&client.Options{ + ErrorHandler: func(err error) { + fmt.Println(err) + }, + }) + + defer cli.Terminate() + + err = cli.Connect(&client.ConnectOptions{ + Network: "tcp", + Address: cfg.MQTTHost, + ClientID: []byte(cfg.MQTTClientID), + UserName: []byte(cfg.MQTTUser), + Password: []byte(cfg.MQTTPass), + }) + + if err != nil { + panic(err) + } + + ctx := &model.AppContext{ + HTTPResults: model.HTTPResultsList{ + HTTPResults: model.HTTPLocationsList{Beacons: []model.HTTPLocation{}}, + }, + Beacons: model.BeaconsList{ + Beacons: make(map[string]model.Beacon), + }, + ButtonsList: make(map[string]model.Button), + Settings: model.Settings{ + Location_confidence: 4, + Last_seen_threshold: 15, + Beacon_metrics_size: 30, + HA_send_interval: 5, + HA_send_changes_only: false, + }, + Clients: make(map[*websocket.Conn]bool), + Broadcast: make(chan model.Message, 100), + Locations: model.LocationsList{Locations: make(map[string]model.Location)}, + LatestList: model.LatestBeaconsList{LatestList: make(map[string]model.Beacon)}, + } + + persistence.LoadState(model.Db, ctx) + incomingChan := mqtt_client.IncomingMQTTProcessor(1*time.Second, cli, model.Db, ctx) + + err = cli.Subscribe(&client.SubscribeOptions{ + SubReqs: []*client.SubReq{ + &client.SubReq{ + TopicFilter: []byte("publish_out/#"), + QoS: mqtt.QoS0, + Handler: func(topicName, message []byte) { + msgStr := string(message) + t := strings.Split(string(topicName), "/") + hostname := t[1] + + if strings.HasPrefix(msgStr, "[") { + var readings []model.RawReading + err := json.Unmarshal(message, &readings) + if err != nil { + log.Printf("Errore parsing JSON: %v", err) + return + } + + for _, reading := range readings { + if reading.Type == "Gateway" { + continue + } + incoming := model.Incoming_json{ + Hostname: hostname, + MAC: reading.MAC, + RSSI: int64(reading.RSSI), + Data: reading.RawData, + HB_ButtonCounter: parseButtonState(reading.RawData), + } + incomingChan <- incoming + } + } else { + s := strings.Split(string(message), ",") + if len(s) < 6 { + log.Printf("Messaggio CSV non valido: %s", msgStr) + return + } + + rawdata := s[4] + buttonCounter := parseButtonState(rawdata) + if buttonCounter > 0 { + incoming := model.Incoming_json{} + i, _ := strconv.ParseInt(s[3], 10, 64) + incoming.Hostname = hostname + incoming.Beacon_type = "hb_button" + incoming.MAC = s[1] + incoming.RSSI = i + incoming.Data = rawdata + incoming.HB_ButtonCounter = buttonCounter + + read_line := strings.TrimRight(string(s[5]), "\r\n") + it, err33 := strconv.Atoi(read_line) + if err33 != nil { + fmt.Println(it) + fmt.Println(err33) + os.Exit(2) + } + incomingChan <- incoming + } + } + }, + }, + }, + }) + if err != nil { + panic(err) + } + + fmt.Println("CONNECTED TO MQTT") + fmt.Println("\n ") + fmt.Println("Visit http://" + cfg.HTTPAddr + " on your browser to see the web interface") + fmt.Println("\n ") + + go httpserver.StartHTTPServer(cfg.HTTPAddr, ctx) + + <-sigc + + if err := cli.Disconnect(); err != nil { + panic(err) + } +} + +func parseButtonState(raw string) int64 { + raw = strings.ToUpper(raw) + + if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 { + buttonField := raw[34:38] + if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil { + return buttonValue + } + } + + if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 { + counterField := raw[22:24] + buttonState, err := strconv.ParseInt(counterField, 16, 64) + if err == nil { + return buttonState + } + } + + return 0 } diff --git a/copy_files/_main.go b/copy_files/_main.go index 951f7de..970c94f 100644 --- a/copy_files/_main.go +++ b/copy_files/_main.go @@ -603,27 +603,6 @@ func getLikelyLocations(settings Settings, locations_list Locations_list, cl *cl if err != nil { panic(err) } - - // Read in a new message as JSON and map it to a Message object - //err := ws.ReadJSON(&msg) - - /*msg := Message{ - Email: "apple", - Username: "peach", - Message: "change",} - res1B, _ := json.Marshal(msg) - fmt.Println(string(res1B)) - - - - if err != nil { - log.Printf("error: %v", err) - - } - // Send the newly received message to the broadcast channel - broadcast <- msg*/ - - ///utils.Log.Printf("%s changes ",beacon.Beacon_id) s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --loct=%s", beacon.Beacon_id, beacon.Incoming_JSON.Hostname, location) ///utils.Log.Printf("%s",s) err, out, errout := Shellout(s) @@ -985,25 +964,6 @@ func createUser(id string, logWanted bool) user { func main() { loggers := []*user{} - // initialize empty-object json file if not found - //if _, err := ioutil.ReadFile(Db); err != nil { - // str := "{}" - // if err = ioutil.WriteFile(Db, []byte(str), 0644); err != nil { - // log.Fatal(err) - // } - //} - - // create channel to communicate over - //jobs := make(chan Job) - - // start watching jobs channel for work - //go ProcessJobs(jobs, Db) - - // create dependencies - //client := &TodoClient{Jobs: jobs} - //handlers := &TodoHandlers{Client: client} - //work := WorkRequest{Name: name, Delay: delay} - //jobs <- work http_host_path_ptr = flag.String("http_host_path", "0.0.0.0:8080", "The host:port that the HTTP server should listen on") //https_host_path_ptr = flag.String("https_host_path", "0.0.0.0:5443", "The host:port that the HTTP server should listen on") diff --git a/copy_files/_maincopy.go b/copy_files/_maincopy.go index 86fb692..587a31d 100644 --- a/copy_files/_maincopy.go +++ b/copy_files/_maincopy.go @@ -1,797 +1,27 @@ package main import ( - "bytes" - "encoding/gob" "encoding/json" - "flag" "fmt" - "io/ioutil" "log" - "math" - "net/http" "os" "os/signal" "strconv" "strings" - "sync" "time" //"./utils" - "os/exec" - - "github.com/boltdb/bolt" - "gopkg.in/natefinch/lumberjack.v2" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" - - "github.com/gorilla/websocket" -) - -var clients = make(map[*websocket.Conn]bool) // connected clients -var broadcast = make(chan Message) -var BEACONS Beacons_list - -var Buttons_list map[string]Button - -var cli *client.Client - -var http_results HTTP_locations_list -var http_results_lock sync.RWMutex - -var Latest_beacons_list map[string]Beacon -var latest_list_lock sync.RWMutex - -var db *bolt.DB -var err error - -var world = []byte("presence") - -var ( -// /logpath = flag.String("logpath", "/data/var/log/presence/presence.log", "Log Path") -) - -var ( - // Websocket http upgrader - upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, - } ) -var settings = Settings{ - Location_confidence: 4, - Last_seen_threshold: 15, - Beacon_metrics_size: 30, - HA_send_interval: 5, - HA_send_changes_only: false, -} - -// utility function -func parseButtonState(raw string) int64 { - raw = strings.ToUpper(raw) - - // Minew B7 / C7 / D7 - frame tipo: 0201060303E1FF1216E1FFA103... - if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 { - // La posizione 34-38 (indice 26:30) contiene il buttonCounter su 2 byte (hex) - buttonField := raw[34:38] // NB: offset 34-38 zero-based - if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil { - return buttonValue - } - } - - // Ingics (02010612FF590) - if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 { - counterField := raw[22:24] - buttonState, err := strconv.ParseInt(counterField, 16, 64) - if err == nil { - return buttonState - } - } - - // Aggiungeremo qui facilmente nuovi beacon in futuro - - return 0 -} - -func twos_comp(inp string) int64 { - i, _ := strconv.ParseInt("0x"+inp, 0, 64) - - return i - 256 -} - -func getBeaconID(incoming Incoming_json) string { - unique_id := fmt.Sprintf("%s", incoming.MAC) - /*if incoming.Beacon_type == "ibeacon" { - unique_id = fmt.Sprintf("%s_%s_%s", incoming.UUID, incoming.Major, incoming.Minor) - } else if incoming.Beacon_type == "eddystone" { - unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id) - } else if incoming.Beacon_type == "hb_button" { - unique_id = fmt.Sprintf("%s_%s", incoming.Namespace, incoming.Instance_id) - }*/ - return unique_id -} - -func incomingBeaconFilter(incoming Incoming_json) Incoming_json { - out_json := incoming - if incoming.Beacon_type == "hb_button" { - //do additional checks here to detect if a Habby Bubbles Button - // looks like 020104020a0011ff045600012d3859db59e1000b9453 - - raw_data := incoming.Data - //company_id := []byte{0x04, 0x56} - //product_id := []byte{0x00, 0x01} - hb_button_prefix_str := fmt.Sprintf("02010612FF5900") - if strings.HasPrefix(raw_data, hb_button_prefix_str) { - out_json.Namespace = "ddddeeeeeeffff5544ff" - //out_json.Instance_id = raw_data[24:36] - counter_str := fmt.Sprintf("0x%s", raw_data[22:24]) - counter, _ := strconv.ParseInt(counter_str, 0, 64) - out_json.HB_ButtonCounter = counter - - battery_str := fmt.Sprintf("0x%s%s", raw_data[20:22], raw_data[18:20]) - ////fmt.Println("battery has %s\n", battery_str) - - battery, _ := strconv.ParseInt(battery_str, 0, 64) - out_json.HB_Battery = battery - - out_json.TX_power = fmt.Sprintf("0x%s", "4") - - out_json.Beacon_type = "hb_button" - out_json.HB_ButtonMode = "presence_button" - - ///fmt.Println("Button adv has %#v\n", out_json) - } - } - - return out_json -} - -func processButton(bbeacon Beacon, cl *client.Client) { - btn := Button{Name: bbeacon.Name} - btn.Button_id = bbeacon.Beacon_id - btn.Button_type = bbeacon.Beacon_type - btn.Button_location = bbeacon.Previous_location - btn.Incoming_JSON = bbeacon.Incoming_JSON - btn.Distance = bbeacon.Distance - btn.Last_seen = bbeacon.Last_seen - btn.HB_ButtonCounter = bbeacon.HB_ButtonCounter - btn.HB_Battery = bbeacon.HB_Battery - btn.HB_RandomNonce = bbeacon.HB_RandomNonce - btn.HB_ButtonMode = bbeacon.HB_ButtonMode - - nonce, ok := Buttons_list[btn.Button_id] - if !ok || nonce.HB_RandomNonce != btn.HB_RandomNonce { - // send the button message to MQTT - sendButtonMessage(btn, cl) - } - Buttons_list[btn.Button_id] = btn -} - -func getiBeaconDistance(rssi int64, power string) float64 { - - ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) - //fmt.Printf("beaconpower: rssi %d ratio %e power %e \n",rssi, ratio, float64(twos_comp(power))) - distance := 100.0 - if ratio < 1.0 { - distance = math.Pow(ratio, 10) - } else { - distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 - } - return distance -} - -func getBeaconDistance(incoming Incoming_json) float64 { - distance := 1000.0 - - distance = getiBeaconDistance(incoming.RSSI, incoming.TX_power) - //distance = math.Abs(float64(incoming.RSSI)) - - return distance -} - -func getAverageDistance(beacon_metrics []beacon_metric) float64 { - total := 0.0 - - for _, v := range beacon_metrics { - total += v.distance - } - return (total / float64(len(beacon_metrics))) -} - -func sendHARoomMessage(beacon_id string, beacon_name string, distance float64, location string, cl *client.Client) { - //first make the json - ha_msg, err := json.Marshal(HA_message{Beacon_id: beacon_id, Beacon_name: beacon_name, Distance: distance}) - if err != nil { - panic(err) - } - - //send the message to HA - err = cl.Publish(&client.PublishOptions{ - QoS: mqtt.QoS1, - TopicName: []byte("afa-systems/presence/ha/" + location), - Message: ha_msg, - }) - if err != nil { - panic(err) - } -} - -func sendButtonMessage(btn Button, cl *client.Client) { - //first make the json - btn_msg, err := json.Marshal(btn) - if err != nil { - panic(err) - } - - //send the message to HA - err = cl.Publish(&client.PublishOptions{ - QoS: mqtt.QoS1, - TopicName: []byte("afa-systems/presence/button/" + btn.Button_id), - Message: btn_msg, - }) - if err != nil { - panic(err) - } - -} - -func sendButtonPressed(bcn Beacon, cl *client.Client) { - //first make the json - btn_msg, err := json.Marshal(bcn) - if err != nil { - panic(err) - } - - //send the message to HA - err = cl.Publish(&client.PublishOptions{ - QoS: mqtt.QoS1, - TopicName: []byte("afa-systems/presence/button/" + bcn.Beacon_id), - Message: btn_msg, - }) - if err != nil { - panic(err) - } - ///utils.Log.Printf("%s pressed ",bcn.Beacon_id) - s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --st=%d", bcn.Beacon_id, bcn.Incoming_JSON.Hostname, bcn.HB_ButtonCounter) - ///utils.Log.Printf("%s",s) - err, out, errout := Shellout(s) - if err != nil { - log.Printf("error: %v\n", err) - } - fmt.Println("--- stdout ---") - fmt.Println(out) - fmt.Println("--- stderr ---") - fmt.Println(errout) -} - -func getLikelyLocations(settings Settings, locations_list Locations_list, cl *client.Client) { - // create the http results structure - http_results_lock.Lock() - http_results = HTTP_locations_list{} - http_results.Beacons = make([]HTTP_location, 0) - ///http_results.Buttons = make([]Button, 0) - http_results_lock.Unlock() - - should_persist := false - - // iterate through the beacons we want to search for - for _, beacon := range BEACONS.Beacons { - - if len(beacon.beacon_metrics) == 0 { - ////fmt.Printf("beacon_metrics = 0:\n") - continue - } - - if (int64(time.Now().Unix()) - (beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp)) > settings.Last_seen_threshold { - ////fmt.Printf("beacon_metrics timestamp = %s %s \n",beacon.Name, beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp ) - if beacon.expired_location == "expired" { - //beacon.Location_confidence = - 1 - continue - } else { - beacon.expired_location = "expired" - msg := Message{ - Email: beacon.Previous_confident_location, - Username: beacon.Name, - Message: beacon.expired_location} - res1B, _ := json.Marshal(msg) - fmt.Println(string(res1B)) - - if err != nil { - log.Printf("error: %v", err) - - } - // Send the newly received message to the broadcast channel - broadcast <- msg - } - } else { - beacon.expired_location = "" - - } - - best_location := Best_location{} - - // go through its beacon metrics and pick out the location that appears most often - loc_list := make(map[string]float64) - seen_weight := 1.5 - rssi_weight := 0.75 - for _, metric := range beacon.beacon_metrics { - loc, ok := loc_list[metric.location] - if !ok { - loc = seen_weight + (rssi_weight * (1.0 - (float64(metric.rssi) / -100.0))) - } else { - loc = loc + seen_weight + (rssi_weight * (1.0 - (float64(metric.rssi) / -100.0))) - } - loc_list[metric.location] = loc - } - best_name := "" - ts := 0.0 - for name, times_seen := range loc_list { - if times_seen > ts { - best_name = name - ts = times_seen - } - } - /////fmt.Printf("BEST LOCATION FOR %s IS: %s with score: %f\n", beacon.Name, best_name, ts) - best_location = Best_location{name: best_name, distance: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].distance, last_seen: beacon.beacon_metrics[len(beacon.beacon_metrics)-1].timestamp} - - beacon.Location_history = append(beacon.Location_history, best_location.name) - if len(beacon.Location_history) > 10 { - beacon.Location_history = beacon.Location_history[1:] // manteniamo solo gli ultimi 10 - } - - // Calcoliamo la location più votata nello storico - location_counts := make(map[string]int) - for _, loc := range beacon.Location_history { - location_counts[loc]++ - } - - max_count := 0 - most_common_location := "" - for loc, count := range location_counts { - if count > max_count { - max_count = count - most_common_location = loc - } - } - - // Applichiamo un filtro: consideriamo il cambio solo se almeno 7 su 10 votano per una location - if max_count >= 7 { - beacon.Previous_location = most_common_location - if most_common_location == beacon.Previous_confident_location { - beacon.Location_confidence++ - } else { - beacon.Location_confidence = 1 - beacon.Previous_confident_location = most_common_location - } - } - - //create an http result from this - r := HTTP_location{} - r.Distance = best_location.distance - r.Name = beacon.Name - r.Beacon_name = beacon.Name - r.Beacon_id = beacon.Beacon_id - r.Beacon_type = beacon.Beacon_type - r.HB_Battery = beacon.HB_Battery - r.HB_ButtonMode = beacon.HB_ButtonMode - r.HB_ButtonCounter = beacon.HB_ButtonCounter - r.Location = best_location.name - r.Last_seen = best_location.last_seen - - ////fmt.Printf("beacon.Location_confidence %s, settings.Location_confidence %s, beacon.Previous_confident_location %s: best_location.name %s\n",beacon.Location_confidence, settings.Location_confidence, beacon.Previous_confident_location, best_location.name) - - if (beacon.Location_confidence == settings.Location_confidence && beacon.Previous_confident_location != best_location.name) || beacon.expired_location == "expired" { - // location has changed, send an mqtt message - - should_persist = true - fmt.Printf("detected a change!!! %#v\n\n", beacon) - if beacon.Previous_confident_location == "expired" && beacon.expired_location == "" { - msg := Message{ - Email: beacon.Previous_confident_location, - Username: beacon.Name, - Message: "OK"} - res1B, _ := json.Marshal(msg) - fmt.Println(string(res1B)) - - if err != nil { - log.Printf("error: %v", err) - - } - // Send the newly received message to the broadcast channel - broadcast <- msg - } - beacon.Location_confidence = 0 - location := "" - if beacon.expired_location == "expired" { - - location = "expired" - } else { - location = best_location.name - } - //first make the json - js, err := json.Marshal(Location_change{Beacon_ref: beacon, Name: beacon.Name, Beacon_name: beacon.Name, Previous_location: beacon.Previous_confident_location, New_location: location, Timestamp: time.Now().Unix()}) - if err != nil { - continue - } - - //send the message - err = cl.Publish(&client.PublishOptions{ - QoS: mqtt.QoS1, - TopicName: []byte("afa-systems/presence/changes"), - Message: js, - }) - if err != nil { - panic(err) - } - s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --loct=%s", beacon.Beacon_id, beacon.Incoming_JSON.Hostname, location) - ///utils.Log.Printf("%s",s) - err, out, errout := Shellout(s) - if err != nil { - log.Printf("error: %v\n", err) - } - fmt.Println("--- stdout ---") - fmt.Println(out) - fmt.Println("--- stderr ---") - fmt.Println(errout) - //////beacon.logger.Printf("Log content: user id %v \n", best_location.name) - if settings.HA_send_changes_only { - sendHARoomMessage(beacon.Beacon_id, beacon.Name, best_location.distance, best_location.name, cl) - } - - if beacon.expired_location == "expired" { - - beacon.Previous_confident_location = "expired" - r.Location = "expired" - } else { - beacon.Previous_confident_location = best_location.name - } - ///beacon.Previous_confident_location = best_location.name - - } - - beacon.Previous_location = best_location.name - r.Previous_confident_location = beacon.expired_location - BEACONS.Beacons[beacon.Beacon_id] = beacon - - http_results_lock.Lock() - http_results.Beacons = append(http_results.Beacons, r) - http_results_lock.Unlock() - - if best_location.name != "" { - if !settings.HA_send_changes_only { - secs := int64(time.Now().Unix()) - if secs%settings.HA_send_interval == 0 { - sendHARoomMessage(beacon.Beacon_id, beacon.Name, best_location.distance, best_location.name, cl) - } - } - } - err := cl.Publish(&client.PublishOptions{ - QoS: mqtt.QoS0, - TopicName: []byte("afa-systems/presence"), - Message: []byte(fmt.Sprintf("%s is most likely in %s with average distance %f", beacon.Name, best_location.name, best_location.distance)), - }) - if err != nil { - panic(err) - } - - } - - if should_persist { - persistBeacons() - } -} - -func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, logger []*user) chan<- Incoming_json { - - incoming_msgs_chan := make(chan Incoming_json, 2000) - - // load initial BEACONS - BEACONS.Beacons = make(map[string]Beacon) - // retrieve the data - - // create bucket if not exist - err = db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(world) - if err != nil { - return err - } - return nil - }) - - err = db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket(world) - if bucket == nil { - return err - } - - key := []byte("beacons_list") - val := bucket.Get(key) - if val != nil { - buf := bytes.NewBuffer(val) - dec := gob.NewDecoder(buf) - err = dec.Decode(&BEACONS) - if err != nil { - log.Fatal("decode error:", err) - } - } - - key = []byte("buttons_list") - val = bucket.Get(key) - if val != nil { - buf := bytes.NewBuffer(val) - dec := gob.NewDecoder(buf) - err = dec.Decode(&Buttons_list) - if err != nil { - log.Fatal("decode error:", err) - } - } - - key = []byte("settings") - val = bucket.Get(key) - if val != nil { - buf := bytes.NewBuffer(val) - dec := gob.NewDecoder(buf) - err = dec.Decode(&settings) - if err != nil { - log.Fatal("decode error:", err) - } - } - - return nil - }) - - if err != nil { - log.Fatal(err) - } - Latest_beacons_list = make(map[string]Beacon) - - Buttons_list = make(map[string]Button) - - //create a map of locations, looked up by hostnames - locations_list := Locations_list{} - ls := make(map[string]Location) - locations_list.locations = ls - - ticker := time.NewTicker(updateInterval) - - go func() { - for { - select { - - case <-ticker.C: - getLikelyLocations(settings, locations_list, cl) - case incoming := <-incoming_msgs_chan: - func() { - defer func() { - if err := recover(); err != nil { - log.Println("work failed:", err) - } - }() - - incoming = incomingBeaconFilter(incoming) - this_beacon_id := getBeaconID(incoming) - - now := time.Now().Unix() - - beacon, ok := BEACONS.Beacons[this_beacon_id] - if !ok { - //should be unique - //if it's already in list, forget it. - latest_list_lock.Lock() - x, ok := Latest_beacons_list[this_beacon_id] - if ok { - //update its timestamp - x.Last_seen = now - x.Incoming_JSON = incoming - x.Distance = getBeaconDistance(incoming) - - Latest_beacons_list[this_beacon_id] = x - } else { - Latest_beacons_list[this_beacon_id] = Beacon{Beacon_id: this_beacon_id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)} - } - for k, v := range Latest_beacons_list { - if (now - v.Last_seen) > 10 { // 10 seconds - delete(Latest_beacons_list, k) - } - } - latest_list_lock.Unlock() - //continue - return - } - - beacon.Incoming_JSON = incoming - beacon.Last_seen = now - beacon.Beacon_type = incoming.Beacon_type - beacon.HB_ButtonCounter = incoming.HB_ButtonCounter - beacon.HB_Battery = incoming.HB_Battery - beacon.HB_RandomNonce = incoming.HB_RandomNonce - beacon.HB_ButtonMode = incoming.HB_ButtonMode - ////fmt.Println("button pressed " + this_beacon_id + " at " + strconv.Itoa(int(incoming.HB_ButtonCounter)) ) - - if beacon.beacon_metrics == nil { - beacon.beacon_metrics = make([]beacon_metric, settings.Beacon_metrics_size) - } - //create metric for this beacon - this_metric := beacon_metric{} - this_metric.distance = getBeaconDistance(incoming) - this_metric.timestamp = now - this_metric.rssi = int64(incoming.RSSI) - this_metric.location = incoming.Hostname - beacon.beacon_metrics = append(beacon.beacon_metrics, this_metric) - ///fmt.Printf("APPENDING a metric from %s len %d\n", beacon.Name, len(beacon.beacon_metrics)) - if len(beacon.beacon_metrics) > settings.Beacon_metrics_size { - //fmt.Printf("deleting a metric from %s len %d\n", beacon.Name, len(beacon.beacon_metrics)) - beacon.beacon_metrics = append(beacon.beacon_metrics[:0], beacon.beacon_metrics[0+1:]...) - } - //fmt.Printf("%#v\n", beacon.beacon_metrics) - if beacon.HB_ButtonCounter_Prev != beacon.HB_ButtonCounter { - beacon.HB_ButtonCounter_Prev = incoming.HB_ButtonCounter - // send the button message to MQTT - sendButtonPressed(beacon, cl) - } - BEACONS.Beacons[beacon.Beacon_id] = beacon - - /*if beacon.Beacon_type == "hb_button" { - processButton(beacon, cl) - }*/ - - //lookup location by hostname in locations - location, ok := locations_list.locations[incoming.Hostname] - if !ok { - //create the location - locations_list.locations[incoming.Hostname] = Location{} - location, ok = locations_list.locations[incoming.Hostname] - location.name = incoming.Hostname - } - locations_list.locations[incoming.Hostname] = location - }() - } - } - }() - - return incoming_msgs_chan -} -func ParseTimeStamp(utime string) (string, error) { - i, err := strconv.ParseInt(utime, 10, 64) - if err != nil { - return "", err - } - t := time.Unix(i, 0) - return t.Format(time.UnixDate), nil -} - -var http_host_path_ptr *string - -// var https_host_path_ptr *string -var httpws_host_path_ptr *string - -//var httpwss_host_path_ptr *string - -type Todo struct { - Id string `json:"id"` - Value string `json:"value" binding:"required"` -} - -type Job interface { - ExitChan() chan error - Run(todos map[string]Todo) (map[string]Todo, error) -} - -func ProcessJobs(jobs chan Job, db string) { - for { - j := <-jobs - - todos := make(map[string]Todo, 0) - content, err := ioutil.ReadFile(db) - if err == nil { - if err = json.Unmarshal(content, &todos); err == nil { - todosMod, err := j.Run(todos) - - if err == nil && todosMod != nil { - b, err := json.Marshal(todosMod) - if err == nil { - err = ioutil.WriteFile(db, b, 0644) - } - } - } - } - - j.ExitChan() <- err - } -} - -type user struct { - id string - logger *log.Logger -} - -const ShellToUse = "bash" - -func Shellout(command string) (error, string, string) { - var stdout bytes.Buffer - var stderr bytes.Buffer - ///utils.Log.Printf("command: %s",command) - cmd := exec.Command(ShellToUse, "-c", command) - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return err, stdout.String(), stderr.String() -} - -func createUser(id string, logWanted bool) user { - var l *log.Logger - - if logWanted { - // Here the log content will be added in the user log file - userFIle := &lumberjack.Logger{ - Filename: "/data/var/log/presence/presence/log_" + id + ".log", - MaxSize: 250, // mb - MaxBackups: 5, - MaxAge: 10, // in days - } - l = log.New(userFIle, "User: ", log.Ldate|log.Ltime|log.Lshortfile) - } else { - // Here the log content will go nowhere - l = log.New(ioutil.Discard, "User: ", log.Ldate|log.Ltime|log.Lshortfile) - } - return user{id, l} -} - func main() { - - loggers := []*user{} - - http_host_path_ptr = flag.String("http_host_path", "0.0.0.0:8080", "The host:port that the HTTP server should listen on") - //https_host_path_ptr = flag.String("https_host_path", "0.0.0.0:5443", "The host:port that the HTTP server should listen on") - httpws_host_path_ptr = flag.String("httpws_host_path", "0.0.0.0:8088", "The host:port websocket listen") - //httpwss_host_path_ptr = flag.String("httpwss_host_path", "0.0.0.0:8443", "The host:port secure websocket listen") - - mqtt_host_ptr := flag.String("mqtt_host", "localhost:1883", "The host:port of the MQTT server to listen for beacons on") - mqtt_username_ptr := flag.String("mqtt_username", "none", "The username needed to connect to the MQTT server, 'none' if it doesn't need one") - mqtt_password_ptr := flag.String("mqtt_password", "none", "The password needed to connect to the MQTT server, 'none' if it doesn't need one") - mqtt_client_id_ptr := flag.String("mqtt_client_id", "presence-detector", "The client ID for the MQTT server") - - flag.Parse() sigc := make(chan os.Signal, 1) signal.Notify(sigc, os.Interrupt, os.Kill) - // Create an MQTT Client. - cli := client.New(&client.Options{ - // Define the processing of the error handler. - ErrorHandler: func(err error) { - fmt.Println(err) - }, - }) - // Terminate the Client. - defer cli.Terminate() - - //open the database - db, err = bolt.Open("/data/conf/presence/presence.db", 0644, nil) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - // Connect to the MQTT Server. - err = cli.Connect(&client.ConnectOptions{ - Network: "tcp", - Address: *mqtt_host_ptr, - ClientID: []byte(*mqtt_client_id_ptr), - UserName: []byte(*mqtt_username_ptr), - Password: []byte(*mqtt_password_ptr), - }) - if err != nil { - panic(err) - } - incoming_updates_chan := IncomingMQTTProcessor(1*time.Second, cli, db, loggers) - // Subscribe to topics. err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ @@ -802,8 +32,6 @@ func main() { t := strings.Split(string(topicName), "/") hostname := t[1] - //Formato JSON multiplo - //publish_out/170361001234 [{"timestamp":"2025-06-11T11:27:28.492Z","type":"Gateway","mac":"E4B3230DB5CC","nums":10},{"timestamp":"2025-06-11T11:27:28.483Z","mac":"36CE2D7CA4E5","rssi":-27,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:28.586Z","mac":"36CE2D7CA4E5","rssi":-30,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:28.612Z","mac":"406260A302FC","rssi":-35,"rawData":"02011A020A0B0BFF4C001006371AAE2F6F5B"},{"timestamp":"2025-06-11T11:27:28.798Z","mac":"36CE2D7CA4E5","rssi":-28,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:28.905Z","mac":"36CE2D7CA4E5","rssi":-30,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:28.945Z","mac":"C300003947DF","rssi":-32,"rawData":"0201061AFF4C000215FDA50693A4E24FB1AFCFC6EB0764782500000000C5"},{"timestamp":"2025-06-11T11:27:29.013Z","mac":"36CE2D7CA4E5","rssi":-29,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:29.120Z","mac":"36CE2D7CA4E5","rssi":-27,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"},{"timestamp":"2025-06-11T11:27:29.166Z","mac":"406260A302FC","rssi":-34,"rawData":"02011A020A0B0BFF4C001006371AAE2F6F5B"},{"timestamp":"2025-06-11T11:27:29.337Z","mac":"36CE2D7CA4E5","rssi":-26,"rawData":"1EFF0600010F20226F50BB5F834F6C9CE3D876B0C3F665882955B368D3B96C"}] if strings.HasPrefix(msgStr, "[") { var readings []RawReading err := json.Unmarshal(message, &readings) @@ -826,7 +54,6 @@ func main() { incoming_updates_chan <- incoming } } else { - //publish_out/171061001180 $GPRP,C83F8F17DB35,F5B0B0419FEF,-44,02010612FF590080BC280103FFFFFFFF000000000000,1749648798 s := strings.Split(string(message), ",") if len(s) < 6 { log.Printf("Messaggio CSV non valido: %s", msgStr) diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go new file mode 100644 index 0000000..9722992 --- /dev/null +++ b/internal/pkg/config/config.go @@ -0,0 +1,33 @@ +package config + +import "os" + +type Config struct { + HTTPAddr string + WSAddr string + MQTTHost string + MQTTUser string + MQTTPass string + MQTTClientID string + DBPath string +} + +// getEnv returns env var value or a default if not set. +func getEnv(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func Load() *Config { + return &Config{ + HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"), + WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"), + MQTTHost: getEnv("MQTT_HOST", "localhost:1883"), + MQTTUser: getEnv("MQTT_USERNAME", "none"), + MQTTPass: getEnv("MQTT_PASSWORD", "none"), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), + DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), + } +} diff --git a/internal/pkg/httpserver/server.go b/internal/pkg/httpserver/server.go index bbb58e8..55f9210 100644 --- a/internal/pkg/httpserver/server.go +++ b/internal/pkg/httpserver/server.go @@ -10,7 +10,7 @@ import ( "time" "github.com/AFASystems/presence/internal/pkg/model" - "github.com/AFASystems/presence/internal/pkg/utils" + "github.com/AFASystems/presence/internal/pkg/persistence" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gorilla/websocket" @@ -44,17 +44,17 @@ func StartHTTPServer(addr string, ctx *model.AppContext) { // Set up HTTP server r := mux.NewRouter() - r.HandleFunc("/api/results", resultsHandler(&ctx.HTTPResultLock, &ctx.HTTPResults)) + r.HandleFunc("/api/results", resultsHandler(&ctx.HTTPResults)) r.HandleFunc("/api/beacons/{beacon_id}", BeaconsDeleteHandler(&ctx.Beacons, ctx.ButtonsList)).Methods("DELETE") r.HandleFunc("/api/beacons", BeaconsListHandler(&ctx.Beacons)).Methods("GET") r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("POST") //since beacons are hashmap, just have put and post be same thing. it'll either add or modify that entry r.HandleFunc("/api/beacons", BeaconsAddHandler(&ctx.Beacons)).Methods("PUT") - r.HandleFunc("/api/latest-beacons", latestBeaconsListHandler(&ctx.LatestListLock, ctx.LatestBeaconsList)).Methods("GET") + r.HandleFunc("/api/latest-beacons", latestBeaconsListHandler(&ctx.LatestList)).Methods("GET") - r.HandleFunc("/api/settings", settingsListHandler(&ctx.Settings)).Methods("GET") - r.HandleFunc("/api/settings", settingsEditHandler(&ctx.Settings)).Methods("POST") + r.HandleFunc("/api/settings", SettingsListHandler(&ctx.Settings)).Methods("GET") + r.HandleFunc("/api/settings", SettingsEditHandler(&ctx.Settings)).Methods("POST") r.PathPrefix("/js/").Handler(http.StripPrefix("/js/", http.FileServer(http.Dir("static_html/js/")))) r.PathPrefix("/css/").Handler(http.StripPrefix("/css/", http.FileServer(http.Dir("static_html/css/")))) @@ -64,8 +64,8 @@ func StartHTTPServer(addr string, ctx *model.AppContext) { http.Handle("/", r) mxWS := mux.NewRouter() - mxWS.HandleFunc("/ws/api/beacons", serveWs(&ctx.HTTPResultLock, &ctx.HTTPResults)) - mxWS.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(&ctx.LatestListLock, ctx.LatestBeaconsList)) + mxWS.HandleFunc("/ws/api/beacons", serveWs(&ctx.HTTPResults)) + mxWS.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(&ctx.LatestList)) mxWS.HandleFunc("/ws/broadcast", handleConnections(ctx.Clients, &ctx.Broadcast)) http.Handle("/ws/", mxWS) @@ -81,11 +81,11 @@ func StartHTTPServer(addr string, ctx *model.AppContext) { // TODO: rather add defer to unlock the files -func resultsHandler(lock *sync.RWMutex, httpResults *model.HTTPLocationsList) http.HandlerFunc { +func resultsHandler(httpResults *model.HTTPResultsList) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - lock.Lock() - js, err := json.Marshal(httpResults) - lock.Unlock() + httpResults.HTTPResultsLock.Lock() + defer httpResults.HTTPResultsLock.Unlock() + js, err := json.Marshal(httpResults.HTTPResults) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -128,7 +128,7 @@ func BeaconsAddHandler(beacons *model.BeaconsList) http.HandlerFunc { beacons.Beacons[inBeacon.Beacon_id] = inBeacon - err = utils.PersistBeacons(beacons) + err = persistence.PersistBeacons(beacons) if err != nil { http.Error(w, "trouble persisting beacons list, create bucket", 500) @@ -156,7 +156,7 @@ func BeaconsDeleteHandler(beacons *model.BeaconsList, buttonsList map[string]mod delete(buttonsList, beaconId) } - err := utils.PersistBeacons(beacons) + err := persistence.PersistBeacons(beacons) if err != nil { http.Error(w, "trouble persisting beacons list, create bucket", 500) return @@ -166,14 +166,14 @@ func BeaconsDeleteHandler(beacons *model.BeaconsList, buttonsList map[string]mod } } -func latestBeaconsListHandler(lock *sync.RWMutex, latestBeaconsList map[string]model.Beacon) http.HandlerFunc { +func latestBeaconsListHandler(latestList *model.LatestBeaconsList) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - lock.RLock() + latestList.LatestListLock.RLock() var la = make([]model.Beacon, 0) - for _, b := range latestBeaconsList { + for _, b := range latestList.LatestList { la = append(la, b) } - lock.RUnlock() + latestList.LatestListLock.RUnlock() js, err := json.Marshal(la) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -184,7 +184,7 @@ func latestBeaconsListHandler(lock *sync.RWMutex, latestBeaconsList map[string]m } } -func settingsListHandler(settings *model.Settings) http.HandlerFunc { +func SettingsListHandler(settings *model.Settings) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { js, err := json.Marshal(settings) if err != nil { @@ -196,7 +196,7 @@ func settingsListHandler(settings *model.Settings) http.HandlerFunc { } } -func settingsEditHandler(settings *model.Settings) http.HandlerFunc { +func SettingsEditHandler(settings *model.Settings) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) var inSettings model.Settings @@ -216,7 +216,7 @@ func settingsEditHandler(settings *model.Settings) http.HandlerFunc { *settings = inSettings - err = utils.PersistSettings(settings) + err = persistence.PersistSettings(settings) if err != nil { http.Error(w, "trouble persisting settings, create bucket", 500) return @@ -239,7 +239,7 @@ func reader(ws *websocket.Conn) { } } -func writer(ws *websocket.Conn, lock *sync.RWMutex, httpResults *model.HTTPLocationsList) { +func writer(ws *websocket.Conn, httpResult *model.HTTPResultsList) { pingTicker := time.NewTicker(pingPeriod) beaconTicker := time.NewTicker(beaconPeriod) defer func() { @@ -250,9 +250,9 @@ func writer(ws *websocket.Conn, lock *sync.RWMutex, httpResults *model.HTTPLocat for { select { case <-beaconTicker.C: - lock.RLock() - js, err := json.Marshal(httpResults) - lock.RUnlock() + httpResult.HTTPResultsLock.Lock() + defer httpResult.HTTPResultsLock.Unlock() + js, err := json.Marshal(httpResult.HTTPResults) if err != nil { js = []byte("error") @@ -271,7 +271,7 @@ func writer(ws *websocket.Conn, lock *sync.RWMutex, httpResults *model.HTTPLocat } } -func serveWs(lock *sync.RWMutex, httpResults *model.HTTPLocationsList) http.HandlerFunc { +func serveWs(httpResult *model.HTTPResultsList) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -281,7 +281,7 @@ func serveWs(lock *sync.RWMutex, httpResults *model.HTTPLocationsList) http.Hand return } - go writer(ws, lock, httpResults) + go writer(ws, httpResult) reader(ws) } } @@ -323,7 +323,7 @@ func latestBeaconWriter(ws *websocket.Conn, latestBeaconsList map[string]model.B } } -func serveLatestBeaconsWs(lock *sync.RWMutex, latestBeaconsList map[string]model.Beacon) http.HandlerFunc { +func serveLatestBeaconsWs(latestList *model.LatestBeaconsList) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -333,7 +333,7 @@ func serveLatestBeaconsWs(lock *sync.RWMutex, latestBeaconsList map[string]model return } - go latestBeaconWriter(ws, latestBeaconsList, lock) + go latestBeaconWriter(ws, latestList.LatestList, &latestList.LatestListLock) reader(ws) } } diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index 5767b64..2bd2ffe 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -48,10 +48,10 @@ type Advertisement struct { // BeaconMetric stores signal and distance data for a beacon. type BeaconMetric struct { - location string - distance float64 - rssi int64 - timestamp int64 + Location string + Distance float64 + Rssi int64 + Timestamp int64 } // Location defines a physical location and synchronization control. @@ -62,9 +62,9 @@ type Location struct { // BestLocation represents the most probable location of a beacon. type BestLocation struct { - distance float64 - name string - last_seen int64 + Distance float64 + Name string + Last_seen int64 } // HTTPLocation describes a beacon's state as served over HTTP. @@ -116,10 +116,10 @@ type Beacon struct { Distance float64 `json:"distance"` Previous_location string Previous_confident_location string - expired_location string + Expired_location string Location_confidence int64 Location_history []string - beacon_metrics []BeaconMetric + Beacon_metrics []BeaconMetric HB_ButtonCounter int64 `json:"hb_button_counter"` HB_ButtonCounter_Prev int64 `json:"hb_button_counter"` @@ -152,8 +152,8 @@ type BeaconsList struct { // LocationsList holds all known locations with concurrency protection. type LocationsList struct { - locations map[string]Location - lock sync.RWMutex + Locations map[string]Location + Lock sync.RWMutex } // Message defines the WebSocket or broadcast message payload. @@ -172,16 +172,25 @@ type RawReading struct { RawData string `json:"rawData"` } +type LatestBeaconsList struct { + LatestList map[string]Beacon + LatestListLock sync.RWMutex +} + +type HTTPResultsList struct { + HTTPResultsLock sync.RWMutex + HTTPResults HTTPLocationsList +} + type AppContext struct { - HTTPResultLock sync.RWMutex - HTTPResults HTTPLocationsList - LatestListLock sync.RWMutex - Beacons BeaconsList - ButtonsList map[string]Button - LatestBeaconsList map[string]Beacon - Settings Settings - Clients map[*websocket.Conn]bool - Broadcast chan Message + HTTPResults HTTPResultsList + Beacons BeaconsList + ButtonsList map[string]Button + Settings Settings + Clients map[*websocket.Conn]bool + Broadcast chan Message + Locations LocationsList + LatestList LatestBeaconsList } var World = []byte("presence") diff --git a/internal/pkg/mqtt_client/beacon.go b/internal/pkg/mqtt_client/beacon.go new file mode 100644 index 0000000..23424ef --- /dev/null +++ b/internal/pkg/mqtt_client/beacon.go @@ -0,0 +1,128 @@ +package mqtt_client + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "math" + "os/exec" + "strconv" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/yosssi/gmq/mqtt" + "github.com/yosssi/gmq/mqtt/client" +) + +func getBeaconID(incoming model.Incoming_json) string { + unique_id := fmt.Sprintf("%s", incoming.MAC) + return unique_id +} + +func updateLatestList(incoming model.Incoming_json, now int64, latestList *model.LatestBeaconsList) { + latestList.LatestListLock.Lock() + defer latestList.LatestListLock.Unlock() + + b := model.Beacon{ + Beacon_id: getBeaconID(incoming), + Beacon_type: incoming.Beacon_type, + Last_seen: now, + Incoming_JSON: incoming, + Beacon_location: incoming.Hostname, + Distance: getBeaconDistance(incoming), + } + + latestList.LatestList[b.Beacon_id] = b + + for id, v := range latestList.LatestList { + if now-v.Last_seen > 10 { + delete(latestList.LatestList, id) + } + } +} + +func updateBeaconData(beacon *model.Beacon, incoming model.Incoming_json, now int64, cl *client.Client, settings *model.Settings) { + beacon.Incoming_JSON = incoming + beacon.Last_seen = now + beacon.Beacon_type = incoming.Beacon_type + beacon.HB_ButtonCounter = incoming.HB_ButtonCounter + beacon.HB_Battery = incoming.HB_Battery + beacon.HB_RandomNonce = incoming.HB_RandomNonce + beacon.HB_ButtonMode = incoming.HB_ButtonMode + + m := model.BeaconMetric{ + Distance: getBeaconDistance(incoming), + Timestamp: now, + Rssi: int64(incoming.RSSI), + Location: incoming.Hostname, + } + + beacon.Beacon_metrics = append(beacon.Beacon_metrics, m) + if len(beacon.Beacon_metrics) > settings.Beacon_metrics_size { + beacon.Beacon_metrics = beacon.Beacon_metrics[1:] + } + + if beacon.HB_ButtonCounter_Prev != beacon.HB_ButtonCounter { + beacon.HB_ButtonCounter_Prev = incoming.HB_ButtonCounter + sendButtonPressed(*beacon, cl) + } +} + +func sendButtonPressed(beacon model.Beacon, cl *client.Client) { + btn_msg, err := json.Marshal(beacon) + if err != nil { + panic(err) + } + + err = cl.Publish(&client.PublishOptions{ + QoS: mqtt.QoS1, + TopicName: []byte("afa-systems/presence/button/" + beacon.Beacon_id), + Message: btn_msg, + }) + if err != nil { + panic(err) + } + s := fmt.Sprintf("/usr/bin/php /usr/local/presence/alarm_handler.php --idt=%s --idr=%s --st=%d", beacon.Beacon_id, beacon.Incoming_JSON.Hostname, beacon.HB_ButtonCounter) + err, out, errout := Shellout(s) + if err != nil { + log.Printf("error: %v\n", err) + } + fmt.Println("--- stdout ---") + fmt.Println(out) + fmt.Println("--- stderr ---") + fmt.Println(errout) +} + +func getBeaconDistance(incoming model.Incoming_json) float64 { + distance := 1000.0 + distance = getiBeaconDistance(incoming.RSSI, incoming.TX_power) + + return distance +} + +func getiBeaconDistance(rssi int64, power string) float64 { + ratio := float64(rssi) * (1.0 / float64(twos_comp(power))) + distance := 100.0 + if ratio < 1.0 { + distance = math.Pow(ratio, 10) + } else { + distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111 + } + return distance +} + +func twos_comp(inp string) int64 { + i, _ := strconv.ParseInt("0x"+inp, 0, 64) + + return i - 256 +} + +func Shellout(command string) (error, string, string) { + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd := exec.Command("bash", "-c", command) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return err, stdout.String(), stderr.String() +} diff --git a/internal/pkg/mqtt_client/fillter.go b/internal/pkg/mqtt_client/fillter.go new file mode 100644 index 0000000..455b8be --- /dev/null +++ b/internal/pkg/mqtt_client/fillter.go @@ -0,0 +1,35 @@ +package mqtt_client + +import ( + "fmt" + "strconv" + "strings" + + "github.com/AFASystems/presence/internal/pkg/model" +) + +func incomingBeaconFilter(incoming model.Incoming_json) model.Incoming_json { + out_json := incoming + if incoming.Beacon_type == "hb_button" { + raw_data := incoming.Data + hb_button_prefix_str := fmt.Sprintf("02010612FF5900") + if strings.HasPrefix(raw_data, hb_button_prefix_str) { + out_json.Namespace = "ddddeeeeeeffff5544ff" + counter_str := fmt.Sprintf("0x%s", raw_data[22:24]) + counter, _ := strconv.ParseInt(counter_str, 0, 64) + out_json.HB_ButtonCounter = counter + + battery_str := fmt.Sprintf("0x%s%s", raw_data[20:22], raw_data[18:20]) + + battery, _ := strconv.ParseInt(battery_str, 0, 64) + out_json.HB_Battery = battery + + out_json.TX_power = fmt.Sprintf("0x%s", "4") + + out_json.Beacon_type = "hb_button" + out_json.HB_ButtonMode = "presence_button" + } + } + + return out_json +} diff --git a/internal/pkg/mqtt_client/location.go b/internal/pkg/mqtt_client/location.go new file mode 100644 index 0000000..d5e854c --- /dev/null +++ b/internal/pkg/mqtt_client/location.go @@ -0,0 +1,165 @@ +package mqtt_client + +import ( + "encoding/json" + "log" + "time" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/persistence" + "github.com/yosssi/gmq/mqtt" + "github.com/yosssi/gmq/mqtt/client" +) + +func getLikelyLocations(settings *model.Settings, ctx *model.AppContext, cl *client.Client) { + ctx.HTTPResults.HTTPResultsLock.Lock() + defer ctx.HTTPResults.HTTPResultsLock.Unlock() + ctx.HTTPResults.HTTPResults = model.HTTPLocationsList{Beacons: []model.HTTPLocation{}} + + shouldPersist := false + + for id, beacon := range ctx.Beacons.Beacons { + if len(beacon.Beacon_metrics) == 0 { + continue + } + + if isExpired(&beacon, settings) { + handleExpiredBeacon(&beacon, cl, ctx) + continue + } + + best := calculateBestLocation(&beacon) + updateBeaconState(&beacon, best, settings, ctx, cl) + + appendHTTPResult(ctx, beacon, best) + ctx.Beacons.Beacons[id] = beacon + shouldPersist = true + } + + if shouldPersist { + persistence.PersistBeacons(&ctx.Beacons) + } +} + +func isExpired(b *model.Beacon, s *model.Settings) bool { + return time.Now().Unix()-b.Beacon_metrics[len(b.Beacon_metrics)-1].Timestamp > s.Last_seen_threshold +} + +func handleExpiredBeacon(b *model.Beacon, cl *client.Client, ctx *model.AppContext) { + if b.Expired_location == "expired" { + return + } + b.Expired_location = "expired" + msg := model.Message{ + Email: b.Previous_confident_location, + Username: b.Name, + Message: "expired", + } + data, _ := json.Marshal(msg) + log.Println(string(data)) + ctx.Broadcast <- msg +} + +func calculateBestLocation(b *model.Beacon) model.BestLocation { + locScores := map[string]float64{} + for _, m := range b.Beacon_metrics { + score := 1.5 + 0.75*(1.0-(float64(m.Rssi)/-100.0)) + locScores[m.Location] += score + } + bestName, bestScore := "", 0.0 + for name, score := range locScores { + if score > bestScore { + bestName, bestScore = name, score + } + } + last := b.Beacon_metrics[len(b.Beacon_metrics)-1] + return model.BestLocation{Name: bestName, Distance: last.Distance, Last_seen: last.Timestamp} +} + +func updateBeaconState(b *model.Beacon, best model.BestLocation, s *model.Settings, ctx *model.AppContext, cl *client.Client) { + updateLocationHistory(b, best.Name) + updateConfidence(b, best.Name, s) + + if locationChanged(b, best, s) { + publishLocationChange(b, best, cl) + b.Location_confidence = 0 + b.Previous_confident_location = best.Name + } +} + +func updateLocationHistory(b *model.Beacon, loc string) { + b.Location_history = append(b.Location_history, loc) + if len(b.Location_history) > 10 { + b.Location_history = b.Location_history[1:] + } +} + +func updateConfidence(b *model.Beacon, loc string, s *model.Settings) { + counts := map[string]int{} + for _, l := range b.Location_history { + counts[l]++ + } + + maxCount, mostCommon := 0, "" + for l, c := range counts { + if c > maxCount { + maxCount, mostCommon = c, l + } + } + + if maxCount >= 7 { + if mostCommon == b.Previous_confident_location { + b.Location_confidence++ + } else { + b.Location_confidence = 1 + b.Previous_confident_location = mostCommon + } + } +} + +func locationChanged(b *model.Beacon, best model.BestLocation, s *model.Settings) bool { + return (b.Location_confidence == s.Location_confidence && + b.Previous_confident_location != best.Name) || + b.Expired_location == "expired" +} + +func publishLocationChange(b *model.Beacon, best model.BestLocation, cl *client.Client) { + location := best.Name + if b.Expired_location == "expired" { + location = "expired" + } + + js, err := json.Marshal(model.LocationChange{ + Beacon_ref: *b, + Name: b.Name, + Previous_location: b.Previous_confident_location, + New_location: location, + Timestamp: time.Now().Unix(), + }) + if err != nil { + return + } + + err = cl.Publish(&client.PublishOptions{ + QoS: mqtt.QoS1, + TopicName: []byte("afa-systems/presence/changes"), + Message: js, + }) + if err != nil { + log.Printf("mqtt publish error: %v", err) + } +} + +func appendHTTPResult(ctx *model.AppContext, b model.Beacon, best model.BestLocation) { + ctx.HTTPResults.HTTPResultsLock.Lock() + defer ctx.HTTPResults.HTTPResultsLock.Unlock() + + r := model.HTTPLocation{ + Name: b.Name, + Beacon_id: b.Beacon_id, + Location: best.Name, + Distance: best.Distance, + Last_seen: best.Last_seen, + } + ctx.HTTPResults.HTTPResults.Beacons = append(ctx.HTTPResults.HTTPResults.Beacons, r) +} diff --git a/internal/pkg/mqtt_client/processor.go b/internal/pkg/mqtt_client/processor.go new file mode 100644 index 0000000..3a8d553 --- /dev/null +++ b/internal/pkg/mqtt_client/processor.go @@ -0,0 +1,61 @@ +package mqtt_client + +import ( + "log" + "time" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/persistence" + "github.com/boltdb/bolt" + "github.com/yosssi/gmq/mqtt/client" +) + +func IncomingMQTTProcessor(updateInterval time.Duration, cl *client.Client, db *bolt.DB, ctx *model.AppContext) chan<- model.Incoming_json { + ch := make(chan model.Incoming_json, 2000) + persistence.CreateBucketIfNotExists(db) + + ticker := time.NewTicker(updateInterval) + go runProcessor(ticker, cl, ch, ctx) + + return ch +} + +func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incoming_json, ctx *model.AppContext) { + for { + select { + case <-ticker.C: + getLikelyLocations(&ctx.Settings, ctx, cl) + case incoming := <-ch: + processIncoming(incoming, cl, ctx) + } + } +} + +func processIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model.AppContext) { + defer func() { + if err := recover(); err != nil { + log.Println("work failed:", err) + } + }() + + incoming = incomingBeaconFilter(incoming) // DONE + id := getBeaconID(incoming) // DONE + now := time.Now().Unix() + + beacons := &ctx.Beacons + + beacons.Lock.Lock() + defer beacons.Lock.Unlock() + + latestList := &ctx.LatestList + settings := &ctx.Settings + + beacon, ok := beacons.Beacons[id] + if !ok { + updateLatestList(incoming, now, latestList) // DONE + return + } + + updateBeaconData(&beacon, incoming, now, cl, settings) + beacons.Beacons[beacon.Beacon_id] = beacon +} diff --git a/internal/pkg/persistence/buckets.go b/internal/pkg/persistence/buckets.go new file mode 100644 index 0000000..83883e6 --- /dev/null +++ b/internal/pkg/persistence/buckets.go @@ -0,0 +1,18 @@ +package persistence + +import ( + "log" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/boltdb/bolt" +) + +func CreateBucketIfNotExists(db *bolt.DB) { + err := db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(model.World) + return err + }) + if err != nil { + log.Fatal(err) + } +} diff --git a/internal/pkg/persistence/load.go b/internal/pkg/persistence/load.go new file mode 100644 index 0000000..2aecf0e --- /dev/null +++ b/internal/pkg/persistence/load.go @@ -0,0 +1,39 @@ +package persistence + +import ( + "bytes" + "encoding/gob" + "log" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/boltdb/bolt" +) + +func LoadState(db *bolt.DB, ctx *model.AppContext) { + err := db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(model.World) + if bucket == nil { + return nil + } + + decode := func(key string, dest interface{}) { + val := bucket.Get([]byte(key)) + if val == nil { + return + } + buf := bytes.NewBuffer(val) + if err := gob.NewDecoder(buf).Decode(dest); err != nil { + log.Fatal("decode error: ", err) + } + } + + decode("beaconsList", &ctx.Beacons.Beacons) + decode("buttonsList", &ctx.ButtonsList) + decode("settings", &ctx.Settings) + return nil + }) + + if err != nil { + log.Fatal(err) + } +} diff --git a/internal/pkg/utils/utils.go b/internal/pkg/persistence/persist.go similarity index 98% rename from internal/pkg/utils/utils.go rename to internal/pkg/persistence/persist.go index 77f6325..bdc91ba 100644 --- a/internal/pkg/utils/utils.go +++ b/internal/pkg/persistence/persist.go @@ -1,4 +1,4 @@ -package utils +package persistence import ( "bytes" diff --git a/internal/structure.md b/internal/structure.md new file mode 100644 index 0000000..b891109 --- /dev/null +++ b/internal/structure.md @@ -0,0 +1,40 @@ +internal/ +│ +├── pkg/ +│ ├── model/ # All data types, structs, constants +│ │ ├── beacons.go +│ │ ├── settings.go +│ │ ├── context.go # AppContext with locks and maps +│ │ └── types.go +│ │ +│ ├── httpserver/ # HTTP + WebSocket handlers +│ │ ├── routes.go # Registers all endpoints +│ │ ├── handlers.go # Core REST handlers +│ │ ├── websocket.go # WS logic (connections, broadcast) +│ │ └── server.go # StartHTTPServer() +│ │ +│ ├── mqtt/ # MQTT-specific logic +│ │ ├── processor.go # IncomingMQTTProcessor + helpers +│ │ ├── publisher.go # sendHARoomMessage, sendButtonMessage +│ │ └── filters.go # incomingBeaconFilter, distance helpers +│ │ +│ ├── persistence/ # BoltDB helpers +│ │ ├── load.go # LoadState, SaveState +│ │ ├── buckets.go # createBucketIfNotExists +│ │ └── persist_beacons.go +│ │ +│ ├── utils/ # Small utility helpers (time, logging, etc.) +│ │ ├── time.go +│ │ ├── logging.go +│ │ └── shell.go +│ │ +│ └── config/ # Default values, env vars, flags +│ └── config.go +│ +└── test/ + ├── httpserver_test/ + │ └── beacons_test.go + ├── mqtt_test/ + │ └── processor_test.go + └── persistence_test/ + └── load_test.go diff --git a/test/httpserver_test/httpserver_test.go b/test/httpserver_test/httpserver_test.go index 32e5c7b..79445c3 100644 --- a/test/httpserver_test/httpserver_test.go +++ b/test/httpserver_test/httpserver_test.go @@ -112,5 +112,49 @@ func TestBeaconCRUD(t *testing.T) { } func TestSettingsCRUD(t *testing.T) { + tmpfile, _ := os.CreateTemp("", "testdb-*.db") + defer os.Remove(tmpfile.Name()) + + db, err := bolt.Open(tmpfile.Name(), 0600, nil) + if err != nil { + t.Fatal(err) + } + model.Db = db + ctx := model.AppContext{ + Settings: model.Settings{}, + } + + settings := model.Settings{ + Location_confidence: 10, + Last_seen_threshold: 10, + Beacon_metrics_size: 10, + HA_send_interval: 10, + HA_send_changes_only: true, + } + + body, err := json.Marshal(settings) + if err != nil { + t.Fatal(err) + } + + req := httptest.NewRequest("POST", "/api/settings", bytes.NewReader(body)) + w := httptest.NewRecorder() + + httpserver.SettingsEditHandler(&ctx.Settings)(w, req) + + fmt.Println("status: ", w.Code) + if w.Code != http.StatusOK { + t.Fatalf("create failed: %d", w.Code) + } + + fmt.Println("--------------------------------------------------------------") + + req = httptest.NewRequest("GET", "/api/settings", nil) + w = httptest.NewRecorder() + + httpserver.SettingsListHandler(&ctx.Settings)(w, req) + + fmt.Println("Status:", w.Code) + fmt.Println("Body:", w.Body.String()) } diff --git a/test/mqtt_test/mqtt_test.go b/test/mqtt_test/mqtt_test.go new file mode 100644 index 0000000..d9790c5 --- /dev/null +++ b/test/mqtt_test/mqtt_test.go @@ -0,0 +1,46 @@ +package mqtt_test + +import ( + "os" + "testing" + "time" + + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/mqtt" + "github.com/AFASystems/presence/internal/pkg/persistence" + "github.com/boltdb/bolt" +) + +func TestIncomingMQTTProcessor(t *testing.T) { + ctx := &model.AppContext{ + Beacons: model.BeaconsList{Beacons: make(map[string]model.Beacon)}, + Settings: model.Settings{ + Last_seen_threshold: 10, + Location_confidence: 3, + }, + } + + tmpfile, _ := os.CreateTemp("", "testdb-*.db") + defer os.Remove(tmpfile.Name()) + + db, err := bolt.Open(tmpfile.Name(), 0600, nil) + if err != nil { + t.Fatal(err) + } + model.Db = db + + persistence.LoadInitialState(ctx) + + ch := mqtt.IncomingMQTTProcessor(20*time.Millisecond, nil, model.Db, ctx) + msg := model.Incoming_json{MAC: "15:02:31", Hostname: "testHost", RSSI: -55} + ch <- msg + + time.Sleep(100 * time.Millisecond) + + ctx.Beacons.Lock.RLock() + defer ctx.Beacons.Lock.RUnlock() + + if len(ctx.LatestList.LatestList) == 0 { + t.Fatal("latest list map to update") + } +}