Compare commits

...

1 Commits

Author SHA1 Message Date
  JustAnotherArchivist f5c3eb42b3 WIP attempt to remove warcio 2 years ago
2 changed files with 252 additions and 86 deletions
Split View
  1. +115
    -0
      qwarc/utils.py
  2. +137
    -86
      qwarc/warc.py

+ 115
- 0
qwarc/utils.py View File

@@ -2,12 +2,14 @@ from qwarc.const import *
import aiohttp
import asyncio
import functools
import io
import logging
import os
import pkg_resources
import platform
import time
import typing
import zlib


PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,116 @@ class ReadonlyFileView:
if key == 'writable':
return False
return getattr(self._fp, key)


def iter_file(f, length = None, blockSize = 1048576):
    '''
    Read `length` bytes from `f` in chunks of at most `blockSize` bytes. If `length` is `None`, read until EOF.

    This is a generator yielding non-empty bytes objects. With a `length`, exactly that many bytes are
    consumed from `f`: each read is capped at the number of bytes still owed, so the stream is never
    read past the requested range and `f` does not need to be seekable. A `length` of 0 yields nothing
    and does not touch `f`.

    Raises RuntimeError if EOF is reached before `length` bytes could be read, or if `f.read` returns
    more data than was requested.
    '''

    if length is None:
        # Unbounded mode: hand out blocks until read() signals EOF with an empty result.
        while True:
            buf = f.read(blockSize)
            if not buf: # EOF
                return
            yield buf

    remaining = length
    while remaining > 0:
        want = min(blockSize, remaining)
        buf = f.read(want)
        if not buf: # EOF
            raise RuntimeError('Reached EOF before reading enough data')
        if len(buf) > want: # This should never happen with a well-behaved file object.
            raise RuntimeError('Overread')
        remaining -= len(buf)
        yield buf


def read_http_headers(f, copy = None):
    '''
    Read the start line and the header block of an HTTP message from `f`.

    `f` must provide `readline()` returning bytes. Reading stops after the empty line that ends the
    header block (or at EOF), leaving `f` positioned at the start of the message body. If `copy` is
    given, every line consumed from `f` (start line, header lines, and the terminating empty line) is
    also written to it verbatim.

    Returns a dict mapping lower-cased header names to values. Values have surrounding whitespace and
    the line terminator removed; folded (continuation) lines are joined to the first line with a
    single space. Names and values are decoded as UTF-8, falling back to ISO-8859-1. If a header
    occurs more than once, the last occurrence wins.

    Raises ValueError if a header line does not contain a colon.
    '''

    def next_line():
        line = f.readline()
        if copy is not None:
            copy.write(line)
        return line

    def decode(raw):
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            return raw.decode('iso-8859-1')

    headers = {}

    # Status line or request line; consumed (and copied) but not interpreted
    next_line()

    line = next_line()
    while line and line not in (b'\r\n', b'\r', b'\n'):
        # Split into header name and value
        if b':' not in line:
            raise ValueError(f'Malformed HTTP header line: {line!r}')
        name, value = line.split(b':', 1)
        name = name.strip(b' \t')
        #TODO name validation

        # Collect the first value line plus any continuation lines (those starting with SP or HTAB)
        valueParts = [value]
        line = next_line()
        while line[0:1] in (b' ', b'\t'):
            valueParts.append(line)
            line = next_line()

        # Drop the whitespace and line terminators around each piece; skip pieces that are empty
        strippedParts = (part.strip(b' \t\r\n') for part in valueParts)
        value = b' '.join(part for part in strippedParts if part)

        # Decode and store
        headers[decode(name).lower()] = decode(value)

        # `line` is already the next line, if any

    return headers


