選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 

419 行
10 KiB

  1. package decoder
  2. import (
  3. "context"
  4. "encoding/json"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  9. "github.com/AFASystems/presence/internal/pkg/model"
  10. "github.com/segmentio/kafka-go"
  11. )
  12. // TestIntegration_DecoderEndToEnd tests the complete decoder flow
  13. func TestIntegration_DecoderEndToEnd(t *testing.T) {
  14. if testing.Short() {
  15. t.Skip("Skipping integration test in short mode")
  16. }
  17. // Check if Kafka is available
  18. kafkaURL := os.Getenv("KAFKA_URL")
  19. if kafkaURL == "" {
  20. kafkaURL = "localhost:9092"
  21. }
  22. // Create test topics
  23. rawTopic := "test-rawbeacons-" + time.Now().Format("20060102150405")
  24. alertTopic := "test-alertbeacons-" + time.Now().Format("20060102150405")
  25. // Setup
  26. appState := appcontext.NewAppState()
  27. parserRegistry := &model.ParserRegistry{}
  28. // Register a test parser
  29. config := model.Config{
  30. Name: "integration-test-parser",
  31. Prefix: "02",
  32. Length: 2,
  33. MinLength: 2,
  34. MaxLength: 20,
  35. }
  36. parserRegistry.Register("integration-test-parser", config)
  37. // Create Kafka writer
  38. writer := kafka.NewWriter(kafka.WriterConfig{
  39. Brokers: []string{kafkaURL},
  40. Topic: alertTopic,
  41. })
  42. defer writer.Close()
  43. // Create Kafka reader to verify messages
  44. reader := kafka.NewReader(kafka.ReaderConfig{
  45. Brokers: []string{kafkaURL},
  46. Topic: alertTopic,
  47. GroupID: "test-group-" + time.Now().Format("20060102150405"),
  48. })
  49. defer reader.Close()
  50. // Create a test beacon advertisement
  51. adv := model.BeaconAdvertisement{
  52. ID: "integration-test-beacon",
  53. Data: "020106", // Valid hex data
  54. }
  55. // Process the beacon
  56. err := decodeBeacon(adv, appState, writer, parserRegistry)
  57. if err != nil {
  58. t.Logf("Decode beacon returned error (may be expected if no parser matches): %v", err)
  59. }
  60. // Give Kafka time to propagate
  61. time.Sleep(1 * time.Second)
  62. // Verify event was stored in appState
  63. event, exists := appState.GetBeaconEvent("integration-test-beacon")
  64. if exists {
  65. t.Logf("Event stored in appState: %+v", event)
  66. }
  67. }
  68. // TestIntegration_ParserRegistryOperations tests parser registry with real Kafka
  69. func TestIntegration_ParserRegistryOperations(t *testing.T) {
  70. if testing.Short() {
  71. t.Skip("Skipping integration test in short mode")
  72. }
  73. kafkaURL := os.Getenv("KAFKA_URL")
  74. if kafkaURL == "" {
  75. kafkaURL = "localhost:9092"
  76. }
  77. alertTopic := "test-alertbeacons-registry-" + time.Now().Format("20060102150405")
  78. // Setup
  79. appState := appcontext.NewAppState()
  80. parserRegistry := &model.ParserRegistry{}
  81. writer := kafka.NewWriter(kafka.WriterConfig{
  82. Brokers: []string{kafkaURL},
  83. Topic: alertTopic,
  84. })
  85. defer writer.Close()
  86. // Test parser registration through Kafka message flow
  87. parserMsg := model.KafkaParser{
  88. ID: "add",
  89. Name: "kafka-test-parser",
  90. Config: model.Config{
  91. Name: "kafka-test-parser",
  92. Prefix: "02",
  93. Length: 2,
  94. MinLength: 2,
  95. MaxLength: 20,
  96. },
  97. }
  98. // Simulate parser registry update
  99. switch parserMsg.ID {
  100. case "add":
  101. config := parserMsg.Config
  102. parserRegistry.Register(config.Name, config)
  103. case "delete":
  104. parserRegistry.Unregister(parserMsg.Name)
  105. case "update":
  106. config := parserMsg.Config
  107. parserRegistry.Register(config.Name, config)
  108. }
  109. // Verify parser was registered
  110. if len(parserRegistry.ParserList) != 1 {
  111. t.Errorf("Expected 1 parser in registry, got %d", len(parserRegistry.ParserList))
  112. }
  113. if _, exists := parserRegistry.ParserList["kafka-test-parser"]; !exists {
  114. t.Error("Parser should exist in registry")
  115. }
  116. }
  117. // TestIntegration_MultipleBeaconsSequential tests processing multiple beacons
  118. func TestIntegration_MultipleBeaconsSequential(t *testing.T) {
  119. if testing.Short() {
  120. t.Skip("Skipping integration test in short mode")
  121. }
  122. kafkaURL := os.Getenv("KAFKA_URL")
  123. if kafkaURL == "" {
  124. kafkaURL = "localhost:9092"
  125. }
  126. alertTopic := "test-alertbeacons-multi-" + time.Now().Format("20060102150405")
  127. // Setup
  128. appState := appcontext.NewAppState()
  129. parserRegistry := &model.ParserRegistry{}
  130. // Register parser
  131. config := model.Config{
  132. Name: "multi-test-parser",
  133. Prefix: "02",
  134. Length: 2,
  135. MinLength: 2,
  136. MaxLength: 20,
  137. }
  138. parserRegistry.Register("multi-test-parser", config)
  139. writer := kafka.NewWriter(kafka.WriterConfig{
  140. Brokers: []string{kafkaURL},
  141. Topic: alertTopic,
  142. })
  143. defer writer.Close()
  144. reader := kafka.NewReader(kafka.ReaderConfig{
  145. Brokers: []string{kafkaURL},
  146. Topic: alertTopic,
  147. GroupID: "test-group-multi-" + time.Now().Format("20060102150405"),
  148. MinBytes: 10e3,
  149. MaxBytes: 10e6,
  150. })
  151. defer reader.Close()
  152. // Process multiple beacons
  153. beacons := []model.BeaconAdvertisement{
  154. {ID: "beacon-1", Data: "020106"},
  155. {ID: "beacon-2", Data: "020107"},
  156. {ID: "beacon-3", Data: "020108"},
  157. }
  158. for _, adv := range beacons {
  159. err := decodeBeacon(adv, appState, writer, parserRegistry)
  160. if err != nil {
  161. t.Logf("Processing beacon %s returned error: %v", adv.ID, err)
  162. }
  163. }
  164. // Give Kafka time to propagate
  165. time.Sleep(2 * time.Second)
  166. // Verify events in appState
  167. for _, adv := range beacons {
  168. event, exists := appState.GetBeaconEvent(adv.ID)
  169. if exists {
  170. t.Logf("Event for %s: %+v", adv.ID, event)
  171. }
  172. }
  173. }
  174. // TestIntegration_EventDeduplication tests that duplicate events are not published
  175. func TestIntegration_EventDeduplication(t *testing.T) {
  176. if testing.Short() {
  177. t.Skip("Skipping integration test in short mode")
  178. }
  179. kafkaURL := os.Getenv("KAFKA_URL")
  180. if kafkaURL == "" {
  181. kafkaURL = "localhost:9092"
  182. }
  183. alertTopic := "test-alertbeacons-dedup-" + time.Now().Format("20060102150405")
  184. // Setup
  185. appState := appcontext.NewAppState()
  186. parserRegistry := &model.ParserRegistry{}
  187. // Register parser
  188. config := model.Config{
  189. Name: "dedup-test-parser",
  190. Prefix: "02",
  191. Length: 2,
  192. MinLength: 2,
  193. MaxLength: 20,
  194. }
  195. parserRegistry.Register("dedup-test-parser", config)
  196. writer := kafka.NewWriter(kafka.WriterConfig{
  197. Brokers: []string{kafkaURL},
  198. Topic: alertTopic,
  199. })
  200. defer writer.Close()
  201. reader := kafka.NewReader(kafka.ReaderConfig{
  202. Brokers: []string{kafkaURL},
  203. Topic: alertTopic,
  204. GroupID: "test-group-dedup-" + time.Now().Format("20060102150405"),
  205. })
  206. defer reader.Close()
  207. // Create identical beacon advertisement
  208. adv := model.BeaconAdvertisement{
  209. ID: "dedup-test-beacon",
  210. Data: "020106",
  211. }
  212. // Process first time
  213. err := decodeBeacon(adv, appState, writer, parserRegistry)
  214. if err != nil {
  215. t.Logf("First processing returned error: %v", err)
  216. }
  217. // Process second time with identical data
  218. err = decodeBeacon(adv, appState, writer, parserRegistry)
  219. if err != nil {
  220. t.Logf("Second processing returned error: %v", err)
  221. }
  222. // Give Kafka time to propagate
  223. time.Sleep(1 * time.Second)
  224. // Try to read from Kafka - should have at most 1 message due to deduplication
  225. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  226. defer cancel()
  227. messageCount := 0
  228. for {
  229. msg, err := reader.ReadMessage(ctx)
  230. if err != nil {
  231. break
  232. }
  233. messageCount++
  234. t.Logf("Read message %d: %s", messageCount, string(msg.Value))
  235. if messageCount > 1 {
  236. t.Error("Expected at most 1 message due to deduplication, got more")
  237. break
  238. }
  239. }
  240. t.Logf("Total messages read: %d", messageCount)
  241. }
  242. // TestIntegration_AppStatePersistence tests that events persist in AppState
  243. func TestIntegration_AppStatePersistence(t *testing.T) {
  244. if testing.Short() {
  245. t.Skip("Skipping integration test in short mode")
  246. }
  247. kafkaURL := os.Getenv("KAFKA_URL")
  248. if kafkaURL == "" {
  249. kafkaURL = "localhost:9092"
  250. }
  251. alertTopic := "test-alertbeacons-persist-" + time.Now().Format("20060102150405")
  252. // Setup
  253. appState := appcontext.NewAppState()
  254. parserRegistry := &model.ParserRegistry{}
  255. config := model.Config{
  256. Name: "persist-test-parser",
  257. Prefix: "02",
  258. Length: 2,
  259. MinLength: 2,
  260. MaxLength: 20,
  261. }
  262. parserRegistry.Register("persist-test-parser", config)
  263. writer := kafka.NewWriter(kafka.WriterConfig{
  264. Brokers: []string{kafkaURL},
  265. Topic: alertTopic,
  266. })
  267. defer writer.Close()
  268. // Process beacon
  269. adv := model.BeaconAdvertisement{
  270. ID: "persist-test-beacon",
  271. Data: "020106",
  272. }
  273. err := decodeBeacon(adv, appState, writer, parserRegistry)
  274. if err != nil {
  275. t.Logf("Processing returned error: %v", err)
  276. }
  277. // Verify event persists in AppState
  278. event, exists := appState.GetBeaconEvent("persist-test-beacon")
  279. if !exists {
  280. t.Error("Event should exist in AppState after processing")
  281. } else {
  282. t.Logf("Event persisted: ID=%s, Type=%s, Battery=%d",
  283. event.ID, event.Type, event.Battery)
  284. // Verify event can be serialized to JSON
  285. jsonData, err := event.ToJSON()
  286. if err != nil {
  287. t.Errorf("Failed to serialize event to JSON: %v", err)
  288. } else {
  289. t.Logf("Event JSON: %s", string(jsonData))
  290. }
  291. }
  292. }
  293. // TestIntegration_ParserUpdateFlow tests updating parsers during runtime
  294. func TestIntegration_ParserUpdateFlow(t *testing.T) {
  295. if testing.Short() {
  296. t.Skip("Skipping integration test in short mode")
  297. }
  298. kafkaURL := os.Getenv("KAFKA_URL")
  299. if kafkaURL == "" {
  300. kafkaURL = "localhost:9092"
  301. }
  302. alertTopic := "test-alertbeacons-update-" + time.Now().Format("20060102150405")
  303. // Setup
  304. appState := appcontext.NewAppState()
  305. parserRegistry := &model.ParserRegistry{}
  306. writer := kafka.NewWriter(kafka.WriterConfig{
  307. Brokers: []string{kafkaURL},
  308. Topic: alertTopic,
  309. })
  310. defer writer.Close()
  311. // Initial parser config
  312. config1 := model.Config{
  313. Name: "update-test-parser",
  314. Prefix: "02",
  315. Length: 2,
  316. MinLength: 2,
  317. MaxLength: 20,
  318. }
  319. parserRegistry.Register("update-test-parser", config1)
  320. // Process with initial config
  321. adv := model.BeaconAdvertisement{
  322. ID: "update-test-beacon",
  323. Data: "020106",
  324. }
  325. err := decodeBeacon(adv, appState, writer, parserRegistry)
  326. t.Logf("First processing: %v", err)
  327. // Update parser config
  328. config2 := model.Config{
  329. Name: "update-test-parser",
  330. Prefix: "03",
  331. Length: 3,
  332. MinLength: 3,
  333. MaxLength: 25,
  334. }
  335. parserRegistry.Register("update-test-parser", config2)
  336. // Process again with updated config
  337. adv2 := model.BeaconAdvertisement{
  338. ID: "update-test-beacon-2",
  339. Data: "030107",
  340. }
  341. err = decodeBeacon(adv2, appState, writer, parserRegistry)
  342. t.Logf("Second processing with updated parser: %v", err)
  343. // Verify parser still exists
  344. if _, exists := parserRegistry.ParserList["update-test-parser"]; !exists {
  345. t.Error("Parser should exist after update")
  346. }
  347. }