@@ -6,6 +6,7 @@ require ( | |||||
github.com/globocom/go-redis-prometheus v0.4.0 | github.com/globocom/go-redis-prometheus v0.4.0 | ||||
github.com/go-redis/redis/v8 v8.11.5 | github.com/go-redis/redis/v8 v8.11.5 | ||||
github.com/gorilla/mux v1.8.0 | github.com/gorilla/mux v1.8.0 | ||||
github.com/hashicorp/go-multierror v1.0.0 | |||||
github.com/prometheus/client_golang v1.13.0 | github.com/prometheus/client_golang v1.13.0 | ||||
github.com/tevino/abool/v2 v2.1.0 | github.com/tevino/abool/v2 v2.1.0 | ||||
) | ) | ||||
@@ -15,6 +16,7 @@ require ( | |||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect | github.com/cespare/xxhash/v2 v2.1.2 // indirect | ||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | ||||
github.com/golang/protobuf v1.5.2 // indirect | github.com/golang/protobuf v1.5.2 // indirect | ||||
github.com/hashicorp/errwrap v1.0.0 // indirect | |||||
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect | github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect | ||||
github.com/prometheus/client_model v0.2.0 // indirect | github.com/prometheus/client_model v0.2.0 // indirect | ||||
github.com/prometheus/common v0.37.0 // indirect | github.com/prometheus/common v0.37.0 // indirect | ||||
@@ -191,10 +191,12 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf | |||||
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= | github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= | ||||
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= | github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= | ||||
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= | github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= | ||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= | |||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | ||||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= | github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= | ||||
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= | github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= | ||||
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= | github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= | ||||
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= | |||||
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= | github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= | ||||
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= | github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= | ||||
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= | github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= | ||||
@@ -7,6 +7,7 @@ import ( | |||||
"time" | "time" | ||||
"github.com/go-redis/redis/v8" | "github.com/go-redis/redis/v8" | ||||
"github.com/hashicorp/go-multierror" | |||||
) | ) | ||||
type ProjectBackfeedManager struct { | type ProjectBackfeedManager struct { | ||||
@@ -118,39 +119,58 @@ func (that *ProjectBackfeedManager) Do() { | |||||
lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) | lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) | ||||
} | } | ||||
pipe.HSet(context.Background(), ":last_ts", lastTS...) | pipe.HSet(context.Background(), ":last_ts", lastTS...) | ||||
for key, items := range keyMap { | |||||
args := []any{ | |||||
"bf.madd", | |||||
key, | |||||
} | |||||
for _, item := range items { | |||||
args = append(args, item) | |||||
} | |||||
resultMap[key] = pipe.Do(context.Background(), args...) | |||||
} | |||||
if _, err := pipe.Exec(context.Background()); err != nil { | |||||
log.Printf("%s", err) | |||||
} | |||||
try := 0 | |||||
var sAddItems []any | var sAddItems []any | ||||
for key, items := range keyMap { | |||||
res, err := resultMap[key].BoolSlice() | |||||
if err != nil { | |||||
log.Printf("%s", err) | |||||
continue | |||||
for { | |||||
for key, items := range keyMap { | |||||
args := []any{ | |||||
"bf.madd", | |||||
key, | |||||
} | |||||
for _, item := range items { | |||||
args = append(args, item) | |||||
} | |||||
resultMap[key] = pipe.Do(context.Background(), args...) | |||||
} | } | ||||
if len(res) != len(keyMap[key]) { | |||||
continue | |||||
if _, err := pipe.Exec(context.Background()); err != nil { | |||||
log.Printf("%s", err) | |||||
} | } | ||||
for i, v := range res { | |||||
if v { | |||||
sAddItems = append(sAddItems, items[i]) | |||||
var err error | |||||
for key, items := range keyMap { | |||||
res, cmdErr := resultMap[key].BoolSlice() | |||||
if cmdErr != nil { | |||||
err = multierror.Append(err, cmdErr) | |||||
continue | |||||
} | |||||
if len(res) != len(keyMap[key]) { | |||||
err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) | |||||
continue | |||||
} | } | ||||
for i, v := range res { | |||||
if v { | |||||
sAddItems = append(sAddItems, items[i]) | |||||
} | |||||
} | |||||
delete(keyMap, key) | |||||
} | } | ||||
if err == nil { | |||||
break | |||||
} | |||||
log.Printf("%s", err) | |||||
time.Sleep(time.Duration(try) * time.Second) | |||||
try++ | |||||
} | } | ||||
dupes := wrapped - len(sAddItems) | dupes := wrapped - len(sAddItems) | ||||
if len(sAddItems) != 0 { | |||||
if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { | |||||
log.Printf("failed to sadd items for %s: %s", that.Name, err) | |||||
try = 0 | |||||
for { | |||||
if len(sAddItems) != 0 { | |||||
if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { | |||||
log.Printf("failed to sadd items for %s: %s", that.Name, err) | |||||
time.Sleep(time.Duration(try) * time.Second) | |||||
try++ | |||||
} else { | |||||
break | |||||
} | |||||
} | } | ||||
} | } | ||||
if dupes > 0 { | if dupes > 0 { | ||||
@@ -27,6 +27,12 @@ github.com/golang/protobuf/ptypes/timestamp | |||||
# github.com/gorilla/mux v1.8.0 | # github.com/gorilla/mux v1.8.0 | ||||
## explicit; go 1.12 | ## explicit; go 1.12 | ||||
github.com/gorilla/mux | github.com/gorilla/mux | ||||
# github.com/hashicorp/errwrap v1.0.0 | |||||
## explicit | |||||
github.com/hashicorp/errwrap | |||||
# github.com/hashicorp/go-multierror v1.0.0 | |||||
## explicit | |||||
github.com/hashicorp/go-multierror | |||||
# github.com/matttproud/golang_protobuf_extensions v1.0.2 | # github.com/matttproud/golang_protobuf_extensions v1.0.2 | ||||
## explicit; go 1.9 | ## explicit; go 1.9 | ||||
github.com/matttproud/golang_protobuf_extensions/pbutil | github.com/matttproud/golang_protobuf_extensions/pbutil | ||||