A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

529 lines
20 KiB

  1. import qwarc.aiohttp
  2. from qwarc.const import *
  3. import qwarc.utils
  4. import qwarc.warc
  5. import aiohttp as _aiohttp
  6. if _aiohttp.__version__ != '2.3.10':
  7. raise ImportError('aiohttp must be version 2.3.10')
  8. import asyncio
  9. import collections
  10. import concurrent.futures
  11. import contextlib
  12. import io
  13. import itertools
  14. import logging
  15. import os
  16. import random
  17. import sqlite3
  18. import yarl
  19. logger = logging.getLogger(__name__)
  20. class Item:
  21. itemType = None
  22. defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)
  23. def __init__(self, qwarcObj, itemValue, session, headers, warc):
  24. self.qwarcObj = qwarcObj
  25. self.itemValue = itemValue
  26. self.session = session
  27. self.headers = headers
  28. self.warc = warc
  29. if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
  30. self._baseUrl = None
  31. self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
  32. self.logger = logging.LoggerAdapter(logging.getLogger(f'{type(self).__module__}.{type(self).__name__}'), {'itemType': self.itemType, 'itemValue': self.itemValue})
  33. self.childItems = []
  34. @property
  35. def baseUrl(self):
  36. return self._baseUrl
  37. @baseUrl.setter
  38. def baseUrl(self, baseUrl):
  39. self.logger.debug(f'Setting baseUrl to {baseUrl!r}')
  40. if baseUrl is None:
  41. self._baseUrl = None
  42. elif isinstance(baseUrl, yarl.URL):
  43. self._baseUrl = baseUrl
  44. else:
  45. self._baseUrl = yarl.URL(baseUrl)
  46. def _merge_headers(self, headers, extraHeaders = []):
  47. d = {} # Preserves order from Python 3.7 (guaranteed) or CPython 3.6 (implementation detail)
  48. keys = {} # casefolded key -> d key
  49. for key, value in itertools.chain(self.headers, extraHeaders, headers):
  50. keyc = key.casefold()
  51. if value is None:
  52. if keyc in keys:
  53. del d[keys[keyc]]
  54. del keys[keyc]
  55. else:
  56. if keyc in keys and key != keys[keyc]:
  57. del d[keys[keyc]]
  58. d[key] = value
  59. keys[keyc] = key
  60. out = []
  61. for key, value in d.items():
  62. if isinstance(value, tuple):
  63. for value_ in value:
  64. out.append((key, value_))
  65. else:
  66. out.append((key, value))
  67. return out
  68. async def fetch(self, url, *, responseHandler = None, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60, fromResponse = None):
  69. '''
  70. HTTP GET or POST a URL
  71. url: str or yarl.URL; if this is not a complete URL, it is evaluated relative to self.baseUrl
  72. responseHandler: None or a callable that determines how the response is handled; if None, self.defaultResponseHandler is used. See qwarc.utils.handle_response_default for details.
  73. method: str, must be 'GET' or 'POST'
  74. data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
  75. headers: list of 2-tuples, additional or overriding headers for this request only
  76. To remove one of the default headers, pass a value of None.
  77. If a header appears multiple times, only the last one is used. To send a header multiple times, pass a tuple of values.
  78. verify_ssl: bool, whether the SSL/TLS certificate should be validated
  79. timeout: int or float, how long the fetch may take at most in total (sending request until finishing reading the response)
  80. fromResponse: ClientResponse or None; if provided, use fromResponse.url for the url completion (instead of self.baseUrl) and add it as a Referer header
  81. Returns response (a ClientResponse object or a qwarc.utils.DummyClientResponse object)
  82. '''
  83. #TODO: Rewrite using 'async with self.session.get'
  84. if self.logger.isEnabledFor(logging.DEBUG):
  85. l = locals()
  86. modified = []
  87. for kwarg, kwdef in self.fetch.__kwdefaults__.items():
  88. if l[kwarg] != kwdef:
  89. modified.append(kwarg)
  90. if modified:
  91. self.logger.debug(f'fetch({url!r}) with ...')
  92. for kwarg in modified:
  93. self.logger.debug(f'... {kwarg} = {repr(l[kwarg]).replace(chr(10), " ")}') # aiohttp.ClientResponse's repr can contain newlines
  94. else:
  95. self.logger.debug(f'fetch({url!r})')
  96. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
  97. if not url.scheme or not url.host:
  98. if fromResponse is not None:
  99. self.logger.debug(f'Joining incomplete URL {url} with fromResponse URL {fromResponse.url}')
  100. url = fromResponse.url.join(url)
  101. elif not self.baseUrl:
  102. raise ValueError('Incomplete URL and no baseUrl to join it with')
  103. else:
  104. self.logger.debug(f'Joining incomplete URL {url} with base URL {baseUrl}')
  105. url = self.baseUrl.join(url)
  106. originalUrl = url
  107. if responseHandler is None:
  108. responseHandler = self.defaultResponseHandler
  109. assert method in ('GET', 'POST'), 'method must be GET or POST'
  110. headers = self._merge_headers(headers, extraHeaders = [('Referer', str(fromResponse.url))] if fromResponse is not None else [])
  111. history = []
  112. attempt = 0
  113. redirectLevel = 0
  114. while True:
  115. attempt += 1
  116. response = None
  117. exc = None
  118. action = ACTION_RETRY
  119. writeToWarc = True
  120. self.logger.debug(f'Trying to fetch {url}, attempt {attempt}, redirect level {redirectLevel}')
  121. try:
  122. try:
  123. with _aiohttp.Timeout(timeout):
  124. self.logger.info(f'Fetching {url}')
  125. response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
  126. try:
  127. while True:
  128. ret = await response.content.read(1048576)
  129. if not ret:
  130. break
  131. except:
  132. # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
  133. response.close()
  134. raise
  135. else:
  136. response.rawRequestData.seek(0, io.SEEK_END)
  137. tx = response.rawRequestData.tell()
  138. response.rawResponseData.seek(0, io.SEEK_END)
  139. rx = response.rawResponseData.tell()
  140. self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
  141. self.stats['tx'] += tx
  142. self.stats['rx'] += rx
  143. self.stats['requests'] += 1
  144. except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
  145. self.logger.warning(f'Request for {url} failed: {e!r}')
  146. action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = e, redirectLevel = redirectLevel, item = self)
  147. exc = e # Pass the exception outward for the history
  148. else:
  149. action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
  150. self.logger.debug(f'Response handler result: {action}, {writeToWarc}')
  151. if response and exc is None and writeToWarc:
  152. self.warc.write_client_response(response)
  153. history.append((response, exc))
  154. retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
  155. if action in (ACTION_SUCCESS, ACTION_IGNORE):
  156. self.logger.debug(f'Fetch loop done')
  157. retResponse.qhistory = tuple(history)
  158. return retResponse
  159. elif action == ACTION_FOLLOW_OR_SUCCESS:
  160. redirectUrl = response.headers.get('Location') or response.headers.get('URI')
  161. if not redirectUrl:
  162. self.logger.debug('Got redirect but no redirect target, fetch loop done')
  163. retResponse.qhistory = tuple(history)
  164. return retResponse
  165. url = url.join(yarl.URL(redirectUrl))
  166. self.logger.debug(f'Next URL: {url}')
  167. if response.status in (301, 302, 303) and method == 'POST':
  168. self.logger.debug(f'{response.status} on POST, dropping to GET and clearing data')
  169. method = 'GET'
  170. data = None
  171. attempt = 0
  172. redirectLevel += 1
  173. elif action == ACTION_RETRIES_EXCEEDED:
  174. self.logger.error(f'Request for {url} failed {attempt} times')
  175. retResponse.qhistory = tuple(history)
  176. return retResponse
  177. elif action == ACTION_TOO_MANY_REDIRECTS:
  178. self.logger.error(f'Request for {url} (from {originalUrl}) exceeded redirect limit')
  179. retResponse.qhistory = tuple(history)
  180. return retResponse
  181. elif action == ACTION_RETRY:
  182. # Nothing to do, just go to the next cycle
  183. pass
  184. finally:
  185. if response:
  186. self.logger.debug('Releasing response')
  187. await response.release()
  188. async def process(self):
  189. raise NotImplementedError
  190. @classmethod
  191. def generate(cls):
  192. yield from () # Generate no items by default
  193. def add_subitem(self, itemClassOrType, itemValue):
  194. if issubclass(itemClassOrType, Item):
  195. item = (itemClassOrType.itemType, itemValue)
  196. else:
  197. item = (itemClassOrType, itemValue)
  198. self.logger.debug(f'Adding subitem {itemClassOrType!r} {itemValue!r}')
  199. if item not in self.childItems:
  200. self.childItems.append(item)
  201. async def flush_subitems(self):
  202. self.logger.debug('Flushing subitems')
  203. await self.qwarcObj.flush_subitems(self)
  204. def clear_subitems(self):
  205. self.logger.debug('Clearing subitems')
  206. self.childItems = []
  207. @classmethod
  208. def get_subclasses(cls):
  209. for subclass in cls.__subclasses__():
  210. yield subclass
  211. yield from subclass.get_subclasses()
  212. def __repr__(self):
  213. return f'<{type(self).__module__}.{type(self).__name__} object {id(self)}, itemType = {self.itemType!r}, itemValue = {self.itemValue!r}>'
  214. # QWARC._closed values
  215. _STATE_RUNNING = 1
  216. _STATE_CLOSING = 2
  217. _STATE_CLOSED = 3
  218. class QWARC:
  219. def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
  220. '''
  221. itemClasses: iterable of Item
  222. warcBasePath: str, base name of the WARC files
  223. dbPath: str, path to the sqlite3 database file
  224. command: list, the command line used to invoke qwarc
  225. specFile: str, path to the spec file
  226. specDependencies: qwarc.utils.SpecDependencies
  227. logFilename: str, name of the log file written by this process
  228. concurrency: int, number of concurrently processed items
  229. memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
  230. minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
  231. warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
  232. '''
  233. self._itemClasses = itemClasses
  234. self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
  235. self._warcBasePath = warcBasePath
  236. self._dbPath = dbPath
  237. self._command = command
  238. self._specFile = specFile
  239. self._specDependencies = specDependencies
  240. self._logFilename = logFilename
  241. self._concurrency = concurrency
  242. self._memoryLimit = memoryLimit
  243. self._minFreeDisk = minFreeDisk
  244. self._warcSizeLimit = warcSizeLimit
  245. self._warcDedupe = warcDedupe
  246. self._reset_working_vars()
  247. def _reset_working_vars(self):
  248. # Working variables
  249. self._db = None
  250. self._tasks = set()
  251. self._sleepTasks = set()
  252. self._sessions = [] # aiohttp.ClientSession instances
  253. self._freeSessions = collections.deque() # ClientSession instances that are currently free
  254. self._warc = None
  255. self._closed = _STATE_CLOSED
  256. @contextlib.asynccontextmanager
  257. async def exclusive_db_lock(self):
  258. logger.debug('Attempting to obtain DB lock')
  259. c = self._db.cursor()
  260. while True:
  261. try:
  262. c.execute('BEGIN EXCLUSIVE')
  263. break
  264. except sqlite3.OperationalError as e:
  265. if str(e) != 'database is locked':
  266. raise
  267. await asyncio.sleep(1)
  268. logger.debug('Obtained DB lock')
  269. try:
  270. yield c
  271. logger.debug('Committing DB transaction')
  272. c.execute('COMMIT')
  273. except:
  274. logger.debug('Rolling back DB transaction')
  275. c.execute('ROLLBACK')
  276. raise
  277. def _make_item(self, itemType, itemValue, session, headers):
  278. logger.debug(f'Making item {itemType!r} {itemValue!r}')
  279. try:
  280. itemClass = self._itemTypeMap[itemType]
  281. except KeyError:
  282. raise RuntimeError(f'No such item type: {itemType!r}')
  283. return itemClass(self, itemValue, session, headers, self._warc)
  284. async def _wait_for_free_task(self):
  285. logger.debug('Waiting for task(s) to finish')
  286. if not self._tasks:
  287. logger.debug('No tasks to wait for')
  288. return
  289. done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  290. logger.debug(f'{len(done)} task(s) finished, {len(pending)} remaining')
  291. for future in done:
  292. logger.debug(f'Finished {future.taskType} task: {future!r}')
  293. newStatus = STATUS_DONE
  294. if future.taskType == 'sleep':
  295. self._sleepTasks.remove(future)
  296. elif future.taskType == 'process':
  297. item = future.item
  298. logger.debug(f'Finished item: {item!r}')
  299. try:
  300. future.result()
  301. except asyncio.CancelledError as e:
  302. # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
  303. if future.taskType == 'process':
  304. logger.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
  305. newStatus = STATUS_ERROR
  306. except Exception as e:
  307. if future.taskType == 'process':
  308. logger.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
  309. newStatus = STATUS_ERROR
  310. else:
  311. if future.taskType == 'process':
  312. logger.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
  313. if future.taskType != 'process':
  314. continue
  315. async with self.exclusive_db_lock() as cursor:
  316. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
  317. await self._insert_subitems(item)
  318. self._freeSessions.append(item.session)
  319. self._tasks = pending
  320. async def _insert_subitems(self, item):
  321. logger.debug(f'Inserting {len(item.childItems)} subitem(s) from {item!r}')
  322. if item.childItems:
  323. async with self.exclusive_db_lock() as cursor:
  324. it = iter(item.childItems)
  325. while True:
  326. values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
  327. if not values:
  328. break
  329. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  330. async def run(self, loop):
  331. self._closed = _STATE_RUNNING
  332. logger.debug(f'Creating {self._concurrency} sessions')
  333. for i in range(self._concurrency):
  334. session = _aiohttp.ClientSession(
  335. connector = qwarc.aiohttp.TCPConnector(loop = loop),
  336. request_class = qwarc.aiohttp.ClientRequest,
  337. response_class = qwarc.aiohttp.ClientResponse,
  338. loop = loop
  339. )
  340. self._sessions.append(session)
  341. self._freeSessions.append(session)
  342. logger.debug('Constructing WARC object')
  343. self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
  344. logger.debug('Connecting to the database')
  345. self._db = sqlite3.connect(self._dbPath, timeout = 1)
  346. self._db.set_trace_callback(logging.getLogger(f'{__name__}[sqlite3]').debug)
  347. self._db.isolation_level = None # Transactions are handled manually below.
  348. # Setting the synchronous PRAGMA is not possible if the DB is currently locked by another process but also can't be done inside a transaction... So just retry until it succeeds.
  349. logger.debug('Setting synchronous PRAGMA')
  350. while True:
  351. try:
  352. self._db.execute('PRAGMA synchronous = OFF')
  353. except sqlite3.OperationalError as e:
  354. if str(e) != 'database is locked':
  355. raise
  356. await asyncio.sleep(1)
  357. else:
  358. break
  359. logger.debug('Set synchronous PRAGMA')
  360. async with self.exclusive_db_lock() as cursor:
  361. cursor.execute('SELECT name FROM sqlite_master WHERE type = "table" AND name = "items"')
  362. result = cursor.fetchone()
  363. if not result:
  364. self._create_db(cursor)
  365. self._insert_generated_items(cursor)
  366. try:
  367. while True:
  368. while len(self._tasks) >= self._concurrency:
  369. await self._wait_for_free_task()
  370. if self._closed == _STATE_CLOSING:
  371. logger.debug('Currently closing')
  372. break
  373. if os.path.exists('STOP'):
  374. logger.info('Gracefully shutting down due to STOP file')
  375. break
  376. if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
  377. logger.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
  378. break
  379. if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
  380. logger.info('Disk space is low, sleeping')
  381. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  382. sleepTask.taskType = 'sleep'
  383. self._tasks.add(sleepTask)
  384. self._sleepTasks.add(sleepTask)
  385. continue
  386. logger.debug('Trying to get an item from the DB')
  387. async with self.exclusive_db_lock() as cursor:
  388. cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
  389. result = cursor.fetchone()
  390. if not result:
  391. logger.debug('Found no TODO item')
  392. if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
  393. # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
  394. # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
  395. # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
  396. logger.debug('There are incomplete items that may spawn further TODOs, starting sleep task')
  397. #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
  398. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  399. sleepTask.taskType = 'sleep'
  400. self._tasks.add(sleepTask)
  401. self._sleepTasks.add(sleepTask)
  402. continue
  403. else:
  404. # Really nothing to do anymore
  405. logger.debug('All items are done')
  406. break
  407. id_, itemType, itemValue, status = result
  408. logger.debug(f'Got item {itemType}:{itemValue} ({id_})')
  409. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id_))
  410. session = self._freeSessions.popleft()
  411. item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)
  412. task = asyncio.ensure_future(item.process())
  413. #TODO: Is there a better way to add custom information to a task/coroutine object?
  414. task.taskType = 'process'
  415. task.id = id_
  416. task.itemType = itemType
  417. task.itemValue = itemValue
  418. task.item = item
  419. self._tasks.add(task)
  420. logger.debug(f'Cancelling {len(self._sleepTasks)} sleep task(s)')
  421. for sleepTask in self._sleepTasks:
  422. sleepTask.cancel()
  423. logger.debug('Waiting for tasks to finish')
  424. while len(self._tasks):
  425. await self._wait_for_free_task()
  426. logger.info('Done')
  427. except (Exception, KeyboardInterrupt) as e:
  428. logger.debug(f'Got an exception, cancelling {len(self._tasks)} task(s)')
  429. for task in self._tasks:
  430. task.cancel()
  431. logger.debug('Waiting for tasks to get cancelled')
  432. await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
  433. self._tasks.clear()
  434. raise
  435. finally:
  436. await self.close()
  437. async def flush_subitems(self, item):
  438. await self._insert_subitems(item)
  439. item.clear_subitems()
  440. def _create_db(self, cursor):
  441. logger.debug('Creating database')
  442. cursor.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
  443. cursor.execute('CREATE INDEX items_status_idx ON items (status)')
  444. cursor.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')
  445. def _insert_generated_items(self, cursor):
  446. logger.debug('Inserting generated items')
  447. it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
  448. while True:
  449. values = tuple(itertools.islice(it, 100000))
  450. if not values:
  451. break
  452. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  453. async def close(self):
  454. if self._closed in (_STATE_CLOSING, _STATE_CLOSED):
  455. return
  456. self._closed = _STATE_CLOSING
  457. logger.debug('Closing')
  458. if self._tasks:
  459. logger.warning(f'Cancelling {len(self._tasks)} task(s) remaining on cleanup')
  460. for task in self._tasks:
  461. task.cancel()
  462. logger.debug('Waiting for tasks to get cancelled')
  463. await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
  464. self._tasks.clear()
  465. logger.debug('Closing DB')
  466. self._db.close()
  467. logger.debug('Closing sessions')
  468. for session in self._sessions:
  469. session.close()
  470. logger.debug('Closing WARC')
  471. self._warc.close()
  472. self._reset_working_vars() # Also resets self._closed