package main

import (
	"archive/tar"
	"bufio"
	"bytes"
	"compress/flate"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"hash/crc64"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	redisprom "github.com/globocom/go-redis-prometheus"
	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tevino/abool/v2"
)
const (
	ItemChannelBuffer = 100000
	ItemWrapSize      = 100000
)

type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}

type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}

type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}
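// RedisConfigDiffers reports whether the given project redis config differs
// from the one this manager was created with; two nil configs count as equal.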
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && new == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
}

func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
	//that.Lock.RLock()
	//defer that.Lock.RUnlock()
	//if that.C == nil {
	//	return false
	//}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-that.Context.Done():
		return fmt.Errorf("backfeed channel closed")
	case that.C <- item:
		return nil
		//default:
		//	return fmt.Errorf("backfeed channel full")
	}
}

func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		default:
			return nil, false
		}
	}
}

//func (that *ProjectBackfeedManager) CloseItemChannel() {
//	log.Printf("closing item channel for %s", that.Name)
//	that.Lock.Lock()
//	defer that.Lock.Unlock()
//	if that.C == nil {
//		return
//	}
//	close(that.C)
//	that.C = nil
//}
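// Do is the per-project worker loop: it blocks for one item, opportunistically
// drains up to ItemWrapSize more, groups them by "<project>:<shard>:<subshard>"
// key, runs bf.madd for each group through a single pipeline, pushes items the
// bloom filter had not seen before to "<project>:todo:backfeed" on the project
// redis, and counts the rest as duplicates.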
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	//defer that.CloseItemChannel()
	defer that.Cancel()
	for {
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		now := time.Now()
		resultMap := map[string]*redis.Cmd{}
		pipe := that.BackfeedRedis.Pipeline()
		lastTS := make([]any, 0, len(keyMap)*2)
		for key := range keyMap {
			lastTS = append(lastTS, key)
			lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
		}
		pipe.HSet(context.Background(), ":last_ts", lastTS...)
		for key, items := range keyMap {
			args := []any{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("%s", err)
		}
		var sAddItems []any
		for key, items := range keyMap {
			res, err := resultMap[key].BoolSlice()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			if len(res) != len(keyMap[key]) {
				continue
			}
			for i, v := range res {
				if v {
					sAddItems = append(sAddItems, items[i])
				}
			}
		}
		dupes := wrapped - len(sAddItems)
		if len(sAddItems) != 0 {
			if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
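// GlobalBackfeedManager owns the per-project backfeed managers and the
// slug-to-project mapping used to route incoming requests.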
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	Lock          sync.RWMutex
	Populated     *abool.AtomicBool
}
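// RefreshFeeds reconciles the running project managers with the tracker's
// "backfeed" and "trackers" hashes: it starts managers for new projects,
// restarts managers whose redis config changed, and shuts down managers for
// projects that no longer have a backfeed slug.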
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
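// Splitter provides a bufio.SplitFunc that tokenizes a request body on an
// arbitrary delimiter; with IgnoreEOF set, a trailing partial item is emitted
// instead of being treated as an error.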
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}

func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// only compare full delimiter-sized windows so we never read past the end of data
	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
			return i + len(that.Delimiter), data[:i], nil
		}
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if atEOF && that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
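// GenShardHash reduces an item to a single byte that is used as the primary
// shard component of its bloom filter key.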
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}
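// WriteResponse writes a JSON envelope containing either an "error" or a
// "data" field plus the status code; 204 responses get no body.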
func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]any{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		v = map[string]any{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}
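// GetFeed resolves a slug to its running project backfeed manager, or nil if
// the slug or project is unknown.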
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}
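// Last-access statistics are aggregated per project/shard/subshard key parsed
// from the ":last_ts" hash.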
type LastAccessStatsKey struct {
	Project  string
	Shard    string
	SubShard string
}

type LastAccessStatsEntry struct {
	First time.Time `json:"first"`
	Last  time.Time `json:"last"`
	Size  int64     `json:"size"`
}

type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry

func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
	mapped := map[string]map[string]any{}
	for key, value := range that {
		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{
			"first": value.First.Format(time.RFC3339),
			"last":  value.Last.Format(time.RFC3339),
			"size":  value.Size,
		}
	}
	return json.Marshal(mapped)
}

func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) {
	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s)
	}
	return LastAccessStatsKey{
		Project:  parts[0],
		Shard:    parts[1],
		SubShard: parts[2],
	}, nil
}
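// HandleLastAccessStats reports, for each backfeed key (optionally merged by
// project, shard, or sub_shard via ?merge=), the oldest and newest last-access
// timestamps and the memory used by the underlying filters.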
func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	merge := map[string]bool{}
	if vv, ok := req.URL.Query()["merge"]; ok {
		for _, v := range vv {
			merge[v] = true
		}
	}
	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	memoryUsages := map[string]*redis.IntCmd{}
	pipe := that.BackfeedRedis.Pipeline()
	for key := range lastTs {
		memoryUsages[key] = pipe.MemoryUsage(req.Context(), key)
	}
	_, err = pipe.Exec(req.Context())
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	lastAccessStats := LastAccessStatsMap{}
	for key, value := range lastTs {
		// value is in unix timestamp format
		ts, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		if merge["project"] {
			lastAccessStatsKey.Project = "*"
		}
		if merge["shard"] {
			lastAccessStatsKey.Shard = "*"
		}
		if merge["sub_shard"] {
			lastAccessStatsKey.SubShard = "*"
		}
		parsedTs := time.Unix(ts, 0)
		if v, has := lastAccessStats[lastAccessStatsKey]; !has {
			lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{
				First: parsedTs,
				Last:  parsedTs,
				Size:  memoryUsages[key].Val(),
			}
		} else {
			if v.First.After(parsedTs) {
				v.First = parsedTs
			}
			if v.Last.Before(parsedTs) {
				v.Last = parsedTs
			}
			v.Size += memoryUsages[key].Val()
		}
	}
	WriteResponse(res, http.StatusOK, lastAccessStats)
}
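// HandleLegacy accepts a possibly gzip- or deflate-compressed, delimiter-
// separated batch of items for a slug and queues each item for deduplication.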
func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	secondaryShard := req.URL.Query().Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(req.URL.Query().Get("delimiter")),
		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	var body io.ReadCloser
	switch req.Header.Get("Content-Encoding") {
	case "":
		body = req.Body
	case "gzip":
		var err error
		body, err = gzip.NewReader(req.Body)
		if err != nil {
			WriteResponse(res, http.StatusBadRequest, err)
			return
		}
		defer body.Close()
	case "deflate":
		body = flate.NewReader(req.Body)
		defer body.Close()
	default:
		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
		// without this return the scanner below would read from a nil body
		return
	}
	scanner := bufio.NewScanner(body)
	scanner.Split(splitter.Split)
	n := 0
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		bcopy := make([]byte, len(b))
		copy(bcopy, b)
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(bcopy),
			SecondaryShard: secondaryShard,
			Item:           bcopy,
		}
		if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		n++
	}
	if err := scanner.Err(); err != nil {
		WriteResponse(res, http.StatusBadRequest, err)
		return
	}
	WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
}
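// HandleHealth reports 503 until the first successful RefreshFeeds and 500 if
// any backfeed redis shard fails to answer a ping.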
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
	if that.Populated.IsNotSet() {
		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
		return
	}
	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
		return
	}
	WriteResponse(res, http.StatusOK, "ok")
}

func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
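// DumpChunkName is the JSON-encoded tar entry name describing one bf.scandump
// chunk of a bloom filter.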
type DumpChunkName struct {
	Key      string `json:"key"`
	Distance int    `json:"distance"`
	Cursor   int64  `json:"cursor"`
	Checksum uint64 `json:"checksum"`
}
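// HandleDump streams bf.scandump chunks for all keys matching the requested
// pattern as a PAX tar archive, or returns the matching key list as JSON when
// the client asks for application/json.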
func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	key := vars["key"]
	if key == "" {
		key = "*:*:*"
	}
	if strings.Count(key, ":") < 2 {
		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
		return
	}
	lock := sync.Mutex{}
	keys := []string{}
	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		cursor := uint64(0)
		var shardKeys []string
		for {
			var err error
			var keysBatch []string
			keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
			if err != nil && err != redis.Nil {
				return err
			}
			shardKeys = append(shardKeys, keysBatch...)
			if cursor == 0 {
				break
			}
		}
		lock.Lock()
		defer lock.Unlock()
		keys = append(keys, shardKeys...)
		return nil
	}); err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	sort.Strings(keys)
	hasJsonAcceptHeader := false
	for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
		accept = strings.TrimSpace(accept)
		if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
			hasJsonAcceptHeader = true
			break
		}
	}
	if hasJsonAcceptHeader {
		WriteResponse(res, http.StatusOK, keys)
		return
	}
	if len(keys) == 0 {
		WriteResponse(res, http.StatusNoContent, nil)
		return
	}
	tarWriter := tar.NewWriter(res)
	defer tarWriter.Close()
	for _, key := range keys {
		cursor := int64(0)
		for i := 0; ; i++ {
			rawRes, err := that.BackfeedRedis.Do(req.Context(), "bf.scandump", key, cursor).Result()
			if err != nil && err != redis.Nil {
				WriteResponse(res, http.StatusInternalServerError, err)
				return
			}
			if rawRes == nil {
				break
			}
			resSlice, ok := rawRes.([]any)
			if !ok {
				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes))
				return
			}
			if len(resSlice) != 2 {
				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice)))
				return
			}
			cursor, ok = resSlice[0].(int64)
			if !ok {
				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
				return
			}
			chunkString, ok := resSlice[1].(string)
			if !ok {
				WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
				return
			}
			chunk := []byte(chunkString)
			lastAccess := time.Time{}
			tsString, err := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key).Result()
			// last_ts values are unix seconds; only use them if the lookup succeeded
			if err == nil && tsString != "" {
				ts, err := strconv.ParseInt(tsString, 10, 64)
				if err == nil {
					lastAccess = time.Unix(ts, 0)
				}
			}
			nameStruct := DumpChunkName{
				Key:      key,
				Cursor:   cursor,
				Distance: i,
				Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
			}
			name, err := json.Marshal(nameStruct)
			if err != nil {
				WriteResponse(res, http.StatusInternalServerError, err)
				return
			}
			if err := tarWriter.WriteHeader(&tar.Header{
				Typeflag:   tar.TypeReg,
				Name:       string(name),
				Size:       int64(len(chunk)),
				Mode:       0600,
				ModTime:    lastAccess,
				AccessTime: lastAccess,
				ChangeTime: lastAccess,
				PAXRecords: map[string]string{
					"ARCHIVETEAM.bffchunk.key":      key,
					"ARCHIVETEAM.bffchunk.cursor":   fmt.Sprintf("%d", cursor),
					"ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i),
					"ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum),
				},
				Format: tar.FormatPAX,
			}); err != nil {
				WriteResponse(res, http.StatusInternalServerError, err)
			}
			if _, err := tarWriter.Write(chunk); err != nil {
				WriteResponse(res, http.StatusInternalServerError, err)
			}
			if cursor == 0 && len(chunk) == 0 {
				break
			}
		}
	}
}
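// CancelAllFeeds marks the manager as unpopulated, cancels the shared context,
// and waits for every project worker to finish.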
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
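// main wires up the tracker and backfeed redis clients with prometheus hooks,
// starts the public HTTP API and the optional metrics/pprof listener, and
// refreshes the project feeds every second until SIGINT/SIGTERM.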
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
		PoolSize:    256,
	})
	backfeedRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("backfeed"),
	)
	backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
	trackerRedisMetricsHook := redisprom.NewHook(
		redisprom.WithInstanceName("tracker"),
	)
	trackerRedisClient.AddHook(trackerRedisMetricsHook)
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	// warm up connections to every backfeed shard and fail fast if one is unreachable
	if err := backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		log.Panicf("unable to ping backfeed redis shards: %s", err)
	}
	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		Populated:     abool.New(),
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()
	if err := globalBackfeedManager.RefreshFeeds(); err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
	r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
	r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
	r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats)
	r.Methods(http.MethodGet).Path("/dump/{key}").HandlerFunc(globalBackfeedManager.HandleDump)
	rMetrics := mux.NewRouter()
	rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
	rMetrics.Path("/metrics").Handler(promhttp.Handler())
	doneChan := make(chan bool)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	metricsErrChan := make(chan error)
	go func() {
		if os.Getenv("METRICS_ADDR") != "" {
			s := &http.Server{
				Addr:           os.Getenv("METRICS_ADDR"),
				IdleTimeout:    1 * time.Hour,
				MaxHeaderBytes: 1 * 1024 * 1024,
				Handler:        rMetrics,
			}
			metricsErrChan <- s.ListenAndServe()
		} else {
			<-doneChan
			metricsErrChan <- nil
		}
	}()
	log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
	if os.Getenv("METRICS_ADDR") != "" {
		log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
	}
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case <-ticker.C:
		}
		if err := globalBackfeedManager.RefreshFeeds(); err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}