Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

331 строка
8.8 KiB

  1. package appcontext
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/model"
  9. "github.com/redis/go-redis/v9"
  10. "github.com/segmentio/kafka-go"
  11. )
  12. // AppState provides centralized access to application state
  13. type AppState struct {
  14. beacons model.BeaconsList
  15. httpResults model.HTTPResultList
  16. settings model.Settings
  17. beaconEvents model.BeaconEventList
  18. beaconsLookup map[string]struct{}
  19. latestList model.LatestBeaconsList
  20. kafkaReadersList model.KafkaReadersList
  21. kafkaWritersList model.KafkaWritersList
  22. valkeyDB *redis.Client
  23. }
  24. // NewAppState creates a new application context AppState with default values
  25. func NewAppState() *AppState {
  26. return &AppState{
  27. beacons: model.BeaconsList{
  28. Beacons: make(map[string]model.Beacon),
  29. },
  30. httpResults: model.HTTPResultList{
  31. Results: make(map[string]model.HTTPResult),
  32. },
  33. settings: model.Settings{
  34. Settings: model.SettingsVal{
  35. LocationConfidence: 4,
  36. LastSeenThreshold: 15,
  37. BeaconMetricSize: 30,
  38. HASendInterval: 5,
  39. HASendChangesOnly: false,
  40. RSSIEnforceThreshold: false,
  41. RSSIMinThreshold: 100,
  42. },
  43. },
  44. beaconEvents: model.BeaconEventList{
  45. Beacons: make(map[string]model.BeaconEvent),
  46. },
  47. beaconsLookup: make(map[string]struct{}),
  48. latestList: model.LatestBeaconsList{
  49. LatestList: make(map[string]model.Beacon),
  50. },
  51. kafkaReadersList: model.KafkaReadersList{
  52. KafkaReaders: make([]*kafka.Reader, 0),
  53. },
  54. kafkaWritersList: model.KafkaWritersList{
  55. KafkaWriters: make([]*kafka.Writer, 0),
  56. },
  57. }
  58. }
  59. func (m *AppState) AddValkeyClient(url string) *redis.Client {
  60. valkeyDB := redis.NewClient(&redis.Options{
  61. Addr: url,
  62. Password: "",
  63. })
  64. m.valkeyDB = valkeyDB
  65. return valkeyDB
  66. }
  67. func (m *AppState) CleanValkeyClient() {
  68. fmt.Println("shutdown of valkey client starts")
  69. if err := m.valkeyDB.Close(); err != nil {
  70. fmt.Println("Error in shuting down valkey client")
  71. }
  72. fmt.Println("Succesfully shutting down valkey client")
  73. }
  74. func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
  75. kafkaWriter := &kafka.Writer{
  76. Addr: kafka.TCP(kafkaUrl),
  77. Topic: topic,
  78. Balancer: &kafka.LeastBytes{},
  79. Async: false,
  80. RequiredAcks: kafka.RequireAll,
  81. BatchSize: 100,
  82. BatchTimeout: 10 * time.Millisecond,
  83. }
  84. m.kafkaWritersList.KafkaWritersLock.Lock()
  85. m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
  86. m.kafkaWritersList.KafkaWritersLock.Unlock()
  87. return kafkaWriter
  88. }
  89. func (m *AppState) CleanKafkaWriters() {
  90. fmt.Println("shutdown of kafka readers starts")
  91. for _, r := range m.kafkaWritersList.KafkaWriters {
  92. if err := r.Close(); err != nil {
  93. fmt.Printf("Error in closing kafka writer %v", err)
  94. }
  95. }
  96. fmt.Println("Kafka writers graceful shutdown complete")
  97. }
  98. func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
  99. brokers := strings.Split(kafkaUrl, ",")
  100. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  101. Brokers: brokers,
  102. GroupID: groupID,
  103. Topic: topic,
  104. MinBytes: 1,
  105. MaxBytes: 10e6,
  106. })
  107. m.kafkaReadersList.KafkaReadersLock.Lock()
  108. m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
  109. m.kafkaReadersList.KafkaReadersLock.Unlock()
  110. return kafkaReader
  111. }
  112. func (m *AppState) CleanKafkaReaders() {
  113. for _, r := range m.kafkaReadersList.KafkaReaders {
  114. if err := r.Close(); err != nil {
  115. fmt.Printf("Error in closing kafka reader %v", err)
  116. }
  117. }
  118. fmt.Println("Kafka readers graceful shutdown complete")
  119. }
  120. // GetBeacons returns thread-safe access to beacons list
  121. func (m *AppState) GetBeacons() *model.BeaconsList {
  122. return &m.beacons
  123. }
  124. // GetSettings returns thread-safe access to settings
  125. func (m *AppState) GetSettings() *model.Settings {
  126. return &m.settings
  127. }
  128. // GetBeaconEvents returns thread-safe access to beacon events
  129. func (m *AppState) GetBeaconEvents() *model.BeaconEventList {
  130. return &m.beaconEvents
  131. }
  132. // GetBeaconsLookup returns thread-safe access to beacon lookup map
  133. func (m *AppState) GetBeaconsLookup() map[string]struct{} {
  134. return m.beaconsLookup
  135. }
  136. // GetLatestList returns thread-safe access to latest beacons list
  137. func (m *AppState) GetLatestList() *model.LatestBeaconsList {
  138. return &m.latestList
  139. }
  140. // AddBeaconToLookup adds a beacon ID to the lookup map
  141. func (m *AppState) AddBeaconToLookup(id string) {
  142. m.beaconsLookup[id] = struct{}{}
  143. }
  144. // RemoveBeaconFromLookup removes a beacon ID from the lookup map
  145. func (m *AppState) RemoveBeaconFromLookup(id string) {
  146. delete(m.beaconsLookup, id)
  147. }
  148. func (m *AppState) RemoveBeacon(id string) {
  149. m.beacons.Lock.Lock()
  150. delete(m.beacons.Beacons, id)
  151. m.beacons.Lock.Unlock()
  152. }
  153. func (m *AppState) RemoveHTTPResult(id string) {
  154. m.httpResults.Lock.Lock()
  155. delete(m.httpResults.Results, id)
  156. m.httpResults.Lock.Unlock()
  157. }
  158. // BeaconExists checks if a beacon exists in the lookup
  159. func (m *AppState) BeaconExists(id string) bool {
  160. _, exists := m.beaconsLookup[id]
  161. return exists
  162. }
  163. // GetBeacon returns a beacon by ID (thread-safe)
  164. func (m *AppState) GetBeacon(id string) (model.Beacon, bool) {
  165. m.beacons.Lock.RLock()
  166. defer m.beacons.Lock.RUnlock()
  167. beacon, exists := m.beacons.Beacons[id]
  168. return beacon, exists
  169. }
  170. // GetHTTPResult returns a beacon from HTTP results by ID (thread-safe)
  171. func (m *AppState) GetHTTPResult(id string) (model.HTTPResult, bool) {
  172. m.httpResults.Lock.RLock()
  173. defer m.httpResults.Lock.RUnlock()
  174. beacon, exists := m.httpResults.Results[id]
  175. return beacon, exists
  176. }
  177. // UpdateHTTPResult updates a beacon in the list (thread-safe)
  178. func (m *AppState) UpdateHTTPResult(id string, beacon model.HTTPResult) {
  179. m.httpResults.Lock.Lock()
  180. defer m.httpResults.Lock.Unlock()
  181. m.httpResults.Results[id] = beacon
  182. }
  183. // UpdateBeacon updates a beacon in the list (thread-safe)
  184. func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) {
  185. m.beacons.Lock.Lock()
  186. defer m.beacons.Lock.Unlock()
  187. m.beacons.Beacons[id] = beacon
  188. }
  189. // GetBeaconEvent returns a beacon event by ID (thread-safe)
  190. func (m *AppState) GetBeaconEvent(id string) (model.BeaconEvent, bool) {
  191. m.beaconEvents.Lock.RLock()
  192. defer m.beaconEvents.Lock.RUnlock()
  193. event, exists := m.beaconEvents.Beacons[id]
  194. return event, exists
  195. }
  196. // UpdateBeaconEvent updates a beacon event in the list (thread-safe)
  197. func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
  198. m.beaconEvents.Lock.Lock()
  199. defer m.beaconEvents.Lock.Unlock()
  200. m.beaconEvents.Beacons[id] = event
  201. }
  202. // GetLatestBeacon returns the latest beacon by ID (thread-safe)
  203. func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
  204. m.latestList.Lock.RLock()
  205. defer m.latestList.Lock.RUnlock()
  206. beacon, exists := m.latestList.LatestList[id]
  207. return beacon, exists
  208. }
  209. // UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
  210. func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
  211. m.latestList.Lock.Lock()
  212. defer m.latestList.Lock.Unlock()
  213. m.latestList.LatestList[id] = beacon
  214. }
  215. // GetAllBeacons returns a copy of all beacons
  216. func (m *AppState) GetAllBeacons() map[string]model.Beacon {
  217. m.beacons.Lock.RLock()
  218. defer m.beacons.Lock.RUnlock()
  219. beacons := make(map[string]model.Beacon)
  220. for id, beacon := range m.beacons.Beacons {
  221. beacons[id] = beacon
  222. }
  223. return beacons
  224. }
  225. // GetAllHttpResults returns a copy of all beacons
  226. func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult {
  227. m.httpResults.Lock.RLock()
  228. defer m.httpResults.Lock.RUnlock()
  229. beacons := make(map[string]model.HTTPResult)
  230. for id, beacon := range m.httpResults.Results {
  231. beacons[id] = beacon
  232. }
  233. return beacons
  234. }
  235. // GetAllLatestBeacons returns a copy of all latest beacons
  236. func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
  237. m.latestList.Lock.RLock()
  238. defer m.latestList.Lock.RUnlock()
  239. beacons := make(map[string]model.Beacon)
  240. for id, beacon := range m.latestList.LatestList {
  241. beacons[id] = beacon
  242. }
  243. return beacons
  244. }
  245. // GetBeaconCount returns the number of tracked beacons
  246. func (m *AppState) GetBeaconCount() int {
  247. m.beacons.Lock.RLock()
  248. defer m.beacons.Lock.RUnlock()
  249. return len(m.beacons.Beacons)
  250. }
  251. // GetSettingsValue returns current settings as a value
  252. func (m *AppState) GetSettingsValue() model.SettingsVal {
  253. m.settings.Lock.RLock()
  254. defer m.settings.Lock.RUnlock()
  255. return m.settings.Settings
  256. }
  257. // UpdateSettings updates the system settings (thread-safe)
  258. func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {
  259. m.settings.Lock.Lock()
  260. defer m.settings.Lock.Unlock()
  261. m.settings.Settings = newSettings
  262. }
  263. func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) {
  264. d, err := json.Marshal(m.GetSettingsValue())
  265. if err != nil {
  266. fmt.Printf("Error in marshalling settings: %v", err)
  267. return
  268. }
  269. if err := client.Set(ctx, "settings", d, 0).Err(); err != nil {
  270. fmt.Printf("Error in persisting settings: %v", err)
  271. }
  272. }