diff --git a/go.mod b/go.mod index 6115aa9..34b9723 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/globocom/go-redis-prometheus v0.4.0 github.com/go-redis/redis/v8 v8.11.5 github.com/gorilla/mux v1.8.0 + github.com/hashicorp/go-multierror v1.0.0 github.com/prometheus/client_golang v1.13.0 github.com/tevino/abool/v2 v2.1.0 ) @@ -15,6 +16,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect diff --git a/go.sum b/go.sum index f452614..839d9c1 100644 --- a/go.sum +++ b/go.sum @@ -191,10 +191,12 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= diff --git a/projectbackfeedmanager.go b/projectbackfeedmanager.go index 5bd5f4a..860fdcb 100644 --- a/projectbackfeedmanager.go +++ b/projectbackfeedmanager.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/hashicorp/go-multierror" ) type ProjectBackfeedManager struct { @@ -118,39 +119,58 @@ func (that *ProjectBackfeedManager) Do() { lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) } pipe.HSet(context.Background(), ":last_ts", lastTS...) - for key, items := range keyMap { - args := []any{ - "bf.madd", - key, - } - for _, item := range items { - args = append(args, item) - } - resultMap[key] = pipe.Do(context.Background(), args...) - } - if _, err := pipe.Exec(context.Background()); err != nil { - log.Printf("%s", err) - } + try := 0 var sAddItems []any - for key, items := range keyMap { - res, err := resultMap[key].BoolSlice() - if err != nil { - log.Printf("%s", err) - continue + for { + for key, items := range keyMap { + args := []any{ + "bf.madd", + key, + } + for _, item := range items { + args = append(args, item) + } + resultMap[key] = pipe.Do(context.Background(), args...) } - if len(res) != len(keyMap[key]) { - continue + if _, err := pipe.Exec(context.Background()); err != nil { + log.Printf("%s", err) } - for i, v := range res { - if v { - sAddItems = append(sAddItems, items[i]) + var err error + for key, items := range keyMap { + res, cmdErr := resultMap[key].BoolSlice() + if cmdErr != nil { + err = multierror.Append(err, cmdErr) + continue + } + if len(res) != len(keyMap[key]) { + err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) + continue } + for i, v := range res { + if v { + sAddItems = append(sAddItems, items[i]) + } + } + delete(keyMap, key) } + if err == nil { + break + } + log.Printf("%s", err) + time.Sleep(time.Duration(try) * time.Second) + try++ } dupes := wrapped - len(sAddItems) - if len(sAddItems) != 0 { - if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { - log.Printf("failed to sadd items for %s: %s", that.Name, err) + try = 0 + for { + if len(sAddItems) != 0 { + if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { + log.Printf("failed to sadd items for %s: %s", that.Name, err) + time.Sleep(time.Duration(try) * time.Second) + try++ + } else { + break + } } } if dupes > 0 { diff --git a/vendor/modules.txt b/vendor/modules.txt index dfc8c5f..c691e94 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -27,6 +27,12 @@ github.com/golang/protobuf/ptypes/timestamp # github.com/gorilla/mux v1.8.0 ## explicit; go 1.12 github.com/gorilla/mux +# github.com/hashicorp/errwrap v1.0.0 +## explicit +github.com/hashicorp/errwrap +# github.com/hashicorp/go-multierror v1.0.0 +## explicit +github.com/hashicorp/go-multierror # github.com/matttproud/golang_protobuf_extensions v1.0.2 ## explicit; go 1.9 github.com/matttproud/golang_protobuf_extensions/pbutil