From dc25393d35e57027049e216c5c2558470d1af84d Mon Sep 17 00:00:00 2001
From: Fusl
Date: Mon, 17 Apr 2023 07:26:35 +0000
Subject: [PATCH] early bf.scandump support

---
 main.go | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 184 insertions(+), 8 deletions(-)

diff --git a/main.go b/main.go
index 5918ef9..5dced6a 100644
--- a/main.go
+++ b/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"archive/tar"
 	"bufio"
 	"bytes"
 	"compress/flate"
@@ -8,12 +9,14 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"hash/crc64"
 	"io"
 	"log"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
 	"os/signal"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -393,12 +396,22 @@ type LastAccessStatsKey struct {
 	SubShard string
 }
 
-type LastAccessStatsMap map[LastAccessStatsKey]time.Time
+type LastAccessStatsEntry struct {
+	First time.Time `json:"first"`
+	Last  time.Time `json:"last"`
+	Size  int64     `json:"size"`
+}
+
+type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry
 
 func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
-	mapped := map[string]string{}
+	mapped := map[string]map[string]any{}
 	for key, value := range that {
-		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = value.Format(time.RFC3339)
+		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{
+			"first": value.First.Format(time.RFC3339),
+			"last":  value.Last.Format(time.RFC3339),
+			"size":  value.Size,
+		}
 	}
 	return json.Marshal(mapped)
 }
@@ -424,7 +437,17 @@ func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter
 		}
 	}
 	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
-	if err != nil {
+	if err != nil && err != redis.Nil {
+		WriteResponse(res, http.StatusInternalServerError, err)
+		return
+	}
+	memoryUsages := map[string]*redis.IntCmd{}
+	pipe := that.BackfeedRedis.Pipeline()
+	for key := range lastTs {
+		memoryUsages[key] = pipe.MemoryUsage(req.Context(), key)
+	}
+	_, err = pipe.Exec(req.Context())
+	if err != nil && err != redis.Nil {
 		WriteResponse(res, http.StatusInternalServerError, err)
 		return
 	}
@@ -451,8 +474,20 @@ func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter
 			lastAccessStatsKey.SubShard = "*"
 		}
 		parsedTs := time.Unix(ts, 0)
-		if v, has := lastAccessStats[lastAccessStatsKey]; !has || v.Before(parsedTs) {
-			lastAccessStats[lastAccessStatsKey] = parsedTs
+		if v, has := lastAccessStats[lastAccessStatsKey]; !has {
+			lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{
+				First: parsedTs,
+				Last:  parsedTs,
+				Size:  memoryUsages[key].Val(),
+			}
+		} else {
+			if v.First.After(parsedTs) {
+				v.First = parsedTs
+			}
+			if v.Last.Before(parsedTs) {
+				v.Last = parsedTs
+			}
+			v.Size += memoryUsages[key].Val()
 		}
 	}
 	WriteResponse(res, http.StatusOK, lastAccessStats)
@@ -499,7 +534,6 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht
 
 	scanner := bufio.NewScanner(body)
 	scanner.Split(splitter.Split)
-	statusCode := http.StatusNoContent
 	n := 0
 	for scanner.Scan() {
 		b := scanner.Bytes()
@@ -520,7 +554,7 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht
 		n++
 	}
 	if err := scanner.Err(); err != nil {
-		WriteResponse(res, statusCode, err)
+		WriteResponse(res, http.StatusBadRequest, err)
 		return
 	}
 	WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
@@ -546,6 +580,147 @@ func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.R
 	WriteResponse(res, http.StatusOK, "pong")
 }
 
