A framework for quick web archiving

import aiohttp
import aiohttp.client_proto
import aiohttp.client_reqrep
import aiohttp.connector
import aiohttp.http_parser
import functools
import io
import itertools
import qwarc.utils
import time
import tempfile


# aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative...
# The ResponseHandler handles received data; the writes are done directly on the underlying transport.
# So ResponseHandler is replaced with a class which keeps all received data in a spooled temporary file, and the transport's write method is replaced with one which sends back all written data to the ResponseHandler.
# Because the ResponseHandler instance disappears when the connection is closed (ClientResponse.{_response_eof,close,release}), ClientResponse copies the references to the data objects in the ResponseHandler.
# aiohttp also does connection pooling/reuse, so ClientRequest resets the raw data when the request is sent. (This would not work with pipelining, but aiohttp does not support pipelining: https://github.com/aio-libs/aiohttp/issues/1740 )
# This code has been developed for aiohttp version 2.3.10.


class RawData:
    def __init__(self):
        self.requestTimestamp = None
        self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
        self.responseTimestamp = None
        self.responseData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')

    def close(self):
        self.requestData.close()
        self.responseData.close()


class ResponseHandler(aiohttp.client_proto.ResponseHandler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rawData = None
        self.remoteAddress = None

    def data_received(self, data):
        super().data_received(data)
        if not data:
            return
        if self.rawData.responseTimestamp is None:
            self.rawData.responseTimestamp = time.time()
        self.rawData.responseData.seek(0, io.SEEK_END)
        self.rawData.responseData.write(data)

    def reset_raw_data(self):
        self.rawData = RawData()


def make_transport_write(transport, protocol):
    transport._real_write = transport.write

    def write(self, data):
        if protocol.rawData.requestTimestamp is None:
            protocol.rawData.requestTimestamp = time.time()
        protocol.rawData.requestData.seek(0, io.SEEK_END)
        protocol.rawData.requestData.write(data)
        self._real_write(data)

    return write


class TCPConnector(aiohttp.connector.TCPConnector):
    def __init__(self, *args, loop = None, **kwargs):
        super().__init__(*args, loop = loop, **kwargs)
        self._factory = functools.partial(ResponseHandler, loop = loop)

    async def _wrap_create_connection(self, protocolFactory, host, port, *args, **kwargs): #FIXME: Uses internal API
        transport, protocol = await super()._wrap_create_connection(protocolFactory, host, port, *args, **kwargs)
        transport.write = make_transport_write(transport, protocol).__get__(transport, type(transport)) # https://stackoverflow.com/a/28127947
        protocol.remoteAddress = (host, port)
        return (transport, protocol)


class ClientRequest(aiohttp.client_reqrep.ClientRequest):
    DEFAULT_HEADERS = {}

    def send(self, connection):
        connection.protocol.reset_raw_data()
        return super().send(connection)


class ClientResponse(aiohttp.client_reqrep.ClientResponse):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._rawData = None
        self._remoteAddress = None

    async def start(self, connection, readUntilEof):
        self._rawData = connection.protocol.rawData
        self._remoteAddress = connection.protocol.remoteAddress
        return (await super().start(connection, readUntilEof))

    @property
    def rawRequestTimestamp(self):
        return self._rawData.requestTimestamp

    @property
    def rawRequestData(self):
        return qwarc.utils.ReadonlyFileView(self._rawData.requestData)

    @property
    def rawResponseTimestamp(self):
        return self._rawData.responseTimestamp

    @property
    def rawResponseData(self):
        return qwarc.utils.ReadonlyFileView(self._rawData.responseData)

    @property
    def remoteAddress(self):
        return self._remoteAddress

    def set_history(self, history):
        self._history = history #FIXME: Uses private attribute of aiohttp.client_reqrep.ClientResponse

    def iter_all(self):
        return itertools.chain(self.history, (self,))

    async def _read(self, nbytes = None):
        #FIXME: This uses internal undocumented APIs of aiohttp
        payload = Payload()
        self._rawData.responseData.seek(0)
        beginning = self._rawData.responseData.read(32768) # Headers must fit into 32 KiB. That's more than most clients out there, but aiohttp does *not* have this restriction!
        pos = beginning.find(b'\r\n\r\n')
        assert pos > -1, 'Could not find end of headers'
        respMsg = aiohttp.http_parser.HttpResponseParserPy().parse_message(beginning[:pos + 2].split(b'\r\n'))
        try:
            length = int(self.headers.get('Content-Length'))
        except (KeyError, ValueError, TypeError):
            length = None
        parser = aiohttp.http_parser.HttpPayloadParser(payload, length = length, chunked = respMsg.chunked, compression = respMsg.compression, code = respMsg.code, method = self.method)
        while beginning.endswith(b'0\r\n') or beginning.endswith(b'0\r\n\r'): # https://github.com/aio-libs/aiohttp/issues/4630
            chunk4630 = self._rawData.responseData.read(1024)
            if not chunk4630:
                break
            beginning = beginning + chunk4630
        eof, data = parser.feed_data(beginning[pos + 4:])
        while True:
            chunk = self._rawData.responseData.read(1048576)
            if not chunk:
                break
            while chunk.endswith(b'0\r\n') or chunk.endswith(b'0\r\n\r'): # https://github.com/aio-libs/aiohttp/issues/4630
                chunk4630 = self._rawData.responseData.read(1024)
                if not chunk4630:
                    break
                chunk = chunk + chunk4630
            eof, data = parser.feed_data(chunk)
            if nbytes is not None and payload.data.tell() >= nbytes:
                if payload.exc:
                    raise Exception from payload.exc
                return payload.data.getvalue()[:nbytes]
            # data can only not be None if eof is True, so there is no need to actually do anything about it
            if eof:
                break
        if not eof:
            parser.feed_eof()
        if payload.exc:
            raise Exception from payload.exc
        if nbytes is not None:
            return payload.data.getvalue()[:nbytes]
        return payload.data.getvalue()

    async def read(self, nbytes = None):
        '''
        Read up to nbytes from the response payload, or the entire response if nbytes is None.
        Note that this method always starts from the beginning of the response even if called repeatedly.
        '''
        #FIXME: Uses internal aiohttp attribute _content
        if nbytes is not None:
            if self._content is not None:
                return self._content[:nbytes]
            return (await self._read(nbytes))
        if self._content is None:
            self._content = await self._read()
        return self._content

    async def json(self, **kwargs):
        if 'content_type' not in kwargs:
            kwargs['content_type'] = None
        return (await super().json(**kwargs))

    async def release(self):
        if not self.closed:
            # reset_raw_data is defined on the protocol (ResponseHandler), not on the connection object itself
            self.connection.protocol.reset_raw_data()
        await super().release()

    def __del__(self):
        if self._rawData:
            self._rawData.close()
        super().__del__()


class Payload:
    # A class implementing the minimal subset used by the HttpPayloadParser to retrieve the data

    def __init__(self):
        self.data = io.BytesIO()
        self.exc = None

    def feed_data(self, data, size):
        self.data.write(data)

    def feed_eof(self):
        pass

    def set_exception(self, exc):
        self.exc = exc

    def begin_http_chunk_receiving(self):
        pass

    def end_http_chunk_receiving(self):
        pass
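

The write interception used by make_transport_write and TCPConnector above can be tried in isolation, without aiohttp or a network connection. The following is a minimal sketch under stated assumptions: FakeTransport, Recorder and make_recording_write are hypothetical names invented for this illustration and are not part of qwarc or aiohttp. It shows only the general technique, namely keeping the original write method, binding a replacement function to a single instance via `__get__`, and appending every written chunk to a SpooledTemporaryFile before passing it on.

```python
import io
import tempfile
import time


class FakeTransport:
    # Hypothetical stand-in for an asyncio transport; it only collects what is written to it.
    def __init__(self):
        self.sent = []

    def write(self, data):
        self.sent.append(data)


class Recorder:
    # Hypothetical stand-in for the request half of RawData.
    def __init__(self):
        self.requestTimestamp = None
        self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576)


def make_recording_write(transport, recorder):
    # Keep the original bound method so the replacement can delegate to it.
    transport._real_write = transport.write

    def write(self, data):
        # Record the time of the first write only.
        if recorder.requestTimestamp is None:
            recorder.requestTimestamp = time.time()
        # Seek to the end first so the data is appended even if a reader moved the file position.
        recorder.requestData.seek(0, io.SEEK_END)
        recorder.requestData.write(data)
        self._real_write(data)

    return write


transport = FakeTransport()
recorder = Recorder()
# Bind the plain function to this one instance through the descriptor protocol.
transport.write = make_recording_write(transport, recorder).__get__(transport, type(transport))

transport.write(b'GET / HTTP/1.1\r\n')
transport.write(b'Host: example.org\r\n\r\n')

# Read back everything that went through the patched write method.
recorder.requestData.seek(0)
captured = recorder.requestData.read()
```

Binding with `__get__` patches only the one transport instance, so other transports of the same type keep their original write method. The seek to the end before each write matters because the same file object may be read from in between writes.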