Browse Source

Make it more efficient; use gunzip -t to test.

This new version uses only one read pass (for valid .warc.gz).
It uses the gunzip utility to check the gzip files (faster than Python's GzipFile).
master
Alard 11 years ago
parent
commit
f74a4f1d7c
1 changed files with 73 additions and 25 deletions
  1. +73
    -25
      megawarc

+ 73
- 25
megawarc View File

@@ -65,6 +65,7 @@ import gzip
import json
import os.path
import re
import subprocess
import sys
import tarfile
import zlib
@@ -85,10 +86,11 @@ def copy_to_stream(stream, input_filename, offset, size):
while to_read > 0:
buf_size = min(to_read, 4096)
buf = f.read(buf_size)
if len(buf) < buf_size:
raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
l = len(buf)
if l < buf_size:
raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l))
stream.write(buf)
to_read -= len(buf)
to_read -= l

stream.flush()

@@ -103,6 +105,8 @@ class RangeFile(object):

self._current_rel_offset = 0

self.seek(0)

def tell(self):
return self._current_rel_offset

@@ -127,22 +131,69 @@ class RangeFile(object):
raise Exception("Expected to read %d but received %d." % (size, len(buf)))
return buf

# copies while reading
class CopyReader(object):
    """File-like reader that copies bytes to a second stream as they are read.

    Every byte returned by `read` is written to `out_stream` exactly once:
    the reader remembers how far it has copied, so data that is read again
    after seeking backwards is returned to the caller but not written a
    second time.
    """

    def __init__(self, in_stream, out_stream):
        self._in_stream = in_stream
        self._out_stream = out_stream
        # Position (as reported by in_stream.tell()) up to which data has
        # already been written to out_stream.
        self._last_read = 0

    def tell(self):
        """Return the current position of the wrapped input stream."""
        return self._in_stream.tell()

    def seek(self, pos, whence=os.SEEK_SET):
        """Move the wrapped input stream; nothing is copied by seeking."""
        self._in_stream.seek(pos, whence)

    def read(self, size):
        """Read up to `size` bytes and copy the not-yet-copied part.

        Raises:
            Exception: if the input position lies beyond the copied region,
                because reading from there would leave a gap in the copy.
        """
        start = self.tell()
        if start > self._last_read:
            raise Exception("Last read: %d Current pos: %d" % (self._last_read, start))

        data = self._in_stream.read(size)

        # The first (copied mark - start) bytes were copied by an earlier
        # read; only what follows them is new. A zero offset keeps it all.
        fresh = data[self._last_read - start:]
        fresh_len = len(fresh)
        if fresh_len > 0:
            self._last_read += fresh_len
            self._out_stream.write(fresh)
        return data


