Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

570 rindas
15 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. redisprom "github.com/globocom/go-redis-prometheus"
  19. "github.com/go-redis/redis/v8"
  20. "github.com/gorilla/mux"
  21. "github.com/prometheus/client_golang/prometheus/promhttp"
  22. "github.com/tevino/abool/v2"
  23. )
  24. const (
  25. ItemChannelBuffer = 100000
  26. ItemWrapSize = 100000
  27. )
  28. type ProjectRedisConfig struct {
  29. Host string `json:"host"`
  30. Pass string `json:"pass"`
  31. Port int `json:"port"`
  32. }
  33. type ProjectConfig struct {
  34. RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
  35. }
  36. type BackfeedItem struct {
  37. PrimaryShard byte
  38. SecondaryShard string
  39. Item []byte
  40. }
  41. type ProjectBackfeedManager struct {
  42. Context context.Context
  43. Cancel context.CancelFunc
  44. Done chan bool
  45. C chan *BackfeedItem
  46. Name string
  47. BackfeedRedis *redis.ClusterClient
  48. ProjectRedis *redis.Client
  49. //Lock sync.RWMutex
  50. ProjectConfig ProjectConfig
  51. }
  52. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  53. if that.ProjectConfig.RedisConfig == nil && new == nil {
  54. return false
  55. }
  56. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  57. }
  58. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  59. //that.Lock.RLock()
  60. //defer that.Lock.RUnlock()
  61. //if that.C == nil {
  62. // return false
  63. //}
  64. select {
  65. case <-ctx.Done():
  66. return ctx.Err()
  67. case <-that.Context.Done():
  68. return fmt.Errorf("backfeed channel closed")
  69. case that.C <- item:
  70. return nil
  71. default:
  72. return fmt.Errorf("backfeed channel full")
  73. }
  74. }
  75. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  76. if blocking {
  77. select {
  78. case <-that.Context.Done():
  79. return nil, false
  80. case item, ok := <-that.C:
  81. return item, ok
  82. }
  83. } else {
  84. select {
  85. case <-that.Context.Done():
  86. return nil, false
  87. case item, ok := <-that.C:
  88. return item, ok
  89. default:
  90. return nil, false
  91. }
  92. }
  93. }
  94. //func (that *ProjectBackfeedManager) CloseItemChannel() {
  95. // log.Printf("closing item channel for %s", that.Name)
  96. // that.Lock.Lock()
  97. // defer that.Lock.Unlock()
  98. // if that.C == nil {
  99. // return
  100. // }
  101. // close(that.C)
  102. // that.C = nil
  103. //}
  104. func (that *ProjectBackfeedManager) Do() {
  105. defer close(that.Done)
  106. //defer that.CloseItemChannel()
  107. defer that.Cancel()
  108. for {
  109. select {
  110. case <-that.Context.Done():
  111. break
  112. case <-that.Done:
  113. break
  114. default:
  115. }
  116. item, ok := that.PopItem(true)
  117. if !ok {
  118. break
  119. }
  120. keyMap := map[string][][]byte{}
  121. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  122. keyMap[key] = append(keyMap[key], item.Item)
  123. wrapped := 1
  124. for wrapped < ItemWrapSize {
  125. item, ok := that.PopItem(false)
  126. if !ok {
  127. break
  128. }
  129. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  130. keyMap[key] = append(keyMap[key], item.Item)
  131. wrapped++
  132. }
  133. select {
  134. case <-that.Context.Done():
  135. break
  136. case <-that.Done:
  137. break
  138. default:
  139. }
  140. now := time.Now()
  141. resultMap := map[string]*redis.Cmd{}
  142. pipe := that.BackfeedRedis.Pipeline()
  143. lastTS := make([]any, 0, len(keyMap)*2)
  144. for key := range keyMap {
  145. lastTS = append(lastTS, key)
  146. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  147. }
  148. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  149. for key, items := range keyMap {
  150. args := []any{
  151. "bf.madd",
  152. key,
  153. }
  154. for _, item := range items {
  155. args = append(args, item)
  156. }
  157. resultMap[key] = pipe.Do(context.Background(), args...)
  158. }
  159. if _, err := pipe.Exec(context.Background()); err != nil {
  160. log.Printf("%s", err)
  161. }
  162. var sAddItems []any
  163. for key, items := range keyMap {
  164. res, err := resultMap[key].BoolSlice()
  165. if err != nil {
  166. log.Printf("%s", err)
  167. continue
  168. }
  169. if len(res) != len(keyMap[key]) {
  170. continue
  171. }
  172. for i, v := range res {
  173. if v {
  174. sAddItems = append(sAddItems, items[i])
  175. }
  176. }
  177. }
  178. dupes := wrapped - len(sAddItems)
  179. if len(sAddItems) != 0 {
  180. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
  181. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  182. }
  183. }
  184. if dupes > 0 {
  185. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  186. }
  187. }
  188. }
  189. type GlobalBackfeedManager struct {
  190. Context context.Context
  191. Cancel context.CancelFunc
  192. ActiveFeeds map[string]*ProjectBackfeedManager
  193. ActiveSlugs map[string]string
  194. TrackerRedis *redis.Client
  195. BackfeedRedis *redis.ClusterClient
  196. Lock sync.RWMutex
  197. Populated *abool.AtomicBool
  198. }
  199. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  200. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  201. if err != nil {
  202. return err
  203. }
  204. var projects []string
  205. projectSlugMap := map[string][]string{}
  206. for slug, project := range slugProjectMap {
  207. projectSlugMap[project] = append(projectSlugMap[project], slug)
  208. }
  209. for project := range projectSlugMap {
  210. projects = append(projects, project)
  211. }
  212. projectConfigs := map[string]ProjectConfig{}
  213. if len(projects) != 0 {
  214. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  215. if err != nil {
  216. return err
  217. }
  218. if len(projects) != len(cfgi) {
  219. return fmt.Errorf("hmget result had unexpected length")
  220. }
  221. for i, project := range projects {
  222. configString, ok := cfgi[i].(string)
  223. if !ok {
  224. continue
  225. }
  226. config := ProjectConfig{}
  227. if err := json.Unmarshal([]byte(configString), &config); err != nil {
  228. continue
  229. }
  230. projectConfigs[project] = config
  231. }
  232. }
  233. projects = nil
  234. for project := range projectSlugMap {
  235. if _, has := projectConfigs[project]; !has {
  236. delete(projectSlugMap, project)
  237. continue
  238. }
  239. projects = append(projects, project)
  240. }
  241. for slug, project := range slugProjectMap {
  242. if _, has := projectConfigs[project]; !has {
  243. delete(slugProjectMap, slug)
  244. }
  245. }
  246. // add feeds for new projects
  247. for _, project := range projects {
  248. projectConfig := projectConfigs[project]
  249. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  250. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  251. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  252. outdatedProjectBackfeedManager = projectBackfeedManager
  253. } else {
  254. continue
  255. }
  256. }
  257. ctx, cancel := context.WithCancel(that.Context)
  258. projectBackfeedManager := &ProjectBackfeedManager{
  259. Context: ctx,
  260. Cancel: cancel,
  261. Done: make(chan bool),
  262. C: make(chan *BackfeedItem, ItemChannelBuffer),
  263. BackfeedRedis: that.BackfeedRedis,
  264. Name: project,
  265. ProjectConfig: projectConfig,
  266. }
  267. if projectConfig.RedisConfig != nil {
  268. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  269. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  270. Username: "default",
  271. Password: projectConfig.RedisConfig.Pass,
  272. ReadTimeout: 15 * time.Minute,
  273. })
  274. } else {
  275. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  276. }
  277. go projectBackfeedManager.Do()
  278. that.Lock.Lock()
  279. that.ActiveFeeds[project] = projectBackfeedManager
  280. that.Lock.Unlock()
  281. if outdatedProjectBackfeedManager != nil {
  282. outdatedProjectBackfeedManager.Cancel()
  283. <-outdatedProjectBackfeedManager.Done
  284. log.Printf("updated project: %s", project)
  285. } else {
  286. log.Printf("added project: %s", project)
  287. }
  288. }
  289. that.Lock.Lock()
  290. that.ActiveSlugs = slugProjectMap
  291. that.Lock.Unlock()
  292. // remove feeds for old projects
  293. for project, projectBackfeedManager := range that.ActiveFeeds {
  294. if _, has := projectSlugMap[project]; has {
  295. continue
  296. }
  297. log.Printf("removing project: %s", project)
  298. that.Lock.Lock()
  299. delete(that.ActiveFeeds, project)
  300. that.Lock.Unlock()
  301. projectBackfeedManager.Cancel()
  302. <-projectBackfeedManager.Done
  303. log.Printf("removed project: %s", project)
  304. }
  305. if !that.Populated.IsSet() {
  306. that.Populated.Set()
  307. }
  308. return nil
  309. }
  310. type Splitter struct {
  311. Delimiter []byte
  312. IgnoreEOF bool
  313. }
  314. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  315. for i := 0; i < len(data); i++ {
  316. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  317. return i + len(that.Delimiter), data[:i], nil
  318. }
  319. }
  320. if len(data) == 0 || !atEOF {
  321. return 0, nil, nil
  322. }
  323. if atEOF && that.IgnoreEOF {
  324. return len(data), data, nil
  325. }
  326. return 0, data, io.ErrUnexpectedEOF
  327. }
  328. func GenShardHash(b []byte) (final byte) {
  329. for i, b := range b {
  330. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  331. }
  332. return final
  333. }
  334. func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
  335. res.Header().Set("Content-Type", "application/json")
  336. res.WriteHeader(statusCode)
  337. if statusCode == http.StatusNoContent {
  338. return
  339. }
  340. if err, isError := v.(error); isError {
  341. v = map[string]any{
  342. "error": fmt.Sprintf("%v", err),
  343. "status_code": statusCode,
  344. }
  345. } else {
  346. v = map[string]any{
  347. "data": v,
  348. "status_code": statusCode,
  349. }
  350. }
  351. json.NewEncoder(res).Encode(v)
  352. }
  353. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  354. that.Lock.RLock()
  355. defer that.Lock.RUnlock()
  356. project, has := that.ActiveSlugs[slug]
  357. if !has {
  358. return nil
  359. }
  360. projectBackfeedManager, has := that.ActiveFeeds[project]
  361. if !has {
  362. return nil
  363. }
  364. return projectBackfeedManager
  365. }
  366. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  367. defer req.Body.Close()
  368. vars := mux.Vars(req)
  369. slug := vars["slug"]
  370. secondaryShard := req.URL.Query().Get("shard")
  371. projectBackfeedManager := that.GetFeed(slug)
  372. if projectBackfeedManager == nil {
  373. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  374. return
  375. }
  376. splitter := &Splitter{
  377. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  378. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  379. }
  380. if len(splitter.Delimiter) == 0 {
  381. splitter.Delimiter = []byte{0x00}
  382. }
  383. scanner := bufio.NewScanner(req.Body)
  384. scanner.Split(splitter.Split)
  385. statusCode := http.StatusNoContent
  386. n := 0
  387. for scanner.Scan() {
  388. b := scanner.Bytes()
  389. if len(b) == 0 {
  390. continue
  391. }
  392. bcopy := make([]byte, len(b))
  393. copy(bcopy, b)
  394. item := &BackfeedItem{
  395. PrimaryShard: GenShardHash(bcopy),
  396. SecondaryShard: secondaryShard,
  397. Item: bcopy,
  398. }
  399. if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
  400. WriteResponse(res, http.StatusServiceUnavailable, err)
  401. return
  402. }
  403. n++
  404. }
  405. if err := scanner.Err(); err != nil {
  406. WriteResponse(res, statusCode, err)
  407. return
  408. }
  409. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  410. return
  411. }
  412. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  413. if that.Populated.IsNotSet() {
  414. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  415. return
  416. }
  417. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  418. client.ClientGetName(ctx)
  419. return client.Ping(ctx).Err()
  420. }); err != nil {
  421. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  422. return
  423. }
  424. WriteResponse(res, http.StatusOK, "ok")
  425. }
  426. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  427. WriteResponse(res, http.StatusOK, "pong")
  428. }
  429. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  430. that.Populated.UnSet()
  431. that.Cancel()
  432. for project, projectBackfeedManager := range that.ActiveFeeds {
  433. log.Printf("waiting for %s channel to shut down...", project)
  434. <-projectBackfeedManager.Done
  435. delete(that.ActiveFeeds, project)
  436. }
  437. }
  438. func main() {
  439. log.SetFlags(log.Flags() | log.Lshortfile)
  440. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  441. if err != nil {
  442. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  443. }
  444. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  445. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  446. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  447. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  448. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  449. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  450. ReadTimeout: 15 * time.Minute,
  451. PoolSize: 256,
  452. })
  453. backfeedRedisMetricsHook := redisprom.NewHook(
  454. redisprom.WithInstanceName("backfeed"),
  455. )
  456. backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
  457. trackerRedisMetricsHook := redisprom.NewHook(
  458. redisprom.WithInstanceName("tracker"),
  459. )
  460. trackerRedisClient.AddHook(trackerRedisMetricsHook)
  461. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  462. log.Panicf("unable to ping tracker redis: %s", err)
  463. }
  464. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  465. log.Panicf("unable to ping backfeed redis: %s", err)
  466. }
  467. err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
  468. client.ClientGetName(ctx)
  469. return client.Ping(ctx).Err()
  470. })
  471. globalBackfeedManager := &GlobalBackfeedManager{
  472. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  473. ActiveSlugs: map[string]string{},
  474. TrackerRedis: trackerRedisClient,
  475. BackfeedRedis: backfeedRedisClient,
  476. Populated: abool.New(),
  477. }
  478. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  479. defer globalBackfeedManager.CancelAllFeeds()
  480. if err := globalBackfeedManager.RefreshFeeds(); err != nil {
  481. log.Panicf("unable to set up backfeed projects: %s", err)
  482. }
  483. r := mux.NewRouter()
  484. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  485. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  486. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  487. rMetrics := mux.NewRouter()
  488. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  489. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  490. doneChan := make(chan bool)
  491. serveErrChan := make(chan error)
  492. go func() {
  493. s := &http.Server{
  494. Addr: os.Getenv("HTTP_ADDR"),
  495. IdleTimeout: 1 * time.Hour,
  496. MaxHeaderBytes: 1 * 1024 * 1024,
  497. Handler: r,
  498. }
  499. serveErrChan <- s.ListenAndServe()
  500. }()
  501. metricsErrChan := make(chan error)
  502. go func() {
  503. if os.Getenv("METRICS_ADDR") != "" {
  504. s := &http.Server{
  505. Addr: os.Getenv("METRICS_ADDR"),
  506. IdleTimeout: 1 * time.Hour,
  507. MaxHeaderBytes: 1 * 1024 * 1024,
  508. Handler: rMetrics,
  509. }
  510. metricsErrChan <- s.ListenAndServe()
  511. } else {
  512. <-doneChan
  513. metricsErrChan <- nil
  514. }
  515. }()
  516. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  517. if os.Getenv("METRICS_ADDR") != "" {
  518. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  519. }
  520. sc := make(chan os.Signal, 1)
  521. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  522. ticker := time.NewTicker(1 * time.Second)
  523. for {
  524. select {
  525. case <-sc:
  526. return
  527. case <-ticker.C:
  528. }
  529. if err := globalBackfeedManager.RefreshFeeds(); err != nil {
  530. log.Printf("unable to refresh backfeed projects: %s", err)
  531. }
  532. }
  533. }