#!/usr/bin/env python
# Fix megawarcs that have invalid warc.gz's in the warc.gz.
#
# This script will make new megawarc warc/tar/json files
# (prefixed with FIXED-) where the invalid warcs are moved
# to the tar file.
#
# Run
#   ./megawarc-fix BASENAME
# where BASENAME is the part before .megawarc.(warc.gz|json.gz|tar)
#

import gzip
import json
import os.path
import re
import sys
import tarfile
import zlib

from optparse import OptionParser

try:
  from collections import OrderedDict
except ImportError:
  from ordereddict import OrderedDict


# modify tarfile.TarInfo to keep the original tar headers
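# (frombuf is wrapped so the raw 512-byte header block of each tar
#  entry stays available afterwards as entry.buf)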
tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
@classmethod
def keepbuf_frombuf(cls, buf):
  entry = cls.orig_frombuf(buf)
  entry.buf = buf
  return entry
tarfile.TarInfo.frombuf = keepbuf_frombuf


# open input_filename and write the data from offset to
# (offset+size) to stream
def copy_to_stream(stream, input_filename, offset, size):
  # binary mode: the data copied here is gzip/warc/tar content
  with open(input_filename, "rb") as f:
    f.seek(offset)

    to_read = size
    while to_read > 0:
      buf_size = min(to_read, 4096)
      buf = f.read(buf_size)
      if len(buf) < buf_size:
        raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
      stream.write(buf)
      to_read -= len(buf)

  stream.flush()


# part of a stream as a file
# (seek relative to an offset)
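# test_gz wraps a slice of the megawarc in a RangeFile so gzip.GzipFile
# can read a single warc.gz member in place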
class RangeFile(object):
  def __init__(self, stream, offset, size):
    self._stream = stream
    self._offset = offset
    self._size = size

    self._current_rel_offset = 0

  def tell(self):
    return self._current_rel_offset

  def seek(self, pos, whence=os.SEEK_SET):
    if whence == os.SEEK_SET:
      self._current_rel_offset = pos
    elif whence == os.SEEK_CUR:
      self._current_rel_offset += pos
    elif whence == os.SEEK_END:
      self._current_rel_offset = self._size + pos
    else:
      raise Exception("Unknown whence: %d." % whence)
    if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
      raise Exception("Seek outside file: %d." % self._current_rel_offset)
    self._stream.seek(self._offset + self._current_rel_offset)

  def read(self, size):
    size = min(self._size - self._current_rel_offset, size)
    self._current_rel_offset += size
    buf = self._stream.read(size)
    if len(buf) < size:
      raise Exception("Expected to read %d but received %d." % (size, len(buf)))
    return buf


# check for gzip errors
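# (reads the gzip member to the end; corrupt or truncated data shows up
#  as IOError, ValueError or zlib.error)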
def test_gz(filename, offset, size, verbose=False):
  with open(filename, "rb") as f_stream:
    f = RangeFile(f_stream, offset, size)
    try:
      gz = gzip.GzipFile(fileobj=f, mode="rb")
      while True:
        buf = gz.read(4096)
        if len(buf) == 0:
          break
    except (IOError, ValueError, zlib.error) as e:
      if verbose:
        print >>sys.stderr, e
      return False
  return True


class MegawarcFixer(object):
  def __init__(self, basename):
    self.verbose = False
    self.basename = basename
    self.input_warc_filename = basename + ".megawarc.warc.gz"
    self.input_tar_filename = basename + ".megawarc.tar"
    self.input_json_filename = basename + ".megawarc.json.gz"
    self.output_warc_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.warc.gz")
    self.output_tar_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.tar")
    self.output_json_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.json.gz")
    self.fixes = 0
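
  # stream the json.gz index, rewrite every entry into the FIXED- output
  # files and pad the new tar to a full record at the end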
  def process(self):
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        with gzip.open(self.output_json_filename, "wb") as json_out:
          with gzip.open(self.input_json_filename, "rb") as json_in:
            for line in json_in:
              entry = json.loads(line)
              self.process_entry(entry, warc_out, tar_out, json_out)

            tar_out.flush()
            padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
            if padding > 0:
              tar_out.write("\0" * padding)
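
  # handle a single index entry: warc entries are gzip-tested and either
  # copied to the new warc.gz or, when invalid, wrapped into the tar;
  # tar entries are copied unchanged; the updated entry goes to json_out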
  def process_entry(self, entry, warc_out, tar_out, json_out):
    d_target = OrderedDict()
    if entry["target"]["container"] == "warc":
      # must check if this is a valid warc
      if self.verbose:
        print >>sys.stderr, "Checking %s from warc" % entry["header_fields"]["name"]
      valid_warc_gz = test_gz(self.input_warc_filename,
                              entry["target"]["offset"], entry["target"]["size"])

      if valid_warc_gz:
        # a valid warc.gz, add to megawarc
        if self.verbose:
          print >>sys.stderr, "Copying %s to warc" % entry["header_fields"]["name"]
        warc_offset = warc_out.tell()
        copy_to_stream(warc_out, self.input_warc_filename,
                       entry["target"]["offset"], entry["target"]["size"])

        d_target["container"] = "warc"
        d_target["offset"] = warc_offset
        d_target["size"] = entry["target"]["size"]

      else:
        # not a valid warc.gz, add to tar
        self.fixes += 1
        if self.verbose:
          print >>sys.stderr, "FIX: An invalid warc in the warc.gz, will be moved to tar."
          print >>sys.stderr, "Copying %s to tar" % entry["header_fields"]["name"]
        tar_offset = tar_out.tell()
        block_size = (tarfile.BLOCKSIZE +   # header
                      entry["target"]["size"] +   # data
                      (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE)
        tar_out.write(entry["header_string"])
        copy_to_stream(tar_out, self.input_warc_filename,
                       entry["target"]["offset"], entry["target"]["size"])
        padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
        if padding > 0:
          tar_out.write("\0" * padding)

        d_target["container"] = "tar"
        d_target["offset"] = tar_offset
        d_target["size"] = block_size

    elif entry["target"]["container"] == "tar":
      if self.verbose:
        print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
      tar_offset = tar_out.tell()
      copy_to_stream(tar_out, self.input_tar_filename,
                     entry["target"]["offset"], entry["target"]["size"])

      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = entry["target"]["size"]

    else:
      raise Exception("Unknown container: %s for %s" %
                      (entry["target"]["container"], entry["header_fields"]["name"]))

    # store details with new target position
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = entry["src_offsets"]
    d["header_fields"] = entry["header_fields"]
    d["header_string"] = entry["header_string"]
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")
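

# build the FIXED- copies of the three megawarc files; if anything fails,
# remove the partial FIXED- outputs and re-raise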
def main():
  try:
    mwf = MegawarcFixer(sys.argv[1])
    mwf.verbose = True
    mwf.process()
    print >>sys.stderr, "Invalid warcs in megawarc.warc.gz: %d" % mwf.fixes
  except:
    for filename in (mwf.output_warc_filename, mwf.output_json_filename, mwf.output_tar_filename):
      if os.path.exists(filename):
        os.unlink(filename)
    raise

if __name__ == "__main__":
  main()