瀏覽代碼

Replace DB locking with an async context manager

master
JustAnotherArchivist 3 年之前
父節點
當前提交
a1e693739e
共有 1 個文件被更改,包括 13 次插入和 28 次删除
  1. +13
    -28
      qwarc/__init__.py

+ 13
- 28
qwarc/__init__.py 查看文件

@@ -10,6 +10,7 @@ if _aiohttp.__version__ != '2.3.10':
import asyncio import asyncio
import collections import collections
import concurrent.futures import concurrent.futures
import contextlib
import io import io
import itertools import itertools
import logging import logging
@@ -245,7 +246,8 @@ class QWARC:
self._freeSessions = collections.deque() # ClientSession instances that are currently free self._freeSessions = collections.deque() # ClientSession instances that are currently free
self._warc = None self._warc = None


async def obtain_exclusive_db_lock(self):
@contextlib.asynccontextmanager
async def exclusive_db_lock(self):
c = self._db.cursor() c = self._db.cursor()
while True: while True:
try: try:
@@ -255,7 +257,12 @@ class QWARC:
if str(e) != 'database is locked': if str(e) != 'database is locked':
raise raise
await asyncio.sleep(1) await asyncio.sleep(1)
return c
try:
yield c
c.execute('COMMIT')
except:
c.execute('ROLLBACK')
raise


def _make_item(self, itemType, itemValue, session, headers): def _make_item(self, itemType, itemValue, session, headers):
try: try:
@@ -290,20 +297,14 @@ class QWARC:
logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx') logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
if future.taskType != 'process': if future.taskType != 'process':
continue continue
cursor = await self.obtain_exclusive_db_lock()
try:
async with self.exclusive_db_lock() as cursor:
cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id)) cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
cursor.execute('COMMIT')
except:
cursor.execute('ROLLBACK')
raise
await self._insert_subitems(item) await self._insert_subitems(item)
self._freeSessions.append(item.session) self._freeSessions.append(item.session)
self._tasks = pending self._tasks = pending


async def _insert_subitems(self, item): async def _insert_subitems(self, item):
cursor = await self.obtain_exclusive_db_lock()
try:
async with self.exclusive_db_lock() as cursor:
if item.childItems: if item.childItems:
it = iter(item.childItems) it = iter(item.childItems)
while True: while True:
@@ -311,10 +312,6 @@ class QWARC:
if not values: if not values:
break break
cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values) cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
cursor.execute('COMMIT')
except:
cursor.execute('ROLLBACK')
raise


async def run(self, loop): async def run(self, loop):
for i in range(self._concurrency): for i in range(self._concurrency):
@@ -333,17 +330,12 @@ class QWARC:
self._db.isolation_level = None # Transactions are handled manually below. self._db.isolation_level = None # Transactions are handled manually below.
self._db.execute('PRAGMA synchronous = OFF') self._db.execute('PRAGMA synchronous = OFF')


cursor = await self.obtain_exclusive_db_lock()
try:
async with self.exclusive_db_lock() as cursor:
cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"') cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
result = cursor.fetchone() result = cursor.fetchone()
if not result: if not result:
self._create_db(cursor) self._create_db(cursor)
self._insert_generated_items(cursor) self._insert_generated_items(cursor)
cursor.execute('COMMIT')
except:
cursor.execute('ROLLBACK')
raise


try: try:
while True: while True:
@@ -365,8 +357,7 @@ class QWARC:
self._sleepTasks.add(sleepTask) self._sleepTasks.add(sleepTask)
continue continue


cursor = await self.obtain_exclusive_db_lock()
try:
async with self.exclusive_db_lock() as cursor:
cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,)) cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
result = cursor.fetchone() result = cursor.fetchone()
if not result: if not result:
@@ -379,18 +370,12 @@ class QWARC:
sleepTask.taskType = 'sleep' sleepTask.taskType = 'sleep'
self._tasks.add(sleepTask) self._tasks.add(sleepTask)
self._sleepTasks.add(sleepTask) self._sleepTasks.add(sleepTask)
cursor.execute('COMMIT')
continue continue
else: else:
# Really nothing to do anymore # Really nothing to do anymore
cursor.execute('COMMIT')
break break
id, itemType, itemValue, status = result id, itemType, itemValue, status = result
cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id)) cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
cursor.execute('COMMIT')
except:
cursor.execute('ROLLBACK')
raise


session = self._freeSessions.popleft() session = self._freeSessions.popleft()
item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS) item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)


Loading…
取消
儲存