A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

391 lines
14 KiB

  1. import qwarc.aiohttp
  2. from qwarc.const import *
  3. import qwarc.utils
  4. import qwarc.warc
  5. import aiohttp as _aiohttp
  6. if _aiohttp.__version__ != '2.3.10':
  7. raise ImportError('aiohttp must be version 2.3.10')
  8. import asyncio
  9. import collections
  10. import concurrent.futures
  11. import io
  12. import itertools
  13. import logging
  14. import os
  15. import random
  16. import sqlite3
  17. import urllib.parse
  18. import yarl
  19. class Item:
  20. itemType = None
  21. def __init__(self, qwarcObj, itemValue, session, headers, warc):
  22. self.qwarcObj = qwarcObj
  23. self.itemValue = itemValue
  24. self.session = session
  25. self.headers = headers
  26. self.warc = warc
  27. self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
  28. self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
  29. self.childItems = []
  30. async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60):
  31. '''
  32. HTTP GET or POST a URL
  33. url: str or yarl.URL
  34. responseHandler: a callable that determines how the response is handled. See qwarc.utils.handle_response_default for details.
  35. method: str, must be 'GET' or 'POST'
  36. data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
  37. headers: list of 2-tuples, additional headers for this request only
  38. Returns response (a ClientResponse object or None) and history (a tuple of (response, exception) tuples).
  39. response can be None and history can be an empty tuple, depending on the circumstances (e.g. timeouts).
  40. '''
  41. #TODO: Rewrite using 'async with self.session.get'
  42. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
  43. assert method in ('GET', 'POST', 'HEAD'), 'method must be GET, POST, or HEAD'
  44. headers = self.headers + headers
  45. #TODO Deduplicate headers with later values overriding earlier ones
  46. history = []
  47. attempt = 0
  48. #TODO redirectLevel
  49. while True:
  50. attempt += 1
  51. response = None
  52. exc = None
  53. action = ACTION_RETRY
  54. writeToWarc = True
  55. try:
  56. try:
  57. with _aiohttp.Timeout(timeout):
  58. self.logger.info(f'Fetching {url}')
  59. response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
  60. try:
  61. while True:
  62. ret = await response.content.read(1048576)
  63. if not ret:
  64. break
  65. except:
  66. # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
  67. response.close()
  68. raise
  69. else:
  70. response.rawRequestData.seek(0, io.SEEK_END)
  71. tx = response.rawRequestData.tell()
  72. response.rawResponseData.seek(0, io.SEEK_END)
  73. rx = response.rawResponseData.tell()
  74. self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
  75. self.stats['tx'] += tx
  76. self.stats['rx'] += rx
  77. self.stats['requests'] += 1
  78. except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
  79. self.logger.warning(f'Request for {url} failed: {e!r}')
  80. action, writeToWarc = await responseHandler(url, attempt, response, e)
  81. exc = e # Pass the exception outward for the history
  82. else:
  83. action, writeToWarc = await responseHandler(url, attempt, response, None)
  84. if response and exc is None and writeToWarc:
  85. self.warc.write_client_response(response)
  86. history.append((response, exc))
  87. retResponse = response if exc is None else None
  88. if action in (ACTION_SUCCESS, ACTION_IGNORE):
  89. return retResponse, tuple(history)
  90. elif action == ACTION_FOLLOW_OR_SUCCESS:
  91. redirectUrl = response.headers.get('Location') or response.headers.get('URI')
  92. if not redirectUrl:
  93. return retResponse, tuple(history)
  94. if any(56448 <= ord(c) <= 56575 for c in redirectUrl):
  95. # Surrogate escape characters in the redirect URL, which usually means that the server sent non-ASCII data (e.g. ISO-8859-1).
  96. # Revert the encoding, then percent-encode the non-ASCII bytes.
  97. redirectUrl = urllib.parse.quote_from_bytes(redirectUrl.encode('utf8', 'surrogateescape'), safe = ''.join(chr(i) for i in range(128)))
  98. url = url.join(yarl.URL(redirectUrl))
  99. if response.status in (301, 302, 303) and method == 'POST':
  100. method = 'GET'
  101. data = None
  102. attempt = 0
  103. elif action == ACTION_RETRIES_EXCEEDED:
  104. self.logger.error(f'Request for {url} failed {attempt} times')
  105. return retResponse, tuple(history)
  106. elif action == ACTION_RETRY:
  107. # Nothing to do, just go to the next cycle
  108. pass
  109. finally:
  110. if response:
  111. await response.release()
  112. async def process(self):
  113. raise NotImplementedError
  114. @classmethod
  115. def generate(cls):
  116. yield from () # Generate no items by default
  117. @classmethod
  118. def _gen(cls):
  119. for x in cls.generate():
  120. yield (cls.itemType, x, STATUS_TODO)
  121. def add_subitem(self, itemClassOrType, itemValue):
  122. if issubclass(itemClassOrType, Item):
  123. item = (itemClassOrType.itemType, itemValue)
  124. else:
  125. item = (itemClassOrType, itemValue)
  126. if item not in self.childItems:
  127. self.childItems.append(item)
  128. async def flush_subitems(self):
  129. await self.qwarcObj.flush_subitems(self)
  130. def clear_subitems(self):
  131. self.childItems = []
  132. @classmethod
  133. def get_subclasses(cls):
  134. for subclass in cls.__subclasses__():
  135. yield subclass
  136. yield from subclass.get_subclasses()
  137. class QWARC:
  138. def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
  139. '''
  140. itemClasses: iterable of Item
  141. warcBasePath: str, base name of the WARC files
  142. dbPath: str, path to the sqlite3 database file
  143. command: list, the command line used to invoke qwarc
  144. specFile: str, path to the spec file
  145. specDependencies: qwarc.utils.SpecDependencies
  146. logFilename: str, name of the log file written by this process
  147. concurrency: int, number of concurrently processed items
  148. memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
  149. minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
  150. warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
  151. '''
  152. self._itemClasses = itemClasses
  153. self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
  154. self._warcBasePath = warcBasePath
  155. self._dbPath = dbPath
  156. self._command = command
  157. self._specFile = specFile
  158. self._specDependencies = specDependencies
  159. self._logFilename = logFilename
  160. self._concurrency = concurrency
  161. self._memoryLimit = memoryLimit
  162. self._minFreeDisk = minFreeDisk
  163. self._warcSizeLimit = warcSizeLimit
  164. self._warcDedupe = warcDedupe
  165. self._reset_working_vars()
  166. def _reset_working_vars(self):
  167. # Working variables
  168. self._db = None
  169. self._tasks = set()
  170. self._sleepTasks = set()
  171. self._sessions = [] # aiohttp.ClientSession instances
  172. self._freeSessions = collections.deque() # ClientSession instances that are currently free
  173. self._warc = None
  174. async def obtain_exclusive_db_lock(self):
  175. c = self._db.cursor()
  176. while True:
  177. try:
  178. c.execute('BEGIN EXCLUSIVE')
  179. break
  180. except sqlite3.OperationalError as e:
  181. if str(e) != 'database is locked':
  182. raise
  183. await asyncio.sleep(1)
  184. return c
  185. def _make_item(self, itemType, itemValue, session, headers):
  186. try:
  187. itemClass = self._itemTypeMap[itemType]
  188. except KeyError:
  189. raise RuntimeError(f'No such item type: {itemType!r}')
  190. return itemClass(self, itemValue, session, headers, self._warc)
  191. async def _wait_for_free_task(self):
  192. if not self._tasks:
  193. return
  194. done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  195. for future in done:
  196. newStatus = STATUS_DONE
  197. if future.taskType == 'sleep':
  198. self._sleepTasks.remove(future)
  199. elif future.taskType == 'process':
  200. item = future.item
  201. # TODO Replace all of this with `if future.cancelled():`
  202. try:
  203. await future #TODO: Is this actually necessary? asyncio.wait only returns 'done' futures...
  204. except concurrent.futures.CancelledError as e:
  205. # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
  206. if future.taskType == 'process':
  207. logging.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
  208. newStatus = STATUS_ERROR
  209. except Exception as e:
  210. if future.taskType == 'process':
  211. logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
  212. newStatus = STATUS_ERROR
  213. else:
  214. if future.taskType == 'process':
  215. logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
  216. if future.taskType != 'process':
  217. continue
  218. cursor = await self.obtain_exclusive_db_lock()
  219. try:
  220. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
  221. cursor.execute('COMMIT')
  222. except:
  223. cursor.execute('ROLLBACK')
  224. raise
  225. await self._insert_subitems(item)
  226. self._freeSessions.append(item.session)
  227. self._tasks = pending
  228. async def _insert_subitems(self, item):
  229. cursor = await self.obtain_exclusive_db_lock()
  230. try:
  231. if item.childItems:
  232. it = iter(item.childItems)
  233. while True:
  234. values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
  235. if not values:
  236. break
  237. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  238. cursor.execute('COMMIT')
  239. except:
  240. cursor.execute('ROLLBACK')
  241. raise
  242. async def run(self, loop):
  243. headers = [('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0')] #TODO: Move elsewhere
  244. for i in range(self._concurrency):
  245. session = _aiohttp.ClientSession(
  246. connector = qwarc.aiohttp.TCPConnector(loop = loop),
  247. request_class = qwarc.aiohttp.ClientRequest,
  248. response_class = qwarc.aiohttp.ClientResponse,
  249. skip_auto_headers = ['Accept-Encoding'],
  250. loop = loop
  251. )
  252. self._sessions.append(session)
  253. self._freeSessions.append(session)
  254. self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
  255. self._db = sqlite3.connect(self._dbPath, timeout = 1)
  256. self._db.isolation_level = None # Transactions are handled manually below.
  257. self._db.execute('PRAGMA synchronous = OFF')
  258. try:
  259. while True:
  260. while len(self._tasks) >= self._concurrency:
  261. await self._wait_for_free_task()
  262. if os.path.exists('STOP'):
  263. logging.info('Gracefully shutting down due to STOP file')
  264. break
  265. if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
  266. logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
  267. break
  268. if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
  269. logging.info('Disk space is low, sleeping')
  270. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  271. sleepTask.taskType = 'sleep'
  272. self._tasks.add(sleepTask)
  273. self._sleepTasks.add(sleepTask)
  274. continue
  275. cursor = await self.obtain_exclusive_db_lock()
  276. try:
  277. cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
  278. result = cursor.fetchone()
  279. if not result:
  280. if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
  281. # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
  282. # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
  283. # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
  284. #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
  285. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  286. sleepTask.taskType = 'sleep'
  287. self._tasks.add(sleepTask)
  288. self._sleepTasks.add(sleepTask)
  289. cursor.execute('COMMIT')
  290. continue
  291. else:
  292. # Really nothing to do anymore
  293. #TODO: Another process may be running create_db, in which case we'd still want to wait...
  294. # create_db could insert a dummy item which is marked as done when the DB is ready
  295. cursor.execute('COMMIT')
  296. break
  297. id, itemType, itemValue, status = result
  298. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
  299. cursor.execute('COMMIT')
  300. except:
  301. cursor.execute('ROLLBACK')
  302. raise
  303. session = self._freeSessions.popleft()
  304. item = self._make_item(itemType, itemValue, session, headers)
  305. task = asyncio.ensure_future(item.process())
  306. #TODO: Is there a better way to add custom information to a task/coroutine object?
  307. task.taskType = 'process'
  308. task.id = id
  309. task.itemType = itemType
  310. task.itemValue = itemValue
  311. task.item = item
  312. self._tasks.add(task)
  313. for sleepTask in self._sleepTasks:
  314. sleepTask.cancel()
  315. while len(self._tasks):
  316. await self._wait_for_free_task()
  317. logging.info('Done')
  318. except (Exception, KeyboardInterrupt) as e:
  319. # Kill all tasks
  320. for task in self._tasks:
  321. task.cancel()
  322. await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
  323. raise
  324. finally:
  325. for session in self._sessions:
  326. session.close()
  327. self._warc.close()
  328. self._db.close()
  329. self._reset_working_vars()
  330. async def flush_subitems(self, item):
  331. await self._insert_subitems(item)
  332. item.clear_subitems()
  333. def create_db(self):
  334. db = sqlite3.connect(self._dbPath, timeout = 1)
  335. db.execute('PRAGMA synchronous = OFF')
  336. with db:
  337. db.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
  338. db.execute('CREATE INDEX items_status_idx ON items (status)')
  339. db.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')
  340. it = itertools.chain(*(i._gen() for i in self._itemClasses))
  341. while True:
  342. values = tuple(itertools.islice(it, 100000))
  343. if not values:
  344. break
  345. with db:
  346. db.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)