Browse Source

Clean up logging

master
JustAnotherArchivist 4 years ago
parent
commit
ffee32980d
1 changed files with 48 additions and 36 deletions
  1. +48
    -36
      http2irc.py

+ 48
- 36
http2irc.py View File

@@ -15,6 +15,7 @@ import sys
import toml


logger = logging.getLogger('http2irc')
SSL_CONTEXTS = {'yes': True, 'no': False, 'insecure': ssl.SSLContext()}


@@ -53,8 +54,6 @@ class Config(dict):
with open(self._filename, 'r') as fp:
obj = toml.load(fp)

logging.info(repr(obj))

# Sanity checks
if any(x not in ('logging', 'irc', 'web', 'maps') for x in obj.keys()):
raise InvalidConfig('Unknown sections found in base object')
@@ -128,7 +127,7 @@ class Config(dict):
raise InvalidConfig(f'Module args cannot be specified without a module for {key!r}')

# Default values
finalObj = {'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {message}'}, 'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'h2ibot', 'real': 'I am an http2irc bot.', 'certfile': None, 'certkeyfile': None}, 'web': {'host': '127.0.0.1', 'port': 8080}, 'maps': {}}
finalObj = {'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {name} {message}'}, 'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'h2ibot', 'real': 'I am an http2irc bot.', 'certfile': None, 'certkeyfile': None}, 'web': {'host': '127.0.0.1', 'port': 8080}, 'maps': {}}

# Fill in default values for the maps
for key, map_ in obj['maps'].items():
@@ -198,6 +197,8 @@ class MessageQueue:
# - Only one concurrent getter
# - putleft_nowait to put to the front of the queue (so that the IRC client can put a message back when delivery fails)

logger = logging.getLogger('http2irc.MessageQueue')

def __init__(self):
self._getter = None # None | asyncio.Future
self._queue = collections.deque()
@@ -207,19 +208,19 @@ class MessageQueue:
raise RuntimeError('Cannot get concurrently')
if len(self._queue) == 0:
self._getter = asyncio.get_running_loop().create_future()
logging.debug('Awaiting getter')
self.logger.debug('Awaiting getter')
try:
await self._getter
except asyncio.CancelledError:
logging.debug('Cancelled getter')
self.logger.debug('Cancelled getter')
self._getter = None
raise
logging.debug('Awaited getter')
self.logger.debug('Awaited getter')
self._getter = None
# For testing the cancellation/putting back onto the queue
#logging.debug('Delaying message queue get')
#self.logger.debug('Delaying message queue get')
#await asyncio.sleep(3)
#logging.debug('Done delaying')
#self.logger.debug('Done delaying')
return self.get_nowait()

def get_nowait(self):
@@ -242,8 +243,9 @@ class MessageQueue:


class IRCClientProtocol(asyncio.Protocol):
logger = logging.getLogger('http2irc.IRCClientProtocol')

def __init__(self, messageQueue, connectionClosedEvent, loop, config, channels):
logging.debug(f'Protocol init {id(self)}: {messageQueue} {id(messageQueue)}, {connectionClosedEvent}, {loop}')
self.messageQueue = messageQueue
self.connectionClosedEvent = connectionClosedEvent
self.loop = loop
@@ -255,7 +257,7 @@ class IRCClientProtocol(asyncio.Protocol):
self.pongReceivedEvent = asyncio.Event()

def connection_made(self, transport):
logging.info('Connected')
self.logger.info('IRC connected')
self.transport = transport
self.connected = True
nickb = self.config['irc']['nick'].encode('utf-8')
@@ -275,21 +277,21 @@ class IRCClientProtocol(asyncio.Protocol):
self.send(b'JOIN ' + ','.join(channelsToJoin).encode('utf-8'))

def send(self, data):
logging.info(f'Send: {data!r}')
self.logger.debug(f'Send: {data!r}')
self.transport.write(data + b'\r\n')

