A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

307 lines
11 KiB

  1. import fcntl
  2. import gzip
  3. import io
  4. import itertools
  5. import json
  6. import logging
  7. import os
  8. import qwarc.utils
  9. import tempfile
  10. import time
  11. import warcio
  12. logger = logging.getLogger(__name__)
  13. class WARC:
  14. def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
  15. '''
  16. Initialise the WARC writer
  17. prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
  18. maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
  19. dedupe: bool, whether to enable record deduplication
  20. command: list, the command line call for qwarc
  21. specFile: str, path to the spec file
  22. specDependencies: qwarc.utils.SpecDependencies
  23. logFilename: str, name of the log file written by this process
  24. '''
  25. self._prefix = prefix
  26. self._counter = 0
  27. self._maxFileSize = maxFileSize
  28. self._closed = True
  29. self._file = None
  30. self._journalFile = None
  31. self._journalClean = None
  32. self._warcWriter = None
  33. self._dedupe = dedupe
  34. self._dedupeMap = {}
  35. self._command = command
  36. self._specFile = specFile
  37. self._specDependencies = specDependencies
  38. self._logFilename = logFilename
  39. self._metaWarcinfoRecordID = None
  40. self._write_meta_warc(self._write_initial_meta_records)
  41. def _ensure_opened(self):
  42. '''Open the next file that doesn't exist yet if there is currently no file opened'''
  43. if not self._closed:
  44. return
  45. while True:
  46. filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
  47. logger.debug(f'Trying to open {filename}')
  48. try:
  49. # Try to open the file for writing, requiring that it does not exist yet, and attempt to get an exclusive, non-blocking lock on it
  50. self._file = open(filename, 'xb')
  51. fcntl.flock(self._file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  52. except FileExistsError:
  53. logger.debug(f'{filename} already exists, skipping')
  54. self._counter += 1
  55. else:
  56. break
  57. logger.info(f'Opened {filename}')
  58. self._open_journal(filename)
  59. self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
  60. self._closed = False
  61. self._counter += 1
  62. def _open_journal(self, filename):
  63. logger.debug(f'Trying to open {filename}.qwarcjournal')
  64. try:
  65. self._journalFile = open(f'{filename}.qwarcjournal', 'xb')
  66. fcntl.flock(self._journalFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  67. except FileExistsError:
  68. logger.error(f'{filename}.qwarcjournal already exists!')
  69. raise RuntimeError(f'Unable to create journal file for {filename}: {filename}.qwarcjournal already exists')
  70. except OSError as e:
  71. if e.errno == errno.EWOULDBLOCK:
  72. logger.error(f'{filename}.qwarcjournal is already locked!')
  73. raise RuntimeError(f'Unable to lock journal file {filename}.qwarcjournal') from e
  74. raise
  75. logger.debug(f'Opened {filename}.qwarcjournal')
  76. self._journalClean = True
  77. def _write_record(self, record):
  78. logger.debug(f'Preparing to write record {record.rec_headers.get_header("WARC-Record-ID")}')
  79. logger.debug('Writing journal entry')
  80. # Since the size can only grow, it is not necessary to explicitly delete the previous contents.
  81. self._journalFile.seek(0)
  82. previousSize = self._file.tell()
  83. self._journalFile.write(f'qwarc journal version: 1\noffset: {previousSize}\nwrite ok: no \n'.encode('ascii'))
  84. self._journalFile.flush()
  85. self._journalClean = False
  86. logger.debug('Writing record')
  87. try:
  88. self._warcWriter.write_record(record)
  89. except (OSError, IOError):
  90. logger.debug(f'Error occurred, truncating to previous size {previousSize}')
  91. self._file.truncate(previousSize)
  92. raise
  93. else:
  94. logger.debug('Write ok, cleaning journal')
  95. self._journalFile.seek(-4, os.SEEK_END) # len(b'no \n')
  96. self._journalFile.write(b'yes\n')
  97. self._journalFile.flush()
  98. self._journalClean = True
  99. def _write_warcinfo_record(self):
  100. logger.debug('Writing warcinfo record')
  101. data = {
  102. 'software': qwarc.utils.get_software_info(self._specDependencies.packages),
  103. 'command': self._command,
  104. 'files': {
  105. 'spec': self._specFile,
  106. 'spec-dependencies': self._specDependencies.files
  107. },
  108. 'extra': self._specDependencies.extra,
  109. }
  110. payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
  111. # Workaround for https://github.com/webrecorder/warcio/issues/87
  112. logger.debug('Calculating digest')
  113. digester = warcio.utils.Digester('sha1')
  114. digester.update(payload.getvalue())
  115. logger.debug('Creating record object')
  116. record = self._warcWriter.create_warc_record(
  117. None,
  118. 'warcinfo',
  119. payload = payload,
  120. warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
  121. length = len(payload.getvalue()),
  122. )
  123. self._write_record(record)
  124. return record.rec_headers.get_header('WARC-Record-ID')
  125. def write_client_response(self, response):
  126. '''
  127. Write the requests and responses stored in a ClientResponse instance to the currently opened WARC.
  128. A new WARC will be started automatically if the size of the current file exceeds the limit after writing all requests and responses from this `response` to the current WARC.
  129. '''
  130. logger.debug(f'Writing response(s) for {response.url}')
  131. self._ensure_opened()
  132. for r in response.iter_all():
  133. usec = f'{(r.rawRequestTimestamp - int(r.rawRequestTimestamp)):.6f}'[2:]
  134. requestDate = time.strftime(f'%Y-%m-%dT%H:%M:%S.{usec}Z', time.gmtime(r.rawRequestTimestamp))
  135. logger.debug(f'Writing request/response for {requestDate} {r.url}')
  136. logger.debug('Constructing request record')
  137. r.rawRequestData.seek(0, io.SEEK_END)
  138. length = r.rawRequestData.tell()
  139. r.rawRequestData.seek(0)
  140. requestRecord = self._warcWriter.create_warc_record(
  141. str(r.url),
  142. 'request',
  143. payload = r.rawRequestData,
  144. length = length,
  145. warc_headers_dict = {
  146. 'WARC-Date': requestDate,
  147. 'WARC-IP-Address': r.remoteAddress[0],
  148. 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
  149. }
  150. )
  151. requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
  152. logger.debug('Constructing response record')
  153. r.rawResponseData.seek(0, io.SEEK_END)
  154. length = r.rawResponseData.tell()
  155. r.rawResponseData.seek(0)
  156. responseRecord = self._warcWriter.create_warc_record(
  157. str(r.url),
  158. 'response',
  159. payload = r.rawResponseData,
  160. length = length,
  161. warc_headers_dict = {
  162. 'WARC-Date': requestDate,
  163. 'WARC-IP-Address': r.remoteAddress[0],
  164. 'WARC-Concurrent-To': requestRecordID,
  165. 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
  166. }
  167. )
  168. payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
  169. assert payloadDigest is not None
  170. if self._dedupe and responseRecord.payload_length > 100: # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
  171. logger.debug(f'Checking for duplicates with {payloadDigest}')
  172. if payloadDigest in self._dedupeMap:
  173. refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
  174. logger.debug('Record is a duplicate, creating revisit record to {refersToRecordId} ({refersToDate} {refersToUri})')
  175. responseHttpHeaders = responseRecord.http_headers
  176. responseRecord = self._warcWriter.create_revisit_record(
  177. str(r.url),
  178. digest = payloadDigest,
  179. refers_to_uri = refersToUri,
  180. refers_to_date = refersToDate,
  181. http_headers = responseHttpHeaders,
  182. warc_headers_dict = {
  183. 'WARC-Date': requestDate,
  184. 'WARC-IP-Address': r.remoteAddress[0],
  185. 'WARC-Concurrent-To': requestRecordID,
  186. 'WARC-Refers-To': refersToRecordId,
  187. 'WARC-Truncated': 'length',
  188. 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
  189. }
  190. )
  191. # Workaround for https://github.com/webrecorder/warcio/issues/94
  192. responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
  193. else:
  194. logger.debug('New record, creating dedupe map entry')
  195. self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
  196. logger.debug('Writing request and response records')
  197. self._write_record(requestRecord)
  198. self._write_record(responseRecord)
  199. logger.debug(f'Done writing responses for {response.url}')
  200. if self._maxFileSize and self._file.tell() > self._maxFileSize:
  201. logger.debug('File exceeds max size, closing')
  202. self._close_file()
  203. def _write_resource_records(self):
  204. '''Write spec file and dependencies'''
  205. assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
  206. logger.debug('Writing resource records')
  207. for type_, contentType, fn in itertools.chain((('specfile', 'application/x-python', self._specFile),), map(lambda x: ('spec-dependency-file', 'application/octet-stream', x), self._specDependencies.files)):
  208. logger.debug(f'Writing resource record for {fn}')
  209. with open(fn, 'rb') as f:
  210. f.seek(0, io.SEEK_END)
  211. length = f.tell()
  212. f.seek(0)
  213. record = self._warcWriter.create_warc_record(
  214. f'file://{fn}',
  215. 'resource',
  216. payload = f,
  217. length = length,
  218. warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
  219. )
  220. self._write_record(record)
  221. def _write_initial_meta_records(self):
  222. self._metaWarcinfoRecordID = self._write_warcinfo_record()
  223. self._write_resource_records()
  224. def _write_log_record(self):
  225. assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
  226. logger.debug(f'Writing log record')
  227. rootLogger = logging.getLogger()
  228. for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
  229. handler.flush()
  230. with open(self._logFilename, 'rb') as fp:
  231. fp.seek(0, io.SEEK_END)
  232. length = fp.tell()
  233. fp.seek(0)
  234. # Work around https://github.com/webrecorder/warcio/issues/90
  235. payload = qwarc.utils.FrozenFileView(fp, 0, length)
  236. record = self._warcWriter.create_warc_record(
  237. f'file://{self._logFilename}',
  238. 'resource',
  239. payload = payload,
  240. length = length,
  241. warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
  242. )
  243. self._write_record(record)
  244. def _close_file(self):
  245. '''Close the currently opened WARC'''
  246. if not self._closed:
  247. logger.debug('Closing WARC file')
  248. self._file.close()
  249. journalFilename = self._journalFile.name
  250. self._journalFile.close()
  251. if self._journalClean:
  252. logger.debug(f'Deleting journal {journalFilename}')
  253. os.remove(journalFilename)
  254. self._warcWriter = None
  255. self._file = None
  256. self._journalFile = None
  257. self._journalClean = None
  258. self._closed = True
  259. def _write_meta_warc(self, callback):
  260. filename = f'{self._prefix}-meta.warc.gz'
  261. logger.debug(f'Opening {filename}')
  262. self._file = open(filename, 'ab')
  263. try:
  264. fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
  265. logger.info(f'Opened {filename}')
  266. self._open_journal(filename)
  267. self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
  268. self._closed = False
  269. callback()
  270. finally:
  271. self._close_file()
  272. def close(self):
  273. '''Clean up everything.'''
  274. self._close_file()
  275. self._write_meta_warc(self._write_log_record)