|
|
@@ -124,6 +124,7 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
//defer that.CloseItemChannel() |
|
|
|
defer that.Cancel() |
|
|
|
|
|
|
|
pipe := that.BackfeedRedis.Pipeline() |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-that.Context.Done(): |
|
|
@@ -158,7 +159,6 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
} |
|
|
|
now := time.Now() |
|
|
|
resultMap := map[string]*redis.Cmd{} |
|
|
|
pipe := that.BackfeedRedis.Pipeline() |
|
|
|
lastTS := make([]any, 0, len(keyMap)*2) |
|
|
|
for key := range keyMap { |
|
|
|
lastTS = append(lastTS, key) |
|
|
@@ -641,10 +641,18 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http |
|
|
|
} |
|
|
|
tarWriter := tar.NewWriter(res) |
|
|
|
defer tarWriter.Close() |
|
|
|
pipe := that.BackfeedRedis.Pipeline() |
|
|
|
for _, key := range keys { |
|
|
|
cursor := int64(0) |
|
|
|
for i := 0; ; i++ { |
|
|
|
rawRes, err := that.BackfeedRedis.Do(req.Context(), "bf.scandump", key, cursor).Result() |
|
|
|
rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) |
|
|
|
tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) |
|
|
|
_, err := pipe.Exec(req.Context()) |
|
|
|
if err != nil && err != redis.Nil { |
|
|
|
WriteResponse(res, http.StatusInternalServerError, err) |
|
|
|
return |
|
|
|
} |
|
|
|
rawRes, err := rawResResult.Result() |
|
|
|
if err != nil && err != redis.Nil { |
|
|
|
WriteResponse(res, http.StatusInternalServerError, err) |
|
|
|
return |
|
|
@@ -674,7 +682,7 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http |
|
|
|
chunk := []byte(chunkString) |
|
|
|
|
|
|
|
lastAccess := time.Time{} |
|
|
|
tsString, err := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key).Result() |
|
|
|
tsString, err := tsStringResult.Result() |
|
|
|
if err == nil && tsString != "" { |
|
|
|
ts, err := strconv.ParseInt(tsString, 10, 64) |
|
|
|
if err == nil { |
|
|
|