From f9af817d5165fed52c2fd503281fd7dd39dac8f4 Mon Sep 17 00:00:00 2001 From: Alard Date: Thu, 11 Oct 2012 15:55:25 +0200 Subject: [PATCH] First version. --- README.md | 55 ++++++++++ megawarc | 300 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 355 insertions(+) create mode 100644 README.md create mode 100755 megawarc diff --git a/README.md b/README.md new file mode 100644 index 0000000..59dd131 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +Megawarc +======== +megawarc is useful if you have .tar full of .warc.gz files and you really want one big .warc.gz. With megawarc you get your .warc.gz, but you can still restore the original .tar. + +The megawarc tool looks for .warc.gz in the .tar file and creates three files, the megawarc: +* FILE.warc.gz is the concatenated .warc.gz +* FILE.tar contains any non-warc files from the .tar +* FILE.json.gz contains metadata + +You need the JSON file to reconstruct the original .tar from +the .warc.gz and .tar files. The JSON file has the location +of every file from the original .tar file. + + +Metadata format +--------------- +One line with a JSON object per file in the .tar. +```js +{ + "target": { + "container": "warc" or "tar", // where is this file? + "offset": number, // where in the tar/warc does this file start? + // for files in the tar this includes the tar header, which is + // copied to the tar. + "size": size // where does this file end? + // for files in the tar, this includes the padding to 512 bytes + }, + "src_offsets": { + "entry": number, // where is this file in the original tar? + "data": number, // where does the data start? entry+512 + "next_entry": number // where does the next tar entry start + }, + "header_fields": { + ... // parsed fields from the tar header + }, + "header_string": string // the tar header for this entry +} +``` + +Usage +----- +``` +megawarc build FILE +``` + +Converts the tar file (containing .warc.gz files) to a megawarc. +It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE. + +``` +megawarc restore FILE +``` + +Converts the megawarc back to the original tar. +It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE. + diff --git a/megawarc b/megawarc new file mode 100755 index 0000000..fba40f8 --- /dev/null +++ b/megawarc @@ -0,0 +1,300 @@ +#!/usr/bin/env python +""" +megawarc is useful if you have .tar full of .warc.gz files and +you really want one big .warc.gz. With megawarc you get your +.warc.gz, but you can still restore the original .tar. + +The megawarc tool looks for .warc.gz in the .tar file and +creates three files, the megawarc: + FILE.warc.gz is the concatenated .warc.gz + FILE.tar contains any non-warc files from the .tar + FILE.json.gz contains metadata + +You need the JSON file to reconstruct the original .tar from +the .warc.gz and .tar files. The JSON file has the location +of every file from the original .tar file. + + +METADATA FORMAT +--------------- +One line with a JSON object per file in the .tar. +{ + "target": { + "container": "warc" or "tar", (where is this file?) + "offset": number, (where in the tar/warc does this + file start? for files in the tar + this includes the tar header, + which is copied to the tar.) + "size": size (where does this file end? + for files in the tar, this includes + the padding to 512 bytes) + }, + "src_offsets": { + "entry": number, (where is this file in the original tar?) + "data": number, (where does the data start? entry+512) + "next_entry": number (where does the next tar entry start) + }, + "header_fields": { + ... (parsed fields from the tar header) + }, + "header_string": string (the tar header for this entry) +} + + +USAGE +----- +megawarc build FILE + Converts the tar file (containing .warc.gz files) to a megawarc. + It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE. + +megawarc restore FILE + Converts the megawarc back to the original tar. + It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE. +""" + +import gzip +import json +import os.path +import re +import sys +import tarfile +import zlib + +from optparse import OptionParser +from ordereddict import OrderedDict + + +# modify tarfile.TarInfo to keep the original tar headers +tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf +@classmethod +def keepbuf_frombuf(cls, buf): + entry = cls.orig_frombuf(buf) + entry.buf = buf + return entry +tarfile.TarInfo.frombuf = keepbuf_frombuf + + +# open input_filename and write the data from offset to +# (offset+size) to stream +def copy_to_stream(stream, input_filename, offset, size): + with open(input_filename, "r") as f: + f.seek(offset) + + to_read = size + while to_read > 0: + buf_size = min(to_read, 4096) + buf = f.read(buf_size) + if len(buf) < buf_size: + raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf))) + stream.write(buf) + to_read -= len(buf) + + stream.flush() + + +# converting a .tar with warcs to megawarc tar+warc+json +class MegawarcBuilder(object): + def __init__(self, input_filename): + self.verbose = False + self.input_filename = input_filename + self.output_warc_filename = input_filename + ".megawarc.warc.gz" + self.output_tar_filename = input_filename + ".megawarc.tar" + self.output_json_filename = input_filename + ".megawarc.json.gz" + + def process(self): + with open(self.output_warc_filename, "wb") as warc_out: + with open(self.output_tar_filename, "wb") as tar_out: + with gzip.open(self.output_json_filename, "wb") as json_out: + with tarfile.open(self.input_filename, "r") as tar: + for tarinfo in tar: + self.process_entry(tarinfo, warc_out, tar_out, json_out) + + tar_out.flush() + padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE + if padding > 0: + tar_out.write("\0" * padding) + + def test_gz(self, offset, size): + with open(self.input_filename, "r") as f: + z = zlib.decompressobj(15 + 32) + f.seek(offset) + + to_read = size + while to_read > 0: + buf_size = min(to_read, 4096) + buf = f.read(buf_size) + if len(buf) < buf_size: + # end of file, not a valid gz + return False + else: + z.decompress(buf) + to_read -= len(buf) + + if z.flush()!="": + # remaining uncompressed data + return False + + return True + + def process_entry(self, entry, warc_out, tar_out, json_out): + # calculate position of tar entry + block_size = (tarfile.BLOCKSIZE + # header + entry.size + # data + (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE) + data_offset = entry.offset + tarfile.BLOCKSIZE + next_offset = entry.offset + block_size + + d_src_offsets = OrderedDict() + d_src_offsets["entry"] = entry.offset + d_src_offsets["data"] = data_offset + d_src_offsets["next_entry"] = next_offset + + # decide what to do with this entry + valid_warc_gz = False + if entry.isfile() and re.search(r"\.warc\.gz", entry.name): + if self.verbose: + print >>sys.stderr, "Checking %s" % entry.name + valid_warc_gz = self.test_gz(data_offset, entry.size) + if not valid_warc_gz: + if self.verbose: + print >>sys.stderr, "Invalid gzip %s" % entry.name + + # save in megawarc or in tar + d_target = OrderedDict() + if valid_warc_gz: + # a warc file.gz, add to megawarc + warc_offset = warc_out.tell() + if self.verbose: + print >>sys.stderr, "Copying %s to warc" % entry.name + copy_to_stream(warc_out, self.input_filename, data_offset, entry.size) + + d_target["container"] = "warc" + d_target["offset"] = warc_offset + d_target["size"] = entry.size + + else: + # not a warc.gz file, add to tar + tar_offset = tar_out.tell() + if self.verbose: + print >>sys.stderr, "Copying %s to tar" % entry.name + copy_to_stream(tar_out, self.input_filename, entry.offset, block_size) + + d_target["container"] = "tar" + d_target["offset"] = tar_offset + d_target["size"] = block_size + + # store details + d = OrderedDict() + d["target"] = d_target + d["src_offsets"] = d_src_offsets + d["header_fields"] = entry.get_info("utf-8", {}) + d["header_string"] = entry.buf + + # store metadata + json.dump(d, json_out, separators=(',', ':')) + json_out.write("\n") + + +# recreate the original .tar from a megawarc tar+warc+json +class MegawarcRestorer(object): + def __init__(self, output_filename): + self.verbose = False + self.output_filename = output_filename + self.input_warc_filename = output_filename + ".megawarc.warc.gz" + self.input_tar_filename = output_filename + ".megawarc.tar" + self.input_json_filename = output_filename + ".megawarc.json.gz" + + def process(self): + with gzip.open(self.input_json_filename, "rb") as json_in: + with open(self.output_filename, "wb") as tar_out: + for line in json_in: + entry = json.loads(line) + self.process_entry(entry, tar_out) + + tar_out.flush() + padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE + if padding > 0: + tar_out.write("\0" * padding) + + + def process_entry(self, entry, tar_out): + if entry["target"]["container"] == "warc": + if self.verbose: + print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"] + tar_out.write(entry["header_string"]) + copy_to_stream(tar_out, self.input_warc_filename, + entry["target"]["offset"], entry["target"]["size"]) + padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE + if padding > 0: + tar_out.write("\0" * padding) + + elif entry["target"]["container"] == "tar": + if self.verbose: + print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"] + copy_to_stream(tar_out, self.input_tar_filename, + entry["target"]["offset"], entry["target"]["size"]) + padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE + if padding > 0: + tar_out.write("\0" * padding) + + else: + raise Exception("Unkown container: %s for %s" % + (entry["target"]["container"], entry["header_fields"]["name"])) + + +def main(): + parser = OptionParser( + usage="Usage: %prog [--verbose] build FILE\n %prog [--verbose] restore FILE", + description="""%prog build FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar. +Use %prog build FILE to reconstruct original tar. + """ + ) + parser.add_option("-v", "--verbose", dest="verbose", + action="store_true", + help="print status messages", default=False) + (options, args) = parser.parse_args() + + if len(args) != 2: + parser.print_usage() + exit(1) + + if args[0] == "build": + if not os.path.exists(args[1]): + print >>sys.stderr, "Input file %s does not exist." % args[1] + exit(1) + + try: + mwb = MegawarcBuilder(args[1]) + mwb.verbose = options.verbose + mwb.process() + except: + for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"): + if os.path.exists(args[1]+ext): + os.unlink(args[1]+ext) + raise + + elif args[0] == "restore": + for ext in (".megawarc.warc.gz", ".megawarc.json.gz"): + if not os.path.exists(args[1]+ext): + print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext) + exit(1) + if os.path.exists(args[1]): + print >>sys.stderr, "Outputfile %s already exists." % args[1] + exit(1) + + try: + mwr = MegawarcRestorer(args[1]) + mwr.verbose = options.verbose + mwr.process() + except: + if os.path.exists(args[1]): + os.unlink(args[1]) + raise + + else: + parser.print_usage() + exit(1) + +if __name__ == "__main__": + main() +