コミットを比較

...

4 コミット
master ... 0.2

作成者 SHA1 メッセージ 日付
  JustAnotherArchivist 5579129b11 Support overriding the total fetch timeout 2年前
  JustAnotherArchivist 215ac03221 Support HEAD requests 2年前
  JustAnotherArchivist 8f46225477 Replace warcio with own WARC writing implementation 2年前
  JustAnotherArchivist a7d7852c6d Fix ISO-8859-1-encoded Location header handling 2年前
4個のファイルの変更268行の追加93行の削除
分割表示
  1. +8
    -3
      qwarc/__init__.py
  2. +105
    -0
      qwarc/utils.py
  3. +154
    -89
      qwarc/warc.py
  4. +1
    -1
      setup.py

+ 8
- 3
qwarc/__init__.py ファイルの表示

@@ -16,6 +16,7 @@ import logging
import os
import random
import sqlite3
import urllib.parse
import yarl


@@ -33,7 +34,7 @@ class Item:

self.childItems = []

async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True):
async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60):
'''
HTTP GET or POST a URL

@@ -50,7 +51,7 @@ class Item:
#TODO: Rewrite using 'async with self.session.get'

url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
assert method in ('GET', 'POST'), 'method must be GET or POST'
assert method in ('GET', 'POST', 'HEAD'), 'method must be GET, POST, or HEAD'
headers = self.headers + headers
#TODO Deduplicate headers with later values overriding earlier ones
history = []
@@ -64,7 +65,7 @@ class Item:
writeToWarc = True
try:
try:
with _aiohttp.Timeout(60):
with _aiohttp.Timeout(timeout):
self.logger.info(f'Fetching {url}')
response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
try:
@@ -101,6 +102,10 @@ class Item:
redirectUrl = response.headers.get('Location') or response.headers.get('URI')
if not redirectUrl:
return retResponse, tuple(history)
if any(56448 <= ord(c) <= 56575 for c in redirectUrl):
# Surrogate escape characters in the redirect URL, which usually means that the server sent non-ASCII data (e.g. ISO-8859-1).
# Revert the encoding, then percent-encode the non-ASCII bytes.
redirectUrl = urllib.parse.quote_from_bytes(redirectUrl.encode('utf8', 'surrogateescape'), safe = ''.join(chr(i) for i in range(128)))
url = url.join(yarl.URL(redirectUrl))
if response.status in (301, 302, 303) and method == 'POST':
method = 'GET'


+ 105
- 0
qwarc/utils.py ファイルの表示

@@ -2,12 +2,14 @@ from qwarc.const import *
import aiohttp
import asyncio
import functools
import io
import logging
import os
import pkg_resources
import platform
import time
import typing
import zlib


PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,106 @@ class ReadonlyFileView:
if key == 'writable':
return False
return getattr(self._fp, key)


def iter_file(f, length = None, blockSize = 1048576):
'''Read `length` bytes from `f` in chunks of `blockSize` bytes. If `length` is `None`, read until EOF.'''
read = 0
while True:
buf = f.read(blockSize)
if not buf: # EOF
if length and read < length:
raise RuntimeError('Reached EOF before reading enough data')
break
if length and read + len(buf) > length:
initialBufLen = len(buf)
buf = buf[0 : length - read]
f.seek(len(buf) - initialBufLen, io.SEEK_CUR)
read += len(buf)
yield buf
if length and read >= length:
if read > length: # This should never happen due to the truncation above.
raise RuntimeError('Overread')
break


def read_http_headers(f):
headers = {}

# Status line or request line
line = f.readline()

line = f.readline()
while line and line not in (b'\r\n', b'\r', b'\n'):
# Split into header name and value
name, value = line.split(b':', 1)
name = name.strip(b' \t')

# Read next line
line = f.readline()

# Handle continuation lines
continuation = line[0:1] in (b' ', b'\t')
if continuation:
value = []
while continuation:
value.append(line)
line = f.readline()
continuation = line[0:1] in (b' ', b'\t')
value = b''.join(value)

# Decode and store
try:
name = name.decode('utf-8')
except UnicodeDecodeError:
name = name.decode('iso-8859-1')
try:
value = value.decode('utf-8')
except UnicodeDecodeError:
value = value.decode('iso-8859-1')
headers[name.lower()] = value

# `line` is already the next line, if any
return headers


def read_http_body(f, length, headers):
if 'chunked' in map(str.strip, headers.get('transfer-encoding', '').split(',')):
while True:
chunkLine = f.readline()
if b';' in chunkLine:
chunkLength = chunkLine.split(b';', 1)[0].strip()
else:
chunkLength = chunkLine.strip()
chunkLength = int(chunkLength, base = 16)
if chunkLength == 0:
break
yield from iter_file(f, length = chunkLength)
assert f.read(2) == b'\r\n' # Chunk terminator

# Consume trailer
line = f.readline()
while line and line not in (b'\r\n', b'\r', b'\n'):
line = f.readline()
else:
yield from iter_file(f, length = length)


class GzipWrapper:
def __init__(self, f):
self._file = f
self._compressor = None

def __enter__(self):
self._compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
return self

def write(self, data):
buf = self._compressor.compress(data)
self._file.write(buf)

def __exit__(self, excType, excVal, excTb):
buf = self._compressor.flush()
self._file.write(buf)
self._file.flush()
self._compressor = None

+ 154
- 89
qwarc/warc.py ファイルの表示

@@ -1,14 +1,32 @@
import base64
import fcntl
import gzip
import hashlib
import io
import itertools
import json
import logging
import os
import qwarc.utils
import tempfile
import time
import warcio
import uuid


class _WARCRecord:
def __init__(self, headers, body, length, httpHeaderLength):
self.headers = headers
self.body = body
self.length = length
self.httpHeaderLength = httpHeaderLength


class _Digester:
def __init__(self):
self._digester = hashlib.sha1()

def update(self, data):
self._digester.update(data)

def __str__(self):
return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'


class WARC:
@@ -31,7 +49,6 @@ class WARC:

self._closed = True
self._file = None
self._warcWriter = None

self._dedupe = dedupe
self._dedupeMap = {}
@@ -62,11 +79,62 @@ class WARC:
else:
break
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False
self._counter += 1

def _create_warc_record(self, recordType, headers, body, length = None):
startPos = body.tell()

if 'WARC-Record-ID' not in headers:
headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'

headers['WARC-Type'] = recordType

digester = _Digester()
for buf in qwarc.utils.iter_file(body, length = length):
digester.update(buf)
body.seek(startPos)
headers['WARC-Block-Digest'] = str(digester)

if headers['Content-Type'].startswith('application/http;'):
httpHeaders = qwarc.utils.read_http_headers(body)
httpHeaderLength = body.tell() - startPos
if 'WARC-Payload-Digest' not in headers:
digester = _Digester()
for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
digester.update(buf)
headers['WARC-Payload-Digest'] = str(digester)
body.seek(startPos)
else:
httpHeaderLength = None

if not length:
body.seek(0, io.SEEK_END)
length = body.tell() - startPos
body.seek(startPos)
headers['Content-Length'] = str(length)

return _WARCRecord(headers, body, length, httpHeaderLength)

def _write_warc_record(self, record):
with qwarc.utils.GzipWrapper(self._file) as fp:
fp.write(b'WARC/1.1\r\n')
fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers

for buf in qwarc.utils.iter_file(record.body, length = record.length):
fp.write(buf)

fp.write(b'\r\n\r\n') # Record separator

def _get_date_string(self, t = None):
if t is None:
t = time.time()
usec = f'{(t - int(t)):.6f}'[2:]
return time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(t))

def _write_warcinfo_record(self):
date = self._get_date_string()
data = {
'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
'command': self._command,
@@ -77,18 +145,16 @@ class WARC:
'extra': self._specDependencies.extra,
}
payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
# Workaround for https://github.com/webrecorder/warcio/issues/87
digester = warcio.utils.Digester('sha1')
digester.update(payload.getvalue())
record = self._warcWriter.create_warc_record(
None,
'warcinfo',
payload = payload,
warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
length = len(payload.getvalue()),
record = self._create_warc_record(
'warcinfo',
{
'WARC-Date': date,
'Content-Type': 'application/json; charset=utf-8',
},
payload
)
self._warcWriter.write_record(record)
return record.rec_headers.get_header('WARC-Record-ID')
self._write_warc_record(record)
return record.headers['WARC-Record-ID']

def write_client_response(self, response):
'''
@@ -98,65 +164,62 @@ class WARC:

