A framework for quick web archiving

import qwarc.aiohttp
from qwarc.const import *
import qwarc.utils
import qwarc.warc
import aiohttp as _aiohttp
if _aiohttp.__version__ != '2.3.10':
	raise ImportError('aiohttp must be version 2.3.10')
import asyncio
import collections
import concurrent.futures
import contextlib
import io
import itertools
import logging
import os
import random
import sqlite3
import yarl

class Item:
	itemType = None
	defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)

	def __init__(self, qwarcObj, itemValue, session, headers, warc):
		self.qwarcObj = qwarcObj
		self.itemValue = itemValue
		self.session = session
		self.headers = headers
		self.warc = warc
		if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
			self._baseUrl = None
		self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
		self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
		self.childItems = []

	@property
	def baseUrl(self):
		return self._baseUrl

	@baseUrl.setter
	def baseUrl(self, baseUrl):
		if baseUrl is None:
			self._baseUrl = None
		elif isinstance(baseUrl, yarl.URL):
			self._baseUrl = baseUrl
		else:
			self._baseUrl = yarl.URL(baseUrl)

	def _merge_headers(self, headers, extraHeaders = []):
		d = {} # Preserves order from Python 3.7 (guaranteed) or CPython 3.6 (implementation detail)
		keys = {} # casefolded key -> d key
		for key, value in itertools.chain(self.headers, extraHeaders, headers):
			keyc = key.casefold()
			if value is None:
				if keyc in keys:
					del d[keys[keyc]]
					del keys[keyc]
			else:
				if keyc in keys and key != keys[keyc]:
					del d[keys[keyc]]
				d[key] = value
				keys[keyc] = key
		out = []
		for key, value in d.items():
			if isinstance(value, tuple):
				for value_ in value:
					out.append((key, value_))
			else:
				out.append((key, value))
		return out
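
	# Illustrative example of the merge semantics (header values here are made up):
	# with self.headers = [('User-Agent', 'qwarc')],
	#   self._merge_headers([('user-agent', None), ('Accept', ('text/html', '*/*'))])
	# returns [('Accept', 'text/html'), ('Accept', '*/*')]: the None value removed
	# the default User-Agent (matched case-insensitively), and the tuple value
	# causes the Accept header to be emitted twice.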

	async def fetch(self, url, responseHandler = None, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60, fromResponse = None):
		'''
		HTTP GET or POST a URL

		url: str or yarl.URL; if this is not a complete URL, it is evaluated relative to self.baseUrl
		responseHandler: None or a callable that determines how the response is handled; if None, self.defaultResponseHandler is used. See qwarc.utils.handle_response_default for details.
		method: str, must be 'GET' or 'POST'
		data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
		headers: list of 2-tuples, additional or overriding headers for this request only
			To remove one of the default headers, pass a value of None.
			If a header appears multiple times, only the last one is used. To send a header multiple times, pass a tuple of values.
		verify_ssl: bool, whether the SSL/TLS certificate should be validated
		timeout: int or float, how long the fetch may take at most in total (sending the request until finishing reading the response)
		fromResponse: ClientResponse or None; if provided, use fromResponse.url for the URL completion (instead of self.baseUrl) and add it as a Referer header

		Returns response (a ClientResponse object or a qwarc.utils.DummyClientResponse object)
		'''

		#TODO: Rewrite using 'async with self.session.get'

		url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
		if not url.scheme or not url.host:
			if fromResponse is not None:
				url = fromResponse.url.join(url)
			elif not self.baseUrl:
				raise ValueError('Incomplete URL and no baseUrl to join it with')
			else:
				url = self.baseUrl.join(url)
		originalUrl = url
		if responseHandler is None:
			responseHandler = self.defaultResponseHandler
		assert method in ('GET', 'POST'), 'method must be GET or POST'
		headers = self._merge_headers(headers, extraHeaders = [('Referer', str(fromResponse.url))] if fromResponse is not None else [])
		history = []
		attempt = 0
		redirectLevel = 0
		while True:
			attempt += 1
			response = None
			exc = None
			action = ACTION_RETRY
			writeToWarc = True
			try:
				try:
					with _aiohttp.Timeout(timeout):
						self.logger.info(f'Fetching {url}')
						response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
						try:
							while True:
								ret = await response.content.read(1048576)
								if not ret:
									break
						except:
							# Not calling the responseHandler callback here because this is really bad. The not-so-bad exceptions (e.g. an error while reading the response) are caught further down.
							response.close()
							raise
						else:
							response.rawRequestData.seek(0, io.SEEK_END)
							tx = response.rawRequestData.tell()
							response.rawResponseData.seek(0, io.SEEK_END)
							rx = response.rawResponseData.tell()
							self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
							self.stats['tx'] += tx
							self.stats['rx'] += rx
							self.stats['requests'] += 1
				except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
					self.logger.warning(f'Request for {url} failed: {e!r}')
					action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = e, redirectLevel = redirectLevel, item = self)
					exc = e # Pass the exception outward for the history
				else:
					action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
				if response and exc is None and writeToWarc:
					self.warc.write_client_response(response)
				history.append((response, exc))
				retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
				if action in (ACTION_SUCCESS, ACTION_IGNORE):
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_FOLLOW_OR_SUCCESS:
					redirectUrl = response.headers.get('Location') or response.headers.get('URI')
					if not redirectUrl:
						retResponse.qhistory = tuple(history)
						return retResponse
					url = url.join(yarl.URL(redirectUrl))
					if response.status in (301, 302, 303) and method == 'POST':
						method = 'GET'
						data = None
					attempt = 0
					redirectLevel += 1
				elif action == ACTION_RETRIES_EXCEEDED:
					self.logger.error(f'Request for {url} failed {attempt} times')
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_TOO_MANY_REDIRECTS:
					self.logger.error(f'Request for {url} (from {originalUrl}) exceeded redirect limit')
					retResponse.qhistory = tuple(history)
					return retResponse
				elif action == ACTION_RETRY:
					# Nothing to do, just go to the next cycle
					pass
			finally:
				if response:
					await response.release()
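
	# Illustrative calls (hedged; the URLs and payloads here are made up):
	#   response = await self.fetch('https://example.org/')
	#   response = await self.fetch('page2', fromResponse = response) # Joined against response.url, Referer added
	#   response = await self.fetch('https://example.org/search', method = 'POST', data = {'q': 'qwarc'})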

	async def process(self):
		raise NotImplementedError

	@classmethod
	def generate(cls):
		yield from () # Generate no items by default

	def add_subitem(self, itemClassOrType, itemValue):
		# itemClassOrType may be an Item subclass or a plain itemType string; the isinstance check keeps issubclass from raising a TypeError on strings.
		if isinstance(itemClassOrType, type) and issubclass(itemClassOrType, Item):
			item = (itemClassOrType.itemType, itemValue)
		else:
			item = (itemClassOrType, itemValue)
		if item not in self.childItems:
			self.childItems.append(item)

	async def flush_subitems(self):
		await self.qwarcObj.flush_subitems(self)

	def clear_subitems(self):
		self.childItems = []

	@classmethod
	def get_subclasses(cls):
		for subclass in cls.__subclasses__():
			yield subclass
			yield from subclass.get_subclasses()

	def __repr__(self):
		return f'<{type(self).__module__}.{type(self).__name__} object {id(self)}, itemType = {self.itemType!r}, itemValue = {self.itemValue!r}>'
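
# A minimal spec-file sketch (illustrative only; the item type, URL, and paging
# scheme below are assumptions, not part of this module):
#
# class ExamplePage(Item):
# 	itemType = 'example-page'
#
# 	@classmethod
# 	def generate(cls):
# 		yield '1' # Seed the queue with page 1; further pages become subitems.
#
# 	async def process(self):
# 		response = await self.fetch(f'https://example.org/page/{self.itemValue}')
# 		if response.status == 200:
# 			self.add_subitem(ExamplePage, str(int(self.itemValue) + 1))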

# QWARC._closed values
_STATE_RUNNING = 1
_STATE_CLOSING = 2
_STATE_CLOSED = 3


class QWARC:
	def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
		'''
		itemClasses: iterable of Item subclasses
		warcBasePath: str, base name of the WARC files
		dbPath: str, path to the sqlite3 database file
		command: list, the command line used to invoke qwarc
		specFile: str, path to the spec file
		specDependencies: qwarc.utils.SpecDependencies
		logFilename: str, name of the log file written by this process
		concurrency: int, number of concurrently processed items
		memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
		minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
		warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
		warcDedupe: bool, whether to deduplicate WARC records (passed through to qwarc.warc.WARC)
		'''

		self._itemClasses = itemClasses
		self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
		self._warcBasePath = warcBasePath
		self._dbPath = dbPath
		self._command = command
		self._specFile = specFile
		self._specDependencies = specDependencies
		self._logFilename = logFilename
		self._concurrency = concurrency
		self._memoryLimit = memoryLimit
		self._minFreeDisk = minFreeDisk
		self._warcSizeLimit = warcSizeLimit
		self._warcDedupe = warcDedupe
		self._reset_working_vars()

	def _reset_working_vars(self):
		# Working variables
		self._db = None
		self._tasks = set()
		self._sleepTasks = set()
		self._sessions = [] # aiohttp.ClientSession instances
		self._freeSessions = collections.deque() # ClientSession instances that are currently free
		self._warc = None
		self._closed = _STATE_CLOSED

	@contextlib.asynccontextmanager
	async def exclusive_db_lock(self):
		c = self._db.cursor()
		while True:
			try:
				c.execute('BEGIN EXCLUSIVE')
				break
			except sqlite3.OperationalError as e:
				if str(e) != 'database is locked':
					raise
				await asyncio.sleep(1)
		try:
			yield c
			c.execute('COMMIT')
		except:
			c.execute('ROLLBACK')
			raise
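
	# Usage sketch (illustrative; someId is a placeholder): the `async with`
	# block commits on success and rolls back on any exception, retrying the
	# BEGIN while another qwarc process holds the lock.
	#   async with self.exclusive_db_lock() as cursor:
	#   	cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_DONE, someId))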

	def _make_item(self, itemType, itemValue, session, headers):
		try:
			itemClass = self._itemTypeMap[itemType]
		except KeyError:
			raise RuntimeError(f'No such item type: {itemType!r}')
		return itemClass(self, itemValue, session, headers, self._warc)

	async def _wait_for_free_task(self):
		if not self._tasks:
			return
		done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
		for future in done:
			newStatus = STATUS_DONE
			if future.taskType == 'sleep':
				self._sleepTasks.remove(future)
			elif future.taskType == 'process':
				item = future.item
			try:
				future.result()
			except asyncio.CancelledError as e:
				# Got cancelled, nothing we can do about it, but let's log an error if it's a process task
				if future.taskType == 'process':
					logging.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
					newStatus = STATUS_ERROR
			except Exception as e:
				if future.taskType == 'process':
					logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
					newStatus = STATUS_ERROR
			else:
				if future.taskType == 'process':
					logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
			if future.taskType != 'process':
				continue
			async with self.exclusive_db_lock() as cursor:
				cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
			await self._insert_subitems(item)
			self._freeSessions.append(item.session)
		self._tasks = pending

	async def _insert_subitems(self, item):
		if item.childItems:
			async with self.exclusive_db_lock() as cursor:
				it = iter(item.childItems)
				while True:
					values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
					if not values:
						break
					cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)

	async def run(self, loop):
		self._closed = _STATE_RUNNING

		for i in range(self._concurrency):
			session = _aiohttp.ClientSession(
				connector = qwarc.aiohttp.TCPConnector(loop = loop),
				request_class = qwarc.aiohttp.ClientRequest,
				response_class = qwarc.aiohttp.ClientResponse,
				loop = loop
			)
			self._sessions.append(session)
			self._freeSessions.append(session)

		self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)

		self._db = sqlite3.connect(self._dbPath, timeout = 1)
		self._db.isolation_level = None # Transactions are handled manually below.
		# Setting the synchronous PRAGMA is not possible if the DB is currently locked by another process, but it also can't be done inside a transaction... So just retry until it succeeds.
		while True:
			try:
				self._db.execute('PRAGMA synchronous = OFF')
			except sqlite3.OperationalError as e:
				if str(e) != 'database is locked':
					raise
				await asyncio.sleep(1)
			else:
				break
		async with self.exclusive_db_lock() as cursor:
			cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
			result = cursor.fetchone()
			if not result:
				self._create_db(cursor)
				self._insert_generated_items(cursor)

		try:
			while True:
				while len(self._tasks) >= self._concurrency:
					await self._wait_for_free_task()

				if self._closed == _STATE_CLOSING:
					break

				if os.path.exists('STOP'):
					logging.info('Gracefully shutting down due to STOP file')
					break
				if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
					logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
					break

				if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
					logging.info('Disk space is low, sleeping')
					sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
					sleepTask.taskType = 'sleep'
					self._tasks.add(sleepTask)
					self._sleepTasks.add(sleepTask)
					continue

				async with self.exclusive_db_lock() as cursor:
					cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
					result = cursor.fetchone()
					if not result:
						if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
							# There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
							# It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
							# So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
							#TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
							sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
							sleepTask.taskType = 'sleep'
							self._tasks.add(sleepTask)
							self._sleepTasks.add(sleepTask)
							continue
						else:
							# Really nothing to do anymore
							break
					id_, itemType, itemValue, status = result
					cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id_))

				session = self._freeSessions.popleft()
				item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)
				task = asyncio.ensure_future(item.process())
				#TODO: Is there a better way to add custom information to a task/coroutine object?
				task.taskType = 'process'
				task.id = id_
				task.itemType = itemType
				task.itemValue = itemValue
				task.item = item
				self._tasks.add(task)

			for sleepTask in self._sleepTasks:
				sleepTask.cancel()
			while len(self._tasks):
				await self._wait_for_free_task()
			logging.info('Done')
		except (Exception, KeyboardInterrupt) as e:
			# Kill all tasks
			for task in self._tasks:
				task.cancel()
			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
			self._tasks.clear()
			raise
		finally:
			await self.close()

	async def flush_subitems(self, item):
		await self._insert_subitems(item)
		item.clear_subitems()

	def _create_db(self, cursor):
		cursor.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
		cursor.execute('CREATE INDEX items_status_idx ON items (status)')
		cursor.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')

	def _insert_generated_items(self, cursor):
		it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
		while True:
			values = tuple(itertools.islice(it, 100000))
			if not values:
				break
			cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)

	async def close(self):
		if self._closed in (_STATE_CLOSING, _STATE_CLOSED):
			return
		self._closed = _STATE_CLOSING

		if self._tasks:
			logging.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup')
			for task in self._tasks:
				task.cancel()
			await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
			self._tasks.clear()

		self._db.close()
		for session in self._sessions:
			session.close()
		self._warc.close()
		self._reset_working_vars() # Also resets self._closed
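
# Illustrative driver sketch (hedged; normally the qwarc CLI constructs the
# QWARC object from a spec file, and every argument below is a made-up example):
#
# import asyncio
# import qwarc.utils
#
# loop = asyncio.get_event_loop()
# q = QWARC(
# 	itemClasses = set(Item.get_subclasses()),
# 	warcBasePath = 'example',
# 	dbPath = 'example.db',
# 	command = ['qwarc', 'example.py'],
# 	specFile = 'example.py',
# 	specDependencies = qwarc.utils.SpecDependencies(),
# 	logFilename = 'example.log',
# 	concurrency = 10,
# )
# loop.run_until_complete(q.run(loop))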