Compare commits

4 Commits
master...0.2

Author SHA1 Message Date
  JustAnotherArchivist 5579129b11 Support overriding the total fetch timeout 2 years ago
  JustAnotherArchivist 215ac03221 Support HEAD requests 2 years ago
  JustAnotherArchivist 8f46225477 Replace warcio with own WARC writing implementation 2 years ago
  JustAnotherArchivist a7d7852c6d Fix ISO-8859-1-encoded Location header handling 2 years ago
4 changed files with 268 additions and 93 deletions
  1. qwarc/__init__.py  +8 -3
  2. qwarc/utils.py  +105 -0
  3. qwarc/warc.py  +154 -89
  4. setup.py  +1 -1

qwarc/__init__.py  +8 -3

@@ -16,6 +16,7 @@ import logging
import os
import random
import sqlite3
import urllib.parse
import yarl


@@ -33,7 +34,7 @@ class Item:

self.childItems = []

async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True):
async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60):
'''
HTTP GET or POST a URL

@@ -50,7 +51,7 @@ class Item:
#TODO: Rewrite using 'async with self.session.get'

url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
assert method in ('GET', 'POST'), 'method must be GET or POST'
assert method in ('GET', 'POST', 'HEAD'), 'method must be GET, POST, or HEAD'
headers = self.headers + headers
#TODO Deduplicate headers with later values overriding earlier ones
history = []
@@ -64,7 +65,7 @@ class Item:
writeToWarc = True
try:
try:
with _aiohttp.Timeout(60):
with _aiohttp.Timeout(timeout):
self.logger.info(f'Fetching {url}')
response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
try:
@@ -101,6 +102,10 @@ class Item:
redirectUrl = response.headers.get('Location') or response.headers.get('URI')
if not redirectUrl:
return retResponse, tuple(history)
if any(56448 <= ord(c) <= 56575 for c in redirectUrl):
# Surrogate escape characters in the redirect URL, which usually means that the server sent non-ASCII data (e.g. ISO-8859-1).
# Revert the encoding, then percent-encode the non-ASCII bytes.
redirectUrl = urllib.parse.quote_from_bytes(redirectUrl.encode('utf8', 'surrogateescape'), safe = ''.join(chr(i) for i in range(128)))
url = url.join(yarl.URL(redirectUrl))
if response.status in (301, 302, 303) and method == 'POST':
method = 'GET'
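
Two of the changes above extend Item.fetch(): requests may now use HEAD, and the previously hard-coded 60-second total timeout can be overridden per call. A hypothetical call from within an item (URL and values are illustrative, not from the diff):

# HEAD request with a five-minute total fetch timeout instead of the default 60 seconds
response, history = await self.fetch('https://example.org/large-file', method = 'HEAD', timeout = 300)

The last commit handles redirect targets that were decoded with surrogateescape because the server sent non-ASCII (e.g. ISO-8859-1) bytes in the Location header. A minimal standalone sketch of that re-encoding step, using a made-up Location value:

import urllib.parse

# A Location header containing a raw ISO-8859-1 byte, decoded with surrogateescape:
# the undecodable byte becomes a lone surrogate in the range U+DC80..U+DCFF.
location = b'/caf\xe9'.decode('utf-8', 'surrogateescape')
assert any(56448 <= ord(c) <= 56575 for c in location)

# Revert the decoding to recover the original bytes, then percent-encode everything outside ASCII.
fixed = urllib.parse.quote_from_bytes(location.encode('utf-8', 'surrogateescape'), safe = ''.join(chr(i) for i in range(128)))
print(fixed)  # /caf%E9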


qwarc/utils.py  +105 -0

@@ -2,12 +2,14 @@ from qwarc.const import *
import aiohttp
import asyncio
import functools
import io
import logging
import os
import pkg_resources
import platform
import time
import typing
import zlib


PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,106 @@ class ReadonlyFileView:
if key == 'writable':
return False
return getattr(self._fp, key)


def iter_file(f, length = None, blockSize = 1048576):
'''Read `length` bytes from `f` in chunks of `blockSize` bytes. If `length` is `None`, read until EOF.'''
read = 0
while True:
buf = f.read(blockSize)
if not buf: # EOF
if length and read < length:
raise RuntimeError('Reached EOF before reading enough data')
break
if length and read + len(buf) > length:
initialBufLen = len(buf)
buf = buf[0 : length - read]
f.seek(len(buf) - initialBufLen, io.SEEK_CUR)
read += len(buf)
yield buf
if length and read >= length:
if read > length: # This should never happen due to the truncation above.
raise RuntimeError('Overread')
break


def read_http_headers(f):
headers = {}

# Status line or request line
line = f.readline()

line = f.readline()
while line and line not in (b'\r\n', b'\r', b'\n'):
# Split into header name and value
name, value = line.split(b':', 1)
name = name.strip(b' \t')

# Read next line
line = f.readline()

# Handle continuation lines
continuation = line[0:1] in (b' ', b'\t')
if continuation:
value = []
while continuation:
value.append(line)
line = f.readline()
continuation = line[0:1] in (b' ', b'\t')
value = b''.join(value)

# Decode and store
try:
name = name.decode('utf-8')
except UnicodeDecodeError:
name = name.decode('iso-8859-1')
try:
value = value.decode('utf-8')
except UnicodeDecodeError:
value = value.decode('iso-8859-1')
headers[name.lower()] = value

# `line` is already the next line, if any
return headers


def read_http_body(f, length, headers):
if 'chunked' in map(str.strip, headers.get('transfer-encoding', '').split(',')):
while True:
chunkLine = f.readline()
if b';' in chunkLine:
chunkLength = chunkLine.split(b';', 1)[0].strip()
else:
chunkLength = chunkLine.strip()
chunkLength = int(chunkLength, base = 16)
if chunkLength == 0:
break
yield from iter_file(f, length = chunkLength)
assert f.read(2) == b'\r\n' # Chunk terminator

# Consume trailer
line = f.readline()
while line and line not in (b'\r\n', b'\r', b'\n'):
line = f.readline()
else:
yield from iter_file(f, length = length)


class GzipWrapper:
def __init__(self, f):
self._file = f
self._compressor = None

def __enter__(self):
self._compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
return self

def write(self, data):
buf = self._compressor.compress(data)
self._file.write(buf)

def __exit__(self, excType, excVal, excTb):
buf = self._compressor.flush()
self._file.write(buf)
self._file.flush()
self._compressor = None
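
A self-contained sketch (not part of the diff) of how the new HTTP helpers combine: read_http_headers() parses the header block of a raw response, then read_http_body() streams the body via iter_file(), honouring chunked transfer encoding. The response bytes below are made up.

import io
from qwarc.utils import read_http_headers, read_http_body

# Raw HTTP/1.1 response with a single 5-byte chunk followed by the terminating 0-length chunk.
raw = io.BytesIO(b'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhello\r\n0\r\n\r\n')
headers = read_http_headers(raw)  # header names are stored lower-cased
body = b''.join(read_http_body(raw, length = None, headers = headers))
assert body == b'hello'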

qwarc/warc.py  +154 -89

@@ -1,14 +1,32 @@
import base64
import fcntl
import gzip
import hashlib
import io
import itertools
import json
import logging
import os
import qwarc.utils
import tempfile
import time
import warcio
import uuid


class _WARCRecord:
def __init__(self, headers, body, length, httpHeaderLength):
self.headers = headers
self.body = body
self.length = length
self.httpHeaderLength = httpHeaderLength


class _Digester:
def __init__(self):
self._digester = hashlib.sha1()

def update(self, data):
self._digester.update(data)

def __str__(self):
return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'


class WARC:
@@ -31,7 +49,6 @@ class WARC:

self._closed = True
self._file = None
self._warcWriter = None

self._dedupe = dedupe
self._dedupeMap = {}
@@ -62,11 +79,62 @@ class WARC:
else:
break
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._counter += 1

def _create_warc_record(self, recordType, headers, body, length = None):
startPos = body.tell()

if 'WARC-Record-ID' not in headers:
headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'

headers['WARC-Type'] = recordType

digester = _Digester()
for buf in qwarc.utils.iter_file(body, length = length):
digester.update(buf)
body.seek(startPos)
headers['WARC-Block-Digest'] = str(digester)

if headers['Content-Type'].startswith('application/http;'):
httpHeaders = qwarc.utils.read_http_headers(body)
httpHeaderLength = body.tell() - startPos
if 'WARC-Payload-Digest' not in headers:
digester = _Digester()
for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
digester.update(buf)
headers['WARC-Payload-Digest'] = str(digester)
body.seek(startPos)
else:
httpHeaderLength = None

if not length:
body.seek(0, io.SEEK_END)
length = body.tell() - startPos
body.seek(startPos)
headers['Content-Length'] = str(length)

return _WARCRecord(headers, body, length, httpHeaderLength)

def _write_warc_record(self, record):
with qwarc.utils.GzipWrapper(self._file) as fp:
fp.write(b'WARC/1.1\r\n')
fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers

for buf in qwarc.utils.iter_file(record.body, length = record.length):
fp.write(buf)

fp.write(b'\r\n\r\n') # Record separator

def _get_date_string(self, t = None):
if t is None:
t = time.time()
usec = f'{(t - int(t)):.6f}'[2:]
return time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(t))

def _write_warcinfo_record(self):
date = self._get_date_string()
data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'command': self._command,
@@ -77,18 +145,16 @@ class WARC:
'extra': self._specDependencies.extra,
}
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'WARC-Date': date,
'Content-Type': 'application/json; charset=utf-8',
},
payload
)
self._warcWriter.write_record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']

def write_client_response(self, response):
'''
@@ -98,65 +164,62 @@ class WARC:

self._ensure_opened()
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
requestDate = self._get_date_string(r.rawRequestTimestamp)
r.rawRequestData.seek(0)
requestRecord = self._warcWriter.create_warc_record(
str(r.url),
'request',
payload = r.rawRequestData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
requestRecord = self._create_warc_record(
'request',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawRequestData
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0)
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
payload = r.rawResponseData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
responseRecord = self._create_warc_record(
'response',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response',
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawResponseData
)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
# Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.httpHeaderLength and (responseRecord.length - responseRecord.httpHeaderLength) > 100:
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO(r.rawResponseData.read(responseRecord.httpHeaderLength))
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
)
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc_record(requestRecord)
self._write_warc_record(responseRecord)

if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
@@ -165,19 +228,21 @@ class WARC:
'''Write spec file and dependencies'''
assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'

date = self._get_date_string()
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}',
'resource',
payload = f,
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_,
'Content-Type': contentType,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
f
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -189,25 +254,26 @@ class WARC:
rootLogger = logging.getLogger()
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush()
date = self._get_date_string()
with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename}',
'resource',
payload = fp,
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log',
'Content-Type': 'text/plain; charset=utf-8',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
fp
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _close_file(self):
'''Close the currently opened WARC'''

if not self._closed:
self._file.close()
self._warcWriter = None
self._file = None
self._closed = True

@@ -218,7 +284,6 @@ class WARC:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False

callback()
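
For reference, the digest strings produced by the new _Digester follow the common WARC convention: 'sha1:' plus the base32-encoded SHA-1 of the data. A standalone sketch of the same format:

import base64
import hashlib

# Same output format as _Digester.__str__ above; SHA-1 is 20 bytes, so the
# base32 form is exactly 32 characters with no padding.
digest = f'sha1:{base64.b32encode(hashlib.sha1(b"hello world").digest()).decode("ascii")}'
print(digest)  # sha1:FKXGYNOJJ7H3IFO35FPUBC445EPOQRXN

Note also that _write_warc_record opens a fresh GzipWrapper per record, so each record becomes its own gzip member and stays independently decompressible, which is the usual layout for .warc.gz files.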


setup.py  +1 -1

@@ -15,7 +15,7 @@ setuptools.setup(
packages = ['qwarc'],
setup_requires = ['setuptools_scm'],
use_scm_version = True,
install_requires = ['aiohttp==2.3.10', 'warcio', 'yarl'],
install_requires = ['aiohttp==2.3.10', 'yarl'],
entry_points = {
'console_scripts': [
'qwarc = qwarc.cli:main',

