#!/usr/bin/env python3
# Tiny tool for WARC stuff.
# Currently has two modes: verifying the integrity of a WARC by comparing the
# digests and dumping the HTTP response bodies to stdout.

import base64
import gzip
import hashlib
import sys
import zlib


# Separator between a header section and the data that follows it (WARC and HTTP alike).
_HEADER_END = b'\r\n\r\n'
# WARC Content-Type values that mark a block as an HTTP response.
_HTTP_RESPONSE_CONTENT_TYPES = (
    b'application/http;msgtype=response',
    b'application/http; msgtype=response',
)
# Characters allowed in an HTTP chunk size (hexadecimal, either case).
_HEX_DIGITS = b'0123456789abcdefABCDEF'


def GzipDecompressor():
    '''Return a streaming zlib decompressor that expects gzip-framed input.'''
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    return zlib.decompressobj(16 + zlib.MAX_WBITS)


class DummyDecompressor:
    '''Pass-through stand-in for a decompressor, used when the body is not compressed.'''

    def decompress(self, data):
        '''Return data unchanged.'''
        return data


class Event:
    '''Base class of everything yielded by iter_warc and consumed by ProcessMode.'''


class NewFile(Event):
    '''Emitted by main() before the events of each input file.'''


class BeginOfRecord(Event):
    '''Start of a WARC record; carries the parsed WARC header lines.'''

    def __init__(self, warcHeaders):
        self._warcHeaders = warcHeaders

    @property
    def warcHeaders(self):
        '''Tuple with one tuple of stripped byte strings per header line.

        Lines containing a colon become (name, value); the version line has a
        single element.
        '''
        return self._warcHeaders


class _DataChunk(Event):
    '''Base class for events carrying a piece of binary data.'''

    def __init__(self, data):
        self._data = data

    @property
    def data(self):
        '''The bytes carried by this event.'''
        return self._data

    def __repr__(self):
        # Only show the first 50 bytes so that large bodies stay readable.
        return '{}({!r}{})'.format(type(self).__name__, self._data[:50], '...' if len(self._data) > 50 else '')


class WARCBlockChunk(_DataChunk):
    '''The raw block of a WARC record.'''


class RawHTTPResponseBodyChunk(_DataChunk):
    '''
    Because many tools misunderstood the WARC specifications, the Payload-Digest was often implemented
    without stripping transfer encoding.
    This is like HTTPResponseBodyChunk but without transfer encoding stripping.
    '''


class HTTPResponseBodyChunk(_DataChunk):
    '''
    Representing a part of the HTTP response body with transfer encoding stripped.
    '''


class EndOfRecord(Event):
    '''End of a WARC record whose block was read completely.'''


def _parse_header_lines(raw):
    '''Split a CRLF-separated header section into tuples of stripped fields (split at the first colon).'''
    return tuple(tuple(map(bytes.strip, line.split(b':', 1))) for line in raw.split(b'\r\n'))


def _header_value(headers, name, default=None):
    '''Return the value of the first (name, value) header whose name equals `name`, else `default`.'''
    for header in headers:
        if len(header) == 2 and header[0] == name:
            return header[1]
    return default


