from qwarc.const import *

import aiohttp
import asyncio
import functools
import logging
import os
import pkg_resources
import platform
import time
import typing


# Size of one memory page in bytes; /proc/self/statm reports the RSS in pages.
PAGESIZE = os.sysconf('SC_PAGE_SIZE')


def get_rss():
	'''Get the current RSS of this process in bytes'''

	# statm field 1 (0-based) is the resident set size in pages.
	with open('/proc/self/statm', 'r') as fp:
		return int(fp.readline().split()[1]) * PAGESIZE


def get_disk_free():
	'''Get the current free disk space on the relevant partition in bytes'''

	# f_bavail = blocks available to unprivileged users, f_frsize = fragment size.
	st = os.statvfs('.')
	return st.f_bavail * st.f_frsize


def uses_too_much_memory(limit):
	'''
	Check whether the process is using too much memory

	For performance reasons, this actually only checks the memory usage on every 100th call.

	limit: int, maximum allowed RSS in bytes
	Returns: bool; True only on a checking call (every 100th) where the RSS exceeds limit.
	'''

	uses_too_much_memory.callCounter += 1
	# Only check every hundredth call; reading /proc on every call would be wasteful.
	if uses_too_much_memory.callCounter % 100 == 0 and get_rss() > limit:
		return True
	return False
# Per-function state: number of times uses_too_much_memory has been called.
uses_too_much_memory.callCounter = 0


def too_little_disk_space(limit):
	'''
	Check whether the disk space is too small

	For performance reasons, this actually only checks the free disk space on every 100th call.

	limit: int, minimum required free disk space in bytes
	Returns: bool; the result of the most recent actual check (refreshed every 100th call).
	'''

	too_little_disk_space.callCounter += 1
	if too_little_disk_space.callCounter % 100 == 0:
		# Cache the result so the 99 intermediate calls can return it cheaply.
		too_little_disk_space.currentResult = (get_disk_free() < limit)
	return too_little_disk_space.currentResult
# Per-function state: call counter and last computed result.
too_little_disk_space.callCounter = 0
too_little_disk_space.currentResult = False


# https://stackoverflow.com/a/4665027
def find_all(aStr, sub):
	'''Generator yielding the start positions of every non-overlapping occurrence of sub in aStr.'''

	start = 0
	while True:
		start = aStr.find(sub, start)
		if start == -1:
			return
		yield start
		# Skip past this occurrence so overlapping matches are not reported.
		start += len(sub)


def str_get_between(aStr, a, b):
	'''Get the string after the first occurrence of a in aStr and the first occurrence of b after that of a, or None if there is no such string.'''

	aPos = aStr.find(a)
	if aPos == -1:
		return None
	offset = aPos + len(a)
	bPos = aStr.find(b, offset)
	if bPos == -1:
		return None
	return aStr[offset:bPos]


def maybe_str_get_between(x, a, b):
	'''Like str_get_between, but returns None if x evaluates to False and converts it to a str before matching.'''

	if x:
		return str_get_between(str(x), a, b)


def str_get_all_between(aStr, a, b):
	'''Generator yielding every string between an occurrence of a in aStr and the following occurrence of b.'''

	# prevEnd tracks the end of the last yielded span; occurrences of a inside an
	# already-yielded span are skipped so the yielded spans never overlap.
	prevEnd = -1
	for aOffset in find_all(aStr, a):
		if aOffset < prevEnd:
			continue
		offset = aOffset + len(a)
		bPos = aStr.find(b, offset)
		if bPos != -1:
			yield aStr[offset:bPos]
			prevEnd = bPos + len(b)


def maybe_str_get_all_between(x, a, b):
	'''Like str_get_all_between, but yields no elements if x evaluates to False and converts x to a str before matching.'''

	if x:
		yield from str_get_all_between(str(x), a, b)


def generate_range_items(start, stop, step):
	'''
	Generator for items of `step` size between `start` and `stop` (inclusive)

	Yields strings of the form `'a-b'` where `a` and `b` are integers such that `b - a + 1 == step`, `min(a) == start`, and `max(b) == stop`.
	`b - a + 1` may be unequal to `step` on the last item if `(stop - start + 1) % step != 0` (see examples below).
	Note that `a` and `b` can be equal on the last item if `(stop - start) % step == 0` (see examples below).

	Examples:
	- generate_range_items(0, 99, 10) yields '0-9', '10-19', '20-29', ..., '90-99'
	- generate_range_items(0, 42, 10): '0-9', '10-19', '20-29', '30-39', '40-42'
	- generate_range_items(0, 20, 10): '0-9', '10-19', '20-20'
	'''

	for i in range(start, stop + 1, step):
		# Clamp the upper bound of the final chunk to stop.
		yield f'{i}-{min(i + step - 1, stop)}'


