Browse Source

Support packing ZST megaWARCs.

master
arkiver 4 years ago
parent
commit
5468d80e35
1 changed files with 139 additions and 43 deletions
  1. +139
    -43
      megawarc

+ 139
- 43
megawarc View File

@@ -62,13 +62,15 @@ megawarc restore FILE

import base64
import gzip
import hashlib
import json
import os.path
import os
import re
import struct
import subprocess
import sys
import tarfile
import zlib
import tempfile

from optparse import OptionParser
try:
@@ -76,6 +78,10 @@ try:
except ImportError:
from ordereddict import OrderedDict

import requests
import zstandard


class ProgressInfo(object):
def __init__(self, maximum):
self._current = 0
@@ -202,7 +208,8 @@ class CopyReader(object):


# check for gzip errors
def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
def test_gz(filename, offset, size, verbose=False, copy_to_file=None,
dict_file=None):
with open(filename, "r") as f_stream:
f = RangeFile(f_stream, offset, size)
if verbose and size > 10 * 1024 * 1024:
@@ -214,11 +221,20 @@ def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
start_pos = copy_to_file.tell()
try:
with open("/dev/null", "w") as dev_null:
gz = subprocess.Popen(["gunzip", "-tv"],
shell=False,
stdin=subprocess.PIPE,
stdout=dev_null,
stderr=dev_null)
if filename.endswith('.gz'):
gz = subprocess.Popen(["gunzip", "-tv"],
shell=False,
stdin=subprocess.PIPE,
stdout=dev_null,
stderr=dev_null)
if filename.endswith('.zst'):
gz = subprocess.Popen(
["zstd", "-d"] + (["-D", dict_file.name] if dict_file else []),
shell=False,
stdin=subprocess.PIPE,
stdout=dev_null,
stderr=dev_null
)
while True:
buf = f.read(4096)
size -= len(buf)
@@ -334,41 +350,104 @@ class MegawarcBuilder(object):
json_out.write("\n")


def init_zst_megawarc(out, project, dict_id, dict_server):
r = requests.get(dict_server, params={"project": project, "id": dict_id})
r.raise_for_status()
r = r.json()
if r["id"] != dict_id:
raise ValueError("Received wrong dictionary ID.")
r_dict = requests.get(r["url"])
r_dict.raise_for_status()
data = r_dict.content
if hashlib.sha256(data).hexdigest() != r["sha256"]:
raise ValueError("Hash of dictionary does not match.")
if data[:4] != b"\x28\xB5\x2F\xFD":
decompressed = data
data = zstandard.ZstdCompressor().compress(data)
else:
decompressed = zstandard.ZstdDecompressor().decompress(data)
out.write(b"\x5D\x2A\x4D\x18")
out.write(struct.pack("<L", len(data)))
out.write(data)
return decompressed


# adding .warc.gz and other files to megawarc tar+warc+json
class MegawarcPacker(object):
def __init__(self, output_basename):
self.verbose = False
self.dict_server = None
self.dictionary_server = None
self.output_basename = output_basename
self.output_warc_filename = output_basename + ".megawarc.warc.gz"
self.output_tar_filename = output_basename + ".megawarc.tar"
self.output_json_filename = output_basename + ".megawarc.json.gz"

self.tar_pos = 0
self.megawarcs = {}
self.zst_dicts = {}

def process(self, filelist):
with open(self.output_warc_filename, "wb") as warc_out:
with open(self.output_tar_filename, "wb") as tar_out:
json_out = gzip.open(self.output_json_filename, "wb")
try:
def each_file(arg, dirname, names):
for n in names:
n = os.path.join(dirname, n)
if os.path.isfile(n):
self.process_file(n, warc_out, tar_out, json_out)

for filename in filelist:
if os.path.isdir(filename):
os.path.walk(filename, each_file, None)
elif os.path.isfile(filename):
self.process_file(filename, warc_out, tar_out, json_out)

padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
if padding > 0:
tar_out.write("\0" * padding)
finally:
json_out.close()
try:
def each_file(arg, dirname, names):
for n in names:
n = os.path.join(dirname, n)
if os.path.isfile(n):
self.process_file(n)

for filename in filelist:
if os.path.isdir(filename):
os.path.walk(filename, each_file, None)
elif os.path.isfile(filename):
self.process_file(filename)

def process_file(self, filename, warc_out, tar_out, json_out):
finally:
for data in self.megawarcs.values():
for f in data.values():
if f["file"].name.endswith('.tar'):
padding = (tarfile.RECORDSIZE - f["file"].tell()) % tarfile.RECORDSIZE
if padding > 0:
f["file"].write("\0" * padding)
f["file"].close()

