A framework for quick web archiving
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

374 řádky
14 KiB

  1. import qwarc.aiohttp
  2. from qwarc.const import *
  3. import qwarc.utils
  4. import qwarc.warc
  5. import aiohttp as _aiohttp
  6. if _aiohttp.__version__ != '2.3.10':
  7. raise ImportError('aiohttp must be version 2.3.10')
  8. import asyncio
  9. import collections
  10. import concurrent.futures
  11. import itertools
  12. import logging
  13. import os
  14. import random
  15. import sqlite3
  16. import yarl
  17. class Item:
  18. itemType = None
  19. def __init__(self, qwarcObj, itemValue, session, headers, warc):
  20. self.qwarcObj = qwarcObj
  21. self.itemValue = itemValue
  22. self.session = session
  23. self.headers = headers
  24. self.warc = warc
  25. self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
  26. self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
  27. self.childItems = []
  28. async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True):
  29. '''
  30. HTTP GET or POST a URL
  31. url: str or yarl.URL
  32. responseHandler: a callable that determines how the response is handled. See qwarc.utils.handle_response_default for details.
  33. method: str, must be 'GET' or 'POST'
  34. data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
  35. headers: list of 2-tuples, additional headers for this request only
  36. Returns response (a ClientResponse object or None) and history (a tuple of (response, exception) tuples).
  37. response can be None and history can be an empty tuple, depending on the circumstances (e.g. timeouts).
  38. '''
  39. #TODO: Rewrite using 'async with self.session.get'
  40. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
  41. assert method in ('GET', 'POST'), 'method must be GET or POST'
  42. headers = self.headers + headers
  43. #TODO Deduplicate headers with later values overriding earlier ones
  44. history = []
  45. attempt = 0
  46. #TODO redirectLevel
  47. while True:
  48. attempt += 1
  49. response = None
  50. exc = None
  51. action = ACTION_RETRY
  52. writeToWarc = True
  53. try:
  54. try:
  55. with _aiohttp.Timeout(60):
  56. self.logger.info(f'Fetching {url}')
  57. response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
  58. try:
  59. ret = await response.read()
  60. except:
  61. # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
  62. response.close()
  63. raise
  64. else:
  65. tx = len(response.rawRequestData)
  66. rx = len(response.rawResponseData)
  67. self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
  68. self.stats['tx'] += tx
  69. self.stats['rx'] += rx
  70. self.stats['requests'] += 1
  71. except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
  72. self.logger.error(f'Request for {url} failed: {e!r}')
  73. action, writeToWarc = await responseHandler(url, attempt, response, e)
  74. exc = e # Pass the exception outward for the history
  75. else:
  76. action, writeToWarc = await responseHandler(url, attempt, response, None)
  77. history.append((response, exc))
  78. if action in (ACTION_SUCCESS, ACTION_IGNORE):
  79. return response, tuple(history)
  80. elif action == ACTION_FOLLOW_OR_SUCCESS:
  81. redirectUrl = response.headers.get('Location') or response.headers.get('URI')
  82. if not redirectUrl:
  83. return response, tuple(history)
  84. url = url.join(yarl.URL(redirectUrl))
  85. if response.status in (301, 302, 303) and method == 'POST':
  86. method = 'GET'
  87. data = None
  88. attempt = 0
  89. elif action == ACTION_RETRIES_EXCEEDED:
  90. self.logger.error(f'Request for {url} failed {attempt} times')
  91. return response, tuple(history)
  92. elif action == ACTION_RETRY:
  93. # Nothing to do, just go to the next cycle
  94. pass
  95. finally:
  96. if response:
  97. if writeToWarc:
  98. self.warc.write_client_response(response)
  99. await response.release()
  100. async def process(self):
  101. raise NotImplementedError
  102. @classmethod
  103. def generate(cls):
  104. yield from () # Generate no items by default
  105. @classmethod
  106. def _gen(cls):
  107. for x in cls.generate():
  108. yield (cls.itemType, x, STATUS_TODO)
  109. def add_subitem(self, itemClassOrType, itemValue):
  110. if issubclass(itemClassOrType, Item):
  111. item = (itemClassOrType.itemType, itemValue)
  112. else:
  113. item = (itemClassOrType, itemValue)
  114. if item not in self.childItems:
  115. self.childItems.append(item)
  116. async def flush_subitems(self):
  117. await self.qwarcObj.flush_subitems(self)
  118. def clear_subitems(self):
  119. self.childItems = []
  120. @classmethod
  121. def get_subclasses(cls):
  122. for subclass in cls.__subclasses__():
  123. yield subclass
  124. yield from subclass.get_subclasses()
  125. class QWARC:
  126. def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
  127. '''
  128. itemClasses: iterable of Item
  129. warcBasePath: str, base name of the WARC files
  130. dbPath: str, path to the sqlite3 database file
  131. command: list, the command line used to invoke qwarc
  132. specFile: str, path to the spec file
  133. specDependencies: qwarc.utils.SpecDependencies
  134. logFilename: str, name of the log file written by this process
  135. concurrency: int, number of concurrently processed items
  136. memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
  137. minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
  138. warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
  139. '''
  140. self._itemClasses = itemClasses
  141. self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
  142. self._warcBasePath = warcBasePath
  143. self._dbPath = dbPath
  144. self._command = command
  145. self._specFile = specFile
  146. self._specDependencies = specDependencies
  147. self._logFilename = logFilename
  148. self._concurrency = concurrency
  149. self._memoryLimit = memoryLimit
  150. self._minFreeDisk = minFreeDisk
  151. self._warcSizeLimit = warcSizeLimit
  152. self._warcDedupe = warcDedupe
  153. self._reset_working_vars()
  154. def _reset_working_vars(self):
  155. # Working variables
  156. self._db = None
  157. self._tasks = set()
  158. self._sleepTasks = set()
  159. self._sessions = [] # aiohttp.ClientSession instances
  160. self._freeSessions = collections.deque() # ClientSession instances that are currently free
  161. self._warc = None
  162. async def obtain_exclusive_db_lock(self):
  163. c = self._db.cursor()
  164. while True:
  165. try:
  166. c.execute('BEGIN EXCLUSIVE')
  167. break
  168. except sqlite3.OperationalError as e:
  169. if str(e) != 'database is locked':
  170. raise
  171. await asyncio.sleep(1)
  172. return c
  173. def _make_item(self, itemType, itemValue, session, headers):
  174. try:
  175. itemClass = self._itemTypeMap[itemType]
  176. except KeyError:
  177. raise RuntimeError(f'No such item type: {itemType!r}')
  178. return itemClass(self, itemValue, session, headers, self._warc)
  179. async def _wait_for_free_task(self):
  180. if not self._tasks:
  181. return
  182. done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  183. for future in done:
  184. # TODO Replace all of this with `if future.cancelled():`
  185. try:
  186. await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
  187. except concurrent.futures.CancelledError as e:
  188. # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
  189. if isinstance(future, asyncio.Task):
  190. if future.taskType == 'process_item':
  191. logging.warning(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
  192. elif future.taskType == 'sleep':
  193. self._sleepTasks.remove(future)
  194. continue
  195. if future.taskType == 'sleep':
  196. # Dummy task for empty todo list, see below.
  197. self._sleepTasks.remove(future)
  198. continue
  199. item = future.item
  200. logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
  201. cursor = await self.obtain_exclusive_db_lock()
  202. try:
  203. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, future.id))
  204. cursor.execute('COMMIT')
  205. except:
  206. cursor.execute('ROLLBACK')
  207. raise
  208. await self._insert_subitems(item)
  209. self._freeSessions.append(item.session)
  210. self._tasks = pending
  211. async def _insert_subitems(self, item):
  212. cursor = await self.obtain_exclusive_db_lock()
  213. try:
  214. if item.childItems:
  215. it = iter(item.childItems)
  216. while True:
  217. values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
  218. if not values:
  219. break
  220. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  221. cursor.execute('COMMIT')
  222. except:
  223. cursor.execute('ROLLBACK')
  224. raise
  225. async def run(self, loop):
  226. headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere
  227. for i in range(self._concurrency):
  228. session = _aiohttp.ClientSession(
  229. connector = qwarc.aiohttp.TCPConnector(loop = loop),
  230. request_class = qwarc.aiohttp.ClientRequest,
  231. response_class = qwarc.aiohttp.ClientResponse,
  232. skip_auto_headers = ['Accept-Encoding'],
  233. loop = loop
  234. )
  235. self._sessions.append(session)
  236. self._freeSessions.append(session)
  237. self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
  238. self._db = sqlite3.connect(self._dbPath, timeout = 1)
  239. self._db.isolation_level = None # Transactions are handled manually below.
  240. self._db.execute('PRAGMA synchronous = OFF')
  241. try:
  242. while True:
  243. while len(self._tasks) >= self._concurrency:
  244. await self._wait_for_free_task()
  245. if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
  246. logging.info('Disk space is low, sleeping')
  247. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  248. sleepTask.taskType = 'sleep'
  249. self._tasks.add(sleepTask)
  250. self._sleepTasks.add(sleepTask)
  251. continue
  252. if os.path.exists('STOP'):
  253. logging.info('Gracefully shutting down due to STOP file')
  254. break
  255. if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
  256. logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
  257. break
  258. cursor = await self.obtain_exclusive_db_lock()
  259. try:
  260. cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
  261. result = cursor.fetchone()
  262. if not result:
  263. if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
  264. # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
  265. # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
  266. # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
  267. #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
  268. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  269. sleepTask.taskType = 'sleep'
  270. self._tasks.add(sleepTask)
  271. self._sleepTasks.add(sleepTask)
  272. cursor.execute('COMMIT')
  273. continue
  274. else:
  275. # Really nothing to do anymore
  276. #TODO: Another process may be running create_db, in which case we'd still want to wait...
  277. # create_db could insert a dummy item which is marked as done when the DB is ready
  278. cursor.execute('COMMIT')
  279. break
  280. id, itemType, itemValue, status = result
  281. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
  282. cursor.execute('COMMIT')
  283. except:
  284. cursor.execute('ROLLBACK')
  285. raise
  286. session = self._freeSessions.popleft()
  287. item = self._make_item(itemType, itemValue, session, headers)
  288. task = asyncio.ensure_future(item.process())
  289. #TODO: Is there a better way to add custom information to a task/coroutine object?
  290. task.taskType = 'process'
  291. task.id = id
  292. task.itemType = itemType
  293. task.itemValue = itemValue
  294. task.item = item
  295. self._tasks.add(task)
  296. for sleepTask in self._sleepTasks:
  297. sleepTask.cancel()
  298. while len(self._tasks):
  299. await self._wait_for_free_task()
  300. logging.info('Done')
  301. except (Exception, KeyboardInterrupt) as e:
  302. # Kill all tasks
  303. for task in self._tasks:
  304. task.cancel()
  305. await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
  306. raise
  307. finally:
  308. for session in self._sessions:
  309. session.close()
  310. self._warc.close()
  311. self._db.close()
  312. self._reset_working_vars()
  313. async def flush_subitems(self, item):
  314. await self._insert_subitems(item)
  315. item.clear_subitems()
  316. def create_db(self):
  317. db = sqlite3.connect(self._dbPath, timeout = 1)
  318. db.execute('PRAGMA synchronous = OFF')
  319. with db:
  320. db.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
  321. db.execute('CREATE INDEX items_status_idx ON items (status)')
  322. db.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')
  323. it = itertools.chain(*(i._gen() for i in self._itemClasses))
  324. while True:
  325. values = tuple(itertools.islice(it, 100000))
  326. if not values:
  327. break
  328. with db:
  329. db.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)