# check for gzip errors
def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
    """Check a byte range of a file for gzip errors using `gunzip -t`.

    The data is read through a RangeFile built from `offset` and `size` and
    streamed into the stdin of a `gunzip -t -q` child process. If
    `copy_to_file` is given, the bytes are also written to it while they are
    being read, so a valid file needs only one read pass.

    Args:
        filename: path of the file that contains the gzip data.
        offset: passed to RangeFile as the start of the data.
        size: passed to RangeFile as the length of the data.
        verbose: if true, print the error to stderr when the check fails.
        copy_to_file: optional writable, seekable file object that receives
            a copy of the data. On failure it is truncated back to the
            position it had when this function was called.

    Returns:
        True if gunzip exited with status 0. False if an IOError or OSError
        occurred, which includes a non-zero gunzip exit status, a failure
        to start gunzip, and a broken pipe while feeding it.
    """
    start_pos = None
    # gzip data is binary; text mode would corrupt it wherever text and
    # binary file modes differ.
    with open(filename, "rb") as f_stream:
        f = RangeFile(f_stream, offset, size)
        if copy_to_file:
            f = CopyReader(f, copy_to_file)
            start_pos = copy_to_file.tell()

        gz = None
        try:
            gz = subprocess.Popen(["gunzip", "-t", "-q"],
                                  shell=False,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
            while True:
                buf = f.read(4096)
                if len(buf) == 0:
                    break
                gz.stdin.write(buf)

            # Closing stdin signals end of input; the output pipes are
            # never read, so close them as well before waiting.
            gz.stdin.close()
            gz.stdout.close()
            gz.stderr.close()
            ret = gz.wait()
            if ret != 0:
                raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
        except (IOError, OSError) as e:
            if verbose:
                print >>sys.stderr, e
            if copy_to_file:
                # Discard the partial copy of the invalid file.
                copy_to_file.truncate(start_pos)
                copy_to_file.seek(start_pos)
            return False
        finally:
            # Runs on success, on failure and on unexpected exceptions, so
            # the child and its pipes never outlive this call.
            if gz is not None:
                _release_gunzip(gz)

    return True


def _release_gunzip(proc):
    """Close the pipes of a gunzip child process and make sure it is reaped."""
    for pipe in (proc.stdin, proc.stdout, proc.stderr):
        try:
            pipe.close()
        except (IOError, OSError):
            # For example a broken pipe while flushing to a child that has
            # already exited; there is nothing left to clean up for it.
            pass
    if proc.poll() is None:
        try:
            proc.kill()
        except OSError:
            # The child exited between poll() and kill().
            pass
        proc.wait()


@@ -187,21 +238,20 @@ class MegawarcBuilder(object):
# decide what to do with this entry
valid_warc_gz = False
if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
# this is a .warc.gz
if self.verbose:
print >>sys.stderr, "Checking %s" % entry.name
valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size, self.verbose)
if not valid_warc_gz:
if self.verbose:
print >>sys.stderr, "Invalid gzip %s" % entry.name
# add to megawarc while copying to the megawarc.warc.gz
warc_offset = warc_out.tell()
valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size,
copy_to_file=warc_out, verbose=self.verbose)

# save in megawarc or in tar
d_target = OrderedDict()
if valid_warc_gz:
# a warc file.gz, add to megawarc
warc_offset = warc_out.tell()
if self.verbose:
print >>sys.stderr, "Copying %s to warc" % entry.name
copy_to_stream(warc_out, self.input_filename, entry.offset_data, entry.size)
print >>sys.stderr, "Copied %s to warc" % entry.name

d_target["container"] = "warc"
d_target["offset"] = warc_offset
@@ -284,10 +334,11 @@ class MegawarcPacker(object):
entry.offset = self.tar_pos

# calculate position of tar entry
block_size = (len(tar_header) + # header
tar_header_l = len(tar_header)
block_size = (tar_header_l + # header
entry.size + # data
(tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
data_offset = entry.offset + len(tar_header)
data_offset = entry.offset + tar_header_l
next_offset = entry.offset + block_size

# move to next position in imaginary tar
@@ -303,19 +354,16 @@ class MegawarcPacker(object):
if re.search(r"\.warc\.gz", filename):
if self.verbose:
print >>sys.stderr, "Checking %s" % filename
valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose)
if not valid_warc_gz:
if self.verbose:
print >>sys.stderr, "Invalid gzip %s" % filename
warc_offset = warc_out.tell()
valid_warc_gz = test_gz(filename, 0, entry.size,
copy_to_file=warc_out, verbose=self.verbose)

# save in megawarc or in tar
d_target = OrderedDict()
if valid_warc_gz:
# a warc file.gz, add to megawarc
warc_offset = warc_out.tell()
if self.verbose:
print >>sys.stderr, "Copying %s to warc" % filename
copy_to_stream(warc_out, filename, 0, entry.size)
print >>sys.stderr, "Copied %s to warc" % filename

d_target["container"] = "warc"
d_target["offset"] = warc_offset


Loading…
Cancel
Save