diff --git a/megawarc b/megawarc index 0340bd6..d9d6449 100755 --- a/megawarc +++ b/megawarc @@ -65,6 +65,7 @@ import gzip import json import os.path import re +import subprocess import sys import tarfile import zlib @@ -85,10 +86,11 @@ def copy_to_stream(stream, input_filename, offset, size): while to_read > 0: buf_size = min(to_read, 4096) buf = f.read(buf_size) - if len(buf) < buf_size: - raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf))) + l = len(buf) + if l < buf_size: + raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l)) stream.write(buf) - to_read -= len(buf) + to_read -= l stream.flush() @@ -103,6 +105,8 @@ class RangeFile(object): self._current_rel_offset = 0 + self.seek(0) + def tell(self): return self._current_rel_offset @@ -127,22 +131,69 @@ class RangeFile(object): raise Exception("Expected to read %d but received %d." % (size, len(buf))) return buf +# copies while reading +class CopyReader(object): + def __init__(self, in_stream, out_stream): + self._in_stream = in_stream + self._out_stream = out_stream + self._last_read = 0 + + def tell(self): + return self._in_stream.tell() + + def seek(self, pos, whence=os.SEEK_SET): + self._in_stream.seek(pos, whence) + + def read(self, size): + pos = self.tell() + if self._last_read < pos: + raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos)) + buf = self._in_stream.read(size) + read_before = self._last_read - pos + if read_before == 0: + new_read = buf + else: + new_read = buf[read_before:] + l = len(new_read) + if l > 0: + self._last_read += l + self._out_stream.write(new_read) + return buf + # check for gzip errors -def test_gz(filename, offset, size, verbose=False): +def test_gz(filename, offset, size, verbose=False, copy_to_file=None): with open(filename, "r") as f_stream: f = RangeFile(f_stream, offset, size) + if copy_to_file: + f = CopyReader(f, copy_to_file) + start_pos = copy_to_file.tell() try: - gz = gzip.GzipFile(fileobj=f, mode="rb") + gz = subprocess.Popen(["gunzip", "-t", "-q"], + shell=False, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) while True: - buf = gz.read(4096) - if len(buf) == 0: + buf = f.read(4096) + size -= len(buf) + if len(buf) > 0: + gz.stdin.write(buf) + else: break - except (IOError, ValueError, zlib.error) as e: + gz.stdin.close() + gz.stdout.close() + gz.stderr.close() + ret = gz.wait() + if ret != 0: + raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret) + except (IOError, OSError) as e: if verbose: print >>sys.stderr, e + if copy_to_file: + copy_to_file.truncate(start_pos) + copy_to_file.seek(start_pos) return False - return True @@ -187,21 +238,20 @@ class MegawarcBuilder(object): # decide what to do with this entry valid_warc_gz = False if entry.isfile() and re.search(r"\.warc\.gz", entry.name): + # this is a .warc.gz if self.verbose: print >>sys.stderr, "Checking %s" % entry.name - valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size, self.verbose) - if not valid_warc_gz: - if self.verbose: - print >>sys.stderr, "Invalid gzip %s" % entry.name + # add to megawarc while copying to the megawarc.warc.gz + warc_offset = warc_out.tell() + valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size, + copy_to_file=warc_out, verbose=self.verbose) # save in megawarc or in tar d_target = OrderedDict() if valid_warc_gz: # a warc file.gz, add to megawarc - warc_offset = warc_out.tell() if self.verbose: - print >>sys.stderr, "Copying %s to warc" % entry.name - copy_to_stream(warc_out, self.input_filename, entry.offset_data, entry.size) + print >>sys.stderr, "Copied %s to warc" % entry.name d_target["container"] = "warc" d_target["offset"] = warc_offset @@ -284,10 +334,11 @@ class MegawarcPacker(object): entry.offset = self.tar_pos # calculate position of tar entry - block_size = (len(tar_header) + # header + tar_header_l = len(tar_header) + block_size = (tar_header_l + # header entry.size + # data (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE) - data_offset = entry.offset + len(tar_header) + data_offset = entry.offset + tar_header_l next_offset = entry.offset + block_size # move to next position in imaginary tar @@ -303,19 +354,16 @@ class MegawarcPacker(object): if re.search(r"\.warc\.gz", filename): if self.verbose: print >>sys.stderr, "Checking %s" % filename - valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose) - if not valid_warc_gz: - if self.verbose: - print >>sys.stderr, "Invalid gzip %s" % filename + warc_offset = warc_out.tell() + valid_warc_gz = test_gz(filename, 0, entry.size, + copy_to_file=warc_out, verbose=self.verbose) # save in megawarc or in tar d_target = OrderedDict() if valid_warc_gz: # a warc file.gz, add to megawarc - warc_offset = warc_out.tell() if self.verbose: - print >>sys.stderr, "Copying %s to warc" % filename - copy_to_stream(warc_out, filename, 0, entry.size) + print >>sys.stderr, "Copied %s to warc" % filename d_target["container"] = "warc" d_target["offset"] = warc_offset