diff --git a/http2irc.py b/http2irc.py index 03f6ad7..730820c 100644 --- a/http2irc.py +++ b/http2irc.py @@ -329,8 +329,9 @@ class MessageQueue: class IRCClientProtocol(asyncio.Protocol): logger = logging.getLogger('http2irc.IRCClientProtocol') - def __init__(self, http2ircMessageQueue, connectionClosedEvent, loop, config, channels): + def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, connectionClosedEvent, loop, config, channels): self.http2ircMessageQueue = http2ircMessageQueue + self.irc2httpBroadcaster = irc2httpBroadcaster self.connectionClosedEvent = connectionClosedEvent self.loop = loop self.config = config @@ -429,6 +430,18 @@ class IRCClientProtocol(asyncio.Protocol): self.logger.debug(f'Send: {data!r}') time_ = time.time() self.transport.write(data + b'\r\n') + if data.startswith(b'PRIVMSG '): + # Send own messages to broadcaster as well + command, channels, message = data.decode('utf-8').split(' ', 2) + for channel in channels.split(','): + assert channel.startswith('#') or channel.startswith('&'), f'invalid channel: {channel!r}' + user = { + 'nick': self.server.nickname, + 'hostmask': f'{self.server.nickname}!{self.server.username}@{self.server.hostname}', + 'account': self.server.account, + 'modes': self.get_mode_chars(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(self.server.nickname))), + } + self.irc2httpBroadcaster.send(channel, {'time': time_, 'command': command, 'channel': channel, 'user': user, 'message': message}) return time_ async def send_queue(self): @@ -574,6 +587,11 @@ class IRCClientProtocol(asyncio.Protocol): def message_received(self, time_, message, line): self.logger.debug(f'Message received at {time_}: {message!r}') + # Send to HTTP broadcaster + # Note: WHOX is handled further down + for d in self.line_to_dicts(time_, line): + self.irc2httpBroadcaster.send(d['channel'], d) + maybeTriggerWhox = False # PING/PONG @@ -678,6 +696,7 @@ class IRCClientProtocol(asyncio.Protocol): for entry in self.whoxReply: if entry['account']: self.server.users[self.server.casefold(entry['nick'])].account = entry['account'] + self.irc2httpBroadcaster.send(self.whoxChannel, {'time': time_, 'command': 'RPL_ENDOFWHO', 'channel': self.whoxChannel, 'users': self.whoxReply, 'whoxstarttime': self.whoxStartTime}) self.whoxChannel = None self.whoxReply = [] self.whoxStartTime = None @@ -695,6 +714,87 @@ class IRCClientProtocol(asyncio.Protocol): self.whoxStartTime = time.time() # Note, may not be the actual start time due to rate limiting self.send(b'WHO ' + self.whoxChannel.encode('utf-8') + b' c%tuhna,042') + def get_mode_chars(self, channelUser): + if channelUser is None: + return '' + prefix = self.server.isupport.prefix + return ''.join(prefix.prefixes[i] for i in sorted((prefix.modes.index(c) for c in channelUser.modes if c in prefix.modes))) + + def line_to_dicts(self, time_, line): + if line.source: + sourceUser = self.server.users.get(self.server.casefold(line.hostmask.nickname)) if line.source else None + get_modes = lambda channel, nick = line.hostmask.nickname: self.get_mode_chars(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(nick))) + get_user = lambda channel, withModes = True: { + 'nick': line.hostmask.nickname, + 'hostmask': str(line.hostmask), + 'account': getattr(self.server.users.get(self.server.casefold(line.hostmask.nickname)), 'account', None), + **({'modes': get_modes(channel)} if withModes else {}), + } + if line.command == 'JOIN': + # Although servers SHOULD NOT send multiple channels in one message per the modern IRC docs , let's do the safe thing... + account = {'account': line.params[-2] if line.params[-2] != '*' else None} if 'extended-join' in self.caps else {} + for channel in line.params[0].split(','): + # There can't be a mode set yet on the JOIN, so no need to use get_modes (which would complicate the self-join). + yield {'time': time_, 'command': 'JOIN', 'channel': channel, 'user': {**get_user(channel, False), **account}} + elif line.command in ('PRIVMSG', 'NOTICE'): + channel = line.params[0] + if channel not in self.server.channels: + return + if line.command == 'PRIVMSG' and line.params[1].startswith('\x01ACTION ') and line.params[1].endswith('\x01'): + # CTCP ACTION (aka /me) + yield {'time': time_, 'command': 'ACTION', 'channel': channel, 'user': get_user(channel), 'message': line.params[1][8:-1]} + return + yield {'time': time_, 'command': line.command, 'channel': channel, 'user': get_user(channel), 'message': line.params[1]} + elif line.command == 'PART': + for channel in line.params[0].split(','): + yield {'time': time_, 'command': 'PART', 'channel': channel, 'user': get_user(channel), 'reason': line.params[1] if len(line.params) == 2 else None} + elif line.command in ('QUIT', 'NICK', 'ACCOUNT'): + if line.hostmask.nickname == self.server.nickname: + channels = self.channels + elif sourceUser is not None: + channels = sourceUser.channels + else: + return + for channel in channels: + if line.command == 'QUIT': + extra = {'reason': line.params[0] if len(line.params) == 1 else None} + elif line.command == 'NICK': + extra = {'newnick': line.params[0]} + elif line.command == 'ACCOUNT': + extra = {'account': line.params[0]} + yield {'time': time_, 'command': line.command, 'channel': channel, 'user': get_user(channel), **extra} + elif line.command == 'MODE' and line.params[0][0] in ('#', '&'): + channel = line.params[0] + yield {'time': time_, 'command': 'MODE', 'channel': channel, 'user': get_user(channel), 'args': line.params[1:]} + elif line.command == 'KICK': + channel = line.params[0] + targetUser = self.server.users[self.server.casefold(line.params[1])] + yield { + 'time': time_, + 'command': 'KICK', + 'channel': channel, + 'user': get_user(channel), + 'targetuser': {'nick': targetUser.nickname, 'hostmask': targetUser.hostmask(), 'modes': get_modes(channel, targetUser.nickname), 'account': targetUser.account}, + 'reason': line.params[2] if len(line.params) == 3 else None + } + elif line.command == 'TOPIC': + channel = line.params[0] + channelObj = self.server.channels[self.server.casefold(channel)] + oldTopic = {'topic': channelObj.topic, 'setter': channelObj.topic_setter, 'time': channelObj.topic_time.timestamp() if channelObj.topic_time else None} if channelObj.topic else None + if line.params[1] == '': + yield {'time': time_, 'command': 'TOPIC', 'channel': channel, 'user': get_user(channel), 'oldtopic': oldTopic, 'newtopic': None} + else: + yield {'time': time_, 'command': 'TOPIC', 'channel': channel, 'user': get_user(channel), 'oldtopic': oldTopic, 'newtopic': line.params[1]} + elif line.command == ircstates.numerics.RPL_TOPIC: + channel = line.params[1] + yield {'time': time_, 'command': 'RPL_TOPIC', 'channel': channel, 'topic': line.params[2]} + elif line.command == ircstates.numerics.RPL_TOPICWHOTIME: + yield {'time': time_, 'command': 'RPL_TOPICWHOTIME', 'channel': line.params[1], 'setter': {'nick': irctokens.hostmask(line.params[2]).nickname, 'hostmask': line.params[2]}, 'topictime': int(line.params[3])} + elif line.command == ircstates.numerics.RPL_ENDOFNAMES: + channel = line.params[1] + users = self.server.channels[self.server.casefold(channel)].users + yield {'time': time_, 'command': 'NAMES', 'channel': channel, 'users': [{'nick': u.nickname, 'modes': self.get_mode_chars(u)} for u in users.values()]} + async def quit(self): # The server acknowledges a QUIT by sending an ERROR and closing the connection. The latter triggers connection_lost, so just wait for the closure event. self.logger.info('Quitting') @@ -707,16 +807,19 @@ class IRCClientProtocol(asyncio.Protocol): self.transport.close() def connection_lost(self, exc): + time_ = time.time() self.logger.info('IRC connection lost') self.connected = False self.connectionClosedEvent.set() + self.irc2httpBroadcaster.send(Broadcaster.ALL_CHANNELS, {'time': time_, 'command': 'CONNLOST'}) class IRCClient: logger = logging.getLogger('http2irc.IRCClient') - def __init__(self, http2ircMessageQueue, config): + def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, config): self.http2ircMessageQueue = http2ircMessageQueue + self.irc2httpBroadcaster = irc2httpBroadcaster self.config = config self.channels = {map_['ircchannel'] for map_ in config['maps'].values()} @@ -749,7 +852,7 @@ class IRCClient: try: self.logger.debug('Creating IRC connection') t = asyncio.create_task(loop.create_connection( - protocol_factory = lambda: IRCClientProtocol(self.http2ircMessageQueue, connectionClosedEvent, loop, self.config, self.channels), + protocol_factory = lambda: IRCClientProtocol(self.http2ircMessageQueue, self.irc2httpBroadcaster, connectionClosedEvent, loop, self.config, self.channels), host = self.config['irc']['host'], port = self.config['irc']['port'], ssl = self._get_ssl_context(), @@ -791,11 +894,46 @@ class IRCClient: return self._protocol.lastRecvTime if self._protocol else None +class Broadcaster: + ALL_CHANNELS = object() # Singleton for send's channel argument, e.g. for connection loss + + def __init__(self): + self._queues = {} + + def subscribe(self, channel): + queue = asyncio.Queue() + if channel not in self._queues: + self._queues[channel] = set() + self._queues[channel].add(queue) + return queue + + def _send(self, channel, j): + for queue in self._queues[channel]: + queue.put_nowait(j) + + def send(self, channel, d): + if channel is self.ALL_CHANNELS and self._queues: + channels = self._queues + elif channel in self._queues: + channels = [channel] + else: + return + j = json.dumps(d, separators = (',', ':')).encode('utf-8') + for channel in channels: + self._send(channel, j) + + def unsubscribe(self, channel, queue): + self._queues[channel].remove(queue) + if not self._queues[channel]: + del self._queues[channel] + + class WebServer: logger = logging.getLogger('http2irc.WebServer') - def __init__(self, http2ircMessageQueue, ircClient, config): + def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, ircClient, config): self.http2ircMessageQueue = http2ircMessageQueue + self.irc2httpBroadcaster = irc2httpBroadcaster self.ircClient = ircClient self.config = config @@ -805,10 +943,12 @@ class WebServer: self._app.add_routes([ aiohttp.web.get('/status', self.get_status), aiohttp.web.post('/{path:.+}', functools.partial(self._path_request, func = self.post)), + aiohttp.web.get('/{path:.+}', functools.partial(self._path_request, func = self.get)), ]) self.update_config(config) self._configChanged = asyncio.Event() + self.stopEvent = None def update_config(self, config): self._paths = {map_['webpath']: (map_['ircchannel'], f'Basic {base64.b64encode(map_["auth"].encode("utf-8")).decode("utf-8")}' if map_['auth'] else False, map_['module'], map_['moduleargs'], map_['overlongmode']) for map_ in config['maps'].values()} @@ -818,6 +958,7 @@ class WebServer: self._configChanged.set() async def run(self, stopEvent): + self.stopEvent = stopEvent while True: runner = aiohttp.web.AppRunner(self._app) await runner.setup() @@ -885,6 +1026,26 @@ class WebServer: raise aiohttp.web.HTTPBadRequest() return message + async def get(self, request, channel, auth, module, moduleargs, overlongmode): + self.logger.info(f'Subscribing listener from request {id(request)} for {channel}') + queue = self.irc2httpBroadcaster.subscribe(channel) + response = aiohttp.web.StreamResponse() + response.enable_chunked_encoding() + await response.prepare(request) + try: + while True: + t = asyncio.create_task(queue.get()) + done, pending = await wait_cancel_pending({t, asyncio.create_task(self.stopEvent.wait()), asyncio.create_task(self._configChanged.wait())}, return_when = asyncio.FIRST_COMPLETED) + if t not in done: # stopEvent or config change + #TODO Don't break if the config change doesn't affect this connection + break + j = t.result() + await response.write(j + b'\n') + finally: + self.irc2httpBroadcaster.unsubscribe(channel, queue) + self.logger.info(f'Unsubscribed listener from request {id(request)} for {channel}') + return response + def configure_logging(config): #TODO: Replace with logging.basicConfig(..., force = True) (Py 3.8+) @@ -908,9 +1069,10 @@ async def main(): loop = asyncio.get_running_loop() http2ircMessageQueue = MessageQueue() + irc2httpBroadcaster = Broadcaster() - irc = IRCClient(http2ircMessageQueue, config) - webserver = WebServer(http2ircMessageQueue, irc, config) + irc = IRCClient(http2ircMessageQueue, irc2httpBroadcaster, config) + webserver = WebServer(http2ircMessageQueue, irc2httpBroadcaster, irc, config) sigintEvent = asyncio.Event() def sigint_callback():