import qwarc.aiohttp
from qwarc.const import *
import qwarc.utils
import qwarc.warc
import aiohttp as _aiohttp
if _aiohttp.__version__ != '2.3.10':
	raise ImportError('aiohttp must be version 2.3.10')
import asyncio
import collections
import concurrent.futures
import contextlib
import io
import itertools
import logging
import os
import random
import sqlite3
import yarl


logger = logging.getLogger(__name__)


class Item:
	itemType = None
	defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)

	def __init__(self, qwarcObj, itemValue, session, headers, warc):
		self.qwarcObj = qwarcObj
		self.itemValue = itemValue
		self.session = session
		self.headers = headers
		self.warc = warc
		if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
			self._baseUrl = None
		self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
		self.logger = logging.LoggerAdapter(logging.getLogger(f'{type(self).__module__}.{type(self).__name__}'), {'itemType': self.itemType, 'itemValue': self.itemValue})

		self.childItems = []

	@property
	def baseUrl(self):
		return self._baseUrl

	@baseUrl.setter
	def baseUrl(self, baseUrl):
		self.logger.debug(f'Setting baseUrl to {baseUrl!r}')
		if baseUrl is None:
			self._baseUrl = None
		elif isinstance(baseUrl, yarl.URL):
			self._baseUrl = baseUrl
		else:
			self._baseUrl = yarl.URL(baseUrl)

	def _merge_headers(self, headers, extraHeaders = []):
		d = {} # Preserves order from Python 3.7 (guaranteed) or CPython 3.6 (implementation detail)
		keys = {} # casefolded key -> d key
		for key, value in itertools.chain(self.headers, extraHeaders, headers):
			keyc = key.casefold()
			if value is None:
				if keyc in keys:
					del d[keys[keyc]]
					del keys[keyc]
			else:
				if keyc in keys and key != keys[keyc]:
					del d[keys[keyc]]
				d[key] = value
				keys[keyc] = key
		out = []
		for key, value in d.items():
			if isinstance(value, tuple):
				for value_ in value:
					out.append((key, value_))
			else:
				out.append((key, value))
		return out
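	# A sketch of the merge semantics above, using hypothetical header values
	# (none of these values are part of the API):
	#
	#   self.headers = [('User-Agent', 'qwarc'), ('Accept-Language', 'en')]
	#   self._merge_headers([('accept-language', None), ('X-Foo', ('a', 'b'))])
	#   -> [('User-Agent', 'qwarc'), ('X-Foo', 'a'), ('X-Foo', 'b')]
	#
	# The None value removes Accept-Language (matched case-insensitively), and
	# the tuple value expands into two separate X-Foo headers.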
	async def fetch(self, url, *, responseHandler = None, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60, fromResponse = None):
		'''
		HTTP GET or POST a URL

		url: str or yarl.URL; if this is not a complete URL, it is evaluated relative to self.baseUrl
		responseHandler: None or a callable that determines how the response is handled; if None, self.defaultResponseHandler is used. See qwarc.utils.handle_response_default for details.
		method: str, must be 'GET' or 'POST'
		data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
		headers: list of 2-tuples, additional or overriding headers for this request only
			To remove one of the default headers, pass a value of None.
			If a header appears multiple times, only the last one is used. To send a header multiple times, pass a tuple of values.
		verify_ssl: bool, whether the SSL/TLS certificate should be validated
		timeout: int or float, how long the fetch may take at most in total (sending request until finishing reading the response)
		fromResponse: ClientResponse or None; if provided, use fromResponse.url for the url completion (instead of self.baseUrl) and add it as a Referer header

		Returns response (a ClientResponse object or a qwarc.utils.DummyClientResponse object)
		'''
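		# Sketch of a custom response handler matching the keyword-argument
		# calls further down; qwarc.utils.handle_response_default is the
		# reference implementation, and this simplified variant is only
		# illustrative:
		#
		#   async def my_handler(*, url, attempt, response, exc, redirectLevel, item):
		#       if exc is not None:
		#           return (ACTION_RETRY if attempt < 5 else ACTION_RETRIES_EXCEEDED), False
		#       if response.status in (301, 302, 303, 307, 308):
		#           return ACTION_FOLLOW_OR_SUCCESS, True
		#       return ACTION_SUCCESS, True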
		#TODO: Rewrite using 'async with self.session.get'

		if self.logger.isEnabledFor(logging.DEBUG):
			l = locals()
			modified = []
			for kwarg, kwdef in self.fetch.__kwdefaults__.items():
				if l[kwarg] != kwdef:
					modified.append(kwarg)
			if modified:
				self.logger.debug(f'fetch({url!r}) with ...')
				for kwarg in modified:
					self.logger.debug(f'... {kwarg} = {repr(l[kwarg]).replace(chr(10), " ")}') # aiohttp.ClientResponse's repr can contain newlines
			else:
				self.logger.debug(f'fetch({url!r})')
		url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
		if not url.scheme or not url.host:
			if fromResponse is not None:
				self.logger.debug(f'Joining incomplete URL {url} with fromResponse URL {fromResponse.url}')
				url = fromResponse.url.join(url)
			elif not self.baseUrl:
				raise ValueError('Incomplete URL and no baseUrl to join it with')
			else:
				self.logger.debug(f'Joining incomplete URL {url} with base URL {self.baseUrl}')
				url = self.baseUrl.join(url)
		originalUrl = url
		if responseHandler is None:
			responseHandler = self.defaultResponseHandler
		assert method in ('GET', 'POST'), 'method must be GET or POST'
		headers = self._merge_headers(headers, extraHeaders = [('Referer', str(fromResponse.url))] if fromResponse is not None else [])
		history = []
		attempt = 0
		redirectLevel = 0
		while True:
			attempt += 1
			response = None
			exc = None
			action = ACTION_RETRY
			writeToWarc = True
			self.logger.debug(f'Trying to fetch {url}, attempt {attempt}, redirect level {redirectLevel}')
			try:
				try:
					with _aiohttp.Timeout(timeout):
						self.logger.info(f'Fetching {url}')
						response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
						try:
							while True:
								ret = await response.content.read(1048576)
								if not ret:
									break
						except:
							# Not calling the responseHandler callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
							response.close()
							raise
						else:
							response.rawRequestData.seek(0, io.SEEK_END)
							tx = response.rawRequestData.tell()
							response.rawResponseData.seek(0, io.SEEK_END)
							rx = response.rawResponseData.tell()
							self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
							self.stats['tx'] += tx
							self.stats['rx'] += rx
							self.stats['requests'] += 1
				except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
					self.logger.warning(f'Request for {url} failed: {e!r}')
					action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = e, redirectLevel = redirectLevel, item = self)
					exc = e # Pass the exception outward for the history
				else:
					action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
				self.logger.debug(f'Response handler result: {action}, {writeToWarc}')
				if response and exc is None and writeToWarc:
					self.warc.write_client_response(response)
				history.append((response, exc))
				retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
				if action in (ACTION_SUCCESS, ACTION_IGNORE):
					self.logger.debug('Fetch loop done')
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_FOLLOW_OR_SUCCESS:
					redirectUrl = response.headers.get('Location') or response.headers.get('URI')
					if not redirectUrl:
						self.logger.debug('Got redirect but no redirect target, fetch loop done')
						retResponse.qhistory = tuple(history)
						return retResponse
					url = url.join(yarl.URL(redirectUrl))
					self.logger.debug(f'Next URL: {url}')
					if response.status in (301, 302, 303) and method == 'POST':
						self.logger.debug(f'{response.status} on POST, dropping to GET and clearing data')
						method = 'GET'
						data = None
					attempt = 0
					redirectLevel += 1
				elif action == ACTION_RETRIES_EXCEEDED:
					self.logger.error(f'Request for {url} failed {attempt} times')
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_TOO_MANY_REDIRECTS:
					self.logger.error(f'Request for {url} (from {originalUrl}) exceeded redirect limit')
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_RETRY:
					# Nothing to do, just go to the next cycle
					pass
			finally:
				if response:
					self.logger.debug('Releasing response')
					await response.release()

	async def process(self):
		raise NotImplementedError

	@classmethod
	def generate(cls):
		yield from () # Generate no items by default

	def add_subitem(self, itemClassOrType, itemValue):
		# The isinstance check is needed because issubclass raises a TypeError when its first argument is not a class (e.g. an itemType str).
		if isinstance(itemClassOrType, type) and issubclass(itemClassOrType, Item):
			item = (itemClassOrType.itemType, itemValue)
		else:
			item = (itemClassOrType, itemValue)
		self.logger.debug(f'Adding subitem {itemClassOrType!r} {itemValue!r}')
		if item not in self.childItems:
			self.childItems.append(item)

	async def flush_subitems(self):
		self.logger.debug('Flushing subitems')
		await self.qwarcObj.flush_subitems(self)

	def clear_subitems(self):
		self.logger.debug('Clearing subitems')
		self.childItems = []

	@classmethod
	def get_subclasses(cls):
		for subclass in cls.__subclasses__():
			yield subclass
			yield from subclass.get_subclasses()

	def __repr__(self):
		return f'<{type(self).__module__}.{type(self).__name__} object {id(self)}, itemType = {self.itemType!r}, itemValue = {self.itemValue!r}>'


# QWARC._closed values
_STATE_RUNNING = 1
_STATE_CLOSING = 2
_STATE_CLOSED = 3
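# A minimal sketch of what an Item subclass in a spec file might look like
# (hypothetical names; itemType, generate, and process are the real hooks
# defined on Item above):
#
#   class ExampleItem(Item):
#       itemType = 'example'
#
#       @classmethod
#       def generate(cls):
#           yield 'https://example.org/' # itemValues for the initial queue
#
#       async def process(self):
#           response = await self.fetch(self.itemValue)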
class QWARC:
	def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
		'''
		itemClasses: iterable of Item
		warcBasePath: str, base name of the WARC files
		dbPath: str, path to the sqlite3 database file
		command: list, the command line used to invoke qwarc
		specFile: str, path to the spec file
		specDependencies: qwarc.utils.SpecDependencies
		logFilename: str, name of the log file written by this process
		concurrency: int, number of concurrently processed items
		memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
		minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
		warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
		warcDedupe: bool, whether to deduplicate WARC records (handled by qwarc.warc.WARC)
		'''

		self._itemClasses = itemClasses
		self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
		self._warcBasePath = warcBasePath
		self._dbPath = dbPath
		self._command = command
		self._specFile = specFile
		self._specDependencies = specDependencies
		self._logFilename = logFilename
		self._concurrency = concurrency
		self._memoryLimit = memoryLimit
		self._minFreeDisk = minFreeDisk
		self._warcSizeLimit = warcSizeLimit
		self._warcDedupe = warcDedupe

		self._reset_working_vars()

	def _reset_working_vars(self):
		# Working variables
		self._db = None
		self._tasks = set()
		self._sleepTasks = set()
		self._sessions = [] # aiohttp.ClientSession instances
		self._freeSessions = collections.deque() # ClientSession instances that are currently free
		self._warc = None
		self._closed = _STATE_CLOSED

	@contextlib.asynccontextmanager
	async def exclusive_db_lock(self):
		logger.debug('Attempting to obtain DB lock')
		c = self._db.cursor()
		while True:
			try:
				c.execute('BEGIN EXCLUSIVE')
				break
			except sqlite3.OperationalError as e:
				if str(e) != 'database is locked':
					raise
				await asyncio.sleep(1)
		logger.debug('Obtained DB lock')
		try:
			yield c
			logger.debug('Committing DB transaction')
			c.execute('COMMIT')
		except:
			logger.debug('Rolling back DB transaction')
			c.execute('ROLLBACK')
			raise
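	# Usage sketch (someId is a hypothetical variable): the transaction is
	# committed when the block exits normally and rolled back when it raises.
	#
	#   async with self.exclusive_db_lock() as cursor:
	#       cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, someId))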
	def _make_item(self, itemType, itemValue, session, headers):
		logger.debug(f'Making item {itemType!r} {itemValue!r}')
		try:
			itemClass = self._itemTypeMap[itemType]
		except KeyError:
			raise RuntimeError(f'No such item type: {itemType!r}')
		return itemClass(self, itemValue, session, headers, self._warc)

	async def _wait_for_free_task(self):
		logger.debug('Waiting for task(s) to finish')
		if not self._tasks:
			logger.debug('No tasks to wait for')
			return
		done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
		logger.debug(f'{len(done)} task(s) finished, {len(pending)} remaining')
		for future in done:
			logger.debug(f'Finished {future.taskType} task: {future!r}')
			newStatus = STATUS_DONE
			if future.taskType == 'sleep':
				self._sleepTasks.remove(future)
			elif future.taskType == 'process':
				item = future.item
				logger.debug(f'Finished item: {item!r}')
			try:
				future.result()
			except asyncio.CancelledError:
				# Got cancelled, nothing we can do about it, but let's log an error if it's a process task
				if future.taskType == 'process':
					logger.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
					newStatus = STATUS_ERROR
			except Exception as e:
				if future.taskType == 'process':
					logger.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
					newStatus = STATUS_ERROR
			else:
				if future.taskType == 'process':
					logger.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
			if future.taskType != 'process':
				continue
			async with self.exclusive_db_lock() as cursor:
				cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
			await self._insert_subitems(item)
			self._freeSessions.append(item.session)
		self._tasks = pending

	async def _insert_subitems(self, item):
		logger.debug(f'Inserting {len(item.childItems)} subitem(s) from {item!r}')
		if item.childItems:
			async with self.exclusive_db_lock() as cursor:
				it = iter(item.childItems)
				while True:
					values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
					if not values:
						break
					cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
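	# For reference, the status lifecycle of a row in the items table (status
	# constants come from qwarc.const):
	#
	#   STATUS_TODO -> STATUS_INPROGRESS   claimed by run() below
	#   STATUS_INPROGRESS -> STATUS_DONE   process() finished in _wait_for_free_task()
	#   STATUS_INPROGRESS -> STATUS_ERROR  process() raised or was cancelled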
	async def run(self, loop):
		self._closed = _STATE_RUNNING

		logger.debug(f'Creating {self._concurrency} sessions')
		for i in range(self._concurrency):
			session = _aiohttp.ClientSession(
				connector = qwarc.aiohttp.TCPConnector(loop = loop),
				request_class = qwarc.aiohttp.ClientRequest,
				response_class = qwarc.aiohttp.ClientResponse,
				loop = loop
			)
			self._sessions.append(session)
			self._freeSessions.append(session)

		logger.debug('Constructing WARC object')
		self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)

		logger.debug('Connecting to the database')
		self._db = sqlite3.connect(self._dbPath, timeout = 1)
		self._db.set_trace_callback(logging.getLogger(f'{__name__}[sqlite3]').debug)
		self._db.isolation_level = None # Transactions are handled manually below.

		# Setting the synchronous PRAGMA is not possible if the DB is currently locked by another process but also can't be done inside a transaction... So just retry until it succeeds.
		logger.debug('Setting synchronous PRAGMA')
		while True:
			try:
				self._db.execute('PRAGMA synchronous = OFF')
			except sqlite3.OperationalError as e:
				if str(e) != 'database is locked':
					raise
				await asyncio.sleep(1)
			else:
				break
		logger.debug('Set synchronous PRAGMA')

		async with self.exclusive_db_lock() as cursor:
			cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
			result = cursor.fetchone()
			if not result:
				self._create_db(cursor)
				self._insert_generated_items(cursor)

		try:
			while True:
				while len(self._tasks) >= self._concurrency:
					await self._wait_for_free_task()

				if self._closed == _STATE_CLOSING:
					logger.debug('Currently closing')
					break

				if os.path.exists('STOP'):
					logger.info('Gracefully shutting down due to STOP file')
					break
				if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
					logger.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
					break

				if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
					logger.info('Disk space is low, sleeping')
					sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
					sleepTask.taskType = 'sleep'
					self._tasks.add(sleepTask)
					self._sleepTasks.add(sleepTask)
					continue

				logger.debug('Trying to get an item from the DB')
				async with self.exclusive_db_lock() as cursor:
					cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
					result = cursor.fetchone()
					if not result:
						logger.debug('Found no TODO item')
						if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
							# There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
							# It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
							# So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
							logger.debug('There are incomplete items that may spawn further TODOs, starting sleep task')
							#TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
							sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
							sleepTask.taskType = 'sleep'
							self._tasks.add(sleepTask)
							self._sleepTasks.add(sleepTask)
							continue
						else:
							# Really nothing to do anymore
							logger.debug('All items are done')
							break
					id_, itemType, itemValue, status = result
					logger.debug(f'Got item {itemType}:{itemValue} ({id_})')
					cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id_))

				session = self._freeSessions.popleft()
				item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)
				task = asyncio.ensure_future(item.process())
				#TODO: Is there a better way to add custom information to a task/coroutine object?
				task.taskType = 'process'
				task.id = id_
				task.itemType = itemType
				task.itemValue = itemValue
				task.item = item
				self._tasks.add(task)

			logger.debug(f'Cancelling {len(self._sleepTasks)} sleep task(s)')
			for sleepTask in self._sleepTasks:
				sleepTask.cancel()
			logger.debug('Waiting for tasks to finish')
			while len(self._tasks):
				await self._wait_for_free_task()
			logger.info('Done')
		except (Exception, KeyboardInterrupt):
			logger.debug(f'Got an exception, cancelling {len(self._tasks)} task(s)')
			for task in self._tasks:
				task.cancel()
			logger.debug('Waiting for tasks to get cancelled')
			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
			self._tasks.clear()
			raise
		finally:
			await self.close()

	async def flush_subitems(self, item):
		await self._insert_subitems(item)
		item.clear_subitems()

	def _create_db(self, cursor):
		logger.debug('Creating database')
		cursor.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
		cursor.execute('CREATE INDEX items_status_idx ON items (status)')
		cursor.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')

	def _insert_generated_items(self, cursor):
		logger.debug('Inserting generated items')
		it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
		while True:
			values = tuple(itertools.islice(it, 100000))
			if not values:
				break
			cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)

	async def close(self):
		if self._closed in (_STATE_CLOSING, _STATE_CLOSED):
			return
		self._closed = _STATE_CLOSING
		logger.debug('Closing')
		if self._tasks:
			logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup')
			for task in self._tasks:
				task.cancel()
			logger.debug('Waiting for tasks to get cancelled')
			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
			self._tasks.clear()
		logger.debug('Closing DB')
		self._db.close()
		logger.debug('Closing sessions')
		for session in self._sessions:
			session.close()
		logger.debug('Closing WARC')
		self._warc.close()
		self._reset_working_vars() # Also resets self._closed
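# A rough sketch of how this class might be driven; the real entry point lives
# in qwarc.cli, and the argument values below are placeholders:
#
#   loop = asyncio.get_event_loop()
#   q = QWARC(
#       itemClasses = set(Item.get_subclasses()), warcBasePath = 'example',
#       dbPath = 'example.db', command = [], specFile = 'spec.py',
#       specDependencies = qwarc.utils.SpecDependencies(), logFilename = 'example.log',
#   )
#   try:
#       loop.run_until_complete(q.run(loop))
#   finally:
#       loop.run_until_complete(q.close())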