You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
9.8 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.warc.gz is the concatenated .warc.gz
  9. FILE.tar contains any non-warc files from the .tar
  10. FILE.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_string": string (the tar header for this entry)
  37. }
  38. USAGE
  39. -----
  40. megawarc build FILE
  41. Converts the tar file (containing .warc.gz files) to a megawarc.
  42. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
  43. megawarc restore FILE
  44. Converts the megawarc back to the original tar.
  45. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
  46. """
  47. import gzip
  48. import json
  49. import os.path
  50. import re
  51. import sys
  52. import tarfile
  53. import zlib
  54. from optparse import OptionParser
  55. from ordereddict import OrderedDict
  56. # modify tarfile.TarInfo to keep the original tar headers
  57. tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
  58. @classmethod
  59. def keepbuf_frombuf(cls, buf):
  60. entry = cls.orig_frombuf(buf)
  61. entry.buf = buf
  62. return entry
  63. tarfile.TarInfo.frombuf = keepbuf_frombuf
  64. # open input_filename and write the data from offset to
  65. # (offset+size) to stream
  66. def copy_to_stream(stream, input_filename, offset, size):
  67. with open(input_filename, "r") as f:
  68. f.seek(offset)
  69. to_read = size
  70. while to_read > 0:
  71. buf_size = min(to_read, 4096)
  72. buf = f.read(buf_size)
  73. if len(buf) < buf_size:
  74. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
  75. stream.write(buf)
  76. to_read -= len(buf)
  77. stream.flush()
  78. # converting a .tar with warcs to megawarc tar+warc+json
  79. class MegawarcBuilder(object):
  80. def __init__(self, input_filename):
  81. self.verbose = False
  82. self.input_filename = input_filename
  83. self.output_warc_filename = input_filename + ".megawarc.warc.gz"
  84. self.output_tar_filename = input_filename + ".megawarc.tar"
  85. self.output_json_filename = input_filename + ".megawarc.json.gz"
  86. def process(self):
  87. with open(self.output_warc_filename, "wb") as warc_out:
  88. with open(self.output_tar_filename, "wb") as tar_out:
  89. with gzip.open(self.output_json_filename, "wb") as json_out:
  90. with tarfile.open(self.input_filename, "r") as tar:
  91. for tarinfo in tar:
  92. self.process_entry(tarinfo, warc_out, tar_out, json_out)
  93. tar_out.flush()
  94. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  95. if padding > 0:
  96. tar_out.write("\0" * padding)
  97. def test_gz(self, offset, size):
  98. with open(self.input_filename, "r") as f:
  99. z = zlib.decompressobj(15 + 32)
  100. f.seek(offset)
  101. to_read = size
  102. while to_read > 0:
  103. buf_size = min(to_read, 4096)
  104. buf = f.read(buf_size)
  105. if len(buf) < buf_size:
  106. # end of file, not a valid gz
  107. return False
  108. else:
  109. z.decompress(buf)
  110. to_read -= len(buf)
  111. if z.flush()!="":
  112. # remaining uncompressed data
  113. return False
  114. return True
  115. def process_entry(self, entry, warc_out, tar_out, json_out):
  116. # calculate position of tar entry
  117. block_size = (tarfile.BLOCKSIZE + # header
  118. entry.size + # data
  119. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  120. data_offset = entry.offset + tarfile.BLOCKSIZE
  121. next_offset = entry.offset + block_size
  122. d_src_offsets = OrderedDict()
  123. d_src_offsets["entry"] = entry.offset
  124. d_src_offsets["data"] = data_offset
  125. d_src_offsets["next_entry"] = next_offset
  126. # decide what to do with this entry
  127. valid_warc_gz = False
  128. if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
  129. if self.verbose:
  130. print >>sys.stderr, "Checking %s" % entry.name
  131. valid_warc_gz = self.test_gz(data_offset, entry.size)
  132. if not valid_warc_gz:
  133. if self.verbose:
  134. print >>sys.stderr, "Invalid gzip %s" % entry.name
  135. # save in megawarc or in tar
  136. d_target = OrderedDict()
  137. if valid_warc_gz:
  138. # a warc file.gz, add to megawarc
  139. warc_offset = warc_out.tell()
  140. if self.verbose:
  141. print >>sys.stderr, "Copying %s to warc" % entry.name
  142. copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)
  143. d_target["container"] = "warc"
  144. d_target["offset"] = warc_offset
  145. d_target["size"] = entry.size
  146. else:
  147. # not a warc.gz file, add to tar
  148. tar_offset = tar_out.tell()
  149. if self.verbose:
  150. print >>sys.stderr, "Copying %s to tar" % entry.name
  151. copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
  152. d_target["container"] = "tar"
  153. d_target["offset"] = tar_offset
  154. d_target["size"] = block_size
  155. # store details
  156. d = OrderedDict()
  157. d["target"] = d_target
  158. d["src_offsets"] = d_src_offsets
  159. d["header_fields"] = entry.get_info("utf-8", {})
  160. d["header_string"] = entry.buf
  161. # store metadata
  162. json.dump(d, json_out, separators=(',', ':'))
  163. json_out.write("\n")
  164. # recreate the original .tar from a megawarc tar+warc+json
  165. class MegawarcRestorer(object):
  166. def __init__(self, output_filename):
  167. self.verbose = False
  168. self.output_filename = output_filename
  169. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  170. self.input_tar_filename = output_filename + ".megawarc.tar"
  171. self.input_json_filename = output_filename + ".megawarc.json.gz"
  172. def process(self):
  173. with gzip.open(self.input_json_filename, "rb") as json_in:
  174. with open(self.output_filename, "wb") as tar_out:
  175. for line in json_in:
  176. entry = json.loads(line)
  177. self.process_entry(entry, tar_out)
  178. tar_out.flush()
  179. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  180. if padding > 0:
  181. tar_out.write("\0" * padding)
  182. def process_entry(self, entry, tar_out):
  183. if entry["target"]["container"] == "warc":
  184. if self.verbose:
  185. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  186. tar_out.write(entry["header_string"])
  187. copy_to_stream(tar_out, self.input_warc_filename,
  188. entry["target"]["offset"], entry["target"]["size"])
  189. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  190. if padding > 0:
  191. tar_out.write("\0" * padding)
  192. elif entry["target"]["container"] == "tar":
  193. if self.verbose:
  194. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  195. copy_to_stream(tar_out, self.input_tar_filename,
  196. entry["target"]["offset"], entry["target"]["size"])
  197. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  198. if padding > 0:
  199. tar_out.write("\0" * padding)
  200. else:
  201. raise Exception("Unkown container: %s for %s" %
  202. (entry["target"]["container"], entry["header_fields"]["name"]))
  203. def main():
  204. parser = OptionParser(
  205. usage="Usage: %prog [--verbose] build FILE\n %prog [--verbose] restore FILE",
  206. description="""%prog build FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
  207. Use %prog build FILE to reconstruct original tar.
  208. """
  209. )
  210. parser.add_option("-v", "--verbose", dest="verbose",
  211. action="store_true",
  212. help="print status messages", default=False)
  213. (options, args) = parser.parse_args()
  214. if len(args) != 2:
  215. parser.print_usage()
  216. exit(1)
  217. if args[0] == "build":
  218. if not os.path.exists(args[1]):
  219. print >>sys.stderr, "Input file %s does not exist." % args[1]
  220. exit(1)
  221. try:
  222. mwb = MegawarcBuilder(args[1])
  223. mwb.verbose = options.verbose
  224. mwb.process()
  225. except:
  226. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  227. if os.path.exists(args[1]+ext):
  228. os.unlink(args[1]+ext)
  229. raise
  230. elif args[0] == "restore":
  231. for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
  232. if not os.path.exists(args[1]+ext):
  233. print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
  234. exit(1)
  235. if os.path.exists(args[1]):
  236. print >>sys.stderr, "Outputfile %s already exists." % args[1]
  237. exit(1)
  238. try:
  239. mwr = MegawarcRestorer(args[1])
  240. mwr.verbose = options.verbose
  241. mwr.process()
  242. except:
  243. if os.path.exists(args[1]):
  244. os.unlink(args[1])
  245. raise
  246. else:
  247. parser.print_usage()
  248. exit(1)
  249. if __name__ == "__main__":
  250. main()