async def handle_response_default(*, url, attempt, response, exc, redirectLevel, item):
	'''
	The default response handler, which behaves as follows:
	- If there is no response (e.g. timeout error), retry the retrieval after a delay of 5 seconds.
	- If the response has any of the status codes 401, 403, 404, 405, or 410, treat it as a permanent error and return.
	- If there was any exception and it is a asyncio.TimeoutError or a aiohttp.ClientError, treat as a potentially temporary error and retry the retrieval after a delay of 5 seconds.
	- If the response has any of the status codes 200, 204, 206, or 304, treat it as a success and return.
	- If the response has any of the status codes 301, 302, 303, 307, or 308, follow the redirect target if specified or return otherwise.
	- Otherwise, treat as a potentially temporary error and retry the retrieval after a delay of 5 seconds.
	- All responses are written to WARC by default.

	Note that this handler does not limit the number of retries on errors.

	Parameters: url (yarl.URL instance), attempt (int), response (aiohttp.ClientResponse or None), exc (Exception or None), redirectLevel (int), item (qwarc.Item instance)
	At least one of response and exc is not None.
	The redirectLevel indicates how many redirects were followed to get to this url, i.e. it starts out as zero and increases by one for every redirect. The attempt starts from 1 for every url, i.e. it is reset on redirects.
	The handler is invoked at most once for each attempt.
	Returns: (one of the qwarc.RESPONSE_* constants, bool signifying whether to write to WARC or not)
	The latter is ignored when exc is not None; responses that triggered an exception are never written to WARC.
	'''

	if response is None:
		await asyncio.sleep(5)
		return ACTION_RETRY, True
	# Permanent errors are checked before the exception so that e.g. a 404 whose body
	# transfer failed is still treated as permanent.
	if response.status in (401, 403, 404, 405, 410):
		return ACTION_IGNORE, True
	if exc is not None:
		if isinstance(exc, (asyncio.TimeoutError, aiohttp.ClientError)):
			await asyncio.sleep(5)
		return ACTION_RETRY, False  # Don't write to WARC since there might be an incomplete response
	if response.status in (200, 204, 206, 304):
		return ACTION_SUCCESS, True
	if response.status in (301, 302, 303, 307, 308):
		return ACTION_FOLLOW_OR_SUCCESS, True
	# Anything else (e.g. 5xx) is treated as potentially temporary.
	await asyncio.sleep(5)
	return ACTION_RETRY, True


async def handle_response_ignore_redirects(**kwargs):
	'''A response handler that does not follow redirects, i.e. treats them as a success instead. It behaves as handle_response_default otherwise.'''

	action, writeToWarc = await handle_response_default(**kwargs)
	if action == ACTION_FOLLOW_OR_SUCCESS:
		action = ACTION_SUCCESS
	return action, writeToWarc


def handle_response_limit_error_retries(maxRetries, handler = handle_response_default):
	'''
	A response handler that limits the number of retries on errors. It behaves as handler otherwise, which defaults to handle_response_default.

	Technically, this is actually a response handler factory. This is so that the intuitive use works: fetch(..., responseHandler = handle_response_limit_error_retries(5))

	If you use the same limit many times, you should keep the return value (the response handler) of this method and reuse it to avoid creating a new function every time.
	'''

	async def _handler(**kwargs):
		action, writeToWarc = await handler(**kwargs)
		if action == ACTION_RETRY and kwargs['attempt'] > maxRetries:
			action = ACTION_RETRIES_EXCEEDED
		return action, writeToWarc
	return _handler


def handle_response_limit_redirect_depth(maxRedirects, handler = handle_response_default):
	'''
	A response handler that limits how many redirects are followed. It behaves as handler otherwise, which defaults to handle_response_default.

	The same details as for handle_response_limit_error_retries apply.
	'''

	async def _handler(**kwargs):
		action, writeToWarc = await handler(**kwargs)
		# redirectLevel starts off at 0 so if it is equal to maxRedirects - 1, there were exactly maxRedirects redirects
		if action == ACTION_FOLLOW_OR_SUCCESS and kwargs['redirectLevel'] >= maxRedirects - 1:
			action = ACTION_TOO_MANY_REDIRECTS
		return action, writeToWarc
	return _handler


def _get_dependency_versions(*pkgs):
	'''Generator yielding (key, version) for each package in pkgs and, transitively, everything it requires. Missing distributions are logged and skipped.'''

	pending = set(pkgs)
	have = set(pkgs)
	while pending:
		key = pending.pop()
		try:
			dist = pkg_resources.get_distribution(key)
		except pkg_resources.DistributionNotFound:
			logging.error(f'Unable to get distribution {key}')
			continue
		yield dist.key, dist.version
		# Walk the dependency graph breadth-first, deduplicating via `have`.
		for requirement in dist.requires():
			if requirement.key not in have:
				pending.add(requirement.key)
				have.add(requirement.key)


