You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

344 lines
9.4 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "compress/flate"
  5. "compress/gzip"
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. "github.com/gorilla/mux"
  17. "github.com/tevino/abool/v2"
  18. )
// GlobalBackfeedManager owns the full set of per-project backfeed managers
// plus the shared redis connections they are built from. RefreshFeeds keeps
// ActiveFeeds/ActiveSlugs in sync with tracker state; HTTP handlers read them.
type GlobalBackfeedManager struct {
	Context       context.Context                    // root context; cancelled via Cancel to stop every feed
	Cancel        context.CancelFunc                 // cancels Context (see CancelAllFeeds)
	ActiveFeeds   map[string]*ProjectBackfeedManager // project name -> running feed manager
	ActiveSlugs   map[string]string                  // slug -> project name (routing table for GetFeed)
	TrackerRedis  *redis.Client                      // tracker redis: "backfeed" and "trackers" hashes
	BackfeedRedis *redis.ClusterClient               // backfeed dedup cluster, shared by all feeds
	Lock          sync.RWMutex                       // guards ActiveFeeds and ActiveSlugs
	Populated     *abool.AtomicBool                  // set after RefreshFeeds completes at least once (health gate)
}
// RefreshFeeds reconciles the running ProjectBackfeedManagers with tracker
// state: the slug->project mapping in the tracker redis "backfeed" hash and
// the per-project JSON configs in the "trackers" hash. Projects with a
// missing or unparseable config are dropped; new projects get a feed started;
// projects whose RedisConfig changed get a replacement feed; projects no
// longer listed get their feed cancelled and awaited. Finally marks the
// manager as populated so the health endpoint reports ready.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	// Invert slug->project into project->[]slugs; projects collects the keys.
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		// HMGET preserves argument order, so cfgi[i] belongs to projects[i].
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				// Missing (nil) config entry — project is pruned below.
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				// Malformed config JSON — likewise pruned below.
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Prune projects (and their slugs) that did not yield a usable config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Redis target changed: remember the old feed so we can
				// retire it after the replacement is installed.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				// Unchanged config — keep the existing feed as-is.
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			// Project-specific redis target.
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			// No per-project target: fall back to the shared tracker redis.
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		// Install the new feed before cancelling the old one so lookups
		// never observe a gap for this project.
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	// Publish the fresh slug routing table.
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		// Wait for the feed goroutine to drain and exit before moving on.
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
  140. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  141. that.Lock.RLock()
  142. defer that.Lock.RUnlock()
  143. project, has := that.ActiveSlugs[slug]
  144. if !has {
  145. return nil
  146. }
  147. projectBackfeedManager, has := that.ActiveFeeds[project]
  148. if !has {
  149. return nil
  150. }
  151. return projectBackfeedManager
  152. }
  153. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  154. defer req.Body.Close()
  155. vars := mux.Vars(req)
  156. slug := vars["slug"]
  157. secondaryShard := req.URL.Query().Get("shard")
  158. queue := req.URL.Query().Get("queue")
  159. if strings.ContainsAny(secondaryShard, ":/") {
  160. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name"))
  161. return
  162. }
  163. if strings.ContainsAny(queue, "/") {
  164. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid queue name"))
  165. return
  166. }
  167. if queue == "" {
  168. queue = "todo:backfeed"
  169. }
  170. skipBloom := req.URL.Query().Get("skipbloom") != ""
  171. skipFeed := req.URL.Query().Get("skipfeed") != ""
  172. if skipBloom && skipFeed {
  173. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("skipbloom and skipfeed are mutually exclusive"))
  174. return
  175. }
  176. projectBackfeedManager := that.GetFeed(slug)
  177. if projectBackfeedManager == nil {
  178. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  179. return
  180. }
  181. splitter := &Splitter{
  182. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  183. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  184. }
  185. if len(splitter.Delimiter) == 0 {
  186. splitter.Delimiter = []byte{0x00}
  187. }
  188. var body io.ReadCloser
  189. switch req.Header.Get("Content-Encoding") {
  190. case "":
  191. body = req.Body
  192. case "gzip":
  193. var err error
  194. body, err = gzip.NewReader(req.Body)
  195. if err != nil {
  196. WriteResponse(res, http.StatusBadRequest, err)
  197. return
  198. }
  199. defer body.Close()
  200. case "deflate":
  201. body = flate.NewReader(req.Body)
  202. defer body.Close()
  203. default:
  204. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
  205. }
  206. scanner := bufio.NewScanner(body)
  207. scanner.Split(splitter.Split)
  208. n := 0
  209. for scanner.Scan() {
  210. b := scanner.Bytes()
  211. if len(b) == 0 {
  212. continue
  213. }
  214. bcopy := make([]byte, len(b))
  215. copy(bcopy, b)
  216. item := &BackfeedItem{
  217. PrimaryShard: GenShardHash(bcopy),
  218. SecondaryShard: secondaryShard,
  219. Item: bcopy,
  220. SkipBloom: skipBloom,
  221. SkipFeed: skipFeed,
  222. Queue: queue,
  223. }
  224. if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
  225. WriteResponse(res, http.StatusInternalServerError, err)
  226. return
  227. }
  228. n++
  229. }
  230. if err := scanner.Err(); err != nil {
  231. WriteResponse(res, http.StatusBadRequest, err)
  232. return
  233. }
  234. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  235. return
  236. }
  237. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  238. if that.Populated.IsNotSet() {
  239. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  240. return
  241. }
  242. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  243. client.ClientGetName(ctx)
  244. return client.Ping(ctx).Err()
  245. }); err != nil {
  246. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  247. return
  248. }
  249. WriteResponse(res, http.StatusOK, "ok")
  250. }
  251. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  252. WriteResponse(res, http.StatusOK, "pong")
  253. }
  254. func (that *GlobalBackfeedManager) HandleUnlink(res http.ResponseWriter, req *http.Request) {
  255. vars := mux.Vars(req)
  256. key := vars["key"]
  257. if strings.Count(key, ":") < 2 {
  258. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
  259. return
  260. }
  261. lock := sync.Mutex{}
  262. keys := []string{}
  263. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  264. cursor := uint64(0)
  265. var shardKeys []string
  266. for {
  267. var err error
  268. var keysBatch []string
  269. keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
  270. if err != nil && err != redis.Nil {
  271. return err
  272. }
  273. shardKeys = append(shardKeys, keysBatch...)
  274. if cursor == 0 {
  275. break
  276. }
  277. }
  278. lock.Lock()
  279. defer lock.Unlock()
  280. keys = append(keys, shardKeys...)
  281. return nil
  282. }); err != nil && err != redis.Nil {
  283. WriteResponse(res, http.StatusInternalServerError, err)
  284. return
  285. }
  286. pipe := that.BackfeedRedis.Pipeline()
  287. pipe.HDel(req.Context(), ":last_ts", keys...)
  288. for _, key := range keys {
  289. pipe.Unlink(req.Context(), key)
  290. }
  291. if _, err := pipe.Exec(req.Context()); err != nil && err != redis.Nil {
  292. WriteResponse(res, http.StatusInternalServerError, err)
  293. return
  294. }
  295. WriteResponse(res, http.StatusOK, keys)
  296. }
  297. func (that *GlobalBackfeedManager) HandleRedisInfo(res http.ResponseWriter, req *http.Request) {
  298. infos := map[string]string{}
  299. lock := sync.Mutex{}
  300. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  301. if info, err := client.Info(ctx, "all").Result(); err != nil && err != redis.Nil {
  302. return err
  303. } else {
  304. lock.Lock()
  305. defer lock.Unlock()
  306. infos[client.String()] = info
  307. }
  308. return nil
  309. }); err != nil {
  310. WriteResponse(res, http.StatusInternalServerError, err)
  311. return
  312. }
  313. WriteResponse(res, http.StatusOK, infos)
  314. }
  315. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  316. that.Populated.UnSet()
  317. that.Cancel()
  318. for project, projectBackfeedManager := range that.ActiveFeeds {
  319. log.Printf("waiting for %s channel to shut down...", project)
  320. <-projectBackfeedManager.Done
  321. delete(that.ActiveFeeds, project)
  322. }
  323. }