Browse Source

Add support for listening to channel activity via GET

master
JustAnotherArchivist 2 years ago
parent
commit
f6bb9374a7
1 changed files with 168 additions and 6 deletions
  1. +168
    -6
      http2irc.py

+ 168
- 6
http2irc.py View File

@@ -329,8 +329,9 @@ class MessageQueue:
class IRCClientProtocol(asyncio.Protocol):
logger = logging.getLogger('http2irc.IRCClientProtocol')

def __init__(self, http2ircMessageQueue, connectionClosedEvent, loop, config, channels):
def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, connectionClosedEvent, loop, config, channels):
self.http2ircMessageQueue = http2ircMessageQueue
self.irc2httpBroadcaster = irc2httpBroadcaster
self.connectionClosedEvent = connectionClosedEvent
self.loop = loop
self.config = config
@@ -429,6 +430,18 @@ class IRCClientProtocol(asyncio.Protocol):
self.logger.debug(f'Send: {data!r}')
time_ = time.time()
self.transport.write(data + b'\r\n')
if data.startswith(b'PRIVMSG '):
# Send own messages to broadcaster as well
command, channels, message = data.decode('utf-8').split(' ', 2)
for channel in channels.split(','):
assert channel.startswith('#') or channel.startswith('&'), f'invalid channel: {channel!r}'
user = {
'nick': self.server.nickname,
'hostmask': f'{self.server.nickname}!{self.server.username}@{self.server.hostname}',
'account': self.server.account,
'modes': self.get_mode_chars(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(self.server.nickname))),
}
self.irc2httpBroadcaster.send(channel, {'time': time_, 'command': command, 'channel': channel, 'user': user, 'message': message})
return time_

async def send_queue(self):
@@ -574,6 +587,11 @@ class IRCClientProtocol(asyncio.Protocol):
def message_received(self, time_, message, line):
self.logger.debug(f'Message received at {time_}: {message!r}')

# Send to HTTP broadcaster
# Note: WHOX is handled further down
for d in self.line_to_dicts(time_, line):
self.irc2httpBroadcaster.send(d['channel'], d)

maybeTriggerWhox = False

# PING/PONG
@@ -678,6 +696,7 @@ class IRCClientProtocol(asyncio.Protocol):
for entry in self.whoxReply:
if entry['account']:
self.server.users[self.server.casefold(entry['nick'])].account = entry['account']
self.irc2httpBroadcaster.send(self.whoxChannel, {'time': time_, 'command': 'RPL_ENDOFWHO', 'channel': self.whoxChannel, 'users': self.whoxReply, 'whoxstarttime': self.whoxStartTime})
self.whoxChannel = None
self.whoxReply = []
self.whoxStartTime = None
@@ -695,6 +714,87 @@ class IRCClientProtocol(asyncio.Protocol):
self.whoxStartTime = time.time() # Note, may not be the actual start time due to rate limiting
self.send(b'WHO ' + self.whoxChannel.encode('utf-8') + b' c%tuhna,042')

def get_mode_chars(self, channelUser):
if channelUser is None:
return ''
prefix = self.server.isupport.prefix
return ''.join(prefix.prefixes[i] for i in sorted((prefix.modes.index(c) for c in channelUser.modes if c in prefix.modes)))

