#!/usr/bin/env python """ megawarc is useful if you have .tar full of .warc.gz files and you really want one big .warc.gz. With megawarc you get your .warc.gz, but you can still restore the original .tar. The megawarc tool looks for .warc.gz in the .tar file and creates three files, the megawarc: FILE.warc.gz is the concatenated .warc.gz FILE.tar contains any non-warc files from the .tar FILE.json.gz contains metadata You need the JSON file to reconstruct the original .tar from the .warc.gz and .tar files. The JSON file has the location of every file from the original .tar file. METADATA FORMAT --------------- One line with a JSON object per file in the .tar. { "target": { "container": "warc" or "tar", (where is this file?) "offset": number, (where in the tar/warc does this file start? for files in the tar this includes the tar header, which is copied to the tar.) "size": size (where does this file end? for files in the tar, this includes the padding to 512 bytes) }, "src_offsets": { "entry": number, (where is this file in the original tar?) "data": number, (where does the data start? entry+512) "next_entry": number (where does the next tar entry start) }, "header_fields": { ... (parsed fields from the tar header) }, "header_base64": string (the base64-encoded tar header) } In older megawarcs the header is sometimes not base64-encoded: "header_string": string (the tar header for this entry) USAGE ----- megawarc convert FILE Converts the tar file (containing .warc.gz files) to a megawarc. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE. megawarc pack FILE INFILE_1 [[INFILE_2] ...] Creates a megawarc with basename FILE and recursively adds the given files and directories to it, as if they were in a tar file. It creates FILE.warc.gz, FILE.tar and FILE.json.gz. megawarc restore FILE Converts the megawarc back to the original tar. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE. """ import base64 import gzip import hashlib import json import os import re import struct import subprocess import sys import tarfile import tempfile from optparse import OptionParser try: from collections import OrderedDict except ImportError: from ordereddict import OrderedDict import requests import zstandard class ProgressInfo(object): def __init__(self, maximum): self._current = 0 self._maximum = maximum self._previous_percentage = None self._active = sys.stderr.isatty() self.print_status() def update(self, new_value): self._current = new_value self.print_status() def print_status(self): if not self._active: return percentage = int(float(self._current) / float(self._maximum) * 100) if self._maximum < 0: # count down percentage = 100-percentage percentage = max(0, min(100, percentage)) if self._previous_percentage != percentage: self._previous_percentage = percentage sys.stderr.write("\r %3d%%" % percentage) def clear(self): if self._active: sys.stderr.write("\r \r") self._active = False # open input_filename and write the data from offset to # (offset+size) to stream def copy_to_stream(stream, input_filename, offset, size, verbose=False): if verbose and size > 10 * 1024 * 1024: progress = ProgressInfo(-size) else: progress = None try: with open(input_filename, "r") as f: f.seek(offset) to_read = size while to_read > 0: buf_size = min(to_read, 4096) buf = f.read(buf_size) l = len(buf) if l < buf_size: raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l)) stream.write(buf) to_read -= l if progress: progress.update(-to_read) finally: if progress: progress.clear() # part of a stream as a file # (seek relative to an offset) class RangeFile(object): def __init__(self, stream, offset, size): self._stream = stream self._offset = offset self._size = size self._current_rel_offset = 0 self.seek(0) def tell(self): return self._current_rel_offset def seek(self, pos, whence=os.SEEK_SET): if whence == os.SEEK_SET: self._current_rel_offset = pos elif whence == os.SEEK_CUR: self._current_rel_offset += pos elif whence == os.SEEK_END: self._current_rel_offset = self._size + pos else: raise Exception("Unknown whence: %d." % whence) if self._current_rel_offset < 0 or self._current_rel_offset > self._size: raise Exception("Seek outside file: %d." % self._current_rel_offset) self._stream.seek(self._offset + self._current_rel_offset) def read(self, size): size = min(self._size - self._current_rel_offset, size) self._current_rel_offset += size buf = self._stream.read(size) if len(buf) < size: raise Exception("Expected to read %d but received %d." % (size, len(buf))) return buf # copies while reading class CopyReader(object): def __init__(self, in_stream, out_stream): self._in_stream = in_stream self._out_stream = out_stream self._last_read = 0 def tell(self): return self._in_stream.tell() def seek(self, pos, whence=os.SEEK_SET): self._in_stream.seek(pos, whence) def read(self, size): pos = self.tell() if self._last_read < pos: raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos)) buf = self._in_stream.read(size) read_before = self._last_read - pos if read_before == 0: new_read = buf else: new_read = buf[read_before:] l = len(new_read) if l > 0: self._last_read += l self._out_stream.write(new_read) return buf # check for gzip errors def test_gz(filename, offset, size, verbose=False, copy_to_file=None, dict_file=None): with open(filename, "r") as f_stream: f = RangeFile(f_stream, offset, size) if verbose and size > 10 * 1024 * 1024: progress = ProgressInfo(-size) else: progress = None if copy_to_file: f = CopyReader(f, copy_to_file) start_pos = copy_to_file.tell() try: with open("/dev/null", "w") as dev_null: if filename.endswith('.gz'): gz = subprocess.Popen(["gunzip", "-tv"], shell=False, stdin=subprocess.PIPE, stdout=dev_null, stderr=dev_null) if filename.endswith('.zst'): gz = subprocess.Popen( ["zstd", "-d"] + (["-D", dict_file.name] if dict_file else []), shell=False, stdin=subprocess.PIPE, stdout=dev_null, stderr=dev_null ) while True: buf = f.read(4096) size -= len(buf) if progress: progress.update(-size) if len(buf) > 0: gz.stdin.write(buf) else: break gz.stdin.close() ret = gz.wait() if ret != 0: raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret) if progress: progress.clear() except (IOError, OSError) as e: if progress: progress.clear() if verbose: print >>sys.stderr, e if copy_to_file: copy_to_file.truncate(start_pos) copy_to_file.seek(start_pos) return False return True # converting a .tar with warcs to megawarc tar+warc+json class MegawarcBuilder(object): def __init__(self, input_filename): self.verbose = False self.input_filename = input_filename self.output_warc_filename = input_filename + ".megawarc.warc.gz" self.output_tar_filename = input_filename + ".megawarc.tar" self.output_json_filename = input_filename + ".megawarc.json.gz" def process(self): with open(self.output_warc_filename, "wb") as warc_out: with open(self.output_tar_filename, "wb") as tar_out: json_out = gzip.open(self.output_json_filename, "wb") try: tar = tarfile.open(self.input_filename, "r") try: for tarinfo in tar: self.process_entry(tarinfo, warc_out, tar_out, json_out) padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE if padding > 0: tar_out.write("\0" * padding) finally: tar.close() finally: json_out.close() def process_entry(self, entry, warc_out, tar_out, json_out): with open(self.input_filename, "r") as tar: tar.seek(entry.offset) tar_header = tar.read(entry.offset_data - entry.offset) # calculate position of tar entry block_size = (len(tar_header) + # header entry.size + # data (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE) next_offset = entry.offset + block_size d_src_offsets = OrderedDict() d_src_offsets["entry"] = entry.offset d_src_offsets["data"] = entry.offset_data d_src_offsets["next_entry"] = next_offset # decide what to do with this entry valid_warc_gz = False if entry.isfile() and re.search(r"\.warc\.gz", entry.name): # this is a .warc.gz if self.verbose: print >>sys.stderr, "Checking %s" % entry.name # add to megawarc while copying to the megawarc.warc.gz warc_offset = warc_out.tell() valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size, copy_to_file=warc_out, verbose=self.verbose) # save in megawarc or in tar d_target = OrderedDict() if valid_warc_gz: # a warc file.gz, add to megawarc if self.verbose: print >>sys.stderr, "Copied %s to warc" % entry.name d_target["container"] = "warc" d_target["offset"] = warc_offset d_target["size"] = entry.size else: # not a warc.gz file, add to tar tar_offset = tar_out.tell() if self.verbose: print >>sys.stderr, "Copying %s to tar" % entry.name copy_to_stream(tar_out, self.input_filename, entry.offset, block_size) d_target["container"] = "tar" d_target["offset"] = tar_offset d_target["size"] = block_size # store details d = OrderedDict() d["target"] = d_target d["src_offsets"] = d_src_offsets d["header_fields"] = entry.get_info("utf-8", {}) d["header_base64"] = base64.b64encode(tar_header) # store metadata json.dump(d, json_out, separators=(',', ':')) json_out.write("\n") def init_zst_megawarc(out, project, dict_id, dict_server): r = requests.get(dict_server, params={"project": project, "id": dict_id}) r.raise_for_status() r = r.json() if r["id"] != dict_id: raise ValueError("Received wrong dictionary ID.") r_dict = requests.get(r["url"]) r_dict.raise_for_status() data = r_dict.content if hashlib.sha256(data).hexdigest() != r["sha256"]: raise ValueError("Hash of dictionary does not match.") if data[:4] != b"\x28\xB5\x2F\xFD": decompressed = data data = zstandard.ZstdCompressor().compress(data) else: decompressed = zstandard.ZstdDecompressor().decompress(data) out.write(b"\x5D\x2A\x4D\x18") out.write(struct.pack(" 0: f["file"].write("\0" * padding) f["file"].close() def process_file(self, filename): if filename.endswith(".zst"): find = re.search(r"\.([0-9a-zA-Z]+)\.([0-9]{10})\.warc\.zst$", filename) if not find: raise ValueError("Bad ZST WARC filename.") project, dict_id = find.groups() if dict_id not in self.megawarcs: base = self.output_basename + "." + dict_id self.megawarcs[dict_id] = { "warc": {"file": open(base + ".megawarc.warc.zst", "wb")}, "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")}, "tar": { "file": open(base + ".megawarc.tar", "wb"), "pos": 0 }, "dict": {"file": tempfile.NamedTemporaryFile("wb")} } self.megawarcs[dict_id]["dict"]["file"].write( init_zst_megawarc(self.megawarcs[dict_id]["warc"]["file"], project, dict_id, self.dict_server) ) self.megawarcs[dict_id]["dict"]["file"].flush() json_out = self.megawarcs[dict_id]["json"]["file"] warc_out = self.megawarcs[dict_id]["warc"]["file"] tar_out = self.megawarcs[dict_id]["tar"] elif filename.endswith(".gz"): dict_id = None if "gz" not in self.megawarcs: base = self.output_basename self.megawarcs["gz"] = { "warc": {"file": open(base + ".megawarc.warc.gz", "wb")}, "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")}, "tar": { "file": open(base + ".megawarc.tar", "wb"), "pos": 0 } } warc_out = self.megawarcs["gz"]["warc"]["file"] json_out = self.megawarcs["gz"]["json"]["file"] tar_out = self.megawarcs["gz"]["tar"] else: raise ValueError("Unsupported WARC compressed format.") # make tar header arcname = filename arcname = arcname.replace(os.sep, "/") arcname = arcname.lstrip("/") entry = tarfile.TarInfo() statres = os.stat(filename) stmd = statres.st_mode entry.name = arcname entry.mode = stmd entry.uid = statres.st_uid entry.gid = statres.st_gid entry.size = statres.st_size entry.mtime = statres.st_mtime entry.type = tarfile.REGTYPE tar_header = entry.tobuf() # find position in imaginary tar entry.offset = tar_out["pos"] # calculate position of tar entry tar_header_l = len(tar_header) block_size = (tar_header_l + # header entry.size + # data (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE) data_offset = entry.offset + tar_header_l next_offset = entry.offset + block_size # move to next position in imaginary tar tar_out["pos"] = next_offset d_src_offsets = OrderedDict() d_src_offsets["entry"] = entry.offset d_src_offsets["data"] = data_offset d_src_offsets["next_entry"] = next_offset # decide what to do with this file valid_warc_gz = False if re.search(r"\.warc\.(?:gz|zst)$", filename): if self.verbose: print >>sys.stderr, "Checking %s" % filename warc_offset = warc_out.tell() if dict_id is not None: valid_warc_gz = test_gz(filename, 0, entry.size, copy_to_file=warc_out, verbose=self.verbose, dict_file=self.megawarcs[dict_id]["dict"]["file"]) else: valid_warc_gz = test_gz(filename, 0, entry.size, copy_to_file=warc_out, verbose=self.verbose) # save in megawarc or in tar d_target = OrderedDict() if valid_warc_gz: # a warc file.gz, add to megawarc if self.verbose: print >>sys.stderr, "Copied %s to warc" % filename d_target["container"] = "warc" d_target["offset"] = warc_offset d_target["size"] = entry.size else: # not a warc.gz file, add to tar tar_offset = tar_out["file"].tell() if self.verbose: print >>sys.stderr, "Copying %s to tar" % filename tar_out["file"].write(tar_header) copy_to_stream(tar_out["file"], filename, 0, entry.size) padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE if padding > 0: tar_out["file"].write("\0" * padding) d_target["container"] = "tar" d_target["offset"] = tar_offset d_target["size"] = block_size # store details d = OrderedDict() d["target"] = d_target d["src_offsets"] = d_src_offsets d["header_fields"] = entry.get_info("utf-8", {}) d["header_base64"] = base64.b64encode(tar_header) # store metadata json.dump(d, json_out, separators=(',', ':')) json_out.write("\n") # recreate the original .tar from a megawarc tar+warc+json class MegawarcRestorer(object): def __init__(self, output_filename): self.verbose = False self.output_filename = output_filename self.input_warc_filename = output_filename + ".megawarc.warc.gz" self.input_tar_filename = output_filename + ".megawarc.tar" self.input_json_filename = output_filename + ".megawarc.json.gz" def process(self): json_in = gzip.open(self.input_json_filename, "rb") try: with open(self.output_filename, "wb") as tar_out: for line in json_in: entry = json.loads(line) self.process_entry(entry, tar_out) padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE if padding > 0: tar_out.write("\0" * padding) finally: json_in.close() def process_entry(self, entry, tar_out): if entry["target"]["container"] == "warc": if self.verbose: print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"] if "header_base64" in entry: tar_out.write(base64.b64decode(entry["header_base64"])) elif "header_string" in entry: tar_out.write(entry["header_string"]) else: raise Exception("Missing header_string or header_base64.") copy_to_stream(tar_out, self.input_warc_filename, entry["target"]["offset"], entry["target"]["size"]) padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE if padding > 0: tar_out.write("\0" * padding) elif entry["target"]["container"] == "tar": if self.verbose: print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"] copy_to_stream(tar_out, self.input_tar_filename, entry["target"]["offset"], entry["target"]["size"]) padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE if padding > 0: tar_out.write("\0" * padding) else: raise Exception("Unkown container: %s for %s" % (entry["target"]["container"], entry["header_fields"]["name"])) def main(): parser = OptionParser( usage=( "Usage: %prog [--verbose] convert FILE\n" " %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n" " %prog [--verbose] restore FILE" ), description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar. Use %prog pack FILE INFILE ... to create a megawarc containing the files. Use %prog restore FILE to reconstruct original tar. """ ) parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="print status messages", default=False) parser.add_option("-s", "--server", dest="server", type=str, help="server for ZST dictionaries", default=None) (options, args) = parser.parse_args() if len(args) < 2: parser.print_usage() exit(1) if args[0] == "convert": if not os.path.exists(args[1]): print >>sys.stderr, "Input file %s does not exist." % args[1] exit(1) try: mwb = MegawarcBuilder(args[1]) mwb.verbose = options.verbose mwb.process() except: for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"): if os.path.exists(args[1]+ext): os.unlink(args[1]+ext) raise elif args[0] == "pack": try: mwb = MegawarcPacker(args[1]) mwb.verbose = options.verbose mwb.dict_server = options.server mwb.process(args[2:]) except: for ext in ( ".megawarc.warc.gz", ".megawarc.warc.zst", ".megawarc.json.gz", ".megawarc.tar" ): if os.path.exists(args[1]+ext): os.unlink(args[1]+ext) raise elif args[0] == "restore": for ext in (".megawarc.warc.gz", ".megawarc.json.gz"): if not os.path.exists(args[1]+ext): print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext) exit(1) if os.path.exists(args[1]): print >>sys.stderr, "Outputfile %s already exists." % args[1] exit(1) try: mwr = MegawarcRestorer(args[1]) mwr.verbose = options.verbose mwr.process() except: if os.path.exists(args[1]): os.unlink(args[1]) raise else: parser.print_usage() exit(1) if __name__ == "__main__": main()