Compare commits

...

4 Commits
master ... 0.2

Author SHA1 Message Date
  JustAnotherArchivist 5579129b11 Support overriding the total fetch timeout 2 years ago
  JustAnotherArchivist 215ac03221 Support HEAD requests 2 years ago
  JustAnotherArchivist 8f46225477 Replace warcio with own WARC writing implementation 2 years ago
  JustAnotherArchivist a7d7852c6d Fix ISO-8859-1-encoded Location header handling 2 years ago
4 changed files with 268 additions and 93 deletions
Unified View
  1. +8
    -3
      qwarc/__init__.py
  2. +105
    -0
      qwarc/utils.py
  3. +154
    -89
      qwarc/warc.py
  4. +1
    -1
      setup.py

+ 8
- 3
qwarc/__init__.py View File

@@ -16,6 +16,7 @@ import logging
import os import os
import random import random
import sqlite3 import sqlite3
import urllib.parse
import yarl import yarl




@@ -33,7 +34,7 @@ class Item:


self.childItems = [] self.childItems = []


async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True):
async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60):
''' '''
HTTP GET or POST a URL HTTP GET or POST a URL


@@ -50,7 +51,7 @@ class Item:
#TODO: Rewrite using 'async with self.session.get' #TODO: Rewrite using 'async with self.session.get'


url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
assert method in ('GET', 'POST'), 'method must be GET or POST'
assert method in ('GET', 'POST', 'HEAD'), 'method must be GET, POST, or HEAD'
headers = self.headers + headers headers = self.headers + headers
#TODO Deduplicate headers with later values overriding earlier ones #TODO Deduplicate headers with later values overriding earlier ones
history = [] history = []
@@ -64,7 +65,7 @@ class Item:
writeToWarc = True writeToWarc = True
try: try:
try: try:
with _aiohttp.Timeout(60):
with _aiohttp.Timeout(timeout):
self.logger.info(f'Fetching {url}') self.logger.info(f'Fetching {url}')
response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl) response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
try: try:
@@ -101,6 +102,10 @@ class Item:
redirectUrl = response.headers.get('Location') or response.headers.get('URI') redirectUrl = response.headers.get('Location') or response.headers.get('URI')
if not redirectUrl: if not redirectUrl:
return retResponse, tuple(history) return retResponse, tuple(history)
if any(56448 <= ord(c) <= 56575 for c in redirectUrl):
# Surrogate escape characters in the redirect URL, which usually means that the server sent non-ASCII data (e.g. ISO-8859-1).
# Revert the encoding, then percent-encode the non-ASCII bytes.
redirectUrl = urllib.parse.quote_from_bytes(redirectUrl.encode('utf8', 'surrogateescape'), safe = ''.join(chr(i) for i in range(128)))
url = url.join(yarl.URL(redirectUrl)) url = url.join(yarl.URL(redirectUrl))
if response.status in (301, 302, 303) and method == 'POST': if response.status in (301, 302, 303) and method == 'POST':
method = 'GET' method = 'GET'


+ 105
- 0
qwarc/utils.py View File

@@ -2,12 +2,14 @@ from qwarc.const import *
import aiohttp import aiohttp
import asyncio import asyncio
import functools import functools
import io
import logging import logging
import os import os
import pkg_resources import pkg_resources
import platform import platform
import time import time
import typing import typing
import zlib




PAGESIZE = os.sysconf('SC_PAGE_SIZE') PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,106 @@ class ReadonlyFileView:
if key == 'writable': if key == 'writable':
return False return False
return getattr(self._fp, key) return getattr(self._fp, key)


def iter_file(f, length = None, blockSize = 1048576):
    '''
    Read `length` bytes from `f` in chunks of at most `blockSize` bytes. If `length` is `None`, read until EOF.

    This is a generator yielding every non-empty chunk returned by `f.read`.
    When `length` is given, exactly `length` bytes are consumed from `f`; a `length` of 0 therefore
    yields nothing and does not read from `f` at all.

    Raises `RuntimeError` if `length` is given and EOF is reached before that many bytes could be read.
    '''
    read = 0
    # Compare against None explicitly: a length of 0 is a request for zero bytes, not for 'until EOF'.
    while length is None or read < length:
        # Never request more than what is still wanted, so there is no overread that would have to be
        # undone by seeking backwards (which would require `f` to be seekable).
        wanted = blockSize if length is None else min(blockSize, length - read)
        buf = f.read(wanted)
        if not buf: # EOF
            if length is not None:
                raise RuntimeError('Reached EOF before reading enough data')
            break
        read += len(buf)
        yield buf


