From 5468d80e35b3dcb85d36624580c813326af706fe Mon Sep 17 00:00:00 2001 From: arkiver Date: Wed, 1 Apr 2020 01:01:08 +0200 Subject: [PATCH] Support packing ZST megaWARCs. --- megawarc | 182 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 139 insertions(+), 43 deletions(-) diff --git a/megawarc b/megawarc index b30c897..17bf41f 100755 --- a/megawarc +++ b/megawarc @@ -62,13 +62,15 @@ megawarc restore FILE import base64 import gzip +import hashlib import json -import os.path +import os import re +import struct import subprocess import sys import tarfile -import zlib +import tempfile from optparse import OptionParser try: @@ -76,6 +78,10 @@ try: except ImportError: from ordereddict import OrderedDict +import requests +import zstandard + + class ProgressInfo(object): def __init__(self, maximum): self._current = 0 @@ -202,7 +208,8 @@ class CopyReader(object): # check for gzip errors -def test_gz(filename, offset, size, verbose=False, copy_to_file=None): +def test_gz(filename, offset, size, verbose=False, copy_to_file=None, + dict_file=None): with open(filename, "r") as f_stream: f = RangeFile(f_stream, offset, size) if verbose and size > 10 * 1024 * 1024: @@ -214,11 +221,20 @@ def test_gz(filename, offset, size, verbose=False, copy_to_file=None): start_pos = copy_to_file.tell() try: with open("/dev/null", "w") as dev_null: - gz = subprocess.Popen(["gunzip", "-tv"], - shell=False, - stdin=subprocess.PIPE, - stdout=dev_null, - stderr=dev_null) + if filename.endswith('.gz'): + gz = subprocess.Popen(["gunzip", "-tv"], + shell=False, + stdin=subprocess.PIPE, + stdout=dev_null, + stderr=dev_null) + if filename.endswith('.zst'): + gz = subprocess.Popen( + ["zstd", "-d"] + (["-D", dict_file.name] if dict_file else []), + shell=False, + stdin=subprocess.PIPE, + stdout=dev_null, + stderr=dev_null + ) while True: buf = f.read(4096) size -= len(buf) @@ -334,41 +350,104 @@ class MegawarcBuilder(object): json_out.write("\n") +def init_zst_megawarc(out, project, dict_id, dict_server): + r = requests.get(dict_server, params={"project": project, "id": dict_id}) + r.raise_for_status() + r = r.json() + if r["id"] != dict_id: + raise ValueError("Received wrong dictionary ID.") + r_dict = requests.get(r["url"]) + r_dict.raise_for_status() + data = r_dict.content + if hashlib.sha256(data).hexdigest() != r["sha256"]: + raise ValueError("Hash of dictionary does not match.") + if data[:4] != b"\x28\xB5\x2F\xFD": + decompressed = data + data = zstandard.ZstdCompressor().compress(data) + else: + decompressed = zstandard.ZstdDecompressor().decompress(data) + out.write(b"\x5D\x2A\x4D\x18") + out.write(struct.pack(" 0: - tar_out.write("\0" * padding) - finally: - json_out.close() + try: + def each_file(arg, dirname, names): + for n in names: + n = os.path.join(dirname, n) + if os.path.isfile(n): + self.process_file(n) + + for filename in filelist: + if os.path.isdir(filename): + os.path.walk(filename, each_file, None) + elif os.path.isfile(filename): + self.process_file(filename) - def process_file(self, filename, warc_out, tar_out, json_out): + finally: + for data in self.megawarcs.values(): + for f in data.values(): + if f["file"].name.endswith('.tar'): + padding = (tarfile.RECORDSIZE - f["file"].tell()) % tarfile.RECORDSIZE + if padding > 0: + f["file"].write("\0" * padding) + f["file"].close() + + def process_file(self, filename): + if filename.endswith(".zst"): + find = re.search(r"\.([0-9a-zA-Z]+)\.([0-9]{10})\.warc\.zst$", filename) + if not find: + raise ValueError("Bad ZST WARC filename.") + project, dict_id = find.groups() + if dict_id not in self.megawarcs: + base = self.output_basename + "." + dict_id + self.megawarcs[dict_id] = { + "warc": {"file": open(base + ".megawarc.warc.zst", "wb")}, + "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")}, + "tar": { + "file": open(base + ".megawarc.tar", "wb"), + "pos": 0 + }, + "dict": {"file": tempfile.NamedTemporaryFile("wb")} + } + self.megawarcs[dict_id]["dict"]["file"].write( + init_zst_megawarc(self.megawarcs[dict_id]["warc"]["file"], project, + dict_id, self.dict_server) + ) + json_out = self.megawarcs[dict_id]["json"]["file"] + warc_out = self.megawarcs[dict_id]["warc"]["file"] + tar_out = self.megawarcs[dict_id]["tar"] + elif filename.endswith(".gz"): + dict_id = None + if "gz" not in self.megawarcs: + self.megawarcs["gz"] = { + "warc": {"file": open(base + ".megawarc.warc.zst", "wb")}, + "json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")}, + "tar": { + "file": open(base + ".megawarc.tar", "wb"), + "pos": 0 + } + } + warc_out = self.megawarcs["gz"]["warc"]["file"] + json_out = self.megawarcs["gz"]["json"]["file"] + tar_out = self.megawarcs["gz"]["tar"] + else: + raise ValueError("Unsupported WARC compressed format.") # make tar header arcname = filename arcname = arcname.replace(os.sep, "/") @@ -387,7 +466,7 @@ class MegawarcPacker(object): tar_header = entry.tobuf() # find position in imaginary tar - entry.offset = self.tar_pos + entry.offset = tar_out["pos"] # calculate position of tar entry tar_header_l = len(tar_header) @@ -398,7 +477,7 @@ class MegawarcPacker(object): next_offset = entry.offset + block_size # move to next position in imaginary tar - self.tar_pos = next_offset + tar_out["pos"] = next_offset d_src_offsets = OrderedDict() d_src_offsets["entry"] = entry.offset @@ -407,12 +486,17 @@ class MegawarcPacker(object): # decide what to do with this file valid_warc_gz = False - if re.search(r"\.warc\.gz", filename): + if re.search(r"\.warc\.(?:gz|zst)$", filename): if self.verbose: print >>sys.stderr, "Checking %s" % filename warc_offset = warc_out.tell() - valid_warc_gz = test_gz(filename, 0, entry.size, - copy_to_file=warc_out, verbose=self.verbose) + if dict_id is not None: + valid_warc_gz = test_gz(filename, 0, entry.size, + copy_to_file=warc_out, verbose=self.verbose, + dict_file=self.megawarcs[dict_id]["dict"]["file"]) + else: + valid_warc_gz = test_gz(filename, 0, entry.size, + copy_to_file=warc_out, verbose=self.verbose) # save in megawarc or in tar d_target = OrderedDict() @@ -427,14 +511,14 @@ class MegawarcPacker(object): else: # not a warc.gz file, add to tar - tar_offset = tar_out.tell() + tar_offset = tar_out["file"].tell() if self.verbose: print >>sys.stderr, "Copying %s to tar" % filename - tar_out.write(tar_header) - copy_to_stream(tar_out, filename, 0, entry.size) + tar_out["file"].write(tar_header) + copy_to_stream(tar_out["file"], filename, 0, entry.size) padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE if padding > 0: - tar_out.write("\0" * padding) + tar_out["file"].write("\0" * padding) d_target["container"] = "tar" d_target["offset"] = tar_offset @@ -508,7 +592,11 @@ class MegawarcRestorer(object): def main(): parser = OptionParser( - usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE", + usage=( + "Usage: %prog [--verbose] convert FILE\n" + " %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n" + " %prog [--verbose] restore FILE" + ), description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar. Use %prog pack FILE INFILE ... to create a megawarc containing the files. Use %prog restore FILE to reconstruct original tar. @@ -517,6 +605,8 @@ Use %prog restore FILE to reconstruct original tar. parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="print status messages", default=False) + parser.add_option("-s", "--server", dest="server", type=str, + help="server for ZST dictionaries", default=None) (options, args) = parser.parse_args() if len(args) < 2: @@ -542,9 +632,15 @@ Use %prog restore FILE to reconstruct original tar. try: mwb = MegawarcPacker(args[1]) mwb.verbose = options.verbose + mwb.dict_server = options.server mwb.process(args[2:]) except: - for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"): + for ext in ( + ".megawarc.warc.gz", + ".megawarc.warc.zst", + ".megawarc.json.gz", + ".megawarc.tar" + ): if os.path.exists(args[1]+ext): os.unlink(args[1]+ext) raise