Refactor QWARC class to keep relevant variables in instance attributes instead of local variables

tags/v0.2.1
JustAnotherArchivist, 4 years ago
Parent revision: 50b936b18c
1 file changed, 89 additions and 71 deletions
qwarc/__init__.py
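
In short, the commit moves the per-run working state that run() used to keep in local variables (and that the nested wait_for_free_task() closure reached via nonlocal) into instance attributes, initialised and cleared by the new _reset_working_vars(), so that _wait_for_free_task() and _insert_subitems() can live as ordinary methods. A minimal sketch of the pattern, using a simplified stand-in class (only the attribute and method names that appear in the diff are real; everything else is illustrative):

    import asyncio
    import collections

    class Worker:
        # Illustrative stand-in for QWARC: per-run state lives on the instance.

        def __init__(self):
            self._reset_working_vars()

        def _reset_working_vars(self):
            # Working variables, (re)created before a run and cleared afterwards
            self._db = None
            self._tasks = set()
            self._sleepTasks = set()
            self._sessions = []
            self._freeSessions = collections.deque()
            self._warc = None

        async def _wait_for_free_task(self):
            # Helper coroutines read and write the shared state directly
            # instead of capturing run()'s locals through a nested closure.
            if not self._tasks:
                return
            done, pending = await asyncio.wait(self._tasks, return_when = asyncio.FIRST_COMPLETED)
            self._tasks = pending

        async def run(self):
            self._tasks.add(asyncio.ensure_future(asyncio.sleep(0.1)))
            try:
                while self._tasks:
                    await self._wait_for_free_task()
            finally:
                pass  # the real code closes sessions, the WARC writer, and the DB here
            self._reset_working_vars()

    asyncio.run(Worker().run())

Clearing the attributes again at the end of run() (the added self._reset_working_vars() call after the try/finally) keeps the instance reusable and avoids holding references to closed sessions, the WARC writer, and the database between runs.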

@@ -159,8 +159,19 @@ class QWARC:
         self._warcSizeLimit = warcSizeLimit
         self._warcDedupe = warcDedupe

-    async def obtain_exclusive_db_lock(self, db):
-        c = db.cursor()
+        self._reset_working_vars()
+
+    def _reset_working_vars(self):
+        # Working variables
+        self._db = None
+        self._tasks = set()
+        self._sleepTasks = set()
+        self._sessions = [] # aiohttp.ClientSession instances
+        self._freeSessions = collections.deque() # ClientSession instances that are currently free
+        self._warc = None
+
+    async def obtain_exclusive_db_lock(self):
+        c = self._db.cursor()
         while True:
             try:
                 c.execute('BEGIN EXCLUSIVE')
@@ -171,21 +182,64 @@ class QWARC:
                 await asyncio.sleep(1)
         return c

-    def _make_item(self, itemType, itemValue, session, headers, warc):
+    def _make_item(self, itemType, itemValue, session, headers):
         try:
             itemClass = self._itemTypeMap[itemType]
         except KeyError:
             raise RuntimeError(f'No such item type: {itemType!r}')
-        return itemClass(itemValue, session, headers, warc)
+        return itemClass(itemValue, session, headers, self._warc)

+    async def _wait_for_free_task(self):
+        if not self._tasks:
+            return
+        done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
+        for future in done:
+            # TODO Replace all of this with `if future.cancelled():`
+            try:
+                await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
+            except concurrent.futures.CancelledError as e:
+                # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
+                if isinstance(future, asyncio.Task):
+                    if future.taskType == 'process_item':
+                        logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
+                    elif future.taskType == 'sleep':
+                        self._sleepTasks.remove(future)
+                continue
+            if future.taskType == 'sleep':
+                # Dummy task for empty todo list, see below.
+                self._sleepTasks.remove(future)
+                continue
+            item = future.item
+            logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
+            cursor = await self.obtain_exclusive_db_lock()
+            try:
+                cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
+                cursor.execute('COMMIT')
+            except:
+                cursor.execute('ROLLBACK')
+                raise
+            await self._insert_subitems(item)
+            self._freeSessions.append(item.session)
+        self._tasks = pending
+
+    async def _insert_subitems(self, item):
+        cursor = await self.obtain_exclusive_db_lock()
+        try:
+            if item.childItems:
+                it = iter(item.childItems)
+                while True:
+                    values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
+                    if not values:
+                        break
+                    cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
+            cursor.execute('COMMIT')
+        except:
+            cursor.execute('ROLLBACK')
+            raise
+
     async def run(self, loop):
         headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere

-        tasks = set()
-        sleepTasks = set()
-        sessions = [] # aiohttp.ClientSession instances
-        freeSessions = collections.deque() # ClientSession instances that are currently free
-
         for i in range(self._concurrency):
             session = _aiohttp.ClientSession(
                 connector = qwarc.aiohttp.TCPConnector(loop = loop),
@@ -194,64 +248,26 @@ class QWARC:
                 skip_auto_headers = ['Accept-Encoding'],
                 loop = loop
             )
-            sessions.append(session)
-            freeSessions.append(session)
+            self._sessions.append(session)
+            self._freeSessions.append(session)

-        warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
+        self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)

-        db = sqlite3.connect(self._dbPath, timeout = 1)
-        db.isolation_level = None # Transactions are handled manually below.
-        db.execute('PRAGMA synchronous = OFF')
+        self._db = sqlite3.connect(self._dbPath, timeout = 1)
+        self._db.isolation_level = None # Transactions are handled manually below.
+        self._db.execute('PRAGMA synchronous = OFF')

         try:
-            async def wait_for_free_task():
-                nonlocal tasks, freeSessions, db
-                done, pending = await asyncio.wait(tasks, return_when = concurrent.futures.FIRST_COMPLETED)
-                for future in done:
-                    # TODO Replace all of this with `if future.cancelled():`
-                    try:
-                        await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
-                    except concurrent.futures.CancelledError as e:
-                        # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
-                        if isinstance(future, asyncio.Task):
-                            if future.taskType == 'process_item':
-                                logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
-                            elif future.taskType == 'sleep':
-                                sleepTasks.remove(future)
-                        continue
-                    if future.taskType == 'sleep':
-                        # Dummy task for empty todo list, see below.
-                        sleepTasks.remove(future)
-                        continue
-                    item = future.item
-                    logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
-                    cursor = await self.obtain_exclusive_db_lock(db)
-                    try:
-                        cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
-                        if item.childItems:
-                            it = iter(item.childItems)
-                            while True:
-                                values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
-                                if not values:
-                                    break
-                                cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
-                        cursor.execute('COMMIT')
-                    except:
-                        cursor.execute('ROLLBACK')
-                        raise
-                    freeSessions.append(item.session)
-                tasks = pending
-
             while True:
-                while len(tasks) >= self._concurrency:
-                    await wait_for_free_task()
+                while len(self._tasks) >= self._concurrency:
+                    await self._wait_for_free_task()

                 if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
                     logging.info('Disk space is low, sleeping')
                     sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
                     sleepTask.taskType = 'sleep'
-                    tasks.add(sleepTask)
-                    sleepTasks.add(sleepTask)
+                    self._tasks.add(sleepTask)
+                    self._sleepTasks.add(sleepTask)
                     continue

                 if os.path.exists('STOP'):
@@ -261,7 +277,7 @@ class QWARC:
                     logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
                     break

-                cursor = await self.obtain_exclusive_db_lock(db)
+                cursor = await self.obtain_exclusive_db_lock()
                 try:
                     cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
                     result = cursor.fetchone()
@@ -273,8 +289,8 @@ class QWARC:
                         #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
                         sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
                         sleepTask.taskType = 'sleep'
-                        tasks.add(sleepTask)
-                        sleepTasks.add(sleepTask)
+                        self._tasks.add(sleepTask)
+                        self._sleepTasks.add(sleepTask)
                         cursor.execute('COMMIT')
                         continue
                     else:
@@ -290,8 +306,8 @@ class QWARC:
                     cursor.execute('ROLLBACK')
                     raise

-                session = freeSessions.popleft()
-                item = self._make_item(itemType, itemValue, session, headers, warc)
+                session = self._freeSessions.popleft()
+                item = self._make_item(itemType, itemValue, session, headers)
                 task = asyncio.ensure_future(item.process())
                 #TODO: Is there a better way to add custom information to a task/coroutine object?
                 task.taskType = 'process'
@@ -299,27 +315,29 @@ class QWARC:
                 task.itemType = itemType
                 task.itemValue = itemValue
                 task.item = item
-                tasks.add(task)
+                self._tasks.add(task)

-            for sleepTask in sleepTasks:
+            for sleepTask in self._sleepTasks:
                 sleepTask.cancel()

-            while len(tasks):
-                await wait_for_free_task()
+            while len(self._tasks):
+                await self._wait_for_free_task()

             logging.info('Done')
         except (Exception, KeyboardInterrupt) as e:
             # Kill all tasks
-            for task in tasks:
+            for task in self._tasks:
                 task.cancel()
-            await asyncio.wait(tasks, return_when = concurrent.futures.ALL_COMPLETED)
+            await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)

             raise
         finally:
-            for session in sessions:
+            for session in self._sessions:
                 session.close()
-            warc.close()
-            db.close()
+            self._warc.close()
+            self._db.close()

+        self._reset_working_vars()
+
     def create_db(self):
         db = sqlite3.connect(self._dbPath, timeout = 1)


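For reference, the relocated obtain_exclusive_db_lock() keeps the pre-existing retry loop around BEGIN EXCLUSIVE; its except clause falls outside the hunk, so the sketch below fills it in as an assumption (retry only while SQLite reports that the database is locked):

    import asyncio
    import sqlite3

    async def obtain_exclusive_lock(db):
        # Sketch of the pattern behind QWARC.obtain_exclusive_db_lock().
        # The OperationalError filter and the break are assumptions; the diff
        # only shows the BEGIN EXCLUSIVE, the retry sleep, and the return.
        c = db.cursor()
        while True:
            try:
                c.execute('BEGIN EXCLUSIVE')
                break
            except sqlite3.OperationalError as e:
                if str(e) != 'database is locked':
                    raise
                # Another connection holds the lock; try again in a second.
                await asyncio.sleep(1)
        return c

Because the connection is opened with isolation_level = None and transactions are committed or rolled back manually, this exclusive BEGIN is what serialises access to the items table across the COMMIT/ROLLBACK blocks seen in the diff.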