def line_to_dicts(self, time_, line):
if line.source:
sourceUser = self.server.users.get(self.server.casefold(line.hostmask.nickname)) if line.source else None
get_modes = lambda channel, nick = line.hostmask.nickname: self.get_mode_chars(self.server.channels[self.server.casefold(channel)].users.get(self.server.casefold(nick)))
get_user = lambda channel, withModes = True: {
'nick': line.hostmask.nickname,
'hostmask': str(line.hostmask),
'account': getattr(self.server.users.get(self.server.casefold(line.hostmask.nickname)), 'account', None),
**({'modes': get_modes(channel)} if withModes else {}),
}
if line.command == 'JOIN':
# Although servers SHOULD NOT send multiple channels in one message per the modern IRC docs <https://modern.ircdocs.horse/#join-message>, let's do the safe thing...
account = {'account': line.params[-2] if line.params[-2] != '*' else None} if 'extended-join' in self.caps else {}
for channel in line.params[0].split(','):
# There can't be a mode set yet on the JOIN, so no need to use get_modes (which would complicate the self-join).
yield {'time': time_, 'command': 'JOIN', 'channel': channel, 'user': {**get_user(channel, False), **account}}
elif line.command in ('PRIVMSG', 'NOTICE'):
channel = line.params[0]
if channel not in self.server.channels:
return
if line.command == 'PRIVMSG' and line.params[1].startswith('\x01ACTION ') and line.params[1].endswith('\x01'):
# CTCP ACTION (aka /me)
yield {'time': time_, 'command': 'ACTION', 'channel': channel, 'user': get_user(channel), 'message': line.params[1][8:-1]}
return
yield {'time': time_, 'command': line.command, 'channel': channel, 'user': get_user(channel), 'message': line.params[1]}
elif line.command == 'PART':
for channel in line.params[0].split(','):
yield {'time': time_, 'command': 'PART', 'channel': channel, 'user': get_user(channel), 'reason': line.params[1] if len(line.params) == 2 else None}
elif line.command in ('QUIT', 'NICK', 'ACCOUNT'):
if line.hostmask.nickname == self.server.nickname:
channels = self.channels
elif sourceUser is not None:
channels = sourceUser.channels
else:
return
for channel in channels:
if line.command == 'QUIT':
extra = {'reason': line.params[0] if len(line.params) == 1 else None}
elif line.command == 'NICK':
extra = {'newnick': line.params[0]}
elif line.command == 'ACCOUNT':
extra = {'account': line.params[0]}
yield {'time': time_, 'command': line.command, 'channel': channel, 'user': get_user(channel), **extra}
elif line.command == 'MODE' and line.params[0][0] in ('#', '&'):
channel = line.params[0]
yield {'time': time_, 'command': 'MODE', 'channel': channel, 'user': get_user(channel), 'args': line.params[1:]}
elif line.command == 'KICK':
channel = line.params[0]
targetUser = self.server.users[self.server.casefold(line.params[1])]
yield {
'time': time_,
'command': 'KICK',
'channel': channel,
'user': get_user(channel),
'targetuser': {'nick': targetUser.nickname, 'hostmask': targetUser.hostmask(), 'modes': get_modes(channel, targetUser.nickname), 'account': targetUser.account},
'reason': line.params[2] if len(line.params) == 3 else None
}
elif line.command == 'TOPIC':
channel = line.params[0]
channelObj = self.server.channels[self.server.casefold(channel)]
oldTopic = {'topic': channelObj.topic, 'setter': channelObj.topic_setter, 'time': channelObj.topic_time.timestamp() if channelObj.topic_time else None} if channelObj.topic else None
if line.params[1] == '':
yield {'time': time_, 'command': 'TOPIC', 'channel': channel, 'user': get_user(channel), 'oldtopic': oldTopic, 'newtopic': None}
else:
yield {'time': time_, 'command': 'TOPIC', 'channel': channel, 'user': get_user(channel), 'oldtopic': oldTopic, 'newtopic': line.params[1]}
elif line.command == ircstates.numerics.RPL_TOPIC:
channel = line.params[1]
yield {'time': time_, 'command': 'RPL_TOPIC', 'channel': channel, 'topic': line.params[2]}
elif line.command == ircstates.numerics.RPL_TOPICWHOTIME:
yield {'time': time_, 'command': 'RPL_TOPICWHOTIME', 'channel': line.params[1], 'setter': {'nick': irctokens.hostmask(line.params[2]).nickname, 'hostmask': line.params[2]}, 'topictime': int(line.params[3])}
elif line.command == ircstates.numerics.RPL_ENDOFNAMES:
channel = line.params[1]
users = self.server.channels[self.server.casefold(channel)].users
yield {'time': time_, 'command': 'NAMES', 'channel': channel, 'users': [{'nick': u.nickname, 'modes': self.get_mode_chars(u)} for u in users.values()]}

async def quit(self):
# The server acknowledges a QUIT by sending an ERROR and closing the connection. The latter triggers connection_lost, so just wait for the closure event.
self.logger.info('Quitting')
@@ -707,16 +807,19 @@ class IRCClientProtocol(asyncio.Protocol):
self.transport.close()

def connection_lost(self, exc):
time_ = time.time()
self.logger.info('IRC connection lost')
self.connected = False
self.connectionClosedEvent.set()
self.irc2httpBroadcaster.send(Broadcaster.ALL_CHANNELS, {'time': time_, 'command': 'CONNLOST'})


class IRCClient:
logger = logging.getLogger('http2irc.IRCClient')

def __init__(self, http2ircMessageQueue, config):
def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, config):
self.http2ircMessageQueue = http2ircMessageQueue
self.irc2httpBroadcaster = irc2httpBroadcaster
self.config = config
self.channels = {map_['ircchannel'] for map_ in config['maps'].values()}

