diff --git a/globalbackfeedmanager.go b/globalbackfeedmanager.go index 025bb51..955f08e 100644 --- a/globalbackfeedmanager.go +++ b/globalbackfeedmanager.go @@ -162,12 +162,23 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht vars := mux.Vars(req) slug := vars["slug"] secondaryShard := req.URL.Query().Get("shard") + queue := req.URL.Query().Get("queue") + + if strings.ContainsAny(secondaryShard, ":/") { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name")) return } + if strings.ContainsAny(queue, "/") { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid queue name")) + return + } + if queue == "" { + queue = "todo:backfeed" + } + skipBloom := req.URL.Query().Get("skipbloom") != "" skipFeed := req.URL.Query().Get("skipfeed") != "" if skipBloom && skipFeed { @@ -223,6 +234,7 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht Item: bcopy, SkipBloom: skipBloom, SkipFeed: skipFeed, + Queue: queue, } if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { WriteResponse(res, http.StatusInternalServerError, err) diff --git a/projectbackfeedmanager.go b/projectbackfeedmanager.go index 36cf054..d08c8ff 100644 --- a/projectbackfeedmanager.go +++ b/projectbackfeedmanager.go @@ -88,9 +88,9 @@ func (that *ProjectBackfeedManager) Do() { break default: } - keyMap := map[string][][]byte{} - var sAddItems []any - skipFeedItems := map[string]struct{}{} + queueKeyMap := map[string]map[string][][]byte{} + sAddQueueItems := map[string][]any{} + skipFeedQueueItems := map[string]map[string]struct{}{} wrapped := 0 for wrapped < ItemWrapSize { item, ok := that.PopItem(wrapped == 0) @@ -98,12 +98,12 @@ func (that *ProjectBackfeedManager) Do() { break } if item.SkipBloom { - sAddItems = append(sAddItems, item.Item) + sAddQueueItems[item.Queue] = append(sAddQueueItems[item.Queue], item.Item) } else { key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) - keyMap[key] = append(keyMap[key], item.Item) + queueKeyMap[item.Queue][key] = append(queueKeyMap[item.Queue][key], item.Item) if item.SkipFeed { - skipFeedItems[string(item.Item)] = Tag + skipFeedQueueItems[item.Queue][string(item.Item)] = Tag } } wrapped++ @@ -111,78 +111,95 @@ func (that *ProjectBackfeedManager) Do() { if wrapped == 0 { break } - if len(keyMap) > 0 { - try := 0 - lastTS := make([]any, 0, len(keyMap)*2) - now := time.Now() - for key := range keyMap { - lastTS = append(lastTS, key) - lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) - } - resultMap := map[string]*redis.Cmd{} - pipe.HSet(context.Background(), ":last_ts", lastTS...) - for { - for key, items := range keyMap { - args := []any{ - "bf.madd", - key, - } - for _, item := range items { - args = append(args, item) - } - resultMap[key] = pipe.Do(context.Background(), args...) - } - if _, err := pipe.Exec(context.Background()); err != nil { - log.Printf("%s", err) + if len(queueKeyMap) > 0 { + for queue, keyMap := range queueKeyMap { + lastTS := make([]any, 0, len(keyMap)*2) + now := time.Now() + for key := range keyMap { + lastTS = append(lastTS, key) + lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) } - var err error - for key, items := range keyMap { - res, cmdErr := resultMap[key].BoolSlice() - if cmdErr != nil { - err = multierror.Append(err, cmdErr) - continue + resultMap := map[string]*redis.Cmd{} + pipe.HSet(context.Background(), ":last_ts", lastTS...) + try := 0 + for { + for key, items := range keyMap { + args := []any{ + "bf.madd", + key, + } + for _, item := range items { + args = append(args, item) + } + resultMap[key] = pipe.Do(context.Background(), args...) } - if len(res) != len(keyMap[key]) { - err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) - continue + if _, err := pipe.Exec(context.Background()); err != nil { + log.Printf("%s", err) } - for i, v := range res { - if v { - sAddItems = append(sAddItems, items[i]) + var err error + for key, items := range keyMap { + res, cmdErr := resultMap[key].BoolSlice() + if cmdErr != nil { + err = multierror.Append(err, cmdErr) + continue + } + if len(res) != len(keyMap[key]) { + err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) + continue } + for i, v := range res { + if v { + sAddQueueItems[queue] = append(sAddQueueItems[queue], items[i]) + } + } + delete(keyMap, key) } - delete(keyMap, key) - } - if err == nil { - break + if err == nil { + break + } + log.Printf("%s", err) + time.Sleep(time.Duration(try) * time.Second) + try++ } - log.Printf("%s", err) - time.Sleep(time.Duration(try) * time.Second) - try++ + delete(queueKeyMap, queue) } } - dupes := wrapped - len(sAddItems) - if len(sAddItems) > 0 && len(skipFeedItems) > 0 { - sAddItemsFiltered := make([]any, 0, len(sAddItems)) - for _, item := range sAddItems { - itemBytes := item.([]byte) - itemString := string(itemBytes) - if _, exists := skipFeedItems[itemString]; !exists { - sAddItemsFiltered = append(sAddItemsFiltered, item) + items_len := 0 + for _, sAddItems := range(sAddQueueItems) { + items_len = items_len + len(sAddItems) + } + dupes := wrapped - items_len + if len(sAddQueueItems) > 0 && len(skipFeedQueueItems) > 0 { + for queue, sAddItems := range sAddQueueItems { + skipFeedItems, exists := skipFeedQueueItems[queue] + if !exists { + continue } + itemFiltered := make([]any, 0, len(sAddItems)) + for _, item := range sAddItems { + itemBytes := item.([]byte) + itemString := string(itemBytes) + if _, exists := skipFeedItems[itemString]; !exists { + itemFiltered = append(itemFiltered, item) + } + } + sAddQueueItems[queue] = itemFiltered + delete(skipFeedQueueItems, queue) } - sAddItems = sAddItemsFiltered } - if len(sAddItems) > 0 { - try := 0 - for { - if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { - log.Printf("failed to sadd items for %s: %s", that.Name, err) - time.Sleep(time.Duration(try) * time.Second) - try++ - } else { - break + if len(sAddQueueItems) > 0 { + for queue, sAddItems := range sAddQueueItems { + try := 0 + for { + if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:%s", that.Name, queue), sAddItems...).Err(); err != nil { + log.Printf("failed to sadd items for %s: %s", that.Name, err) + time.Sleep(time.Duration(try) * time.Second) + try++ + } else { + break + } } + delete(sAddQueueItems, queue) } } if dupes > 0 { diff --git a/structs.go b/structs.go index cb64fca..cfd3f84 100644 --- a/structs.go +++ b/structs.go @@ -10,4 +10,5 @@ type BackfeedItem struct { Item []byte SkipBloom bool SkipFeed bool + Queue string }