#!/usr/bin/env python3

# Tiny tool for WARC stuff. Currently has two modes: verifying the integrity of a WARC by comparing the
# digests, and dumping the HTTP response bodies to stdout.

import base64
import gzip
import hashlib
import sys
import zlib


# Content-Type values (with and without a space after the semicolon) that mark an HTTP response block.
_HTTP_RESPONSE_CONTENT_TYPES = (b'application/http;msgtype=response', b'application/http; msgtype=response')

# Separator between a header section and the data that follows it (WARC and HTTP alike).
_HEADER_END = b'\r\n\r\n'

_HEX_DIGITS = b'0123456789abcdefABCDEF'


def GzipDecompressor():
    '''Return a fresh zlib decompression object for gzip-wrapped data.'''
    # 16 + MAX_WBITS makes zlib expect a gzip header and trailer instead of a raw zlib stream.
    return zlib.decompressobj(16 + zlib.MAX_WBITS)


class DummyDecompressor:
    '''Stand-in with the same decompress() method as a zlib decompressor that returns data unchanged.'''

    def decompress(self, data):
        return data


class Event:
    '''Base class of everything yielded by iter_warc and consumed by ProcessMode.process_event.'''


class NewFile(Event):
    '''Signals that processing of a new input file is about to start.'''


class BeginOfRecord(Event):
    '''
    Start of a WARC record.

    warcHeaders is a tuple with one tuple of stripped bytes per header line: (name, value) for regular
    header lines, and a 1-tuple for lines without a colon (such as the leading version line).
    '''

    def __init__(self, warcHeaders):
        self._warcHeaders = warcHeaders

    @property
    def warcHeaders(self):
        return self._warcHeaders


class _DataChunk(Event):
    '''Base class for events that carry a piece of data as bytes.'''

    def __init__(self, data):
        self._data = data

    @property
    def data(self):
        return self._data

    def __repr__(self):
        # Only show the first 50 bytes so that large bodies stay readable in debug output.
        return '{}({!r}{})'.format(type(self).__name__, self._data[:50], '...' if len(self._data) > 50 else '')


class WARCBlockChunk(_DataChunk):
    '''The block of a WARC record, i.e. everything between the WARC headers and the record terminator.'''


class RawHTTPResponseBodyChunk(_DataChunk):
    '''
    Because many tools misunderstood the WARC specifications, the Payload-Digest was often implemented without
    stripping transfer encoding.
    This is like HTTPResponseBodyChunk but without transfer encoding stripping.
    '''


class HTTPResponseBodyChunk(_DataChunk):
    '''
    Represents a part of the HTTP response body with transfer encoding stripped.
    '''


class EndOfRecord(Event):
    '''End of a WARC record; emitted once its block has been read completely.'''


def _header_value(headers, name, default=None):
    '''Return the value of the first (name, value) entry in headers whose name equals `name`, else `default`.'''
    return next((x[1] for x in headers if len(x) > 1 and x[0] == name), default)


def _transfer_encodings(httpHeaders):
    '''
    Return the lower-cased transfer codings listed in the first Transfer-Encoding header of httpHeaders.

    httpHeaders is the raw header section (bytes) without the terminating blank line. An empty list is
    returned if there is no such header. A list, not an iterator, is returned on purpose so that callers
    can run several membership tests on it.
    '''
    for line in httpHeaders.split(b'\r\n'):
        name, sep, value = line.partition(b':')
        if sep and name.strip().lower() == b'transfer-encoding':
            return [x.strip().lower() for x in value.split(b',')]
    return []


def _iter_dechunked(httpBody, decompressor):
    '''
    Strip chunked transfer encoding from httpBody and yield one HTTPResponseBodyChunk per chunk.

    Each chunk is passed through decompressor.decompress before it is yielded. Decoding stops at the
    zero-length last chunk or, after printing an error to stderr, at the first malformed chunk line.
    '''
    pos = 0
    while True:
        lineEnd = httpBody.find(b'\r\n', pos)
        if lineEnd < 0:
            print('Error: could not find chunk line end, skipping', file=sys.stderr)
            return
        # Anything after a ';' is a chunk extension and irrelevant for the length.
        chunkLength = httpBody[pos:lineEnd].split(b';', 1)[0].strip()
        # The chunk size is hexadecimal in either case; an empty size would make int() raise.
        if not chunkLength or chunkLength.lstrip(_HEX_DIGITS) != b'':
            print('Error: malformed chunk length, skipping', file=sys.stderr)
            return
        chunkLength = int(chunkLength, base=16)
        if chunkLength == 0:
            return
        chunkStart = lineEnd + 2
        yield HTTPResponseBodyChunk(decompressor.decompress(httpBody[chunkStart:chunkStart + chunkLength]))
        # Skip the chunk data and the CRLF that follows it.
        pos = chunkStart + chunkLength + 2


