A framework for quick web archiving
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

180 rader
5.9 KiB

  1. import aiohttp
  2. import aiohttp.client_proto
  3. import aiohttp.connector
  4. import functools
  5. import io
  6. import itertools
  7. import qwarc.utils
  8. import time
  9. import tempfile
  10. # aiohttp does not expose the raw data sent over the wire, so we need to get a bit creative...
  11. # The ResponseHandler handles received data; the writes are done directly on the underlying transport.
  12. # So ResponseHandler is replaced with a class which keeps all received data in a list, and the transport's write method is replaced with one which sends back all written data to the ResponseHandler.
  13. # Because the ResponseHandler instance disappears when the connection is closed (ClientResponse.{_response_eof,close,release}), ClientResponse copies the references to the data objects in the ResponseHandler.
  14. # aiohttp also does connection pooling/reuse, so ClientRequest resets the raw data when the request is sent. (This would not work with pipelining, but aiohttp does not support pipelining: https://github.com/aio-libs/aiohttp/issues/1740 )
  15. # This code has been developed for aiohttp version 2.3.10.
  16. class RawData:
  17. def __init__(self):
  18. self.requestTimestamp = None
  19. self.requestData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
  20. self.responseTimestamp = None
  21. self.responseData = tempfile.SpooledTemporaryFile(max_size = 1048576, dir = './')
  22. def close(self):
  23. self.requestData.close()
  24. self.responseData.close()
  25. class ResponseHandler(aiohttp.client_proto.ResponseHandler):
  26. def __init__(self, *args, **kwargs):
  27. super().__init__(*args, **kwargs)
  28. self.rawData = None
  29. self.remoteAddress = None
  30. def data_received(self, data):
  31. super().data_received(data)
  32. if not data:
  33. return
  34. if self.rawData.responseTimestamp is None:
  35. self.rawData.responseTimestamp = time.time()
  36. self.rawData.responseData.seek(0, io.SEEK_END)
  37. self.rawData.responseData.write(data)
  38. def reset_raw_data(self):
  39. if self.rawData:
  40. self.rawData.close()
  41. self.rawData = RawData()
  42. def make_transport_write(transport, protocol):
  43. transport._real_write = transport.write
  44. def write(self, data):
  45. if protocol.rawData.requestTimestamp is None:
  46. protocol.rawData.requestTimestamp = time.time()
  47. protocol.rawData.requestData.seek(0, io.SEEK_END)
  48. protocol.rawData.requestData.write(data)
  49. self._real_write(data)
  50. return write
  51. class TCPConnector(aiohttp.connector.TCPConnector):
  52. def __init__(self, *args, loop = None, **kwargs):
  53. super().__init__(*args, loop = loop, **kwargs)
  54. self._factory = functools.partial(ResponseHandler, loop = loop)
  55. async def _wrap_create_connection(self, protocolFactory, host, port, *args, **kwargs): #FIXME: Uses internal API
  56. transport, protocol = await super()._wrap_create_connection(protocolFactory, host, port, *args, **kwargs)
  57. transport.write = make_transport_write(transport, protocol).__get__(transport, type(transport)) # https://stackoverflow.com/a/28127947
  58. protocol.remoteAddress = (host, port)
  59. return (transport, protocol)
  60. class ClientRequest(aiohttp.client_reqrep.ClientRequest):
  61. def send(self, connection):
  62. connection.protocol.reset_raw_data()
  63. return super().send(connection)
  64. class ClientResponse(aiohttp.client_reqrep.ClientResponse):
  65. def __init__(self, *args, **kwargs):
  66. super().__init__(*args, **kwargs)
  67. self._rawData = None
  68. self._remoteAddress = None
  69. async def start(self, connection, readUntilEof):
  70. self._rawData = connection.protocol.rawData
  71. self._remoteAddress = connection.protocol.remoteAddress
  72. return (await super().start(connection, readUntilEof))
  73. @property
  74. def rawRequestTimestamp(self):
  75. return self._rawData.requestTimestamp
  76. @property
  77. def rawRequestData(self):
  78. return qwarc.utils.ReadonlyFileView(self._rawData.requestData)
  79. @property
  80. def rawResponseTimestamp(self):
  81. return self._rawData.responseTimestamp
  82. @property
  83. def rawResponseData(self):
  84. return qwarc.utils.ReadonlyFileView(self._rawData.responseData)
  85. @property
  86. def remoteAddress(self):
  87. return self._remoteAddress
  88. def set_history(self, history):
  89. self._history = history #FIXME: Uses private attribute of aiohttp.client_reqrep.ClientResponse
  90. def iter_all(self):
  91. return itertools.chain(self.history, (self,))
  92. async def _read(self):
  93. #FIXME: This uses internal undocumented APIs of aiohttp
  94. payload = Payload()
  95. self._rawData.responseData.seek(0)
  96. beginning = self._rawData.responseData.read(32768) # Headers must fit into 32 KiB. That's more than most clients out there, but aiohttp does *not* have this restriction!
  97. pos = beginning.find(b'\r\n\r\n')
  98. assert pos > -1, 'Could not find end of headers'
  99. respMsg = aiohttp.http_parser.HttpResponseParserPy().parse_message(beginning[:pos + 2].split(b'\r\n'))
  100. try:
  101. length = int(self.headers.get('Content-Length'))
  102. except (KeyError, ValueError):
  103. length = None
  104. parser = aiohttp.http_parser.HttpPayloadParser(payload, length = length, chunked = respMsg.chunked, compression = respMsg.compression, code = respMsg.code, method = self.method)
  105. eof, data = parser.feed_data(beginning[pos + 4:])
  106. while True:
  107. chunk = self._rawData.responseData.read(1048576)
  108. if not chunk:
  109. break
  110. eof, data = parser.feed_data(chunk)
  111. # data can only not be None if eof is True, so there is no need to actually do anything about it
  112. if eof:
  113. break
  114. if not eof:
  115. parser.feed_eof()
  116. if payload.exc:
  117. raise Exception from payload.exc
  118. return payload.data.getvalue()
  119. async def read(self):
  120. #FIXME: Uses internal aiohttp attribute _content
  121. if self._content is None:
  122. self._content = await self._read()
  123. return self._content
  124. async def release(self):
  125. if not self.closed:
  126. self.connection.reset_raw_data()
  127. await super().release()
  128. class Payload:
  129. # A class implementing the minimal subset used by the HttpPayloadParser to retrieve the data
  130. def __init__(self):
  131. self.data = io.BytesIO()
  132. self.exc = None
  133. def feed_data(self, data, size):
  134. self.data.write(data)
  135. def feed_eof(self):
  136. pass
  137. def set_exception(self, exc):
  138. self.exc = exc
  139. def begin_http_chunk_receiving(self):
  140. pass
  141. def end_http_chunk_receiving(self):
  142. pass