A method to grab the live chat from YouTube
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

96 lines
3.2KB

  1. import asyncio
  2. import itertools
  3. import os
  4. import qwarc
  5. import qwarc.utils
  6. responseHandler = qwarc.utils.handle_response_limit_error_retries(5)
  7. class LiveChat(qwarc.Item):
  8. itemType = 'chat'
  9. # itemValue = '{videoId}'
  10. @classmethod
  11. def generate(cls):
  12. yield os.environ['YOUTUBE_VIDEOID']
  13. async def recurse(self, continuation):
  14. cont = continuation
  15. extra = ''
  16. while True:
  17. page, _ = await self.fetch(f'https://www.youtube.com/live_chat?continuation={cont.decode("ascii")}{extra}', responseHandler = responseHandler)
  18. if not page or page.status != 200:
  19. self.logger.error(f'Could not fetch continuation {cont!r}')
  20. break
  21. pageContents = await page.read()
  22. invalidationContDataPos = pageContents.find(b'"invalidationContinuationData":')
  23. if invalidationContDataPos < 0:
  24. self.logger.error(f'Could not find continuation data on continuation {cont!r}')
  25. break
  26. openParens = 0
  27. for pos in itertools.count(start = invalidationContDataPos + 31):
  28. char = pageContents[pos:pos+1]
  29. if char in (b'{', b'['):
  30. openParens += 1
  31. elif char in (b'}', b']'):
  32. openParens -= 1
  33. if openParens == 0:
  34. break
  35. contBlock = pageContents[invalidationContDataPos + 31 : pos]
  36. if not contBlock:
  37. break
  38. timeout = qwarc.utils.str_get_between(contBlock, b'"timeoutMs":', b',')
  39. if not timeout:
  40. self.logger.warning(f'Could not find timeout in {contBlock!r}')
  41. break
  42. if timeout.lstrip(b'0123456789') != b'':
  43. self.logger.warning(f'Invalid timeout value: {timeout!r}')
  44. break
  45. cont = qwarc.utils.str_get_between(contBlock, b'"continuation":"', b'"')
  46. if not cont:
  47. break
  48. if cont.lstrip(b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-') not in (b'', b'%3D', b'%3D%3D'):
  49. self.logger.warning(f'Skipping unexpected cont value: {cont!r}')
  50. break
  51. extra = '&isInvalidationTimeoutRequest=true&hidden=false&pbj=1'
  52. await asyncio.sleep(int(timeout) / 1000)
  53. async def process(self):
  54. response, _ = await self.fetch(f'https://www.youtube.com/watch?v={self.itemValue}&disable_polymer=1', responseHandler = responseHandler)
  55. if not response or response.status != 200:
  56. self.logger.error('Could not fetch video page')
  57. return
  58. contents = await response.read()
  59. conversationBarPos = contents.find(b'\\"conversationBar\\":{')
  60. if conversationBarPos < 0:
  61. self.logger.error('Could not find conversation bar')
  62. return
  63. # No regerts
  64. openParens = 0
  65. for pos in itertools.count(start = conversationBarPos + 20):
  66. char = contents[pos:pos+1]
  67. if char in (b'{', b'['):
  68. openParens += 1
  69. elif char in (b'}', b']'):
  70. openParens -= 1
  71. if openParens == 0:
  72. break
  73. conversationBar = contents[conversationBarPos + 20 : pos]
  74. tasks = []
  75. for continuation in qwarc.utils.str_get_all_between(conversationBar, b'\\"continuation\\":\\"', b'\\"'):
  76. if not continuation or continuation.lstrip(b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-') not in (b'', b'%3D', b'%3D%3D'):
  77. self.logger.warning('Skipping unexpected continuation value: {continuation!r}')
  78. continue
  79. tasks.append(asyncio.ensure_future(self.recurse(continuation)))
  80. await asyncio.gather(*tasks)
  81. specDependencies = qwarc.utils.SpecDependencies(extra = (('videoId', os.environ['YOUTUBE_VIDEOID']),))