@@ -20,6 +20,9 @@ import sqlite3
import yarl
logger = logging.getLogger(__name__)
class Item:
itemType = None
defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)
@@ -33,7 +36,7 @@ class Item:
if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
self._baseUrl = None
self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
self.logger = logging.LoggerAdapter(logging.getLogger(f'{type(self).__module__}.{type(self).__name__}' ), {'itemType': self.itemType, 'itemValue': self.itemValue})
self.childItems = []
@@ -43,6 +46,7 @@ class Item:
@baseUrl.setter
def baseUrl(self, baseUrl):
self.logger.debug(f'Setting baseUrl to {baseUrl!r}')
if baseUrl is None:
self._baseUrl = None
elif isinstance(baseUrl, yarl.URL):
@@ -93,13 +97,27 @@ class Item:
#TODO: Rewrite using 'async with self.session.get'
# Debug aid: report which of fetch()'s keyword(-only) arguments were passed
# with non-default values, so the log shows only what deviates from defaults.
if self.logger.isEnabledFor(logging.DEBUG):
l = locals()
modified = []
# __kwdefaults__ maps keyword-only parameter names to their default values.
for kwarg, kwdef in self.fetch.__kwdefaults__.items():
if l[kwarg] != kwdef:
modified.append(kwarg)
if modified:
self.logger.debug(f'fetch({url!r}) with ...')
for kwarg in modified:
self.logger.debug(f'... {kwarg} = {repr(l[kwarg]).replace(chr(10), " ")}') # aiohttp.ClientResponse's repr can contain newlines
else:
self.logger.debug(f'fetch({url!r})')
url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
# Incomplete URL (missing scheme or host): resolve it against the URL of the
# response it was extracted from, else against the item's baseUrl; fail if
# neither is available.
if not url.scheme or not url.host:
if fromResponse is not None:
self.logger.debug(f'Joining incomplete URL {url} with fromResponse URL {fromResponse.url}')
url = fromResponse.url.join(url)
elif not self.baseUrl:
raise ValueError('Incomplete URL and no baseUrl to join it with')
else:
# NOTE(review): 'baseUrl' is not bound in this scope — the surrounding
# checks use self.baseUrl — so this debug line would raise NameError when
# executed (unless fetch() takes a baseUrl parameter not visible in this
# chunk). It should presumably read self.baseUrl.
self.logger.debug(f'Joining incomplete URL {url} with base URL {baseUrl}')
url = self.baseUrl.join(url)
# Remember the pre-redirect URL; presumably used later for history/bookkeeping.
originalUrl = url
if responseHandler is None:
@@ -115,6 +133,7 @@ class Item:
exc = None
action = ACTION_RETRY
writeToWarc = True
self.logger.debug(f'Trying to fetch {url}, attempt {attempt}, redirect level {redirectLevel}')
try:
try:
with _aiohttp.Timeout(timeout):
@@ -144,20 +163,25 @@ class Item:
exc = e # Pass the exception outward for the history
else:
action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
self.logger.debug(f'Response handler result: {action}, {writeToWarc}')
# Record the response in the WARC only if the request succeeded and the
# response handler did not veto it.
if response and exc is None and writeToWarc:
self.warc.write_client_response(response)
history.append((response, exc))
# On failure, substitute a dummy so callers always get a response-like object
# carrying a qhistory attribute.
retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
if action in (ACTION_SUCCESS, ACTION_IGNORE):
# NOTE(review): this is the normal completion path but it logs at ERROR
# level, and the f-string has no placeholders — compare the DEBUG
# 'fetch loop done' message below. Probably intended to be
# self.logger.debug('Fetch loop done').
self.logger.error(f'Fetch loop done')
retResponse.qhistory = tuple(history)
return retResponse
elif action == ACTION_FOLLOW_OR_SUCCESS:
# Redirect target: Location header, falling back to the legacy URI header.
redirectUrl = response.headers.get('Location') or response.headers.get('URI')
if not redirectUrl:
self.logger.debug('Got redirect but no redirect target, fetch loop done')
retResponse.qhistory = tuple(history)
return retResponse
# Relative redirect targets are resolved against the current URL.
url = url.join(yarl.URL(redirectUrl))
self.logger.debug(f'Next URL: {url}')
# 301/302/303 on POST: switch to GET and drop the request body, per the
# debug message below (matches common HTTP client behaviour).
if response.status in (301, 302, 303) and method == 'POST':
self.logger.debug(f'{response.status} on POST, dropping to GET and clearing data')
method = 'GET'
data = None
# A followed redirect gets a fresh retry budget.
attempt = 0
@@ -175,6 +199,7 @@ class Item:
pass
finally:
if response:
self.logger.debug('Releasing response')
await response.release()
async def process(self):
@@ -189,13 +214,16 @@ class Item:
item = (itemClassOrType.itemType, itemValue)
else:
item = (itemClassOrType, itemValue)
self.logger.debug(f'Adding subitem {itemClassOrType!r} {itemValue!r}')
if item not in self.childItems:
self.childItems.append(item)
async def flush_subitems(self):
    """Hand the collected child items over to the owning QWARC object."""
    self.logger.debug('Flushing subitems')
    await self.qwarcObj.flush_subitems(self)
