@@ -1,16 +1,34 @@
import base64
import fcntl
import fcntl
import gzip
import hashlib
import io
import io
import itertools
import itertools
import json
import json
import logging
import logging
import os
import qwarc.utils
import qwarc.utils
import tempfile
import time
import time
import uuid
import warcio
import warcio
class _WARCRecord:
def __init__(self, headers, body, length):
self.headers = headers
self.body = body
self.length = length
class _Digester:
def __init__(self):
self._digester = hashlib.sha1()
def update(self, data):
self._digester.update(data)
def __str__(self):
return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'
class WARC:
class WARC:
def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
'''
'''
@@ -31,7 +49,6 @@ class WARC:
self._closed = True
self._closed = True
self._file = None
self._file = None
self._warcWriter = None
self._dedupe = dedupe
self._dedupe = dedupe
self._dedupeMap = {}
self._dedupeMap = {}
@@ -62,10 +79,50 @@ class WARC:
else:
else:
break
break
logging.info(f'Opened {filename}')
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._closed = False
self._counter += 1
self._counter += 1
def _create_warc_record(self, recordType, headers, body, length = None):
startPos = body.tell()
if 'WARC-Record-ID' not in headers:
headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'
headers['WARC-Type'] = recordType
digester = _Digester()
for buf in qwarc.utils.iter_file(body, length = length):
digester.update(buf)
body.seek(startPos)
headers['WARC-Block-Digest'] = str(digester)
if 'WARC-Payload-Digest' not in headers and headers['Content-Type'].startswith('application/http;'):
digester = _Digester()
httpHeaders = qwarc.utils.read_http_headers(body)
for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
digester.update(buf)
body.seek(startPos)
headers['WARC-Payload-Digest'] = str(digester)
if not length:
body.seek(0, io.SEEK_END)
length = body.tell() - startPos
body.seek(startPos)
headers['Content-Length'] = str(length)
return _WARCRecord(headers, body, length)
def _write_warc_record(self, record):
with qwarc.utils.GzipWrapper(self._file) as fp:
fp.write(b'WARC/1.1\r\n')
fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers
for buf in qwarc.utils.iter_file(record.body, length = record.length):
fp.write(buf)
fp.write(b'\r\n\r\n') # Record separator
def _write_warcinfo_record(self):
def _write_warcinfo_record(self):
data = {
data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
@@ -77,18 +134,15 @@ class WARC:
'extra': self._specDependencies.extra,
'extra': self._specDependencies.extra,
}
}
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'Content-Type': 'application/json; charset=utf-8',
},
payload
)
)
self._warcWriter.write _record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']
def write_client_response(self, response):
def write_client_response(self, response):
'''
'''
@@ -100,63 +154,62 @@ class WARC:
for r in response.iter_all():
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
r.rawRequestData.seek(0)
r.rawRequestData.seek(0)
requestRecord = self._warcWriter. create_warc_record(
str(r.url) ,
'request',
payload = r.rawRequestData ,
length = length ,
warc_headers_dict = {
'WARC-Date': requestDate ,
'WARC-IP-Address': r.remoteAddress[0] ,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
}
requestRecord = self._create_warc_record(
'request' ,
{
'WARC-Date': requestDate ,
'WARC-Target-URI': str(r.url) ,
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request' ,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
} ,
r.rawRequestData
)
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0)
r.rawResponseData.seek(0)
responseRecord = self._warcWriter. create_warc_record(
str(r.url) ,
'response',
payload = r.rawResponseData ,
length = length ,
warc_headers_dict = {
'WARC-Date': requestDate ,
'WARC-IP-Address': r.remoteAddress[0] ,
'WARC-Concurrent-To': request RecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
}
responseRecord = self._create_warc_record(
'response' ,
{
'WARC-Date': requestDate ,
'WARC-Target-URI': str(r.url) ,
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response' ,
'WARC-Concurrent-To': requestRecordID ,
'WARC-Warcinfo-ID': self._metaWarcinfo RecordID,
} ,
r.rawResponseData
)
)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100 : # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.length > 1024 : # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if payloadDigest in self._dedupeMap:
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO()
qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
httpHeaderData.seek(0)
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
)
)
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write _record(requestRecord)
self._warcWriter.write _record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc _record(requestRecord)
self._write_warc _record(responseRecord)
if self._maxFileSize and self._file.tell() > self._maxFileSize:
if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
self._close_file()
@@ -167,17 +220,17 @@ class WARC:
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f:
with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}' ,
'resource' ,
payload = f ,
length = length ,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_ ,
'Content-Type': contentType ,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
} ,
f
)
)
self._warcWriter.write _record(record)
self._write_warc _record(record)
def _write_initial_meta_records(self):
def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -190,24 +243,23 @@ class WARC:
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush()
handler.flush()
with open(self._logFilename, 'rb') as fp:
with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename} ',
'resource ',
payload = fp ,
length = length ,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log ',
'Content-Type': 'text/plain; charset=utf-8 ',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID ,
} ,
fp
)
)
self._warcWriter.write _record(record)
self._write_warc _record(record)
def _close_file(self):
def _close_file(self):
'''Close the currently opened WARC'''
'''Close the currently opened WARC'''
if not self._closed:
if not self._closed:
self._file.close()
self._file.close()
self._warcWriter = None
self._file = None
self._file = None
self._closed = True
self._closed = True
@@ -218,7 +270,6 @@ class WARC:
try:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._closed = False
callback()
callback()