diff --git a/qwarc/__init__.py b/qwarc/__init__.py index de51581..fa8eb19 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -208,6 +208,11 @@ class Item: return f'<{type(self).__module__}.{type(self).__name__} object {id(self)}, itemType = {self.itemType!r}, itemValue = {self.itemValue!r}>' +# QWARC._closed values +_STATE_RUNNING = 1 +_STATE_CLOSING = 2 +_STATE_CLOSED = 3 + class QWARC: def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): ''' @@ -248,6 +253,7 @@ class QWARC: self._sessions = [] # aiohttp.ClientSession instances self._freeSessions = collections.deque() # ClientSession instances that are currently free self._warc = None + self._closed = _STATE_CLOSED @contextlib.asynccontextmanager async def exclusive_db_lock(self): @@ -317,6 +323,8 @@ class QWARC: cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) async def run(self, loop): + self._closed = _STATE_RUNNING + for i in range(self._concurrency): session = _aiohttp.ClientSession( connector = qwarc.aiohttp.TCPConnector(loop = loop), @@ -355,6 +363,9 @@ class QWARC: while len(self._tasks) >= self._concurrency: await self._wait_for_free_task() + if self._closed == _STATE_CLOSING: + break + if os.path.exists('STOP'): logging.info('Gracefully shutting down due to STOP file') break @@ -413,15 +424,11 @@ class QWARC: for task in self._tasks: task.cancel() await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED) + self._tasks.clear() raise finally: - for session in self._sessions: - session.close() - self._warc.close() - self._db.close() - - self._reset_working_vars() + await self.close() async def flush_subitems(self, item): await self._insert_subitems(item) @@ -439,3 +446,22 @@ class QWARC: if not values: break cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) + + async def close(self): + if self._closed in (_STATE_CLOSING, _STATE_CLOSED): + return + + self._closed = _STATE_CLOSING + + if self._tasks: + logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup') + for task in self._tasks: + task.cancel() + await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED) + self._tasks.clear() + self._db.close() + for session in self._sessions: + session.close() + self._warc.close() + + self._reset_working_vars() # Also resets self._closed diff --git a/qwarc/cli.py b/qwarc/cli.py index 18bf790..a69b1d9 100644 --- a/qwarc/cli.py +++ b/qwarc/cli.py @@ -83,4 +83,6 @@ def main(): loop.run_until_complete(a.run(loop)) except (Exception, KeyboardInterrupt) as e: logging.exception('Unhandled error') + finally: + loop.run_until_complete(a.close()) loop.close()