def _iter_http_body(warcContent):
    '''
    Yield the body events for a WARC block that contains an HTTP response.

    First a single RawHTTPResponseBodyChunk with the body exactly as stored, then HTTPResponseBodyChunk
    events with the chunked and gzip transfer encodings removed. Nothing is yielded (and a warning is
    printed to stderr) if the block contains no blank line separating headers from body.
    '''
    if _HEADER_END not in warcContent:
        print('Warning: malformed HTTP response, skipping', file=sys.stderr)
        return
    httpHeaders, httpBody = warcContent.split(_HEADER_END, 1)
    transferEncodings = _transfer_encodings(httpHeaders)

    yield RawHTTPResponseBodyChunk(httpBody)

    httpDecompressor = GzipDecompressor() if b'gzip' in transferEncodings else DummyDecompressor()
    if b'chunked' in transferEncodings:
        yield from _iter_dechunked(httpBody, httpDecompressor)
    else:
        yield HTTPResponseBodyChunk(httpDecompressor.decompress(httpBody))


def iter_warc(f):
    '''
    Read the gzip-compressed WARC file f and yield Events describing its contents.

    f is handed to gzip.open, so it may be a filename or a binary file object. For every record the
    sequence is BeginOfRecord, WARCBlockChunk, then (only for HTTP response blocks) one
    RawHTTPResponseBodyChunk followed by HTTPResponseBodyChunk events, and finally EndOfRecord.
    If the file ends in the middle of a record, an error is printed to stderr and iteration stops
    without an EndOfRecord for that record.

    Raises AssertionError if a record does not start with a WARC/1.0 version line or has no
    Content-Length header.
    '''
    with gzip.open(f, 'rb') as fp:
        buf = b''
        while True:
            # Read WARC header
            while _HEADER_END not in buf:
                try:
                    data = fp.read(4096)
                except EOFError:
                    # Raised by gzip when the compressed stream itself is truncated
                    break
                if not data:
                    # A regular end of file is signalled by an empty read, not by an exception
                    break
                buf += data
            if not buf:
                break
            if _HEADER_END not in buf:
                print('Error: truncated WARC', file=sys.stderr)
                break
            warcHeaderBuf, buf = buf.split(_HEADER_END, 1)
            assert warcHeaderBuf.startswith(b'WARC/1.0\r\n')
            assert b'\r\nContent-Length:' in warcHeaderBuf
            warcHeaders = tuple(tuple(map(bytes.strip, x.split(b':', 1))) for x in warcHeaderBuf.split(b'\r\n'))
            warcContentType = _header_value(warcHeaders, b'Content-Type')
            warcContentLength = int(_header_value(warcHeaders, b'Content-Length'))
            warcType = _header_value(warcHeaders, b'WARC-Type')
            yield BeginOfRecord(warcHeaders)

            # Read WARC block (and skip CRLFCRLF at the end of the record)
            recordEnd = warcContentLength + 4
            if len(buf) < recordEnd:
                try:
                    buf += fp.read(recordEnd - len(buf))
                except EOFError:
                    pass
            if len(buf) < recordEnd:
                print('Error: truncated WARC', file=sys.stderr)
                break
            warcContent = buf[:warcContentLength]
            buf = buf[recordEnd:]

            yield WARCBlockChunk(warcContent)

            # Decode HTTP response if it is one
            #TODO: Support revisit
            if warcContentType in _HTTP_RESPONSE_CONTENT_TYPES and warcType in (b'request', b'response'):
                yield from _iter_http_body(warcContent)
            yield EndOfRecord()


class ProcessMode:
    '''Interface of the processing modes: process_event is called once per Event, in order.'''

    def process_event(self, event):
        raise NotImplementedError


