From 0ceeadd614a11a92dd3c39aab243149b5dc28046 Mon Sep 17 00:00:00 2001 From: Fusl Date: Mon, 20 Feb 2023 06:47:53 +0000 Subject: [PATCH] access stats --- main.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 6e47ed2..b68f36e 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( _ "net/http/pprof" "os" "os/signal" + "strconv" "strings" "sync" "syscall" @@ -384,6 +385,77 @@ func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager return projectBackfeedManager } +type LastAccessStatsKey struct { + Project string + Shard string + SubShard string +} + +type LastAccessStatsMap map[LastAccessStatsKey]time.Time + +func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) { + mapped := map[string]string{} + for key, value := range that { + mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = value.Format(time.RFC3339) + } + return json.Marshal(mapped) +} + +func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) { + parts := strings.SplitN(s, ":", 3) + if len(parts) != 3 { + return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s) + } + return LastAccessStatsKey{ + Project: parts[0], + Shard: parts[1], + SubShard: parts[2], + }, nil +} + +func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + merge := map[string]bool{} + if vv, ok := req.URL.Query()["merge"]; ok { + for _, v := range vv { + merge[v] = true + } + } + lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result() + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + lastAccessStats := LastAccessStatsMap{} + for key, value := range lastTs { + // value is in unix timestamp format + ts, err := strconv.ParseInt(value, 10, 64) + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + lastAccessStatsKey, err := LastAccessStatsKeyFromString(key) + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + if merge["project"] { + lastAccessStatsKey.Project = "*" + } + if merge["shard"] { + lastAccessStatsKey.Shard = "*" + } + if merge["sub_shard"] { + lastAccessStatsKey.SubShard = "*" + } + parsedTs := time.Unix(ts, 0) + if v, has := lastAccessStats[lastAccessStatsKey]; !has || v.Before(parsedTs) { + lastAccessStats[lastAccessStatsKey] = parsedTs + } + } + WriteResponse(res, http.StatusOK, lastAccessStats) +} + func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) { defer req.Body.Close() @@ -422,7 +494,7 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht Item: bcopy, } if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { - WriteResponse(res, http.StatusServiceUnavailable, err) + WriteResponse(res, http.StatusInternalServerError, err) return } n++ @@ -520,6 +592,7 @@ func main() { r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy) r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing) r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth) + r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats) rMetrics := mux.NewRouter() rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) rMetrics.Path("/metrics").Handler(promhttp.Handler())