From f025c4e9f36a661c44773fc60bba32f8ec1300e2 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Thu, 16 Jul 2020 03:36:28 +0000
Subject: [PATCH] Add extensive debug logging

---
 qwarc/__init__.py | 79 +++++++++++++++++++++++++++++++++++++++++------
 qwarc/cli.py      |  5 ++-
 qwarc/utils.py    |  2 +-
 qwarc/warc.py     | 43 +++++++++++++++++++++-----
 4 files changed, 111 insertions(+), 18 deletions(-)

diff --git a/qwarc/__init__.py b/qwarc/__init__.py
index 7fdb92c..d3c178c 100644
--- a/qwarc/__init__.py
+++ b/qwarc/__init__.py
@@ -20,6 +20,9 @@
 import sqlite3
 import yarl
 
+logger = logging.getLogger(__name__)
+
+
 class Item:
 	itemType = None
 	defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)
@@ -33,7 +36,7 @@ class Item:
 		if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
 			self._baseUrl = None
 		self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
-		self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
+		self.logger = logging.LoggerAdapter(logging.getLogger(f'{type(self).__module__}.{type(self).__name__}'), {'itemType': self.itemType, 'itemValue': self.itemValue})
 		self.childItems = []
 
@@ -43,6 +46,7 @@
 
 	@baseUrl.setter
 	def baseUrl(self, baseUrl):
+		self.logger.debug(f'Setting baseUrl to {baseUrl!r}')
 		if baseUrl is None:
 			self._baseUrl = None
 		elif isinstance(baseUrl, yarl.URL):
@@ -93,13 +97,27 @@
 
 		#TODO: Rewrite using 'async with self.session.get'
 
+		if self.logger.isEnabledFor(logging.DEBUG):
+			l = locals()
+			modified = []
+			for kwarg, kwdef in self.fetch.__kwdefaults__.items():
+				if l[kwarg] != kwdef:
+					modified.append(kwarg)
+			if modified:
+				self.logger.debug(f'fetch({url!r}) with ...')
+				for kwarg in modified:
+					self.logger.debug(f'... {kwarg} = {repr(l[kwarg]).replace(chr(10), " ")}') # aiohttp.ClientResponse's repr can contain newlines
+			else:
+				self.logger.debug(f'fetch({url!r})')
 		url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
 		if not url.scheme or not url.host:
 			if fromResponse is not None:
+				self.logger.debug(f'Joining incomplete URL {url} with fromResponse URL {fromResponse.url}')
 				url = fromResponse.url.join(url)
 			elif not self.baseUrl:
 				raise ValueError('Incomplete URL and no baseUrl to join it with')
 			else:
+				self.logger.debug(f'Joining incomplete URL {url} with base URL {self.baseUrl}')
 				url = self.baseUrl.join(url)
 		originalUrl = url
 		if responseHandler is None:
@@ -115,6 +133,7 @@
 			exc = None
 			action = ACTION_RETRY
 			writeToWarc = True
+			self.logger.debug(f'Trying to fetch {url}, attempt {attempt}, redirect level {redirectLevel}')
 			try:
 				try:
 					with _aiohttp.Timeout(timeout):
@@ -144,20 +163,25 @@
 						exc = e # Pass the exception outward for the history
 					else:
 						action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
+					self.logger.debug(f'Response handler result: {action}, {writeToWarc}')
 				if response and exc is None and writeToWarc:
 					self.warc.write_client_response(response)
 				history.append((response, exc))
 				retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
 				if action in (ACTION_SUCCESS, ACTION_IGNORE):
+					self.logger.debug('Fetch loop done')
 					retResponse.qhistory = tuple(history)
 					return retResponse
 				elif action == ACTION_FOLLOW_OR_SUCCESS:
 					redirectUrl = response.headers.get('Location') or response.headers.get('URI')
 					if not redirectUrl:
+						self.logger.debug('Got redirect but no redirect target, fetch loop done')
 						retResponse.qhistory = tuple(history)
 						return retResponse
 					url = url.join(yarl.URL(redirectUrl))
+					self.logger.debug(f'Next URL: {url}')
 					if response.status in (301, 302, 303) and method == 'POST':
+						self.logger.debug(f'{response.status} on POST, dropping to GET and clearing data')
 						method = 'GET'
 						data = None
 					attempt = 0
@@ -175,6 +199,7 @@
 				pass
 			finally:
 				if response:
+					self.logger.debug('Releasing response')
 					await response.release()
 
 	async def process(self):
@@ -189,13 +214,16 @@
 			item = (itemClassOrType.itemType, itemValue)
 		else:
 			item = (itemClassOrType, itemValue)
+		self.logger.debug(f'Adding subitem {itemClassOrType!r} {itemValue!r}')
 		if item not in self.childItems:
 			self.childItems.append(item)
 
 	async def flush_subitems(self):
+		self.logger.debug('Flushing subitems')
 		await self.qwarcObj.flush_subitems(self)
 
 	def clear_subitems(self):
+		self.logger.debug('Clearing subitems')
 		self.childItems = []
 
 	@classmethod
@@ -257,6 +285,7 @@
 
 	@contextlib.asynccontextmanager
 	async def exclusive_db_lock(self):
+		logger.debug('Attempting to obtain DB lock')
 		c = self._db.cursor()
 		while True:
 			try:
@@ -266,14 +295,18 @@
 				if str(e) != 'database is locked':
 					raise
 				await asyncio.sleep(1)
+		logger.debug('Obtained DB lock')
 		try:
 			yield c
+			logger.debug('Committing DB transaction')
 			c.execute('COMMIT')
 		except:
+			logger.debug('Rolling back DB transaction')
 			c.execute('ROLLBACK')
 			raise
 
 	def _make_item(self, itemType, itemValue, session, headers):
+		logger.debug(f'Making item {itemType!r} {itemValue!r}')
 		try:
 			itemClass = self._itemTypeMap[itemType]
 		except KeyError:
@@ -281,29 +314,34 @@
 		return itemClass(self, itemValue, session, headers, self._warc)
 
 	async def _wait_for_free_task(self):
+		logger.debug('Waiting for task(s) to finish')
 		if not self._tasks:
+			logger.debug('No tasks to wait for')
 			return
 		done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
+		logger.debug(f'{len(done)} task(s) finished, {len(pending)} remaining')
 		for future in done:
+			logger.debug(f'Finished {future.taskType} task: {future!r}')
 			newStatus = STATUS_DONE
 			if future.taskType == 'sleep':
 				self._sleepTasks.remove(future)
 			elif future.taskType == 'process':
 				item = future.item
+				logger.debug(f'Finished item: {item!r}')
 			try:
 				future.result()
 			except asyncio.CancelledError as e:
 				# Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
 				if future.taskType == 'process':
-					logging.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
+					logger.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
 					newStatus = STATUS_ERROR
 			except Exception as e:
 				if future.taskType == 'process':
-					logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
+					logger.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
 					newStatus = STATUS_ERROR
 			else:
 				if future.taskType == 'process':
-					logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
+					logger.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
 			if future.taskType != 'process':
 				continue
 			async with self.exclusive_db_lock() as cursor:
@@ -313,6 +351,7 @@
 		self._tasks = pending
 
 	async def _insert_subitems(self, item):
+		logger.debug(f'Inserting {len(item.childItems)} subitem(s) from {item!r}')
 		if item.childItems:
 			async with self.exclusive_db_lock() as cursor:
 				it = iter(item.childItems)
@@ -325,6 +364,7 @@
 
 	async def run(self, loop):
 		self._closed = _STATE_RUNNING
+		logger.debug(f'Creating {self._concurrency} sessions')
 		for i in range(self._concurrency):
 			session = _aiohttp.ClientSession(
 				connector = qwarc.aiohttp.TCPConnector(loop = loop),
@@ -335,12 +375,16 @@
 			self._sessions.append(session)
 			self._freeSessions.append(session)
 
+		logger.debug('Constructing WARC object')
 		self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
 
+		logger.debug('Connecting to the database')
 		self._db = sqlite3.connect(self._dbPath, timeout = 1)
+		self._db.set_trace_callback(logging.getLogger(f'{__name__}[sqlite3]').debug)
 		self._db.isolation_level = None # Transactions are handled manually below.
 		# Setting the synchronous PRAGMA is not possible if the DB is currently locked by another process but also can't be done inside a transaction... So just retry until it succeeds.
+		logger.debug('Setting synchronous PRAGMA')
 		while True:
 			try:
 				self._db.execute('PRAGMA synchronous = OFF')
@@ -350,6 +394,7 @@
 				await asyncio.sleep(1)
 			else:
 				break
+		logger.debug('Set synchronous PRAGMA')
 
 		async with self.exclusive_db_lock() as cursor:
 			cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
@@ -364,31 +409,35 @@
 				await self._wait_for_free_task()
 
 				if self._closed == _STATE_CLOSING:
+					logger.debug('Currently closing')
 					break
 
 				if os.path.exists('STOP'):
-					logging.info('Gracefully shutting down due to STOP file')
+					logger.info('Gracefully shutting down due to STOP file')
 					break
 				if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
-					logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
+					logger.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
 					break
 
 				if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
-					logging.info('Disk space is low, sleeping')
+					logger.info('Disk space is low, sleeping')
 					sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
 					sleepTask.taskType = 'sleep'
 					self._tasks.add(sleepTask)
 					self._sleepTasks.add(sleepTask)
 					continue
 
+				logger.debug('Trying to get an item from the DB')
 				async with self.exclusive_db_lock() as cursor:
 					cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
 					result = cursor.fetchone()
 					if not result:
+						logger.debug('Found no TODO item')
 						if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
 							# There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
 							# It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
 							# So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
+							logger.debug('There are incomplete items that may spawn further TODOs, starting sleep task')
 							#TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
 							sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
 							sleepTask.taskType = 'sleep'
@@ -397,8 +446,10 @@
 							continue
 						else:
 							# Really nothing to do anymore
+							logger.debug('All items are done')
 							break
 					id_, itemType, itemValue, status = result
+					logger.debug(f'Got item {itemType}:{itemValue} ({id_})')
 					cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id_))
 
 				session = self._freeSessions.popleft()
@@ -412,17 +463,20 @@
 				task.item = item
 				self._tasks.add(task)
 
+			logger.debug(f'Cancelling {len(self._sleepTasks)} sleep task(s)')
 			for sleepTask in self._sleepTasks:
 				sleepTask.cancel()
 
+			logger.debug('Waiting for tasks to finish')
 			while len(self._tasks):
 				await self._wait_for_free_task()
-			logging.info('Done')
+			logger.info('Done')
 		except (Exception, KeyboardInterrupt) as e:
-			# Kill all tasks
+			logger.debug(f'Got an exception, cancelling {len(self._tasks)} task(s)')
 			for task in self._tasks:
 				task.cancel()
+			logger.debug('Waiting for tasks to get cancelled')
 			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
 			self._tasks.clear()
@@ -435,11 +489,13 @@
 			item.clear_subitems()
 
 	def _create_db(self, cursor):
+		logger.debug('Creating database')
 		cursor.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
 		cursor.execute('CREATE INDEX items_status_idx ON items (status)')
 		cursor.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')
 
 	def _insert_generated_items(self, cursor):
+		logger.debug('Inserting generated items')
 		it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
 		while True:
 			values = tuple(itertools.islice(it, 100000))
@@ -453,15 +509,20 @@
 
 		self._closed = _STATE_CLOSING
 
+		logger.debug('Closing')
 		if self._tasks:
 			logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup')
 			for task in self._tasks:
 				task.cancel()
+			logger.debug('Waiting for tasks to get cancelled')
 			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
 			self._tasks.clear()
+		logger.debug('Closing DB')
 		self._db.close()
+		logger.debug('Closing sessions')
 		for session in self._sessions:
 			session.close()
+		logger.debug('Closing WARC')
 		self._warc.close()
 		self._reset_working_vars() # Also resets self._closed
 
diff --git a/qwarc/cli.py b/qwarc/cli.py
index 25e1c6e..809c259 100644
--- a/qwarc/cli.py
+++ b/qwarc/cli.py
@@ -9,6 +9,9 @@
 import qwarc.version
 import sys
 
+logger = logging.getLogger(__name__)
+
+
 def setup_logging(logFilename, logLevel, logLevelStderr):
 	if logLevelStderr is None:
 		logLevelStderr = logLevel
@@ -91,7 +94,7 @@
 	try:
 		loop.run_until_complete(a.run(loop))
 	except (Exception, KeyboardInterrupt) as e:
-		logging.exception('Unhandled error')
+		logger.exception('Unhandled error')
 	finally:
 		loop.run_until_complete(a.close())
 		loop.close()
diff --git a/qwarc/utils.py b/qwarc/utils.py
index 9781b27..6008a76 100644
--- a/qwarc/utils.py
+++ b/qwarc/utils.py
@@ -245,7 +245,7 @@ def get_software_info(specDependencyPackages):
 
 class LogFormatter(logging.Formatter):
 	def __init__(self):
-		super().__init__('%(asctime)s.%(msecs)03dZ %(levelname)s %(itemString)s %(message)s', datefmt = '%Y-%m-%d %H:%M:%S')
+		super().__init__('%(asctime)s.%(msecs)03dZ %(name)s %(levelname)s %(itemString)s %(message)s', datefmt = '%Y-%m-%d %H:%M:%S')
 		self.converter = time.gmtime
 
 	def format(self, record):
diff --git a/qwarc/warc.py b/qwarc/warc.py
index 1b1df87..afb416c 100644
--- a/qwarc/warc.py
+++ b/qwarc/warc.py
@@ -11,6 +11,9 @@
 import time
 import warcio
 
+logger = logging.getLogger(__name__)
+
+
 class WARC:
 	def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
 		'''
@@ -54,37 +57,42 @@
 			return
 		while True:
 			filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
+			logger.debug(f'Trying to open {filename}')
 			try:
 				# Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it
 				self._file = open(filename, 'xb')
 				fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
 			except FileExistsError:
-				logging.info(f'{filename} already exists, skipping')
+				logger.debug(f'{filename} already exists, skipping')
 				self._counter += 1
 			else:
 				break
-		logging.info(f'Opened {filename}')
+		logger.info(f'Opened {filename}')
 		self._open_journal(filename)
 		self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 		self._closed = False
 		self._counter += 1
 
 	def _open_journal(self, filename):
+		logger.debug(f'Trying to open {filename}.qwarcjournal')
 		try:
 			self._journalFile = open(f'{filename}.qwarcjournal', 'xb')
 			fcntl.flock(self._journalFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
 		except FileExistsError:
-			logging.error(f'{filename}.qwarcjournal already exists!')
+			logger.error(f'{filename}.qwarcjournal already exists!')
 			raise RuntimeError(f'Unable to create journal file for {filename}: {filename}.qwarcjournal already exists')
 		except OSError as e:
 			if e.errno == errno.EWOULDBLOCK:
-				logging.error(f'{filename}.qwarcjournal is already locked!')
+				logger.error(f'{filename}.qwarcjournal is already locked!')
 				raise RuntimeError(f'Unable to lock journal file {filename}.qwarcjournal') from e
 			raise
+		logger.debug(f'Opened {filename}.qwarcjournal')
 		self._journalClean = True
 
 	def _write_record(self, record):
-		# Write the current offset to the journal file
+		logger.debug(f'Preparing to write record {record.rec_headers.get_header("WARC-Record-ID")}')
+
+		logger.debug('Writing journal entry')
 		# Since the size can only grow, it is not necessary to explicitly delete the previous contents.
 		self._journalFile.seek(0)
 		previousSize = self._file.tell()
@@ -92,19 +100,22 @@
 		self._journalFile.flush()
 		self._journalClean = False
 
+		logger.debug('Writing record')
 		try:
 			self._warcWriter.write_record(record)
 		except (OSError, IOError):
+			logger.debug(f'Error occurred, truncating to previous size {previousSize}')
 			self._file.truncate(previousSize)
 			raise
 		else:
-			# Mark the write as ok
+			logger.debug('Write ok, cleaning journal')
 			self._journalFile.seek(-4, os.SEEK_END) # len(b'no \n')
 			self._journalFile.write(b'yes\n')
 			self._journalFile.flush()
 			self._journalClean = True
 
 	def _write_warcinfo_record(self):
+		logger.debug('Writing warcinfo record')
 		data = {
 			'software': qwarc.utils.get_software_info(self._specDependencies.packages),
 			'command': self._command,
@@ -116,8 +127,10 @@
 		}
 		payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
 		# Workaround for https://github.com/webrecorder/warcio/issues/87
+		logger.debug('Calculating digest')
 		digester = warcio.utils.Digester('sha1')
 		digester.update(payload.getvalue())
+		logger.debug('Creating record object')
 		record = self._warcWriter.create_warc_record(
 			None,
 			'warcinfo',
@@ -134,10 +147,13 @@
 
 		A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.
 		'''
+		logger.debug(f'Writing response(s) for {response.url}')
 		self._ensure_opened()
 		for r in response.iter_all():
 			usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
 			requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
+			logger.debug(f'Writing request/response for {requestDate} {r.url}')
+			logger.debug('Constructing request record')
 			r.rawRequestData.seek(0, io.SEEK_END)
 			length = r.rawRequestData.tell()
 			r.rawRequestData.seek(0)
@@ -153,6 +169,7 @@
 				}
 			)
 			requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
+			logger.debug('Constructing response record')
 			r.rawResponseData.seek(0, io.SEEK_END)
 			length = r.rawResponseData.tell()
 			r.rawResponseData.seek(0)
@@ -171,8 +188,10 @@
 			payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
 			assert payloadDigest is not None
 			if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
+				logger.debug(f'Checking for duplicates with {payloadDigest}')
 				if payloadDigest in self._dedupeMap:
 					refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
+					logger.debug(f'Record is a duplicate, creating revisit record to {refersToRecordId} ({refersToDate} {refersToUri})')
 					responseHttpHeaders = responseRecord.http_headers
 					responseRecord = self._warcWriter.create_revisit_record(
 						str(r.url),
@@ -192,18 +211,24 @@
 					# Workaround for https://github.com/webrecorder/warcio/issues/94
 					responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
 				else:
+					logger.debug('New record, creating dedupe map entry')
 					self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
+			logger.debug('Writing request and response records')
 			self._write_record(requestRecord)
 			self._write_record(responseRecord)
+		logger.debug(f'Done writing responses for {response.url}')
 		if self._maxFileSize and self._file.tell() > self._maxFileSize:
+			logger.debug('File exceeds max size, closing')
 			self._close_file()
 
 	def _write_resource_records(self):
 		'''Write spec file and dependencies'''
 		assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
+		logger.debug('Writing resource records')
 		for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
+			logger.debug(f'Writing resource record for {fn}')
 			with open(fn, 'rb') as f:
 				f.seek(0, io.SEEK_END)
 				length = f.tell()
@@ -224,6 +249,7 @@
 
 	def _write_log_record(self):
 		assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
+		logger.debug('Writing log record')
 		rootLogger = logging.getLogger()
 		for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
 			handler.flush()
@@ -246,10 +272,12 @@
 		'''Close the currently opened WARC'''
 
 		if not self._closed:
+			logger.debug('Closing WARC file')
 			self._file.close()
 			journalFilename = self._journalFile.name
 			self._journalFile.close()
 			if self._journalClean:
+				logger.debug(f'Deleting journal {journalFilename}')
 				os.remove(journalFilename)
 			self._warcWriter = None
 			self._file = None
@@ -259,10 +287,11 @@
 
 	def _write_meta_warc(self, callback):
 		filename = f'{self._prefix}-meta.warc.gz'
+		logger.debug(f'Opening {filename}')
 		self._file = open(filename, 'ab')
 		try:
 			fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
-			logging.info(f'Opened {filename}')
+			logger.info(f'Opened {filename}')
 			self._open_journal(filename)
 			self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 			self._closed = False
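
Note (not part of the patch): since all output now goes through named per-module
loggers ('qwarc', 'qwarc.cli', 'qwarc.warc', the per-item 'module.Class' loggers,
and the per-statement sqlite3 trace on 'qwarc[sqlite3]'), verbosity can be tuned
per component through the standard logging hierarchy. A minimal sketch, assuming
qwarc is driven from a plain Python process rather than through its CLI (which
configures logging itself via setup_logging):

    import logging

    # Turn on debug output globally ...
    logging.basicConfig(level = logging.DEBUG)
    # ... but silence the very chatty sqlite3 statement trace enabled by
    # set_trace_callback() above. Its logger is named 'qwarc[sqlite3]'; since
    # the name contains no dot, it is a direct child of the root logger rather
    # than of 'qwarc' and must be configured separately.
    logging.getLogger('qwarc[sqlite3]').setLevel(logging.INFO)

The bracket in the sqlite3 logger name deliberately keeps it out of the 'qwarc'
namespace, so enabling DEBUG on 'qwarc' alone does not pull in the SQL trace.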