import asyncio
import itertools
import os

import qwarc
import qwarc.utils


responseHandler = qwarc.utils.handle_response_limit_error_retries(5)

# Characters stripped from the left of a continuation token during validation.
_CONTINUATION_CHARS = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-'
# What may remain after that strip: nothing, or one or two URL-encoded '=' characters.
_CONTINUATION_SUFFIXES = (b'', b'%3D', b'%3D%3D')

# Markers searched for in the fetched pages. The bracketed value of interest
# starts right after the ':' in both cases.
_INVALIDATION_MARKER = b'"invalidationContinuationData":'
_CONVERSATION_BAR_MARKER = b'\\"conversationBar\\":{'


def _is_valid_continuation(token):
    """Return True if *token* is a non-empty, well-formed continuation value.

    A token is accepted when stripping leading characters of
    _CONTINUATION_CHARS leaves one of _CONTINUATION_SUFFIXES.
    """
    return bool(token) and token.lstrip(_CONTINUATION_CHARS) in _CONTINUATION_SUFFIXES


def _extract_balanced(data, start):
    """Return the bracketed value beginning at *start*, without its final closing bracket.

    Scans *data* (bytes) from index *start*, counting '{'/'[' as openers and
    '}'/']' as closers, and stops at the first position where the count is
    zero. The returned slice runs from *start* up to (excluding) that
    position, so it is empty when the byte at *start* is not a bracket.

    Returns None when the end of *data* is reached before the brackets
    balance; the scan is bounded by len(data) so truncated input cannot make
    it loop forever.
    """
    # No regerts: this is plain bracket counting, not JSON parsing, so brackets
    # inside string literals are counted as well.
    depth = 0
    for pos in range(start, len(data)):
        char = data[pos:pos + 1]
        if char in (b'{', b'['):
            depth += 1
        elif char in (b'}', b']'):
            depth -= 1
        if depth == 0:
            return data[start:pos]
    return None


class LiveChat(qwarc.Item):
    """qwarc item that fetches the live chat continuations of one YouTube video."""

    itemType = 'chat'
    # itemValue = '{videoId}'

    @classmethod
    def generate(cls):
        """Yield the single item value: the video ID from $YOUTUBE_VIDEOID."""
        yield os.environ['YOUTUBE_VIDEOID']

    async def recurse(self, continuation):
        """Follow a chain of live chat continuations starting at *continuation*.

        Each fetched page is searched for an "invalidationContinuationData"
        block, from which the next continuation token and a "timeoutMs" value
        are read. The method sleeps for that timeout before the next request
        and stops as soon as a page cannot be fetched or parsed, or a value
        fails validation.

        continuation: bytes token; it is ASCII-decoded into the request URL.
        """
        cont = continuation
        extra = ''
        while True:
            page, _ = await self.fetch(
                f'https://www.youtube.com/live_chat?continuation={cont.decode("ascii")}{extra}',
                responseHandler=responseHandler,
            )
            if not page or page.status != 200:
                self.logger.error(f'Could not fetch continuation {cont!r}')
                break
            pageContents = await page.read()

            markerPos = pageContents.find(_INVALIDATION_MARKER)
            if markerPos < 0:
                self.logger.error(f'Could not find continuation data on continuation {cont!r}')
                break
            contBlock = _extract_balanced(pageContents, markerPos + len(_INVALIDATION_MARKER))
            if contBlock is None:
                self.logger.error(f'Could not find end of continuation data on continuation {cont!r}')
                break
            if not contBlock:
                break

            timeout = qwarc.utils.str_get_between(contBlock, b'"timeoutMs":', b',')
            if not timeout:
                self.logger.warning(f'Could not find timeout in {contBlock!r}')
                break
            # Only plain decimal digits are accepted so int() below cannot fail.
            if timeout.lstrip(b'0123456789') != b'':
                self.logger.warning(f'Invalid timeout value: {timeout!r}')
                break

            cont = qwarc.utils.str_get_between(contBlock, b'"continuation":"', b'"')
            if not cont:
                break
            if not _is_valid_continuation(cont):
                self.logger.warning(f'Skipping unexpected cont value: {cont!r}')
                break

            # Every request after the first carries these extra query parameters.
            extra = '&isInvalidationTimeoutRequest=true&hidden=false&pbj=1'
            await asyncio.sleep(int(timeout) / 1000)

    async def process(self):
        """Fetch the watch page and follow every continuation in its conversation bar.

        The watch page is searched for the escaped "conversationBar" object;
        each valid continuation token found inside it is followed
        concurrently via recurse(). Errors are logged and end processing
        early; nothing is returned.
        """
        response, _ = await self.fetch(
            f'https://www.youtube.com/watch?v={self.itemValue}&disable_polymer=1',
            responseHandler=responseHandler,
        )
        if not response or response.status != 200:
            self.logger.error('Could not fetch video page')
            return
        contents = await response.read()

        conversationBarPos = contents.find(_CONVERSATION_BAR_MARKER)
        if conversationBarPos < 0:
            self.logger.error('Could not find conversation bar')
            return
        # The marker ends with the opening '{', which must be part of the scan.
        conversationBar = _extract_balanced(
            contents, conversationBarPos + len(_CONVERSATION_BAR_MARKER) - 1
        )
        if conversationBar is None:
            self.logger.error('Could not find end of conversation bar')
            return

        tasks = []
        for continuation in qwarc.utils.str_get_all_between(conversationBar, b'\\"continuation\\":\\"', b'\\"'):
            if not _is_valid_continuation(continuation):
                self.logger.warning(f'Skipping unexpected continuation value: {continuation!r}')
                continue
            tasks.append(asyncio.ensure_future(self.recurse(continuation)))
        await asyncio.gather(*tasks)


specDependencies = qwarc.utils.SpecDependencies(extra=(('videoId', os.environ['YOUTUBE_VIDEOID']),))