def read_http_headers(f):
    '''
    Read the head of an HTTP message from the binary file-like object `f` and return its header fields.

    The first line (status line or request line) is consumed and discarded. Header lines are then read
    up to and including the empty line that terminates the head (or until EOF).

    Returns a dict mapping the lower-cased header name to its value. The value is the raw text following
    the first colon, i.e. it still includes surrounding whitespace and the line terminator; continuation
    lines (lines starting with a space or tab) are appended to it verbatim. Names and values are decoded
    as UTF-8, falling back to ISO-8859-1. If a name occurs more than once, the last occurrence wins.

    Raises `ValueError` if a header line does not contain a colon.
    '''
    headers = {}

    # Status line or request line; its contents are not needed here
    f.readline()

    line = f.readline()
    while line and line not in (b'\r\n', b'\r', b'\n'):
        # Split into header name and value
        name, value = line.split(b':', 1)
        name = name.strip(b' \t')

        # Read next line
        line = f.readline()

        # Handle continuation lines. The value from the header's first line must be kept as the first part;
        # only collecting the continuation lines would silently drop it.
        if line[0:1] in (b' ', b'\t'):
            parts = [value]
            while line[0:1] in (b' ', b'\t'):
                parts.append(line)
                line = f.readline()
            value = b''.join(parts)

        # Decode and store
        try:
            name = name.decode('utf-8')
        except UnicodeDecodeError:
            name = name.decode('iso-8859-1')
        try:
            value = value.decode('utf-8')
        except UnicodeDecodeError:
            value = value.decode('iso-8859-1')
        headers[name.lower()] = value

        # `line` is already the next line, if any

    return headers


def read_http_body(f, length, headers):
    '''
    Yield the payload of an HTTP message body read from the binary file-like object `f`.

    `f` must be positioned at the start of the body. `headers` is a dict of lower-cased header names to
    string values (as produced by `read_http_headers`). If its `transfer-encoding` value lists `chunked`
    (compared case-insensitively), the chunked framing is decoded: only the chunk data is yielded, and the
    terminating zero-length chunk plus any trailer lines are consumed. Otherwise, `length` bytes are yielded
    as they are (see `iter_file` for the handling of `length`).

    Raises `RuntimeError` if a chunk is not followed by CRLF, and `ValueError` if a chunk size line cannot
    be parsed as a hexadecimal number.
    '''
    transferCodings = [coding.strip().lower() for coding in headers.get('transfer-encoding', '').split(',')]
    if 'chunked' not in transferCodings:
        yield from iter_file(f, length = length)
        return

    while True:
        # Chunk size line: hexadecimal size, optionally followed by ';' and chunk extensions
        chunkLine = f.readline()
        chunkLength = int(chunkLine.split(b';', 1)[0].strip(), base = 16)
        if chunkLength == 0:
            break
        yield from iter_file(f, length = chunkLength)
        # The terminator has to be consumed unconditionally. Doing the read inside an `assert` would skip it
        # entirely when running with -O, which desynchronises the parser.
        terminator = f.read(2)
        if terminator != b'\r\n':
            raise RuntimeError('Chunk is not terminated by CRLF')

    # Consume trailer
    line = f.readline()
    while line and line not in (b'\r\n', b'\r', b'\n'):
        line = f.readline()


class GzipWrapper:
    '''
    Context manager which gzip-compresses everything written to it and passes the result on to the file object `f`.

    A fresh compressor is created on entering the `with` block; on leaving it, the compressor is finished and
    `f` is flushed. `write` must only be called inside the `with` block.
    '''

    def __init__(self, f):
        self._compressor = None
        self._file = f

    def __enter__(self):
        # A wbits value of 16 + MAX_WBITS makes zlib emit a gzip header and trailer around the deflate stream.
        self._compressor = zlib.compressobj(level = 9, method = zlib.DEFLATED, wbits = 16 + zlib.MAX_WBITS)
        return self

    def write(self, data):
        '''Compress `data` and write whatever compressed output is available so far to the underlying file.'''
        self._file.write(self._compressor.compress(data))

    def __exit__(self, excType, excVal, excTb):
        # Finish the stream first so that the remaining compressed data reaches the file before it is flushed.
        remainder = self._compressor.flush()
        target = self._file
        target.write(remainder)
        target.flush()
        self._compressor = None

