From c7fac0ec3f3313b3bf170e4ba04f0d8dcd6261d7 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Mon, 13 Jul 2020 05:22:29 +0000
Subject: [PATCH] Add WARC journalling with rollback on errors

Inspired by the implementation in wpull but structured differently to avoid reopening the journal file constantly.
---
Reviewer amendment: _open_journal now re-raises any OSError other than
EWOULDBLOCK instead of silently swallowing it (previously the method would
fall through and mark the journal as clean without a usable journal file).
Hunk offsets and the diffstat were adjusted for the three added lines.
Line breaks and indentation were restored from a whitespace-collapsed copy;
use `git apply --ignore-whitespace` if the context does not match exactly.

 qwarc/warc.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 52 insertions(+), 5 deletions(-)

diff --git a/qwarc/warc.py b/qwarc/warc.py
index 8e292c2..1aa1ec3 100644
--- a/qwarc/warc.py
+++ b/qwarc/warc.py
@@ -31,6 +31,8 @@ class WARC:
 
 		self._closed = True
 		self._file = None
+		self._journalFile = None
+		self._journalClean = None
 		self._warcWriter = None
 
 		self._dedupe = dedupe
@@ -62,10 +64,48 @@ class WARC:
 			else:
 				break
 		logging.info(f'Opened {filename}')
+		self._open_journal(filename)
 		self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 		self._closed = False
 		self._counter += 1
 
+	def _open_journal(self, filename):
+		'''Create and exclusively lock the journal file for the WARC at filename; raises RuntimeError if it already exists or is locked'''
+		try:
+			self._journalFile = open(f'{filename}.qwarcjournal', 'xb')
+			fcntl.flock(self._journalFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+		except FileExistsError:
+			logging.error(f'{filename}.qwarcjournal already exists!')
+			raise RuntimeError(f'Unable to create journal file for {filename}: {filename}.qwarcjournal already exists')
+		except OSError as e:
+			# Only a lock conflict is translated; every other OS error must propagate rather than leave the journal unusable
+			if e.errno != errno.EWOULDBLOCK:
+				raise
+			logging.error(f'{filename}.qwarcjournal is already locked!')
+			raise RuntimeError(f'Unable to lock journal file {filename}.qwarcjournal')
+		self._journalClean = True
+
+	def _write_record(self, record):
+		# Write the current offset to the journal file
+		# Since the size can only grow, it is not necessary to explicitly delete the previous contents.
+		self._journalFile.seek(0)
+		previousSize = self._file.tell()
+		self._journalFile.write(f'qwarc journal version: 1\noffset: {previousSize}\nwrite ok: no \n'.encode('ascii'))
+		self._journalFile.flush()
+		self._journalClean = False
+
+		try:
+			self._warcWriter.write_record(record)
+		except (OSError, IOError):
+			self._file.truncate(previousSize)
+			raise
+		else:
+			# Mark the write as ok
+			self._journalFile.seek(-4, os.SEEK_END) # len(b'no \n')
+			self._journalFile.write(b'yes\n')
+			self._journalFile.flush()
+			self._journalClean = True
+
 	def _write_warcinfo_record(self):
 		data = {
 			'software': qwarc.utils.get_software_info(self._specDependencies.packages),
@@ -87,7 +127,7 @@ class WARC:
 		    warc_headers_dict = {'Content-Type': 'application/json; charset=utf-8', 'WARC-Block-Digest': str(digester)},
 		    length = len(payload.getvalue()),
 		  )
-		self._warcWriter.write_record(record)
+		self._write_record(record)
 		return record.rec_headers.get_header('WARC-Record-ID')
 
 	def write_client_response(self, response):
@@ -155,8 +195,8 @@ class WARC:
 					responseRecord.rec_headers.replace_header('WARC-Profile', 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest')
 				else:
 					self._dedupeMap[payloadDigest] = (responseRecord.rec_headers.get_header('WARC-Record-ID'), str(r.url), requestDate)
-			self._warcWriter.write_record(requestRecord)
-			self._warcWriter.write_record(responseRecord)
+			self._write_record(requestRecord)
+			self._write_record(responseRecord)
 
 		if self._maxFileSize and self._file.tell() > self._maxFileSize:
 			self._close_file()
@@ -177,7 +217,7 @@ class WARC:
 		    length = length,
 		    warc_headers_dict = {'X-QWARC-Type': type_, 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID, 'Content-Type': contentType},
 		  )
-		self._warcWriter.write_record(record)
+		self._write_record(record)
 
 	def _write_initial_meta_records(self):
 		self._metaWarcinfoRecordID = self._write_warcinfo_record()
@@ -200,15 +240,21 @@ class WARC:
 		    length = length,
 		    warc_headers_dict = {'X-QWARC-Type': 'log', 'Content-Type': 'text/plain; charset=utf-8', 'WARC-Warcinfo-ID': self._metaWarcinfoRecordID},
 		  )
-		self._warcWriter.write_record(record)
+		self._write_record(record)
 
 	def _close_file(self):
 		'''Close the currently opened WARC'''
 
 		if not self._closed:
 			self._file.close()
+			journalFilename = self._journalFile.name
+			self._journalFile.close()
+			if self._journalClean:
+				os.remove(journalFilename)
 			self._warcWriter = None
 			self._file = None
+			self._journalFile = None
+			self._journalClean = None
 			self._closed = True
 
 	def _write_meta_warc(self, callback):
@@ -218,6 +264,7 @@ class WARC:
 		try:
 			fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
 			logging.info(f'Opened {filename}')
+			self._open_journal(filename)
 			self._warcWriter = warcio.warcwriter.WARCWriter(self._file, gzip = True, warc_version = '1.1')
 			self._closed = False
 