import base64
import fcntl
import hashlib
import io
import itertools
import json
import logging
import time
import uuid

import qwarc.utils
import warcio


class _WARCRecord:
	'''A WARC record ready for serialisation: final headers, a seekable body stream, and the body length in bytes.'''

	def __init__(self, headers, body, length):
		self.headers = headers  # dict: WARC header name -> value (str)
		self.body = body  # seekable binary file-like object, positioned at the start of the record payload
		self.length = length  # int: number of payload bytes to serialise


class _Digester:
	'''Incremental SHA-1 digester rendering in the WARC digest format: "sha1:" followed by the base32 digest.'''

	def __init__(self):
		self._digester = hashlib.sha1()

	def update(self, data):
		'''Feed a chunk of bytes into the digest.'''
		self._digester.update(data)

	def __str__(self):
		return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'


class WARC:
	def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
		'''
		Initialise the WARC writer

		prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
		maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
		dedupe: bool, whether to enable record deduplication
		command: list, the command line call for qwarc
		specFile: str, path to the spec file
		specDependencies: qwarc.utils.SpecDependencies
		logFilename: str, name of the log file written by this process
		'''
		self._prefix = prefix
		self._counter = 0
		self._maxFileSize = maxFileSize
		self._closed = True
		self._file = None
		self._dedupe = dedupe
		self._dedupeMap = {}  # payload digest -> (record ID, target URI, date) of the first response seen with that payload
		self._command = command
		self._specFile = specFile
		self._specDependencies = specDependencies
		self._logFilename = logFilename
		self._metaWarcinfoRecordID = None
		# Write the meta WARC (warcinfo + spec resource records) up front so that
		# every later data record can reference its warcinfo record ID.
		self._write_meta_warc(self._write_initial_meta_records)

	def _ensure_opened(self):
		'''Open the next file that doesn't exist yet if there is currently no file opened'''
		if not self._closed:
			return
		while True:
			filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
			try:
				# Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it
				self._file = open(filename, 'xb')
				fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
			except FileExistsError:
				logging.info(f'{filename} already exists, skipping')
				self._counter += 1
			else:
				break
		logging.info(f'Opened {filename}')
		self._closed = False
		self._counter += 1

	def _create_warc_record(self, recordType, headers, body, length = None):
		'''
		Build a _WARCRecord of the given WARC-Type.

		recordType: str, value for the WARC-Type header (e.g. 'response', 'resource')
		headers: dict of WARC headers; WARC-Record-ID, WARC-Type, WARC-Block-Digest,
		  (possibly) WARC-Payload-Digest, and Content-Length are filled in here.
		body: seekable binary file-like object; its position on entry marks the start of the payload
		length: int or None; payload length in bytes, measured via seek-to-end if not given

		Returns the _WARCRecord with body repositioned at the payload start.
		'''
		startPos = body.tell()
		if 'WARC-Record-ID' not in headers:
			# Globally unique record ID in URN form, as recommended by the WARC 1.1 spec
			headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'
		headers['WARC-Type'] = recordType
		# Block digest covers the entire record body
		digester = _Digester()
		for buf in qwarc.utils.iter_file(body, length = length):
			digester.update(buf)
		body.seek(startPos)
		headers['WARC-Block-Digest'] = str(digester)
		if 'WARC-Payload-Digest' not in headers and headers['Content-Type'].startswith('application/http;'):
			# For HTTP records, the payload digest covers only the HTTP message body,
			# so skip past the HTTP headers before digesting.
			digester = _Digester()
			httpHeaders = qwarc.utils.read_http_headers(body)
			for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
				digester.update(buf)
			body.seek(startPos)
			headers['WARC-Payload-Digest'] = str(digester)
		if not length:
			# Length not supplied; measure it by seeking to the end of the stream
			body.seek(0, io.SEEK_END)
			length = body.tell() - startPos
			body.seek(startPos)
		headers['Content-Length'] = str(length)
		return _WARCRecord(headers, body, length)

	def _write_warc_record(self, record):
		'''Serialise a _WARCRecord to the currently open file as one gzip member (one record per member, per the WARC spec recommendation).'''
		with qwarc.utils.GzipWrapper(self._file) as fp:
			fp.write(b'WARC/1.1\r\n')
			fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
			fp.write(b'\r\n\r\n')  # Trailing CRLF for last header line plus end of headers
			for buf in qwarc.utils.iter_file(record.body, length = record.length):
				fp.write(buf)
			fp.write(b'\r\n\r\n')  # Record separator

	def _write_warcinfo_record(self):
		'''Write a warcinfo record describing this qwarc run (software, command line, spec files, extras) and return its WARC-Record-ID.'''
		data = {
			'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
			'command': self._command,
			'files': {
				'spec': self._specFile,
				'spec-dependencies': self._specDependencies.files
			},
			'extra': self._specDependencies.extra,
		}
		payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
		record = self._create_warc_record(
			'warcinfo',
			{
				'Content-Type': 'application/json; charset=utf-8',
			},
			payload
		)
		self._write_warc_record(record)
		return record.headers['WARC-Record-ID']

	def write_client_response(self, response):
		'''
		Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
		A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.
		'''
		self._ensure_opened()
		for r in response.iter_all():
			# Build an ISO 8601 timestamp with microsecond precision from the raw epoch float
			usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
			requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
			r.rawRequestData.seek(0)
			requestRecord = self._create_warc_record(
				'request',
				{
					'WARC-Date': requestDate,
					'WARC-Target-URI': str(r.url),
					'WARC-IP-Address': r.remoteAddress[0],
					'Content-Type': 'application/http; msgtype=request',
					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
				},
				r.rawRequestData
			)
			requestRecordID = requestRecord.headers['WARC-Record-ID']
			r.rawResponseData.seek(0)
			responseRecord = self._create_warc_record(
				'response',
				{
					'WARC-Date': requestDate,
					'WARC-Target-URI': str(r.url),
					'WARC-IP-Address': r.remoteAddress[0],
					'Content-Type': 'application/http; msgtype=response',
					'WARC-Concurrent-To': requestRecordID,
					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
				},
				r.rawResponseData
			)
			payloadDigest = responseRecord.headers['WARC-Payload-Digest']
			assert payloadDigest is not None
			if self._dedupe and responseRecord.length > 1024: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
				if payloadDigest in self._dedupeMap:
					# Identical payload seen before: replace the response record with a
					# revisit record that carries only the HTTP headers and points at the original.
					refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
					httpHeaderData = io.BytesIO()
					qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
					httpHeaderData.seek(0)
					responseRecord = self._create_warc_record(
						'revisit',
						{
							'WARC-Date': requestDate,
							'WARC-Target-URI': str(r.url),
							'WARC-IP-Address': r.remoteAddress[0],
							'WARC-Concurrent-To': requestRecordID,
							'Content-Type': 'application/http; msgtype=response',
							'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
							'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
							'WARC-Refers-To-Target-URI': refersToUri,
							'WARC-Refers-To-Date': refersToDate,
							'WARC-Refers-To': refersToRecordId,
							'WARC-Payload-Digest': payloadDigest,
							'WARC-Truncated': 'length',
						},
						httpHeaderData
					)
				else:
					self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
			self._write_warc_record(requestRecord)
			self._write_warc_record(responseRecord)
		if self._maxFileSize and self._file.tell() > self._maxFileSize:
			self._close_file()

	def _write_resource_records(self):
		'''Write spec file and dependencies'''
		assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
		for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
			with open(fn, 'rb') as f:
				record = self._create_warc_record(
					'resource',
					{
						'WARC-Target-URI': f'file://{fn}',
						'X-QWARC-Type': type_,
						'Content-Type': contentType,
						'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
					},
					f
				)
				self._write_warc_record(record)

	def _write_initial_meta_records(self):
		'''Write the warcinfo record followed by the spec resource records; used as the callback for the first meta WARC.'''
		self._metaWarcinfoRecordID = self._write_warcinfo_record()
		self._write_resource_records()

	def _write_log_record(self):
		'''Write the process's log file as a resource record into the currently open (meta) WARC.'''
		assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
		rootLogger = logging.getLogger()
		for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
			# Ensure everything logged so far is on disk before the file is archived
			handler.flush()
		with open(self._logFilename, 'rb') as fp:
			record = self._create_warc_record(
				'resource',
				{
					'WARC-Target-URI': f'file://{self._logFilename}',
					'X-QWARC-Type': 'log',
					'Content-Type': 'text/plain; charset=utf-8',
					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
				},
				fp
			)
			self._write_warc_record(record)

	def _close_file(self):
		'''Close the currently opened WARC'''
		if not self._closed:
			self._file.close()
			self._file = None
			self._closed = True

	def _write_meta_warc(self, callback):
		'''Open (or append to) the meta WARC under an exclusive lock, run callback() to emit records into it, then close it again.'''
		filename = f'{self._prefix}-meta.warc.gz'
		#TODO: Handle OSError on fcntl.flock and retry
		self._file = open(filename, 'ab')
		try:
			fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
			logging.info(f'Opened {filename}')
			self._closed = False
			callback()
		finally:
			self._close_file()

	def close(self):
		'''Clean up everything.'''
		self._close_file()
		self._write_meta_warc(self._write_log_record)