From d0dd045ee1fbf6e86303df90ff07ae8f3ea11975 Mon Sep 17 00:00:00 2001
From: Alard
Date: Fri, 12 Oct 2012 23:58:06 +0200
Subject: [PATCH] Fixing script.

---
 megawarc-fix | 215 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 215 insertions(+)
 create mode 100755 megawarc-fix

diff --git a/megawarc-fix b/megawarc-fix
new file mode 100755
index 0000000..b71d456
--- /dev/null
+++ b/megawarc-fix
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+# Fix megawarcs that have invalid warc.gz's in the warc.gz.
+#
+# This script will make new megawarc warc/tar/json files
+# (prefixed with FIXED-) where the invalid warcs are moved
+# to the tar file.
+#
+# Run
+#   ./megawarc-fix BASENAME
+# where BASENAME is the part before .megawarc.(warc.gz|json.gz|tar)
+#
+import gzip
+import json
+import os.path
+import re
+import sys
+import tarfile
+import zlib
+
+from optparse import OptionParser
+try:
+  from collections import OrderedDict
+except ImportError:
+  from ordereddict import OrderedDict
+
+# modify tarfile.TarInfo to keep the original tar headers
+tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
+@classmethod
+def keepbuf_frombuf(cls, buf):
+  entry = cls.orig_frombuf(buf)
+  entry.buf = buf
+  return entry
+tarfile.TarInfo.frombuf = keepbuf_frombuf
+
+
+# open input_filename and write the data from offset to
+# (offset+size) to stream
+def copy_to_stream(stream, input_filename, offset, size):
+  with open(input_filename, "rb") as f:
+    f.seek(offset)
+
+    to_read = size
+    while to_read > 0:
+      buf_size = min(to_read, 4096)
+      buf = f.read(buf_size)
+      if len(buf) < buf_size:
+        raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
+      stream.write(buf)
+      to_read -= len(buf)
+
+  stream.flush()
+
+
+# part of a stream as a file
+# (seek relative to an offset)
+class RangeFile(object):
+  def __init__(self, stream, offset, size):
+    self._stream = stream
+    self._offset = offset
+    self._size = size
+
+    self._current_rel_offset = 0
+
+  def tell(self):
+    return self._current_rel_offset
+
+  def seek(self, pos, whence=os.SEEK_SET):
+    if whence == os.SEEK_SET:
+      self._current_rel_offset = pos
+    elif whence == os.SEEK_CUR:
+      self._current_rel_offset += pos
+    elif whence == os.SEEK_END:
+      self._current_rel_offset = self._size + pos
+    else:
+      raise Exception("Unknown whence: %d." % whence)
+    if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
+      raise Exception("Seek outside file: %d." % self._current_rel_offset)
+    self._stream.seek(self._offset + self._current_rel_offset)
+
+  def read(self, size):
+    size = min(self._size - self._current_rel_offset, size)
+    self._current_rel_offset += size
+    buf = self._stream.read(size)
+    if len(buf) < size:
+      raise Exception("Expected to read %d but received %d."
+                      % (size, len(buf)))
+    return buf
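+
+# A minimal usage sketch (file name and byte range are hypothetical):
+# RangeFile presents bytes offset..offset+size-1 of a larger stream as a
+# seekable file-like object, e.g. so gzip.GzipFile can read one member:
+#
+#   with open("example.megawarc.warc.gz", "rb") as stream:
+#     member = RangeFile(stream, 512, 1024)
+#     member.seek(0, os.SEEK_END)   # member.tell() is now 1024
+#     member.seek(0)
+#     data = member.read(1024)      # raises if fewer bytes are available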
+
+
+# check for gzip errors
+def test_gz(filename, offset, size, verbose=False):
+  with open(filename, "rb") as f_stream:
+    f = RangeFile(f_stream, offset, size)
+    try:
+      gz = gzip.GzipFile(fileobj=f, mode="rb")
+      while True:
+        buf = gz.read(4096)
+        if len(buf) == 0:
+          break
+    except (IOError, ValueError, zlib.error) as e:
+      if verbose:
+        print >>sys.stderr, e
+      return False
+
+  return True
+
+
+class MegawarcFixer(object):
+  def __init__(self, basename):
+    self.verbose = False
+    self.basename = basename
+    self.input_warc_filename = basename + ".megawarc.warc.gz"
+    self.input_tar_filename = basename + ".megawarc.tar"
+    self.input_json_filename = basename + ".megawarc.json.gz"
+    self.output_warc_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.warc.gz")
+    self.output_tar_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.tar")
+    self.output_json_filename = os.path.join(os.path.dirname(basename), "FIXED-" + os.path.basename(basename) + ".megawarc.json.gz")
+    self.fixes = 0
+
+  def process(self):
+    with open(self.output_warc_filename, "wb") as warc_out:
+      with open(self.output_tar_filename, "wb") as tar_out:
+        with gzip.open(self.output_json_filename, "wb") as json_out:
+          with gzip.open(self.input_json_filename, "rb") as json_in:
+            for line in json_in:
+              entry = json.loads(line)
+              self.process_entry(entry, warc_out, tar_out, json_out)
+
+          tar_out.flush()
+          padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
+          if padding > 0:
+            tar_out.write("\0" * padding)
+
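+  # Each line of the megawarc.json.gz index is one JSON object describing a
+  # tar entry. A schematic (hypothetical, not literal) example showing the
+  # fields this script reads; "src_offsets" and the remaining "header_fields"
+  # are copied through to the FIXED- index unchanged:
+  #
+  #   {"target": {"container": "warc", "offset": 1024, "size": 4096},
+  #    "src_offsets": ...,
+  #    "header_fields": {"name": "example-00001.warc.gz", ...},
+  #    "header_string": "<original 512-byte tar header block>"}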
+ print >>sys.stderr, "Copying %s to tar" % entry["header_fields"]["name"] + tar_offset = tar_out.tell() + block_size = (tarfile.BLOCKSIZE + # header + entry["target"]["size"] + # data + (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE) + tar_out.write(entry["header_string"]) + copy_to_stream(tar_out, self.input_warc_filename, + entry["target"]["offset"], entry["target"]["size"]) + padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE + if padding > 0: + tar_out.write("\0" * padding) + + d_target["container"] = "tar" + d_target["offset"] = tar_offset + d_target["size"] = block_size + + elif entry["target"]["container"] == "tar": + if self.verbose: + print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"] + tar_offset = tar_out.tell() + copy_to_stream(tar_out, self.input_tar_filename, + entry["target"]["offset"], entry["target"]["size"]) + + d_target["container"] = "tar" + d_target["offset"] = tar_offset + d_target["size"] = entry["target"]["size"] + + else: + raise Exception("Unkown container: %s for %s" % + (entry["target"]["container"], entry["header_fields"]["name"])) + + # store details with new target position + d = OrderedDict() + d["target"] = d_target + d["src_offsets"] = entry["src_offsets"] + d["header_fields"] = entry["header_fields"] + d["header_string"] = entry["header_string"] + + json.dump(d, json_out, separators=(',', ':')) + json_out.write("\n") + + +def main(): + try: + mwf = MegawarcFixer(sys.argv[1]) + mwf.verbose = True + mwf.process() + print >>sys.stderr, "Invalid warcs in megawarc.warc.gz: %d " % mwf.fixes + except: + for ext in (mwf.output_warc_filename, mwf.output_json_filename, mwf.output_tar_filename): + if os.path.exists(sys.argv[1]+ext): + os.unlink(sys.argv[1]+ext) + raise + +if __name__ == "__main__": + main() +