self._ensure_opened()
for r in response.iter_all():
usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
r.rawRequestData.seek(0, io.SEEK_END)
length = r.rawRequestData.tell()
requestDate = self._get_date_string(r.rawRequestTimestamp)
r.rawRequestData.seek(0)
requestRecord = self._warcWriter.create_warc_record(
str(r.url),
'request',
payload = r.rawRequestData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
requestRecord = self._create_warc_record(
'request',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=request',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawRequestData
)
requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
r.rawResponseData.seek(0, io.SEEK_END)
length = r.rawResponseData.tell()
requestRecordID = requestRecord.headers['WARC-Record-ID']
r.rawResponseData.seek(0)
responseRecord = self._warcWriter.create_warc_record(
str(r.url),
'response',
payload = r.rawResponseData,
length = length,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
responseRecord = self._create_warc_record(
'response',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'Content-Type': 'application/http; msgtype=response',
'WARC-Concurrent-To': requestRecordID,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
r.rawResponseData
)
payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
payloadDigest = responseRecord.headers['WARC-Payload-Digest']
assert payloadDigest is not None
if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
# Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
if self._dedupe and responseRecord.httpHeaderLength and (responseRecord.length - responseRecord.httpHeaderLength) > 100:
if payloadDigest in self._dedupeMap:
refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
responseHttpHeaders = responseRecord.http_headers
responseRecord = self._warcWriter.create_revisit_record(
str(r.url),
digest = payloadDigest,
refers_to_uri = refersToUri,
refers_to_date = refersToDate,
http_headers = responseHttpHeaders,
warc_headers_dict = {
'WARC-Date': requestDate,
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'WARC-Refers-To': refersToRecordId,
'WARC-Truncated': 'length',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
}
httpHeaderData = io.BytesIO(r.rawResponseData.read(responseRecord.httpHeaderLength))
responseRecord = self._create_warc_record(
'revisit',
{
'WARC-Date': requestDate,
'WARC-Target-URI': str(r.url),
'WARC-IP-Address': r.remoteAddress[0],
'WARC-Concurrent-To': requestRecordID,
'Content-Type': 'application/http; msgtype=response',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
'WARC-Refers-To-Target-URI': refersToUri,
'WARC-Refers-To-Date': refersToDate,
'WARC-Refers-To': refersToRecordId,
'WARC-Payload-Digest': payloadDigest,
'WARC-Truncated': 'length',
},
httpHeaderData
)
# Workaround for https://github.com/webrecorder/warcio/issues/94
responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
else:
self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
self._warcWriter.write_record(requestRecord)
self._warcWriter.write_record(responseRecord)
self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
self._write_warc_record(requestRecord)
self._write_warc_record(responseRecord)

if self._maxFileSize and self._file.tell() > self._maxFileSize:
self._close_file()
@@ -165,19 +228,21 @@ class WARC:
'''Write spec file and dependencies'''
assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'

date = self._get_date_string()
for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
with open(fn, 'rb') as f:
f.seek(0, io.SEEK_END)
length = f.tell()
f.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{fn}',
'resource',
payload = f,
length = length,
warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{fn}',
'X-QWARC-Type': type_,
'Content-Type': contentType,
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
f
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _write_initial_meta_records(self):
self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -189,25 +254,26 @@ class WARC:
rootLogger = logging.getLogger()
for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
handler.flush()
date = self._get_date_string()
with open(self._logFilename, 'rb') as fp:
fp.seek(0, io.SEEK_END)
length = fp.tell()
fp.seek(0)
record = self._warcWriter.create_warc_record(
f'file://{self._logFilename}',
'resource',
payload = fp,
length = length,
warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
record = self._create_warc_record(
'resource',
{
'WARC-Date': date,
'WARC-Target-URI': f'file://{self._logFilename}',
'X-QWARC-Type': 'log',
'Content-Type': 'text/plain; charset=utf-8',
'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
},
fp
)
self._warcWriter.write_record(record)
self._write_warc_record(record)

def _close_file(self):
'''Close the currently opened WARC'''

if not self._closed:
self._file.close()
self._warcWriter = None
self._file = None
self._closed = True

@@ -218,7 +284,6 @@ class WARC:
try:
fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
logging.info(f'Opened {filename}')
self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
self._closed = False

callback()


+ 1
- 1
setup.py ファイルの表示

@@ -15,7 +15,7 @@ setuptools.setup(
packages = ['qwarc'],
setup_requires = ['setuptools_scm'],
use_scm_version = True,
install_requires = ['aiohttp==2.3.10', 'warcio', 'yarl'],
install_requires = ['aiohttp==2.3.10', 'yarl'],
entry_points = {
'console_scripts': [
'qwarc = qwarc.cli:main',


読み込み中…
キャンセル
保存