diff --git a/qwarc/__init__.py b/qwarc/__init__.py
index c017c98..b568d59 100644
--- a/qwarc/__init__.py
+++ b/qwarc/__init__.py
@@ -159,8 +159,19 @@ class QWARC:
 		self._warcSizeLimit = warcSizeLimit
 		self._warcDedupe = warcDedupe
 
-	async def obtain_exclusive_db_lock(self, db):
-		c = db.cursor()
+		self._reset_working_vars()
+
+	def _reset_working_vars(self):
+		# Working variables
+		self._db = None
+		self._tasks = set()
+		self._sleepTasks = set()
+		self._sessions = [] # aiohttp.ClientSession instances
+		self._freeSessions = collections.deque() # ClientSession instances that are currently free
+		self._warc = None
+
+	async def obtain_exclusive_db_lock(self):
+		c = self._db.cursor()
 		while True:
 			try:
 				c.execute('BEGIN EXCLUSIVE')
@@ -171,21 +182,64 @@ class QWARC:
 				await asyncio.sleep(1)
 		return c
 
-	def _make_item(self, itemType, itemValue, session, headers, warc):
+	def _make_item(self, itemType, itemValue, session, headers):
 		try:
 			itemClass = self._itemTypeMap[itemType]
 		except KeyError:
 			raise RuntimeError(f'No such item type: {itemType!r}')
-		return itemClass(itemValue, session, headers, warc)
+		return itemClass(itemValue, session, headers, self._warc)
+
+	async def _wait_for_free_task(self):
+		if not self._tasks:
+			return
+		done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
+		for future in done:
+			# TODO Replace all of this with `if future.cancelled():`
+			try:
+				await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
+			except concurrent.futures.CancelledError as e:
+				# Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
+				if isinstance(future, asyncio.Task):
+					if future.taskType == 'process_item':
+						logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
+					elif future.taskType == 'sleep':
+						self._sleepTasks.remove(future)
+				continue
+			if future.taskType == 'sleep':
+				# Dummy task for empty todo list, see below.
+				self._sleepTasks.remove(future)
+				continue
+			item = future.item
+			logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
+			cursor = await self.obtain_exclusive_db_lock()
+			try:
+				cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
+				cursor.execute('COMMIT')
+			except:
+				cursor.execute('ROLLBACK')
+				raise
+			await self._insert_subitems(item)
+			self._freeSessions.append(item.session)
+		self._tasks = pending
+
+	async def _insert_subitems(self, item):
+		cursor = await self.obtain_exclusive_db_lock()
+		try:
+			if item.childItems:
+				it = iter(item.childItems)
+				while True:
+					values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
+					if not values:
+						break
+					cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
+			cursor.execute('COMMIT')
+		except:
+			cursor.execute('ROLLBACK')
+			raise
 
 	async def run(self, loop):
 		headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere
 
-		tasks = set()
-		sleepTasks = set()
-		sessions = [] # aiohttp.ClientSession instances
-		freeSessions = collections.deque() # ClientSession instances that are currently free
-
 		for i in range(self._concurrency):
 			session = _aiohttp.ClientSession(
 				connector = qwarc.aiohttp.TCPConnector(loop = loop),
@@ -194,64 +248,26 @@ class QWARC:
 				skip_auto_headers = ['Accept-Encoding'],
 				loop = loop
 			)
-			sessions.append(session)
-			freeSessions.append(session)
+			self._sessions.append(session)
+			self._freeSessions.append(session)
 
-		warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
+		self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
 
-		db = sqlite3.connect(self._dbPath, timeout = 1)
-		db.isolation_level = None # Transactions are handled manually below.
-		db.execute('PRAGMA synchronous = OFF')
+		self._db = sqlite3.connect(self._dbPath, timeout = 1)
+		self._db.isolation_level = None # Transactions are handled manually below.
+		self._db.execute('PRAGMA synchronous = OFF')
 
 		try:
