The little things give you away... A collection of various small helper stuff
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

278 lines
8.5 KiB

  1. #!/bin/bash
  2. defaultFormat='{url}'
  3. function usage_exit {
  4. echo 's3-bucket-list-qwarc [options] BUCKETURL [MARKER...]' >&2
  5. echo >&2
  6. echo 'List the contents of an open S3 (or S3-like) bucket, possibly in parallel using multiple markers' >&2
  7. echo >&2
  8. echo 'Options:' >&2
  9. echo ' --concurrency N Start qwarc with a concurrency of N; the default is to run all markers at once' >&2
  10. echo " --format FORMAT Modify the output format; FORMAT defaults to '${defaultFormat}'; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)" >&2
  11. echo ' --jsonl Enable JSONL output; cannot be used with --format' >&2
  12. echo ' --no-start-marker Disable the automatic marker to start from the beginning of the list' >&2
  13. echo ' --no-end-marker Disable the automatic marker to scan to the end of the list' >&2
  14. echo ' --with-list-urls Enables printing the list URLs retrieved to FD 3' >&2
  15. exit $1
  16. }
  17. concurrency=
  18. listUrls=
  19. noStartMarker=
  20. noEndMarker=
  21. format=
  22. jsonl=
  23. cmd="$(printf '%q ' 's3-bucket-list-qwarc' "$@" | sed 's, $,,')"
  24. while [[ $# -gt 0 ]]
  25. do
  26. if [[ "$1" == '--help' || "$1" == '-h' ]]
  27. then
  28. usage_exit 0
  29. elif [[ "$1" == '--concurrency' ]]
  30. then
  31. declare -i concurrency="$2"
  32. shift
  33. elif [[ "$1" == '--format' ]]
  34. then
  35. format="$2"
  36. shift
  37. elif [[ "$1" == '--jsonl' ]]
  38. then
  39. jsonl=1
  40. elif [[ "$1" == '--no-start-marker' ]]
  41. then
  42. noStartMarker=1
  43. elif [[ "$1" == '--no-end-marker' ]]
  44. then
  45. noEndMarker=1
  46. elif [[ "$1" == '--with-list-urls' ]]
  47. then
  48. listUrls='yes'
  49. else
  50. break
  51. fi
  52. shift
  53. done
  54. bucketUrl="$1"
  55. shift
  56. # Remaining arguments are markers
  57. if [[ -z "${concurrency}" ]]
  58. then
  59. declare -i concurrency=$#
  60. if [[ -z "${noStartMarker}" ]]; then concurrency+=1; fi
  61. if [[ -z "${noEndMarker}" ]]; then concurrency+=1; fi
  62. concurrency+=-1 # Because the obvious -= doesn't work...
  63. fi
  64. if [[ "${listUrls}" ]] && ! { command >&3; } 2>/dev/null
  65. then
  66. echo 'Error: --with-list-urls requires FD 3 to be open' >&2
  67. exit 1
  68. fi
  69. if [[ "${jsonl}" && "${format}" ]]
  70. then
  71. echo 'Error: --jsonl and --format options are mutually exclusive' >&2
  72. exit 1
  73. fi
  74. if [[ -z "${format}" ]]
  75. then
  76. format="${defaultFormat}"
  77. fi
  78. # Validate and process bucket URL
  79. if [[ "${bucketUrl}" != http://* && "${bucketUrl}" != https://* ]]
  80. then
  81. echo 'Invalid bucket URL: must be HTTP or HTTPS' >&2
  82. exit 1
  83. fi
  84. if [[ "${bucketUrl}" == *'?'* ]]
  85. then
  86. echo 'Invalid bucket URL: must not have a query' >&2
  87. exit 1
  88. fi
  89. if [[ "${bucketUrl}" != *://*/* || "${bucketUrl}" != */ ]]
  90. then
  91. bucketUrl="${bucketUrl}/"
  92. fi
  93. # Construct prefix for files and output
  94. prefix="${bucketUrl#*://}" # Remove protocol
  95. while [[ "${prefix}" == */ ]]; do prefix="${prefix%/}"; done # Remove trailing slashes
  96. prefix="${prefix//\//_}" # Replace slashes with underscores
  97. # Ensure no collisions
  98. if [[ -e s3-bucket-list-qwarc ]]
  99. then
  100. echo 'Error: s3-bucket-list-qwarc exists in this directory.' >&2
  101. exit 1
  102. fi
  103. if [[ "$(shopt -s nullglob; echo "${prefix}"*)" ]]
  104. then
  105. echo "Error: there already exist files with the prefix '${prefix}' in this directory." >&2
  106. exit 1
  107. fi
  108. # Write the qwarc spec file
  109. # Indentation... Inspired by https://stackoverflow.com/a/33817423
  110. readarray code <<EOF
  111. #!/usr/bin/env python3
  112. import html
  113. import json
  114. import logging
  115. import os
  116. import qwarc
  117. import qwarc.utils
  118. import shlex
  119. import urllib.parse
  120. import yarl
  121. format = os.environ['S3_FORMAT']
  122. jsonl = os.environ['S3_JSONL'] == '1'
  123. bucketUrl = yarl.URL(os.environ['S3_BUCKET_URL'])
  124. withListUrls = bool(os.environ.get('S3_WITH_LIST_URLS')) # True if present and non-empty
  125. markersFilename = os.environ['S3_MARKERS_FILENAME']
  126. if withListUrls:
  127. try:
  128. listUrlsFD = os.fdopen(3, 'w')
  129. except OSError:
  130. logging.critical('FD 3 is not open')
  131. raise
  132. class S3ListBucket(qwarc.Item):
  133. itemType = 's3listbucket'
  134. # itemValue = ('marker1', 'marker2') encoded as JSON
  135. @classmethod
  136. def generate(cls):
  137. yield from map(lambda x: json.dumps(x, separators = (',', ':')), cls._generate())
  138. @classmethod
  139. def _generate(cls):
  140. with open(markersFilename, 'r') as fp:
  141. it = iter(fp)
  142. lastLine = next(it).strip() or None
  143. for line in it:
  144. line = line.strip() or None
  145. yield (lastLine, line)
  146. lastLine = line
  147. async def process(self):
  148. marker1, marker2 = json.loads(self.itemValue)
  149. marker = marker1
  150. while True:
  151. url = bucketUrl.with_query({'marker': marker} if marker is not None else {})
  152. if withListUrls:
  153. self.logger.info(f'List URL: {str(url)!r}')
  154. print(f'{url}', file = listUrlsFD)
  155. response = await self.fetch(url)
  156. if response.status != 200:
  157. self.logger.error(f'Could not fetch page on marker {marker!r}')
  158. break
  159. body = await response.read()
  160. # Isn't this a 503?
  161. if b'<Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>' in body:
  162. self.logger.error(f'Got internal error on {url} on attempt {attempt}; {"retrying" if attempt < 10 else "aborting"}')
  163. if attempt >= 10:
  164. if 'marker' in params:
  165. self.logger.error(f'To retry, use marker {marker!r}')
  166. break
  167. attempt += 1
  168. continue
  169. if not body.startswith(b'<?xml version="1.0" encoding="UTF-8"?>\n<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">') and \
  170. not body.startswith(b"<?xml version='1.0' encoding='UTF-8'?><ListBucketResult xmlns='http://doc.s3.amazonaws.com/2006-03-01'>"):
  171. self.logger.error(f'Invalid body on marker {marker!r}: {body[:200]}...')
  172. break
  173. if b'<Marker></Marker>' in body[:200] and marker is not None:
  174. self.logger.error('Marker loop (empty marker in response despite providing one)')
  175. break
  176. # No risk, no fun!
  177. contents = body.split(b'<Contents>')
  178. assert all(content.startswith(b'<Key>') for content in contents[1:])
  179. assert all(content.endswith(b'</Contents>') for content in contents[1:-1])
  180. assert contents[-1].endswith(b'</Contents></ListBucketResult>')
  181. contents[-1] = contents[-1][:-len('</ListBucketResult>')]
  182. for content in contents[1:]:
  183. key = html.unescape(content[5 : content.index(b'</Key>')].decode('utf-8')) # 5 = len(b'<Key>')
  184. if marker2 is not None and key >= marker2:
  185. break
  186. fields = {}
  187. url = f'{bucketUrl}{urllib.parse.quote(key)}'
  188. fields['URL'] = url
  189. tags = content.split(b'>')
  190. assert len(tags) % 2 == 0
  191. assert tags[-1] == b''
  192. assert tags[-2] == b'</Contents'
  193. openTags = [] # Current open tag hierarchy
  194. for tag in tags[:-2]:
  195. if tag.startswith(b'<'):
  196. openTags.append(tag[1:])
  197. continue
  198. assert openTags
  199. if tag.endswith(b'</' + openTags[-1]):
  200. k = b'>'.join(openTags).decode('utf-8')
  201. assert k not in fields, f'{k!r} encountered twice (previous value: {fields[k]!r})'
  202. fields[k] = html.unescape(tag[:-(len(openTags[-1]) + 2)].decode('utf-8'))
  203. openTags.pop()
  204. continue
  205. assert False
  206. if 'Size' in fields:
  207. fields['Size'] = int(fields['Size'])
  208. if jsonl:
  209. s = json.dumps(fields)
  210. else:
  211. s = format.format(**fields, key = key, url = url, size = fields.get('Size'))
  212. self.logger.info(f'Output: {s!r}')
  213. print(s)
  214. lastKey = key
  215. if marker2 is not None and lastKey >= marker2:
  216. break
  217. truncated = True if b'<IsTruncated>true</IsTruncated>' in body else (False if b'<IsTruncated>false</IsTruncated>' in body else None)
  218. assert truncated in (True, False)
  219. if not truncated:
  220. break
  221. if marker is not None and marker == lastKey:
  222. self.logger.error('Marker loop (same last key as previous marker)')
  223. break
  224. marker = lastKey
  225. attempt = 1
  226. specDependencies = qwarc.utils.SpecDependencies(
  227. files = ['s3-bucket-list-qwarc', os.environ['S3_MARKERS_FILENAME']],
  228. extra = {'env': {k: os.environ.get(k) for k in ('S3BL_CMD', 'S3_FORMAT', 'S3_BUCKET_URL', 'S3_MARKERS_FILENAME', 'S3_WITH_LIST_URLS')}}
  229. )
  230. EOF
  231. printf '%s' "${code[@]# }" >"${prefix}.py" # That's a tab character after the hash.
  232. # Generate the markers file
  233. { if [[ -z "${noStartMarker}" ]]; then echo; fi; printf '%s\n' "$@"; if [[ -z "${noEndMarker}" ]]; then echo; fi; } >"${prefix}-markers"
  234. # Copy this script
  235. rsync -a "$0" s3-bucket-list-qwarc
  236. # Collect environment variables
  237. envvars=()
  238. envvars+=(S3BL_CMD="${cmd}")
  239. envvars+=(S3_FORMAT="${format}")
  240. envvars+=(S3_JSONL="${jsonl}")
  241. envvars+=(S3_BUCKET_URL="${bucketUrl}")
  242. envvars+=(S3_MARKERS_FILENAME="${prefix}-markers")
  243. if [[ "${listUrls}" ]]; then envvars+=(S3_WITH_LIST_URLS="${listUrls}"); fi
  244. # Lift-off!
  245. env "${envvars[@]}" qwarc --concurrency "${concurrency}" --database "${prefix}.db" --log "${prefix}.log" --warc "${prefix}" "${prefix}.py"