def read_http_body(f, length, headers):
    '''
    Yield the decoded body of an HTTP message whose headers have already been read from `f`.

    `headers` is a dict with lower-cased header names, as returned by `read_http_headers`. If its
    `transfer-encoding` value lists `chunked` (compared case-insensitively), the chunked framing is
    removed: only the chunk data is yielded, chunk extensions are ignored, and the trailer section
    is consumed and discarded. In that case `length` is not used. Otherwise, `length` bytes
    (or everything until EOF if `length` is `None`) are yielded unchanged via `iter_file`.

    Raises ValueError if the chunked framing is malformed (unparsable or missing chunk size line,
    missing CRLF after a chunk). Errors from `iter_file` propagate unchanged.
    '''

    codings = [coding.strip().lower() for coding in headers.get('transfer-encoding', '').split(',')]
    if 'chunked' not in codings:
        yield from iter_file(f, length = length)
        return

    while True:
        chunkLine = f.readline()
        if not chunkLine:
            raise ValueError('Reached EOF while reading a chunk size line')
        # Anything after a semicolon is a chunk extension and irrelevant for the size
        chunkLength = int(chunkLine.split(b';', 1)[0].strip(), base = 16)
        if chunkLength < 0:
            raise ValueError(f'Negative chunk size: {chunkLine!r}')
        if chunkLength == 0:
            break
        yield from iter_file(f, length = chunkLength)
        # The terminator has to be consumed unconditionally; doing this inside an `assert` would
        # skip the read entirely when Python runs with -O and desynchronise the parser.
        terminator = f.read(2)
        if terminator != b'\r\n':
            raise ValueError(f'Missing chunk terminator, got {terminator!r}')

    # Consume trailer
    line = f.readline()
    while line and line not in (b'\r\n', b'\r', b'\n'):
        line = f.readline()



class GzipWrapper:
    '''
    Context manager that writes everything passed to `write` to the wrapped file as one gzip member.

    A fresh compressor (level 9, gzip container) is created on entry. On exit, the remaining
    compressed data is written out and the wrapped file is flushed; the wrapped file is not closed.
    `write` may only be used inside the `with` block.
    '''

    def __init__(self, f):
        self._compressor = None
        self._file = f

    def __enter__(self):
        # wbits of 16 + MAX_WBITS makes zlib emit a gzip header and trailer around the deflate stream
        self._compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        return self

    def write(self, data):
        '''Compress `data` and pass whatever output the compressor produces on to the wrapped file.'''
        self._file.write(self._compressor.compress(data))

    def __exit__(self, excType, excVal, excTb):
        # Finish the gzip member, push it to the underlying file, and drop the spent compressor
        self._file.write(self._compressor.flush())
        self._file.flush()
        self._compressor = None

+ 137
- 86
qwarc/warc.py View File

@@ -1,16 +1,34 @@
import base64
import fcntl
import gzip
import hashlib
import io
import itertools
import json
import logging
import os
import qwarc.utils
import tempfile
import time
import uuid
import warcio


class _WARCRecord:
    '''Plain container for a prepared WARC record: its header dict, its body object, and the body length.'''

    def __init__(self, headers, body, length):
        self.headers, self.body, self.length = headers, body, length


class _Digester:
    '''Incremental SHA-1 digest whose string form is `sha1:` followed by the Base32-encoded digest.'''

    def __init__(self):
        self._digester = hashlib.sha1()

    def update(self, data):
        '''Feed another piece of data into the digest.'''
        self._digester.update(data)

    def __str__(self):
        encoded = base64.b32encode(self._digester.digest())
        return f'sha1:{encoded.decode("ascii")}'