def _iter_http_body_events(warcContent):
    '''Yield the body events for a WARC block holding an HTTP message.

    Yields one RawHTTPResponseBodyChunk with the body exactly as stored, then
    HTTPResponseBodyChunk events with the chunked/gzip transfer encoding
    removed. Prints a message to stderr and stops early on malformed input.
    '''
    if _HEADER_END not in warcContent:
        print('Warning: malformed HTTP response, skipping', file=sys.stderr)
        return
    httpHeaders, httpBody = warcContent.split(_HEADER_END, 1)

    # Parse headers and extract transfer encoding. The first line is the status
    # line, so it is skipped; a missing header simply yields None.
    httpHeaderLines = _parse_header_lines(httpHeaders)
    transferEncoding = next(
        (x[1] for x in httpHeaderLines[1:] if len(x) == 2 and x[0].lower() == b'transfer-encoding'),
        None,
    )
    # A set (not a one-shot map iterator) so both membership tests see every
    # coding; transfer-coding names are case-insensitive.
    transferEncodings = set()
    if transferEncoding is not None:
        transferEncodings = {coding.strip().lower() for coding in transferEncoding.split(b',')}
    chunked = b'chunked' in transferEncodings
    gzipped = b'gzip' in transferEncodings

    yield RawHTTPResponseBodyChunk(httpBody)

    # Decode body
    httpDecompressor = GzipDecompressor() if gzipped else DummyDecompressor()
    if not chunked:
        yield HTTPResponseBodyChunk(httpDecompressor.decompress(httpBody))
        return

    while True:
        try:
            chunkLineEnd = httpBody.index(b'\r\n')
        except ValueError:
            print('Error: could not find chunk line end, skipping', file=sys.stderr)
            break
        # Anything after ';' on the chunk line is a chunk extension and is ignored.
        chunkLength = httpBody[:chunkLineEnd].split(b';', 1)[0].strip()
        if not chunkLength or chunkLength.lstrip(_HEX_DIGITS) != b'':
            print('Error: malformed chunk length, skipping', file=sys.stderr)
            break
        chunkLength = int(chunkLength, base=16)
        if chunkLength == 0:
            # Zero-length chunk terminates the body.
            break
        chunkStart = chunkLineEnd + 2
        yield HTTPResponseBodyChunk(httpDecompressor.decompress(httpBody[chunkStart:chunkStart + chunkLength]))
        # Skip the chunk data and the CRLF that follows it.
        httpBody = httpBody[chunkStart + chunkLength + 2:]


def iter_warc(f):
    '''Parse the gzip-compressed WARC file `f` and yield Events.

    For every record this yields BeginOfRecord, one WARCBlockChunk with the
    whole block, the HTTP body events if the block is an HTTP response, and
    finally EndOfRecord. Iteration stops at the end of the file; if the file
    ends in the middle of a record, 'Error: truncated WARC' is printed to
    stderr and no EndOfRecord is yielded for that record.

    Each record is held in memory as a whole.
    '''
    with gzip.open(f, 'rb') as fp:
        buf = b''
        while True:
            # Read WARC header
            while _HEADER_END not in buf:
                try:
                    data = fp.read(4096)
                except EOFError:
                    # gzip stream ended prematurely; treat like end of data.
                    data = b''
                if not data:
                    break
                buf += data
            if _HEADER_END not in buf:
                # End of input. Leftover bytes mean the last record header is incomplete.
                if buf:
                    print('Error: truncated WARC', file=sys.stderr)
                break

            warcHeaderBuf, buf = buf.split(_HEADER_END, 1)
            assert warcHeaderBuf.startswith(b'WARC/1.0\r\n')
            assert b'\r\nContent-Length:' in warcHeaderBuf
            warcHeaders = _parse_header_lines(warcHeaderBuf)
            warcContentType = _header_value(warcHeaders, b'Content-Type')
            warcContentLength = int(_header_value(warcHeaders, b'Content-Length'))
            warcType = _header_value(warcHeaders, b'WARC-Type')
            yield BeginOfRecord(warcHeaders)

            # Read WARC block (and skip CRLFCRLF at the end of the record)
            recordLength = warcContentLength + 4
            if len(buf) < recordLength:
                try:
                    buf += fp.read(recordLength - len(buf))
                except EOFError:
                    pass
            if len(buf) < recordLength:
                print('Error: truncated WARC', file=sys.stderr)
                break
            warcContent = buf[:warcContentLength]
            buf = buf[recordLength:]
            yield WARCBlockChunk(warcContent)

            # Decode HTTP response if it is one
            if warcContentType in _HTTP_RESPONSE_CONTENT_TYPES and warcType in (b'request', b'response'):  # TODO: Support revisit
                yield from _iter_http_body_events(warcContent)

            yield EndOfRecord()


class ProcessMode:
    '''Interface of the processing modes selectable on the command line.'''

    def process_event(self, event):
        '''Handle one Event; must be overridden by subclasses.'''
        raise NotImplementedError