+type DumpChunkName struct {
+	Key      string `json:"key"`
+	Distance int    `json:"distance"`
+	Cursor   int64  `json:"cursor"`
+	Checksum uint64 `json:"checksum"`
+}
+
+func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+	key := vars["key"]
+	if key == "" {
+		key = "*:*:*"
+	}
+	if strings.Count(key, ":") < 2 {
+		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
+		return
+	}
+	lock := sync.Mutex{}
+	keys := []string{}
+	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
+		cursor := uint64(0)
+		var shardKeys []string
+		for {
+			var err error
+			var keysBatch []string
+			keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
+			if err != nil && err != redis.Nil {
+				return err
+			}
+			shardKeys = append(shardKeys, keysBatch...)
+			if cursor == 0 {
+				break
+			}
+		}
+		lock.Lock()
+		defer lock.Unlock()
+		keys = append(keys, shardKeys...)
+		return nil
+	}); err != nil && err != redis.Nil {
+		WriteResponse(res, http.StatusInternalServerError, err)
+		return
+	}
+	sort.Strings(keys)
+	hasJsonAcceptHeader := false
+	for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
+		accept = strings.TrimSpace(accept)
+		if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
+			hasJsonAcceptHeader = true
+			break
+		}
+	}
+	if hasJsonAcceptHeader {
+		WriteResponse(res, http.StatusOK, keys)
+		return
+	}
+	if len(keys) == 0 {
+		WriteResponse(res, http.StatusNoContent, nil)
+		return
+	}
+	tarWriter := tar.NewWriter(res)
+	defer tarWriter.Close()
+	for _, key := range keys {
+		cursor := int64(0)
+		for i := 0; ; i++ {
+			rawRes, err := that.BackfeedRedis.Do(req.Context(), "bf.scandump", key, cursor).Result()
+			if err != nil && err != redis.Nil {
+				WriteResponse(res, http.StatusInternalServerError, err)
+				return
+			}
+			if rawRes == nil {
+				break
+			}
+			resSlice, ok := rawRes.([]any)
+			if !ok {
+				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes))
+				return
+			}
+			if len(resSlice) != 2 {
+				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice)))
+				return
+			}
+			cursor, ok = resSlice[0].(int64)
+			if !ok {
+				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
+				return
+			}
+			chunkString, ok := resSlice[1].(string)
+			if !ok {
+				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
+				return
+			}
+			chunk := []byte(chunkString)
+
+			lastAccess := time.Time{}
+			tsString, err := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key).Result()
+			if err != nil && err != redis.Nil && tsString != "" {
+				ts, err := strconv.ParseInt(tsString, 10, 64)
+				if err == nil {
+					lastAccess = time.Unix(0, ts)
+				}
+			}
+
+			nameStruct := DumpChunkName{
+				Key:      key,
+				Cursor:   cursor,
+				Distance: i,
+				Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
+			}
+			name, err := json.Marshal(nameStruct)
+			if err != nil {
+				WriteResponse(res, http.StatusInternalServerError, err)
+				return
+			}
+			if err := tarWriter.WriteHeader(&tar.Header{
+				Typeflag:   tar.TypeReg,
+				Name:       string(name),
+				Size:       int64(len(chunk)),
+				Mode:       0600,
+				ModTime:    lastAccess,
+				AccessTime: lastAccess,
+				ChangeTime: lastAccess,
+				PAXRecords: map[string]string{
+					"ARCHIVETEAM.bffchunk.key":      key,
+					"ARCHIVETEAM.bffchunk.cursor":   fmt.Sprintf("%d", cursor),
"ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), + "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), + }, + Format: tar.FormatPAX, + }); err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + } + if _, err := tarWriter.Write(chunk); err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + } + if cursor == 0 && len(chunk) == 0 { + break + } + } + } +} + func (that *GlobalBackfeedManager) CancelAllFeeds() { that.Populated.UnSet() that.Cancel() @@ -613,6 +788,7 @@ func main() { r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing) r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth) r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats) + r.Methods(http.MethodGet).Path("/dump/{key}").HandlerFunc(globalBackfeedManager.HandleDump) rMetrics := mux.NewRouter() rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) rMetrics.Path("/metrics").Handler(promhttp.Handler())