@@ -749,7 +852,7 @@ class IRCClient:
try:
self.logger.debug('Creating IRC connection')
t = asyncio.create_task(loop.create_connection(
protocol_factory = lambda: IRCClientProtocol(self.http2ircMessageQueue, connectionClosedEvent, loop, self.config, self.channels),
protocol_factory = lambda: IRCClientProtocol(self.http2ircMessageQueue, self.irc2httpBroadcaster, connectionClosedEvent, loop, self.config, self.channels),
host = self.config['irc']['host'],
port = self.config['irc']['port'],
ssl = self._get_ssl_context(),
@@ -791,11 +894,46 @@ class IRCClient:
return self._protocol.lastRecvTime if self._protocol else None


class Broadcaster:
ALL_CHANNELS = object() # Singleton for send's channel argument, e.g. for connection loss

def __init__(self):
self._queues = {}

def subscribe(self, channel):
queue = asyncio.Queue()
if channel not in self._queues:
self._queues[channel] = set()
self._queues[channel].add(queue)
return queue

def _send(self, channel, j):
for queue in self._queues[channel]:
queue.put_nowait(j)

def send(self, channel, d):
if channel is self.ALL_CHANNELS and self._queues:
channels = self._queues
elif channel in self._queues:
channels = [channel]
else:
return
j = json.dumps(d, separators = (',', ':')).encode('utf-8')
for channel in channels:
self._send(channel, j)

def unsubscribe(self, channel, queue):
self._queues[channel].remove(queue)
if not self._queues[channel]:
del self._queues[channel]


class WebServer:
logger = logging.getLogger('http2irc.WebServer')

def __init__(self, http2ircMessageQueue, ircClient, config):
def __init__(self, http2ircMessageQueue, irc2httpBroadcaster, ircClient, config):
self.http2ircMessageQueue = http2ircMessageQueue
self.irc2httpBroadcaster = irc2httpBroadcaster
self.ircClient = ircClient
self.config = config

@@ -805,10 +943,12 @@ class WebServer:
self._app.add_routes([
aiohttp.web.get('/status', self.get_status),
aiohttp.web.post('/{path:.+}', functools.partial(self._path_request, func = self.post)),
aiohttp.web.get('/{path:.+}', functools.partial(self._path_request, func = self.get)),
])

self.update_config(config)
self._configChanged = asyncio.Event()
self.stopEvent = None

def update_config(self, config):
self._paths = {map_['webpath']: (map_['ircchannel'], f'Basic {base64.b64encode(map_["auth"].encode("utf-8")).decode("utf-8")}' if map_['auth'] else False, map_['module'], map_['moduleargs'], map_['overlongmode']) for map_ in config['maps'].values()}
@@ -818,6 +958,7 @@ class WebServer:
self._configChanged.set()

async def run(self, stopEvent):
self.stopEvent = stopEvent
while True:
runner = aiohttp.web.AppRunner(self._app)
await runner.setup()
@@ -885,6 +1026,26 @@ class WebServer:
raise aiohttp.web.HTTPBadRequest()
return message

async def get(self, request, channel, auth, module, moduleargs, overlongmode):
self.logger.info(f'Subscribing listener from request {id(request)} for {channel}')
queue = self.irc2httpBroadcaster.subscribe(channel)
response = aiohttp.web.StreamResponse()
response.enable_chunked_encoding()
await response.prepare(request)
try:
while True:
t = asyncio.create_task(queue.get())
done, pending = await wait_cancel_pending({t, asyncio.create_task(self.stopEvent.wait()), asyncio.create_task(self._configChanged.wait())}, return_when = asyncio.FIRST_COMPLETED)
if t not in done: # stopEvent or config change
#TODO Don't break if the config change doesn't affect this connection
break
j = t.result()
await response.write(j + b'\n')
finally:
self.irc2httpBroadcaster.unsubscribe(channel, queue)
self.logger.info(f'Unsubscribed listener from request {id(request)} for {channel}')
return response


def configure_logging(config):
#TODO: Replace with logging.basicConfig(..., force = True) (Py 3.8+)
@@ -908,9 +1069,10 @@ async def main():
loop = asyncio.get_running_loop()

http2ircMessageQueue = MessageQueue()
irc2httpBroadcaster = Broadcaster()

irc = IRCClient(http2ircMessageQueue, config)
webserver = WebServer(http2ircMessageQueue, irc, config)
irc = IRCClient(http2ircMessageQueue, irc2httpBroadcaster, config)
webserver = WebServer(http2ircMessageQueue, irc2httpBroadcaster, irc, config)

sigintEvent = asyncio.Event()
def sigint_callback():


Loading…
Cancel
Save