diff --git a/irclog.py b/irclog.py
index 8470860..96b1403 100644
--- a/irclog.py
+++ b/irclog.py
@@ -139,12 +139,24 @@ class Config(dict):
                 if not is_valid_pem(obj['irc']['certkeyfile'], False):
                     raise InvalidConfig('Invalid certificate key file: not a valid PEM key')
         if 'web' in obj:
-            if any(x not in ('host', 'port') for x in obj['web']):
+            if any(x not in ('host', 'port', 'search') for x in obj['web']):
                 raise InvalidConfig('Unknown key found in web section')
             if 'host' in obj['web'] and not isinstance(obj['web']['host'], str): #TODO: Check whether it's a valid hostname (must resolve I guess?)
                 raise InvalidConfig('Invalid web hostname')
             if 'port' in obj['web'] and (not isinstance(obj['web']['port'], int) or not 1 <= obj['web']['port'] <= 65535):
                 raise InvalidConfig('Invalid web port')
+            if 'search' in obj['web']:
+                if not isinstance(obj['web']['search'], collections.abc.Mapping):
+                    raise InvalidConfig('Invalid web search: must be a mapping')
+                if any(x not in ('maxTime', 'maxSize', 'nice', 'maxMemory') for x in obj['web']['search']):
+                    raise InvalidConfig('Unknown key found in web search section')
+                for key in ('maxTime', 'maxSize', 'nice', 'maxMemory'):
+                    if key not in obj['web']['search']:
+                        continue
+                    if not isinstance(obj['web']['search'][key], int):
+                        raise InvalidConfig(f'Invalid web search {key}: not an integer')
+                    if key != 'nice' and obj['web']['search'][key] < 0:
+                        raise InvalidConfig(f'Invalid web search {key}: cannot be negative')
         if 'channels' in obj:
             seenChannels = {}
             seenPaths = {}
@@ -219,21 +231,31 @@ class Config(dict):
                         raise InvalidConfig(f'Invalid channel {key!r} extrasearchchannels: refers to auth-required channel whose auth differs from this channel\'s')
 
         # Default values
-        finalObj = {
+        defaults = {
             'logging': {'level': 'INFO', 'format': '{asctime} {levelname} {name} {message}'},
             'storage': {'path': os.path.abspath(os.path.dirname(self._filename)), 'flushTime': 60},
             'irc': {'host': 'irc.hackint.org', 'port': 6697, 'ssl': 'yes', 'nick': 'irclogbot', 'real': 'I am an irclog bot.', 'certfile': None, 'certkeyfile': None},
-            'web': {'host': '127.0.0.1', 'port': 8080},
-            'channels': {}
+            'web': {'host': '127.0.0.1', 'port': 8080, 'search': {'maxTime': 10, 'maxSize': 1048576, 'nice': 10, 'maxMemory': 52428800}},
+            'channels': obj.get('channels', {}), # _merge_dicts expects both dicts to have the same structure, and reusing the parsed channels is the easiest way to achieve that
         }
         # Default values for channels are already set above.
 
         # Merge in what was read from the config file and set keys on self
-        for key in ('logging', 'storage', 'irc', 'web', 'channels'):
-            if key in obj:
-                finalObj[key].update(obj[key])
+        finalObj = self._merge_dicts(defaults, obj)
+        for key in defaults.keys():
             self[key] = finalObj[key]
 
+    def _merge_dicts(self, defaults, overrides):
+        # Takes two dicts; the keys in overrides must be a subset of those in defaults. Returns a merged dict with values from overrides replacing those in defaults, recursively.
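+        # For example (hypothetical values): _merge_dicts({'web': {'host': '127.0.0.1', 'port': 8080}}, {'web': {'port': 8081}}) returns {'web': {'host': '127.0.0.1', 'port': 8081}}.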
+        assert set(overrides.keys()).issubset(defaults.keys()), f'{overrides!r} is not a subset of {defaults!r}'
+        out = {}
+        for key in defaults.keys():
+            if isinstance(defaults[key], dict):
+                out[key] = self._merge_dicts(defaults[key], overrides[key] if key in overrides else {})
+            else:
+                out[key] = overrides[key] if key in overrides else defaults[key]
+        return out
+
     def __repr__(self):
         return f'<Config {dict.__repr__(self)}>'
@@ -936,18 +958,95 @@ class WebServer:
             content_type = 'text/html'
         )
 
-        cmd = ['grep', '--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number', request.query['q']]
+        # Run the search with grep, limiting memory use, output size, and runtime, and setting the niceness.
+        # While Python's subprocess.Popen has preexec_fn, that isn't safe in conjunction with threads, and asyncio uses threads under the hood.
+        # So instead, use a wrapper script in Bash which sets the niceness and memory limit.
+        cmd = [
+            os.path.join('.', os.path.dirname(__file__), 'nicegrep'), str(self.config['web']['search']['nice']), str(self.config['web']['search']['maxMemory']),
+            '--fixed-strings', '--recursive', '--with-filename', '--null', '--line-number', request.query['q']
+        ]
         for path in itertools.chain((request.match_info['path'],), self._paths[request.match_info['path']][3]):
             cmd.append(os.path.join(self.config['storage']['path'], path, ''))
-        proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE)
-        #TODO Limit size and runtime
-        stdout, _ = await proc.communicate()
+
+        proc = await asyncio.create_subprocess_exec(*cmd, stdout = asyncio.subprocess.PIPE, stderr = asyncio.subprocess.PIPE)
+
+        async def process_stdout():
+            # Read grep's stdout, keeping at most maxSize bytes and truncating at a line boundary when the limit is exceeded.
+            out = []
+            size = 0
+            incomplete = False
+            while True:
+                buf = await proc.stdout.read(1024)
+                if not buf:
+                    break
+                self.logger.debug(f'Request {id(request)} grep stdout: {buf!r}')
+                if size + len(buf) > self.config['web']['search']['maxSize']:
+                    self.logger.warning(f'Request {id(request)} grep output exceeds max size')
+                    bufLFPos = buf.rfind(b'\n', 0, self.config['web']['search']['maxSize'] - size)
+                    if bufLFPos > -1:
+                        # There's an LF in this buffer at the right position; keep everything up to it such that the total size is <= maxSize.
+                        out.append(buf[:bufLFPos + 1])
+                    else:
+                        # Try to find the last LF in the previous buffers, scanning backwards.
+                        for i in range(len(out) - 1, -1, -1):
+                            lfPos = out[i].rfind(b'\n')
+                            if lfPos > -1:
+                                out[i] = out[i][:lfPos + 1]
+                                out = out[:i + 1]
+                                break
+                        else:
+                            # No newline to be found anywhere at all; no output.
+                            out = []
+                    incomplete = True
+                    proc.kill()
+                    break
+                out.append(buf)
+                size += len(buf)
+            return (b''.join(out), incomplete)
+
+        async def process_stderr():
+            # Log grep's stderr output line by line.
+            buf = b''
+            while True:
+                chunk = await proc.stderr.read(64)
+                if not chunk:
+                    break
+                buf += chunk
+                lines = buf.split(b'\n')
+                buf = lines[-1]
+                for line in lines[:-1]:
+                    try:
+                        line = line.decode('utf-8')
+                    except UnicodeDecodeError:
+                        pass
+                    self.logger.warning(f'Request {id(request)} grep stderr output: {line!r}')
+
+        stdoutTask = asyncio.create_task(process_stdout())
+        stderrTask = asyncio.create_task(process_stderr())
+        await asyncio.wait({stdoutTask, stderrTask}, timeout = self.config['web']['search']['maxTime'])
+        if proc.returncode is None:
+            # Process hasn't finished yet after maxTime. Murder it and wait for it to die.
+            self.logger.warning(f'Request {id(request)} grep took more than the time limit')
+            proc.kill()
+            await asyncio.wait({stdoutTask, stderrTask, asyncio.create_task(proc.wait())}, timeout = 1) # This really shouldn't take longer.
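+            # proc.wait() gets a task of its own so that this also waits for the killed process to actually be reaped, not just for the pipe readers to hit EOF.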
+            if proc.returncode is None:
+                # Still not done?! Cancel the tasks and bail.
+                self.logger.error(f'Request {id(request)} grep did not exit after getting killed!')
+                stdoutTask.cancel()
+                stderrTask.cancel()
+                return aiohttp.web.HTTPInternalServerError()
+        stdout, incomplete = stdoutTask.result()
+        self.logger.info(f'Request {id(request)} grep exited with {proc.returncode} and produced {len(stdout)} bytes (incomplete: {incomplete})')
+        # This also covers the kill cases above; note that grep exits non-zero when there are no matches at all, so an empty result set gets flagged as incomplete, too.
+        if proc.returncode != 0:
+            incomplete = True
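+        # grep was invoked with --with-filename, --null, and --line-number, so each result line has the form 'path\0lineno:text'; convert that back into renderable log lines.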
         lines = self._raw_to_lines(self._stdout_with_path(stdout))
         return aiohttp.web.Response(
             text = ''.join([
                 '<!DOCTYPE html>',
-                f'<html><head><title>{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"</title>{self.logStyleTag}</head><body>',
+                '<html>',
+                f'<head><title>{html.escape(self._paths[request.match_info["path"]][0])} search results for "{html.escape(request.query["q"])}"</title>',
+                self.logStyleTag,
+                '</head>',
+                '<body>',
+                '<p>Warning: output incomplete due to exceeding time or size limits</p>' if incomplete else '',
                 self._render_log(lines, withDate = True),
                 '</body>',
                 '</html>'
diff --git a/nicegrep b/nicegrep
new file mode 100755
index 0000000..8b9ed18
--- /dev/null
+++ b/nicegrep
@@ -0,0 +1,7 @@
+#!/bin/bash
+# Usage: nicegrep NICE MAXMEMORY ARG...
+# Executes grep with the provided arguments using `nice -n NICE` and `ulimit -v MAXMEMORY` (note: ulimit -v is in units of 1024 bytes)
+nice="$1"; shift
+maxMemory="$1"; shift
+ulimit -v "${maxMemory}"
+exec nice -n "${nice}" grep "$@"