A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

318 lines
11 KiB

  1. import qwarc.aiohttp
  2. from qwarc.const import *
  3. import qwarc.utils
  4. import qwarc.warc
  5. import aiohttp as _aiohttp
  6. if _aiohttp.__version__ != '2.3.10':
  7. raise ImportError('aiohttp must be version 2.3.10')
  8. import asyncio
  9. import collections
  10. import concurrent.futures
  11. import itertools
  12. import logging
  13. import os
  14. import random
  15. import sqlite3
  16. import yarl
  17. class Item:
  18. itemType = None
  19. def __init__(self, itemValue, session, headers, warc):
  20. self.itemValue = itemValue
  21. self.session = session
  22. self.headers = headers
  23. self.warc = warc
  24. self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
  25. self.childItems = []
  26. async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default):
  27. '''
  28. HTTP GET a URL
  29. url: str or yarl.URL
  30. responseHandler: a callable that determines how the response is handled. See qwarc.utils.handle_response_default for details.
  31. Returns response (a ClientResponse object or None) and history (a tuple of (response, exception) tuples).
  32. response can be None and history can be an empty tuple, depending on the circumstances (e.g. timeouts).
  33. '''
  34. #TODO: Rewrite using 'async with self.session.get'
  35. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
  36. history = []
  37. attempt = 0
  38. #TODO redirectLevel
  39. while True:
  40. attempt += 1
  41. response = None
  42. exc = None
  43. action = ACTION_RETRY
  44. writeToWarc = True
  45. try:
  46. try:
  47. with _aiohttp.Timeout(60):
  48. logging.info('Fetching {}'.format(url))
  49. response = await self.session.get(url, headers = self.headers, allow_redirects = False)
  50. try:
  51. ret = await response.text(errors = 'surrogateescape')
  52. except:
  53. # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
  54. response.close()
  55. raise
  56. else:
  57. tx = len(response.rawRequestData)
  58. rx = len(response.rawResponseData)
  59. logging.info('Fetched {}: {} (tx {}, rx {})'.format(url, response.status, tx, rx))
  60. self.stats['tx'] += tx
  61. self.stats['rx'] += rx
  62. self.stats['requests'] += 1
  63. except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
  64. logging.error('Request for {} failed: {!r}'.format(url, e))
  65. action, writeToWarc = await responseHandler(url, attempt, response, e)
  66. exc = e # Pass the exception outward for the history
  67. else:
  68. action, writeToWarc = await responseHandler(url, attempt, response, None)
  69. history.append((response, exc))
  70. if action in (ACTION_SUCCESS, ACTION_IGNORE):
  71. return response, tuple(history)
  72. elif action == ACTION_FOLLOW_OR_SUCCESS:
  73. redirectUrl = response.headers.get('Location') or response.headers.get('URI')
  74. if not redirectUrl:
  75. return response, tuple(history)
  76. url = url.join(yarl.URL(redirectUrl))
  77. attempt = 0
  78. elif action == ACTION_RETRY:
  79. # Nothing to do, just go to the next cycle
  80. pass
  81. finally:
  82. if response:
  83. if writeToWarc:
  84. self.warc.write_client_response(response)
  85. await response.release()
  86. async def process(self):
  87. raise NotImplementedError
  88. @classmethod
  89. def generate(cls):
  90. yield from () # Generate no items by default
  91. @classmethod
  92. def _gen(cls):
  93. for x in cls.generate():
  94. yield (cls.itemType, x, STATUS_TODO)
  95. def add_item(self, itemClassOrType, itemValue):
  96. if issubclass(itemClassOrType, Item):
  97. item = (itemClassOrType.itemType, itemValue)
  98. else:
  99. item = (itemClassOrType, itemValue)
  100. if item not in self.childItems:
  101. self.childItems.append(item)
  102. class QWARC:
  103. def __init__(self, itemClasses, warcBasePath, dbPath, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
  104. '''
  105. itemClasses: iterable of Item
  106. warcBasePath: str, base name of the WARC files
  107. dbPath: str, path to the sqlite3 database file
  108. concurrency: int, number of concurrently processed items
  109. memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
  110. minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
  111. warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
  112. '''
  113. self._itemClasses = itemClasses
  114. self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
  115. self._warcBasePath = warcBasePath
  116. self._dbPath = dbPath
  117. self._concurrency = concurrency
  118. self._memoryLimit = memoryLimit
  119. self._minFreeDisk = minFreeDisk
  120. self._warcSizeLimit = warcSizeLimit
  121. self._warcDedupe = warcDedupe
  122. async def obtain_exclusive_db_lock(self, db):
  123. c = db.cursor()
  124. while True:
  125. try:
  126. c.execute('BEGIN EXCLUSIVE')
  127. break
  128. except sqlite3.OperationalError as e:
  129. if str(e) != 'database is locked':
  130. raise
  131. await asyncio.sleep(1)
  132. return c
  133. def _make_item(self, itemType, itemValue, session, headers, warc):
  134. try:
  135. itemClass = self._itemTypeMap[itemType]
  136. except KeyError:
  137. raise RuntimeError('No such item type: {!r}'.format(itemType))
  138. return itemClass(itemValue, session, headers, warc)
  139. async def run(self, loop):
  140. headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere
  141. tasks = set()
  142. sleepTasks = set()
  143. sessions = [] # aiohttp.ClientSession instances
  144. freeSessions = collections.deque() # ClientSession instances that are currently free
  145. for i in range(self._concurrency):
  146. session = _aiohttp.ClientSession(
  147. connector = qwarc.aiohttp.TCPConnector(loop = loop),
  148. request_class = qwarc.aiohttp.ClientRequest,
  149. response_class = qwarc.aiohttp.ClientResponse,
  150. skip_auto_headers = ['Accept-Encoding'],
  151. loop = loop
  152. )
  153. sessions.append(session)
  154. freeSessions.append(session)
  155. warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe)
  156. db = sqlite3.connect(self._dbPath, timeout = 1)
  157. db.isolation_level = None # Transactions are handled manually below.
  158. db.execute('PRAGMA synchronous = OFF')
  159. try:
  160. async def wait_for_free_task():
  161. nonlocal tasks, freeSessions, db, emptyTodoSleep
  162. done, pending = await asyncio.wait(tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  163. for future in done:
  164. # TODO Replace all of this with `if future.cancelled():`
  165. try:
  166. await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
  167. except concurrent.futures.CancelledError as e:
  168. # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
  169. if isinstance(future, asyncio.Task):
  170. if future.taskType == 'process_item':
  171. logging.warning('Task for {}:{} cancelled: {!r}'.format(future.itemType, future.itemValue, future))
  172. elif future.taskType == 'sleep':
  173. sleepTasks.remove(future)
  174. continue
  175. if future.taskType == 'sleep':
  176. # Dummy task for empty todo list, see below.
  177. sleepTasks.remove(future)
  178. continue
  179. item = future.item
  180. logging.info('{itemType}:{itemValue} done: {requests} requests, {tx} tx, {rx} rx'.format(itemType = future.itemType, itemValue = future.itemValue, **item.stats))
  181. cursor = await self.obtain_exclusive_db_lock(db)
  182. try:
  183. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
  184. if item.childItems:
  185. it = iter(item.childItems)
  186. while True:
  187. values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
  188. if not values:
  189. break
  190. cursor.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)
  191. cursor.execute('COMMIT')
  192. except:
  193. cursor.execute('ROLLBACK')
  194. raise
  195. freeSessions.append(item.session)
  196. tasks = pending
  197. while True:
  198. while len(tasks) >= self._concurrency:
  199. emptyTodoFullReached = True
  200. await wait_for_free_task()
  201. if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
  202. logging.info('Disk space is low, sleeping')
  203. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  204. sleepTask.taskType = 'sleep'
  205. tasks.add(sleepTask)
  206. sleepTasks.add(sleepTask)
  207. continue
  208. cursor = await self.obtain_exclusive_db_lock(db)
  209. try:
  210. cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
  211. result = cursor.fetchone()
  212. if not result:
  213. if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
  214. # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
  215. # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
  216. # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
  217. #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
  218. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  219. sleepTask.taskType = 'sleep'
  220. tasks.add(sleepTask)
  221. sleepTasks.add(sleepTask)
  222. cursor.execute('COMMIT')
  223. continue
  224. else:
  225. # Really nothing to do anymore
  226. #TODO: Another process may be running create_db, in which case we'd still want to wait...
  227. # create_db could insert a dummy item which is marked as done when the DB is ready
  228. cursor.execute('COMMIT')
  229. break
  230. emptyTodoSleep = 0
  231. id, itemType, itemValue, status = result
  232. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
  233. cursor.execute('COMMIT')
  234. except:
  235. cursor.execute('ROLLBACK')
  236. raise
  237. session = freeSessions.popleft()
  238. item = self._make_item(itemType, itemValue, session, headers, warc)
  239. task = asyncio.ensure_future(item.process())
  240. #TODO: Is there a better way to add custom information to a task/coroutine object?
  241. task.taskType = 'process'
  242. task.id = id
  243. task.itemType = itemType
  244. task.itemValue = itemValue
  245. task.item = item
  246. tasks.add(task)
  247. if os.path.exists('STOP'):
  248. logging.info('Gracefully shutting down due to STOP file')
  249. break
  250. if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
  251. logging.info('Gracefully shutting down due to memory usage (current = {} > limit = {})'.format(qwarc.utils.get_rss(), self._memoryLimit))
  252. break
  253. for sleepTask in sleepTasks:
  254. sleepTask.cancel()
  255. while len(tasks):
  256. await wait_for_free_task()
  257. logging.info('Done')
  258. except (Exception, KeyboardInterrupt) as e:
  259. # Kill all tasks
  260. for task in tasks:
  261. task.cancel()
  262. await asyncio.wait(tasks, return_when = concurrent.futures.ALL_COMPLETED)
  263. raise
  264. finally:
  265. for session in sessions:
  266. session.close()
  267. warc.close()
  268. db.close()
  269. def create_db(self):
  270. db = sqlite3.connect(self._dbPath, timeout = 1)
  271. db.execute('PRAGMA synchronous = OFF')
  272. with db:
  273. db.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
  274. db.execute('CREATE INDEX items_status_idx ON items (status)')
  275. it = itertools.chain(*(i._gen() for i in self._itemClasses))
  276. while True:
  277. values = tuple(itertools.islice(it, 100000))
  278. if not values:
  279. break
  280. with db:
  281. db.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)