From f5c3eb42b3e93c54d6f5312bd3c747617ffb2424 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Fri, 28 May 2021 04:49:59 +0000
Subject: [PATCH] WIP attempt to remove warcio

---
 qwarc/utils.py | 115 +++++++++++++++++++++++++
 qwarc/warc.py  | 223 ++++++++++++++++++++++++++++++-------------------
 2 files changed, 252 insertions(+), 86 deletions(-)

diff --git a/qwarc/utils.py b/qwarc/utils.py
index d217fd6..2fa473e 100644
--- a/qwarc/utils.py
+++ b/qwarc/utils.py
@@ -2,12 +2,14 @@ from qwarc.const import *
 import aiohttp
 import asyncio
 import functools
+import io
 import logging
 import os
 import pkg_resources
 import platform
 import time
 import typing
+import zlib
 
 
 PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,116 @@ class ReadonlyFileView:
 		if key == 'writable':
 			return False
 		return getattr(self._fp, key)
+
+
+def iter_file(f, length = None, blockSize = 1048576):
+	'''Read `length` bytes from `f` in chunks of `blockSize` bytes. If `length` is `None`, read until EOF.'''
+	read = 0
+	while True:
+		buf = f.read(blockSize if length is None else min(blockSize, length - read)) # Never request more than what is still needed
+		if not buf: # EOF
+			if length is not None and read < length:
+				raise RuntimeError('Reached EOF before reading enough data')
+			break
+		if length is not None and read + len(buf) > length: # Safety net in case `f.read` returns more than requested
+			initialBufLen = len(buf)
+			buf = buf[0 : length - read]
+			f.seek(len(buf) - initialBufLen, io.SEEK_CUR)
+		read += len(buf)
+		yield buf
+		if length is not None and read >= length:
+			if read > length: # This should never happen due to the truncation above.
+				raise RuntimeError('Overread')
+			break
+
+
+def read_http_headers(f, copy = None):
+	headers = {}
+
+	# Status line or request line
+	line = f.readline()
+	if copy:
+		copy.write(line)
+
+	line = f.readline()
+	if copy:
+		copy.write(line)
+	while line and line not in (b'\r\n', b'\r', b'\n'):
+		# Split into header name and value
+		name, value = line.split(b':', 1)
+		name = name.strip(b' \t')
+		#TODO name validation
+
+		# Read next line
+		line = f.readline()
+		if copy:
+			copy.write(line)
+
+		# Handle continuation lines
+		continuation = line[0:1] in (b' ', b'\t')
+		if continuation:
+			value = [value] # Keep the first line's value; continuation lines are appended to it
+			while continuation:
+				value.append(line)
+				line = f.readline()
+				if copy:
+					copy.write(line)
+				continuation = line[0:1] in (b' ', b'\t')
+			value = b''.join(value)
+
+		# Decode and store
+		try:
+			name = name.decode('utf-8')
+		except UnicodeDecodeError:
+			name = name.decode('iso-8859-1')
+		try:
+			value = value.decode('utf-8')
+		except UnicodeDecodeError:
+			value = value.decode('iso-8859-1')
+		headers[name.lower()] = value
+
+		# `line` is already the next line, if any
+	return headers
+
+
+def read_http_body(f, length, headers):
+	if 'chunked' in map(str.strip, headers.get('transfer-encoding', '').split(',')):
+		while True:
+			chunkLine = f.readline()
+			if b';' in chunkLine:
+				chunkLength = chunkLine.split(b';', 1)[0].strip()
+			else:
+				chunkLength = chunkLine.strip()
+			chunkLength = int(chunkLength, base = 16)
+			if chunkLength == 0:
+				break
+			yield from iter_file(f, length = chunkLength)
+			if f.read(2) != b'\r\n': raise RuntimeError('Missing chunk terminator') # Not an `assert`: the read must still happen under `python -O`
+
+		# Consume trailer
+		line = f.readline()
+		while line and line not in (b'\r\n', b'\r', b'\n'):
+			line = f.readline()
+	else:
+		yield from iter_file(f, length = length)
+
+
+
+class GzipWrapper:
+	def __init__(self, f):
+		self._file = f
+		self._compressor = None
+
+	def __enter__(self):
+		self._compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+		return self
+
+	def write(self, data):
+		buf = self._compressor.compress(data)
+		self._file.write(buf)
+
+	def __exit__(self, excType, excVal, excTb):
+		buf = self._compressor.flush()
+		self._file.write(buf)
+		self._file.flush()
+		self._compressor = None
diff --git a/qwarc/warc.py b/qwarc/warc.py
index 8e71ee8..3a9f0d2 100644
--- a/qwarc/warc.py
+++ b/qwarc/warc.py
@@ -1,16 +1,34 @@
+import base64
 import fcntl