def clear_subitems(self):
    """Forget all collected child items without flushing them."""
    self.logger.debug('Clearing subitems')
    # Rebind to a fresh list rather than mutating in place.
    self.childItems = []
@classmethod
@@ -257,6 +285,7 @@ class QWARC:
# Async context manager: acquire an exclusive SQLite transaction (retrying
# while the DB is locked elsewhere), yield the cursor, then COMMIT on clean
# exit or ROLLBACK and re-raise on error.
@contextlib.asynccontextmanager
async def exclusive_db_lock(self):
logger.debug('Attempting to obtain DB lock')
c = self._db.cursor()
# Retry loop: keep trying until the exclusive transaction can be started.
while True:
try:
@@ -266,14 +295,18 @@
# (Hunk gap above: the elided context presumably executes BEGIN EXCLUSIVE
# and catches sqlite3.OperationalError — not visible in this chunk.)
# Only 'database is locked' is expected; anything else propagates.
if str(e) != 'database is locked':
raise
await asyncio.sleep(1)
logger.debug('Obtained DB lock')
try:
yield c
# The caller's 'async with' body completed without raising: commit.
logger.debug('Committing DB transaction')
c.execute('COMMIT')
except:
# Bare except: roll back on *any* exception, then re-raise — nothing
# is swallowed here.
logger.debug('Rolling back DB transaction')
c.execute('ROLLBACK')
raise
def _make_item(self, itemType, itemValue, session, headers):
logger.debug(f'Making item {itemType!r} {itemValue!r}')
try:
itemClass = self._itemTypeMap[itemType]
except KeyError:
@@ -281,29 +314,34 @@ class QWARC:
return itemClass(self, itemValue, session, headers, self._warc)
async def _wait_for_free_task(self):
logger.debug('Waiting for task(s) to finish')
if not self._tasks:
logger.debug('No tasks to wait for')
return
done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
logger.debug(f'{len(done)} task(s) finished, {len(pending)} remaining')
for future in done:
logger.debug(f'Finished {future.taskType} task: {future!r}')
newStatus = STATUS_DONE
if future.taskType == 'sleep':
self._sleepTasks.remove(future)
elif future.taskType == 'process':
item = future.item
logger.debug(f'Finished item: {item!r}')
try:
future.result()
except asyncio.CancelledError as e:
# Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
if future.taskType == 'process':
logging .error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
logger .error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
newStatus = STATUS_ERROR
except Exception as e:
if future.taskType == 'process':
logging .error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
logger .error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
newStatus = STATUS_ERROR
else:
if future.taskType == 'process':
logging .info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
logger .info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
if future.taskType != 'process':
continue
async with self.exclusive_db_lock() as cursor:
@@ -313,6 +351,7 @@ class QWARC:
self._tasks = pending
async def _insert_subitems(self, item):
logger.debug(f'Inserting {len(item.childItems)} subitem(s) from {item!r}')
if item.childItems:
async with self.exclusive_db_lock() as cursor:
it = iter(item.childItems)
@@ -325,6 +364,7 @@ class QWARC:
async def run(self, loop):
self._closed = _STATE_RUNNING
logger.debug(f'Creating {self._concurrency} sessions')
for i in range(self._concurrency):
session = _aiohttp.ClientSession(
connector = qwarc.aiohttp.TCPConnector(loop = loop),
@@ -335,12 +375,16 @@ class QWARC:
self._sessions.append(session)
self._freeSessions.append(session)
logger.debug('Constructing WARC object')
self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
logger.debug('Connecting to the database')
self._db = sqlite3.connect(self._dbPath, timeout = 1)
self._db.set_trace_callback(logging.getLogger(f'{__name__}[sqlite3]').debug)
self._db.isolation_level = None # Transactions are handled manually below.
# Setting the synchronous PRAGMA is not possible if the DB is currently locked by another process but also can't be done inside a transaction... So just retry until it succeeds.
logger.debug('Setting synchronous PRAGMA')
while True:
try:
self._db.execute('PRAGMA synchronous = OFF')
@@ -350,6 +394,7 @@ class QWARC:
await asyncio.sleep(1)
else:
break
logger.debug('Set synchronous PRAGMA')
async with self.exclusive_db_lock() as cursor:
cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
@@ -364,31 +409,35 @@ class QWARC:
await self._wait_for_free_task()
if self._closed == _STATE_CLOSING:
logger.debug('Currently closing')
break
if os.path.exists('STOP'):
logging .info('Gracefully shutting down due to STOP file')
logger .info('Gracefully shutting down due to STOP file')
break
if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
logging .info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
logger .info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
break
if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
logging .info('Disk space is low, sleeping')
logger .info('Disk space is low, sleeping')
sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
sleepTask.taskType = 'sleep'
self._tasks.add(sleepTask)
self._sleepTasks.add(sleepTask)
continue
logger.debug('Trying to get an item from the DB')
async with self.exclusive_db_lock() as cursor:
cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
result = cursor.fetchone()
if not result:
logger.debug('Found no TODO item')
if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
# There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
# It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
# So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
logger.debug('There are incomplete items that may spawn further TODOs, starting sleep task')
#TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
sleepTask.taskType = 'sleep'
@@ -397,8 +446,10 @@ class QWARC:
continue
else:
# Really nothing to do anymore
logger.debug('All items are done')
break
id_, itemType, itemValue, status = result
logger.debug(f'Got item {itemType}:{itemValue} ({id_})')
cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id_))
session = self._freeSessions.popleft()
@@ -412,17 +463,20 @@ class QWARC:
task.item = item
self._tasks.add(task)
logger.debug(f'Cancelling {len(self._sleepTasks)} sleep task(s)')
for sleepTask in self._sleepTasks:
sleepTask.cancel()
logger.debug('Waiting for tasks to finish')
while len(self._tasks):
await self._wait_for_free_task()
logging .info('Done')
logger .info('Done')
except (Exception, KeyboardInterrupt) as e:
# Kill all tasks
logger.debug(f'Got an exception, cancelling {len(self._tasks)} task(s)')
for task in self._tasks:
task.cancel()
logger.debug('Waiting for tasks to get cancelled')
await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
self._tasks.clear()
@@ -435,11 +489,13 @@ class QWARC:
item.clear_subitems()
def _create_db(self, cursor):
    """Create the items table together with its status index and the
    (type, value) uniqueness index."""
    logger.debug('Creating database')
    # One DDL statement per schema object, executed in order on the
    # caller-supplied cursor.
    for statement in (
        'CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)',
        'CREATE INDEX items_status_idx ON items (status)',
        'CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)',
    ):
        cursor.execute(statement)
def _insert_generated_items(self, cursor):
logger.debug('Inserting generated items')
it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
while True:
values = tuple(itertools.islice(it, 100000))
@@ -453,15 +509,20 @@ class QWARC:
self._closed = _STATE_CLOSING
logger.debug('Closing')
if self._tasks:
logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup')
for task in self._tasks:
task.cancel()
logger.debug('Waiting for tasks to get cancelled')
await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
self._tasks.clear()
logger.debug('Closing DB')
self._db.close()
logger.debug('Closing sessions')
for session in self._sessions:
session.close()
logger.debug('Closing WARC')
self._warc.close()
self._reset_working_vars() # Also resets self._closed