A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

220 lines
7.2 KiB

  1. import aiohttp
  2. import aiohttp.client_proto
  3. import aiohttp.connector
  4. import functools
  5. import io
  6. import itertools
  7. import qwarc.utils
  8. import time
  9. import tempfile
  10. # aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative...
  11. # The ResponseHandler handles received data; the writes are done directly on the underlying transport.
  12. # So ResponseHandler is replaced with a class which keeps all received data in a list, and the transport's write method is replaced with one which sends back all written data to the ResponseHandler.
  13. # Because the ResponseHandler instance disappears when the connection is closed (ClientResponse.{_response_eof,close,release}), ClientResponse.start copies the references to the ResponseHandler's data objects into the ClientResponse.
  14. # aiohttp also does connection pooling/reuse, so ClientRequest resets the raw data when the request is sent. (This would not work with pipelining, but aiohttp does not support pipelining: https://github.com/aio-libs/aiohttp/issues/1740 )
  15. # This code has been developed for aiohttp version 2.3.10.
  16. class RawData:
  17. def __init__(self):
  18. self.requestTimestamp = None
  19. self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
  20. self.responseTimestamp = None
  21. self.responseData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
  22. def close(self):
  23. self.requestData.close()
  24. self.responseData.close()
  25. class ResponseHandler(aiohttp.client_proto.ResponseHandler):
  26. def __init__(self, *args, **kwargs):
  27. super().__init__(*args, **kwargs)
  28. self.rawData = None
  29. self.remoteAddress = None
  30. def data_received(self, data):
  31. super().data_received(data)
  32. if not data:
  33. return
  34. if self.rawData.responseTimestamp is None:
  35. self.rawData.responseTimestamp = time.time()
  36. self.rawData.responseData.seek(0, io.SEEK_END)
  37. self.rawData.responseData.write(data)
  38. def reset_raw_data(self):
  39. self.rawData = RawData()
  40. def make_transport_write(transport, protocol):
  41. transport._real_write = transport.write
  42. def write(self, data):
  43. if protocol.rawData.requestTimestamp is None:
  44. protocol.rawData.requestTimestamp = time.time()
  45. protocol.rawData.requestData.seek(0, io.SEEK_END)
  46. protocol.rawData.requestData.write(data)
  47. self._real_write(data)
  48. return write
  49. class TCPConnector(aiohttp.connector.TCPConnector):
  50. def __init__(self, *args, loop = None, **kwargs):
  51. super().__init__(*args, loop = loop, **kwargs)
  52. self._factory = functools.partial(ResponseHandler, loop = loop)
  53. async def _wrap_create_connection(self, protocolFactory, host, port, *args, **kwargs): #FIXME: Uses internal API
  54. transport, protocol = await super()._wrap_create_connection(protocolFactory, host, port, *args, **kwargs)
  55. transport.write = make_transport_write(transport, protocol).__get__(transport, type(transport)) # https://stackoverflow.com/a/28127947
  56. protocol.remoteAddress = (host, port)
  57. return (transport, protocol)
  58. class ClientRequest(aiohttp.client_reqrep.ClientRequest):
  59. DEFAULT_HEADERS = {}
  60. def send(self, connection):
  61. connection.protocol.reset_raw_data()
  62. return super().send(connection)
  63. class ClientResponse(aiohttp.client_reqrep.ClientResponse):
  64. def __init__(self, *args, **kwargs):
  65. super().__init__(*args, **kwargs)
  66. self._rawData = None
  67. self._remoteAddress = None
  68. self._qhistory = None # _history is used by aiohttp internally
  69. async def start(self, connection, readUntilEof):
  70. self._rawData = connection.protocol.rawData
  71. self._remoteAddress = connection.protocol.remoteAddress
  72. return (await super().start(connection, readUntilEof))
  73. @property
  74. def rawRequestTimestamp(self):
  75. return self._rawData.requestTimestamp
  76. @property
  77. def rawRequestData(self):
  78. return qwarc.utils.ReadonlyFileView(self._rawData.requestData)
  79. @property
  80. def rawResponseTimestamp(self):
  81. return self._rawData.responseTimestamp
  82. @property
  83. def rawResponseData(self):
  84. return qwarc.utils.ReadonlyFileView(self._rawData.responseData)
  85. @property
  86. def remoteAddress(self):
  87. return self._remoteAddress
  88. @property
  89. def qhistory(self):
  90. return self._qhistory
  91. @qhistory.setter
  92. def qhistory(self, history):
  93. self._qhistory = history
  94. def iter_all(self):
  95. return itertools.chain(self.history, (self,))
  96. async def _read(self, nbytes = None):
  97. #FIXME: This uses internal undocumented APIs of aiohttp
  98. payload = Payload()
  99. self._rawData.responseData.seek(0)
  100. beginning = self._rawData.responseData.read(32768) # Headers must fit into 32 KiB. That's more than most clients out there, but aiohttp does *not* have this restriction!
  101. pos = beginning.find(b'\r\n\r\n')
  102. assert pos > -1, 'Could not find end of headers'
  103. respMsg = aiohttp.http_parser.HttpResponseParserPy().parse_message(beginning[:pos + 2].split(b'\r\n'))
  104. try:
  105. length = int(self.headers.get('Content-Length'))
  106. except (KeyError, ValueError, TypeError):
  107. length = None
  108. parser = aiohttp.http_parser.HttpPayloadParser(payload, length = length, chunked = respMsg.chunked, compression = respMsg.compression, code = respMsg.code, method = self.method)
  109. while beginning.endswith(b'0\r\n') or beginning.endswith(b'0\r\n\r'): # https://github.com/aio-libs/aiohttp/issues/4630
  110. chunk4630 = self._rawData.responseData.read(1024)
  111. if not chunk4630:
  112. break
  113. beginning = beginning + chunk4630
  114. eof, data = parser.feed_data(beginning[pos + 4:])
  115. while True:
  116. chunk = self._rawData.responseData.read(1048576)
  117. if not chunk:
  118. break
  119. while chunk.endswith(b'0\r\n') or chunk.endswith(b'0\r\n\r'): # https://github.com/aio-libs/aiohttp/issues/4630
  120. chunk4630 = self._rawData.responseData.read(1024)
  121. if not chunk4630:
  122. break
  123. chunk = chunk + chunk4630
  124. eof, data = parser.feed_data(chunk)
  125. if nbytes is not None and payload.data.tell() >= nbytes:
  126. if payload.exc:
  127. raise Exception from payload.exc
  128. return payload.data.getvalue()[:nbytes]
  129. # data can only not be None if eof is True, so there is no need to actually do anything about it
  130. if eof:
  131. break
  132. if not eof:
  133. parser.feed_eof()
  134. if payload.exc:
  135. raise Exception from payload.exc
  136. if nbytes is not None:
  137. return payload.data.getvalue()[:nbytes]
  138. return payload.data.getvalue()
  139. async def read(self, nbytes = None):
  140. '''
  141. Read up to nbytes from the response payload, or the entire response if nbytes is None.
  142. Note that this method always starts from the beginning of the response even if called repeatedly.
  143. '''
  144. #FIXME: Uses internal aiohttp attribute _content
  145. if nbytes is not None:
  146. if self._content is not None:
  147. return self._content[:nbytes]
  148. return (await self._read(nbytes))
  149. if self._content is None:
  150. self._content = await self._read()
  151. return self._content
  152. async def json(self, **kwargs):
  153. if 'content_type' not in kwargs:
  154. kwargs['content_type'] = None
  155. return (await super().json(**kwargs))
  156. async def release(self):
  157. if not self.closed:
  158. self.connection.reset_raw_data()
  159. await super().release()
  160. def __del__(self):
  161. if self._rawData:
  162. self._rawData.close()
  163. super().__del__()
  164. class Payload:
  165. # A class implementing the minimal subset used by the HttpPayloadParser to retrieve the data
  166. def __init__(self):
  167. self.data = io.BytesIO()
  168. self.exc = None
  169. def feed_data(self, data, size):
  170. self.data.write(data)
  171. def feed_eof(self):
  172. pass
  173. def set_exception(self, exc):
  174. self.exc = exc
  175. def begin_http_chunk_receiving(self):
  176. pass
  177. def end_http_chunk_receiving(self):
  178. pass