Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.
 
 

216 Zeilen
5.4 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/hashicorp/go-multierror"
  9. )
  10. type ProjectBackfeedManager struct {
  11. Context context.Context
  12. Cancel context.CancelFunc
  13. Done chan bool
  14. C chan *BackfeedItem
  15. Name string
  16. BackfeedRedis *redis.ClusterClient
  17. ProjectRedis *redis.Client
  18. //Lock sync.RWMutex
  19. ProjectConfig ProjectConfig
  20. }
  21. type ProjectRedisConfig struct {
  22. Host string `json:"host"`
  23. Pass string `json:"pass"`
  24. Port int `json:"port"`
  25. }
  26. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  27. if that.ProjectConfig.RedisConfig == nil && new == nil {
  28. return false
  29. }
  30. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  31. }
  32. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  33. //that.Lock.RLock()
  34. //defer that.Lock.RUnlock()
  35. //if that.C == nil {
  36. // return false
  37. //}
  38. select {
  39. case <-ctx.Done():
  40. return ctx.Err()
  41. case <-that.Context.Done():
  42. return fmt.Errorf("backfeed channel closed")
  43. case that.C <- item:
  44. return nil
  45. //default:
  46. // return fmt.Errorf("backfeed channel full")
  47. }
  48. }
  49. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  50. if blocking {
  51. select {
  52. case <-that.Context.Done():
  53. return nil, false
  54. case item, ok := <-that.C:
  55. return item, ok
  56. }
  57. } else {
  58. select {
  59. case <-that.Context.Done():
  60. return nil, false
  61. case item, ok := <-that.C:
  62. return item, ok
  63. default:
  64. return nil, false
  65. }
  66. }
  67. }
  68. var Tag = struct{}{}
  69. func (that *ProjectBackfeedManager) Do() {
  70. defer close(that.Done)
  71. defer that.Cancel()
  72. pipe := that.BackfeedRedis.Pipeline()
  73. for {
  74. select {
  75. case <-that.Context.Done():
  76. break
  77. case <-that.Done:
  78. break
  79. default:
  80. }
  81. queueKeyMap := map[string]map[string][][]byte{}
  82. sAddQueueItems := map[string][]any{}
  83. skipFeedQueueItems := map[string]map[string]struct{}{}
  84. wrapped := 0
  85. for wrapped < ItemWrapSize {
  86. item, ok := that.PopItem(wrapped == 0)
  87. if !ok {
  88. break
  89. }
  90. if item.SkipBloom {
  91. sAddQueueItems[item.Queue] = append(sAddQueueItems[item.Queue], item.Item)
  92. } else {
  93. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  94. if _, exists := queueKeyMap[item.Queue]; !exists {
  95. queueKeyMap[item.Queue] = make(map[string][][]byte)
  96. }
  97. queueKeyMap[item.Queue][key] = append(queueKeyMap[item.Queue][key], item.Item)
  98. if item.SkipFeed {
  99. if _, exists := skipFeedQueueItems[item.Queue]; !exists {
  100. skipFeedQueueItems[item.Queue] = make(map[string]struct{})
  101. }
  102. skipFeedQueueItems[item.Queue][string(item.Item)] = Tag
  103. }
  104. }
  105. wrapped++
  106. }
  107. if wrapped == 0 {
  108. break
  109. }
  110. if len(queueKeyMap) > 0 {
  111. for queue, keyMap := range queueKeyMap {
  112. lastTS := make([]any, 0, len(keyMap)*2)
  113. now := time.Now()
  114. for key := range keyMap {
  115. lastTS = append(lastTS, key)
  116. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  117. }
  118. resultMap := map[string]*redis.Cmd{}
  119. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  120. try := 0
  121. for {
  122. for key, items := range keyMap {
  123. args := []any{
  124. "bf.madd",
  125. key,
  126. }
  127. for _, item := range items {
  128. args = append(args, item)
  129. }
  130. resultMap[key] = pipe.Do(context.Background(), args...)
  131. }
  132. if _, err := pipe.Exec(context.Background()); err != nil {
  133. log.Printf("%s", err)
  134. }
  135. var err error
  136. for key, items := range keyMap {
  137. res, cmdErr := resultMap[key].BoolSlice()
  138. if cmdErr != nil {
  139. err = multierror.Append(err, cmdErr)
  140. continue
  141. }
  142. if len(res) != len(keyMap[key]) {
  143. err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
  144. continue
  145. }
  146. for i, v := range res {
  147. if v {
  148. sAddQueueItems[queue] = append(sAddQueueItems[queue], items[i])
  149. }
  150. }
  151. delete(keyMap, key)
  152. }
  153. if err == nil {
  154. break
  155. }
  156. log.Printf("%s", err)
  157. time.Sleep(time.Duration(try) * time.Second)
  158. try++
  159. }
  160. delete(queueKeyMap, queue)
  161. }
  162. }
  163. items_len := 0
  164. for _, sAddItems := range(sAddQueueItems) {
  165. items_len = items_len + len(sAddItems)
  166. }
  167. dupes := wrapped - items_len
  168. if len(sAddQueueItems) > 0 && len(skipFeedQueueItems) > 0 {
  169. for queue, sAddItems := range sAddQueueItems {
  170. skipFeedItems, exists := skipFeedQueueItems[queue]
  171. if !exists {
  172. continue
  173. }
  174. itemFiltered := make([]any, 0, len(sAddItems))
  175. for _, item := range sAddItems {
  176. itemBytes := item.([]byte)
  177. itemString := string(itemBytes)
  178. if _, exists := skipFeedItems[itemString]; !exists {
  179. itemFiltered = append(itemFiltered, item)
  180. }
  181. }
  182. sAddQueueItems[queue] = itemFiltered
  183. delete(skipFeedQueueItems, queue)
  184. }
  185. }
  186. if len(sAddQueueItems) > 0 {
  187. for queue, sAddItems := range sAddQueueItems {
  188. try := 0
  189. for {
  190. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:%s", that.Name, queue), sAddItems...).Err(); err != nil {
  191. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  192. time.Sleep(time.Duration(try) * time.Second)
  193. try++
  194. } else {
  195. break
  196. }
  197. }
  198. delete(sAddQueueItems, queue)
  199. }
  200. }
  201. if dupes > 0 {
  202. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  203. }
  204. }
  205. }