#!/usr/bin/env python
"""
megawarc is useful if you have .tar full of .warc.gz files and
you really want one big .warc.gz. With megawarc you get your
.warc.gz, but you can still restore the original .tar.

The megawarc tool looks for .warc.gz in the .tar file and
creates three files, the megawarc:
  FILE.warc.gz   is the concatenated .warc.gz
  FILE.tar       contains any non-warc files from the .tar
  FILE.json.gz   contains metadata

You need the JSON file to reconstruct the original .tar from
the .warc.gz and .tar files. The JSON file has the location
of every file from the original .tar file.


METADATA FORMAT
---------------
One line with a JSON object per file in the .tar.
{
   "target": {
      "container": "warc" or "tar", (where is this file?)
      "offset": number,             (where in the tar/warc does this
                                     file start? for files in the tar
                                     this includes the tar header,
                                     which is copied to the tar.)
      "size": size                  (where does this file end?
                                     for files in the tar, this includes
                                     the padding to 512 bytes)
   },
   "src_offsets": {
      "entry": number,        (where is this file in the original tar?)
      "data": number,         (where does the data start? entry+512)
      "next_entry": number    (where does the next tar entry start)
   },
   "header_fields": {
      ...                     (parsed fields from the tar header)
   },
   "header_string": string    (the tar header for this entry)
}


USAGE
-----
megawarc convert FILE
  Converts the tar file (containing .warc.gz files) to a megawarc.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.

megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  Creates a megawarc with basename FILE and recursively adds the
  given files and directories to it, as if they were in a tar file.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz.

megawarc restore FILE
  Converts the megawarc back to the original tar.
  It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
"""
import gzip
import json
import os.path
import re
import sys
import tarfile
import zlib

from optparse import OptionParser
try:
  from collections import OrderedDict
except ImportError:
  from ordereddict import OrderedDict
# open input_filename and write the data from offset to
# (offset+size) to stream
def copy_to_stream(stream, input_filename, offset, size):
  with open(input_filename, "r") as f:
    f.seek(offset)

    to_read = size
    while to_read > 0:
      buf_size = min(to_read, 4096)
      buf = f.read(buf_size)
      if len(buf) < buf_size:
        raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
      stream.write(buf)
      to_read -= len(buf)

  stream.flush()
# part of a stream as a file
# (seek relative to an offset)
class RangeFile(object):
  def __init__(self, stream, offset, size):
    self._stream = stream
    self._offset = offset
    self._size = size

    self._current_rel_offset = 0

  def tell(self):
    return self._current_rel_offset

  def seek(self, pos, whence=os.SEEK_SET):
    if whence == os.SEEK_SET:
      self._current_rel_offset = pos
    elif whence == os.SEEK_CUR:
      self._current_rel_offset += pos
    elif whence == os.SEEK_END:
      self._current_rel_offset = self._size + pos
    else:
      raise Exception("Unknown whence: %d." % whence)
    if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
      raise Exception("Seek outside file: %d." % self._current_rel_offset)
    self._stream.seek(self._offset + self._current_rel_offset)

  def read(self, size):
    size = min(self._size - self._current_rel_offset, size)
    self._current_rel_offset += size
    buf = self._stream.read(size)
    if len(buf) < size:
      raise Exception("Expected to read %d but received %d." % (size, len(buf)))
    return buf
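
# Illustrative use of RangeFile (the numbers are made up): expose bytes
# 512..1511 of an already-open stream as a small seekable file-like object:
#   member = RangeFile(stream, 512, 1000)
#   member.seek(0, os.SEEK_END)   # relative position 1000, the end of the slice
#   member.seek(0)                # back to the start of the slice
#   data = member.read(4096)      # returns at most 1000 bytes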
# check for gzip errors
def test_gz(filename, offset, size, verbose=False):
  with open(filename, "r") as f_stream:
    f = RangeFile(f_stream, offset, size)
    try:
      gz = gzip.GzipFile(fileobj=f, mode="rb")
      while True:
        buf = gz.read(4096)
        if len(buf) == 0:
          break
    except (IOError, ValueError, zlib.error) as e:
      if verbose:
        print >>sys.stderr, e
      return False
  return True
# converting a .tar with warcs to megawarc tar+warc+json
class MegawarcBuilder(object):
  def __init__(self, input_filename):
    self.verbose = False
    self.input_filename = input_filename
    self.output_warc_filename = input_filename + ".megawarc.warc.gz"
    self.output_tar_filename = input_filename + ".megawarc.tar"
    self.output_json_filename = input_filename + ".megawarc.json.gz"

  def process(self):
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        with gzip.open(self.output_json_filename, "wb") as json_out:
          with tarfile.open(self.input_filename, "r") as tar:
            for tarinfo in tar:
              self.process_entry(tarinfo, warc_out, tar_out, json_out)

          tar_out.flush()
          padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
          if padding > 0:
            tar_out.write("\0" * padding)

  def process_entry(self, entry, warc_out, tar_out, json_out):
    with open(self.input_filename, "r") as tar:
      tar.seek(entry.offset)
      tar_header = tar.read(entry.offset_data - entry.offset)

    # calculate position of tar entry
    block_size = (len(tar_header) +    # header
                  entry.size +         # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)

    next_offset = entry.offset + block_size

    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = entry.offset_data
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this entry
    valid_warc_gz = False
    if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
      if self.verbose:
        print >>sys.stderr, "Checking %s" % entry.name
      valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size, self.verbose)
      if not valid_warc_gz:
        if self.verbose:
          print >>sys.stderr, "Invalid gzip %s" % entry.name

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a warc file.gz, add to megawarc
      warc_offset = warc_out.tell()

      if self.verbose:
        print >>sys.stderr, "Copying %s to warc" % entry.name
      copy_to_stream(warc_out, self.input_filename, entry.offset_data, entry.size)

      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size

    else:
      # not a warc.gz file, add to tar
      tar_offset = tar_out.tell()

      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % entry.name
      copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)

      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_string"] = tar_header

    # store metadata
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")
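
# Illustrative use of MegawarcBuilder (the filename is made up); this is what
# "megawarc convert FILE" does in main() below:
#   mwb = MegawarcBuilder("upload.tar")
#   mwb.verbose = True
#   mwb.process()
# This writes upload.tar.megawarc.warc.gz, upload.tar.megawarc.tar and
# upload.tar.megawarc.json.gz next to the input file.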
# adding .warc.gz and other files to megawarc tar+warc+json
class MegawarcPacker(object):
  def __init__(self, output_basename):
    self.verbose = False

    self.output_basename = output_basename
    self.output_warc_filename = output_basename + ".megawarc.warc.gz"
    self.output_tar_filename = output_basename + ".megawarc.tar"
    self.output_json_filename = output_basename + ".megawarc.json.gz"

    self.tar_pos = 0

  def process(self, filelist):
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        with gzip.open(self.output_json_filename, "wb") as json_out:
          def each_file(arg, dirname, names):
            for n in names:
              n = os.path.join(dirname, n)
              if os.path.isfile(n):
                self.process_file(n, warc_out, tar_out, json_out)

          for filename in filelist:
            if os.path.isdir(filename):
              os.path.walk(filename, each_file, None)
            elif os.path.isfile(filename):
              self.process_file(filename, warc_out, tar_out, json_out)

          tar_out.flush()
          padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
          if padding > 0:
            tar_out.write("\0" * padding)

  def process_file(self, filename, warc_out, tar_out, json_out):
    # make tar header
    arcname = filename
    arcname = arcname.replace(os.sep, "/")
    arcname = arcname.lstrip("/")

    entry = tarfile.TarInfo()
    statres = os.stat(filename)
    stmd = statres.st_mode
    entry.name = arcname
    entry.mode = stmd
    entry.uid = statres.st_uid
    entry.gid = statres.st_gid
    entry.size = statres.st_size
    entry.mtime = statres.st_mtime
    entry.type = tarfile.REGTYPE

    tar_header = entry.tobuf()

    # find position in imaginary tar
    entry.offset = self.tar_pos

    # calculate position of tar entry
    block_size = (len(tar_header) +    # header
                  entry.size +         # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)

    data_offset = entry.offset + len(tar_header)
    next_offset = entry.offset + block_size

    # move to next position in imaginary tar
    self.tar_pos = next_offset

    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = data_offset
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this file
    valid_warc_gz = False
    if re.search(r"\.warc\.gz", filename):
      if self.verbose:
        print >>sys.stderr, "Checking %s" % filename
      valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose)
      if not valid_warc_gz:
        if self.verbose:
          print >>sys.stderr, "Invalid gzip %s" % filename

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a warc file.gz, add to megawarc
      warc_offset = warc_out.tell()

      if self.verbose:
        print >>sys.stderr, "Copying %s to warc" % filename
      copy_to_stream(warc_out, filename, 0, entry.size)

      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size

    else:
      # not a warc.gz file, add to tar
      tar_offset = tar_out.tell()

      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % filename
      tar_out.write(tar_header)
      copy_to_stream(tar_out, filename, 0, entry.size)
      padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)
      tar_out.flush()

      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_string"] = tar_header

    # store metadata
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")
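
# Illustrative use of MegawarcPacker (the paths are made up); this is what
# "megawarc pack FILE INFILE ..." does in main() below:
#   packer = MegawarcPacker("mymega")
#   packer.verbose = True
#   packer.process(["dir-with-warcs", "extra-file.txt"])
# This writes mymega.megawarc.warc.gz, mymega.megawarc.tar and
# mymega.megawarc.json.gz, treating the inputs as entries of an imaginary tar.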
# recreate the original .tar from a megawarc tar+warc+json
class MegawarcRestorer(object):
  def __init__(self, output_filename):
    self.verbose = False
    self.output_filename = output_filename
    self.input_warc_filename = output_filename + ".megawarc.warc.gz"
    self.input_tar_filename = output_filename + ".megawarc.tar"
    self.input_json_filename = output_filename + ".megawarc.json.gz"

  def process(self):
    with gzip.open(self.input_json_filename, "rb") as json_in:
      with open(self.output_filename, "wb") as tar_out:
        for line in json_in:
          entry = json.loads(line)
          self.process_entry(entry, tar_out)

        tar_out.flush()
        padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
        if padding > 0:
          tar_out.write("\0" * padding)

  def process_entry(self, entry, tar_out):
    if entry["target"]["container"] == "warc":
      if self.verbose:
        print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]

      tar_out.write(entry["header_string"])
      copy_to_stream(tar_out, self.input_warc_filename,
                     entry["target"]["offset"], entry["target"]["size"])
      padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)

    elif entry["target"]["container"] == "tar":
      if self.verbose:
        print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]

      copy_to_stream(tar_out, self.input_tar_filename,
                     entry["target"]["offset"], entry["target"]["size"])
      padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)

    else:
      raise Exception("Unknown container: %s for %s" %
                      (entry["target"]["container"], entry["header_fields"]["name"]))
def main():
  parser = OptionParser(
      usage="Usage: %prog [--verbose] convert FILE\n       %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n       %prog [--verbose] restore FILE",
      description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.

Use %prog pack FILE INFILE ... to create a megawarc containing the files.

Use %prog restore FILE to reconstruct the original tar.
"""
  )
  parser.add_option("-v", "--verbose", dest="verbose",
                    action="store_true",
                    help="print status messages", default=False)
  (options, args) = parser.parse_args()

  if len(args) < 2:
    parser.print_usage()
    exit(1)

  if args[0] == "convert":
    if not os.path.exists(args[1]):
      print >>sys.stderr, "Input file %s does not exist." % args[1]
      exit(1)

    try:
      mwb = MegawarcBuilder(args[1])
      mwb.verbose = options.verbose
      mwb.process()
    except:
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "pack":
    try:
      mwb = MegawarcPacker(args[1])
      mwb.verbose = options.verbose
      mwb.process(args[2:])
    except:
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "restore":
    for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
      if not os.path.exists(args[1]+ext):
        print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
        exit(1)
    if os.path.exists(args[1]):
      print >>sys.stderr, "Output file %s already exists." % args[1]
      exit(1)

    try:
      mwr = MegawarcRestorer(args[1])
      mwr.verbose = options.verbose
      mwr.process()
    except:
      if os.path.exists(args[1]):
        os.unlink(args[1])
      raise

  else:
    parser.print_usage()
    exit(1)


if __name__ == "__main__":
  main()