#!/usr/bin/env python3 # Tiny tool for WARC stuff. # Operating modes: # warc-tiny colour FILES -- coloured output of the WARCs for easier reading # warc-tiny dump-responses [-m|--meta] FILES -- dump the HTTP response bodies to stdout # With --meta, prefix every line with the filename, record offset, record ID, and target URI; e.g. 'file.warc.gz:123::: foobar' # The record offset may be -1 if it is not known. # The filename is wrapped in angled brackets if it contains a colon; the target URI is always wrapped in angled brackets (since it virtually always contains a colon). # warc-tiny verify FILES -- verify the integrity of a WARC by comparing the digests import base64 import gzip import hashlib import sys import zlib def GzipDecompressor(): return zlib.decompressobj(16 + zlib.MAX_WBITS) class DummyDecompressor: def decompress(self, data): return data class Event: pass class NewFile(Event): def __init__(self, filename): self._filename = filename @property def filename(self): return self._filename class BeginOfRecord(Event): def __init__(self, warcHeaders, rawData): self._warcHeaders = warcHeaders self._rawData = rawData @property def warcHeaders(self): return self._warcHeaders @property def rawData(self): return self._rawData class _DataChunk(Event): def __init__(self, data): self._data = data @property def data(self): return self._data def __repr__(self): return '{}({!r}{})'.format(type(self).__name__, self._data[:50], '...' if len(self._data) > 50 else '') class WARCBlockChunk(_DataChunk): def __init__(self, data, isHttpHeader = None): super().__init__(data) self._isHttpHeader = isHttpHeader @property def isHttpHeader(self): # True: the chunk represents (part of) the HTTP header; False: the chunk represents (part of) the HTTP body; None: the chunk is not part of an HTTP record return self._isHttpHeader class RawHTTPBodyChunk(_DataChunk): ''' Because many tools misunderstood the WARC specifications, the Payload-Digest was often implemented without stripping transfer encoding. This is like HTTPBodyChunk but without transfer encoding stripping. ''' class HTTPBodyChunk(_DataChunk): ''' Representing a part of the HTTP body with transfer encoding stripped. ''' class EndOfRecord(Event): pass def iter_warc(f): # Yields Events # BeginOfRecord's rawData does not include the CRLF CRLF at the end of the headers, and WARCBlockChunk does not contain the CRLF CRLF after the block either. with gzip.open(f, 'rb') as fp: buf = b'' while True: # Read WARC header while b'\r\n\r\n' not in buf: try: buf = buf + fp.read(4096) except EOFError: break if not buf: break if not buf: break warcHeaderBuf, buf = buf.split(b'\r\n\r\n', 1) assert warcHeaderBuf.startswith(b'WARC/1.0\r\n') or warcHeaderBuf.startswith(b'WARC/1.1\r\n') assert b'\r\nContent-Length:' in warcHeaderBuf warcHeaders = tuple(tuple(map(bytes.strip, x.split(b':', 1))) for x in warcHeaderBuf.split(b'\r\n')) warcContentType = next(x[1] for x in warcHeaders if x[0] == b'Content-Type') warcContentLength = int(next(x[1] for x in warcHeaders if x[0] == b'Content-Length')) warcType = next(x[1] for x in warcHeaders if x[0] == b'WARC-Type') yield BeginOfRecord(warcHeaders, warcHeaderBuf) recordID = next(x[1] for x in warcHeaders if x[0] == b'WARC-Record-ID') # Read WARC block (and skip CRLFCRLF at the end of the record) if len(buf) < warcContentLength + 4: try: buf = buf + fp.read(warcContentLength + 4 - len(buf)) except EOFError: pass if len(buf) < warcContentLength + 4: print('Error: truncated WARC', file = sys.stderr) break warcContent = buf[:warcContentLength] buf = buf[warcContentLength + 4:] # Decode HTTP body if appropriate if warcContentType in (b'application/http;msgtype=request', b'application/http; msgtype=request') and warcType == b'request': httpType = 'request' elif warcContentType in (b'application/http;msgtype=response', b'application/http; msgtype=response') and warcType == b'response': httpType = 'response' else: httpType = None if httpType is not None: if b'\r\n\r\n' in warcContent: httpHeaders, httpBody = warcContent.split(b'\r\n\r\n', 1) # Parse headers and extract transfer encoding httpHeaderLines = [tuple(map(bytes.strip, x.split(b':', 1))) for x in httpHeaders.split(b'\r\n')] chunked = False gzipped = False if b'\r\ntransfer-encoding' in httpHeaders.lower(): transferEncoding = next(x[1] for x in httpHeaderLines if x[0].lower() == b'transfer-encoding') transferEncodings = set(map(bytes.strip, transferEncoding.split(b','))) chunked = b'chunked' in transferEncodings gzipped = b'gzip' in transferEncodings yield WARCBlockChunk(httpHeaders + b'\r\n\r\n', isHttpHeader = True) yield WARCBlockChunk(httpBody, isHttpHeader = False) yield RawHTTPBodyChunk(httpBody) # Decode body if gzipped: httpDecompressor = GzipDecompressor() else: httpDecompressor = DummyDecompressor() if chunked: while True: try: chunkLineEnd = httpBody.index(b'\r\n') except ValueError: print('Error: could not find chunk line end in record {}, skipping'.format(recordID), file = sys.stderr) break chunkLine = httpBody[:chunkLineEnd] if b';' in chunkLine: chunkLength = chunkLine[:chunkLine.index(b';')].strip() else: chunkLength = chunkLine.strip() if chunkLength.lstrip(b'0123456789abcdefABCDEF') != b'': print('Error: malformed chunk length {!r} in record {}, skipping'.format(chunkLength, recordID), file = sys.stderr) break chunkLength = int(chunkLength, base = 16) if chunkLength == 0: break chunk = httpDecompressor.decompress(httpBody[chunkLineEnd + 2 : chunkLineEnd + 2 + chunkLength]) yield HTTPBodyChunk(chunk) httpBody = httpBody[chunkLineEnd + 2 + chunkLength + 2:] else: yield HTTPBodyChunk(httpDecompressor.decompress(httpBody)) else: print('Warning: malformed HTTP request or response in record {}, skipping'.format(recordID), file = sys.stderr) yield WARCBlockChunk(warcContent) else: yield WARCBlockChunk(warcContent) yield EndOfRecord() class ProcessMode: @classmethod def split_args(cls, args): '''Split args into arguments to be passed into __init__ and filenames''' return (), args def process_event(self, event): raise NotImplementedError class Digest: def __init__(self, digest): self._digest = digest def format(self, digest = None): raise NotImplementedError def equals(self, digest): return self._digest == digest class Base32Digest(Digest): def format(self, digest = None): return base64.b32encode(digest if digest else self._digest) class HexDigest(Digest): def format(self, digest = None): return (digest if digest else self._digest).hex() class VerifyMode(ProcessMode): def __init__(self): self._blockDigester = None self._recordedBlockDigest = None self._payloadDigester = None self._brokenPayloadDigester = None self._recordedPayloadDigest = None self._printedBrokenPayloadWarning = False def parse_digest(self, digest): if not digest.startswith(b'sha1:'): print('Warning: don\'t understand hash format: {!r}'.format(digest), file = sys.stderr) return None if len(digest) == 37 and digest.rstrip(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567') == b'sha1:': # 5 for 'sha1:' + 32 for base-32 hash return Base32Digest(base64.b32decode(digest[5:])) if len(digest) == 45 and digest.rstrip(b'0123456789abcdef') == b'sha1:': return HexDigest(bytes.fromhex(digest[5:].decode('ascii'))) return None def process_event(self, event): if type(event) is NewFile: self._printedBrokenPayloadWarning = False elif type(event) is BeginOfRecord: if any(x[0] == b'WARC-Block-Digest' for x in event.warcHeaders): self._blockDigester = hashlib.sha1() self._recordedBlockDigest = self.parse_digest(next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Block-Digest')) else: self._blockDigester = None self._recordedBlockDigest = None if any(x[0] == b'WARC-Payload-Digest' for x in event.warcHeaders): self._payloadDigester = hashlib.sha1() self._brokenPayloadDigester = hashlib.sha1() self._recordedPayloadDigest = self.parse_digest(next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Payload-Digest')) else: self._payloadDigester = None self._brokenPayloadDigester = None self._recordedPayloadDigest = None self._recordID = next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Record-ID') self._recordType = next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Type') elif type(event) is WARCBlockChunk: if self._blockDigester: self._blockDigester.update(event.data) elif type(event) is HTTPBodyChunk: if self._payloadDigester: self._payloadDigester.update(event.data) elif type(event) is RawHTTPBodyChunk: if self._brokenPayloadDigester: self._brokenPayloadDigester.update(event.data) elif type(event) is EndOfRecord: if self._blockDigester and self._recordedBlockDigest: if not self._recordedBlockDigest.equals(self._blockDigester.digest()): print('Block digest mismatch for record {}: recorded {} v calculated {}'.format(self._recordID, self._recordedBlockDigest.format(), self._recordedBlockDigest.format(self._blockDigester.digest())), file = sys.stderr) if self._payloadDigester and self._recordType in (b'request', b'response'): #TODO: Support revisit if not self._recordedPayloadDigest.equals(self._payloadDigester.digest()): if self._recordedPayloadDigest.equals(self._brokenPayloadDigester.digest()): if not self._printedBrokenPayloadWarning: print('Warning: WARC uses incorrect payload digests without stripping the transfer encoding', file = sys.stderr) self._printedBrokenPayloadWarning = True else: print('Payload digest mismatch for record {}: recorded {} vs. calculated {} (calculated broken {})'.format(self._recordID, self._recordedPayloadDigest.format(), self._recordedPayloadDigest.format(self._payloadDigester.digest()), self._recordedPayloadDigest.format(self._brokenPayloadDigester.digest())), file = sys.stderr) class DumpResponsesMode(ProcessMode): @classmethod def split_args(cls, args): if args[0] == '-m' or args[0] == '--meta': return (True,), args[1:] return (False,), args def __init__(self, withMeta): self._printEOR = False self._isResponse = False self._withMeta = withMeta if withMeta: self._recordID = None self._targetURI = None self._buffer = b'' def _write(self, data): if not self._withMeta: sys.stdout.buffer.write(data) return buf = self._buffer + data lines = buf.split(b'\n') self._buffer = lines.pop() # Since there's an explicit `_write(b'\r\n')` at the end of the record, this implicitly resets the buffer as well for line in lines: sys.stdout.buffer.write(':'.join((self._filename, '-1', self._recordID, '<' + self._targetURI + '>', '')).encode('utf-8')) sys.stdout.buffer.write(line) sys.stdout.buffer.write(b'\n') def process_event(self, event): if type(event) is NewFile: self._filename = event.filename if ':' in self._filename: self._filename = '<' + self._filename + '>' elif type(event) is BeginOfRecord: warcContentType = next(x[1] for x in event.warcHeaders if x[0] == b'Content-Type') warcType = next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Type') self._isResponse = warcContentType in (b'application/http;msgtype=response', b'application/http; msgtype=response') and warcType == b'response' self._printEOR = False if self._withMeta: # Both of these are URIs, and per RFC 3986, those can only contain ASCII characters. self._recordID = next(x[1] for x in event.warcHeaders if x[0] == b'WARC-Record-ID').decode('ascii') self._targetURI = next((x[1] for x in event.warcHeaders if x[0] == b'WARC-Target-URI'), b'').decode('ascii') self._buffer = b'' elif type(event) is HTTPBodyChunk: if self._isResponse: self._printEOR = True self._write(event.data) elif type(event) is EndOfRecord: if self._printEOR: self._write(b'\r\n') class COLOURS: RESET = b'\x1b[0m' GREEN = b'\x1b[0;32m' LIGHTGREEN = b'\x1b[1;32m' PURPLE = b'\x1b[0;35m' LIGHTPURPLE = b'\x1b[1;35m' RED = b'\x1b[0;31m' INVERTED = b'\x1b[7m' class ColourMode(ProcessMode): def __init__(self): self._hadHttpStatusLine = False def _replace_esc(self, data): return data.replace(b'\x1b', COLOURS.INVERTED + b'ESC' + COLOURS.RESET) def _print_line(self, line, colour, withLF = True, colourOnlyBeforeColon = False): if colourOnlyBeforeColon: if b':' in line: offset = line.index(b':') else: offset = 0 else: offset = len(line) if offset > 0: sys.stdout.buffer.write(colour) sys.stdout.buffer.write(self._replace_esc(line[:offset])) sys.stdout.buffer.write(COLOURS.RESET) sys.stdout.buffer.write(line[offset:]) if withLF: sys.stdout.buffer.write(b'\n') def _print_data(self, data, colour, colourOnlyBeforeColon): later = False for line in data.split(b'\r\n'): if later: sys.stdout.buffer.write(b'\n') self._print_line(line, colour, withLF = False, colourOnlyBeforeColon = colourOnlyBeforeColon) later = True def process_event(self, event): if type(event) is BeginOfRecord: firstNewline = event.rawData.index(b'\r\n') self._print_line(event.rawData[:firstNewline], COLOURS.LIGHTGREEN) self._print_data(event.rawData[firstNewline + 2:], COLOURS.GREEN, True) sys.stdout.buffer.write(b'\n\n') # separator between header and block self._hadHttpStatusLine = False elif type(event) is WARCBlockChunk: if event.isHttpHeader is True: if not self._hadHttpStatusLine: firstNewline = event.data.index(b'\r\n') self._print_line(event.data[:firstNewline], COLOURS.LIGHTPURPLE) offset = firstNewline + 2 self._hadHttpStatusLine = True else: offset = 0 self._print_data(event.data[offset:], COLOURS.PURPLE, True) elif event.isHttpHeader is False: self._print_data(event.data, COLOURS.RED, False) elif event.isHttpHeader is None: sys.stdout.buffer.write(self._replace_esc(event.data)) elif type(event) is EndOfRecord: sys.stdout.buffer.write(b'\n\n') def main(): processorMap = {'verify': VerifyMode, 'dump-responses': DumpResponsesMode, 'colour': ColourMode} assert len(sys.argv) - 1 >= 2 mode = sys.argv[1] assert mode in processorMap processorArgs, files = processorMap[mode].split_args(sys.argv[2:]) assert files processor = processorMap[mode](*processorArgs) try: for f in files: print('Info: processing {}'.format(f), file = sys.stderr) processor.process_event(NewFile(f)) for event in iter_warc(f): processor.process_event(event) except BrokenPipeError: return if __name__ == '__main__': main()