class VerifyMode(ProcessMode):
    '''
    Recalculates the SHA-1 block and payload digests of each record and prints a message to stdout for
    every record whose recorded WARC-Block-Digest or WARC-Payload-Digest does not match.

    Payload digests that only match when the transfer encoding is NOT stripped are reported with a
    single warning per file instead of one mismatch per record.
    '''

    def __init__(self):
        self._blockDigester = None
        self._recordedBlockDigest = None
        self._payloadDigester = None
        self._brokenPayloadDigester = None
        self._recordedPayloadDigest = None
        self._recordID = None
        self._recordType = None
        self._printedBrokenPayloadWarning = False

    @staticmethod
    def _format_digest(digester):
        '''Return the digest of digester as unprefixed Base32 bytes, as used in the WARC digest headers.'''
        return base64.b32encode(digester.digest())

    def _begin_record(self, warcHeaders):
        '''Reset the per-record state; digesters are only created for digests the record declares.'''
        self._recordedBlockDigest = _header_value(warcHeaders, b'WARC-Block-Digest')
        self._blockDigester = hashlib.sha1() if self._recordedBlockDigest is not None else None
        self._recordedPayloadDigest = _header_value(warcHeaders, b'WARC-Payload-Digest')
        if self._recordedPayloadDigest is not None:
            self._payloadDigester = hashlib.sha1()
            self._brokenPayloadDigester = hashlib.sha1()
        else:
            self._payloadDigester = None
            self._brokenPayloadDigester = None
        self._recordID = _header_value(warcHeaders, b'WARC-Record-ID')
        self._recordType = _header_value(warcHeaders, b'WARC-Type')

    def _check_block_digest(self):
        if self._blockDigester is None:
            return
        calculated = self._format_digest(self._blockDigester)
        if self._recordedBlockDigest != b'sha1:' + calculated:
            print('Block digest mismatch for record {}: recorded {} vs. calculated {}'.format(
                self._recordID, self._recordedBlockDigest, calculated))

    def _check_payload_digest(self):
        #TODO: Support revisit
        if self._payloadDigester is None or self._recordType not in (b'request', b'response'):
            return
        calculated = self._format_digest(self._payloadDigester)
        if self._recordedPayloadDigest == b'sha1:' + calculated:
            return
        calculatedBroken = self._format_digest(self._brokenPayloadDigester)
        if self._recordedPayloadDigest == b'sha1:' + calculatedBroken:
            # The writer hashed the body including its transfer encoding; warn once per file.
            if not self._printedBrokenPayloadWarning:
                print('Warning: WARC uses incorrect payload digests without stripping the transfer encoding')
                self._printedBrokenPayloadWarning = True
        else:
            print('Payload digest mismatch for record {}: recorded {} vs. calculated {} (calculated broken {})'.format(
                self._recordID, self._recordedPayloadDigest, calculated, calculatedBroken))

    def process_event(self, event):
        if isinstance(event, NewFile):
            self._printedBrokenPayloadWarning = False
        elif isinstance(event, BeginOfRecord):
            self._begin_record(event.warcHeaders)
        elif isinstance(event, WARCBlockChunk):
            # The digesters are None for records that do not declare the corresponding digest.
            if self._blockDigester is not None:
                self._blockDigester.update(event.data)
        elif isinstance(event, HTTPResponseBodyChunk):
            if self._payloadDigester is not None:
                self._payloadDigester.update(event.data)
        elif isinstance(event, RawHTTPResponseBodyChunk):
            if self._brokenPayloadDigester is not None:
                self._brokenPayloadDigester.update(event.data)
        elif isinstance(event, EndOfRecord):
            self._check_block_digest()
            self._check_payload_digest()


class DumpResponsesMode(ProcessMode):
    '''Writes the decoded HTTP response bodies to stdout, each record's body followed by a CRLF.'''

    def __init__(self):
        self._printEOR = False

    def process_event(self, event):
        if isinstance(event, BeginOfRecord):
            self._printEOR = False
        elif isinstance(event, HTTPResponseBodyChunk):
            self._printEOR = True
            sys.stdout.buffer.write(event.data)
        elif isinstance(event, EndOfRecord):
            # Only terminate records that actually produced body output.
            if self._printEOR:
                sys.stdout.buffer.write(b'\r\n')


def main():
    '''Command-line entry point: warc-tiny MODE FILE [FILE ...] with MODE one of verify, dump-responses.'''
    processorMap = {'verify': VerifyMode, 'dump-responses': DumpResponsesMode}

    # Explicit check instead of assert so that the validation also happens under python -O.
    if len(sys.argv) < 3 or sys.argv[1] not in processorMap:
        print('Usage: {} {{{}}} FILE [FILE ...]'.format(sys.argv[0], '|'.join(processorMap)), file=sys.stderr)
        sys.exit(1)
    mode = sys.argv[1]
    files = sys.argv[2:]

    processor = processorMap[mode]()

    for f in files:
        print('Info: processing {}'.format(f), file=sys.stderr)
        processor.process_event(NewFile())
        for event in iter_warc(f):
            processor.process_event(event)


if __name__ == '__main__':
    main()