Compare commits


1 commit

Author: JustAnotherArchivist
SHA1: f5c3eb42b3
Message: WIP attempt to remove warcio
Date: 2 years ago
2 changed files with 252 additions and 86 deletions
  1. qwarc/utils.py (+115, -0)
  2. qwarc/warc.py (+137, -86)

qwarc/utils.py (+115, -0)

@@ -2,12 +2,14 @@ from qwarc.const import *
 import aiohttp
 import asyncio
 import functools
+import io
 import logging
 import os
 import pkg_resources
 import platform
 import time
 import typing
+import zlib
 
 
 PAGESIZE = os.sysconf('SC_PAGE_SIZE')
@@ -260,3 +262,116 @@ class ReadonlyFileView:
 		if key == 'writable':
 			return False
 		return getattr(self._fp, key)
+
+
+def iter_file(f, length = None, blockSize = 1048576):
+	'''Read `length` bytes from `f` in chunks of `blockSize` bytes. If `length` is `None`, read until EOF.'''
+	read = 0
+	while True:
+		buf = f.read(blockSize)
+		if not buf: # EOF
+			if length and read < length:
+				raise RuntimeError('Reached EOF before reading enough data')
+			break
+		if length and read + len(buf) > length:
+			# Only yield the requested number of bytes; seek back so the surplus can be read again later.
+			initialBufLen = len(buf)
+			buf = buf[0 : length - read]
+			f.seek(len(buf) - initialBufLen, io.SEEK_CUR)
+		read += len(buf)
+		yield buf
+		if length and read >= length:
+			if read > length: # This should never happen due to the truncation above.
+				raise RuntimeError('Overread')
+			break
+
+
+def read_http_headers(f, copy = None):
+	'''Read the status/request line and headers of an HTTP message from `f`; return a dict mapping lowercased header names to values. If `copy` is provided, the raw bytes read are also written to it.'''
+	headers = {}
+
+	# Status line or request line
+	line = f.readline()
+	if copy:
+		copy.write(line)
+
+	line = f.readline()
+	if copy:
+		copy.write(line)
+	while line and line not in (b'\r\n', b'\r', b'\n'):
+		# Split into header name and value
+		name, value = line.split(b':', 1)
+		name = name.strip(b' \t')
+		#TODO name validation
+
+		# Read next line
+		line = f.readline()
+		if copy:
+			copy.write(line)
+
+		# Handle continuation lines (obsolete header line folding)
+		continuation = line[0:1] in (b' ', b'\t')
+		if continuation:
+			value = [value.strip(b' \t\r\n')]
+			while continuation:
+				value.append(line.strip(b' \t\r\n'))
+				line = f.readline()
+				if copy:
+					copy.write(line)
+				continuation = line[0:1] in (b' ', b'\t')
+			value = b' '.join(value)
+		else:
+			value = value.strip(b' \t\r\n')
+
+		# Decode and store
+		try:
+			name = name.decode('utf-8')
+		except UnicodeDecodeError:
+			name = name.decode('iso-8859-1')
+		try:
+			value = value.decode('utf-8')
+		except UnicodeDecodeError:
+			value = value.decode('iso-8859-1')
+		headers[name.lower()] = value
+
+	# `line` is already the next line, if any
+	return headers
+
+
+def read_http_body(f, length, headers):
+	if 'chunked' in map(str.strip, headers.get('transfer-encoding', '').split(',')):
+		while True:
+			chunkLine = f.readline()
+			if b';' in chunkLine:
+				chunkLength = chunkLine.split(b';', 1)[0].strip()
+			else:
+				chunkLength = chunkLine.strip()
+			chunkLength = int(chunkLength, base = 16)
+			if chunkLength == 0:
+				break
+			yield from iter_file(f, length = chunkLength)
+			assert f.read(2) == b'\r\n' # Chunk terminator
+
+		# Consume trailer
+		line = f.readline()
+		while line and line not in (b'\r\n', b'\r', b'\n'):
+			line = f.readline()
+	else:
+		yield from iter_file(f, length = length)
+
+
+class GzipWrapper:
+	def __init__(self, f):
+		self._file = f
+		self._compressor = None
+
+	def __enter__(self):
+		self._compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+		return self
+
+	def write(self, data):
+		buf = self._compressor.compress(data)
+		self._file.write(buf)
+
+	def __exit__(self, excType, excVal, excTb):
+		buf = self._compressor.flush()
+		self._file.write(buf)
+		self._file.flush()
+		self._compressor = None
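
Side note (not part of the commit): a minimal usage sketch of the new helpers above, assuming this revision of qwarc.utils is importable; it parses a small canned chunked HTTP response.

    import io
    import qwarc.utils

    raw = io.BytesIO(
        b'HTTP/1.1 200 OK\r\n'
        b'Content-Type: text/plain\r\n'
        b'Transfer-Encoding: chunked\r\n'
        b'\r\n'
        b'5\r\nhello\r\n'
        b'0\r\n\r\n'
    )
    headerCopy = io.BytesIO()  # receives the raw status line and headers as they are read
    headers = qwarc.utils.read_http_headers(raw, copy = headerCopy)
    body = b''.join(qwarc.utils.read_http_body(raw, length = None, headers = headers))
    assert headers['transfer-encoding'] == 'chunked'
    assert body == b'hello'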

qwarc/warc.py (+137, -86)

@@ -1,16 +1,34 @@
+import base64
 import fcntl
+import gzip
+import hashlib
 import io
 import itertools
 import json
 import logging
+import os
 import qwarc.utils
+import tempfile
 import time
+import uuid
 import warcio
 
 
+class _WARCRecord:
+	def __init__(self, headers, body, length):
+		self.headers = headers
+		self.body = body
+		self.length = length
+
+
+class _Digester:
+	def __init__(self):
+		self._digester = hashlib.sha1()
+
+	def update(self, data):
+		self._digester.update(data)
+
+	def __str__(self):
+		return f'sha1:{base64.b32encode(self._digester.digest()).decode("ascii")}'
+
+
 class WARC:
 	def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
 		'''
@@ -31,7 +49,6 @@ class WARC:
 
 		self._closed = True
 		self._file = None
-		self._warcWriter = None
 
 		self._dedupe = dedupe
 		self._dedupeMap = {}
@@ -62,10 +79,50 @@
 			else:
 				break
 		logging.info(f'Opened {filename}')
-		self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 		self._closed = False
 		self._counter += 1
 
+	def _create_warc_record(self, recordType, headers, body, length = None):
+		startPos = body.tell()
+
+		if 'WARC-Record-ID' not in headers:
+			headers['WARC-Record-ID'] = f'<urn:uuid:{uuid.uuid4()}>'
+
+		headers['WARC-Type'] = recordType
+
+		# Block digest over the entire record body
+		digester = _Digester()
+		for buf in qwarc.utils.iter_file(body, length = length):
+			digester.update(buf)
+		body.seek(startPos)
+		headers['WARC-Block-Digest'] = str(digester)
+
+		# Payload digest over the HTTP message body for application/http records
+		if 'WARC-Payload-Digest' not in headers and headers['Content-Type'].startswith('application/http;'):
+			digester = _Digester()
+			httpHeaders = qwarc.utils.read_http_headers(body)
+			for buf in qwarc.utils.read_http_body(body, length = (length - body.tell()) if length is not None else None, headers = httpHeaders):
+				digester.update(buf)
+			body.seek(startPos)
+			headers['WARC-Payload-Digest'] = str(digester)
+
+		if not length:
+			body.seek(0, io.SEEK_END)
+			length = body.tell() - startPos
+			body.seek(startPos)
+		headers['Content-Length'] = str(length)
+
+		return _WARCRecord(headers, body, length)
+
+	def _write_warc_record(self, record):
+		# Each record is written as its own gzip member.
+		with qwarc.utils.GzipWrapper(self._file) as fp:
+			fp.write(b'WARC/1.1\r\n')
+			fp.write(b'\r\n'.join(k.encode('utf-8') + b': ' + v.encode('utf-8') for k, v in record.headers.items()))
+			fp.write(b'\r\n\r\n') # Trailing CRLF for last header line plus end of headers
+
+			for buf in qwarc.utils.iter_file(record.body, length = record.length):
+				fp.write(buf)
+
+			fp.write(b'\r\n\r\n') # Record separator
+
 	def _write_warcinfo_record(self):
 		data = {
 			'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
@@ -77,18 +134,15 @@ class WARC:
 			'extra': self._specDependencies.extra,
 		}
 		payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
-		# Workaround for https://github.com/webrecorder/warcio/issues/87
-		digester = warcio.utils.Digester('sha1')
-		digester.update(payload.getvalue())
-		record = self._warcWriter.create_warc_record(
-			None,
-			'warcinfo',
-			payload = payload,
-			warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
-			length = len(payload.getvalue()),
+		record = self._create_warc_record(
+			'warcinfo',
+			{
+				'Content-Type': 'application/json; charset=utf-8',
+			},
+			payload
 		)
-		self._warcWriter.write_record(record)
-		return record.rec_headers.get_header('WARC-Record-ID')
+		self._write_warc_record(record)
+		return record.headers['WARC-Record-ID']
 
 	def write_client_response(self, response):
 		'''
@@ -100,63 +154,62 @@ class WARC:
 		for r in response.iter_all():
 			usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
 			requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
-			r.rawRequestData.seek(0, io.SEEK_END)
-			length = r.rawRequestData.tell()
 			r.rawRequestData.seek(0)
-			requestRecord = self._warcWriter.create_warc_record(
-				str(r.url),
-				'request',
-				payload = r.rawRequestData,
-				length = length,
-				warc_headers_dict = {
-					'WARC-Date': requestDate,
-					'WARC-IP-Address': r.remoteAddress[0],
-					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-				}
+			requestRecord = self._create_warc_record(
+				'request',
+				{
+					'WARC-Date': requestDate,
+					'WARC-Target-URI': str(r.url),
+					'WARC-IP-Address': r.remoteAddress[0],
+					'Content-Type': 'application/http; msgtype=request',
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				r.rawRequestData
 			)
-			requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
-			r.rawResponseData.seek(0, io.SEEK_END)
-			length = r.rawResponseData.tell()
+			requestRecordID = requestRecord.headers['WARC-Record-ID']
 			r.rawResponseData.seek(0)
-			responseRecord = self._warcWriter.create_warc_record(
-				str(r.url),
-				'response',
-				payload = r.rawResponseData,
-				length = length,
-				warc_headers_dict = {
-					'WARC-Date': requestDate,
-					'WARC-IP-Address': r.remoteAddress[0],
-					'WARC-Concurrent-To': requestRecordID,
-					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-				}
+			responseRecord = self._create_warc_record(
+				'response',
+				{
+					'WARC-Date': requestDate,
+					'WARC-Target-URI': str(r.url),
+					'WARC-IP-Address': r.remoteAddress[0],
+					'Content-Type': 'application/http; msgtype=response',
+					'WARC-Concurrent-To': requestRecordID,
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				r.rawResponseData
 			)
-			payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
+			payloadDigest = responseRecord.headers['WARC-Payload-Digest']
 			assert payloadDigest is not None
-			if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
+			if self._dedupe and responseRecord.length > 1024: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
 				if payloadDigest in self._dedupeMap:
 					refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
-					responseHttpHeaders = responseRecord.http_headers
-					responseRecord = self._warcWriter.create_revisit_record(
-						str(r.url),
-						digest = payloadDigest,
-						refers_to_uri = refersToUri,
-						refers_to_date = refersToDate,
-						http_headers = responseHttpHeaders,
-						warc_headers_dict = {
-							'WARC-Date': requestDate,
-							'WARC-IP-Address': r.remoteAddress[0],
-							'WARC-Concurrent-To': requestRecordID,
-							'WARC-Refers-To': refersToRecordId,
-							'WARC-Truncated': 'length',
-							'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
-						}
+					# The revisit record stores only the HTTP headers of the duplicate response.
+					httpHeaderData = io.BytesIO()
+					qwarc.utils.read_http_headers(r.rawResponseData, copy = httpHeaderData)
+					httpHeaderData.seek(0)
+					responseRecord = self._create_warc_record(
+						'revisit',
+						{
+							'WARC-Date': requestDate,
+							'WARC-Target-URI': str(r.url),
+							'WARC-IP-Address': r.remoteAddress[0],
+							'WARC-Concurrent-To': requestRecordID,
+							'Content-Type': 'application/http; msgtype=response',
+							'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+							'WARC-Profile': 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest',
+							'WARC-Refers-To-Target-URI': refersToUri,
+							'WARC-Refers-To-Date': refersToDate,
+							'WARC-Refers-To': refersToRecordId,
+							'WARC-Payload-Digest': payloadDigest,
+							'WARC-Truncated': 'length',
+						},
+						httpHeaderData
 					)
-					# Workaround for https://github.com/webrecorder/warcio/issues/94
-					responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
 				else:
-					self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
-			self._warcWriter.write_record(requestRecord)
-			self._warcWriter.write_record(responseRecord)
+					self._dedupeMap[payloadDigest] = (responseRecord.headers['WARC-Record-ID'], str(r.url), requestDate)
+			self._write_warc_record(requestRecord)
+			self._write_warc_record(responseRecord)
 
 		if self._maxFileSize and self._file.tell() > self._maxFileSize:
 			self._close_file()
@@ -167,17 +220,17 @@ class WARC:
 
 		for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
 			with open(fn, 'rb') as f:
-				f.seek(0, io.SEEK_END)
-				length = f.tell()
-				f.seek(0)
-				record = self._warcWriter.create_warc_record(
-					f'file://{fn}',
-					'resource',
-					payload = f,
-					length = length,
-					warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
+				record = self._create_warc_record(
+					'resource',
+					{
+						'WARC-Target-URI': f'file://{fn}',
+						'X-QWARC-Type': type_,
+						'Content-Type': contentType,
+						'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+					},
+					f
 				)
-				self._warcWriter.write_record(record)
+				self._write_warc_record(record)
 
 	def _write_initial_meta_records(self):
 		self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -190,24 +243,23 @@ class WARC:
 		for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
 			handler.flush()
 		with open(self._logFilename, 'rb') as fp:
-			fp.seek(0, io.SEEK_END)
-			length = fp.tell()
-			fp.seek(0)
-			record = self._warcWriter.create_warc_record(
-				f'file://{self._logFilename}',
-				'resource',
-				payload = fp,
-				length = length,
-				warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
+			record = self._create_warc_record(
+				'resource',
+				{
+					'WARC-Target-URI': f'file://{self._logFilename}',
+					'X-QWARC-Type': 'log',
+					'Content-Type': 'text/plain; charset=utf-8',
+					'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
+				},
+				fp
 			)
-			self._warcWriter.write_record(record)
+			self._write_warc_record(record)
 
 	def _close_file(self):
 		'''Close the currently opened WARC'''
 
 		if not self._closed:
			self._file.close()
-			self._warcWriter = None
 			self._file = None
 			self._closed = True
@@ -218,7 +270,6 @@ class WARC:
 		try:
 			fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
 			logging.info(f'Opened {filename}')
-			self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 			self._closed = False
 
 			callback()
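
Side note (not part of the commit): because _write_warc_record wraps every record in its own GzipWrapper context, the output file consists of concatenated gzip members. Python's standard gzip module reads such files transparently, so a rough sanity check might look like this (the filename is hypothetical):

    import gzip

    with gzip.open('example-00000.warc.gz', 'rb') as f:  # hypothetical output filename
        data = f.read()
    # Rough check: each WARC record starts with a version line.
    print(data.count(b'WARC/1.1\r\n'), 'record(s) found')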

