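// Package main implements the backfeed service: an HTTP frontend that accepts
// delimiter-separated items, shards them, deduplicates them against Bloom
// filters on a Redis cluster (RedisBloom BF.MADD), and pushes previously
// unseen items into each project's "<project>:todo:backfeed" set on that
// project's own Redis instance.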
package main

import (
	"bufio"
	"bytes"
	"compress/flate"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	redisprom "github.com/globocom/go-redis-prometheus"
	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tevino/abool/v2"
)
const (
	// ItemChannelBuffer is the capacity of each project's in-memory item channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize is the maximum number of items batched into one Redis pipeline.
	ItemWrapSize = 100000
)
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}
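// ProjectBackfeedManager owns the backfeed pipeline for a single project:
// HTTP handlers push items into C, and the Do goroutine drains the channel,
// deduplicates the items, and forwards new ones to the project's Redis.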
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}
// RedisConfigDiffers reports whether the given Redis config differs from the
// one this manager was started with.
func (that *ProjectBackfeedManager) RedisConfigDiffers(other *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && other == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || other == nil || *that.ProjectConfig.RedisConfig != *other
}
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
	//that.Lock.RLock()
	//defer that.Lock.RUnlock()
	//if that.C == nil {
	//	return false
	//}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-that.Context.Done():
		return fmt.Errorf("backfeed channel closed")
	case that.C <- item:
		return nil
		//default:
		//	return fmt.Errorf("backfeed channel full")
	}
}
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		default:
			return nil, false
		}
	}
}
//func (that *ProjectBackfeedManager) CloseItemChannel() {
//	log.Printf("closing item channel for %s", that.Name)
//	that.Lock.Lock()
//	defer that.Lock.Unlock()
//	if that.C == nil {
//		return
//	}
//	close(that.C)
//	that.C = nil
//}
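// Do drains the item channel in batches of up to ItemWrapSize, groups items
// by their "<project>:<primary shard>:<secondary shard>" key, runs BF.MADD
// for each key through a single pipeline, SAdds the items the Bloom filter
// reported as new to "<project>:todo:backfeed", and counts the rest as
// duplicates in the ":" hash.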
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	//defer that.CloseItemChannel()
	defer that.Cancel()
	for {
		// Note: these breaks only leave the select; the loop actually exits
		// when PopItem returns false after the context is cancelled.
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		// Opportunistically drain more items without blocking, up to ItemWrapSize.
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		now := time.Now()
		resultMap := map[string]*redis.Cmd{}
		pipe := that.BackfeedRedis.Pipeline()
		// Record when each key was last touched.
		lastTS := make([]any, 0, len(keyMap)*2)
		for key := range keyMap {
			lastTS = append(lastTS, key)
			lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
		}
		pipe.HSet(context.Background(), ":last_ts", lastTS...)
		for key, items := range keyMap {
			args := []any{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("%s", err)
		}
		// Items that BF.MADD reports as newly added go to the project's todo set.
		var sAddItems []any
		for key, items := range keyMap {
			res, err := resultMap[key].BoolSlice()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			if len(res) != len(keyMap[key]) {
				continue
			}
			for i, v := range res {
				if v {
					sAddItems = append(sAddItems, items[i])
				}
			}
		}
		dupes := wrapped - len(sAddItems)
		if len(sAddItems) != 0 {
			if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
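// GlobalBackfeedManager tracks all active per-project backfeed managers and
// the slug-to-project mapping, and holds the shared tracker and backfeed
// Redis clients.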
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	Lock          sync.RWMutex
	Populated     *abool.AtomicBool
}
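// RefreshFeeds reloads the slug-to-project map from the tracker's "backfeed"
// hash and each project's JSON config from the "trackers" hash, then starts
// managers for new projects, restarts managers whose Redis config changed,
// and tears down managers for projects that have disappeared.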
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Drop projects (and their slugs) that have no parseable config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if projectBackfeedManager.RedisConfigDiffers(projectConfig.RedisConfig) {
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
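// Splitter provides a bufio.SplitFunc that splits the request body on an
// arbitrary delimiter. With IgnoreEOF set, a trailing chunk without a final
// delimiter is emitted instead of being treated as an error.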
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}

func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// Only compare where a full delimiter fits; a partial match at the end of
	// the buffer means we need more data.
	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
			return i + len(that.Delimiter), data[:i], nil
		}
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if atEOF && that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
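// GenShardHash mixes the bytes of an item down to a single byte, assigning
// each item one of 256 primary shards used in the backfeed Redis key.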
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}
// WriteResponse writes v as a JSON envelope with the given status code;
// errors are rendered as {"error": ..., "status_code": ...}.
func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]any{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		v = map[string]any{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}
type LastAccessStatsKey struct {
	Project  string
	Shard    string
	SubShard string
}

type LastAccessStatsMap map[LastAccessStatsKey]time.Time

func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
	mapped := map[string]string{}
	for key, value := range that {
		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = value.Format(time.RFC3339)
	}
	return json.Marshal(mapped)
}

func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) {
	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s)
	}
	return LastAccessStatsKey{
		Project:  parts[0],
		Shard:    parts[1],
		SubShard: parts[2],
	}, nil
}
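// HandleLastAccessStats serves GET /lastaccessstats. It reads the ":last_ts"
// hash and reports the newest timestamp per project/shard/sub-shard key;
// "merge" query parameters collapse the corresponding key component to "*".
// Hypothetical example (the host is illustrative only):
//
//	curl 'http://localhost:8080/lastaccessstats?merge=shard&merge=sub_shard'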
func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	merge := map[string]bool{}
	if vv, ok := req.URL.Query()["merge"]; ok {
		for _, v := range vv {
			merge[v] = true
		}
	}
	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
	if err != nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	lastAccessStats := LastAccessStatsMap{}
	for key, value := range lastTs {
		// value is in unix timestamp format
		ts, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		if merge["project"] {
			lastAccessStatsKey.Project = "*"
		}
		if merge["shard"] {
			lastAccessStatsKey.Shard = "*"
		}
		if merge["sub_shard"] {
			lastAccessStatsKey.SubShard = "*"
		}
		parsedTs := time.Unix(ts, 0)
		if v, has := lastAccessStats[lastAccessStatsKey]; !has || v.Before(parsedTs) {
			lastAccessStats[lastAccessStatsKey] = parsedTs
		}
	}
	WriteResponse(res, http.StatusOK, lastAccessStats)
}
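// HandleLegacy serves POST /legacy/{slug}. The request body is optionally
// gzip- or deflate-decoded, split on the "delimiter" query parameter (default
// NUL), and every non-empty chunk is queued for deduplication on the project
// behind the slug. Hypothetical example (host and slug are illustrative only):
//
//	printf 'item1\x00item2\x00' | curl --data-binary @- \
//	    'http://localhost:8080/legacy/example-slug'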
func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	secondaryShard := req.URL.Query().Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(req.URL.Query().Get("delimiter")),
		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	var body io.ReadCloser
	switch req.Header.Get("Content-Encoding") {
	case "":
		body = req.Body
	case "gzip":
		var err error
		body, err = gzip.NewReader(req.Body)
		if err != nil {
			WriteResponse(res, http.StatusBadRequest, err)
			return
		}
		defer body.Close()
	case "deflate":
		body = flate.NewReader(req.Body)
		defer body.Close()
	default:
		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
		return
	}
	scanner := bufio.NewScanner(body)
	scanner.Split(splitter.Split)
	statusCode := http.StatusNoContent
	n := 0
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		// Copy out of the scanner's buffer before handing the item off.
		bcopy := make([]byte, len(b))
		copy(bcopy, b)
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(bcopy),
			SecondaryShard: secondaryShard,
			Item:           bcopy,
		}
		if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		n++
	}
	if err := scanner.Err(); err != nil {
		WriteResponse(res, statusCode, err)
		return
	}
	WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
}
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
	if that.Populated.IsNotSet() {
		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
		return
	}
	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
		return
	}
	WriteResponse(res, http.StatusOK, "ok")
}
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
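// main wires everything together. Configuration comes from the environment:
// REDIS_TRACKER (tracker Redis URL), REDIS_BACKFEED_ADDRS/_USERNAME/_PASSWORD
// (backfeed Redis cluster), HTTP_ADDR (API listen address), and METRICS_ADDR
// (optional pprof/Prometheus listen address). Feeds are refreshed once per
// second until SIGINT/SIGTERM.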
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
		PoolSize:    256,
	})
	backfeedRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("backfeed"),
	)
	backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
	trackerRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("tracker"),
	)
	trackerRedisClient.AddHook(trackerRedisMetricsHook)
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	})
	if err != nil {
		log.Panicf("unable to ping backfeed redis shards: %s", err)
	}
	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		Populated:     abool.New(),
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()
	if err := globalBackfeedManager.RefreshFeeds(); err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
	r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
	r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
	r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats)
	rMetrics := mux.NewRouter()
	rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
	rMetrics.Path("/metrics").Handler(promhttp.Handler())
	doneChan := make(chan bool)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	metricsErrChan := make(chan error)
	go func() {
		if os.Getenv("METRICS_ADDR") != "" {
			s := &http.Server{
				Addr:           os.Getenv("METRICS_ADDR"),
				IdleTimeout:    1 * time.Hour,
				MaxHeaderBytes: 1 * 1024 * 1024,
				Handler:        rMetrics,
			}
			metricsErrChan <- s.ListenAndServe()
		} else {
			<-doneChan
			metricsErrChan <- nil
		}
	}()
	log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
	if os.Getenv("METRICS_ADDR") != "" {
		log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
	}
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case <-ticker.C:
		}
		if err := globalBackfeedManager.RefreshFeeds(); err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}