|
- import collections
- import itertools
- import os
- import qwarc
- import qwarc.utils
-
-
# Response callback passed to the self.fetch() calls in this spec. It is produced by
# qwarc's limit/error/retry handler factory with an argument of 5 (presumably the
# maximum number of attempts per request -- confirm against qwarc.utils).
responseHandler = qwarc.utils.handle_response_limit_error_retries(5)
-
-
class Comments(qwarc.Item):
    """qwarc item that walks the comment section of a single YouTube video.

    The video page is fetched first to obtain a session token and the initial
    continuation parameters; after that the comment_service_ajax endpoint is
    requested repeatedly, following every continuation found in the responses
    (further pages, reply threads and, once, the sort-menu alternatives).
    """

    itemType = 'comments'
    # itemValue = '{videoId}'

    # Characters allowed in the body of a session token, continuation token or itct value.
    _TOKEN_CHARS = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-'
    # Padding allowed after the token body when the value comes from the video page.
    _RAW_PADDING = (b'', b'=', b'==')
    # Padding allowed in AJAX responses, where '=' may also appear percent-encoded.
    _ANY_PADDING = _RAW_PADDING + (b'%3D', b'%3D%3D')

    @classmethod
    def generate(cls):
        """Yield the single item value: the video ID from the YOUTUBE_VIDEOID environment variable.

        Raises KeyError if the variable is not set.
        """
        yield os.environ['YOUTUBE_VIDEOID']

    def _validated_token(self, value, label, paddings):
        """Return value decoded as ASCII if it looks like a token, else log an error and return None.

        value is the raw bytes extracted from a response (or a falsy value when
        nothing was extracted), label is the human-readable name used in the log
        message, and paddings is the tuple of byte strings allowed to remain once
        the leading token characters are stripped.
        """
        # A missing or empty value is rejected as well: it cannot be used in a
        # request, and a non-bytes placeholder would otherwise blow up on .lstrip().
        if not value or value.lstrip(self._TOKEN_CHARS) not in paddings:
            self.logger.error(f'Unexpected {label} value: {value!r}')
            return None
        return value.decode('ascii')

    def get_json_obj_from_pos(self, content, startPos):
        """Return content from startPos up to (not including) the bracket that closes the first {...} or [...].

        Scanning starts at startPos; the first opening brace or bracket starts
        the nesting count, and the slice ends just before the closer that brings
        the count back to zero. Closing characters seen before any opener are
        ignored. If the brackets never balance (e.g. truncated content), the
        remainder of content from startPos is returned instead of scanning
        forever.

        NOTE: braces and brackets inside JSON string literals are counted like
        structural ones, so text containing unbalanced brackets can shift the
        end of the returned span.
        """
        depth = None  # None until the first opening brace/bracket is seen
        for pos in range(startPos, len(content)):
            char = content[pos:pos+1]
            if char in (b'{', b'['):
                depth = 1 if depth is None else depth + 1
            elif char in (b'}', b']') and depth is not None:
                depth -= 1
                if depth == 0:
                    return content[startPos:pos]
        # Ran off the end without the brackets balancing.
        return content[startPos:]

    def get_continuation_parameters(self, content):
        """Extract the first continuation token and click-tracking value (itct) from content.

        Returns a (continuationToken, itct) tuple of str, or None (after logging
        an error) when either value is missing or contains unexpected characters.
        """
        continuationToken = self._validated_token(
            qwarc.utils.str_get_between(content, b'"continuation":"', b'"'),
            'continuation token',
            self._ANY_PADDING,
        )
        if continuationToken is None:
            return None
        itct = self._validated_token(
            qwarc.utils.str_get_between(content, b'"clickTrackingParams":"', b'"'),
            'itct',
            self._ANY_PADDING,
        )
        if itct is None:
            return None
        return continuationToken, itct

    async def process(self):
        """Fetch the video page, then crawl every comment continuation reachable from it.

        Problems are logged rather than raised: a failure on the video page ends
        processing, a failed comment request or an unusable continuation is
        skipped and the crawl carries on with the rest of the queue.
        """
        videoPageUrl = f'https://www.youtube.com/watch?v={self.itemValue}'
        response, _ = await self.fetch(videoPageUrl, responseHandler = responseHandler)
        if not response or response.status != 200:
            self.logger.error('Could not fetch video page')
            return
        content = await response.read()

        sessionToken = qwarc.utils.str_get_between(content, b'"XSRF_TOKEN":"', b'"')
        if not sessionToken:
            self.logger.error('Could not find session token')
            return
        sessionToken = self._validated_token(sessionToken, 'session token', self._RAW_PADDING)
        if sessionToken is None:
            return

        # The initial continuation is the last "continuation" key that precedes the comment section marker.
        sectionIdentifierPos = content.find(b'"comment-item-section"')
        if sectionIdentifierPos < 0:
            self.logger.error('Could not find comment section identifier')
            return
        continuationStartPos = content.rfind(b'"continuation":', 0, sectionIdentifierPos)
        if continuationStartPos < 0:
            self.logger.error('Could not find continuation start position')
            return
        section = content[continuationStartPos:sectionIdentifierPos]
        continuationToken = self._validated_token(
            qwarc.utils.str_get_between(section, b'"continuation":"', b'"'),
            'continuation token',
            self._RAW_PADDING,
        )
        if continuationToken is None:
            return
        itct = self._validated_token(
            qwarc.utils.str_get_between(section, b'"clickTrackingParams":"', b'"'),
            'itct',
            self._RAW_PADDING,
        )
        if itct is None:
            return

        queue = collections.deque() # of (continuationToken, itct, nested) where nested indicates that it's a comment's replies ("View N replies" or "Show more comments")
        queue.append((continuationToken, itct, False))
        first = True # True until one comment response has been processed successfully
        while queue:
            continuationToken, itct, nested = queue.popleft()
            response, _ = await self.fetch(
                f'https://www.youtube.com/comment_service_ajax?action_get_{"comments" if not nested else "comment_replies"}=1&pbj=1&ctoken={continuationToken}&continuation={continuationToken}&itct={itct}',
                method = 'POST',
                data = {'session_token': sessionToken},
                headers = [('X-YouTube-Client-Name', '1'), ('X-YouTube-Client-Version', '2.20191212.06.02'), ('X-SPF-Referer', videoPageUrl), ('X-SPF-Previous', videoPageUrl)],
                responseHandler = responseHandler,
            )
            if not response or response.status != 200 or (await response.read(16) == b'{"reload":"now"}'):
                self.logger.error('Error fetching comments, skipping')
                continue
            content = await response.read()
            # Yes, the response is JSON and I could parse that into an object, but where's the fun in that?

            for continuationsPos in qwarc.utils.find_all(content, b'"continuations":'):
                continuations = self.get_json_obj_from_pos(content, continuationsPos)
                parameters = self.get_continuation_parameters(continuations)
                if parameters is None:
                    # Already logged; skip this continuation instead of aborting the whole item.
                    continue
                # A "label" key marks a reply-thread continuation, which uses the comment_replies action.
                queue.append((*parameters, b'"label":' in continuations))

            if first:
                # The sort menu is only looked at once; its entries are alternative orderings of the top-level comments.
                sortMenuPos = content.find(b'"sortMenu":')
                if sortMenuPos < 0:
                    self.logger.error('Could not find sort menu')
                else:
                    sortMenu = self.get_json_obj_from_pos(content, sortMenuPos)
                    for continuationPos in qwarc.utils.find_all(sortMenu, b'"continuation":{'):
                        continuation = self.get_json_obj_from_pos(sortMenu, continuationPos)
                        parameters = self.get_continuation_parameters(continuation)
                        if parameters is None:
                            continue
                        queue.append((*parameters, False))

            first = False
-
# Dependency declaration for this spec: the video ID taken from the environment is
# passed as an extra ('videoId', value) entry (presumably so the target video is
# recorded with the crawl's metadata -- confirm against qwarc.utils.SpecDependencies).
# Raises KeyError at import time if YOUTUBE_VIDEOID is not set.
specDependencies = qwarc.utils.SpecDependencies(
    extra = (
        ('videoId', os.environ['YOUTUBE_VIDEOID']),
    ),
)
|