import collections
import itertools
import os

import qwarc
import qwarc.utils


# Response handler shared by every request made in this spec.
responseHandler = qwarc.utils.handle_response_limit_error_retries(5)


class Comments(qwarc.Item):
    """qwarc item that walks the comment continuations of one YouTube video.

    The item value is the video ID taken from the YOUTUBE_VIDEOID environment
    variable.
    """

    itemType = 'comments'
    # itemValue = '{videoId}'

    # Bytes allowed in the body of a token, and the suffixes that may follow it
    # (nothing, or one/two '=' padding characters, raw or percent-encoded).
    _TOKEN_CHARS = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-'
    _TOKEN_PADDINGS = (b'', b'=', b'==', b'%3D', b'%3D%3D')

    @classmethod
    def generate(cls):
        """Yield the single item value: the video ID from the environment."""
        yield os.environ['YOUTUBE_VIDEOID']

    def get_json_obj_from_pos(self, content, startPos):
        """Return the slice of content from startPos up to the matching closer.

        Scans forward from startPos, counting '{'/'[' as openers and '}'/']'
        as closers (both kinds share one counter; bytes inside JSON strings
        are not treated specially). At least one opener is required. The
        returned slice ends just before the closer that balances the first
        opener, i.e. the final closing brace/bracket is NOT included.

        If a closer is seen before any opener, the slice up to that closer is
        returned. If content ends before the brackets balance, the remainder
        of content from startPos is returned instead of scanning forever.
        """
        depth = None
        # Bound the scan by the content length so truncated or unbalanced
        # input terminates.
        for pos in range(startPos, len(content)):
            char = content[pos:pos + 1]
            if char in (b'{', b'['):
                depth = 1 if depth is None else depth + 1
            elif char in (b'}', b']'):
                if depth is None:
                    # Closer without a preceding opener: nothing starts here.
                    return content[startPos:pos]
                depth -= 1
                if depth == 0:
                    return content[startPos:pos]
        return content[startPos:]

    def _extract_token(self, content, marker, description):
        """Return the validated token that follows marker in content.

        The token is the text between marker and the next double quote. It
        must be non-empty and consist of _TOKEN_CHARS optionally followed by
        one of _TOKEN_PADDINGS. On success the token is returned decoded as
        ASCII; otherwise an error mentioning description is logged and None
        is returned.
        """
        token = qwarc.utils.str_get_between(content, marker, b'"')
        if not token:
            # Presumably str_get_between returns a falsy value when either
            # delimiter is missing; an empty token is useless as well.
            self.logger.error(f'Could not find {description}')
            return None
        if token.lstrip(self._TOKEN_CHARS) not in self._TOKEN_PADDINGS:
            self.logger.error(f'Unexpected {description} value: {token!r}')
            return None
        return token.decode('ascii')

    def get_continuation_parameters(self, content):
        """Extract the continuation token and click tracking parameter.

        Returns a (continuationToken, itct) tuple of str, or None if either
        value is missing or malformed (an error is logged in that case).
        """
        continuationToken = self._extract_token(content, b'"continuation":"', 'continuation token')
        if continuationToken is None:
            return None
        itct = self._extract_token(content, b'"clickTrackingParams":"', 'itct')
        if itct is None:
            return None
        return continuationToken, itct

    async def process(self):
        """Fetch the video page, then follow every comment continuation found.

        Errors are logged; a failure on the video page aborts the item, while
        a failed or malformed continuation is skipped.
        """
        videoPageUrl = f'https://www.youtube.com/watch?v={self.itemValue}'
        response, _ = await self.fetch(videoPageUrl, responseHandler=responseHandler)
        if not response or response.status != 200:
            self.logger.error('Could not fetch video page')
            return
        content = await response.read()

        sessionToken = self._extract_token(content, b'"XSRF_TOKEN":"', 'session token')
        if sessionToken is None:
            return

        # The initial continuation is the last '"continuation":' that appears
        # before the comment section identifier.
        sectionIdentifierPos = content.find(b'"comment-item-section"')
        if sectionIdentifierPos < 0:
            self.logger.error('Could not find comment section identifier')
            return
        continuationStartPos = content.rfind(b'"continuation":', 0, sectionIdentifierPos)
        if continuationStartPos < 0:
            self.logger.error('Could not find continuation start position')
            return
        section = content[continuationStartPos:sectionIdentifierPos]
        parameters = self.get_continuation_parameters(section)
        if parameters is None:
            return

        # Queue of (continuationToken, itct, nested) where nested indicates
        # that it's a comment's replies ("View N replies" or "Show more
        # comments").
        queue = collections.deque()
        queue.append((*parameters, False))
        first = True
        while queue:
            continuationToken, itct, nested = queue.popleft()
            response, _ = await self.fetch(
                f'https://www.youtube.com/comment_service_ajax?action_get_{"comments" if not nested else "comment_replies"}=1&pbj=1&ctoken={continuationToken}&continuation={continuationToken}&itct={itct}',
                method='POST',
                data={'session_token': sessionToken},
                headers=[
                    ('X-YouTube-Client-Name', '1'),
                    ('X-YouTube-Client-Version', '2.20191212.06.02'),
                    ('X-SPF-Referer', videoPageUrl),
                    ('X-SPF-Previous', videoPageUrl),
                ],
                responseHandler=responseHandler,
            )
            # NOTE(review): this relies on the response supporting a partial
            # read(16) followed by a full read() below - confirm against the
            # qwarc response implementation.
            if not response or response.status != 200 or (await response.read(16) == b'{"reload":"now"}'):
                self.logger.error('Error fetching comments, skipping')
                continue
            content = await response.read()

            # Yes, the response is JSON and I could parse that into an object,
            # but where's the fun in that?
            for continuationsPos in qwarc.utils.find_all(content, b'"continuations":'):
                continuations = self.get_json_obj_from_pos(content, continuationsPos)
                parameters = self.get_continuation_parameters(continuations)
                if parameters is None:
                    # Already logged; skip instead of failing on the unpack.
                    continue
                queue.append((*parameters, b'"label":' in continuations))

            if first:
                # Only the first response is searched for the sort menu, whose
                # continuations are queued as top-level (non-nested) requests.
                sortMenuPos = content.find(b'"sortMenu":')
                if sortMenuPos < 0:
                    self.logger.error('Could not find sort menu')
                else:
                    sortMenu = self.get_json_obj_from_pos(content, sortMenuPos)
                    for continuationPos in qwarc.utils.find_all(sortMenu, b'"continuation":{'):
                        continuation = self.get_json_obj_from_pos(sortMenu, continuationPos)
                        parameters = self.get_continuation_parameters(continuation)
                        if parameters is None:
                            continue
                        queue.append((*parameters, False))
                first = False


# Passes the video ID to qwarc as an extra spec dependency.
specDependencies = qwarc.utils.SpecDependencies(extra=(('videoId', os.environ['YOUTUBE_VIDEOID']),))