| @@ -28,4 +28,4 @@ main | |||
| *.sh | |||
| *.log | |||
| **/*.log | |||
| @@ -4,3 +4,14 @@ | |||
| {"time":"2025-12-04T12:58:48.509999563+01:00","level":"INFO","msg":"Locations algorithm initialized, subscribed to Kafka topics"} | |||
| {"time":"2025-12-04T13:00:19.083141862+01:00","level":"INFO","msg":"broken out of the main event loop"} | |||
| {"time":"2025-12-04T13:00:19.083262279+01:00","level":"INFO","msg":"All go routines have stopped, Beggining to close Kafka connections"} | |||
| {"time":"2025-12-11T20:57:46.752992493+01:00","level":"INFO","msg":"Locations algorithm initialized, subscribed to Kafka topics"} | |||
| {"time":"2025-12-11T20:58:32.221403094+01:00","level":"INFO","msg":"Beacon added to lookup: C83F8F17DB35"} | |||
| {"time":"2025-12-11T20:58:49.013687369+01:00","level":"INFO","msg":"broken out of the main event loop"} | |||
| {"time":"2025-12-11T20:58:49.01380182+01:00","level":"INFO","msg":"All go routines have stopped, Beggining to close Kafka connections"} | |||
| {"time":"2025-12-11T22:16:33.000113327+01:00","level":"INFO","msg":"Locations algorithm initialized, subscribed to Kafka topics"} | |||
| {"time":"2025-12-11T22:16:56.930565092+01:00","level":"INFO","msg":"Beacon added to lookup: C83F8F17DB35"} | |||
| {"time":"2025-12-11T22:17:01.230339099+01:00","level":"INFO","msg":"Beacon added to lookup: C83F8F17DB35"} | |||
| {"time":"2025-12-11T22:17:13.636880745+01:00","level":"INFO","msg":"Beacon added to lookup: C83F8F17DB35"} | |||
| {"time":"2025-12-11T22:17:23.816515709+01:00","level":"INFO","msg":"Beacon added to lookup: C83F8F17DB35"} | |||
| {"time":"2025-12-11T22:17:41.981977922+01:00","level":"INFO","msg":"broken out of the main event loop"} | |||
| {"time":"2025-12-11T22:17:41.98203123+01:00","level":"INFO","msg":"All go routines have stopped, Beggining to close Kafka connections"} | |||
| @@ -175,7 +175,7 @@ func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Conte | |||
| ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | |||
| return | |||
| case <-beaconTicker.C: | |||
| beacons := appstate.GetAllBeacons() | |||
| beacons := appstate.GetAllHttpResults() | |||
| js, err := json.Marshal(beacons) | |||
| if err != nil { | |||
| js = []byte("error") | |||
| @@ -15,6 +15,7 @@ import ( | |||
| // AppState provides centralized access to application state | |||
| type AppState struct { | |||
| beacons model.BeaconsList | |||
| httpResults model.HTTPResultList | |||
| settings model.Settings | |||
| beaconEvents model.BeaconEventList | |||
| beaconsLookup map[string]struct{} | |||
| @@ -30,6 +31,9 @@ func NewAppState() *AppState { | |||
| beacons: model.BeaconsList{ | |||
| Beacons: make(map[string]model.Beacon), | |||
| }, | |||
| httpResults: model.HTTPResultList{ | |||
| Results: make(map[string]model.HTTPResult), | |||
| }, | |||
| settings: model.Settings{ | |||
| Settings: model.SettingsVal{ | |||
| LocationConfidence: 4, | |||
| @@ -173,6 +177,12 @@ func (m *AppState) RemoveBeacon(id string) { | |||
| m.beacons.Lock.Unlock() | |||
| } | |||
| func (m *AppState) RemoveHTTPResult(id string) { | |||
| m.httpResults.Lock.Lock() | |||
| delete(m.httpResults.Results, id) | |||
| m.httpResults.Lock.Unlock() | |||
| } | |||
| // BeaconExists checks if a beacon exists in the lookup | |||
| func (m *AppState) BeaconExists(id string) bool { | |||
| _, exists := m.beaconsLookup[id] | |||
| @@ -188,6 +198,23 @@ func (m *AppState) GetBeacon(id string) (model.Beacon, bool) { | |||
| return beacon, exists | |||
| } | |||
| // GetHTTPResult returns a beacon from HTTP results by ID (thread-safe) | |||
| func (m *AppState) GetHTTPResult(id string) (model.HTTPResult, bool) { | |||
| m.httpResults.Lock.RLock() | |||
| defer m.httpResults.Lock.RUnlock() | |||
| beacon, exists := m.httpResults.Results[id] | |||
| return beacon, exists | |||
| } | |||
| // UpdateHTTPResult updates a beacon in the list (thread-safe) | |||
| func (m *AppState) UpdateHTTPResult(id string, beacon model.HTTPResult) { | |||
| m.httpResults.Lock.Lock() | |||
| defer m.httpResults.Lock.Unlock() | |||
| m.httpResults.Results[id] = beacon | |||
| } | |||
| // UpdateBeacon updates a beacon in the list (thread-safe) | |||
| func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) { | |||
| m.beacons.Lock.Lock() | |||
| @@ -242,6 +269,18 @@ func (m *AppState) GetAllBeacons() map[string]model.Beacon { | |||
| return beacons | |||
| } | |||
| // GetAllHttpResults returns a copy of all beacons | |||
| func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult { | |||
| m.httpResults.Lock.RLock() | |||
| defer m.httpResults.Lock.RUnlock() | |||
| beacons := make(map[string]model.HTTPResult) | |||
| for id, beacon := range m.httpResults.Results { | |||
| beacons[id] = beacon | |||
| } | |||
| return beacons | |||
| } | |||
| // GetAllLatestBeacons returns a copy of all latest beacons | |||
| func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon { | |||
| m.latestList.Lock.RLock() | |||
| @@ -17,7 +17,7 @@ func BeaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc | |||
| return func(w http.ResponseWriter, r *http.Request) { | |||
| vars := mux.Vars(r) | |||
| id := vars["beacon_id"] | |||
| beacon, ok := appstate.GetBeacon(id) | |||
| beacon, ok := appstate.GetHTTPResult(id) | |||
| if !ok { | |||
| w.Header().Set("Content-Type", "application/json") | |||
| w.WriteHeader(http.StatusNotFound) | |||
| @@ -33,7 +33,7 @@ func BeaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc | |||
| func BeaconsListController(appstate *appcontext.AppState) http.HandlerFunc { | |||
| return func(w http.ResponseWriter, r *http.Request) { | |||
| beacons := appstate.GetAllBeacons() | |||
| beacons := appstate.GetAllHttpResults() | |||
| w.Header().Set("Content-Type", "application/json") | |||
| w.WriteHeader(http.StatusOK) | |||
| json.NewEncoder(w).Encode(beacons) | |||
| @@ -77,7 +77,7 @@ func BeaconsDeleteController(writer *kafka.Writer, ctx context.Context, appstate | |||
| } | |||
| // If message is succesfully sent delete the beacon from the list | |||
| appstate.RemoveBeacon(beaconId) | |||
| appstate.RemoveHTTPResult(beaconId) | |||
| w.Write([]byte("ok")) | |||
| } | |||
| } | |||
| @@ -87,30 +87,22 @@ func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.Handle | |||
| decoder := json.NewDecoder(r.Body) | |||
| var inBeacon model.Beacon | |||
| err := decoder.Decode(&inBeacon) | |||
| fmt.Printf("hello world\n") | |||
| if err != nil { | |||
| http.Error(w, err.Error(), 400) | |||
| return | |||
| } | |||
| fmt.Printf("hello world\n") | |||
| fmt.Printf("in beacon: %+v\n", inBeacon) | |||
| if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { | |||
| http.Error(w, "name and beacon_id cannot be blank", 400) | |||
| return | |||
| } | |||
| fmt.Printf("Adding new print here also\n") | |||
| // fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) | |||
| apiUpdate := model.ApiUpdate{ | |||
| Method: "POST", | |||
| Beacon: inBeacon, | |||
| } | |||
| fmt.Printf("message: %+v\n", apiUpdate) | |||
| if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { | |||
| fmt.Println("error in sending Kafka POST message") | |||
| http.Error(w, "Error in sending kafka message", 500) | |||
| @@ -111,6 +111,22 @@ type BeaconEvent struct { | |||
| Event int | |||
| } | |||
| type HTTPResult struct { | |||
| ID string `json:"ID"` | |||
| BeaconType string `json:"type"` | |||
| Battery int64 `json:"battery"` | |||
| Event int `json:"event"` | |||
| Location string `json:"location"` | |||
| Distance float64 `json:"distance"` | |||
| LastSeen int64 `json:"timestamp"` | |||
| PreviousConfidentLocation string `json:"previous_confident_location"` | |||
| } | |||
| type HTTPResultList struct { | |||
| Results map[string]HTTPResult | |||
| Lock sync.RWMutex | |||
| } | |||
| // BeaconsList holds all known beacons and their synchronization lock. | |||
| type BeaconsList struct { | |||
| Beacons map[string]Beacon `json:"beacons"` | |||
| @@ -34,16 +34,16 @@ func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client | |||
| func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error { | |||
| id := msg.ID | |||
| beacon, ok := appState.GetBeacon(id) | |||
| beacon, ok := appState.GetHTTPResult(id) | |||
| if !ok { | |||
| appState.UpdateBeacon(id, model.Beacon{ID: id, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) | |||
| appState.UpdateHTTPResult(id, model.HTTPResult{ID: id, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) | |||
| } else { | |||
| beacon.ID = id | |||
| beacon.Location = msg.Location | |||
| beacon.Distance = msg.Distance | |||
| beacon.LastSeen = msg.LastSeen | |||
| beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation | |||
| appState.UpdateBeacon(id, beacon) | |||
| appState.UpdateHTTPResult(id, beacon) | |||
| } | |||
| if err := persistBeaconValkey(id, msg, client, ctx); err != nil { | |||
| return err | |||
| @@ -54,15 +54,15 @@ func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppSta | |||
| func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error { | |||
| id := msg.ID | |||
| beacon, ok := appState.GetBeacon(id) | |||
| beacon, ok := appState.GetHTTPResult(id) | |||
| if !ok { | |||
| appState.UpdateBeacon(id, model.Beacon{ID: id, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) | |||
| appState.UpdateHTTPResult(id, model.HTTPResult{ID: id, BeaconType: msg.Type, Battery: int64(msg.Battery), Event: msg.Event}) | |||
| } else { | |||
| beacon.ID = id | |||
| beacon.BeaconType = msg.Type | |||
| beacon.HSBattery = int64(msg.Battery) | |||
| beacon.Battery = int64(msg.Battery) | |||
| beacon.Event = msg.Event | |||
| appState.UpdateBeacon(id, beacon) | |||
| appState.UpdateHTTPResult(id, beacon) | |||
| } | |||
| if err := persistBeaconValkey(id, msg, client, ctx); err != nil { | |||
| return err | |||