+ 154
- 89
qwarc/warc.py View File

@@ -1,14 +1,32 @@
import base64
import fcntl import fcntl
import gzip
import hashlib
import io import io
import itertools import itertools
import json import json
import logging import logging
import os
import qwarc.utils import qwarc.utils
import tempfile
import time import time
import warcio
import uuid


class _WARCRecord:
    '''Plain container holding the four values that describe one WARC record; it stores them unchanged.'''

    def __init__(self, headers, body, length, httpHeaderLength):
        # headers: the record's header mapping; body: the object the record block is read from
        self.headers, self.body = headers, body
        # length: size of the record block; httpHeaderLength: size of the HTTP head within it, if any
        # NOTE(review): the meaning of these fields is taken from how they are filled in by the creator of the record — confirm there
        self.length, self.httpHeaderLength = length, httpHeaderLength


class _Digester:
    '''Incremental SHA-1 digest whose string form is `sha1:` followed by the Base32-encoded digest.'''

    def __init__(self):
        self._digester = hashlib.sha1()

    def update(self, data):
        '''Feed the bytes `data` into the digest.'''
        self._digester.update(data)

    def __str__(self):
        encoded = base64.b32encode(self._digester.digest())
        return 'sha1:' + encoded.decode('ascii')




class WARC: class WARC:
@@ -31,7 +49,6 @@ class WARC:


self._closed = True self._closed = True
self._file = None self._file = None
self._warcWriter = None


self._dedupe = dedupe self._dedupe = dedupe
self._dedupeMap = {} self._dedupeMap = {}
@@ -62,11 +79,62 @@ class WARC:
else: else:
break break
logging.info(f'Opened {filename}') logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False self._closed = False
self._counter += 1 self._counter += 1


def _create_warc_record(self, recordType, headers, body, length = None):
    '''
    Build a `_WARCRecord` of type `recordType` from the header dict `headers` and the seekable binary file object `body`.

    The record block starts at the current position of `body` and spans `length` bytes; if `length` is not
    given (or zero), it extends to the end of `body`. `headers` must contain `Content-Type` and is modified
    in place: `WARC-Record-ID` (unless already present), `WARC-Type`, `WARC-Block-Digest`, and `Content-Length`
    are set. If the content type starts with `application/http;`, the HTTP head is parsed to determine its
    length, and `WARC-Payload-Digest` is computed over the HTTP body unless the caller already provided one.

    On return, `body` is positioned at the start of the record block again.
    '''
    startPos = body.tell()

    if 'WARC-Record-ID' not in headers:
        headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'

    headers['WARC-Type'] = recordType

    # Block digest: covers the entire record block
    digester = _Digester()
    for buf in qwarc.utils.iter_file(body, length = length):
        digester.update(buf)
    body.seek(startPos)
    headers['WARC-Block-Digest'] = str(digester)

    if headers['Content-Type'].startswith('application/http;'):
        httpHeaders = qwarc.utils.read_http_headers(body)
        httpHeaderLength = body.tell() - startPos
        if 'WARC-Payload-Digest' not in headers:
            # `length` counts from `startPos`, so what remains for the HTTP body is `length` minus the size of
            # the HTTP head, not `length` minus the absolute file position.
            payloadLength = (length - httpHeaderLength) if length is not None else None
            digester = _Digester()
            for buf in qwarc.utils.read_http_body(body, length = payloadLength, headers = httpHeaders):
                digester.update(buf)
            headers['WARC-Payload-Digest'] = str(digester)
        # Rewind regardless of whether the payload digest was computed; parsing the head moved the position.
        body.seek(startPos)
    else:
        httpHeaderLength = None

    if not length:
        body.seek(0, io.SEEK_END)
        length = body.tell() - startPos
        body.seek(startPos)
    headers['Content-Length'] = str(length)

    return _WARCRecord(headers, body, length, httpHeaderLength)

def _write_warc_record(self, record):
    '''
    Write `record` to the currently opened file through a `qwarc.utils.GzipWrapper`.

    The output consists of the `WARC/1.1` version line, one `name: value` line per entry of `record.headers`
    (UTF-8 encoded), an empty line, `record.length` bytes read from `record.body`, and the record separator.
    '''
    with qwarc.utils.GzipWrapper(self._file) as gz:
        gz.write(b'WARC/1.1\r\n')
        headerLines = [
            name.encode('utf-8') + b': ' + value.encode('utf-8')
            for name, value in record.headers.items()
        ]
        gz.write(b'\r\n'.join(headerLines))
        gz.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers

        for chunk in qwarc.utils.iter_file(record.body, length = record.length):
            gz.write(chunk)

        gz.write(b'\r\n\r\n') # Record separator