async def _get_message(self):
logging.debug(f'Message queue {id(self.messageQueue)} length: {self.messageQueue.qsize()}')
self.logger.debug(f'Message queue {id(self.messageQueue)} length: {self.messageQueue.qsize()}')
messageFuture = asyncio.create_task(self.messageQueue.get())
done, pending = await asyncio.wait((messageFuture, self.connectionClosedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
if self.connectionClosedEvent.is_set():
if messageFuture in pending:
logging.debug('Cancelling messageFuture')
self.logger.debug('Cancelling messageFuture')
messageFuture.cancel()
try:
await messageFuture
except asyncio.CancelledError:
logging.debug('Cancelled messageFuture')
self.logger.debug('Cancelled messageFuture')
pass
else:
# messageFuture is already done but we're stopping, so put the result back onto the queue
@@ -300,11 +302,12 @@ class IRCClientProtocol(asyncio.Protocol):

async def send_messages(self):
while self.connected:
logging.debug(f'{id(self)}: trying to get a message')
self.logger.debug(f'Trying to get a message')
channel, message = await self._get_message()
logging.debug(f'{id(self)}: got message: {message!r}')
self.logger.debug(f'Got message: {message!r}')
if message is None:
break
self.logger.info(f'Sending {message!r} to {channel!r}')
#TODO Split if the message is too long.
self.unconfirmedMessages.append((channel, message))
self.send(b'PRIVMSG ' + channel.encode('utf-8') + b' :' + message.encode('utf-8'))
@@ -318,21 +321,22 @@ class IRCClientProtocol(asyncio.Protocol):
self.unconfirmedMessages = []
break
if not self.unconfirmedMessages:
logging.debug(f'{id(self)}: no messages to confirm')
self.logger.debug('No messages to confirm')
continue
logging.debug(f'{id(self)}: trying to confirm message delivery')
self.logger.debug('Trying to confirm message delivery')
self.pongReceivedEvent.clear()
self.send(b'PING :42')
await asyncio.wait((asyncio.sleep(5), self.pongReceivedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
logging.debug(f'{id(self)}: message delivery success: {self.pongReceivedEvent.is_set()}')
self.logger.debug(f'Message delivery successful: {self.pongReceivedEvent.is_set()}')
if not self.pongReceivedEvent.is_set():
# No PONG received in five seconds, assume connection's dead
self.logger.warning(f'Message delivery confirmation failed, putting {len(self.unconfirmedMessages)} messages back into the queue')
self.messageQueue.putleft_nowait(*self.unconfirmedMessages)
self.transport.close()
self.unconfirmedMessages = []

def data_received(self, data):
logging.debug(f'Data received: {data!r}')
self.logger.debug(f'Data received: {data!r}')
# Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that.
# Then, process all messages except the last one (since data might not end on a CRLF) and keep the remainder in the buffer.
# If data does end with CRLF, all messages will have been processed and the buffer will be empty again.
@@ -345,7 +349,7 @@ class IRCClientProtocol(asyncio.Protocol):
self.buffer = messages[-1]

def message_received(self, message):
logging.info(f'Message received: {message!r}')
self.logger.debug(f'Message received: {message!r}')
if message.startswith(b':'):
# Prefixed message, extract command + parameters (the prefix cannot contain a space)
message = message.split(b' ', 1)[1]
@@ -354,18 +358,20 @@ class IRCClientProtocol(asyncio.Protocol):
elif message.startswith(b'PONG '):
self.pongReceivedEvent.set()
elif message.startswith(b'001 '):
# Connection registered
self.logger.info('IRC connection registered')
self.send(b'JOIN ' + ','.join(self.channels).encode('utf-8')) #TODO: Split if too long
asyncio.create_task(self.send_messages())
asyncio.create_task(self.confirm_messages())

def connection_lost(self, exc):
logging.info('The server closed the connection')
self.logger.info('IRC connection lost')
self.connected = False
self.connectionClosedEvent.set()


class IRCClient:
logger = logging.getLogger('http2irc.IRCClient')

def __init__(self, messageQueue, config):
self.messageQueue = messageQueue
self.config = config
@@ -404,13 +410,15 @@ class IRCClient:
finally:
self._transport.close() #TODO BaseTransport.close is asynchronous and then triggers the protocol's connection_lost callback; need to wait for connectionClosedEvent again perhaps to correctly handle ^C?
except (ConnectionRefusedError, asyncio.TimeoutError) as e:
logging.error(str(e))
self.logger.error(str(e))
await asyncio.wait((asyncio.sleep(5), sigintEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
if sigintEvent.is_set():
break


class WebServer:
logger = logging.getLogger('http2irc.WebServer')

def __init__(self, messageQueue, config):
self.messageQueue = messageQueue
self.config = config
@@ -443,31 +451,33 @@ class WebServer:
self._configChanged.clear()

async def post(self, request):
logging.info(f'Received request for {request.path!r}')
self.logger.info(f'Received request {id(request)} from {request.remote!r} for {request.path!r}')
try:
channel, auth, module, moduleargs = self._paths[request.path]
except KeyError:
logging.info(f'Bad request: no path {request.path!r}')
self.logger.info(f'Bad request {id(request)}: no path {request.path!r}')
raise aiohttp.web.HTTPNotFound()
if auth:
authHeader = request.headers.get('Authorization')
if not authHeader or authHeader != auth:
logging.info(f'Bad request: authentication failed: {authHeader!r} != {auth}')
self.logger.info(f'Bad request {id(request)}: authentication failed: {authHeader!r} != {auth}')
raise aiohttp.web.HTTPForbidden()
if module is not None:
self.logger.debug(f'Processing request {id(request)} using {module!r}')
try:
message = await module.process(request, *moduleargs)
except aiohttp.web.HTTPException as e:
raise e
except Exception as e:
logging.error(f'Bad request: exception in module process function: {e!s}')
self.logger.error(f'Bad request {id(request)}: exception in module process function: {e!s}')
raise aiohttp.web.HTTPBadRequest()
if '\r' in message or '\n' in message:
logging.error(f'Bad request: module process function returned message with linebreaks: {message!r}')
self.logger.error(f'Bad request {id(request)}: module process function returned message with linebreaks: {message!r}')
raise aiohttp.web.HTTPBadRequest()
else:
self.logger.debug(f'Processing request {id(request)} using default processor')
message = await self._default_process(request)
logging.debug(f'Putting message {message!r} for {channel} into message queue')
self.logger.info(f'Accepted request {id(request)}, putting message {message!r} for {channel} into message queue')
self.messageQueue.put_nowait((channel, message))
raise aiohttp.web.HTTPOk()

@@ -475,16 +485,16 @@ class WebServer:
try:
message = await request.text()
except Exception as e:
logging.info(f'Bad request: exception while reading request data: {e!s}')
self.logger.info(f'Bad request {id(request)}: exception while reading request data: {e!s}')
raise aiohttp.web.HTTPBadRequest() # Yes, it's always the client's fault. :-)
logging.debug(f'Request payload: {message!r}')
self.logger.debug(f'Request {id(request)} payload: {message!r}')
# Strip optional [CR] LF at the end of the payload
if message.endswith('\r\n'):
message = message[:-2]
elif message.endswith('\n'):
message = message[:-1]
if '\r' in message or '\n' in message:
logging.info('Bad request: linebreaks in message')
self.logger.info('Bad request {id(request)}: linebreaks in message')
raise aiohttp.web.HTTPBadRequest()
return message

@@ -517,18 +527,20 @@ async def main():

sigintEvent = asyncio.Event()
def sigint_callback():
logging.info('Got SIGINT')
global logger
nonlocal sigintEvent
logger.info('Got SIGINT, stopping')
sigintEvent.set()
loop.add_signal_handler(signal.SIGINT, sigint_callback)

def sigusr1_callback():
logging.info('Got SIGUSR1, reloading config')
global logger
nonlocal config, irc, webserver
logger.info('Got SIGUSR1, reloading config')
try:
newConfig = config.reread()
except InvalidConfig as e:
logging.error(f'Config reload failed: {e!s}')
logger.error(f'Config reload failed: {e!s} (old config remains active)')
return
config = newConfig
configure_logging(config)


Loading…
Cancel
Save