-import gzip
+import hashlib
 import io
 import itertools
 import json
 import logging
-import os
 import qwarc.utils
-import tempfile
 import time
+import uuid
 import warcio
 
 
+class _WARCRecord:
+	def __init__(self, headers, body, length):
+		self.headers = headers
+		self.body = body
+		self.length = length
+
+
+class _Digester:
+	def __init__(self):
+		self._digester = hashlib.sha1()
+
+	def update(self, data):
+		self._digester.update(data)
+
+	def __str__(self):
+		return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'
+
+
 class WARC:
 	def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
 		'''
@@ -31,7 +49,6 @@ class WARC:
 
 		self._closed = True
 		self._file = None
-		self._warcWriter = None
 
 		self._dedupe = dedupe
 		self._dedupeMap = {}
@@ -62,10 +79,50 @@ class WARC:
 			else:
 				break
 		logging.info(f'Opened {filename}')
-		self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 		self._closed = False
 		self._counter += 1
 
+	def _create_warc_record(self, recordType, headers, body, length = None):
+		startPos = body.tell()
+
+		if 'WARC-Record-ID' not in headers:
+			headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'
+
+		headers['WARC-Type'] = recordType
+
+		digester = _Digester()
+		for buf in qwarc.utils.iter_file(body, length = length):
+			digester.update(buf)
+		body.seek(startPos)
+		headers['WARC-Block-Digest'] = str(digester)
+
+		if 'WARC-Payload-Digest' not in headers and headers['Content-Type'].startswith('application/http;'):
+			digester = _Digester()
+			httpHeaders = qwarc.utils.read_http_headers(body)
+			for buf in qwarc.utils.read_http_body(body, length = (length - (body.tell() - startPos)) if length is not None else None, headers = httpHeaders): # Remaining length is relative to startPos
+				digester.update(buf)
+			body.seek(startPos)
+			headers['WARC-Payload-Digest'] = str(digester)
+
+		if length is None: # An explicit length of 0 is a valid (empty) block
+			body.seek(0, io.SEEK_END)
+			length = body.tell() - startPos
+			body.seek(startPos)
+		headers['Content-Length'] = str(length)
+
+		return _WARCRecord(headers, body, length)
+
+	def _write_warc_record(self, record):
+		with qwarc.utils.GzipWrapper(self._file) as fp:
+			fp.write(b'WARC/1.1\r\n')
+			fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
+			fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers
+
+			for buf in qwarc.utils.iter_file(record.body, length = record.length):
+				fp.write(buf)
+
+			fp.write(b'\r\n\r\n') # Record separator
+
 	def _write_warcinfo_record(self):
 		data = {
 			'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
@@ -77,18 +134,15 @@ class WARC:
 			'extra': self._specDependencies.extra,
 		}
 		payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
-		# Workaround for https://github.com/webrecorder/warcio/issues/87
-		digester = warcio.utils.Digester('sha1')
-		digester.update(payload.getvalue())
-		record = self._warcWriter.create_warc_record(
-			None,
-			'warcinfo',
-			payload = payload,
-			warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
-			length = len(payload.getvalue()),
+		record = self._create_warc_record(
+			'warcinfo',
+			{
+				'Content-Type': 'application/json; charset=utf-8',
+			},
+			payload
 		)
-		self._warcWriter.write_record(record)
-		return record.rec_headers.get_header('WARC-Record-ID')
+		self._write_warc_record(record)
+		return record.headers['WARC-Record-ID']
 
 	def write_client_response(self, response):
 		'''
@@ -100,63 +154,62 @@ class WARC:
 		for r in response.iter_all():
 			usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
 			requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
-			r.rawRequestData.seek(0, io.SEEK_END)
-			length = r.rawRequestData.tell()
 			r.rawRequestData.seek(0)
