diff --git a/qwarc/__init__.py b/qwarc/__init__.py index bd7c075..90e0c9d 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -10,6 +10,7 @@ if _aiohttp.__version__ != '2.3.10': import asyncio import collections import concurrent.futures +import contextlib import io import itertools import logging @@ -245,7 +246,8 @@ class QWARC: self._freeSessions = collections.deque() # ClientSession instances that are currently free self._warc = None - async def obtain_exclusive_db_lock(self): + @contextlib.asynccontextmanager + async def exclusive_db_lock(self): c = self._db.cursor() while True: try: @@ -255,7 +257,12 @@ class QWARC: if str(e) != 'database is locked': raise await asyncio.sleep(1) - return c + try: + yield c + c.execute('COMMIT') + except: + c.execute('ROLLBACK') + raise def _make_item(self, itemType, itemValue, session, headers): try: @@ -290,20 +297,14 @@ class QWARC: logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx') if future.taskType != 'process': continue - cursor = await self.obtain_exclusive_db_lock() - try: + async with self.exclusive_db_lock() as cursor: cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id)) - cursor.execute('COMMIT') - except: - cursor.execute('ROLLBACK') - raise await self._insert_subitems(item) self._freeSessions.append(item.session) self._tasks = pending async def _insert_subitems(self, item): - cursor = await self.obtain_exclusive_db_lock() - try: + async with self.exclusive_db_lock() as cursor: if item.childItems: it = iter(item.childItems) while True: @@ -311,10 +312,6 @@ class QWARC: if not values: break cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) - cursor.execute('COMMIT') - except: - cursor.execute('ROLLBACK') - raise async def run(self, loop): for i in range(self._concurrency): @@ -333,17 +330,12 @@ class QWARC: self._db.isolation_level = None # Transactions are handled manually below. self._db.execute('PRAGMA synchronous = OFF') - cursor = await self.obtain_exclusive_db_lock() - try: + async with self.exclusive_db_lock() as cursor: cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"') result = cursor.fetchone() if not result: self._create_db(cursor) self._insert_generated_items(cursor) - cursor.execute('COMMIT') - except: - cursor.execute('ROLLBACK') - raise try: while True: @@ -365,8 +357,7 @@ class QWARC: self._sleepTasks.add(sleepTask) continue - cursor = await self.obtain_exclusive_db_lock() - try: + async with self.exclusive_db_lock() as cursor: cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,)) result = cursor.fetchone() if not result: @@ -379,18 +370,12 @@ class QWARC: sleepTask.taskType = 'sleep' self._tasks.add(sleepTask) self._sleepTasks.add(sleepTask) - cursor.execute('COMMIT') continue else: # Really nothing to do anymore - cursor.execute('COMMIT') break id, itemType, itemValue, status = result cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id)) - cursor.execute('COMMIT') - except: - cursor.execute('ROLLBACK') - raise session = self._freeSessions.popleft() item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)