From 3c8b45b3a61625bfd00e8655440e5961ca832f03 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Tue, 14 Jul 2020 05:53:35 +0000 Subject: [PATCH] Refactor cleanup code - Run the cleanup code on exceptions (e.g. ^C). There were several effects of that not happening previously; most notably, the log file was not written to the meta WARC. - Cancel remaining tasks, which avoids a pile of asyncio warnings and errors on crashes. - Close the DB before the WARC, or rather, close the WARC last. This is mostly a semantic change to further ensure that the log written to the meta WARC is as complete as possible. --- qwarc/__init__.py | 38 ++++++++++++++++++++++++++++++++------ qwarc/cli.py | 2 ++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/qwarc/__init__.py b/qwarc/__init__.py index de51581..fa8eb19 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -208,6 +208,11 @@ class Item: return f'<{type(self).__module__}.{type(self).__name__} object {id(self)}, itemType = {self.itemType!r}, itemValue = {self.itemValue!r}>' +# QWARC._closed values +_STATE_RUNNING = 1 +_STATE_CLOSING = 2 +_STATE_CLOSED = 3 + class QWARC: def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): ''' @@ -248,6 +253,7 @@ class QWARC: self._sessions = [] # aiohttp.ClientSession instances self._freeSessions = collections.deque() # ClientSession instances that are currently free self._warc = None + self._closed = _STATE_CLOSED @contextlib.asynccontextmanager async def exclusive_db_lock(self): @@ -317,6 +323,8 @@ class QWARC: cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) async def run(self, loop): + self._closed = _STATE_RUNNING + for i in range(self._concurrency): session = _aiohttp.ClientSession( connector = qwarc.aiohttp.TCPConnector(loop = loop), @@ -355,6 +363,9 @@ class QWARC: while len(self._tasks) >= self._concurrency: await self._wait_for_free_task() + if self._closed == _STATE_CLOSING: + break + if os.path.exists('STOP'): logging.info('Gracefully shutting down due to STOP file') break @@ -413,15 +424,11 @@ class QWARC: for task in self._tasks: task.cancel() await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED) + self._tasks.clear() raise finally: - for session in self._sessions: - session.close() - self._warc.close() - self._db.close() - - self._reset_working_vars() + await self.close() async def flush_subitems(self, item): await self._insert_subitems(item) @@ -439,3 +446,22 @@ class QWARC: if not values: break cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) + + async def close(self): + if self._closed in (_STATE_CLOSING, _STATE_CLOSED): + return + + self._closed = _STATE_CLOSING + + if self._tasks: + logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup') + for task in self._tasks: + task.cancel() + await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED) + self._tasks.clear() + self._db.close() + for session in self._sessions: + session.close() + self._warc.close() + + self._reset_working_vars() # Also resets self._closed diff --git a/qwarc/cli.py b/qwarc/cli.py index 18bf790..a69b1d9 100644 --- a/qwarc/cli.py +++ b/qwarc/cli.py @@ -83,4 +83,6 @@ def main(): loop.run_until_complete(a.run(loop)) except (Exception, KeyboardInterrupt) as e: logging.exception('Unhandled error') + finally: + loop.run_until_complete(a.close()) loop.close()