A framework for quick web archiving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
9.8 KiB

  1. import fcntl
  2. import gzip
  3. import io
  4. import itertools
  5. import json
  6. import logging
  7. import os
  8. import qwarc.utils
  9. import tempfile
  10. import time
  11. import warcio
  class WARC:
      '''
      Writer for qwarc's WARC output.

      Archived requests and responses go to numbered files named `{prefix}-NNNNN.warc.gz`. The first number whose file
      does not exist yet is used, and a file is closed once it grows beyond `maxFileSize`, so the next write opens the
      next free number. Metadata (a warcinfo record, the spec file and its dependencies, and finally the log file) is
      appended to the shared `{prefix}-meta.warc.gz`.
      Every WARC being written has a companion `<filename>.qwarcjournal` recording the offset of the record currently
      being written and whether that write completed; the journal is deleted on close only if the last write completed.
      '''

      def __init__(self, prefix, maxFileSize, dedupe, command, specFile, specDependencies, logFilename):
          '''
          Initialise the WARC writer and write the initial records (warcinfo, spec file, spec dependencies) to the meta WARC.

          prefix: str, path prefix for WARCs; a dash, a five-digit number, and ".warc.gz" will be appended.
          maxFileSize: int, maximum size of an individual WARC. Use 0 to disable splitting.
          dedupe: bool, whether to enable record deduplication
          command: list, the command line call for qwarc
          specFile: str, path to the spec file
          specDependencies: qwarc.utils.SpecDependencies
          logFilename: str, name of the log file written by this process
          '''
          self._prefix = prefix
          self._counter = 0
          self._maxFileSize = maxFileSize
          self._closed = True
          self._file = None
          self._journalFile = None
          self._journalClean = None
          self._warcWriter = None
          self._dedupe = dedupe
          # payload digest -> (WARC-Record-ID, URI, WARC-Date) of the first response seen with that payload
          self._dedupeMap = {}
          self._command = command
          self._specFile = specFile
          self._specDependencies = specDependencies
          self._logFilename = logFilename
          self._metaWarcinfoRecordID = None
          self._write_meta_warc(self._write_initial_meta_records)

      @staticmethod
      def _stream_length(stream):
          '''Return the total size of the seekable `stream` and rewind it to the beginning.'''
          stream.seek(0, io.SEEK_END)
          length = stream.tell()
          stream.seek(0)
          return length

      @staticmethod
      def _format_warc_date(timestamp):
          '''
          Format a POSIX timestamp as a WARC-Date with microsecond precision, e.g. '2019-01-02T03:04:05.123456Z'.

          The timestamp is rounded to whole microseconds first, so a fraction such as .9999996 carries into the seconds
          instead of producing '.000000' with the seconds left unchanged.
          '''
          seconds, microseconds = divmod(round(timestamp * 1000000), 1000000)
          return time.strftime(f'%Y-%m-%dT%H:%M:%S.{microseconds:06d}Z', time.gmtime(seconds))

      def _ensure_opened(self):
          '''Open the next file that doesn't exist yet if there is currently no file opened'''
          if not self._closed:
              return
          while True:
              filename = f'{self._prefix}-{self._counter:05d}.warc.gz'
              try:
                  # Create the file exclusively; an already existing file with this number is skipped
                  f = open(filename, 'xb')
              except FileExistsError:
                  logging.info(f'{filename} already exists, skipping')
                  self._counter += 1
              else:
                  break
          try:
              # Exclusive, non-blocking lock on the newly created file
              fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
          except BaseException:
              f.close()
              raise
          self._file = f
          logging.info(f'Opened {filename}')
          try:
              self._open_journal(filename)
              self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
          except BaseException:
              # Don't leak the WARC (and possibly journal) handle if setting up the journal or writer fails
              self._close_file()
              raise
          self._closed = False
          self._counter += 1

      def _open_journal(self, filename):
          '''
          Create and exclusively lock the journal file `{filename}.qwarcjournal` and make it the current journal.

          Raises RuntimeError if the journal already exists (e.g. one kept by an earlier close after an incomplete write)
          or if it cannot be locked. On any failure the new journal handle is closed and the current journal is left
          unchanged.
          '''
          journalFilename = f'{filename}.qwarcjournal'
          try:
              journalFile = open(journalFilename, 'xb')
          except FileExistsError:
              logging.error(f'{journalFilename} already exists!')
              raise RuntimeError(f'Unable to create journal file for {filename}: {journalFilename} already exists')
          try:
              fcntl.flock(journalFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
          except BlockingIOError as e:
              # flock with LOCK_NB raises BlockingIOError (EWOULDBLOCK) when the lock is held elsewhere
              journalFile.close()
              logging.error(f'{journalFilename} is already locked!')
              raise RuntimeError(f'Unable to lock journal file {journalFilename}') from e
          except BaseException:
              journalFile.close()
              raise
          self._journalFile = journalFile
          self._journalClean = True

      def _write_record(self, record):
          '''
          Write `record` to the current WARC, bracketed by journal updates.

          Before the write, the journal records the current offset with "write ok: no"; after a successful write it is
          flipped to "yes". If the write fails with an OSError, the WARC is truncated back to that offset, the journal is
          left saying "no", and the error is re-raised.
          '''
          previousSize = self._file.tell()
          # The offset only grows within a file, so the new journal text is never shorter than the previous one;
          # overwriting from the start is enough and the journal does not need to be truncated.
          self._journalFile.seek(0)
          self._journalFile.write(f'qwarc journal version: 1\noffset: {previousSize}\nwrite ok: no \n'.encode('ascii'))
          self._journalFile.flush()
          self._journalClean = False
          try:
              self._warcWriter.write_record(record)
              # Make sure the record has left Python's buffer before the journal claims it was written
              self._file.flush()
          except OSError:
              # Drop the partial record. truncate() does not move the stream position, so seek back as well;
              # otherwise the next record would be written after a gap of zero bytes.
              self._file.truncate(previousSize)
              self._file.seek(previousSize)
              raise
          # Mark the write as ok; b'no \n' and b'yes\n' have the same length, so the status is overwritten in place
          self._journalFile.seek(-4, os.SEEK_END)
          self._journalFile.write(b'yes\n')
          self._journalFile.flush()
          self._journalClean = True

      def _write_warcinfo_record(self):
          '''Write a JSON warcinfo record describing this run (software, command, spec files) and return its WARC-Record-ID'''
          data = {
              'software': qwarc.utils.get_software_info(self._specDependencies.packages),
              'command': self._command,
              'files': {
                  'spec': self._specFile,
                  'spec-dependencies': self._specDependencies.files
              },
              'extra': self._specDependencies.extra,
          }
          payload = io.BytesIO(json.dumps(data, indent = 2).encode('utf-8'))
          # Workaround for https://github.com/webrecorder/warcio/issues/87
          digester = warcio.utils.Digester('sha1')
          digester.update(payload.getvalue())
          record = self._warcWriter.create_warc_record(
              None,
              'warcinfo',
              payload = payload,
              warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
              length = len(payload.getvalue()),
          )
          self._write_record(record)
          return record.rec_headers.get_header('WARC-Record-ID')

      def write_client_response(self, response):
          '''
          Write the requests and responses stored in a ClientResponse instance to the current WARC, opening one if needed.

          Each exchange becomes a request record and a response record. With dedupe enabled, a response whose payload is
          larger than 100 bytes and whose payload digest was seen before is written as a revisit record instead.
          If the current file exceeds the size limit after everything from this `response` has been written, it is
          closed, so the next call starts a new WARC.
          '''
          self._ensure_opened()
          for r in response.iter_all():
              url = str(r.url)
              requestDate = self._format_warc_date(r.rawRequestTimestamp)
              ipAddress = r.remoteAddress[0]
              requestRecord = self._warcWriter.create_warc_record(
                  url,
                  'request',
                  payload = r.rawRequestData,
                  length = self._stream_length(r.rawRequestData),
                  warc_headers_dict = {
                      'WARC-Date': requestDate,
                      'WARC-IP-Address': ipAddress,
                      'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                  }
              )
              requestRecordID = requestRecord.rec_headers.get_header('WARC-Record-ID')
              responseRecord = self._warcWriter.create_warc_record(
                  url,
                  'response',
                  payload = r.rawResponseData,
                  length = self._stream_length(r.rawResponseData),
                  warc_headers_dict = {
                      'WARC-Date': requestDate,
                      'WARC-IP-Address': ipAddress,
                      'WARC-Concurrent-To': requestRecordID,
                      'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                  }
              )
              payloadDigest = responseRecord.rec_headers.get_header('WARC-Payload-Digest')
              assert payloadDigest is not None
              # Don't deduplicate small responses; the additional headers are typically larger than the payload dedupe savings...
              if self._dedupe and responseRecord.payload_length > 100:
                  if payloadDigest in self._dedupeMap:
                      refersToRecordId, refersToUri, refersToDate = self._dedupeMap[payloadDigest]
                      responseHttpHeaders = responseRecord.http_headers
                      responseRecord = self._warcWriter.create_revisit_record(
                          url,
                          digest = payloadDigest,
                          refers_to_uri = refersToUri,
                          refers_to_date = refersToDate,
                          http_headers = responseHttpHeaders,
                          warc_headers_dict = {
                              'WARC-Date': requestDate,
                              'WARC-IP-Address': ipAddress,
                              'WARC-Concurrent-To': requestRecordID,
                              'WARC-Refers-To': refersToRecordId,
                              'WARC-Truncated': 'length',
                              'WARC-Warcinfo-ID': self._metaWarcinfoRecordID,
                          }
                      )
                      # Workaround for https://github.com/webrecorder/warcio/issues/94
                      responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
                  else:
                      self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), url, requestDate)
              self._write_record(requestRecord)
              self._write_record(responseRecord)
          if self._maxFileSize and self._file.tell() > self._maxFileSize:
              self._close_file()

      def _write_resource_records(self):
          '''Write the spec file and each spec dependency file as resource records'''
          assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
          resources = itertools.chain(
              [('specfile', 'application/x-python', self._specFile)],
              (('spec-dependency-file', 'application/octet-stream', fn) for fn in self._specDependencies.files),
          )
          for type_, contentType, fn in resources:
              with open(fn, 'rb') as f:
                  record = self._warcWriter.create_warc_record(
                      f'file://{fn}',
                      'resource',
                      payload = f,
                      length = self._stream_length(f),
                      warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
                  )
                  # Written while the file is still open since the record reads its payload from it
                  self._write_record(record)

      def _write_initial_meta_records(self):
          '''Write the warcinfo record (remembering its ID for WARC-Warcinfo-ID headers), then the spec resource records'''
          self._metaWarcinfoRecordID = self._write_warcinfo_record()
          self._write_resource_records()

      def _write_log_record(self):
          '''Write this process's log file as a resource record'''
          assert self._metaWarcinfoRecordID is not None, 'write_warcinfo_record must be called first'
          rootLogger = logging.getLogger()
          for handler in rootLogger.handlers: #FIXME: Uses undocumented attribute handlers
              # Flush buffered output so it reaches the log file before it is read (assumes a root handler writes it)
              handler.flush()
          with open(self._logFilename, 'rb') as fp:
              length = self._stream_length(fp)
              # Work around https://github.com/webrecorder/warcio/issues/90
              payload = qwarc.utils.FrozenFileView(fp, 0, length)
              record = self._warcWriter.create_warc_record(
                  f'file://{self._logFilename}',
                  'resource',
                  payload = payload,
                  length = length,
                  warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
              )
              self._write_record(record)

      def _close_file(self):
          '''
          Close the currently opened WARC and its journal and reset the writer state.

          The journal is deleted only if the last write through it completed and the WARC closed without error;
          otherwise it is kept. Also cleans up a partially opened state (a WARC handle without a journal).
          '''
          try:
              if self._file is not None:
                  self._file.close()
          except BaseException:
              # The WARC may not have been written out completely; keep the journal
              self._journalClean = False
              raise
          finally:
              if self._journalFile is not None:
                  journalFilename = self._journalFile.name
                  self._journalFile.close()
                  if self._journalClean:
                      os.remove(journalFilename)
              self._warcWriter = None
              self._file = None
              self._journalFile = None
              self._journalClean = None
              self._closed = True

      def _write_meta_warc(self, callback):
          '''
          Append records to the shared meta WARC by running `callback` with it opened as the current file.

          The meta WARC is opened in append mode under a blocking exclusive lock (released when the file is closed),
          with its own journal. The file is always closed afterwards, also if setup or `callback` fails.
          '''
          filename = f'{self._prefix}-meta.warc.gz'
          self._file = open(filename, 'ab')
          try:
              fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
              logging.info(f'Opened {filename}')
              self._open_journal(filename)
              self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
              self._closed = False
              callback()
          finally:
              self._close_file()

      def close(self):
          '''Clean up everything: close the current WARC (if any) and append the log file to the meta WARC.'''
          self._close_file()
          self._write_meta_warc(self._write_log_record)