A framework for quick web archiving
Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

229 рядків
8.0 KiB

  1. import fcntl
  2. import gzip
  3. import io
  4. import itertools
  5. import json
  6. import logging
  7. import os
  8. import qwarc.utils
  9. import tempfile
  10. import time
  11. import warcio
  12. class WARC:
  13. def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
  14. '''
  15. Initialise the WARC writer
  16. prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
  17. maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
  18. dedupe: bool, whether to enable record deduplication
  19. command: list, the command line call for qwarc
  20. specFile: str, path to the spec file
  21. specDependencies: qwarc.utils.SpecDependencies
  22. logFilename: str, name of the log file written by this process
  23. '''
  24. self._prefix = prefix
  25. self._counter = 0
  26. self._maxFileSize = maxFileSize
  27. self._closed = True
  28. self._file = None
  29. self._warcWriter = None
  30. self._dedupe = dedupe
  31. self._dedupeMap = {}
  32. self._command = command
  33. self._specFile = specFile
  34. self._specDependencies = specDependencies
  35. self._logFile = None
  36. self._logHandler = None
  37. self._setup_logger()
  38. self._logFilename = logFilename
  39. self._dataWarcinfoRecordID = None
  40. self._metaWarcinfoRecordID = None
  41. self._write_meta_warc(self._write_initial_meta_records)
  42. def _setup_logger(self):
  43. rootLogger = logging.getLogger()
  44. formatter = qwarc.utils.LogFormatter()
  45. self._logFile = tempfile.NamedTemporaryFile(prefix = 'qwarc-warc-', suffix = '.log.gz', delete = False)
  46. self._logHandler = logging.StreamHandler(io.TextIOWrapper(gzip.GzipFile(filename = self._logFile.name, mode = 'wb'), encoding = 'utf-8'))
  47. self._logHandler.setFormatter(formatter)
  48. rootLogger.addHandler(self._logHandler)
  49. self._logHandler.setLevel(logging.INFO)
  50. def _ensure_opened(self):
  51. '''Open the next file that doesn't exist yet if there is currently no file opened'''
  52. if not self._closed:
  53. return
  54. while True:
  55. filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
  56. try:
  57. # Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it
  58. self._file = open(filename, 'xb')
  59. fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  60. except FileExistsError:
  61. logging.info(f'{filename} already exists, skipping')
  62. self._counter += 1
  63. else:
  64. break
  65. logging.info(f'Opened {filename}')
  66. self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
  67. self._closed = False
  68. self._counter += 1
  69. self._dataWarcinfoRecordID = self._write_warcinfo_record()
  70. def _write_warcinfo_record(self):
  71. data = {
  72. 'software': qwarc.utils.get_software_info(self._specFile, self._specDependencies),
  73. 'command': self._command,
  74. 'files': {
  75. 'spec': self._specFile,
  76. 'spec-dependencies': self._specDependencies.files
  77. },
  78. 'extra': self._specDependencies.extra,
  79. }
  80. record = self._warcWriter.create_warc_record(
  81. 'urn:qwarc:warcinfo',
  82. 'warcinfo',
  83. payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8')),
  84. warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8'},
  85. )
  86. self._warcWriter.write_record(record)
  87. return record.rec_headers.get_header('WARC-Record-ID')
  88. def write_client_response(self, response):
  89. '''
  90. Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
  91. A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.
  92. '''
  93. self._ensure_opened()
  94. for r in response.iter_all():
  95. usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
  96. requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
  97. requestRecord = self._warcWriter.create_warc_record(
  98. str(r.url),
  99. 'request',
  100. payload = io.BytesIO(r.rawRequestData),
  101. warc_headers_dict = {
  102. 'WARC-Date': requestDate,
  103. 'WARC-IP-Address': r.remoteAddress[0],
  104. 'WARC-Warcinfo-ID': self._dataWarcinfoRecordID,
  105. }
  106. )
  107. requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
  108. responseRecord = self._warcWriter.create_warc_record(
  109. str(r.url),
  110. 'response',
  111. payload = io.BytesIO(r.rawResponseData),
  112. warc_headers_dict = {
  113. 'WARC-Date': requestDate,
  114. 'WARC-IP-Address': r.remoteAddress[0],
  115. 'WARC-Concurrent-To': requestRecordID,
  116. 'WARC-Warcinfo-ID': self._dataWarcinfoRecordID,
  117. }
  118. )
  119. payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
  120. assert payloadDigest is not None
  121. if self._dedupe and responseRecord.payload_length > 0: # Don't "deduplicate" empty responses
  122. if payloadDigest in self._dedupeMap:
  123. refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
  124. responseHttpHeaders = responseRecord.http_headers
  125. responseRecord = self._warcWriter.create_revisit_record(
  126. str(r.url),
  127. digest = payloadDigest,
  128. refers_to_uri = refersToUri,
  129. refers_to_date = refersToDate,
  130. http_headers = responseHttpHeaders,
  131. warc_headers_dict = {
  132. 'WARC-Date': requestDate,
  133. 'WARC-IP-Address': r.remoteAddress[0],
  134. 'WARC-Concurrent-To': requestRecordID,
  135. 'WARC-Refers-To': refersToRecordId,
  136. 'WARC-Truncated': 'length',
  137. 'WARC-Warcinfo-ID': self._dataWarcinfoRecordID,
  138. }
  139. )
  140. else:
  141. self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
  142. self._warcWriter.write_record(requestRecord)
  143. self._warcWriter.write_record(responseRecord)
  144. if self._maxFileSize and self._file.tell() > self._maxFileSize:
  145. self.close()
  146. def _write_resource_records(self):
  147. '''Write spec file and dependencies'''
  148. assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
  149. for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
  150. with open(fn, 'rb') as f:
  151. record = self._warcWriter.create_warc_record(
  152. f'file://{fn}',
  153. 'resource',
  154. payload = f,
  155. warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
  156. )
  157. self._warcWriter.write_record(record)
  158. def _write_initial_meta_records(self):
  159. self._metaWarcinfoRecordID = self._write_warcinfo_record()
  160. self._write_resource_records()
  161. def _write_log_record(self):
  162. assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
  163. self._logHandler.flush()
  164. self._logHandler.stream.close()
  165. record = self._warcWriter.create_warc_record(
  166. f'file://{self._logFilename}',
  167. 'resource',
  168. payload = gzip.GzipFile(self._logFile.name),
  169. warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
  170. )
  171. self._warcWriter.write_record(record)
  172. def _close_file(self):
  173. '''Close the currently opened WARC'''
  174. if not self._closed:
  175. self._file.close()
  176. self._warcWriter = None
  177. self._file = None
  178. self._closed = True
  179. def _write_meta_warc(self, callback):
  180. filename = f'{self._prefix}-meta.warc.gz'
  181. #TODO: Handle OSError on fcntl.flock and retry
  182. self._file = open(filename, 'ab')
  183. try:
  184. fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
  185. logging.info(f'Opened {filename}')
  186. self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
  187. self._closed = False
  188. callback()
  189. finally:
  190. self._close_file()
  191. def close(self):
  192. '''Clean up everything.'''
  193. self._close_file()
  194. logging.getLogger().removeHandler(self._logHandler)
  195. self._write_meta_warc(self._write_log_record)
  196. try:
  197. os.remove(self._logFile.name)
  198. except OSError:
  199. logging.error('Could not remove temporary log file')
  200. self._logFile = None
  201. self._logHandler.close()
  202. self._logHandler = None