import collections
import itertools
import os

import qwarc
import qwarc.utils


responseHandler = qwarc.utils.handle_response_limit_error_retries(5)

# Characters accepted in the body of a token scraped from the video page
# (digits, ASCII letters, '_' and '-').
_TOKEN_CHARS = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-'
# What may remain after stripping the leading token characters: nothing, or up
# to two '=' signs in literal or percent-encoded form.
_TOKEN_TAILS = (b'', b'=', b'==', b'%3D', b'%3D%3D')


def _is_wellformed_token(token: bytes) -> bool:
    """Return True if *token* consists of token characters plus an optional '=' tail."""
    return token.lstrip(_TOKEN_CHARS) in _TOKEN_TAILS


class Comments(qwarc.Item):
    """Item that walks the comment continuations of a single YouTube video.

    The item value is the video ID taken from the ``YOUTUBE_VIDEOID``
    environment variable.
    """

    itemType = 'comments'
    # itemValue = '{videoId}'

    @classmethod
    def generate(cls):
        """Yield the single item value: the video ID from ``YOUTUBE_VIDEOID``."""
        yield os.environ['YOUTUBE_VIDEOID']

    async def process(self):
        """Fetch the video page, then follow every comment continuation.

        The session token, the initial continuation token and its click
        tracking parameters (``itct``) are scraped from the video page. Each
        queued continuation is then POSTed to ``comment_service_ajax``; the
        responses are scanned for further continuations (next pages and
        reply threads), which are appended to the queue until it is empty.

        Any problem with the video page or its tokens is logged and aborts the
        item; a failed continuation request is logged and skipped.
        """
        videoPageUrl = f'https://www.youtube.com/watch?v={self.itemValue}'
        response, _ = await self.fetch(videoPageUrl, responseHandler = responseHandler)
        if not response or response.status != 200:
            self.logger.error('Could not fetch video page')
            return
        content = await response.read()

        # --- Session token ---
        sessionToken = qwarc.utils.str_get_between(content, b'"XSRF_TOKEN":"', b'"')
        if not sessionToken:
            self.logger.error('Could not find session token')
            return
        if not _is_wellformed_token(sessionToken):
            self.logger.error(f'Unexpected session token value: {sessionToken!r}')
            return
        sessionToken = sessionToken.decode('ascii')

        # --- Initial continuation: the last "continuation" key that precedes
        # the "comment-item-section" identifier ---
        sectionIdentifierPos = content.find(b'"comment-item-section"')
        if sectionIdentifierPos < 0:
            self.logger.error('Could not find comment section identifier')
            return
        continuationStartPos = content.rfind(b'"continuation":', 0, sectionIdentifierPos)
        if continuationStartPos < 0:
            self.logger.error('Could not find continuation start position')
            return
        section = content[continuationStartPos:sectionIdentifierPos]

        continuationToken = qwarc.utils.str_get_between(section, b'"continuation":"', b'"')
        if continuationToken is None:
            # Without this guard a missing value would blow up in the
            # validation below instead of being reported.
            self.logger.error('Could not find continuation token')
            return
        if not _is_wellformed_token(continuationToken):
            self.logger.error(f'Unexpected continuation token value: {continuationToken!r}')
            return
        continuationToken = continuationToken.decode('ascii')

        itct = qwarc.utils.str_get_between(section, b'"clickTrackingParams":"', b'"')
        if itct is None:
            self.logger.error('Could not find itct')
            return
        if not _is_wellformed_token(itct):
            self.logger.error(f'Unexpected itct value: {itct!r}')
            return
        itct = itct.decode('ascii')

        # Queue of (continuationToken, itct, nested) where nested indicates
        # that it's a comment's replies ("View N replies" or "Show more comments")
        queue = collections.deque()
        queue.append((continuationToken, itct, False))
        first = True
        while queue:
            continuationToken, itct, nested = queue.popleft()
            action = 'comment_replies' if nested else 'comments'
            response, _ = await self.fetch(
                f'https://www.youtube.com/comment_service_ajax?action_get_{action}=1&pbj=1&ctoken={continuationToken}&continuation={continuationToken}&itct={itct}',
                method = 'POST',
                data = {'session_token': sessionToken},
                headers = [
                    ('X-YouTube-Client-Name', '1'),
                    ('X-YouTube-Client-Version', '2.20191212.06.02'),
                    ('X-SPF-Referer', videoPageUrl),
                    ('X-SPF-Previous', videoPageUrl),
                ],
                responseHandler = responseHandler,
            )
            # A body starting with {"reload":"now"} is treated like a failed request.
            if not response or response.status != 200 or (await response.read(16) == b'{"reload":"now"}'):
                self.logger.error('Error fetching comments, skipping')
                continue
            obj = await response.json()

            if first:
                # The very first response is only used to locate the
                # "Newest first" sort continuation; its own contents and
                # continuations are not queued.
                first = False
                sortMenu = obj['response']['continuationContents']['itemSectionContinuation']['header']['commentsHeaderRenderer']['sortMenu']
                for subMenuItem in sortMenu['sortFilterSubMenuRenderer']['subMenuItems']:
                    if subMenuItem['title'] != 'Newest first':
                        continue
                    subContinuation = subMenuItem['continuation']['reloadContinuationData']
                    queue.append((subContinuation['continuation'], subContinuation['clickTrackingParams'], False))
                    break
                else:
                    self.logger.error('Could not find newest first sort continuation')
                continue

            if nested:
                # Of course the data format is different here: the reply
                # payload is a sequence, and the entry carrying 'response'
                # has to be picked out of it.
                o = next((entry for entry in obj if 'response' in entry), None)
                if o is None:
                    # Previously this fell through with a stale or unbound
                    # `o`; report it and move on instead.
                    self.logger.error('Could not find response entry in comment replies, skipping')
                    continue
                continuationKey = 'commentRepliesContinuation'
            else:
                o = obj
                continuationKey = 'itemSectionContinuation'

            if 'continuationContents' not in o['response']:
                # Empty response
                continue
            continuationContents = o['response']['continuationContents'][continuationKey]

            for reply in continuationContents['contents']:
                if 'commentThreadRenderer' in reply and 'replies' in reply['commentThreadRenderer']:
                    # Nested continuations (the replies of this comment thread)
                    continuations = reply['commentThreadRenderer']['replies']['commentRepliesRenderer']['continuations']
                    assert len(continuations) == 1
                    nextData = continuations[0]['nextContinuationData']
                    queue.append((nextData['continuation'], nextData['clickTrackingParams'], True))

            if 'continuations' in continuationContents:
                # Next page of the listing currently being walked; it keeps
                # the same nested flag as the request that produced it.
                assert len(continuationContents['continuations']) == 1
                continuation = continuationContents['continuations'][0]['nextContinuationData']
                queue.append((continuation['continuation'], continuation['clickTrackingParams'], nested))


specDependencies = qwarc.utils.SpecDependencies(extra = (('videoId', os.environ['YOUTUBE_VIDEOID']),))