You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

370 regels
8.6 KiB

  1. package decoder
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  7. "github.com/AFASystems/presence/internal/pkg/model"
  8. )
  9. func TestEventLoop_RawMessageProcessing(t *testing.T) {
  10. // Setup
  11. appState := appcontext.NewAppState()
  12. mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
  13. parserRegistry := &model.ParserRegistry{}
  14. chRaw := make(chan model.BeaconAdvertisement, 10)
  15. ctx, cancel := context.WithCancel(context.Background())
  16. defer cancel()
  17. // Create a test message
  18. msg := model.BeaconAdvertisement{
  19. ID: "test-beacon",
  20. Data: "020106",
  21. }
  22. // Simulate event loop processing
  23. go func() {
  24. for {
  25. select {
  26. case <-ctx.Done():
  27. return
  28. case m := <-chRaw:
  29. processIncoming(m, appState, mockWriter, parserRegistry)
  30. }
  31. }
  32. }()
  33. // Send message
  34. chRaw <- msg
  35. // Give it time to process
  36. time.Sleep(100 * time.Millisecond)
  37. // Cancel context
  38. cancel()
  39. // Verify message was processed (even if no parser matched, processIncoming was called)
  40. // We just verify no panic occurred
  41. }
  42. func TestEventLoop_ParserRegistryUpdates(t *testing.T) {
  43. // Setup
  44. appState := appcontext.NewAppState()
  45. parserRegistry := &model.ParserRegistry{}
  46. chParser := make(chan model.KafkaParser, 10)
  47. // Test ADD operation
  48. addMsg := model.KafkaParser{
  49. ID: "add",
  50. Name: "new-parser",
  51. Config: model.Config{
  52. Name: "new-parser",
  53. Prefix: "02",
  54. Length: 2,
  55. },
  56. }
  57. chParser <- addMsg
  58. // Simulate event loop handling
  59. select {
  60. case msg := <-chParser:
  61. switch msg.ID {
  62. case "add":
  63. config := msg.Config
  64. parserRegistry.Register(config.Name, config)
  65. case "delete":
  66. parserRegistry.Unregister(msg.Name)
  67. case "update":
  68. config := msg.Config
  69. parserRegistry.Register(config.Name, config)
  70. }
  71. case <-time.After(1 * time.Second):
  72. t.Fatal("Timeout waiting for parser message")
  73. }
  74. // Verify parser was added
  75. if len(parserRegistry.ParserList) != 1 {
  76. t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList))
  77. }
  78. // Test DELETE operation
  79. deleteMsg := model.KafkaParser{
  80. ID: "delete",
  81. Name: "new-parser",
  82. }
  83. chParser <- deleteMsg
  84. select {
  85. case msg := <-chParser:
  86. switch msg.ID {
  87. case "add":
  88. config := msg.Config
  89. parserRegistry.Register(config.Name, config)
  90. case "delete":
  91. parserRegistry.Unregister(msg.Name)
  92. case "update":
  93. config := msg.Config
  94. parserRegistry.Register(config.Name, config)
  95. }
  96. case <-time.After(1 * time.Second):
  97. t.Fatal("Timeout waiting for parser message")
  98. }
  99. // Verify parser was deleted
  100. if len(parserRegistry.ParserList) != 0 {
  101. t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList))
  102. }
  103. }
  104. func TestEventLoop_UpdateParser(t *testing.T) {
  105. // Setup
  106. appState := appcontext.NewAppState()
  107. parserRegistry := &model.ParserRegistry{}
  108. // Add initial parser
  109. parserRegistry.Register("test-parser", model.Config{
  110. Name: "test-parser",
  111. Prefix: "02",
  112. Length: 2,
  113. })
  114. chParser := make(chan model.KafkaParser, 10)
  115. // Test UPDATE operation
  116. updateMsg := model.KafkaParser{
  117. ID: "update",
  118. Name: "test-parser",
  119. Config: model.Config{
  120. Name: "test-parser",
  121. Prefix: "03",
  122. Length: 3,
  123. },
  124. }
  125. chParser <- updateMsg
  126. // Simulate event loop handling
  127. select {
  128. case msg := <-chParser:
  129. switch msg.ID {
  130. case "add":
  131. config := msg.Config
  132. parserRegistry.Register(config.Name, config)
  133. case "delete":
  134. parserRegistry.Unregister(msg.Name)
  135. case "update":
  136. config := msg.Config
  137. parserRegistry.Register(config.Name, config)
  138. }
  139. case <-time.After(1 * time.Second):
  140. t.Fatal("Timeout waiting for parser message")
  141. }
  142. // Verify parser still exists (was updated, not deleted)
  143. if len(parserRegistry.ParserList) != 1 {
  144. t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList))
  145. }
  146. if _, exists := parserRegistry.ParserList["test-parser"]; !exists {
  147. t.Error("Parser should still exist after update")
  148. }
  149. }
  150. func TestEventLoop_MultipleParserOperations(t *testing.T) {
  151. // Setup
  152. appState := appcontext.NewAppState()
  153. parserRegistry := &model.ParserRegistry{}
  154. chParser := make(chan model.KafkaParser, 10)
  155. // Send multiple operations
  156. operations := []model.KafkaParser{
  157. {ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}},
  158. {ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}},
  159. {ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}},
  160. {ID: "delete", Name: "parser-2"},
  161. {ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}},
  162. }
  163. for _, op := range operations {
  164. chParser <- op
  165. }
  166. // Process all operations
  167. for i := 0; i < len(operations); i++ {
  168. select {
  169. case msg := <-chParser:
  170. switch msg.ID {
  171. case "add":
  172. config := msg.Config
  173. parserRegistry.Register(config.Name, config)
  174. case "delete":
  175. parserRegistry.Unregister(msg.Name)
  176. case "update":
  177. config := msg.Config
  178. parserRegistry.Register(config.Name, config)
  179. }
  180. case <-time.After(1 * time.Second):
  181. t.Fatalf("Timeout processing operation %d", i)
  182. }
  183. }
  184. // Verify final state
  185. if len(parserRegistry.ParserList) != 2 {
  186. t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList))
  187. }
  188. // parser-1 should exist (updated)
  189. if _, exists := parserRegistry.ParserList["parser-1"]; !exists {
  190. t.Error("parser-1 should exist")
  191. }
  192. // parser-2 should not exist (deleted)
  193. if _, exists := parserRegistry.ParserList["parser-2"]; exists {
  194. t.Error("parser-2 should not exist")
  195. }
  196. // parser-3 should exist (added)
  197. if _, exists := parserRegistry.ParserList["parser-3"]; !exists {
  198. t.Error("parser-3 should exist")
  199. }
  200. }
  201. func TestEventLoop_ContextCancellation(t *testing.T) {
  202. // Setup
  203. ctx, cancel := context.WithCancel(context.Background())
  204. defer cancel()
  205. chRaw := make(chan model.BeaconAdvertisement, 10)
  206. chParser := make(chan model.KafkaParser, 10)
  207. // Cancel immediately
  208. cancel()
  209. // Verify context is cancelled
  210. select {
  211. case <-ctx.Done():
  212. // Expected - context was cancelled
  213. return
  214. case msg := <-chRaw:
  215. t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg)
  216. case msg := <-chParser:
  217. t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg)
  218. case <-time.After(1 * time.Second):
  219. t.Error("Timeout - context cancellation should have been immediate")
  220. }
  221. }
  222. func TestEventLoop_ChannelBuffering(t *testing.T) {
  223. // Setup
  224. appState := appcontext.NewAppState()
  225. mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
  226. parserRegistry := &model.ParserRegistry{}
  227. // Create buffered channels (like in main)
  228. chRaw := make(chan model.BeaconAdvertisement, 2000)
  229. chParser := make(chan model.KafkaParser, 200)
  230. ctx, cancel := context.WithCancel(context.Background())
  231. defer cancel()
  232. // Send multiple messages without blocking
  233. for i := 0; i < 100; i++ {
  234. msg := model.BeaconAdvertisement{
  235. ID: "test-beacon",
  236. Data: "020106",
  237. }
  238. chRaw <- msg
  239. }
  240. // Verify all messages are buffered
  241. if len(chRaw) != 100 {
  242. t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw))
  243. }
  244. // Send parser updates
  245. for i := 0; i < 10; i++ {
  246. msg := model.KafkaParser{
  247. ID: "add",
  248. Name: "parser-" + string(rune('A'+i)),
  249. Config: model.Config{
  250. Name: "parser-" + string(rune('A'+i)),
  251. Prefix: "02",
  252. Length: 2,
  253. },
  254. }
  255. chParser <- msg
  256. }
  257. // Verify all parser messages are buffered
  258. if len(chParser) != 10 {
  259. t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser))
  260. }
  261. // Cancel context
  262. cancel()
  263. }
  264. func TestEventLoop_ParserAndRawChannels(t *testing.T) {
  265. // Setup
  266. appState := appcontext.NewAppState()
  267. mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
  268. parserRegistry := &model.ParserRegistry{}
  269. chRaw := make(chan model.BeaconAdvertisement, 10)
  270. chParser := make(chan model.KafkaParser, 10)
  271. ctx, cancel := context.WithCancel(context.Background())
  272. defer cancel()
  273. // Send both raw and parser messages
  274. rawMsg := model.BeaconAdvertisement{
  275. ID: "test-beacon",
  276. Data: "020106",
  277. }
  278. parserMsg := model.KafkaParser{
  279. ID: "add",
  280. Name: "test-parser",
  281. Config: model.Config{
  282. Name: "test-parser",
  283. Prefix: "02",
  284. Length: 2,
  285. },
  286. }
  287. chRaw <- rawMsg
  288. chParser <- parserMsg
  289. // Process both messages
  290. processedRaw := false
  291. processedParser := false
  292. for i := 0; i < 2; i++ {
  293. select {
  294. case <-chRaw:
  295. processedRaw = true
  296. case <-chParser:
  297. processedParser = true
  298. case <-time.After(1 * time.Second):
  299. t.Fatal("Timeout waiting for messages")
  300. }
  301. }
  302. if !processedRaw {
  303. t.Error("Raw message should have been processed")
  304. }
  305. if !processedParser {
  306. t.Error("Parser message should have been processed")
  307. }
  308. cancel()
  309. }