您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

292 行
7.7 KiB

  1. package appcontext
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/model"
  9. "github.com/redis/go-redis/v9"
  10. "github.com/segmentio/kafka-go"
  11. )
  12. // AppState provides centralized access to application state
  13. type AppState struct {
  14. beacons model.BeaconsList
  15. settings model.Settings
  16. beaconEvents model.BeaconEventList
  17. beaconsLookup map[string]struct{}
  18. latestList model.LatestBeaconsList
  19. kafkaReadersList model.KafkaReadersList
  20. kafkaWritersList model.KafkaWritersList
  21. valkeyDB *redis.Client
  22. }
  23. // NewAppState creates a new application context AppState with default values
  24. func NewAppState() *AppState {
  25. return &AppState{
  26. beacons: model.BeaconsList{
  27. Beacons: make(map[string]model.Beacon),
  28. },
  29. settings: model.Settings{
  30. Settings: model.SettingsVal{
  31. LocationConfidence: 4,
  32. LastSeenThreshold: 15,
  33. BeaconMetricSize: 30,
  34. HASendInterval: 5,
  35. HASendChangesOnly: false,
  36. RSSIEnforceThreshold: false,
  37. RSSIMinThreshold: 100,
  38. },
  39. },
  40. beaconEvents: model.BeaconEventList{
  41. Beacons: make(map[string]model.BeaconEvent),
  42. },
  43. beaconsLookup: make(map[string]struct{}),
  44. latestList: model.LatestBeaconsList{
  45. LatestList: make(map[string]model.Beacon),
  46. },
  47. kafkaReadersList: model.KafkaReadersList{
  48. KafkaReaders: make([]*kafka.Reader, 0),
  49. },
  50. kafkaWritersList: model.KafkaWritersList{
  51. KafkaWriters: make([]*kafka.Writer, 0),
  52. },
  53. }
  54. }
  55. func (m *AppState) AddValkeyClient(url string) *redis.Client {
  56. valkeyDB := redis.NewClient(&redis.Options{
  57. Addr: url,
  58. Password: "",
  59. })
  60. m.valkeyDB = valkeyDB
  61. return valkeyDB
  62. }
  63. func (m *AppState) CleanValkeyClient() {
  64. fmt.Println("shutdown of valkey client starts")
  65. if err := m.valkeyDB.Close(); err != nil {
  66. fmt.Println("Error in shuting down valkey client")
  67. }
  68. fmt.Println("Succesfully shutting down valkey client")
  69. }
  70. func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
  71. kafkaWriter := &kafka.Writer{
  72. Addr: kafka.TCP(kafkaUrl),
  73. Topic: topic,
  74. Balancer: &kafka.LeastBytes{},
  75. Async: false,
  76. RequiredAcks: kafka.RequireAll,
  77. BatchSize: 100,
  78. BatchTimeout: 10 * time.Millisecond,
  79. }
  80. m.kafkaWritersList.KafkaWritersLock.Lock()
  81. m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
  82. m.kafkaWritersList.KafkaWritersLock.Unlock()
  83. return kafkaWriter
  84. }
  85. func (m *AppState) CleanKafkaWriters() {
  86. fmt.Println("shutdown of kafka readers starts")
  87. for _, r := range m.kafkaWritersList.KafkaWriters {
  88. if err := r.Close(); err != nil {
  89. fmt.Printf("Error in closing kafka writer %v", err)
  90. }
  91. }
  92. fmt.Println("Kafka writers graceful shutdown complete")
  93. }
  94. func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
  95. brokers := strings.Split(kafkaUrl, ",")
  96. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  97. Brokers: brokers,
  98. GroupID: groupID,
  99. Topic: topic,
  100. MinBytes: 1,
  101. MaxBytes: 10e6,
  102. })
  103. m.kafkaReadersList.KafkaReadersLock.Lock()
  104. m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
  105. m.kafkaReadersList.KafkaReadersLock.Unlock()
  106. return kafkaReader
  107. }
  108. func (m *AppState) CleanKafkaReaders() {
  109. for _, r := range m.kafkaReadersList.KafkaReaders {
  110. if err := r.Close(); err != nil {
  111. fmt.Printf("Error in closing kafka reader %v", err)
  112. }
  113. }
  114. fmt.Println("Kafka readers graceful shutdown complete")
  115. }
  116. // GetBeacons returns thread-safe access to beacons list
  117. func (m *AppState) GetBeacons() *model.BeaconsList {
  118. return &m.beacons
  119. }
  120. // GetSettings returns thread-safe access to settings
  121. func (m *AppState) GetSettings() *model.Settings {
  122. return &m.settings
  123. }
  124. // GetBeaconEvents returns thread-safe access to beacon events
  125. func (m *AppState) GetBeaconEvents() *model.BeaconEventList {
  126. return &m.beaconEvents
  127. }
  128. // GetBeaconsLookup returns thread-safe access to beacon lookup map
  129. func (m *AppState) GetBeaconsLookup() map[string]struct{} {
  130. return m.beaconsLookup
  131. }
  132. // GetLatestList returns thread-safe access to latest beacons list
  133. func (m *AppState) GetLatestList() *model.LatestBeaconsList {
  134. return &m.latestList
  135. }
  136. // AddBeaconToLookup adds a beacon ID to the lookup map
  137. func (m *AppState) AddBeaconToLookup(id string) {
  138. m.beaconsLookup[id] = struct{}{}
  139. }
  140. // RemoveBeaconFromLookup removes a beacon ID from the lookup map
  141. func (m *AppState) RemoveBeaconFromLookup(id string) {
  142. delete(m.beaconsLookup, id)
  143. }
  144. func (m *AppState) RemoveBeacon(id string) {
  145. m.beacons.Lock.Lock()
  146. delete(m.beacons.Beacons, id)
  147. m.beacons.Lock.Unlock()
  148. }
  149. // BeaconExists checks if a beacon exists in the lookup
  150. func (m *AppState) BeaconExists(id string) bool {
  151. _, exists := m.beaconsLookup[id]
  152. return exists
  153. }
  154. // GetBeacon returns a beacon by ID (thread-safe)
  155. func (m *AppState) GetBeacon(id string) (model.Beacon, bool) {
  156. m.beacons.Lock.RLock()
  157. defer m.beacons.Lock.RUnlock()
  158. beacon, exists := m.beacons.Beacons[id]
  159. return beacon, exists
  160. }
  161. // UpdateBeacon updates a beacon in the list (thread-safe)
  162. func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) {
  163. m.beacons.Lock.Lock()
  164. defer m.beacons.Lock.Unlock()
  165. m.beacons.Beacons[id] = beacon
  166. }
  167. // GetBeaconEvent returns a beacon event by ID (thread-safe)
  168. func (m *AppState) GetBeaconEvent(id string) (model.BeaconEvent, bool) {
  169. m.beaconEvents.Lock.RLock()
  170. defer m.beaconEvents.Lock.RUnlock()
  171. event, exists := m.beaconEvents.Beacons[id]
  172. return event, exists
  173. }
  174. // UpdateBeaconEvent updates a beacon event in the list (thread-safe)
  175. func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
  176. m.beaconEvents.Lock.Lock()
  177. defer m.beaconEvents.Lock.Unlock()
  178. m.beaconEvents.Beacons[id] = event
  179. }
  180. // GetLatestBeacon returns the latest beacon by ID (thread-safe)
  181. func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
  182. m.latestList.Lock.RLock()
  183. defer m.latestList.Lock.RUnlock()
  184. beacon, exists := m.latestList.LatestList[id]
  185. return beacon, exists
  186. }
  187. // UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
  188. func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
  189. m.latestList.Lock.Lock()
  190. defer m.latestList.Lock.Unlock()
  191. m.latestList.LatestList[id] = beacon
  192. }
  193. // GetAllBeacons returns a copy of all beacons
  194. func (m *AppState) GetAllBeacons() map[string]model.Beacon {
  195. m.beacons.Lock.RLock()
  196. defer m.beacons.Lock.RUnlock()
  197. beacons := make(map[string]model.Beacon)
  198. for id, beacon := range m.beacons.Beacons {
  199. beacons[id] = beacon
  200. }
  201. return beacons
  202. }
  203. // GetAllLatestBeacons returns a copy of all latest beacons
  204. func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
  205. m.latestList.Lock.RLock()
  206. defer m.latestList.Lock.RUnlock()
  207. beacons := make(map[string]model.Beacon)
  208. for id, beacon := range m.latestList.LatestList {
  209. beacons[id] = beacon
  210. }
  211. return beacons
  212. }
  213. // GetBeaconCount returns the number of tracked beacons
  214. func (m *AppState) GetBeaconCount() int {
  215. m.beacons.Lock.RLock()
  216. defer m.beacons.Lock.RUnlock()
  217. return len(m.beacons.Beacons)
  218. }
  219. // GetSettingsValue returns current settings as a value
  220. func (m *AppState) GetSettingsValue() model.SettingsVal {
  221. m.settings.Lock.RLock()
  222. defer m.settings.Lock.RUnlock()
  223. return m.settings.Settings
  224. }
  225. // UpdateSettings updates the system settings (thread-safe)
  226. func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {
  227. m.settings.Lock.Lock()
  228. defer m.settings.Lock.Unlock()
  229. m.settings.Settings = newSettings
  230. }
  231. func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) {
  232. d, err := json.Marshal(m.GetSettingsValue())
  233. if err != nil {
  234. fmt.Printf("Error in marshalling settings: %v", err)
  235. return
  236. }
  237. if err := client.Set(ctx, "settings", d, 0).Err(); err != nil {
  238. fmt.Printf("Error in persisting settings: %v", err)
  239. }
  240. }