-			async def wait_for_free_task():
-				nonlocal tasks, freeSessions, db
-				done, pending = await asyncio.wait(tasks, return_when = concurrent.futures.FIRST_COMPLETED)
-				for future in done:
-					# TODO Replace all of this with `if future.cancelled():`
-					try:
-						await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
-					except concurrent.futures.CancelledError as e:
-						# Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
-						if isinstance(future, asyncio.Task):
-							if future.taskType == 'process_item':
-								logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
-							elif future.taskType == 'sleep':
-								sleepTasks.remove(future)
-						continue
-					if future.taskType == 'sleep':
-						# Dummy task for empty todo list, see below.
-						sleepTasks.remove(future)
-						continue
-					item = future.item
-					logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
-					cursor = await self.obtain_exclusive_db_lock(db)
-					try:
-						cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
-						if item.childItems:
-							it = iter(item.childItems)
-							while True:
-								values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
-								if not values:
-									break
-								cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
-						cursor.execute('COMMIT')
-					except:
-						cursor.execute('ROLLBACK')
-						raise
-					freeSessions.append(item.session)
-				tasks = pending
-
 			while True:
-				while len(tasks) >= self._concurrency:
-					await wait_for_free_task()
+				while len(self._tasks) >= self._concurrency:
+					await self._wait_for_free_task()
 
 				if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
 					logging.info('Disk space is low, sleeping')
 					sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
 					sleepTask.taskType = 'sleep'
-					tasks.add(sleepTask)
-					sleepTasks.add(sleepTask)
+					self._tasks.add(sleepTask)
+					self._sleepTasks.add(sleepTask)
 					continue
 
 				if os.path.exists('STOP'):
@@ -261,7 +277,7 @@ class QWARC:
 					logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
 					break
 
-				cursor = await self.obtain_exclusive_db_lock(db)
+				cursor = await self.obtain_exclusive_db_lock()
 				try:
 					cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
 					result = cursor.fetchone()
@@ -273,8 +289,8 @@ class QWARC:
 						#TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
 						sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
 						sleepTask.taskType = 'sleep'
-						tasks.add(sleepTask)
-						sleepTasks.add(sleepTask)
+						self._tasks.add(sleepTask)
+						self._sleepTasks.add(sleepTask)
 						cursor.execute('COMMIT')
 						continue
 					else:
@@ -290,8 +306,8 @@ class QWARC:
 					cursor.execute('ROLLBACK')
 					raise
 
-				session = freeSessions.popleft()
-				item = self._make_item(itemType, itemValue, session, headers, warc)
+				session = self._freeSessions.popleft()
+				item = self._make_item(itemType, itemValue, session, headers)
 				task = asyncio.ensure_future(item.process())
 				#TODO: Is there a better way to add custom information to a task/coroutine object?
 				task.taskType = 'process'
@@ -299,27 +315,29 @@ class QWARC:
 				task.itemType = itemType
 				task.itemValue = itemValue
 				task.item = item
-				tasks.add(task)
+				self._tasks.add(task)
 
-			for sleepTask in sleepTasks:
+			for sleepTask in self._sleepTasks:
 				sleepTask.cancel()
 
-			while len(tasks):
-				await wait_for_free_task()
+			while len(self._tasks):
+				await self._wait_for_free_task()
 
 			logging.info('Done')
 		except (Exception, KeyboardInterrupt) as e:
 			# Kill all tasks
-			for task in tasks:
+			for task in self._tasks:
 				task.cancel()
-			await asyncio.wait(tasks, return_when = concurrent.futures.ALL_COMPLETED)
+			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
 			raise
 		finally:
-			for session in sessions:
+			for session in self._sessions:
 				session.close()
-			warc.close()
-			db.close()
+			self._warc.close()
+			self._db.close()
+
+			self._reset_working_vars()
 
 	def create_db(self):
 		db = sqlite3.connect(self._dbPath, timeout = 1)