You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

223 lines
7.5 KiB

  1. #!/usr/bin/env python
  2. # Fix megawarcs that have invalid warc.gz's in the warc.gz.
  3. #
  4. # This script will make new megawarc warc/tar/json files
  5. # (prefixed with FIXED-) where the invalid warcs are moved
  6. # to the tar file.
  7. #
  8. # Run
  9. # ./megawarc-fix BASENAME
  10. # where BASENAME is the part before .megawarc.(warc.gz|json.gz|tar)
  11. #
  12. import base64
  13. import gzip
  14. import json
  15. import os.path
  16. import re
  17. import sys
  18. import tarfile
  19. import zlib
  20. from optparse import OptionParser
  21. try:
  22. from collections import OrderedDict
  23. except ImportError:
  24. from ordereddict import OrderedDict
  25. # modify tarfile.TarInfo to keep the original tar headers
  26. tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
  27. @classmethod
  28. def keepbuf_frombuf(cls, buf):
  29. entry = cls.orig_frombuf(buf)
  30. entry.buf = buf
  31. return entry
  32. tarfile.TarInfo.frombuf = keepbuf_frombuf
  33. # open input_filename and write the data from offset to
  34. # (offset+size) to stream
  35. def copy_to_stream(stream, input_filename, offset, size):
  36. with open(input_filename, "r") as f:
  37. f.seek(offset)
  38. to_read = size
  39. while to_read > 0:
  40. buf_size = min(to_read, 4096)
  41. buf = f.read(buf_size)
  42. if len(buf) < buf_size:
  43. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
  44. stream.write(buf)
  45. to_read -= len(buf)
  46. stream.flush()
  47. # part of a stream as a file
  48. # (seek relative to an offset)
  49. class RangeFile(object):
  50. def __init__(self, stream, offset, size):
  51. self._stream = stream
  52. self._offset = offset
  53. self._size = size
  54. self._current_rel_offset = 0
  55. def tell(self):
  56. return self._current_rel_offset
  57. def seek(self, pos, whence=os.SEEK_SET):
  58. if whence == os.SEEK_SET:
  59. self._current_rel_offset = pos
  60. elif whence == os.SEEK_CUR:
  61. self._current_rel_offset += pos
  62. elif whence == os.SEEK_END:
  63. self._current_rel_offset = self._size + pos
  64. else:
  65. raise Exception("Unknown whence: %d." % whence)
  66. if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
  67. raise Exception("Seek outside file: %d." % self._current_rel_offset)
  68. self._stream.seek(self._offset + self._current_rel_offset)
  69. def read(self, size):
  70. size = min(self._size - self._current_rel_offset, size)
  71. self._current_rel_offset += size
  72. buf = self._stream.read(size)
  73. if len(buf) < size:
  74. raise Exception("Expected to read %d but received %d." % (size, len(buf)))
  75. return buf
  76. # check for gzip errors
  77. def test_gz(filename, offset, size, verbose=False):
  78. with open(filename, "r") as f_stream:
  79. f = RangeFile(f_stream, offset, size)
  80. try:
  81. gz = gzip.GzipFile(fileobj=f, mode="rb")
  82. while True:
  83. buf = gz.read(4096)
  84. if len(buf) == 0:
  85. break
  86. except (IOError, ValueError, zlib.error) as e:
  87. if verbose:
  88. print >>sys.stderr, e
  89. return False
  90. return True
