A framework for quick web archiving
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

411 linhas
15 KiB

  1. import qwarc.aiohttp
  2. from qwarc.const import *
  3. import qwarc.utils
  4. import qwarc.warc
  5. import aiohttp as _aiohttp
  6. if _aiohttp.__version__ != '2.3.10':
  7. raise ImportError('aiohttp must be version 2.3.10')
  8. import asyncio
  9. import collections
  10. import concurrent.futures
  11. import io
  12. import itertools
  13. import logging
  14. import os
  15. import random
  16. import sqlite3
  17. import yarl
  18. class Item:
  19. itemType = None
  20. def __init__(self, qwarcObj, itemValue, session, headers, warc):
  21. self.qwarcObj = qwarcObj
  22. self.itemValue = itemValue
  23. self.session = session
  24. self.headers = headers
  25. self.warc = warc
  26. self.stats = {'tx': 0, 'rx': 0, 'requests': 0}
  27. self.logger = logging.LoggerAdapter(logging.getLogger(), {'itemType': self.itemType, 'itemValue': self.itemValue})
  28. self.childItems = []
  29. def _merge_headers(self, headers):
  30. d = {} # Preserves order from Python 3.7 (guaranteed) or CPython 3.6 (implementation detail)
  31. keys = {} # casefolded key -> d key
  32. for key, value in self.headers:
  33. d[key] = value
  34. keys[key.casefold()] = key
  35. for key, value in headers:
  36. keyc = key.casefold()
  37. if value is None:
  38. if keyc in keys:
  39. del d[keys[keyc]]
  40. del keys[keyc]
  41. else:
  42. if keyc in keys and key != keys[keyc]:
  43. del d[keys[keyc]]
  44. d[key] = value
  45. keys[keyc] = key
  46. out = []
  47. for key, value in d.items():
  48. if isinstance(value, tuple):
  49. for value_ in value:
  50. out.append((key, value_))
  51. else:
  52. out.append((key, value))
  53. return out
  54. async def fetch(self, url, responseHandler = qwarc.utils.handle_response_default, method = 'GET', data = None, headers = [], verify_ssl = True, timeout = 60):
  55. '''
  56. HTTP GET or POST a URL
  57. url: str or yarl.URL
  58. responseHandler: a callable that determines how the response is handled. See qwarc.utils.handle_response_default for details.
  59. method: str, must be 'GET' or 'POST'
  60. data: dict or list/tuple of lists/tuples of length two or bytes or file-like or None, the data to be sent in the request body
  61. headers: list of 2-tuples, additional or overriding headers for this request only
  62. To remove one of the default headers, pass a value of None.
  63. If a header appears multiple times, only the last one is used. To send a header multiple times, pass a tuple of values.
  64. verify_ssl: bool, whether the SSL/TLS certificate should be validated
  65. timeout: int or float, how long the fetch may take at most in total (sending request until finishing reading the response)
  66. Returns response (a ClientResponse object or None) and history (a tuple of (response, exception) tuples).
  67. response can be None and history can be an empty tuple, depending on the circumstances (e.g. timeouts).
  68. '''
  69. #TODO: Rewrite using 'async with self.session.get'
  70. url = yarl.URL(url) # Explicitly convert for normalisation, percent-encoding, etc.
  71. assert method in ('GET', 'POST'), 'method must be GET or POST'
  72. headers = self._merge_headers(headers)
  73. history = []
  74. attempt = 0
  75. #TODO redirectLevel
  76. while True:
  77. attempt += 1
  78. response = None
  79. exc = None
  80. action = ACTION_RETRY
  81. writeToWarc = True
  82. try:
  83. try:
  84. with _aiohttp.Timeout(timeout):
  85. self.logger.info(f'Fetching {url}')
  86. response = await self.session.request(method, url, data = data, headers = headers, allow_redirects = False, verify_ssl = verify_ssl)
  87. try:
  88. while True:
  89. ret = await response.content.read(1048576)
  90. if not ret:
  91. break
  92. except:
  93. # No calling the handleResponse callback here because this is really bad. The not-so-bad exceptions (e.g. an error during reading the response) will be caught further down.
  94. response.close()
  95. raise
  96. else:
  97. response.rawRequestData.seek(0, io.SEEK_END)
  98. tx = response.rawRequestData.tell()
  99. response.rawResponseData.seek(0, io.SEEK_END)
  100. rx = response.rawResponseData.tell()
  101. self.logger.info(f'Fetched {url}: {response.status} (tx {tx}, rx {rx})')
  102. self.stats['tx'] += tx
  103. self.stats['rx'] += rx
  104. self.stats['requests'] += 1
  105. except (asyncio.TimeoutError, _aiohttp.ClientError) as e:
  106. self.logger.warning(f'Request for {url} failed: {e!r}')
  107. action, writeToWarc = await responseHandler(url, attempt, response, e)
  108. exc = e # Pass the exception outward for the history
  109. else:
  110. action, writeToWarc = await responseHandler(url, attempt, response, None)
  111. if response and exc is None and writeToWarc:
  112. self.warc.write_client_response(response)
  113. history.append((response, exc))
  114. retResponse = response if exc is None else None
  115. if action in (ACTION_SUCCESS, ACTION_IGNORE):
  116. return retResponse, tuple(history)
  117. elif action == ACTION_FOLLOW_OR_SUCCESS:
  118. redirectUrl = response.headers.get('Location') or response.headers.get('URI')
  119. if not redirectUrl:
  120. return retResponse, tuple(history)
  121. url = url.join(yarl.URL(redirectUrl))
  122. if response.status in (301, 302, 303) and method == 'POST':
  123. method = 'GET'
  124. data = None
  125. attempt = 0
  126. elif action == ACTION_RETRIES_EXCEEDED:
  127. self.logger.error(f'Request for {url} failed {attempt} times')
  128. return retResponse, tuple(history)
  129. elif action == ACTION_RETRY:
  130. # Nothing to do, just go to the next cycle
  131. pass
  132. finally:
  133. if response:
  134. await response.release()
  135. async def process(self):
  136. raise NotImplementedError
  137. @classmethod
  138. def generate(cls):
  139. yield from () # Generate no items by default
  140. @classmethod
  141. def _gen(cls):
  142. for x in cls.generate():
  143. yield (cls.itemType, x, STATUS_TODO)
  144. def add_subitem(self, itemClassOrType, itemValue):
  145. if issubclass(itemClassOrType, Item):
  146. item = (itemClassOrType.itemType, itemValue)
  147. else:
  148. item = (itemClassOrType, itemValue)
  149. if item not in self.childItems:
  150. self.childItems.append(item)
  151. async def flush_subitems(self):
  152. await self.qwarcObj.flush_subitems(self)
  153. def clear_subitems(self):
  154. self.childItems = []
  155. @classmethod
  156. def get_subclasses(cls):
  157. for subclass in cls.__subclasses__():
  158. yield subclass
  159. yield from subclass.get_subclasses()
  160. class QWARC:
  161. def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False):
  162. '''
  163. itemClasses: iterable of Item
  164. warcBasePath: str, base name of the WARC files
  165. dbPath: str, path to the sqlite3 database file
  166. command: list, the command line used to invoke qwarc
  167. specFile: str, path to the spec file
  168. specDependencies: qwarc.utils.SpecDependencies
  169. logFilename: str, name of the log file written by this process
  170. concurrency: int, number of concurrently processed items
  171. memoryLimit: int, gracefully stop when the process uses more than memoryLimit bytes of RSS; 0 disables the memory check
  172. minFreeDisk: int, pause when there's less than minFreeDisk space on the partition where WARCs are written; 0 disables the disk space check
  173. warcSizeLimit: int, size of each WARC file; 0 if the WARCs should not be split
  174. '''
  175. self._itemClasses = itemClasses
  176. self._itemTypeMap = {cls.itemType: cls for cls in itemClasses}
  177. self._warcBasePath = warcBasePath
  178. self._dbPath = dbPath
  179. self._command = command
  180. self._specFile = specFile
  181. self._specDependencies = specDependencies
  182. self._logFilename = logFilename
  183. self._concurrency = concurrency
  184. self._memoryLimit = memoryLimit
  185. self._minFreeDisk = minFreeDisk
  186. self._warcSizeLimit = warcSizeLimit
  187. self._warcDedupe = warcDedupe
  188. self._reset_working_vars()
  189. def _reset_working_vars(self):
  190. # Working variables
  191. self._db = None
  192. self._tasks = set()
  193. self._sleepTasks = set()
  194. self._sessions = [] # aiohttp.ClientSession instances
  195. self._freeSessions = collections.deque() # ClientSession instances that are currently free
  196. self._warc = None
  197. async def obtain_exclusive_db_lock(self):
  198. c = self._db.cursor()
  199. while True:
  200. try:
  201. c.execute('BEGIN EXCLUSIVE')
  202. break
  203. except sqlite3.OperationalError as e:
  204. if str(e) != 'database is locked':
  205. raise
  206. await asyncio.sleep(1)
  207. return c
  208. def _make_item(self, itemType, itemValue, session, headers):
  209. try:
  210. itemClass = self._itemTypeMap[itemType]
  211. except KeyError:
  212. raise RuntimeError(f'No such item type: {itemType!r}')
  213. return itemClass(self, itemValue, session, headers, self._warc)
  214. async def _wait_for_free_task(self):
  215. if not self._tasks:
  216. return
  217. done, pending = await asyncio.wait(self._tasks, return_when = concurrent.futures.FIRST_COMPLETED)
  218. for future in done:
  219. newStatus = STATUS_DONE
  220. if future.taskType == 'sleep':
  221. self._sleepTasks.remove(future)
  222. elif future.taskType == 'process':
  223. item = future.item
  224. try:
  225. future.result()
  226. except asyncio.CancelledError as e:
  227. # Got cancelled, nothing we can do about it, but let's log a warning if it's a process task
  228. if future.taskType == 'process':
  229. logging.error(f'Task for {future.itemType}:{future.itemValue} cancelled: {future!r}')
  230. newStatus = STATUS_ERROR
  231. except Exception as e:
  232. if future.taskType == 'process':
  233. logging.error(f'{future.itemType}:{future.itemValue} failed: {e!r} ({item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx)', exc_info = e)
  234. newStatus = STATUS_ERROR
  235. else:
  236. if future.taskType == 'process':
  237. logging.info(f'{future.itemType}:{future.itemValue} done: {item.stats["requests"]} requests, {item.stats["tx"]} tx, {item.stats["rx"]} rx')
  238. if future.taskType != 'process':
  239. continue
  240. cursor = await self.obtain_exclusive_db_lock()
  241. try:
  242. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (newStatus, future.id))
  243. cursor.execute('COMMIT')
  244. except:
  245. cursor.execute('ROLLBACK')
  246. raise
  247. await self._insert_subitems(item)
  248. self._freeSessions.append(item.session)
  249. self._tasks = pending
  250. async def _insert_subitems(self, item):
  251. cursor = await self.obtain_exclusive_db_lock()
  252. try:
  253. if item.childItems:
  254. it = iter(item.childItems)
  255. while True:
  256. values = [(t, v, STATUS_TODO) for t, v in itertools.islice(it, 100000)]
  257. if not values:
  258. break
  259. cursor.executemany('INSERT OR IGNORE INTO items (type, value, status) VALUES (?, ?, ?)', values)
  260. cursor.execute('COMMIT')
  261. except:
  262. cursor.execute('ROLLBACK')
  263. raise
  264. async def run(self, loop):
  265. for i in range(self._concurrency):
  266. session = _aiohttp.ClientSession(
  267. connector = qwarc.aiohttp.TCPConnector(loop = loop),
  268. request_class = qwarc.aiohttp.ClientRequest,
  269. response_class = qwarc.aiohttp.ClientResponse,
  270. loop = loop
  271. )
  272. self._sessions.append(session)
  273. self._freeSessions.append(session)
  274. self._warc = qwarc.warc.WARC(self._warcBasePath, self._warcSizeLimit, self._warcDedupe, self._command, self._specFile, self._specDependencies, self._logFilename)
  275. self._db = sqlite3.connect(self._dbPath, timeout = 1)
  276. self._db.isolation_level = None # Transactions are handled manually below.
  277. self._db.execute('PRAGMA synchronous = OFF')
  278. try:
  279. while True:
  280. while len(self._tasks) >= self._concurrency:
  281. await self._wait_for_free_task()
  282. if os.path.exists('STOP'):
  283. logging.info('Gracefully shutting down due to STOP file')
  284. break
  285. if self._memoryLimit and qwarc.utils.uses_too_much_memory(self._memoryLimit):
  286. logging.info(f'Gracefully shutting down due to memory usage (current = {qwarc.utils.get_rss()} > limit = {self._memoryLimit})')
  287. break
  288. if self._minFreeDisk and qwarc.utils.too_little_disk_space(self._minFreeDisk):
  289. logging.info('Disk space is low, sleeping')
  290. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  291. sleepTask.taskType = 'sleep'
  292. self._tasks.add(sleepTask)
  293. self._sleepTasks.add(sleepTask)
  294. continue
  295. cursor = await self.obtain_exclusive_db_lock()
  296. try:
  297. cursor.execute('SELECT id, type, value, status FROM items WHERE status = ? LIMIT 1', (STATUS_TODO,))
  298. result = cursor.fetchone()
  299. if not result:
  300. if cursor.execute('SELECT id, status FROM items WHERE status != ? LIMIT 1', (STATUS_DONE,)).fetchone():
  301. # There is currently no item to do, but there are still some in progress, so more TODOs may appear in the future.
  302. # It would be nice if we could just await wait_for_free_task() here, but that doesn't work because those TODOs might be in another process.
  303. # So instead, we insert a dummy task which just sleeps a bit. Average sleep time is equal to concurrency, i.e. one check per second.
  304. #TODO: The average sleep time is too large if there are only few sleep tasks; scale with len(sleepTasks)/self._concurrency?
  305. sleepTask = asyncio.ensure_future(asyncio.sleep(random.uniform(self._concurrency / 2, self._concurrency * 1.5)))
  306. sleepTask.taskType = 'sleep'
  307. self._tasks.add(sleepTask)
  308. self._sleepTasks.add(sleepTask)
  309. cursor.execute('COMMIT')
  310. continue
  311. else:
  312. # Really nothing to do anymore
  313. #TODO: Another process may be running create_db, in which case we'd still want to wait...
  314. # create_db could insert a dummy item which is marked as done when the DB is ready
  315. cursor.execute('COMMIT')
  316. break
  317. id, itemType, itemValue, status = result
  318. cursor.execute('UPDATE items SET status = ? WHERE id = ?', (STATUS_INPROGRESS, id))
  319. cursor.execute('COMMIT')
  320. except:
  321. cursor.execute('ROLLBACK')
  322. raise
  323. session = self._freeSessions.popleft()
  324. item = self._make_item(itemType, itemValue, session, DEFAULT_HEADERS)
  325. task = asyncio.ensure_future(item.process())
  326. #TODO: Is there a better way to add custom information to a task/coroutine object?
  327. task.taskType = 'process'
  328. task.id = id
  329. task.itemType = itemType
  330. task.itemValue = itemValue
  331. task.item = item
  332. self._tasks.add(task)
  333. for sleepTask in self._sleepTasks:
  334. sleepTask.cancel()
  335. while len(self._tasks):
  336. await self._wait_for_free_task()
  337. logging.info('Done')
  338. except (Exception, KeyboardInterrupt) as e:
  339. # Kill all tasks
  340. for task in self._tasks:
  341. task.cancel()
  342. await asyncio.wait(self._tasks, return_when = concurrent.futures.ALL_COMPLETED)
  343. raise
  344. finally:
  345. for session in self._sessions:
  346. session.close()
  347. self._warc.close()
  348. self._db.close()
  349. self._reset_working_vars()
  350. async def flush_subitems(self, item):
  351. await self._insert_subitems(item)
  352. item.clear_subitems()
  353. def create_db(self):
  354. db = sqlite3.connect(self._dbPath, timeout = 1)
  355. db.execute('PRAGMA synchronous = OFF')
  356. with db:
  357. db.execute('CREATE TABLE items (id INTEGER PRIMARY KEY, type TEXT, value TEXT, status INTEGER)')
  358. db.execute('CREATE INDEX items_status_idx ON items (status)')
  359. db.execute('CREATE UNIQUE INDEX items_type_value_idx ON items (type, value)')
  360. it = itertools.chain(*(i._gen() for i in self._itemClasses))
  361. while True:
  362. values = tuple(itertools.islice(it, 100000))
  363. if not values:
  364. break
  365. with db:
  366. db.executemany('INSERT INTO items (type, value, status) VALUES (?, ?, ?)', values)