class WARC:
def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
'''
@@ -31,7 +49,6 @@ class WARC:

self._closed = True
self._file = None
self._warcWriter = None

self._dedupe = dedupe
self._dedupeMap = {}
@@ -62,10 +79,50 @@ class WARC:
else:
break
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._counter += 1

def _create_warc_record(self, recordType, headers, body, length = None):
    '''
    Prepare a WARC record of type `recordType` from `headers` and `body`.

    `headers` is a dict of WARC header names to string values; it is modified in place. WARC-Type,
    WARC-Block-Digest, and Content-Length are always set. WARC-Record-ID is generated unless already
    present. WARC-Payload-Digest is computed from the HTTP body unless already present, but only
    if the Content-Type header starts with `application/http;`.

    `body` is a seekable binary file object positioned at the start of the record block. `length`
    is the size of the block in bytes counted from that position; if `None`, the block extends to
    the end of `body`. On return, `body` is positioned at the start of the block again.

    Returns a `_WARCRecord`.
    '''

    startPos = body.tell()

    # Resolve the block length up front. Only `None` means "until EOF"; an explicit 0 is a valid, empty block.
    if length is None:
        body.seek(0, io.SEEK_END)
        length = body.tell() - startPos
        body.seek(startPos)

    if 'WARC-Record-ID' not in headers:
        headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'

    headers['WARC-Type'] = recordType

    digester = _Digester()
    for buf in qwarc.utils.iter_file(body, length = length):
        digester.update(buf)
    body.seek(startPos)
    headers['WARC-Block-Digest'] = str(digester)

    if 'WARC-Payload-Digest' not in headers and headers.get('Content-Type', '').startswith('application/http;'):
        digester = _Digester()
        httpHeaders = qwarc.utils.read_http_headers(body)
        # `length` is relative to `startPos` while `tell()` is absolute, so subtract the bytes consumed, not the offset.
        payloadLength = length - (body.tell() - startPos)
        for buf in qwarc.utils.read_http_body(body, length = payloadLength, headers = httpHeaders):
            digester.update(buf)
        body.seek(startPos)
        headers['WARC-Payload-Digest'] = str(digester)

    headers['Content-Length'] = str(length)

    return _WARCRecord(headers, body, length)

def _write_warc_record(self, record):
    '''
    Append `record` to the currently opened WARC file, wrapped in a `qwarc.utils.GzipWrapper`.

    The output consists of the `WARC/1.1` version line, one `Name: value` line per entry of
    `record.headers`, an empty line, `record.length` bytes read from `record.body` starting at
    its current position, and finally two CRLFs.
    '''

    with qwarc.utils.GzipWrapper(self._file) as gz:
        gz.write(b'WARC/1.1\r\n')

        headerLines = [
            name.encode('utf-8') + b': ' + value.encode('utf-8')
            for name, value in record.headers.items()
        ]
        gz.write(b'\r\n'.join(headerLines))
        gz.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers

        for chunk in qwarc.utils.iter_file(record.body, length = record.length):
            gz.write(chunk)

        gz.write(b'\r\n\r\n') # Record separator

def _write_warcinfo_record(self):
data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
@@ -77,18 +134,15 @@ class WARC:
'extra': self._specDependencies.extra,
}
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'Content-Type': 'application/json; charset=utf-8',
},
payload
)
self._warcWriter.write_record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']

def write_client_response(self, response):
'''
@@ -100,63 +154,62 @@ class WARC:
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
r.rawRequestData.seek(0)
requestRecord = self._warcWriter.create_warc_record(
str(r.url),
'request',
payload = r.rawRequestData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
requestRecord = self._create_warc_record(
'request',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawRequestData
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0)
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
payload = r.rawResponseData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
responseRecord = self._create_warc_record(
'response',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response',
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawResponseData
)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.length > 1024: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO()
qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
httpHeaderData.seek(0)
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
)
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc_record(requestRecord)
self._write_warc_record(responseRecord)

if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
@@ -167,17 +220,17 @@ class WARC:

for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}',
'resource',
payload = f,
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_,
'Content-Type': contentType,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
f
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -190,24 +243,23 @@ class WARC:
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush()
with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename}',
'resource',
payload = fp,
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log',
'Content-Type': 'text/plain; charset=utf-8',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
fp
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _close_file(self):
'''Close the currently opened WARC'''

if not self._closed:
self._file.close()
self._warcWriter = None
self._file = None
self._closed = True

@@ -218,7 +270,6 @@ class WARC:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False

callback()


Loading…
Cancel
Save