-			requestRecord = self._warcWriter.create_warc_record(
-				str(r.url),
-				'request',
-				payload = r.rawRequestData,
-				length = length,
-				warc_headers_dict = {
-					'WARC-Date': requestDate,
-					'WARC-IP-Address': r.remoteAddress[0],
-					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-				}
+			requestRecord = self._create_warc_record(
+				'request',
+				{
+					'WARC-Date': requestDate,
+					'WARC-Target-URI': str(r.url),
+					'WARC-IP-Address': r.remoteAddress[0],
+					'Content-Type': 'application/http; msgtype=request',
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				r.rawRequestData
 			)
-			requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
-			r.rawResponseData.seek(0, io.SEEK_END)
-			length = r.rawResponseData.tell()
+			requestRecordID = requestRecord.headers['WARC-Record-ID']
 			r.rawResponseData.seek(0)
-			responseRecord = self._warcWriter.create_warc_record(
-				str(r.url),
-				'response',
-				payload = r.rawResponseData,
-				length = length,
-				warc_headers_dict = {
-					'WARC-Date': requestDate,
-					'WARC-IP-Address': r.remoteAddress[0],
-					'WARC-Concurrent-To': requestRecordID,
-					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-				}
+			responseRecord = self._create_warc_record(
+				'response',
+				{
+					'WARC-Date': requestDate,
+					'WARC-Target-URI': str(r.url),
+					'WARC-IP-Address': r.remoteAddress[0],
+					'Content-Type': 'application/http; msgtype=response',
+					'WARC-Concurrent-To': requestRecordID,
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				r.rawResponseData
 			)
-			payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
+			payloadDigest = responseRecord.headers['WARC-Payload-Digest']
 			assert payloadDigest is not None
-			if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
+			if self._dedupe and responseRecord.length > 1024: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
 				if payloadDigest in self._dedupeMap:
 					refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
-					responseHttpHeaders = responseRecord.http_headers
-					responseRecord = self._warcWriter.create_revisit_record(
-						str(r.url),
-						digest = payloadDigest,
-						refers_to_uri = refersToUri,
-						refers_to_date = refersToDate,
-						http_headers = responseHttpHeaders,
-						warc_headers_dict = {
-							'WARC-Date': requestDate,
-							'WARC-IP-Address': r.remoteAddress[0],
-							'WARC-Concurrent-To': requestRecordID,
-							'WARC-Refers-To': refersToRecordId,
-							'WARC-Truncated': 'length',
-							'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-						}
+					httpHeaderData = io.BytesIO()
+					qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
+					httpHeaderData.seek(0)
+					responseRecord = self._create_warc_record(
+						'revisit',
+						{
+							'WARC-Date': requestDate,
+							'WARC-Target-URI': str(r.url),
+							'WARC-IP-Address': r.remoteAddress[0],
+							'WARC-Concurrent-To': requestRecordID,
+							'Content-Type': 'application/http; msgtype=response',
+							'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+							'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
+							'WARC-Refers-To-Target-URI': refersToUri,
+							'WARC-Refers-To-Date': refersToDate,
+							'WARC-Refers-To': refersToRecordId,
+							'WARC-Payload-Digest': payloadDigest,
+							'WARC-Truncated': 'length',
+						},
+						httpHeaderData
 					)
-					# Workaround for https://github.com/webrecorder/warcio/issues/94
-					responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
 				else:
-					self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
-			self._warcWriter.write_record(requestRecord)
-			self._warcWriter.write_record(responseRecord)
+					self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
+			self._write_warc_record(requestRecord)
+			self._write_warc_record(responseRecord)
 
 			if self._maxFileSize and self._file.tell() > self._maxFileSize:
 				self._close_file()
@@ -167,17 +220,17 @@ class WARC:
 
 		for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
 			with open(fn, 'rb') as f:
-				f.seek(0, io.SEEK_END)
-				length = f.tell()
-				f.seek(0)
-				record = self._warcWriter.create_warc_record(
-					f'file://{fn}',
-					'resource',
-					payload = f,
-					length = length,
-					warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
+				record = self._create_warc_record(
+					'resource',
+					{
+						'WARC-Target-URI': f'file://{fn}',
+						'X-QWARC-Type': type_,
+						'Content-Type': contentType,
+						'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+					},
+					f
 				)
-				self._warcWriter.write_record(record)
+				self._write_warc_record(record)
 
 	def _write_initial_meta_records(self):
 		self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -190,24 +243,23 @@ class WARC:
 		for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
 			handler.flush()
 		with open(self._logFilename, 'rb') as fp:
-			fp.seek(0, io.SEEK_END)
-			length = fp.tell()
-			fp.seek(0)
-			record = self._warcWriter.create_warc_record(
-				f'file://{self._logFilename}',
-				'resource',
-				payload = fp,
-				length = length,
-				warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
+			record = self._create_warc_record(
+				'resource',
+				{
+					'WARC-Target-URI': f'file://{self._logFilename}',
+					'X-QWARC-Type': 'log',
+					'Content-Type': 'text/plain; charset=utf-8',
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				fp
 			)
-			self._warcWriter.write_record(record)
+			self._write_warc_record(record)
 
 	def _close_file(self):
 		'''Close the currently opened WARC'''
 
 		if not self._closed:
 			self._file.close()
-			self._warcWriter = None
 			self._file = None
 			self._closed = True
 
@@ -218,7 +270,6 @@ class WARC:
 		try:
 			fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
 			logging.info(f'Opened {filename}')
-			self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 			self._closed = False
 
 			callback()