You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

300 lines
8.0 KiB

  1. package appcontext
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/AFASystems/presence/internal/pkg/model"
  7. "github.com/segmentio/kafka-go"
  8. )
  9. // AppState provides centralized access to application state
  10. type AppState struct {
  11. beacons model.BeaconsList
  12. httpResults model.HTTPResultList
  13. settings model.Settings
  14. beaconEvents model.BeaconEventList
  15. beaconsLookup map[string]string
  16. latestList model.LatestBeaconsList
  17. kafkaReadersList model.KafkaReadersList
  18. kafkaWritersList model.KafkaWritersList
  19. }
  20. // NewAppState creates a new application context AppState with default values
  21. func NewAppState() *AppState {
  22. return &AppState{
  23. beacons: model.BeaconsList{
  24. Beacons: make(map[string]model.Beacon),
  25. },
  26. httpResults: model.HTTPResultList{
  27. Results: make(map[string]model.HTTPResult),
  28. },
  29. settings: model.Settings{
  30. Settings: model.SettingsVal{
  31. LocationConfidence: 4,
  32. LastSeenThreshold: 15,
  33. BeaconMetricSize: 30,
  34. HASendInterval: 5,
  35. HASendChangesOnly: false,
  36. RSSIEnforceThreshold: false,
  37. RSSIMinThreshold: 100,
  38. },
  39. },
  40. beaconEvents: model.BeaconEventList{
  41. Beacons: make(map[string]model.BeaconEvent),
  42. },
  43. beaconsLookup: make(map[string]string),
  44. latestList: model.LatestBeaconsList{
  45. LatestList: make(map[string]model.Beacon),
  46. },
  47. kafkaReadersList: model.KafkaReadersList{
  48. KafkaReaders: make([]*kafka.Reader, 0),
  49. },
  50. kafkaWritersList: model.KafkaWritersList{
  51. KafkaWriters: make([]*kafka.Writer, 0),
  52. },
  53. }
  54. }
  55. func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
  56. kafkaWriter := &kafka.Writer{
  57. Addr: kafka.TCP(kafkaUrl),
  58. Topic: topic,
  59. Balancer: &kafka.LeastBytes{},
  60. Async: false,
  61. RequiredAcks: kafka.RequireAll,
  62. BatchSize: 100,
  63. BatchTimeout: 10 * time.Millisecond,
  64. }
  65. m.kafkaWritersList.KafkaWritersLock.Lock()
  66. m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter)
  67. m.kafkaWritersList.KafkaWritersLock.Unlock()
  68. return kafkaWriter
  69. }
  70. func (m *AppState) CleanKafkaWriters() {
  71. fmt.Println("shutdown of kafka readers starts")
  72. for _, r := range m.kafkaWritersList.KafkaWriters {
  73. if err := r.Close(); err != nil {
  74. fmt.Printf("Error in closing kafka writer %v", err)
  75. }
  76. }
  77. fmt.Println("Kafka writers graceful shutdown complete")
  78. }
  79. func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader {
  80. brokers := strings.Split(kafkaUrl, ",")
  81. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  82. Brokers: brokers,
  83. GroupID: groupID,
  84. Topic: topic,
  85. MinBytes: 1,
  86. MaxBytes: 10e6,
  87. })
  88. m.kafkaReadersList.KafkaReadersLock.Lock()
  89. m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader)
  90. m.kafkaReadersList.KafkaReadersLock.Unlock()
  91. return kafkaReader
  92. }
  93. func (m *AppState) CleanKafkaReaders() {
  94. for _, r := range m.kafkaReadersList.KafkaReaders {
  95. if err := r.Close(); err != nil {
  96. fmt.Printf("Error in closing kafka reader %v", err)
  97. }
  98. }
  99. fmt.Println("Kafka readers graceful shutdown complete")
  100. }
  101. // GetBeacons returns thread-safe access to beacons list
  102. func (m *AppState) GetBeacons() *model.BeaconsList {
  103. return &m.beacons
  104. }
  105. // GetSettings returns thread-safe access to settings
  106. func (m *AppState) GetSettings() *model.Settings {
  107. return &m.settings
  108. }
  109. // GetBeaconEvents returns thread-safe access to beacon events
  110. func (m *AppState) GetBeaconEvents() *model.BeaconEventList {
  111. return &m.beaconEvents
  112. }
  113. // GetBeaconsLookup returns thread-safe access to beacon lookup map
  114. func (m *AppState) GetBeaconsLookup() map[string]string {
  115. return m.beaconsLookup
  116. }
  117. // GetLatestList returns thread-safe access to latest beacons list
  118. func (m *AppState) GetLatestList() *model.LatestBeaconsList {
  119. return &m.latestList
  120. }
  121. // AddBeaconToLookup adds a beacon ID to the lookup map
  122. func (m *AppState) AddBeaconToLookup(id, value string) {
  123. m.beaconsLookup[id] = value
  124. }
  125. // RemoveBeaconFromLookup removes a beacon ID from the lookup map
  126. func (m *AppState) RemoveBeaconFromLookup(id string) {
  127. delete(m.beaconsLookup, id)
  128. }
  129. func (m *AppState) CleanLookup() {
  130. clear(m.beaconsLookup)
  131. }
  132. func (m *AppState) RemoveBeacon(id string) {
  133. m.beacons.Lock.Lock()
  134. delete(m.beacons.Beacons, id)
  135. m.beacons.Lock.Unlock()
  136. }
  137. func (m *AppState) RemoveHTTPResult(id string) {
  138. m.httpResults.Lock.Lock()
  139. delete(m.httpResults.Results, id)
  140. m.httpResults.Lock.Unlock()
  141. }
  142. // BeaconExists checks if a beacon exists in the lookup
  143. func (m *AppState) BeaconExists(id string) (string, bool) {
  144. val, exists := m.beaconsLookup[id]
  145. return val, exists
  146. }
  147. // GetBeacon returns a beacon by ID (thread-safe)
  148. func (m *AppState) GetBeacon(id string) (model.Beacon, bool) {
  149. m.beacons.Lock.RLock()
  150. defer m.beacons.Lock.RUnlock()
  151. beacon, exists := m.beacons.Beacons[id]
  152. return beacon, exists
  153. }
  154. // GetHTTPResult returns a beacon from HTTP results by ID (thread-safe)
  155. func (m *AppState) GetHTTPResult(id string) (model.HTTPResult, bool) {
  156. m.httpResults.Lock.RLock()
  157. defer m.httpResults.Lock.RUnlock()
  158. beacon, exists := m.httpResults.Results[id]
  159. return beacon, exists
  160. }
  161. // UpdateHTTPResult updates a beacon in the list (thread-safe)
  162. func (m *AppState) UpdateHTTPResult(id string, beacon model.HTTPResult) {
  163. m.httpResults.Lock.Lock()
  164. defer m.httpResults.Lock.Unlock()
  165. m.httpResults.Results[id] = beacon
  166. }
  167. // UpdateBeacon updates a beacon in the list (thread-safe)
  168. func (m *AppState) UpdateBeacon(id string, beacon model.Beacon) {
  169. m.beacons.Lock.Lock()
  170. defer m.beacons.Lock.Unlock()
  171. m.beacons.Beacons[id] = beacon
  172. }
  173. // GetBeaconEvent returns a beacon event by ID (thread-safe)
  174. func (m *AppState) GetBeaconEvent(id string) (model.BeaconEvent, bool) {
  175. m.beaconEvents.Lock.RLock()
  176. defer m.beaconEvents.Lock.RUnlock()
  177. event, exists := m.beaconEvents.Beacons[id]
  178. return event, exists
  179. }
  180. // UpdateBeaconEvent updates a beacon event in the list (thread-safe)
  181. func (m *AppState) UpdateBeaconEvent(id string, event model.BeaconEvent) {
  182. m.beaconEvents.Lock.Lock()
  183. defer m.beaconEvents.Lock.Unlock()
  184. m.beaconEvents.Beacons[id] = event
  185. }
  186. // GetLatestBeacon returns the latest beacon by ID (thread-safe)
  187. func (m *AppState) GetLatestBeacon(id string) (model.Beacon, bool) {
  188. m.latestList.Lock.RLock()
  189. defer m.latestList.Lock.RUnlock()
  190. beacon, exists := m.latestList.LatestList[id]
  191. return beacon, exists
  192. }
  193. // UpdateLatestBeacon updates the latest beacon in the list (thread-safe)
  194. func (m *AppState) UpdateLatestBeacon(id string, beacon model.Beacon) {
  195. m.latestList.Lock.Lock()
  196. defer m.latestList.Lock.Unlock()
  197. m.latestList.LatestList[id] = beacon
  198. }
  199. // GetAllBeacons returns a copy of all beacons
  200. func (m *AppState) GetAllBeacons() map[string]model.Beacon {
  201. m.beacons.Lock.RLock()
  202. defer m.beacons.Lock.RUnlock()
  203. beacons := make(map[string]model.Beacon)
  204. for id, beacon := range m.beacons.Beacons {
  205. beacons[id] = beacon
  206. }
  207. return beacons
  208. }
  209. // GetAllHttpResults returns a copy of all beacons
  210. func (m *AppState) GetAllHttpResults() map[string]model.HTTPResult {
  211. m.httpResults.Lock.RLock()
  212. defer m.httpResults.Lock.RUnlock()
  213. beacons := make(map[string]model.HTTPResult)
  214. for id, beacon := range m.httpResults.Results {
  215. beacons[id] = beacon
  216. }
  217. return beacons
  218. }
  219. // GetAllLatestBeacons returns a copy of all latest beacons
  220. func (m *AppState) GetAllLatestBeacons() map[string]model.Beacon {
  221. m.latestList.Lock.RLock()
  222. defer m.latestList.Lock.RUnlock()
  223. beacons := make(map[string]model.Beacon)
  224. for id, beacon := range m.latestList.LatestList {
  225. beacons[id] = beacon
  226. }
  227. return beacons
  228. }
  229. // GetBeaconCount returns the number of tracked beacons
  230. func (m *AppState) GetBeaconCount() int {
  231. m.beacons.Lock.RLock()
  232. defer m.beacons.Lock.RUnlock()
  233. return len(m.beacons.Beacons)
  234. }
  235. // GetSettingsValue returns current settings as a value
  236. func (m *AppState) GetSettingsValue() model.SettingsVal {
  237. m.settings.Lock.RLock()
  238. defer m.settings.Lock.RUnlock()
  239. return m.settings.Settings
  240. }
  241. // UpdateSettings updates the system settings (thread-safe)
  242. func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {
  243. m.settings.Lock.Lock()
  244. defer m.settings.Lock.Unlock()
  245. m.settings.Settings = newSettings
  246. }