@functools.lru_cache(maxsize = 1)
def get_software_info(specDependencyPackages):
	'''Return a dict describing the platform, Python build, and the versions of this package's and the spec's dependencies. specDependencyPackages must be hashable (e.g. a tuple) because the result is cached.'''

	# Based on crocoite.utils, authored by PromyLOPh in commit 6ccd72ab on 2018-12-08 under MIT licence
	baseDependencyPackageVersions = list(_get_dependency_versions(__package__))
	baseDependencyPackages = set(x[0] for x in baseDependencyPackageVersions)
	specDependencyPackageVersions = list(_get_dependency_versions(*specDependencyPackages))
	return {
		'platform': platform.platform(),
		'python': {
			'implementation': platform.python_implementation(),
			'version': platform.python_version(),
			'build': platform.python_build(),
		},
		'self': [{"package": package, "version": version} for package, version in baseDependencyPackageVersions],
		# Packages already listed under 'self' are not repeated under 'spec'.
		'spec': [{"package": package, "version": version} for package, version in specDependencyPackageVersions if package not in baseDependencyPackages],
	}


class LogFormatter(logging.Formatter):
	'''Log formatter emitting UTC timestamps with milliseconds and an itemString field derived from the record's itemType/itemValue attributes when present.'''

	def __init__(self):
		super().__init__('%(asctime)s.%(msecs)03dZ %(name)s %(levelname)s %(itemString)s %(message)s', datefmt = '%Y-%m-%d %H:%M:%S')
		self.converter = time.gmtime  # Timestamps are rendered in UTC.

	def format(self, record):
		# Synthesise itemString from itemType/itemValue when the caller did not supply it.
		if not hasattr(record, 'itemString'):
			if hasattr(record, 'itemType') and hasattr(record, 'itemValue'):
				record.itemString = f'{record.itemType}:{record.itemValue}'
			else:
				record.itemString = 'None'
		return super().format(record)


class SpecDependencies(typing.NamedTuple):
	# packages: tuple of package names the spec depends on
	packages: tuple = ()
	# files: tuple of extra files the spec depends on
	files: tuple = ()
	# extra: arbitrary additional dependency information
	extra: typing.Any = None


class ReadonlyFileView:
	'''
	A poor read-only view for a file object. It hides the writing methods and passes everything else through to the underlying file object. Note that this does *not* actually prevent modification at all.
	'''

	def __init__(self, fp):
		self._fp = fp

	def __getattr__(self, key):
		if key in ('write', 'writelines', 'truncate'):
			raise AttributeError(key)
		if key == 'writable':
			# NOTE(review): this returns the bool False, not a callable, whereas a real
			# file object's `writable` is a method — `view.writable()` would raise
			# TypeError. Kept as-is to avoid breaking callers that test the attribute.
			return False
		return getattr(self._fp, key)


class FrozenFileView:
	'''
	A poor minimal frozen view for a file object. It fixes the bounds of the file, i.e. if something is appended to the underlying file object, it does not become visible in the frozen view.

	Only seek, tell, and read are implemented. Note that seeks and reads will affect the underlying file object. The actual data is not really frozen either, and any changes on the underlying file object will affect the frozen view as well.
	'''

	def __init__(self, fp, begin, end):
		'''
		fp: file-like object
		begin: int, offset from beginning of the file
		end: int, offset from beginning of the file
		'''

		self._fp = fp
		self._begin = begin
		self._end = end

	def seek(self, offset, whence = os.SEEK_SET):
		# The return value is the underlying file's absolute position (cf. io.IOBase.seek),
		# i.e. it includes the begin offset, unlike tell().
		if whence == os.SEEK_SET:
			return self._fp.seek(self._begin + offset, whence)
		elif whence == os.SEEK_CUR:
			return self._fp.seek(offset, whence)
		elif whence == os.SEEK_END:
			# Bug fix: self._end is an absolute offset in the underlying file, so it must be
			# combined with SEEK_SET. Passing SEEK_END through would seek relative to the
			# underlying file's real end (i.e. to fileSize + self._end + offset).
			return self._fp.seek(self._end + offset, os.SEEK_SET)
		raise NotImplementedError

	def tell(self):
		# Position relative to the view's beginning.
		return self._fp.tell() - self._begin

	def read(self, size = -1):
		curPos = self._fp.tell()
		if curPos < self._begin:
			# Clamp reads to the view: never read before begin...
			self._fp.seek(self._begin)
		elif curPos > self._end:
			# ...and return an empty result past end (read(0) preserves the str/bytes type).
			return self._fp.read(0)
		if size == -1:
			return self._fp.read(self._end - self._fp.tell())
		return self._fp.read(min(size, self._end - self._fp.tell()))


class DummyClientResponse:
	'''A ClientResponse-like object for when no actual ClientResponse is available. Always evaluates to False when cast to a bool.'''

	def __init__(self):
		self._qhistory = None

	@property
	def qhistory(self):
		return self._qhistory

	@qhistory.setter
	def qhistory(self, history):
		self._qhistory = history

	def __bool__(self):
		return False