def process_file(self, filename):
if filename.endswith(".zst"):
find = re.search(r"\.([0-9a-zA-Z]+)\.([0-9]{10})\.warc\.zst$", filename)
if not find:
raise ValueError("Bad ZST WARC filename.")
project, dict_id = find.groups()
if dict_id not in self.megawarcs:
base = self.output_basename + "." + dict_id
self.megawarcs[dict_id] = {
"warc": {"file": open(base + ".megawarc.warc.zst", "wb")},
"json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")},
"tar": {
"file": open(base + ".megawarc.tar", "wb"),
"pos": 0
},
"dict": {"file": tempfile.NamedTemporaryFile("wb")}
}
self.megawarcs[dict_id]["dict"]["file"].write(
init_zst_megawarc(self.megawarcs[dict_id]["warc"]["file"], project,
dict_id, self.dict_server)
)
json_out = self.megawarcs[dict_id]["json"]["file"]
warc_out = self.megawarcs[dict_id]["warc"]["file"]
tar_out = self.megawarcs[dict_id]["tar"]
elif filename.endswith(".gz"):
dict_id = None
if "gz" not in self.megawarcs:
self.megawarcs["gz"] = {
"warc": {"file": open(base + ".megawarc.warc.zst", "wb")},
"json": {"file": gzip.open(base + ".megawarc.json.gz", "wb")},
"tar": {
"file": open(base + ".megawarc.tar", "wb"),
"pos": 0
}
}
warc_out = self.megawarcs["gz"]["warc"]["file"]
json_out = self.megawarcs["gz"]["json"]["file"]
tar_out = self.megawarcs["gz"]["tar"]
else:
raise ValueError("Unsupported WARC compressed format.")
# make tar header
arcname = filename
arcname = arcname.replace(os.sep, "/")
@@ -387,7 +466,7 @@ class MegawarcPacker(object):
tar_header = entry.tobuf()

# find position in imaginary tar
entry.offset = self.tar_pos
entry.offset = tar_out["pos"]

# calculate position of tar entry
tar_header_l = len(tar_header)
@@ -398,7 +477,7 @@ class MegawarcPacker(object):
next_offset = entry.offset + block_size

# move to next position in imaginary tar
self.tar_pos = next_offset
tar_out["pos"] = next_offset

d_src_offsets = OrderedDict()
d_src_offsets["entry"] = entry.offset
@@ -407,12 +486,17 @@ class MegawarcPacker(object):

# decide what to do with this file
valid_warc_gz = False
if re.search(r"\.warc\.gz", filename):
if re.search(r"\.warc\.(?:gz|zst)$", filename):
if self.verbose:
print >>sys.stderr, "Checking %s" % filename
warc_offset = warc_out.tell()
valid_warc_gz = test_gz(filename, 0, entry.size,
copy_to_file=warc_out, verbose=self.verbose)
if dict_id is not None:
valid_warc_gz = test_gz(filename, 0, entry.size,
copy_to_file=warc_out, verbose=self.verbose,
dict_file=self.megawarcs[dict_id]["dict"]["file"])
else:
valid_warc_gz = test_gz(filename, 0, entry.size,
copy_to_file=warc_out, verbose=self.verbose)

# save in megawarc or in tar
d_target = OrderedDict()
@@ -427,14 +511,14 @@ class MegawarcPacker(object):

else:
# not a warc.gz file, add to tar
tar_offset = tar_out.tell()
tar_offset = tar_out["file"].tell()
if self.verbose:
print >>sys.stderr, "Copying %s to tar" % filename
tar_out.write(tar_header)
copy_to_stream(tar_out, filename, 0, entry.size)
tar_out["file"].write(tar_header)
copy_to_stream(tar_out["file"], filename, 0, entry.size)
padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
if padding > 0:
tar_out.write("\0" * padding)
tar_out["file"].write("\0" * padding)

d_target["container"] = "tar"
d_target["offset"] = tar_offset
@@ -508,7 +592,11 @@ class MegawarcRestorer(object):

def main():
parser = OptionParser(
usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
usage=(
"Usage: %prog [--verbose] convert FILE\n"
" %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n"
" %prog [--verbose] restore FILE"
),
description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
Use %prog pack FILE INFILE ... to create a megawarc containing the files.
Use %prog restore FILE to reconstruct original tar.
@@ -517,6 +605,8 @@ Use %prog restore FILE to reconstruct original tar.
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true",
help="print status messages", default=False)
parser.add_option("-s", "--server", dest="server", type=str,
help="server for ZST dictionaries", default=None)
(options, args) = parser.parse_args()

if len(args) < 2:
@@ -542,9 +632,15 @@ Use %prog restore FILE to reconstruct original tar.
try:
mwb = MegawarcPacker(args[1])
mwb.verbose = options.verbose
mwb.dict_server = options.server
mwb.process(args[2:])
except:
for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
for ext in (
".megawarc.warc.gz",
".megawarc.warc.zst",
".megawarc.json.gz",
".megawarc.tar"
):
if os.path.exists(args[1]+ext):
os.unlink(args[1]+ext)
raise


Loading…
Cancel
Save