package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	redisprom "github.com/globocom/go-redis-prometheus"
	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tevino/abool/v2"
)
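// ItemChannelBuffer is the capacity of each project's item channel;
// ItemWrapSize caps how many items Do batches into a single redis pipeline.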
const (
	ItemChannelBuffer = 100000
	ItemWrapSize      = 100000
)
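// ProjectRedisConfig holds the per-project redis connection details carried in
// the tracker's "trackers" hash.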
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
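// ProjectConfig is the subset of a project's tracker configuration that the
// backfeed needs; RedisConfig is nil for projects that use the tracker redis.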
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
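// BackfeedItem is a single deduplication candidate: the raw item bytes plus
// the primary (one-byte hash) and secondary (client-supplied) shard it belongs to.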
type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}
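// ProjectBackfeedManager is the per-project worker. Items pushed onto C are
// deduplicated against the backfeed redis cluster and, if unseen, added to the
// project's todo:backfeed set.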
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}
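// RedisConfigDiffers reports whether new describes a different redis target
// than the one this manager was started with.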
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && new == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
}
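// PushItem queues an item for deduplication, blocking until the item is
// accepted, the caller's context is cancelled, or the manager shuts down.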
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
	//that.Lock.RLock()
	//defer that.Lock.RUnlock()
	//if that.C == nil {
	//	return false
	//}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-that.Context.Done():
		return fmt.Errorf("backfeed channel closed")
	case that.C <- item:
		return nil
	//default:
	//	return fmt.Errorf("backfeed channel full")
	}
}
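// PopItem takes the next item off the channel; ok is false if the manager's
// context is cancelled or, in non-blocking mode, if nothing is buffered.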
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		default:
			return nil, false
		}
	}
}
//func (that *ProjectBackfeedManager) CloseItemChannel() {
//	log.Printf("closing item channel for %s", that.Name)
//	that.Lock.Lock()
//	defer that.Lock.Unlock()
//	if that.C == nil {
//		return
//	}
//	close(that.C)
//	that.C = nil
//}
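// Do is the manager's worker loop: it drains the item channel, batches up to
// ItemWrapSize items per iteration, runs bf.madd against the backfeed cluster
// for every shard key, pushes previously unseen items to <name>:todo:backfeed,
// and records duplicate counts and last-access timestamps.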
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	//defer that.CloseItemChannel()
	defer that.Cancel()
loop:
	for {
		// A bare "break" inside a select only leaves the select, so a labeled
		// break is needed to exit the loop once the context is cancelled.
		select {
		case <-that.Context.Done():
			break loop
		case <-that.Done:
			break loop
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		// Opportunistically batch more items without blocking, grouped by shard key.
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		// A batch that has already been collected is still flushed below, even
		// if shutdown has begun in the meantime.
		now := time.Now()
		resultMap := map[string]*redis.Cmd{}
		pipe := that.BackfeedRedis.Pipeline()
		lastTS := make([]any, 0, len(keyMap)*2)
		for key := range keyMap {
			lastTS = append(lastTS, key)
			lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
		}
		pipe.HSet(context.Background(), ":last_ts", lastTS...)
		for key, items := range keyMap {
			args := []any{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("%s", err)
		}
		// Only items the bloom filter had not seen before are forwarded to the
		// project's todo:backfeed set.
		var sAddItems []any
		for key, items := range keyMap {
			res, err := resultMap[key].BoolSlice()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			if len(res) != len(keyMap[key]) {
				continue
			}
			for i, v := range res {
				if v {
					sAddItems = append(sAddItems, items[i])
				}
			}
		}
		dupes := wrapped - len(sAddItems)
		if len(sAddItems) != 0 {
			if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
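// GlobalBackfeedManager owns one ProjectBackfeedManager per active project and
// the slug-to-project mapping used to route incoming requests.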
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	Lock          sync.RWMutex
	Populated     *abool.AtomicBool
}
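// RefreshFeeds reloads the slug-to-project mapping from the tracker's
// "backfeed" hash and the project configs from its "trackers" hash, then
// starts managers for new projects, restarts managers whose redis config
// changed, and shuts down managers for projects that disappeared.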
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
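// Splitter provides a bufio.SplitFunc that splits a stream on an arbitrary
// delimiter; IgnoreEOF controls whether a trailing chunk without a closing
// delimiter is accepted or treated as an error.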
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}
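// Split implements bufio.SplitFunc, cutting the input on every occurrence of
// the delimiter.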
func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// Only probe positions where a full delimiter can still fit, so the slice
	// below never reaches past the buffered data.
	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
			return i + len(that.Delimiter), data[:i], nil
		}
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if atEOF && that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
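// GenShardHash derives an item's one-byte primary shard from its raw bytes.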
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}
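// WriteResponse writes v as a JSON envelope with the given status code: errors
// are reported under "error", everything else under "data". A 204 sends no body.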
func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]any{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		v = map[string]any{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}
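// GetFeed resolves a request slug to its running project manager, returning
// nil if the slug is unknown or the project has no active feed.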
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}
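// LastAccessStatsKey identifies one project/shard/sub-shard combination in the
// backfeed cluster's ":last_ts" hash.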
type LastAccessStatsKey struct {
	Project  string
	Shard    string
	SubShard string
}

type LastAccessStatsMap map[LastAccessStatsKey]time.Time
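// MarshalJSON renders the map with "project:shard:subshard" keys and RFC 3339
// timestamps.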
func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
	mapped := map[string]string{}
	for key, value := range that {
		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = value.Format(time.RFC3339)
	}
	return json.Marshal(mapped)
}
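// LastAccessStatsKeyFromString parses a "project:shard:subshard" hash key.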
func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) {
	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s)
	}
	return LastAccessStatsKey{
		Project:  parts[0],
		Shard:    parts[1],
		SubShard: parts[2],
	}, nil
}
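// HandleLastAccessStats serves GET /lastaccessstats: it reads the ":last_ts"
// hash and reports the newest timestamp per key, optionally collapsing the
// project, shard, or sub_shard dimensions via repeated ?merge= parameters.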
func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	merge := map[string]bool{}
	if vv, ok := req.URL.Query()["merge"]; ok {
		for _, v := range vv {
			merge[v] = true
		}
	}
	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
	if err != nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	lastAccessStats := LastAccessStatsMap{}
	for key, value := range lastTs {
		// value is in unix timestamp format
		ts, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		if merge["project"] {
			lastAccessStatsKey.Project = "*"
		}
		if merge["shard"] {
			lastAccessStatsKey.Shard = "*"
		}
		if merge["sub_shard"] {
			lastAccessStatsKey.SubShard = "*"
		}
		parsedTs := time.Unix(ts, 0)
		if v, has := lastAccessStats[lastAccessStatsKey]; !has || v.Before(parsedTs) {
			lastAccessStats[lastAccessStatsKey] = parsedTs
		}
	}
	WriteResponse(res, http.StatusOK, lastAccessStats)
}
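// HandleLegacy serves POST /legacy/{slug}: it splits the request body on the
// configured delimiter (NUL by default), hashes each non-empty chunk into its
// primary shard, and queues it on the project's backfeed channel.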
func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	secondaryShard := req.URL.Query().Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(req.URL.Query().Get("delimiter")),
		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	scanner := bufio.NewScanner(req.Body)
	scanner.Split(splitter.Split)
	n := 0
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		// scanner.Bytes() is only valid until the next Scan, so copy the chunk
		// before handing it off.
		bcopy := make([]byte, len(b))
		copy(bcopy, b)
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(bcopy),
			SecondaryShard: secondaryShard,
			Item:           bcopy,
		}
		if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		n++
	}
	if err := scanner.Err(); err != nil {
		// Report malformed input (e.g. a missing trailing delimiter without
		// ignoreeof) to the client.
		WriteResponse(res, http.StatusBadRequest, err)
		return
	}
	WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
}
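// HandleHealth serves GET /health: it returns 503 until the first successful
// RefreshFeeds and otherwise pings every shard of the backfeed cluster.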
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
	if that.Populated.IsNotSet() {
		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
		return
	}
	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
		return
	}
	WriteResponse(res, http.StatusOK, "ok")
}
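// HandlePing serves GET /ping with a static "pong" response.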
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
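// CancelAllFeeds cancels the global context and waits for every project
// worker to shut down.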
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
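// main wires up the tracker and backfeed redis clients, starts the public and
// metrics HTTP servers, and refreshes the project feeds once per second until
// SIGINT or SIGTERM.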
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
		PoolSize:    256,
	})
	backfeedRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("backfeed"),
	)
	backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
	trackerRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("tracker"),
	)
	trackerRedisClient.AddHook(trackerRedisMetricsHook)
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	if err := backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		log.Panicf("unable to ping backfeed redis shards: %s", err)
	}
	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		Populated:     abool.New(),
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()
	if err := globalBackfeedManager.RefreshFeeds(); err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
	r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
	r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
	r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats)
	rMetrics := mux.NewRouter()
	rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
	rMetrics.Path("/metrics").Handler(promhttp.Handler())
	doneChan := make(chan bool)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	metricsErrChan := make(chan error)
	go func() {
		if os.Getenv("METRICS_ADDR") != "" {
			s := &http.Server{
				Addr:           os.Getenv("METRICS_ADDR"),
				IdleTimeout:    1 * time.Hour,
				MaxHeaderBytes: 1 * 1024 * 1024,
				Handler:        rMetrics,
			}
			metricsErrChan <- s.ListenAndServe()
		} else {
			<-doneChan
			metricsErrChan <- nil
		}
	}()
	log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
	if os.Getenv("METRICS_ADDR") != "" {
		log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
	}
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case err := <-serveErrChan:
			// Surface a failed ListenAndServe instead of silently dropping it.
			log.Panicf("http server failed: %s", err)
		case err := <-metricsErrChan:
			if err != nil {
				log.Panicf("metrics server failed: %s", err)
			}
		case <-ticker.C:
		}
		if err := globalBackfeedManager.RefreshFeeds(); err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}