@@ -1,14 +1,32 @@
import base64
import fcntl
import fcntl
import gzip
import hashlib
import io
import io
import itertools
import itertools
import json
import json
import logging
import logging
import os
import qwarc.utils
import qwarc.utils
import tempfile
import time
import time
import warcio
import uuid
class _WARCRecord:
def __init__(self, headers, body, length, httpHeaderLength):
self.headers = headers
self.body = body
self.length = length
self.httpHeaderLength = httpHeaderLength
class _Digester:
def __init__(self):
self._digester = hashlib.sha1()
def update(self, data):
self._digester.update(data)
def __str__(self):
return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'
class WARC:
class WARC:
@@ -31,7 +49,6 @@ class WARC:
self._closed = True
self._closed = True
self._file = None
self._file = None
self._warcWriter = None
self._dedupe = dedupe
self._dedupe = dedupe
self._dedupeMap = {}
self._dedupeMap = {}
@@ -62,11 +79,62 @@ class WARC:
else:
else:
break
break
logging.info(f'Opened {filename}')
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._closed = False
self._counter += 1
self._counter += 1
def _create_warc_record(self, recordType, headers, body, length = None):
startPos = body.tell()
if 'WARC-Record-ID' not in headers:
headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'
headers['WARC-Type'] = recordType
digester = _Digester()
for buf in qwarc.utils.iter_file(body, length = length):
digester.update(buf)
body.seek(startPos)
headers['WARC-Block-Digest'] = str(digester)
if headers['Content-Type'].startswith('application/http;'):
httpHeaders = qwarc.utils.read_http_headers(body)
httpHeaderLength = body.tell() - startPos
if 'WARC-Payload-Digest' not in headers:
digester = _Digester()
for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
digester.update(buf)
headers['WARC-Payload-Digest'] = str(digester)
body.seek(startPos)
else:
httpHeaderLength = None
if not length:
body.seek(0, io.SEEK_END)
length = body.tell() - startPos
body.seek(startPos)
headers['Content-Length'] = str(length)
return _WARCRecord(headers, body, length, httpHeaderLength)
def _write_warc_record(self, record):
with qwarc.utils.GzipWrapper(self._file) as fp:
fp.write(b'WARC/1.1\r\n')
fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers
for buf in qwarc.utils.iter_file(record.body, length = record.length):
fp.write(buf)
fp.write(b'\r\n\r\n') # Record separator
def _get_date_string(self, t = None):
if t is None:
t = time.time()
usec = f'{(t - int(t)):.6f}'[2:]
return time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(t))
def _write_warcinfo_record(self):
def _write_warcinfo_record(self):
date = self._get_date_string()
data = {
data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'command': self._command,
'command': self._command,
@@ -77,18 +145,16 @@ class WARC:
'extra': self._specDependencies.extra,
'extra': self._specDependencies.extra,
}
}
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'WARC-Date': date,
'Content-Type': 'application/json; charset=utf-8',
},
payload
)
)
self._warcWriter.write _record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']
def write_client_response(self, response):
def write_client_response(self, response):
'''
'''
@@ -98,65 +164,62 @@ class WARC:
self._ensure_opened()
self._ensure_opened()
for r in response.iter_all():
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
requestDate = self._get_date_string(r.rawRequestTimestamp)
r.rawRequestData.seek(0)
r.rawRequestData.seek(0)
requestRecord = self._warcWriter. create_warc_record(
str(r.url) ,
'request',
payload = r.rawRequestData ,
length = length ,
warc_headers_dict = {
'WARC-Date': requestDate ,
'WARC-IP-Address': r.remoteAddress[0] ,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
}
requestRecord = self._create_warc_record(
'request',
{
'WARC-Date': requestDate ,
'WARC-Target-URI': str(r.url) ,
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request' ,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
} ,
r.rawRequestData
)
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0)
r.rawResponseData.seek(0)
responseRecord = self._warcWriter. create_warc_record(
str(r.url) ,
'response',
payload = r.rawResponseData ,
length = length ,
warc_headers_dict = {
'WARC-Date': requestDate ,
'WARC-IP-Address': r.remoteAddress[0] ,
'WARC-Concurrent-To': request RecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
}
responseRecord = self._create_warc_record(
'response' ,
{
'WARC-Date': requestDate ,
'WARC-Target-URI': str(r.url) ,
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response' ,
'WARC-Concurrent-To': requestRecordID ,
'WARC-Warcinfo-ID': self._metaWarcinfo RecordID,
} ,
r.rawResponseData
)
)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
# Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.httpHeaderLength and (responseRecord.length - responseRecord.httpHeaderLength) > 100:
if payloadDigest in self._dedupeMap:
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO(r.rawResponseData.read(responseRecord.httpHeaderLength))
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
)
)
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID') , str(r.url), requestDate)
self._warcWriter.write _record(requestRecord)
self._warcWriter.write _record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc _record(requestRecord)
self._write_warc _record(responseRecord)
if self._maxFileSize and self._file.tell() > self._maxFileSize:
if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
self._close_file()
@@ -165,19 +228,21 @@ class WARC:
'''Write spec file and dependencies'''
'''Write spec file and dependencies'''
assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
date = self._get_date_string()
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f:
with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}',
'resource',
payload = f,
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_,
'Content-Type': contentType,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
f
)
)
self._warcWriter.write _record(record)
self._write_warc _record(record)
def _write_initial_meta_records(self):
def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -189,25 +254,26 @@ class WARC:
rootLogger = logging.getLogger()
rootLogger = logging.getLogger()
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush()
handler.flush()
date = self._get_date_string()
with open(self._logFilename, 'rb') as fp:
with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename}',
'resource',
payload = fp,
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log',
'Content-Type': 'text/plain; charset=utf-8',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
fp
)
)
self._warcWriter.write _record(record)
self._write_warc _record(record)
def _close_file(self):
def _close_file(self):
'''Close the currently opened WARC'''
'''Close the currently opened WARC'''
if not self._closed:
if not self._closed:
self._file.close()
self._file.close()
self._warcWriter = None
self._file = None
self._file = None
self._closed = True
self._closed = True
@@ -218,7 +284,6 @@ class WARC:
try:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._closed = False
callback()
callback()