class VerifyMode(ProcessMode):
    '''Recompute SHA-1 block and payload digests and print mismatches to stdout.'''

    def __init__(self):
        self._blockDigester = None
        self._recordedBlockDigest = None
        self._payloadDigester = None
        self._brokenPayloadDigester = None
        self._recordedPayloadDigest = None
        self._recordID = None
        self._recordType = None
        self._printedBrokenPayloadWarning = False

    def process_event(self, event):
        '''Update the digest state for `event` and report mismatches at EndOfRecord.'''
        if isinstance(event, NewFile):
            # The broken-payload-digest warning is printed at most once per file.
            self._printedBrokenPayloadWarning = False
        elif isinstance(event, BeginOfRecord):
            self._begin_record(event.warcHeaders)
        elif isinstance(event, WARCBlockChunk):
            # Digesters are None for records that carry no corresponding digest header.
            if self._blockDigester is not None:
                self._blockDigester.update(event.data)
        elif isinstance(event, HTTPResponseBodyChunk):
            if self._payloadDigester is not None:
                self._payloadDigester.update(event.data)
        elif isinstance(event, RawHTTPResponseBodyChunk):
            if self._brokenPayloadDigester is not None:
                self._brokenPayloadDigester.update(event.data)
        elif isinstance(event, EndOfRecord):
            self._end_record()

    def _begin_record(self, warcHeaders):
        '''Reset the per-record state from the headers of a new record.'''
        self._recordedBlockDigest = _header_value(warcHeaders, b'WARC-Block-Digest')
        self._blockDigester = hashlib.sha1() if self._recordedBlockDigest is not None else None

        self._recordedPayloadDigest = _header_value(warcHeaders, b'WARC-Payload-Digest')
        if self._recordedPayloadDigest is not None:
            self._payloadDigester = hashlib.sha1()
            self._brokenPayloadDigester = hashlib.sha1()
        else:
            self._payloadDigester = None
            self._brokenPayloadDigester = None

        self._recordID = _header_value(warcHeaders, b'WARC-Record-ID')
        self._recordType = _header_value(warcHeaders, b'WARC-Type')

    def _end_record(self):
        '''Compare the calculated digests with the recorded ones and print any mismatch.'''
        if self._blockDigester is not None:
            calculatedBlock = base64.b32encode(self._blockDigester.digest())
            if self._recordedBlockDigest != b'sha1:' + calculatedBlock:
                print('Block digest mismatch for record {}: recorded {} v calculated {}'.format(self._recordID, self._recordedBlockDigest, calculatedBlock))

        if self._payloadDigester is None or self._recordType not in (b'request', b'response'):  # TODO: Support revisit
            return
        calculatedPayload = base64.b32encode(self._payloadDigester.digest())
        if self._recordedPayloadDigest == b'sha1:' + calculatedPayload:
            return
        calculatedBroken = base64.b32encode(self._brokenPayloadDigester.digest())
        if self._recordedPayloadDigest == b'sha1:' + calculatedBroken:
            # Digest was computed over the body with the transfer encoding still applied.
            if not self._printedBrokenPayloadWarning:
                print('Warning: WARC uses incorrect payload digests without stripping the transfer encoding')
                self._printedBrokenPayloadWarning = True
        else:
            print('Payload digest mismatch for record {}: recorded {} vs. calculated {} (calculated broken {})'.format(self._recordID, self._recordedPayloadDigest, calculatedPayload, calculatedBroken))


class DumpResponsesMode(ProcessMode):
    '''Write the decoded HTTP response bodies to stdout, each followed by CRLF.'''

    def __init__(self):
        # True once the current record has produced at least one body chunk.
        self._printEOR = False

    def process_event(self, event):
        '''Write body chunks to stdout and terminate each dumped record with CRLF.'''
        if isinstance(event, BeginOfRecord):
            self._printEOR = False
        elif isinstance(event, HTTPResponseBodyChunk):
            self._printEOR = True
            sys.stdout.buffer.write(event.data)
        elif isinstance(event, EndOfRecord):
            if self._printEOR:
                sys.stdout.buffer.write(b'\r\n')


def main():
    '''Command-line entry point: warc-tiny MODE FILE [FILE ...].'''
    processorMap = {'verify': VerifyMode, 'dump-responses': DumpResponsesMode}

    # Explicit check instead of assert so that validation survives `python -O`.
    if len(sys.argv) < 3 or sys.argv[1] not in processorMap:
        sys.exit('Usage: {} {{{}}} FILE [FILE ...]'.format(sys.argv[0], '|'.join(processorMap)))
    mode = sys.argv[1]
    files = sys.argv[2:]

    processor = processorMap[mode]()
    for f in files:
        print('Info: processing {}'.format(f), file=sys.stderr)
        processor.process_event(NewFile())
        for event in iter_warc(f):
            processor.process_event(event)


if __name__ == '__main__':
    main()