You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

232 line
6.1 KiB

  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "log/slog"
  8. "slices"
  9. "strings"
  10. "time"
  11. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  12. "github.com/AFASystems/presence/internal/pkg/model"
  13. "github.com/google/uuid"
  14. "github.com/segmentio/kafka-go"
  15. "gorm.io/gorm"
  16. )
  17. func findTracker(msg model.HTTPLocation, db *gorm.DB) (model.Tracker, error) {
  18. var tracker model.Tracker
  19. if msg.MAC != "" {
  20. if err := db.Where("mac = ?", strings.ToUpper(strings.ReplaceAll(msg.MAC, ":", ""))).Find(&tracker).Error; err != nil {
  21. return model.Tracker{}, err
  22. }
  23. return tracker, nil
  24. }
  25. if msg.ID != "" {
  26. if err := db.Where("id = ?", msg.ID).Find(&tracker).Error; err != nil {
  27. return model.Tracker{}, err
  28. }
  29. return tracker, nil
  30. }
  31. return model.Tracker{}, errors.New("both ID and MAC are not provided")
  32. }
  33. func findZones(trackerID string, db *gorm.DB) ([]string, error) {
  34. var zones []model.TrackerZones
  35. if err := db.Select("zoneList").Where("tracker = ?", trackerID).Find(&zones).Error; err != nil {
  36. return nil, err
  37. }
  38. var allowedZones []string
  39. for _, z := range zones {
  40. allowedZones = append(allowedZones, z.ZoneList...)
  41. }
  42. return allowedZones, nil
  43. }
  44. func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
  45. tracker, err := findTracker(msg, db)
  46. if err != nil {
  47. msg := fmt.Sprintf("Error in finding tracker: %v", err)
  48. slog.Error(msg)
  49. return
  50. }
  51. allowedZones, err := findZones(tracker.ID, db)
  52. if err != nil {
  53. msg := fmt.Sprintf("Error in finding zones: %v", err)
  54. slog.Error(msg)
  55. return
  56. }
  57. var gw model.Gateway
  58. mac := formatMac(msg.Location)
  59. if err := db.Select("*").Where("mac = ?", mac).First(&gw).Error; err != nil {
  60. msg := fmt.Sprintf("Gateway not found for MAC: %s", mac)
  61. slog.Error(msg)
  62. return
  63. }
  64. var floor model.Floor
  65. if err := db.Where("id = ?", gw.Floor).First(&floor).Error; err != nil {
  66. msg := fmt.Sprintf("Floor not found for ID: %s", gw.Floor)
  67. slog.Error(msg)
  68. return
  69. }
  70. if err := db.Create(&model.Tracks{
  71. UUID: msg.ID,
  72. Timestamp: time.Now(),
  73. Gateway: gw.ID,
  74. GatewayMac: gw.MAC,
  75. Tracker: msg.ID,
  76. Floor: gw.Floor,
  77. Building: gw.Building,
  78. TrackerMac: tracker.MAC,
  79. Signal: msg.RSSI,
  80. X: gw.X,
  81. Y: gw.Y,
  82. Z: float32(floor.FloorNumber),
  83. }).Error; err != nil {
  84. msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
  85. slog.Error(msg)
  86. return
  87. }
  88. err = db.Where("id = ?", msg.ID).Updates(model.Tracker{Position: gw.ID, X: gw.X, Y: gw.Y, Floor: floor.ID}).Error
  89. if err != nil {
  90. msg := fmt.Sprintf("Error in updating tracker: %v", err)
  91. slog.Error(msg)
  92. return
  93. }
  94. sendRestrictedZoneAlert(gw.Zone, msg.ID, writer, ctx, allowedZones, db)
  95. }
  96. func LocationToBeaconServiceAI(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) {
  97. tracker, err := findTracker(msg, db)
  98. if err != nil {
  99. msg := fmt.Sprintf("Error in finding tracker: %v", err)
  100. slog.Error(msg)
  101. return
  102. }
  103. allowedZones, err := findZones(tracker.ID, db)
  104. if err != nil {
  105. msg := fmt.Sprintf("Error in finding zones: %v", err)
  106. slog.Error(msg)
  107. return
  108. }
  109. var gw model.Gateway
  110. if err := db.Order(fmt.Sprintf("POW(x - %f, 2) + POW(y - %f, 2)", msg.X, msg.Y)).First(&gw).Error; err != nil {
  111. msg := fmt.Sprintf("Error in finding gateway: %v", err)
  112. slog.Error(msg)
  113. return
  114. }
  115. if err := db.Create(&model.Tracks{UUID: tracker.ID, Timestamp: time.Now(), Gateway: gw.ID, GatewayMac: gw.MAC, Tracker: tracker.ID, Floor: gw.Floor, Building: gw.Building, TrackerMac: tracker.MAC, X: msg.X, Y: msg.Y, Z: msg.Z}).Error; err != nil {
  116. msg := fmt.Sprintf("Error in saving distance for beacon: %v", err)
  117. slog.Error(msg)
  118. return
  119. }
  120. var floor model.Floor
  121. floorFound := msg.Z >= 0
  122. if floorFound {
  123. if err := db.Where("floor_number = ?", int(msg.Z)).First(&floor).Error; err != nil {
  124. if errors.Is(err, gorm.ErrRecordNotFound) {
  125. slog.Warn(fmt.Sprintf("Floor not found for floor number: %f, skipping floor update", msg.Z))
  126. floorFound = false
  127. } else {
  128. slog.Error(fmt.Sprintf("Error querying floor for floor number: %f: %v", msg.Z, err))
  129. return
  130. }
  131. }
  132. }
  133. var trackerUpdate model.Tracker
  134. if floorFound {
  135. trackerUpdate = model.Tracker{Position: gw.ID, X: msg.X, Y: msg.Y, Floor: floor.ID}
  136. } else {
  137. trackerUpdate = model.Tracker{Position: gw.ID, X: msg.X, Y: msg.Y}
  138. }
  139. err = db.Where("id = ?", tracker.ID).Updates(trackerUpdate).Error
  140. if err != nil {
  141. slog.Error(fmt.Sprintf("Error in updating tracker: %v", err))
  142. return
  143. }
  144. sendRestrictedZoneAlert(gw.Zone, tracker.ID, writer, ctx, allowedZones, db)
  145. }
  146. func SendAlert(trackerId, alertType, zone string, writer *kafka.Writer, ctx context.Context, db *gorm.DB) {
  147. var existingAlert model.Alert
  148. result := db.Select("status").Where("tracker_id = ? AND type = ?", trackerId, alertType).Order("timestamp DESC").Limit(1).Find(&existingAlert)
  149. if result.RowsAffected == 0 || existingAlert.Status == "resolved" {
  150. alert := model.Alert{
  151. ID: uuid.New().String(),
  152. TrackerID: trackerId,
  153. Type: alertType,
  154. Status: "new",
  155. Timestamp: time.Now(),
  156. Zone: zone,
  157. }
  158. if err := InsertAlert(alert, db, ctx); err != nil {
  159. msg := fmt.Sprintf("Error in inserting alert: %v", err)
  160. slog.Error(msg)
  161. return
  162. }
  163. eMsg, err := json.Marshal(alert)
  164. if err != nil {
  165. msg := "Error in marshaling"
  166. slog.Error(msg)
  167. return
  168. } else {
  169. msg := kafka.Message{
  170. Value: eMsg,
  171. }
  172. if err := kafkaclient.Write(ctx, writer, msg); err != nil {
  173. msg := fmt.Sprintf("Error in writing message: %v", err)
  174. slog.Error(msg)
  175. return
  176. }
  177. }
  178. return
  179. } else {
  180. return
  181. }
  182. }
  183. func sendRestrictedZoneAlert(gwZone, trackerId string, writer *kafka.Writer, ctx context.Context, allowedZones []string, db *gorm.DB) {
  184. if len(allowedZones) != 0 && !slices.Contains(allowedZones, gwZone) {
  185. SendAlert(trackerId, "Restricted zone", gwZone, writer, ctx, db)
  186. }
  187. }
  188. func formatMac(MAC string) string {
  189. var res strings.Builder
  190. for i := 0; i < len(MAC); i += 2 {
  191. if i > 0 {
  192. res.WriteByte(':')
  193. }
  194. end := min(i+2, len(MAC))
  195. res.WriteString(MAC[i:end])
  196. }
  197. return res.String()
  198. }