def _get_date_string(self, t = None):
    '''
    Format the POSIX timestamp `t` (seconds as a float; defaults to the current time) as a UTC date string
    of the form `YYYY-MM-DDTHH:MM:SS.ffffffZ` with microsecond precision.
    '''
    if t is None:
        t = time.time()
    # Floor rather than truncate so that the fractional part is non-negative even for negative timestamps.
    seconds = int(t // 1)
    fraction = f'{(t - seconds):.6f}'
    if not fraction.startswith('0.'):
        # The fraction was rounded up to 1.000000; carry it over into the seconds instead of dropping it.
        seconds += 1
        fraction = '0.000000'
    usec = fraction[2:]
    return time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(seconds))

def _write_warcinfo_record(self): def _write_warcinfo_record(self):
date = self._get_date_string()
data = { data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies), 'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'command': self._command, 'command': self._command,
@@ -77,18 +145,16 @@ class WARC:
'extra': self._specDependencies.extra, 'extra': self._specDependencies.extra,
} }
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8')) payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'WARC-Date': date,
'Content-Type': 'application/json; charset=utf-8',
},
payload
) )
self._warcWriter.write_record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']


def write_client_response(self, response): def write_client_response(self, response):
''' '''
@@ -98,65 +164,62 @@ class WARC:


self._ensure_opened() self._ensure_opened()
for r in response.iter_all(): for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
requestDate = self._get_date_string(r.rawRequestTimestamp)
r.rawRequestData.seek(0) r.rawRequestData.seek(0)
requestRecord = self._warcWriter.create_warc_record(
str(r.url),
'request',
payload = r.rawRequestData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
requestRecord = self._create_warc_record(
'request',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawRequestData
) )
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0) r.rawResponseData.seek(0)
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
payload = r.rawResponseData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
responseRecord = self._create_warc_record(
'response',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response',
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawResponseData
) )
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
# Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.httpHeaderLength and (responseRecord.length - responseRecord.httpHeaderLength) > 100:
if payloadDigest in self._dedupeMap: if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest] refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO(r.rawResponseData.read(responseRecord.httpHeaderLength))
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
) )
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else: else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc_record(requestRecord)
self._write_warc_record(responseRecord)


if self._maxFileSize and self._file.tell() > self._maxFileSize: if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file() self._close_file()
@@ -165,19 +228,21 @@ class WARC:
'''Write spec file and dependencies''' '''Write spec file and dependencies'''
assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first' assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'


date = self._get_date_string()
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)): for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f: with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}',
'resource',
payload = f,
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_,
'Content-Type': contentType,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
f
) )
self._warcWriter.write_record(record)
self._write_warc_record(record)


def _write_initial_meta_records(self): def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record() self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -189,25 +254,26 @@ class WARC:
rootLogger = logging.getLogger() rootLogger = logging.getLogger()
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush() handler.flush()
date = self._get_date_string()
with open(self._logFilename, 'rb') as fp: with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename}',
'resource',
payload = fp,
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log',
'Content-Type': 'text/plain; charset=utf-8',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
fp
) )
self._warcWriter.write_record(record)
self._write_warc_record(record)


def _close_file(self): def _close_file(self):
'''Close the currently opened WARC''' '''Close the currently opened WARC'''


if not self._closed: if not self._closed:
self._file.close() self._file.close()
self._warcWriter = None
self._file = None self._file = None
self._closed = True self._closed = True


@@ -218,7 +284,6 @@ class WARC:
try: try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX) fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}') logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False self._closed = False


callback() callback()


+ 1
- 1
setup.py View File

@@ -15,7 +15,7 @@ setuptools.setup(
packages = ['qwarc'], packages = ['qwarc'],
setup_requires = ['setuptools_scm'], setup_requires = ['setuptools_scm'],
use_scm_version = True, use_scm_version = True,
install_requires = ['aiohttp==2.3.10', 'warcio', 'yarl'],
install_requires = ['aiohttp==2.3.10', 'yarl'],
entry_points = { entry_points = {
'console_scripts': [ 'console_scripts': [
'qwarc = qwarc.cli:main', 'qwarc = qwarc.cli:main',


Loading…
Cancel
Save