A framework for quick web archiving
Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

439 řádků
16 KiB

  import asyncio
  import collections
  import concurrent.futures
  import io
  import itertools
  import logging
  import os
  import random
  import sqlite3

  # Check the aiohttp version before importing any qwarc submodule: qwarc.aiohttp supplies the connector/request/response
  # classes used with aiohttp (see QWARC.run), so with an incompatible aiohttp it could fail with an obscure error before
  # this explicit check had a chance to run.
  # NOTE(review): assumes qwarc.aiohttp depends on aiohttp 2.3.10 internals — confirm.
  # Imported under a private name because `import qwarc.aiohttp` below binds `aiohttp` in this package's namespace.
  import aiohttp as _aiohttp
  if _aiohttp.__version__ != '2.3.10':
      raise ImportError('aiohttp must be version 2.3.10')
  import yarl

  import qwarc.aiohttp
  from qwarc.const import * # Also re-exports the ACTION_*/STATUS_* constants and DEFAULT_HEADERS from the package
  import qwarc.utils
  import qwarc.warc
  class Item:
      '''
      Base class for a unit of work ("item") processed by QWARC.

      Subclasses set itemType, implement the process() coroutine and may override generate() to provide initial item values.
      Child items discovered while processing are queued with add_subitem() and later inserted into the database by QWARC.
      '''

      itemType = None
      # Used by fetch() when no responseHandler is passed; see qwarc.utils.handle_response_default
      defaultResponseHandler = staticmethod(qwarc.utils.handle_response_default)

      def __init__(self, qwarcObj, itemValue, session, headers, warc):
          '''
          qwarcObj: the QWARC instance processing this item (QWARC._make_item passes itself)
          itemValue: the value of this item
          session: the ClientSession used for all of this item's requests
          headers: list of 2-tuples, the default headers of every request; see fetch for per-request overrides
          warc: the WARC writer that fetch() writes responses to
          '''
          self.qwarcObj = qwarcObj
          self.itemValue = itemValue
          self.session = session
          self.headers = headers
          self.warc = warc
          if not hasattr(self, '_baseUrl'): # To allow subclasses to set the baseUrl before calling super().__init__
              self._baseUrl = None
          self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
          self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
          self.childItems = [] # (itemType, itemValue) tuples queued via add_subitem

      @property
      def baseUrl(self):
          '''yarl.URL or None; incomplete URLs passed to fetch() are resolved against it.'''
          return self._baseUrl

      @baseUrl.setter
      def baseUrl(self, baseUrl):
          # Accepts None, a yarl.URL, or anything yarl.URL() can parse (e.g. a str)
          if baseUrl is None:
              self._baseUrl = None
          elif isinstance(baseUrl, yarl.URL):
              self._baseUrl = baseUrl
          else:
              self._baseUrl = yarl.URL(baseUrl)

      def _merge_headers(self, headers, extraHeaders = ()):
          '''
          Merge self.headers, extraHeaders and headers (later sources win) into a list of (name, value) 2-tuples.

          Header names are compared case-insensitively. A value of None removes the header. When a header is overridden with
          a differently-cased name, the new spelling wins and moves to the end; an override with identical spelling keeps its position.
          A tuple value is expanded into one output entry per element.
          '''
          d = {} # Preserves order from Python 3.7 (guaranteed) or CPython 3.6 (implementation detail)
          keys = {} # casefolded key -> d key
          for key, value in itertools.chain(self.headers, extraHeaders, headers):
              keyc = key.casefold()
              if value is None:
                  if keyc in keys:
                      del d[keys[keyc]]
                      del keys[keyc]
              else:
                  if keyc in keys and key != keys[keyc]:
                      del d[keys[keyc]]
                  d[key] = value
                  keys[keyc] = key
          out = []
          for key, value in d.items():
              if isinstance(value, tuple):
                  for value_ in value:
                      out.append((key, value_))
              else:
                  out.append((key, value))
          return out

      async def fetch(self, url, responseHandler = None, method = 'GET', data = None, headers = (), verify_ssl = True, timeout = 60, fromResponse = None):
          '''
          HTTP GET or POST a URL
          url: str or yarl.URL; if this is not a complete URL, it is evaluated relative to self.baseUrl
          responseHandler: None or a callable that determines how the response is handled; if None, self.defaultResponseHandler is used. See qwarc.utils.handle_response_default for details.
          method: str, must be 'GET' or 'POST'
          data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
          headers: list of 2-tuples, additional or overriding headers for this request only
          	To remove one of the default headers, pass a value of None.
          	If a header appears multiple times, only the last one is used. To send a header multiple times, pass a tuple of values.
          verify_ssl: bool, whether the SSL/TLS certificate should be validated
          timeout: int or float, how long the fetch may take at most in total (sending request until finishing reading the response)
          fromResponse: ClientResponse or None; if provided, use fromResponse.url for the url completion (instead of self.baseUrl) and add it as a Referer header
          Returns response (a ClientResponse object or a qwarc.utils.DummyClientResponse object)
          	The returned object carries a `qhistory` attribute: a tuple of (response, exception) pairs, one per attempt/redirect.
          Raises ValueError if url is incomplete and there is neither fromResponse nor a baseUrl.
          '''
          #TODO: Rewrite using 'async with self.session.get'
          url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
          if not url.scheme or not url.host:
              if fromResponse is not None:
                  url = fromResponse.url.join(url)
              elif not self.baseUrl:
                  raise ValueError('Incomplete URL and no baseUrl to join it with')
              else:
                  url = self.baseUrl.join(url)
          if responseHandler is None:
              responseHandler = self.defaultResponseHandler
          assert method in ('GET', 'POST'), 'method must be GET or POST'
          headers = self._merge_headers(headers, extraHeaders = [('Referer', str(fromResponse.url))] if fromResponse is not None else [])
          history = []
          attempt = 0 # Attempt counter for the current URL; reset when following a redirect
          redirectLevel = 0
          while True:
              attempt += 1
              response = None
              exc = None
              action = ACTION_RETRY
              writeToWarc = True
              try:
                  try:
                      with _aiohttp.Timeout(timeout):
                          self.logger.info(f'Fetching {url}')
                          # Redirects are handled manually below so that every hop is recorded in the WARC and the history
                          response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
                          try:
                              # Read the whole body in 1 MiB chunks; the data itself is discarded here
                              while True:
                                  ret = await response.content.read(1048576)
                                  if not ret:
                                      break
                          except:
                              # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
                              response.close()
                              raise
                          else:
                              # Byte counts are measured from the raw request/response buffers kept by the response object
                              response.rawRequestData.seek(0, io.SEEK_END)
                              tx = response.rawRequestData.tell()
                              response.rawResponseData.seek(0, io.SEEK_END)
                              rx = response.rawResponseData.tell()
                              self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
                              self.stats['tx'] += tx
                              self.stats['rx'] += rx
                              self.stats['requests'] += 1
                  except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
                      self.logger.warning(f'Request for {url} failed: {e!r}')
                      action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = e, redirectLevel = redirectLevel, item = self)
                      exc = e # Pass the exception outward for the history
                  else:
                      action, writeToWarc = await responseHandler(url = url, attempt = attempt, response = response, exc = None, redirectLevel = redirectLevel, item = self)
                  if response and exc is None and writeToWarc:
                      self.warc.write_client_response(response)
                  history.append((response, exc))
                  retResponse = response if exc is None else qwarc.utils.DummyClientResponse()
                  if action in (ACTION_SUCCESS, ACTION_IGNORE):
                      retResponse.qhistory = tuple(history)
                      return retResponse
                  elif action == ACTION_FOLLOW_OR_SUCCESS:
                      redirectUrl = response.headers.get('Location') or response.headers.get('URI')
                      if not redirectUrl:
                          retResponse.qhistory = tuple(history)
                          return retResponse
                      url = url.join(yarl.URL(redirectUrl))
                      # Like browsers, turn a POST into a GET (without body) on 301/302/303
                      if response.status in (301, 302, 303) and method == 'POST':
                          method = 'GET'
                          data = None
                      attempt = 0
                      redirectLevel += 1
                  elif action == ACTION_RETRIES_EXCEEDED:
                      self.logger.error(f'Request for {url} failed {attempt} times')
                      retResponse.qhistory = tuple(history)
                      return retResponse
                  elif action == ACTION_RETRY:
                      # Nothing to do, just go to the next cycle
                      pass
              finally:
                  if response:
                      await response.release()

      async def process(self):
          '''Process this item; must be implemented by subclasses.'''
          raise NotImplementedError

      @classmethod
      def generate(cls):
          '''Yield the item values of this type to insert into a fresh database; none by default.'''
          yield from () # Generate no items by default

      def add_subitem(self, itemClassOrType, itemValue):
          '''
          Queue a child item for insertion into the database.

          itemClassOrType: an Item subclass (its itemType is used) or an item type value directly (e.g. a str)
          itemValue: the child item's value
          Duplicates within this item's queue are skipped. The queue is inserted when the item's task finishes or flush_subitems() is called.
          '''
          # issubclass() raises TypeError for non-class arguments, so check for a class first; otherwise passing a type string would fail.
          if isinstance(itemClassOrType, type) and issubclass(itemClassOrType, Item):
              item = (itemClassOrType.itemType, itemValue)
          else:
              item = (itemClassOrType, itemValue)
          if item not in self.childItems:
              self.childItems.append(item)

      async def flush_subitems(self):
          '''Insert the queued child items into the database now and clear the queue.'''
          await self.qwarcObj.flush_subitems(self)

      def clear_subitems(self):
          '''Drop all queued child items without inserting them.'''
          self.childItems = []

      @classmethod
      def get_subclasses(cls):
          '''Yield all direct and indirect subclasses of cls, depth-first.'''
          for subclass in cls.__subclasses__():
              yield subclass
              yield from subclass.get_subclasses()
  class QWARC:
      '''
      Runs an archival job: hands out items from an sqlite3 database to up to `concurrency` concurrently running Item tasks,
      records their outcome in the database, and provides the WARC writer the items write their responses to.
      '''

      def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
          '''
          itemClasses: iterable of Item subclasses
          warcBasePath: str, base name of the WARC files
          dbPath: str, path to the sqlite3 database file
          command: list, the command line used to invoke qwarc
          specFile: str, path to the spec file
          specDependencies: qwarc.utils.SpecDependencies
          logFilename: str, name of the log file written by this process
          concurrency: int, number of concurrently processed items
          memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
          minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
          warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
          warcDedupe: bool, passed through to qwarc.warc.WARC (presumably enables record deduplication — confirm there)
          '''
          # Materialise the iterable: it is consumed here and again in _insert_generated_items, so a one-shot iterator would otherwise yield nothing the second time.
          self._itemClasses = tuple(itemClasses)
          self._itemTypeMap = {cls.itemType: cls for cls in self._itemClasses}
          self._warcBasePath = warcBasePath
          self._dbPath = dbPath
          self._command = command
          self._specFile = specFile
          self._specDependencies = specDependencies
          self._logFilename = logFilename
          self._concurrency = concurrency
          self._memoryLimit = memoryLimit
          self._minFreeDisk = minFreeDisk
          self._warcSizeLimit = warcSizeLimit
          self._warcDedupe = warcDedupe
          self._reset_working_vars()

      def _reset_working_vars(self):
          '''Reset the per-run state; called on construction and at the end of run().'''
          self._db = None
          self._tasks = set() # All running tasks (process and sleep)
          self._sleepTasks = set() # The subset of self._tasks that are sleep tasks
          self._sessions = [] # aiohttp.ClientSession instances
          self._freeSessions = collections.deque() # ClientSession instances that are currently free
          self._warc = None

      async def obtain_exclusive_db_lock(self):
          '''
          Begin an exclusive transaction on self._db and return a cursor for it.

          While the database is locked by another connection, retries once per second without blocking the event loop.
          The caller must end the transaction with COMMIT or ROLLBACK.
          '''
          c = self._db.cursor()
          while True:
              try:
                  c.execute('BEGIN EXCLUSIVE')
                  break
              except sqlite3.OperationalError as e:
                  if str(e) != 'database is locked':
                      raise
              await asyncio.sleep(1)
          return c

      def _make_item(self, itemType, itemValue, session, headers):
          '''Instantiate the Item subclass registered for itemType; raises RuntimeError for unknown types.'''
          try:
              itemClass = self._itemTypeMap[itemType]
          except KeyError:
              raise RuntimeError(f'No such item type: {itemType!r}') from None
          return itemClass(self, itemValue, session, headers, self._warc)

      async def _wait_for_free_task(self):
          '''
          Wait until at least one task in self._tasks has finished, then handle every finished task.

          Finished sleep tasks are simply dropped. For process tasks, the outcome is logged, the item's database status is
          set to STATUS_DONE or STATUS_ERROR, its queued subitems are inserted and its session is returned to the free pool.
          '''
          if not self._tasks:
              return
          done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
          for future in done:
              newStatus = STATUS_DONE
              if future.taskType == 'sleep':
                  self._sleepTasks.remove(future)
              elif future.taskType == 'process':
                  item = future.item
              try:
                  future.result()
              except asyncio.CancelledError as e:
                  # Got cancelled; nothing we can do about it, but log it as an error if it's a process task
                  if future.taskType == 'process':
                      logging.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
                      newStatus = STATUS_ERROR
              except Exception as e:
                  if future.taskType == 'process':
                      logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
                      newStatus = STATUS_ERROR
              else:
                  if future.taskType == 'process':
                      logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
              if future.taskType != 'process':
                  continue
              cursor = await self.obtain_exclusive_db_lock()
              try:
                  cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
                  cursor.execute('COMMIT')
              except BaseException:
                  cursor.execute('ROLLBACK')
                  raise
              await self._insert_subitems(item)
              self._freeSessions.append(item.session)
          self._tasks = pending

      async def _insert_subitems(self, item):
          '''Insert item.childItems into the database as TODO items (in batches of 100000), ignoring duplicates.'''
          if not item.childItems:
              # Nothing to insert; don't contend for the exclusive database lock.
              return
          cursor = await self.obtain_exclusive_db_lock()
          try:
              it = iter(item.childItems)
              while True:
                  values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
                  if not values:
                      break
                  cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
              cursor.execute('COMMIT')
          except BaseException:
              cursor.execute('ROLLBACK')
              raise

      def _add_sleep_task(self):
          '''Schedule a dummy task sleeping between concurrency/2 and concurrency*1.5 seconds; it occupies a task slot so the main loop backs off.'''
          sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
          sleepTask.taskType = 'sleep'
          self._tasks.add(sleepTask)
          self._sleepTasks.add(sleepTask)

      def _open_resources(self, loop):
          '''Create one HTTP session per concurrency slot, the WARC writer and the database connection.'''
          for _ in range(self._concurrency):
              session = _aiohttp.ClientSession(
                  connector = qwarc.aiohttp.TCPConnector(loop = loop),
                  request_class = qwarc.aiohttp.ClientRequest,
                  response_class = qwarc.aiohttp.ClientResponse,
                  loop = loop
              )
              self._sessions.append(session)
              self._freeSessions.append(session)
          self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
          self._db = sqlite3.connect(self._dbPath, timeout = 1)
          self._db.isolation_level = None # Transactions are handled manually.
          self._db.execute('PRAGMA synchronous = OFF')

      async def _ensure_db_initialised(self):
          '''Create the items table and insert the generated items if the database does not have the table yet.'''
          cursor = await self.obtain_exclusive_db_lock()
          try:
              # Single quotes: double-quoted strings are identifiers in SQL and are rejected by SQLite builds without DQS support.
              cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'items'")
              if not cursor.fetchone():
                  self._create_db(cursor)
                  self._insert_generated_items(cursor)
              cursor.execute('COMMIT')
          except BaseException:
              cursor.execute('ROLLBACK')
              raise

      def _start_item_task(self, itemId, itemType, itemValue):
          '''Create the Item for a claimed database row on a free session and schedule its process() coroutine.'''
          session = self._freeSessions.popleft()
          item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)
          task = asyncio.ensure_future(item.process())
          #TODO: Is there a better way to add custom information to a task/coroutine object?
          task.taskType = 'process'
          task.id = itemId
          task.itemType = itemType
          task.itemValue = itemValue
          task.item = item
          self._tasks.add(task)

      async def _process_items(self):
          '''Claim TODO items and start tasks for them until none are left or a stop condition is hit, then wait for all running tasks.'''
          while True:
              while len(self._tasks) >= self._concurrency:
                  await self._wait_for_free_task()
              if os.path.exists('STOP'):
                  logging.info('Gracefully shutting down due to STOP file')
                  break
              if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
                  logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
                  break
              if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
                  logging.info('Disk space is low, sleeping')
                  self._add_sleep_task()
                  continue
              cursor = await self.obtain_exclusive_db_lock()
              try:
                  cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
                  result = cursor.fetchone()
                  if not result:
                      # Only in-progress items can still produce new TODOs; errored items must not keep the loop alive forever.
                      if cursor.execute('SELECT id, status FROM items WHERE status = ? LIMIT 1', (STATUS_INPROGRESS,)).fetchone():
                          # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
                          # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
                          # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
                          #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
                          self._add_sleep_task()
                          cursor.execute('COMMIT')
                          continue
                      else:
                          # Really nothing to do anymore
                          cursor.execute('COMMIT')
                          break
                  itemId, itemType, itemValue, _ = result
                  cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, itemId))
                  cursor.execute('COMMIT')
              except BaseException:
                  cursor.execute('ROLLBACK')
                  raise
              self._start_item_task(itemId, itemType, itemValue)
          for sleepTask in self._sleepTasks:
              sleepTask.cancel()
          while self._tasks:
              await self._wait_for_free_task()
          logging.info('Done')

      async def _cancel_all_tasks(self):
          '''Cancel every running task and wait for all of them to finish.'''
          for task in self._tasks:
              task.cancel()
          if self._tasks: # asyncio.wait raises ValueError on an empty set, which would mask the exception being handled
              await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)

      def _close_resources(self):
          '''Close whatever _open_resources managed to create, then reset the working state.'''
          for session in self._sessions:
              session.close()
          if self._warc is not None:
              self._warc.close()
          if self._db is not None:
              self._db.close()
          self._reset_working_vars()

      async def run(self, loop):
          '''
          Process items until none are left or a graceful stop condition (STOP file, memory limit) is hit.

          loop: the asyncio event loop the aiohttp sessions and connectors are created on
          On an exception or KeyboardInterrupt, all running tasks are cancelled and awaited before re-raising.
          Sessions, the WARC writer and the database connection are always closed, also when setup itself fails.
          '''
          try:
              self._open_resources(loop)
              await self._ensure_db_initialised()
              await self._process_items()
          except (Exception, KeyboardInterrupt):
              await self._cancel_all_tasks()
              raise
          finally:
              self._close_resources()

      async def flush_subitems(self, item):
          '''Insert the item's queued subitems into the database now and clear its queue.'''
          await self._insert_subitems(item)
          item.clear_subitems()

      def _create_db(self, cursor):
          '''Create the items table with an index on status and a unique index on (type, value).'''
          cursor.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
          cursor.execute('CREATE INDEX items_status_idx ON items (status)')
          cursor.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')

      def _insert_generated_items(self, cursor):
          '''Insert the values from every item class's generate() as TODO items, in batches of 100000; duplicates are ignored via the unique index.'''
          it = ((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
          while True:
              values = tuple(itertools.islice(it, 100000))
              if not values:
                  break
              cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  387. it = itertools.chain((cls.itemType, value, STATUS_TODO) for cls in self._itemClasses for value in cls.generate())
  388. while True:
  389. values = tuple(itertools.islice(it, 100000))
  390. if not values:
  391. break
  392. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)