You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

226 lines
5.8 KiB

  1. package storage
  import (
  	"fmt"
  	"io"
  	"log"
  	"net/url"
  	"strconv"
  	"strings"
  	"time"

  	"github.com/aws/aws-sdk-go/aws"
  	"github.com/aws/aws-sdk-go/aws/awserr"
  	"github.com/aws/aws-sdk-go/aws/credentials"
  	"github.com/aws/aws-sdk-go/aws/session"
  	"github.com/aws/aws-sdk-go/service/s3"
  	"github.com/aws/aws-sdk-go/service/s3/s3manager"
  )
  15. type S3Storage struct {
  16. Storage
  17. bucket string
  18. session *session.Session
  19. s3 *s3.S3
  20. logger *log.Logger
  21. noMultipart bool
  22. }
  23. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  24. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  25. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  26. }
  27. func (s *S3Storage) Type() string {
  28. return "s3"
  29. }
  30. func (s *S3Storage) Head(token string, filename string) (metadata Metadata, err error) {
  31. key := fmt.Sprintf("%s/%s", token, filename)
  32. headRequest := &s3.HeadObjectInput{
  33. Bucket: aws.String(s.bucket),
  34. Key: aws.String(key),
  35. }
  36. // content type , content length
  37. response, err := s.s3.HeadObject(headRequest)
  38. if err != nil {
  39. return Metadata{}, err
  40. }
  41. downloads, err := strconv.Atoi(*response.Metadata["downloads"])
  42. if err != nil {
  43. return Metadata{}, err
  44. }
  45. maxdownloads, err := strconv.Atoi(*response.Metadata["maxDownloads"])
  46. if err != nil {
  47. return Metadata{}, err
  48. }
  49. expires, err := time.Parse("2020-02-02 02:02:02", *response.Expires)
  50. if err != nil {
  51. return Metadata{}, err
  52. }
  53. metadata = Metadata{
  54. ContentType: "",
  55. ContentLength: *response.ContentLength,
  56. Downloads: downloads,
  57. MaxDownloads: maxdownloads,
  58. MaxDate: expires,
  59. DeletionToken: *response.Metadata["deletionToken"],
  60. Secret: *response.Metadata["deletionSecret"],
  61. }
  62. return metadata, nil
  63. }
  64. func (s *S3Storage) Meta(token string, filename string, metadata Metadata) error {
  65. key := fmt.Sprintf("%s/%s", token, filename)
  66. input := &s3.CopyObjectInput{
  67. Bucket: aws.String(s.bucket),
  68. CopySource: aws.String(key),
  69. Key: aws.String(key),
  70. MetadataDirective: aws.String("REPLACE"),
  71. Metadata: map[string]*string{
  72. "downloads": aws.String(strconv.Itoa(metadata.Downloads)),
  73. "maxDownloads": aws.String(strconv.Itoa(metadata.MaxDownloads)),
  74. "deletionToken": aws.String(metadata.DeletionToken),
  75. "deletionSecret": aws.String(metadata.Secret),
  76. },
  77. ContentType: aws.String(metadata.ContentType),
  78. Expires: aws.Time(metadata.MaxDate),
  79. }
  80. _, err := s.s3.CopyObject(input)
  81. if err != nil {
  82. return err
  83. }
  84. return nil
  85. }
  86. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, metadata Metadata, err error) {
  87. key := fmt.Sprintf("%s/%s", token, filename)
  88. getRequest := &s3.GetObjectInput{
  89. Bucket: aws.String(s.bucket),
  90. Key: aws.String(key),
  91. }
  92. response, err := s.s3.GetObject(getRequest)
  93. if err != nil {
  94. return
  95. }
  96. downloads, err := strconv.Atoi(*response.Metadata["downloads"])
  97. if err != nil {
  98. return nil, Metadata{}, err
  99. }
  100. maxdownloads, err := strconv.Atoi(*response.Metadata["maxDownloads"])
  101. if err != nil {
  102. return nil, Metadata{}, err
  103. }
  104. expires, err := time.Parse("2020-02-02 02:02:02", *response.Expires)
  105. if err != nil {
  106. return nil, Metadata{}, err
  107. }
  108. metadata = Metadata{
  109. ContentType: "",
  110. ContentLength: *response.ContentLength,
  111. Downloads: downloads,
  112. MaxDownloads: maxdownloads,
  113. MaxDate: expires,
  114. DeletionToken: *response.Metadata["deletionToken"],
  115. Secret: *response.Metadata["deletionSecret"],
  116. }
  117. reader = response.Body
  118. return
  119. }
  120. func (s *S3Storage) Delete(token string, filename string) (err error) {
  121. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  122. deleteRequest := &s3.DeleteObjectInput{
  123. Bucket: aws.String(s.bucket),
  124. Key: aws.String(metadata),
  125. }
  126. _, err = s.s3.DeleteObject(deleteRequest)
  127. if err != nil {
  128. return
  129. }
  130. key := fmt.Sprintf("%s/%s", token, filename)
  131. deleteRequest = &s3.DeleteObjectInput{
  132. Bucket: aws.String(s.bucket),
  133. Key: aws.String(key),
  134. }
  135. _, err = s.s3.DeleteObject(deleteRequest)
  136. return
  137. }
  138. func (s *S3Storage) Put(token string, filename string, reader io.Reader, metadata Metadata) (err error) {
  139. key := fmt.Sprintf("%s/%s", token, filename)
  140. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  141. var concurrency int
  142. if !s.noMultipart {
  143. concurrency = 20
  144. } else {
  145. concurrency = 1
  146. }
  147. // Create an uploader with the session and custom options
  148. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  149. u.Concurrency = concurrency // default is 5
  150. u.LeavePartsOnError = false
  151. })
  152. _, err = uploader.Upload(&s3manager.UploadInput{
  153. Bucket: aws.String(s.bucket),
  154. Key: aws.String(key),
  155. Body: reader,
  156. Metadata: map[string]*string{
  157. "downloads": aws.String(strconv.Itoa(metadata.Downloads)),
  158. "maxDownloads": aws.String(strconv.Itoa(metadata.MaxDownloads)),
  159. "deletionToken": aws.String(metadata.DeletionToken),
  160. "deletionSecret": aws.String(metadata.Secret),
  161. },
  162. ContentType: aws.String(metadata.ContentType),
  163. Expires: aws.Time(metadata.MaxDate),
  164. })
  165. return
  166. }
  167. func (s *S3Storage) IsNotExist(err error) bool {
  168. if err == nil {
  169. return false
  170. }
  171. if aerr, ok := err.(awserr.Error); ok {
  172. switch aerr.Code() {
  173. case s3.ErrCodeNoSuchKey:
  174. return true
  175. }
  176. }
  177. return false
  178. }
  179. func (s *S3Storage) DeleteExpired() error {
  180. // not necessary, as S3 has expireDate on files to automatically delete the them
  181. return nil
  182. }
  183. func getAwsSession(accessKey, secretKey, region, endpoint string, forcePathStyle bool) *session.Session {
  184. return session.Must(session.NewSession(&aws.Config{
  185. Region: aws.String(region),
  186. Endpoint: aws.String(endpoint),
  187. Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
  188. S3ForcePathStyle: aws.Bool(forcePathStyle),
  189. }))
  190. }