class MegawarcFixer(object):
    # Rebuilds a megawarc triple (warc.gz / tar / json.gz index) as
    # "FIXED-"-prefixed files, moving every entry whose warc.gz payload
    # fails the gzip check out of the warc and into the tar.

    def __init__(self, basename):
        # basename: the part before .megawarc.(warc.gz|json.gz|tar)
        self.verbose = False
        self.basename = basename
        # input megawarc files
        self.input_warc_filename = basename + ".megawarc.warc.gz"
        self.input_tar_filename = basename + ".megawarc.tar"
        self.input_json_filename = basename + ".megawarc.json.gz"
        # output files: same directory, "FIXED-" prefix on the basename
        self.output_warc_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.warc.gz")
        self.output_tar_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.tar")
        self.output_json_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.json.gz")
        # number of invalid warcs that were moved to the tar
        self.fixes = 0

    def process(self):
        """Stream every entry of the input json index into the FIXED-*
        output files, then pad the output tar to a full record."""
        with open(self.output_warc_filename, "wb") as warc_out:
            with open(self.output_tar_filename, "wb") as tar_out:
                with gzip.open(self.output_json_filename, "wb") as json_out:
                    with gzip.open(self.input_json_filename, "rb") as json_in:
                        # one json document per line, one line per archived file
                        for line in json_in:
                            entry = json.loads(line)
                            self.process_entry(entry, warc_out, tar_out, json_out)
                    # pad the tar to a whole number of records, as the
                    # tar format requires
                    tar_out.flush()
                    padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
                    if padding > 0:
                        tar_out.write("\0" * padding)

    def process_entry(self, entry, warc_out, tar_out, json_out):
        """Copy one index entry's payload to the proper output container
        and append its updated index record to json_out."""
        d_target = OrderedDict()
        if entry["target"]["container"] == "warc":
            # must check if this is a valid warc
            if self.verbose:
                print >>sys.stderr, "Checking %s from warc" % entry["header_fields"]["name"]
            valid_warc_gz = test_gz(self.input_warc_filename,
                entry["target"]["offset"], entry["target"]["size"])
            if valid_warc_gz:
                # a warc file.gz, add to megawarc
                if self.verbose:
                    print >>sys.stderr, "Copying %s to warc" % entry["header_fields"]["name"]
                warc_offset = warc_out.tell()
                copy_to_stream(warc_out, self.input_warc_filename,
                    entry["target"]["offset"], entry["target"]["size"])
                d_target["container"] = "warc"
                d_target["offset"] = warc_offset
                d_target["size"] = entry["target"]["size"]
            else:
                # not a warc.gz file, add to tar
                self.fixes += 1
                if self.verbose:
                    print >>sys.stderr, "FIX: An invalid warc in the warc.gz, will be moved to tar."
                    print >>sys.stderr, "Copying %s to tar" % entry["header_fields"]["name"]
                tar_offset = tar_out.tell()
                # total size of the tar member: header block + data,
                # rounded up to a whole number of 512-byte blocks
                block_size = (tarfile.BLOCKSIZE +  # header
                    entry["target"]["size"] +  # data
                    (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE)
                # re-emit the original tar header stored in the index
                if "header_base64" in entry:
                    tar_out.write(base64.b64decode(entry["header_base64"]))
                elif "header_string" in entry:
                    tar_out.write(entry["header_string"])
                copy_to_stream(tar_out, self.input_warc_filename,
                    entry["target"]["offset"], entry["target"]["size"])
                # zero-pad the data to the next tar block boundary
                padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
                if padding > 0:
                    tar_out.write("\0" * padding)
                d_target["container"] = "tar"
                d_target["offset"] = tar_offset
                d_target["size"] = block_size
        elif entry["target"]["container"] == "tar":
            # already in the tar: copy the whole member verbatim
            if self.verbose:
                print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
            tar_offset = tar_out.tell()
            copy_to_stream(tar_out, self.input_tar_filename,
                entry["target"]["offset"], entry["target"]["size"])
            d_target["container"] = "tar"
            d_target["offset"] = tar_offset
            d_target["size"] = entry["target"]["size"]
        else:
            # "Unkown" typo kept as-is: it is a runtime error message
            raise Exception("Unkown container: %s for %s" %
                (entry["target"]["container"], entry["header_fields"]["name"]))
        # store details with new target position
        d = OrderedDict()
        d["target"] = d_target
        d["src_offsets"] = entry["src_offsets"]
        d["header_fields"] = entry["header_fields"]
        if "header_base64" in entry:
            d["header_base64"] = entry["header_base64"]
        elif "header_string" in entry:
            d["header_base64"] = base64.b64encode(entry["header_string"])
        # one compact json document per line in the index
        json.dump(d, json_out, separators=(',', ':'))
        json_out.write("\n")
  177. def main():
  178. try:
  179. mwf = MegawarcFixer(sys.argv[1])
  180. mwf.verbose = True
  181. mwf.process()
  182. print >>sys.stderr, "Invalid warcs in megawarc.warc.gz: %d " % mwf.fixes
  183. except:
  184. for ext in (mwf.output_warc_filename, mwf.output_json_filename, mwf.output_tar_filename):
  185. if os.path.exists(sys.argv[1]+ext):
  186. os.unlink(sys.argv[1]+ext)
  187. raise
  188. if __name__ == "__main__":
  189. main()