From 9d8de13775ac51a5a1dce15f5d0e1c9948734d95 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Fri, 6 Sep 2019 14:56:11 +0000 Subject: [PATCH] Add Item.flush_subitems to flush the new subitems to the database while the item is still being processed This also renames add_item to add_subitem for clarity. --- qwarc/__init__.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/qwarc/__init__.py b/qwarc/__init__.py index b568d59..50b42aa 100644 --- a/qwarc/__init__.py +++ b/qwarc/__init__.py @@ -21,7 +21,8 @@ import yarl class Item: itemType = None - def __init__(self, itemValue, session, headers, warc): + def __init__(self, qwarcObj, itemValue, session, headers, warc): + self.qwarcObj = qwarcObj self.itemValue = itemValue self.session = session self.headers = headers @@ -120,7 +121,7 @@ class Item: for x in cls.generate(): yield (cls.itemType, x, STATUS_TODO) - def add_item(self, itemClassOrType, itemValue): + def add_subitem(self, itemClassOrType, itemValue): if issubclass(itemClassOrType, Item): item = (itemClassOrType.itemType, itemValue) else: @@ -128,6 +129,12 @@ class Item: if item not in self.childItems: self.childItems.append(item) + async def flush_subitems(self): + await self.qwarcObj.flush_subitems(self) + + def clear_subitems(self): + self.childItems = [] + class QWARC: def __init__(self, itemClasses, warcBasePath, dbPath, command, specFile, specDependencies, logFilename, concurrency = 1, memoryLimit = 0, minFreeDisk = 0, warcSizeLimit = 0, warcDedupe = False): @@ -187,7 +194,7 @@ class QWARC: itemClass = self._itemTypeMap[itemType] except KeyError: raise RuntimeError(f'No such item type: {itemType!r}') - return itemClass(itemValue, session, headers, self._warc) + return itemClass(self, itemValue, session, headers, self._warc) async def _wait_for_free_task(self): if not self._tasks: @@ -339,6 +346,10 @@ class QWARC: self._reset_working_vars() + async def flush_subitems(self, item): + await self._insert_subitems(item) + item.clear_subitems() + def create_db(self): db = sqlite3.connect(self._dbPath, timeout = 1) db.execute('PRAGMA synchronous = OFF')