選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 

178 行
5.8 KiB

  1. package apiclient
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "log/slog"
  7. "net/http"
  8. "reflect"
  9. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  10. "github.com/AFASystems/presence/internal/pkg/config"
  11. "github.com/AFASystems/presence/internal/pkg/controller"
  12. "github.com/AFASystems/presence/internal/pkg/model"
  13. "github.com/segmentio/kafka-go"
  14. "gorm.io/gorm"
  15. "gorm.io/gorm/clause"
  16. )
  17. func UpdateDB(db *gorm.DB, ctx context.Context, cfg *config.Config, writer *kafka.Writer, appState *appcontext.AppState) error {
  18. slog.Info("UpdateDB: Avvio procedura di sincronizzazione database...")
  19. tr := &http.Transport{
  20. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  21. }
  22. client := &http.Client{Transport: tr}
  23. token, err := GetToken(ctx, cfg, client)
  24. if err != nil {
  25. slog.Error("UpdateDB: Fallito recupero del Token di autenticazione", "error", err)
  26. return err
  27. }
  28. slog.Info("UpdateDB: Token di autenticazione recuperato con successo")
  29. // 1. Sincronizzazione TRACKERS
  30. trackers, err := GetTrackers(token, client, cfg)
  31. if err != nil {
  32. slog.Error("UpdateDB: Errore durante la chiamata GetTrackers", "error", err)
  33. } else {
  34. slog.Info("UpdateDB: Tracker scaricati dall'API", "count", len(trackers))
  35. syncTable(db, trackers)
  36. slog.Info("UpdateDB: Tabella 'trackers' allineata su DB")
  37. if err := controller.SendKafkaMessage(writer, &model.ApiUpdate{Method: "DELETE", MAC: "all"}, ctx); err != nil {
  38. slog.Error("UpdateDB: Errore nell'invio del messaggio Kafka 'DELETE all' al lookup", "error", err)
  39. }
  40. kafkaMsgCount := 0
  41. for _, v := range trackers {
  42. apiUpdate := model.ApiUpdate{
  43. Method: "POST",
  44. ID: v.ID,
  45. MAC: v.MAC,
  46. }
  47. if err := controller.SendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
  48. slog.Error("UpdateDB: Errore nell'invio del messaggio Kafka 'POST tracker'", "tracker_id", v.ID, "error", err)
  49. } else {
  50. kafkaMsgCount++
  51. }
  52. }
  53. slog.Info("UpdateDB: Sincronizzazione messaggi Kafka per i tracker completata", "inviati", kafkaMsgCount)
  54. }
  55. // 2. Sincronizzazione GATEWAYS
  56. gateways, err := GetGateways(token, client, cfg)
  57. if err != nil {
  58. slog.Error("UpdateDB: Errore durante la chiamata GetGateways", "error", err)
  59. } else {
  60. slog.Info("UpdateDB: Gateway scaricati dall'API", "count", len(gateways))
  61. syncTable(db, gateways)
  62. slog.Info("UpdateDB: Tabella 'gateways' allineata su DB")
  63. }
  64. // 3. Sincronizzazione FLOORS
  65. floors, err := GetFloors(token, client, cfg)
  66. if err != nil {
  67. slog.Error("UpdateDB: Errore durante la chiamata GetFloors", "error", err)
  68. } else {
  69. slog.Info("UpdateDB: Piani (floors) scaricati dall'API", "count", len(floors))
  70. syncTable(db, floors)
  71. slog.Info("UpdateDB: Tabella 'floors' allineata su DB")
  72. }
  73. // 4. Sincronizzazione ZONES
  74. zones, err := GetZones(token, client, cfg)
  75. if err != nil {
  76. slog.Error("UpdateDB: Errore durante la chiamata GetZones", "error", err)
  77. } else {
  78. slog.Info("UpdateDB: Zone scaricate dall'API", "count", len(zones))
  79. syncTable(db, zones)
  80. slog.Info("UpdateDB: Tabella 'zones' allineata su DB")
  81. }
  82. // 5. Sincronizzazione TRACKER ZONES
  83. trackerZones, err := GetTrackerZones(token, client, cfg)
  84. if err != nil {
  85. slog.Error("UpdateDB: Errore durante la chiamata GetTrackerZones", "error", err)
  86. } else {
  87. slog.Info("UpdateDB: Tracker-Zones scaricate dall'API", "count", len(trackerZones))
  88. syncTable(db, trackerZones)
  89. slog.Info("UpdateDB: Tabella 'tracker_zones' allineata su DB")
  90. }
  91. // 6. Inferenza delle posizioni (Stime AI passate)
  92. inferredPosition, err := InferPosition(token, client, cfg)
  93. if err != nil {
  94. slog.Error("UpdateDB: Errore durante la chiamata InferPosition", "error", err)
  95. } else {
  96. updateCount := 0
  97. for _, v := range inferredPosition.Items {
  98. mac := convertMac(v.Mac)
  99. res := db.Model(&model.Tracker{}).Where("mac = ?", mac).Updates(map[string]interface{}{"x": v.X, "y": v.Y})
  100. if res.Error != nil {
  101. slog.Error("UpdateDB: Errore aggiornamento coordinate stimate per tracker", "mac", mac, "error", res.Error)
  102. } else if res.RowsAffected > 0 {
  103. updateCount++
  104. }
  105. }
  106. slog.Info("UpdateDB: Aggiornamento posizioni storiche completato", "tracker_aggiornati", updateCount)
  107. }
  108. // 7. Controllo e Inizializzazione SETTINGS
  109. var settings appcontext.Settings
  110. if err := db.First(&settings).Error; err != nil {
  111. if !errors.Is(err, gorm.ErrRecordNotFound) {
  112. slog.Error("UpdateDB: Errore durante la lettura delle impostazioni", "error", err)
  113. }
  114. }
  115. if settings.ID == 0 {
  116. slog.Info("UpdateDB: Tabella settings vuota. Inizializzazione con i valori di default di AppState...")
  117. if err := db.Create(appState.GetSettings()).Error; err != nil {
  118. slog.Error("UpdateDB: Errore critico durante la creazione dei settings di default", "error", err)
  119. } else {
  120. slog.Info("UpdateDB: Settings di default salvati con successo")
  121. }
  122. } else {
  123. slog.Info("UpdateDB: Settings già presenti nel database", "id", settings.ID)
  124. }
  125. slog.Info("UpdateDB: Procedura di sincronizzazione completata.")
  126. return nil
  127. }
  128. func syncTable[T any](db *gorm.DB, data []T) {
  129. if len(data) == 0 {
  130. slog.Warn("syncTable: Nessun dato ricevuto dall'API, salto la sincronizzazione per questa tabella.")
  131. return
  132. }
  133. var ids []string
  134. for _, item := range data {
  135. v := reflect.ValueOf(item).FieldByName("ID").String()
  136. if v != "" {
  137. ids = append(ids, v)
  138. }
  139. }
  140. if len(ids) == 0 {
  141. slog.Error("syncTable: Impossibile procedere. Trovati 0 ID validi tramite reflection nella struttura dei dati.")
  142. return
  143. }
  144. err := db.Transaction(func(tx *gorm.DB) error {
  145. // Elimina i record locali che non esistono più nel backend centrale
  146. if err := tx.Where("id NOT IN ?", ids).Delete(new(T)).Error; err != nil {
  147. return err
  148. }
  149. // Upsert dei dati aggiornati o nuovi
  150. return tx.Clauses(clause.OnConflict{UpdateAll: true}).Create(&data).Error
  151. })
  152. if err != nil {
  153. slog.Error("syncTable: Errore critico durante la transazione di sincronizzazione", "error", err)
  154. }
  155. }