You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

286 regels
7.6 KiB

  1. package appcontext
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/model"
  9. "github.com/redis/go-redis/v9"
  10. "github.com/segmentio/kafka-go"
  11. )
  12. // AppState provides centralized access to application state
  13. type AppState struct {
  14. beacons model.BeaconsList
  15. settings model.Settings
  16. beaconEvents model.BeaconEventList
  17. beaconsLookup map[string]struct{}
  18. latestList model.LatestBeaconsList
  19. kafkaReadersList model.KafkaReadersList
  20. kafkaWritersList model.KafkaWritersList
  21. valkeyDB *redis.Client
  22. }
  23. // NewAppState creates a new application context AppState with default values
  24. func NewAppState() *AppState {
  25. return &AppState{
  26. beacons: model.BeaconsList{
  27. Beacons: make(map[string]model.Beacon),
  28. },
  29. settings: model.Settings{
  30. Settings: model.SettingsVal{
  31. LocationConfidence: 4,
  32. LastSeenThreshold: 15,
  33. BeaconMetricSize: 30,
  34. HASendInterval: 5,
  35. HASendChangesOnly: false,
  36. RSSIEnforceThreshold: false,
  37. RSSIMinThreshold: 100,
  38. },
  39. },
  40. beaconEvents: model.BeaconEventList{
  41. Beacons: make(map[string]model.BeaconEvent),
  42. },
  43. beaconsLookup: make(map[string]struct{}),
  44. latestList: model.LatestBeaconsList{
  45. LatestList: make(map[string]model.Beacon),
  46. },
  47. kafkaReadersList: model.KafkaReadersList{
  48. KafkaReaders: make([]*kafka.Reader, 0),
  49. },
  50. kafkaWritersList: model.KafkaWritersList{
  51. KafkaWriters: make([]*kafka.Writer, 0),
  52. },
  53. }
  54. }
  55. func (m *AppState) AddValkeyClient(url string) *redis.Client {
  56. valkeyDB := redis.NewClient(&redis.Options{
  57. Addr: url,
  58. Password: "",
  59. })
  60. m.valkeyDB = valkeyDB
  61. return valkeyDB
  62. }
  63. func (m *AppState) CleanValkeyClient() {
  64. fmt.Println("shutdown of valkey client starts")
  65. if err := m.valkeyDB.Close(); err != nil {
  66. fmt.Println("Error in shuting down valkey client")
  67. }
  68. fmt.Println("Succesfully shutting down valkey client")
  69. }
  70. func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
  71. kafkaWriter := &kafka.Writer{
  72. Addr: kafka.TCP(kafkaUrl),
  73. Topic: topic,
  74. Balancer: &kafka.LeastBytes{},
  75. Async: false,
  76. RequiredAcks: kafka.RequireAll,
  77. BatchSize: 100,
  78. BatchTimeout: 10 * time.Millisecond,
  79. }
  80. m.kafkaWritersList.KafkaWritersLock.Lock()
  81. m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
  82. m.kafkaWritersList.KafkaWritersLock.Unlock()
  83. return kafkaWriter
  84. }
  85. func (m *AppState) CleanKafkaWriters() {
  86. fmt.Println("shutdown of kafka readers starts")
  87. for _, r := range m.kafkaWritersList.KafkaWriters {
  88. if err := r.Close(); err != nil {
  89. fmt.Printf("Error in closing kafka writer %v", err)
  90. }
  91. }
  92. fmt.Println("Kafka writers graceful shutdown complete")
  93. }
  94. func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
  95. brokers := strings.Split(kafkaUrl, ",")
  96. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  97. Brokers: brokers,
  98. GroupID: groupID,
  99. Topic: topic,
  100. MinBytes: 1,
  101. MaxBytes: 10e6,
  102. })
  103. m.kafkaReadersList.KafkaReadersLock.Lock()
  104. m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
  105. m.kafkaReadersList.KafkaReadersLock.Unlock()
  106. return kafkaReader
  107. }
  108. func (m *AppState) CleanKafkaReaders() {
  109. for _, r := range m.kafkaReadersList.KafkaReaders {
  110. if err := r.Close(); err != nil {
  111. fmt.Printf("Error in closing kafka reader %v", err)
  112. }
  113. }
  114. fmt.Println("Kafka readers graceful shutdown complete")
  115. }
  116. // GetBeacons returns thread-safe access to beacons list
  117. func (m *AppState) GetBeacons() *model.BeaconsList {
  118. return &m.beacons
  119. }
  120. // GetSettings returns thread-safe access to settings
  121. func (m *AppState) GetSettings() *model.Settings {
  122. return &m.settings
  123. }
  124. // GetBeaconEvents returns thread-safe access to beacon events
  125. func (m *AppState) GetBeaconEvents() *model.BeaconEventList {
  126. return &m.beaconEvents
  127. }
  128. // GetBeaconsLookup returns thread-safe access to beacon lookup map
  129. func (m *AppState) GetBeaconsLookup() map[string]struct{} {
  130. return m.beaconsLookup
  131. }
  132. // GetLatestList returns thread-safe access to latest beacons list
  133. func (m *AppState) GetLatestList() *model.LatestBeaconsList {
  134. return &m.latestList
  135. }
  136. // AddBeaconToLookup adds a beacon ID to the lookup map
  137. func (m *AppState) AddBeaconToLookup(id string) {
  138. m.beaconsLookup[id] = struct{}{}
  139. }
  140. // RemoveBeaconFromLookup removes a beacon ID from the lookup map
  141. func (m *AppState) RemoveBeaconFromLookup(id string) {
  142. delete(m.beaconsLookup, id)
  143. }
  144. // BeaconExists checks if a beacon exists in the lookup
  145. func (m *AppState) BeaconExists(id string) bool {
  146. _, exists := m.beaconsLookup[id]
  147. return exists
  148. }
  149. // GetBeacon returns a beacon by ID (thread-safe)
  150. func (m *AppState) GetBeacon(id string) (model.Beacon, bool) {
  151. m.beacons.Lock.RLock()
  152. defer m.beacons.Lock.RUnlock()
  153. beacon, exists := m.beacons.Beacons[id]
  154. return beacon, exists
  155. }
  156. // UpdateBeacon updates a beacon in the list (thread-safe)
  157. func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) {
  158. m.beacons.Lock.Lock()
  159. defer m.beacons.Lock.Unlock()
  160. m.beacons.Beacons[id] = beacon
  161. }
  162. // GetBeaconEvent returns a beacon event by ID (thread-safe)
  163. func (m *AppState) GetBeaconEvent(id string) (model.BeaconEvent, bool) {
  164. m.beaconEvents.Lock.RLock()
  165. defer m.beaconEvents.Lock.RUnlock()
  166. event, exists := m.beaconEvents.Beacons[id]
  167. return event, exists
  168. }
  169. // UpdateBeaconEvent updates a beacon event in the list (thread-safe)
  170. func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
  171. m.beaconEvents.Lock.Lock()
  172. defer m.beaconEvents.Lock.Unlock()
  173. m.beaconEvents.Beacons[id] = event
  174. }
  175. // GetLatestBeacon returns the latest beacon by ID (thread-safe)
  176. func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
  177. m.latestList.Lock.RLock()
  178. defer m.latestList.Lock.RUnlock()
  179. beacon, exists := m.latestList.LatestList[id]
  180. return beacon, exists
  181. }
  182. // UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
  183. func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
  184. m.latestList.Lock.Lock()
  185. defer m.latestList.Lock.Unlock()
  186. m.latestList.LatestList[id] = beacon
  187. }
  188. // GetAllBeacons returns a copy of all beacons
  189. func (m *AppState) GetAllBeacons() map[string]model.Beacon {
  190. m.beacons.Lock.RLock()
  191. defer m.beacons.Lock.RUnlock()
  192. beacons := make(map[string]model.Beacon)
  193. for id, beacon := range m.beacons.Beacons {
  194. beacons[id] = beacon
  195. }
  196. return beacons
  197. }
  198. // GetAllLatestBeacons returns a copy of all latest beacons
  199. func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
  200. m.latestList.Lock.RLock()
  201. defer m.latestList.Lock.RUnlock()
  202. beacons := make(map[string]model.Beacon)
  203. for id, beacon := range m.latestList.LatestList {
  204. beacons[id] = beacon
  205. }
  206. return beacons
  207. }
  208. // GetBeaconCount returns the number of tracked beacons
  209. func (m *AppState) GetBeaconCount() int {
  210. m.beacons.Lock.RLock()
  211. defer m.beacons.Lock.RUnlock()
  212. return len(m.beacons.Beacons)
  213. }
  214. // GetSettingsValue returns current settings as a value
  215. func (m *AppState) GetSettingsValue() model.SettingsVal {
  216. m.settings.Lock.RLock()
  217. defer m.settings.Lock.RUnlock()
  218. return m.settings.Settings
  219. }
  220. // UpdateSettings updates the system settings (thread-safe)
  221. func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {
  222. m.settings.Lock.Lock()
  223. defer m.settings.Lock.Unlock()
  224. m.settings.Settings = newSettings
  225. }
  226. func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) {
  227. d, err := json.Marshal(m.GetSettingsValue())
  228. if err != nil {
  229. fmt.Printf("Error in marshalling settings: %v", err)
  230. return
  231. }
  232. if err := client.Set(ctx, "settings", d, 0).Err(); err != nil {
  233. fmt.Printf("Error in persisting settings: %v", err)
  234. }
  235. }