#!/usr/bin/env python
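"""Queue watcher / uploader.

Watches an input directory for finished item directories, claims each one by
renaming it into a work directory, asks a dispatcher for an offload target,
uploads the item's files to the assigned MinIO bucket, and submits a
"backfeed" item describing the upload; optionally deletes the item when done.
"""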

import base64
import datetime
import json
import logging
import os
import pathlib
import shutil
import time
import urllib.parse
from typing import Optional

import click
import minio
import requests

from progress import Progress  # local module; supplies the progress reporter passed to fput_object

logging.basicConfig(level=logging.INFO)

BACKFEED_DELIM = "\n"

# TODO: Create a function that abstracts away the while True try except loops. With exponential backoff.
# time.sleep(min(2**tries, 64))
# TODO: Add rsync support
# TODO: Add rsync+ssh support
# TODO: Add webdav support.
# TODO: Fix the "ctrl-c handling" logic so it actually cleans up in the s3 bucket.
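

# One possible shape for the retry helper the first TODO describes; a sketch
# only, not yet used by the loops below (they still retry inline every 30s).
def retry_forever(description: str, func, *args, **kwargs):
    """Call func(*args, **kwargs) until it succeeds, with capped exponential backoff."""
    tries = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception:
            logging.exception(f"{description} failed, retrying...")
            time.sleep(min(2 ** tries, 64))
            tries += 1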


@click.group()
def sender():
    pass


def watch_pass(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
               ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    """Claim and upload at most one queued item; return True if an item was processed."""
    logging.info("Checking for new items...")
    for original_directory in input_directory.iterdir():
        if original_directory.is_dir():
            original_name = original_directory.name
            new_directory = work_directory.joinpath(original_name)
            try:
                # The rename claims the item; if another process already moved
                # it, rename raises FileNotFoundError and we skip it.
                original_directory.rename(new_directory)
            except FileNotFoundError:
                logging.warning(f"Unable to move item {original_directory}")
                continue
            single_impl(new_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project,
                        dispatcher, delete, backfeed_key)
            return True
    return False


@sender.command()
@click.option('--input-directory', envvar='UPLOAD_QUEUE_DIR', default="/data/upload-queue",
              type=click.Path(exists=True))
@click.option('--work-directory', envvar='UPLOADER_WORKING_DIR', default="/data/uploader-work",
              type=click.Path(exists=True))
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def watch(input_directory: pathlib.Path, work_directory: pathlib.Path, ia_collection: str, ia_item_title: str,
          ia_item_prefix: str, ia_item_date: str, project: str, dispatcher: str, delete: bool, backfeed_key: str):
    """Poll the input directory forever, uploading items as they appear."""
    if not isinstance(input_directory, pathlib.Path):
        input_directory = pathlib.Path(input_directory)
    if not isinstance(work_directory, pathlib.Path):
        work_directory = pathlib.Path(work_directory)
    while True:
        if not watch_pass(input_directory, work_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date,
                          project, dispatcher, delete, backfeed_key):
            logging.info("No item found, sleeping...")
            time.sleep(10)


@sender.command()
@click.option('--item-directory', type=click.Path(exists=True), required=True)
@click.option('--ia-collection', envvar='IA_COLLECTION', required=True)
@click.option('--ia-item-title', envvar='IA_ITEM_TITLE', required=True)
@click.option('--ia-item-prefix', envvar='IA_ITEM_PREFIX', required=True)
@click.option('--ia-item-date', envvar='IA_ITEM_DATE', required=False)
@click.option('--project', envvar='PROJECT', required=True)
@click.option('--dispatcher', envvar='DISPATCHER', required=True)
@click.option('--delete/--no-delete', envvar='DELETE', default=False)
@click.option('--backfeed-key', envvar='BACKFEED_KEY', required=True)
def single(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
           ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    """Upload one item directory and exit."""
    single_impl(item_directory, ia_collection, ia_item_title, ia_item_prefix, ia_item_date, project, dispatcher, delete,
                backfeed_key)


def single_impl(item_directory: pathlib.Path, ia_collection: str, ia_item_title: str, ia_item_prefix: str,
                ia_item_date: Optional[str], project: str, dispatcher: str, delete: bool, backfeed_key: str):
    if not isinstance(item_directory, pathlib.Path):
        item_directory = pathlib.Path(item_directory)
    logging.info(f"Processing item {item_directory}...")
    if ia_item_date is None:
        # Derive the date from the item's timestamp prefix, e.g. an item named
        # "20240131123456_example" gets ia_item_date "2024-01".
        s = item_directory.name.split("_")
        if len(s) > 0:
            ds = s[0]
            try:
                d = datetime.datetime.strptime(ds, "%Y%m%d%H%M%S")
                ia_item_date = d.strftime("%Y-%m")
            except ValueError:
                pass
    meta_json_loc = item_directory.joinpath('__upload_meta.json')
    if meta_json_loc.exists():
        # A leftover metadata file means this item was already (partially) processed.
        raise Exception("META JSON EXISTS WTF")
    meta_json = {
        "IA_COLLECTION": ia_collection,
        "IA_ITEM_TITLE": f"{ia_item_title} {item_directory.name}",
        "IA_ITEM_DATE": ia_item_date,
        "IA_ITEM_NAME": f"{ia_item_prefix}{item_directory.name}",
        "PROJECT": project,
    }
    with open(meta_json_loc, 'w') as f:
        f.write(json.dumps(meta_json))
    logging.info("Wrote metadata json.")
    total_size = 0
    files = list(item_directory.glob("**/*"))
    for item in files:
        total_size = total_size + os.path.getsize(item)
    logging.info(f"Item size is {total_size} bytes across {len(files)} files.")
    meta_json["SIZE_HINT"] = str(total_size)
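    # meta_json doubles as the query string for the dispatcher request below;
    # SIZE_HINT presumably lets the dispatcher pick a target with enough room.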
    while True:
        try:
            r = requests.get(f"{dispatcher}/offload_target", params=meta_json, timeout=60)
            if r.status_code == 200:
                data = r.json()
                url = data["url"]
                break
            else:
                raise Exception(f"Invalid status code {r.status_code}: {r.text}")
        except Exception:
            logging.exception("Unable to fetch target")
            time.sleep(30)
    logging.info(f"Assigned target {url}")
    parsed_url = urllib.parse.urlparse(url)
    bf_item = None
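    # The dispatcher is expected to return something like (hypothetical values)
    # minio+https://ACCESSKEY:SECRETKEY@minio.example.org:9000, with the
    # credentials in the userinfo part and the scheme selecting the transport.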
    if parsed_url.scheme == "minio+http" or parsed_url.scheme == "minio+https":
        secure = (parsed_url.scheme == "minio+https")
        ep = parsed_url.hostname
        if parsed_url.port is not None:
            ep = f"{ep}:{parsed_url.port}"
        client = None
        while True:
            try:
                client = minio.Minio(endpoint=ep, access_key=parsed_url.username, secret_key=parsed_url.password,
                                     secure=secure)
                break
            except Exception:
                logging.exception("Failed to connect to minio")
                time.sleep(30)
        # S3-style bucket names cannot contain underscores, so swap them for hyphens.
        bucket_name = item_directory.name.replace("_", "-")
        logging.info("Making bucket...")
        while True:
            try:
                if client.bucket_exists(bucket_name=bucket_name):
                    raise Exception("Bucket already exists!")
                client.make_bucket(bucket_name=bucket_name)
                break
            except Exception:
                logging.exception("Failed to make bucket")
                time.sleep(30)
        logging.info("Starting uploads...")
        for file in files:
            rel_file = file.relative_to(item_directory)
            while True:
                try:
                    logging.info(f"Uploading file {rel_file}...")
                    client.fput_object(bucket_name=bucket_name, object_name=str(rel_file), file_path=file,
                                       progress=Progress())
                    break
                except Exception:
                    # Covers minio.error.MinioException and everything else alike.
                    logging.exception("Failed to upload")
                    time.sleep(30)
        item_data = {"url": url, "item_name": item_directory.name, "bucket_name": bucket_name}
        bf_item_part = base64.urlsafe_b64encode(json.dumps(item_data).encode("UTF-8")).decode("UTF-8")
        bf_item = f"{project}:{parsed_url.hostname}:{bf_item_part}"
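        # bf_item layout: <project>:<minio hostname>:<urlsafe-base64 JSON payload>.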
    else:
        raise Exception(f"Unable to upload, don't understand url: {url}")
    if bf_item is None:
        raise Exception("Unable to create backfeed item")
    if backfeed_key == "SKIPBF":
        logging.warning(f"Skipping backfeed! Would have submitted: {bf_item}")
    else:
        while True:
            try:
                u = f"https://legacy-api.arpa.li/backfeed/legacy/{backfeed_key}"
                logging.info(f"Attempting to submit bf item {bf_item} to {u}...")
                resp = requests.post(u, params={"skipbloom": "1", "delimiter": BACKFEED_DELIM},
                                     data=f"{bf_item}{BACKFEED_DELIM}".encode("UTF-8"), timeout=60)
                if resp.status_code == 200:
                    break
                logging.warning(f"Failed to submit to backfeed {resp.status_code}: {resp.text}")
                time.sleep(30)
            except Exception:
                logging.exception("Failed to submit to backfeed")
                time.sleep(30)
        logging.info("Backfeed submit complete!")
    if delete:
        logging.info("Removing item...")
        shutil.rmtree(item_directory)
    logging.info("Upload complete!")


if __name__ == '__main__':
    sender()
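
# Example invocation (hypothetical values; every option can instead be supplied
# via the environment variable named in its click.option declaration):
#
#   ./sender.py watch --ia-collection example-collection \
#       --ia-item-title "Example Project" --ia-item-prefix example_ \
#       --project example --dispatcher https://dispatcher.example.org \
#       --backfeed-key SKIPBF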