From e892a6b6a7b05429d74607c64ad50c041ba1ba03 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Mon, 22 Apr 2019 23:25:41 +0000 Subject: [PATCH] Initial commit --- qwarc/__init__.py | 316 ++++++++++++++++++++++++++++++++++++++++++++++ qwarc/aiohttp.py | 113 +++++++++++++++++ qwarc/cli.py | 81 ++++++++++++ qwarc/const.py | 23 ++++ qwarc/utils.py | 182 ++++++++++++++++++++++++++ qwarc/warc.py | 93 ++++++++++++++ setup.py | 22 ++++ 7 files changed, 830 insertions(+) create mode 100644 qwarc/__init__.py create mode 100644 qwarc/aiohttp.py create mode 100644 qwarc/cli.py create mode 100644 qwarc/const.py create mode 100644 qwarc/utils.py create mode 100644 qwarc/warc.py create mode 100644 setup.py diff --git a/qwarc/__init__.py b/qwarc/__init__.py new file mode 100644 index 0000000..c709a33 --- /dev/null +++ b/qwarc/__init__.py @@ -0,0 +1,316 @@ +import qwarc.aiohttp +from qwarc.const import * +import qwarc.utils +import qwarc.warc + + +import aiohttp as _aiohttp +if _aiohttp.__version__ != '2.3.10': + raise ImportError('aiohttp must be version 2.3.10') +import asyncio +import collections +import concurrent.futures +import itertools +import logging +import os +import random +import sqlite3 +import yarl + + +class Item: + itemType = None + + def __init__(self, itemValue, session, headers, warc): + self.itemValue = itemValue + self.session = session + self.headers = headers + self.warc = warc + self.stats = {'tx': 0, 'rx': 0, 'requests': 0} + + self.childItems = [] + + async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default): + ''' + HTTP GET a URL + + url: str or yarl.URL + responseHandler: a callable that determines how the response is handled. See qwarc.utils.handle_response_default for details. + + Returns response (a ClientResponse object or None) and history (a tuple of (response, exception) tuples). + response can be None and history can be an empty tuple, depending on the circumstances (e.g. timeouts). + ''' + + #TODO: Rewrite using 'async with self.session.get' + + url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc. + history = [] + attempt = 0 + #TODO redirectLevel + while True: + attempt += 1 + response = None + exc = None + action = ACTION_RETRY + writeToWarc = True + try: + try: + with _aiohttp.Timeout(60): + logging.info('Fetching {}'.format(url)) + response = await self.session.get(url, headers = self.headers, allow_redirects = False) + try: + ret = await response.text(errors = 'surrogateescape') + except: + # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down. 
+ response.close() + raise + else: + tx = len(response.rawRequestData) + rx = len(response.rawResponseData) + logging.info('Fetched {}: {} (tx {}, rx {})'.format(url, response.status, tx, rx)) + self.stats['tx'] += tx + self.stats['rx'] += rx + self.stats['requests'] += 1 + except (asyncio.TimeoutError, _aiohttp.ClientError) as e: + logging.error('Request for {} failed: {!r}'.format(url, e)) + action, writeToWarc = await responseHandler(url, attempt, response, e) + exc = e # Pass the exception outward for the history + else: + action, writeToWarc = await responseHandler(url, attempt, response, None) + history.append((response, exc)) + if action in (ACTION_SUCCESS, ACTION_IGNORE): + return response, tuple(history) + elif action == ACTION_FOLLOW_OR_SUCCESS: + redirectUrl = response.headers.get('Location') or response.headers.get('URI') + if not redirectUrl: + return response, tuple(history) + url = url.join(yarl.URL(redirectUrl)) + attempt = 0 + elif action == ACTION_RETRY: + # Nothing to do, just go to the next cycle + pass + finally: + if response: + if writeToWarc: + self.warc.write_client_response(response) + await response.release() + + async def process(self): + raise NotImplementedError + + @classmethod + def generate(cls): + yield from () # Generate no items by default + + @classmethod + def _gen(cls): + for x in cls.generate(): + yield (cls.itemType, x, STATUS_TODO) + + def add_item(self, itemClassOrType, itemValue): + if issubclass(itemClassOrType, Item): + item = (itemClassOrType.itemType, itemValue) + else: + item = (itemClassOrType, itemValue) + if item not in self.childItems: + self.childItems.append(item) + + +class QWARC: + def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0): + ''' + itemClasses: iterable of Item + warcBasePath: str, base name of the WARC files + dbPath: str, path to the sqlite3 database file + concurrency: int, number of concurrently processed items + memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check + minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check + warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split + ''' + + self._itemClasses = itemClasses + self._itemTypeMap = {cls.itemType: cls for cls in itemClasses} + self._warcBasePath = warcBasePath + self._dbPath = dbPath + self._concurrency = concurrency + self._memoryLimit = memoryLimit + self._minFreeDisk = minFreeDisk + self._warcSizeLimit = warcSizeLimit + + async def obtain_exclusive_db_lock(self, db): + c = db.cursor() + while True: + try: + c.execute('BEGIN EXCLUSIVE') + break + except sqlite3.OperationalError as e: + if str(e) != 'database is locked': + raise + await asyncio.sleep(1) + return c + + def _make_item(self, itemType, itemValue, session, headers, warc): + try: + itemClass = self._itemTypeMap[itemType] + except KeyError: + raise RuntimeError('No such item type: {!r}'.format(itemType)) + return itemClass(itemValue, session, headers, warc) + + async def run(self, loop): + headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere + + tasks = set() + sleepTasks = set() + sessions = [] # aiohttp.ClientSession instances + freeSessions = collections.deque() # ClientSession instances that are currently free + + for i in range(self._concurrency): + session = _aiohttp.ClientSession( + 
connector = qwarc.aiohttp.TCPConnector(loop = loop),
+ request_class = qwarc.aiohttp.ClientRequest,
+ response_class = qwarc.aiohttp.ClientResponse,
+ skip_auto_headers = ['Accept-Encoding'],
+ loop = loop
+ )
+ sessions.append(session)
+ freeSessions.append(session)
+
+ warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit)
+
+ db = sqlite3.connect(self._dbPath, timeout = 1)
+ db.isolation_level = None # Transactions are handled manually below.
+ db.execute('PRAGMA synchronous = OFF')
+
+ try:
+ async def wait_for_free_task():
+ nonlocal tasks, freeSessions, db, emptyTodoSleep
+ done, pending = await asyncio.wait(tasks, return_when = concurrent.futures.FIRST_COMPLETED)
+ for future in done:
+ # TODO Replace all of this with `if future.cancelled():`
+ try:
+ await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
+ except concurrent.futures.CancelledError as e:
+ # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
+ if isinstance(future, asyncio.Task):
+ if future.taskType == 'process':
+ logging.warning('Task for {}:{} cancelled: {!r}'.format(future.itemType, future.itemValue, future))
+ elif future.taskType == 'sleep':
+ sleepTasks.remove(future)
+ continue
+ if future.taskType == 'sleep':
+ # Dummy task for empty todo list, see below.
+ sleepTasks.remove(future)
+ continue
+ item = future.item
+ logging.info('{itemType}:{itemValue} done: {requests} requests, {tx} tx, {rx} rx'.format(itemType = future.itemType, itemValue = future.itemValue, **item.stats))
+ cursor = await self.obtain_exclusive_db_lock(db)
+ try:
+ cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
+ if item.childItems:
+ it = iter(item.childItems)
+ while True:
+ values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
+ if not values:
+ break
+ cursor.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)
+ cursor.execute('COMMIT')
+ except:
+ cursor.execute('ROLLBACK')
+ raise
+ freeSessions.append(item.session)
+ tasks = pending
+
+ while True:
+ while len(tasks) >= self._concurrency:
+ await wait_for_free_task()
+
+ if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
+ logging.info('Disk space is low, sleeping')
+ sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
+ sleepTask.taskType = 'sleep'
+ tasks.add(sleepTask)
+ sleepTasks.add(sleepTask)
+ continue
+
+ cursor = await self.obtain_exclusive_db_lock(db)
+ try:
+ cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
+ result = cursor.fetchone()
+ if not result:
+ if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
+ # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
+ # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
+ # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
+ #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
+ sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
+ sleepTask.taskType = 'sleep'
+ tasks.add(sleepTask)
+ sleepTasks.add(sleepTask)
+ cursor.execute('COMMIT')
+ continue
+ else:
+ # Really nothing to do anymore
+ #TODO: Another process may be running create_db, in which case we'd still want to wait...
+ # create_db could insert a dummy item which is marked as done when the DB is ready
+ cursor.execute('COMMIT')
+ break
+ emptyTodoSleep = 0
+ id, itemType, itemValue, status = result
+ cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
+ cursor.execute('COMMIT')
+ except:
+ cursor.execute('ROLLBACK')
+ raise
+
+ session = freeSessions.popleft()
+ item = self._make_item(itemType, itemValue, session, headers, warc)
+ task = asyncio.ensure_future(item.process())
+ #TODO: Is there a better way to add custom information to a task/coroutine object?
+ task.taskType = 'process'
+ task.id = id
+ task.itemType = itemType
+ task.itemValue = itemValue
+ task.item = item
+ tasks.add(task)
+
+ if os.path.exists('STOP'):
+ logging.info('Gracefully shutting down due to STOP file')
+ break
+ if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
+ logging.info('Gracefully shutting down due to memory usage (current = {} > limit = {})'.format(qwarc.utils.get_rss(), self._memoryLimit))
+ break
+
+ for sleepTask in sleepTasks:
+ sleepTask.cancel()
+
+ while len(tasks):
+ await wait_for_free_task()
+
+ logging.info('Done')
+ except (Exception, KeyboardInterrupt) as e:
+ # Kill all tasks
+ for task in tasks:
+ task.cancel()
+ await asyncio.wait(tasks, return_when = concurrent.futures.ALL_COMPLETED)
+
+ raise
+ finally:
+ for session in sessions:
+ session.close()
+ warc.close()
+ db.close()
+
+ def create_db(self):
+ db = sqlite3.connect(self._dbPath, timeout = 1)
+ db.execute('PRAGMA synchronous = OFF')
+ with db:
+ db.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
+ db.execute('CREATE INDEX items_status_idx ON items (status)')
+
+ it = itertools.chain(*(i._gen() for i in self._itemClasses))
+ while True:
+ values = tuple(itertools.islice(it, 100000))
+ if not values:
+ break
+ with db:
+ db.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)
diff --git a/qwarc/aiohttp.py b/qwarc/aiohttp.py
new file mode 100644
index 0000000..0531019
--- /dev/null
+++ b/qwarc/aiohttp.py
@@ -0,0 +1,113 @@
+import aiohttp
+import aiohttp.client_proto
+import aiohttp.connector
+import functools
+import itertools
+import time
+
+
+# aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative...
+# The ResponseHandler handles received data; the writes are done directly on the underlying transport.
+# So ResponseHandler is replaced with a class which keeps all received data in a list, and the transport's write method is replaced with one which sends back all written data to the ResponseHandler.
+# Because the ResponseHandler instance disappears when the connection is closed (ClientResponse.{_response_eof,close,release}), ClientResponse copies the references to the data objects in the ResponseHandler.
+# aiohttp also does connection pooling/reuse, so ClientRequest resets the raw data when the request is sent. (This would not work with pipelining, but aiohttp does not support pipelining: https://github.com/aio-libs/aiohttp/issues/1740 )
+# This code has been developed for aiohttp version 2.3.10.
+ +#TODO: THERE IS A MEMORY LEAK HERE SOMEWHERE! I spent a whole day trying to find it without success. + + +class RawData: + def __init__(self): + self.requestTimestamp = None + self.requestData = [] + self.responseTimestamp = None + self.responseData = [] + + +class ResponseHandler(aiohttp.client_proto.ResponseHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.rawData = None + self.remoteAddress = None + + def data_received(self, data): + super().data_received(data) + if not data: + return + if self.rawData.responseTimestamp is None: + self.rawData.responseTimestamp = time.time() + self.rawData.responseData.append(data) + + def reset_raw_data(self): + self.rawData = RawData() + + +def make_transport_write(transport, protocol): + transport._real_write = transport.write + def write(self, data): + if protocol.rawData.requestTimestamp is None: + protocol.rawData.requestTimestamp = time.time() + protocol.rawData.requestData.append(data) + self._real_write(data) + return write + + +class TCPConnector(aiohttp.connector.TCPConnector): + def __init__(self, *args, loop = None, **kwargs): + super().__init__(*args, loop = loop, **kwargs) + self._factory = functools.partial(ResponseHandler, loop = loop) + + async def _wrap_create_connection(self, protocolFactory, host, port, *args, **kwargs): #FIXME: Uses internal API + transport, protocol = await super()._wrap_create_connection(protocolFactory, host, port, *args, **kwargs) + transport.write = make_transport_write(transport, protocol).__get__(transport, type(transport)) # https://stackoverflow.com/a/28127947 + protocol.remoteAddress = (host, port) + return (transport, protocol) + + +class ClientRequest(aiohttp.client_reqrep.ClientRequest): + def send(self, connection): + connection.protocol.reset_raw_data() + return super().send(connection) + + +class ClientResponse(aiohttp.client_reqrep.ClientResponse): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._rawData = None + self._remoteAddress = None + + async def start(self, connection, readUntilEof): + self._rawData = connection.protocol.rawData + self._remoteAddress = connection.protocol.remoteAddress + return (await super().start(connection, readUntilEof)) + + @property + def rawRequestTimestamp(self): + return self._rawData.requestTimestamp + + @property + def rawRequestData(self): + return b''.join(self._rawData.requestData) + + @property + def rawResponseTimestamp(self): + return self._rawData.responseTimestamp + + @property + def rawResponseData(self): + return b''.join(self._rawData.responseData) + + @property + def remoteAddress(self): + return self._remoteAddress + + def set_history(self, history): + self._history = history #FIXME: Uses private attribute of aiohttp.client_reqrep.ClientResponse + + def iter_all(self): + return itertools.chain(self.history, (self,)) + + async def release(self): + if not self.closed: + self.connection.reset_raw_data() + await super().release() diff --git a/qwarc/cli.py b/qwarc/cli.py new file mode 100644 index 0000000..0b03a14 --- /dev/null +++ b/qwarc/cli.py @@ -0,0 +1,81 @@ +import argparse +import asyncio +import importlib.util +import logging +import os.path +import qwarc +import sys +import time + + +def setup_logging(logFilename): + rootLogger = logging.getLogger() + rootLogger.handlers = [] + rootLogger.setLevel(logging.INFO) + + formatter = logging.Formatter('%(asctime)s.%(msecs)03dZ %(levelname)s %(message)s', datefmt = '%Y-%m-%d %H:%M:%S') + formatter.converter = time.gmtime + + 
fileHandler = logging.FileHandler(logFilename)
+ fileHandler.setFormatter(formatter)
+ rootLogger.addHandler(fileHandler)
+
+ stderrHandler = logging.StreamHandler()
+ stderrHandler.setFormatter(formatter)
+ rootLogger.addHandler(stderrHandler)
+
+
+def check_files(specFilename, logFilename):
+ success = True
+ if not os.path.isfile(specFilename):
+ print('Error: "{}" does not exist or is not a regular file'.format(specFilename), file = sys.stderr)
+ success = False
+ if os.path.exists(logFilename):
+ print('Error: "{}" already exists'.format(logFilename), file = sys.stderr)
+ success = False
+ if os.path.exists('STOP'):
+ print('Error: "STOP" exists', file = sys.stderr)
+ success = False
+ return success
+
+
+def main():
+ parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('--log', metavar = 'LOGFILE', default = './qwarc.log')
+ parser.add_argument('--database', metavar = 'DBFILE', default = './qwarc.db')
+ parser.add_argument('--warc', metavar = 'PREFIX', help = 'prefix for the WARC filenames', default = './qwarc')
+ parser.add_argument('--concurrency', type = int, default = 1)
+ parser.add_argument('--memorylimit', metavar = 'LIMIT', type = int, help = 'stop gracefully when the process uses more than LIMIT bytes of RSS; disable if 0', default = 0)
+ parser.add_argument('--disklimit', metavar = 'LIMIT', type = int, help = 'pause when less than LIMIT bytes disk space is free; disable if 0', default = 0)
+ parser.add_argument('--warcsplit', metavar = 'SIZE', type = int, help = 'split WARCs into files of SIZE bytes; disable if 0', default = 0)
+ parser.add_argument('specfile')
+
+ args = parser.parse_args()
+
+ if not check_files(args.specfile, args.log):
+ sys.exit(1)
+
+ setup_logging(args.log)
+
+ spec = importlib.util.spec_from_file_location('spec', args.specfile)
+ specMod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(specMod)
+
+ a = qwarc.QWARC(
+ itemClasses = qwarc.Item.__subclasses__(),
+ warcBasePath = args.warc,
+ dbPath = args.database,
+ concurrency = args.concurrency,
+ memoryLimit = args.memorylimit,
+ minFreeDisk = args.disklimit,
+ warcSizeLimit = args.warcsplit,
+ )
+ if not os.path.exists(args.database):
+ a.create_db()
+
+ loop = asyncio.get_event_loop()
+ try:
+ loop.run_until_complete(a.run(loop))
+ except (Exception, KeyboardInterrupt) as e:
+ logging.exception('Unhandled error')
+ loop.close()
diff --git a/qwarc/const.py b/qwarc/const.py
new file mode 100644
index 0000000..0ac6cd9
--- /dev/null
+++ b/qwarc/const.py
@@ -0,0 +1,23 @@
+STATUS_TODO = 0
+'''Status of an item that has not been processed yet'''
+
+STATUS_INPROGRESS = 1
+'''Status of an item that is currently being processed'''
+
+STATUS_DONE = 2
+'''Status of an item that has been processed'''
+
+#TODO: Add a STATUS_ERROR?
+
+ACTION_SUCCESS = 0
+'''Treat this response as a success'''
+
+ACTION_IGNORE = 1 #TODO Replace with ACTION_SUCCESS since it's really the same thing.
+'''Ignore this response'''
+
+ACTION_RETRY = 2
+'''Retry the same request'''
+
+ACTION_FOLLOW_OR_SUCCESS = 3
+'''If the response contains a Location or URI header, follow it. Otherwise, treat it as a success.'''
+#TODO: Rename to ACTION_FOLLOW maybe? However, the current name makes it more clear what qwarc does when there's a redirect without a redirect target...
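
The ACTION_* constants above are the return values expected from response handlers passed to Item.fetch(): a handler is an async callable taking (url, attempt, response, exc) and returning an ACTION_* constant together with a bool that decides whether the exchange is written to the WARC (see qwarc.utils.handle_response_default in the next file). As a minimal sketch of a custom handler (the name, the 429 case, and the 30-second delay are illustrative, not part of qwarc), one could defer to the default handler for everything except rate-limiting responses:

    import asyncio
    from qwarc.const import ACTION_IGNORE, ACTION_RETRY
    import qwarc.utils

    async def handle_response_backoff_429(url, attempt, response, exc):
        # Back off on 429 Too Many Requests, give up after five attempts,
        # and fall back to qwarc's default handling for everything else.
        if response is not None and response.status == 429:
            if attempt > 5:
                return ACTION_IGNORE, True
            await asyncio.sleep(30)
            return ACTION_RETRY, True
        return await qwarc.utils.handle_response_default(url, attempt, response, exc)

Such a handler would be used as self.fetch(url, responseHandler = handle_response_backoff_429) inside an Item's process() coroutine.
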
diff --git a/qwarc/utils.py b/qwarc/utils.py new file mode 100644 index 0000000..fb062a7 --- /dev/null +++ b/qwarc/utils.py @@ -0,0 +1,182 @@ +from qwarc.const import * +import asyncio +import os + + +PAGESIZE = os.sysconf('SC_PAGE_SIZE') + + +def get_rss(): + '''Get the current RSS of this process in bytes''' + + with open('/proc/self/statm', 'r') as fp: + return int(fp.readline().split()[1]) * PAGESIZE + + +def get_disk_free(): + '''Get the current free disk space on the relevant partition in bytes''' + + st = os.statvfs('.') + return st.f_bavail * st.f_frsize + + +def uses_too_much_memory(limit): + ''' + Check whether the process is using too much memory + + For performance reasons, this actually only checks the memory usage on every 100th call. + ''' + + uses_too_much_memory.callCounter += 1 + # Only check every hundredth call + if uses_too_much_memory.callCounter % 100 == 0 and get_rss() > limit: + return True + return False +uses_too_much_memory.callCounter = 0 + + +def too_little_disk_space(limit): + ''' + Check whether the disk space is too small + + For performance reasons, this actually only checks the free disk space on every 100th call. + ''' + + too_little_disk_space.callCounter += 1 + if too_little_disk_space.callCounter % 100 == 0: + too_little_disk_space.currentResult = (get_disk_free() < limit) + return too_little_disk_space.currentResult +too_little_disk_space.callCounter = 0 +too_little_disk_space.currentResult = False + + +# https://stackoverflow.com/a/4665027 +def find_all(aStr, sub): + '''Generator yielding the start positions of every non-overlapping occurrence of sub in aStr.''' + + start = 0 + while True: + start = aStr.find(sub, start) + if start == -1: + return + yield start + start += len(sub) + + +def str_get_between(aStr, a, b): + '''Get the string after the first occurrence of a in aStr and the first occurrence of b after that of a, or None if there is no such string.''' + + aPos = aStr.find(a) + if aPos == -1: + return None + offset = aPos + len(a) + bPos = aStr.find(b, offset) + if bPos == -1: + return None + return aStr[offset:bPos] + + +def maybe_str_get_between(x, a, b): + '''Like str_get_between, but returns None if x evaluates to False and converts it to a str before matching.''' + + if x: + return str_get_between(str(x), a, b) + + +def str_get_all_between(aStr, a, b): + '''Generator yielding every string between occurrences of a in aStr and the following occurrence of b.''' + + #TODO: This produces half-overlapping matches: str_get_all_between('aabc', 'a', 'c') will yield 'ab' and 'b'. + # Might need to implement sending an offset to the find_all generator to work around this, or discard aOffset values which are smaller than the previous bPos+len(b). + + for aOffset in find_all(aStr, a): + offset = aOffset + len(a) + bPos = aStr.find(b, offset) + if bPos != -1: + yield aStr[offset:bPos] + + +def maybe_str_get_all_between(x, a, b): + '''Like str_get_all_between, but yields no elements if x evaluates to False and converts x to a str before matching.''' + + if x: + yield from str_get_all_between(str(x), a, b) + + +def generate_range_items(start, stop, step): + ''' + Generator for items of `step` size between `start` and `stop` (inclusive) + Yields strings of the form `'a-b'` where `a` and `b` are integers such that `b - a + 1 == step`, `min(a) == start`, and `max(b) == stop`. + `b - a + 1` may be unequal to `step` on the last item if `(stop - start + 1) % step != 0` (see examples below). 
+ Note that `a` and `b` can be equal on the last item if `(stop - start) % step == 0` (see examples below).
+
+ Examples:
+ - generate_range_items(0, 99, 10) yields '0-9', '10-19', '20-29', ..., '90-99'
+ - generate_range_items(0, 42, 10): '0-9', '10-19', '20-29', '30-39', '40-42'
+ - generate_range_items(0, 20, 10): '0-9', '10-19', '20-20'
+ '''
+
+ for i in range(start, stop + 1, step):
+ yield '{}-{}'.format(i, min(i + step - 1, stop))
+
+
+async def handle_response_default(url, attempt, response, exc):
+ '''
+ The default response handler, which behaves as follows:
+ - If there is no response (e.g. timeout error), retry the retrieval after a delay of 5 seconds.
+ - If the response has any of the status codes 401, 403, 404, 405, or 410, treat it as a permanent error and return.
+ - If there was any exception and it is an asyncio.TimeoutError or an aiohttp.ClientError, treat as a potentially temporary error and retry the retrieval after a delay of 5 seconds.
+ - If the response has any of the status codes 200, 204, 206, or 304, treat it as a success and return.
+ - If the response has any of the status codes 301, 302, 303, 307, or 308, follow the redirect target if specified or return otherwise.
+ - Otherwise, treat as a potentially temporary error and retry the retrieval after a delay of 5 seconds.
+
+ - All responses are written to WARC by default.
+
+ Note that this handler does not limit the number of retries on errors.
+
+ Parameters: url (yarl.URL instance), attempt (int), response (aiohttp.ClientResponse or None), exc (Exception or None)
+ At least one of response and exc is not None.
+ Returns: (one of the qwarc.const.ACTION_* constants, bool signifying whether to write to WARC or not)
+ '''
+
+ #TODO: Document that `attempt` is reset on redirects
+
+ if response is None:
+ await asyncio.sleep(5)
+ return ACTION_RETRY, True
+ if response.status in (401, 403, 404, 405, 410):
+ return ACTION_IGNORE, True
+ if exc is not None: # fetch() only passes asyncio.TimeoutError or aiohttp.ClientError instances here
+ await asyncio.sleep(5)
+ return ACTION_RETRY, True
+ if response.status in (200, 204, 206, 304):
+ return ACTION_SUCCESS, True
+ if response.status in (301, 302, 303, 307, 308):
+ return ACTION_FOLLOW_OR_SUCCESS, True
+ await asyncio.sleep(5)
+ return ACTION_RETRY, True
+
+
+async def handle_response_ignore_redirects(url, attempt, response, exc):
+ '''A response handler that does not follow redirects, i.e. treats them as a success instead. It behaves as handle_response_default otherwise.'''
+
+ action, writeToWarc = await handle_response_default(url, attempt, response, exc)
+ if action == ACTION_FOLLOW_OR_SUCCESS:
+ action = ACTION_SUCCESS
+ return action, writeToWarc
+
+
+def handle_response_limit_error_retries(maxRetries):
+ '''A response handler that limits the number of retries on errors. It behaves as handle_response_default otherwise.
+
+ Technically, this is actually a response handler factory. This is so that the intuitive use works: fetch(..., responseHandler = handle_response_limit_error_retries(5))
+
+ If you use the same limit many times, you should keep the return value (the response handler) of this function and reuse it to avoid creating a new function every time.
+ ''' + + async def handler(url, attempt, response, exc): + action, writeToWarc = await handle_response_default(url, attempt, response, exc) + if action == ACTION_RETRY and attempt > maxRetries: + action = ACTION_IGNORE + return action, writeToWarc + return handler diff --git a/qwarc/warc.py b/qwarc/warc.py new file mode 100644 index 0000000..1f9d85f --- /dev/null +++ b/qwarc/warc.py @@ -0,0 +1,93 @@ +import fcntl +import io +import logging +import time +import warcio + + +class WARCWriter(warcio.warcwriter.WARCWriter): + def _do_write_req_resp(self, req, resp, params): #FIXME: Internal API + # Write request before response, like wget and wpull; cf. https://github.com/webrecorder/warcio/issues/20 + self._write_warc_record(self.out, req) + self._write_warc_record(self.out, resp) + + +class WARC: + def __init__(self, prefix, maxFileSize): + ''' + Initialise the WARC writer + + prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended. + maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting. + ''' + + self._prefix = prefix + self._counter = 0 + self._maxFileSize = maxFileSize + + self._closed = True + self._file = None + self._warcWriter = None + + self._cycle() + + def _cycle(self): + '''Close the current file, open the next file that doesn't exist yet''' + + #TODO: This opens a new file also at the end, which can result in empty WARCs. Should try to reorder this to only open a WARC when writing a record, and to only close the current WARC if the size is exceeded after write_client_response. + self.close() + while True: + filename = '{}-{:05d}.warc.gz'.format(self._prefix, self._counter) + try: + # Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it + self._file = open(filename, 'xb') + fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except FileExistsError: + logging.info('{} already exists, skipping'.format(filename)) + self._counter += 1 + else: + break + logging.info('Opened {}'.format(filename)) + self._warcWriter = WARCWriter(self._file, gzip = True) + self._closed = False + self._counter += 1 + + def write_client_response(self, response): + ''' + Write the requests and responses stored in a ClientResponse instance to the currently opened WARC. + A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC. 
+ ''' + + for r in response.iter_all(): + requestDate = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(r.rawRequestTimestamp)) + requestRecord = self._warcWriter.create_warc_record( + str(r.url), + 'request', + payload = io.BytesIO(r.rawRequestData), + warc_headers_dict = { + 'WARC-Date': requestDate, + 'WARC-IP-Address': r.remoteAddress[0], + } + ) + responseRecord = self._warcWriter.create_warc_record( + str(r.url), + 'response', + payload = io.BytesIO(r.rawResponseData), + warc_headers_dict = { + 'WARC-Date': requestDate, + 'WARC-IP-Address': r.remoteAddress[0], + } + ) + self._warcWriter.write_request_response_pair(requestRecord, responseRecord) + + if self._maxFileSize and self._file.tell() > self._maxFileSize: + self._cycle() + + def close(self): + '''Close the currently opened WARC''' + + if not self._closed: + self._file.close() + self._warcWriter = None + self._file = None + self._closed = True diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..f6b0456 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +import setuptools + + +setuptools.setup( + name = 'qwarc', + version = '0.0-dev', + description = 'A framework for quick web archival', + author = 'JustAnotherArchivist', + url = 'https://github.com/JustAnotherArchivist/qwarc', + classifiers = [ + 'Development Status :: 3 - Alpha', + 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', + 'Programming Language :: Python :: 3.6', + ], + packages = ['qwarc'], + install_requires = ['aiohttp==2.3.10', 'warcio', 'yarl'], + entry_points = { + 'console_scripts': [ + 'qwarc = qwarc.cli:main', + ], + }, +)