#!/usr/bin/env python3 import argparse import datetime import itertools import json import math import os import re import sys import time import urllib.request # Column definitions columns = { 'jobid': (lambda job, pipelines: job["job_data"]["ident"], ()), 'url': (lambda job, pipelines: job["job_data"]["url"], ('truncatable',)), 'user': (lambda job, pipelines: job["job_data"]["started_by"], ()), 'pipenick': (lambda job, pipelines: pipelines[job["job_data"]["pipeline_id"]] if job["job_data"]["pipeline_id"] in pipelines else "unknown", ()), 'queued': (lambda job, pipelines: job["job_data"]["queued_at"], ('date', 'numeric')), 'started': (lambda job, pipelines: job["job_data"]["started_at"], ('date', 'numeric')), 'last active': (lambda job, pipelines: int(job["ts"]), ('date', 'coloured', 'numeric')), 'dl urls': (lambda job, pipelines: job["job_data"]["items_downloaded"], ('numeric',)), 'dl size': (lambda job, pipelines: job["job_data"]["bytes_downloaded"], ('size', 'numeric')), 'queue': (lambda job, pipelines: job["job_data"]["items_queued"] - job["job_data"]["items_downloaded"], ('numeric',)), 'eta': (lambda job, pipelines: int((curTime := time.time()) + (job["job_data"]["items_queued"] - job["job_data"]["items_downloaded"]) / (job["job_data"]["items_downloaded"] / (curTime - job["job_data"]["started_at"]))) if job["job_data"]["items_downloaded"] > 0 else 0, ('date', 'numeric')), 'con': (lambda job, pipelines: job["job_data"]["concurrency"], ('numeric',)), 'delay min': (lambda job, pipelines: int(job["job_data"]["delay_min"]), ('hidden', 'numeric')), 'delay max': (lambda job, pipelines: int(job["job_data"]["delay_max"]), ('hidden', 'numeric')), 'delay': (lambda job, pipelines: str(int(job["job_data"]["delay_min"])) + '-' + str(int(job["job_data"]["delay_max"])) if job["job_data"]["delay_min"] != job["job_data"]["delay_max"] else str(int(job["job_data"]["delay_min"])), ()), } defaultSort = 'jobid' # Validate if any('truncatable' in colDef[1] and any(x in colDef[1] for x in ('date', 'coloured', 'size')) for colDef in columns.values()): # Truncation code can't handle renderers raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable') # Filter function def make_field_filter(column, op, value, caseSensitive = True): compFunc = { "=": lambda a, b: a == b, "<": lambda a, b: a < b, ">": lambda a, b: a > b, "^": lambda a, b: a.startswith(b), "*": lambda a, b: b in a, "$": lambda a, b: a.endswith(b), "~": lambda a, b: re.search(b, a) is not None, }[op] transform = { True: (lambda x: x), False: (lambda x: x.lower() if isinstance(x, str) else x) }[caseSensitive] return (lambda job: compFunc(transform(job[column]), transform(value))) # Parse arguments class FilterAction(argparse.Action): def __call__(self, parser, namespace, values, optionString = None): if optionString == '--pyfilter': try: func = compile(values[0], '', 'eval') except Exception as e: parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}') setattr(namespace, self.dest, lambda job: eval(func, {}, {'job': job})) return global columns match = re.match(r"^(?P[A-Za-z ]+)(?P[=<>^*$~])(?P.*)$", values[0]) if not match: parser.error('Invalid filter') filterDict = match.groupdict() filterDict["column"] = filterDict["column"].lower() assert filterDict["column"] in columns if 'numeric' in columns[filterDict['column']][1]: filterDict['value'] = float(filterDict['value']) if 'date' in columns[filterDict['column']][1] and filterDict['value'] < 0: filterDict['value'] = time.time() + filterDict['value'] setattr(namespace, self.dest, make_field_filter(filterDict['column'], filterDict['op'], filterDict['value'], caseSensitive = (optionString in ('--filter', '-f')))) def parse_sort(value): global columns sortDesc = value.startswith('-') if sortDesc: value = value[1:] value = value.lower() if value not in columns: parser.error('Invalid column name') return (value, sortDesc) class SortAction(argparse.Action): def __call__(self, parser, namespace, values, optionString = None): result = parse_sort(values[0]) if getattr(namespace, self.dest, None) is None: setattr(namespace, self.dest, []) getattr(namespace, self.dest).append(result) parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter) parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = '\n'.join([ 'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.', 'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE', ' = means the value must be exactly as specified.', ' < and > mean it must be less/greater than the specified.', ' ^ and $ mean it must start/end with the specified.', ' * means it must contain the specified.', ' ~ means it must match the specified regex.', ])) parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive') parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`') parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.") parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format', 'atdash'), default = 'table', help = '\n'.join([ 'Output modes:', ' table: print a table of the matched jobs', ' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter', ' con-d-commands: print !con and !d commands for the current settings', ' format: print some output for each job, separated by newlines; this requires the --format option', ' atdash: print the URL for displaying the matched jobs on atdash', ])) parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)") parser.add_argument('--no-table', action = 'store_true', help = 'Raw output without feeding through column(1); columns are separated by tabs. (Table mode only)') parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)') parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)') parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)') parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)') parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)') args = parser.parse_args() if args.mode == 'format' and not args.format: print('Error: when using format mode, --format is required.', file = sys.stderr) sys.exit(1) if not args.sort: args.sort = [parse_sort(defaultSort)] if args.mode == 'con-d-commands': args.mode = 'format' args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}' else: args.replace_concurrency = None args.replace_delay = None # Retrieve def fetch(url): req = urllib.request.Request(url) req.add_header('Accept', 'application/json') with urllib.request.urlopen(req) as f: if f.getcode() != 200: raise RuntimeError('Could not fetch job data') return json.load(f) jobdata = fetch('http://dashboard.at.ninjawedding.org/logs/recent?count=1') pipelinedata = fetch('http://dashboard.at.ninjawedding.org/pipelines') currentTime = time.time() # Process pipelines = {p["id"]: p["nickname"] for p in pipelinedata["pipelines"]} jobs = [] for job in jobdata: jobs.append({column: columnFunc(job, pipelines) for column, (columnFunc, _) in columns.items()}) if not jobs: # Nothing to do sys.exit(0) # Filter if args.filter: jobs = [job for job in jobs if args.filter(job)] if not jobs: sys.exit(0) # Sort class reversor: # https://stackoverflow.com/a/56842689 def __init__(self, obj): self.obj = obj def __eq__(self, other): return other.obj == self.obj def __lt__(self, other): return other.obj < self.obj sortColumns = tuple((column, descending, columns[column]) for column, descending in args.sort) if not args.dates: # Reverse sorting order for columns which have a date attribute since the column will have elapsed time sortColumns = tuple((column, not descending if 'date' in columnInfo[1] else descending, columnInfo) for column, descending, columnInfo in sortColumns) jobs = sorted(jobs, key = lambda job: tuple(job[column] if not descending else reversor(job[column]) for column, descending, _ in sortColumns)) # Concurrency and delay overrides if specified and relevant if args.replace_concurrency is not None or args.replace_delay is not None: for job in jobs: if args.replace_concurrency is not None: job['con'] = args.replace_concurrency[0] if args.replace_delay is not None: job['delay min'] = args.replace_delay[0] job['delay max'] = args.replace_delay[1] # Non-table output modes if args.mode == 'dashboard-regex': print('^(' + '|'.join(re.escape(job['url']) for job in jobs) + ')$') sys.exit(0) elif args.mode == 'format': for job in jobs: print(args.format.format(**{key.replace(' ', '_'): value for key, value in job.items()})) sys.exit(0) elif args.mode == 'atdash': print('https://atdash.meo.ws/d/nipgvEwmk/archivebot?orgId=1&' + '&'.join(f'var-ident={job["jobid"]}' for job in jobs)) sys.exit(0) # Renderers def render_date(ts, coloured = False): global args, currentTime diff = currentTime - ts colourStr = f"\x1b[{0 if diff < 6 * 3600 else 7};31m" if coloured and diff >= 300 else "" colourEndStr = "\x1b[0m" if colourStr else "" if args.dates: return (colourStr, datetime.datetime.fromtimestamp(ts).isoformat(sep = " "), colourEndStr) if diff < -86400: return (colourStr, f"in {-diff // 86400:.0f}d {(-diff % 86400) // 3600:.0f}h", colourEndStr) elif diff < -60: return (colourStr, "in " + (f"{-diff // 3600:.0f}h " if diff <= -3600 else "") + f"{(-diff % 3600) // 60:.0f}mn", colourEndStr) elif diff < 0: return "in <1 min" elif diff == 0: return "now" elif diff < 60: return "<1 min ago" elif diff < 86400: return (colourStr, (f"{diff // 3600:.0f}h " if diff >= 3600 else "") + f"{(diff % 3600) // 60:.0f}mn ago", colourEndStr) else: return (colourStr, f"{diff // 86400:.0f}d {(diff % 86400) // 3600:.0f}h ago", colourEndStr) def render_size(size): units = ('B', 'KiB', 'MiB', 'GiB', 'TiB') unitIdx = min(int(math.log(size, 1024)), len(units) - 1) if size >= 1 else 0 if unitIdx == 0: return f'{size} B' # No decimal places return f'{size / 1024 ** unitIdx:.1f} {units[unitIdx]}' renderers = {} for column, (_, columnAttr) in columns.items(): if "date" in columnAttr: if "coloured" in columnAttr: renderers[column] = lambda x: render_date(x, coloured = not args.no_colours) else: renderers[column] = render_date elif "size" in columnAttr: renderers[column] = render_size elif isinstance(jobs[0][column], (int, float)): renderers[column] = str for job in jobs: for column in renderers: job[column] = renderers[column](job[column]) # Truncate if applicable printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]} if not args.no_table and not args.no_truncate: widthsD = {column: max(itertools.chain((len(column),), (len(job[column]) if isinstance(job[column], str) else len(job[column][1]) for job in jobs))) for column in printableColumns} minWidthsD = {column: len(column) for column in printableColumns} try: termWidth = os.get_terminal_size().columns except OSError as e: if e.errno == 25: # Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping) # Silently ignore this and don't truncate termWidth = float('Inf') else: raise overage = sum(x + 2 for x in widthsD.values()) - 2 - termWidth if overage > 0: if sum((widthsD[column] if 'truncatable' not in colDef[1] else minWidthsD[column]) + 2 for column, colDef in printableColumns.items()) - 2 > termWidth: # Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally print('Sorry, cannot truncate columns to terminal width', file = sys.stderr) else: # Distribute overage to truncatable columns proportionally to each column's length over the minimum truncatableColumns = {column: colDef for column, colDef in columns.items() if 'truncatable' in colDef[1]} totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns) trWidthsD = {column: math.floor(widthsD[column] - (widthsD[column] - minWidthsD[column]) / totalOverMin * overage) for column in truncatableColumns} if sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns) - overage == 1: # Truncated one more character than necessary due to the flooring; add it again to the shortest column trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1 for job in jobs: for column in truncatableColumns: if len(job[column]) > trWidthsD[column]: job[column] = job[column][:trWidthsD[column] - 1] + '…' # Print output = [] output.append(tuple(column.upper() for column in columns if "hidden" not in columns[column][1])) for job in jobs: output.append(tuple(job[column] for column in columns if "hidden" not in columns[column][1])) if not args.no_table: widths = tuple(max(len(field) if isinstance(field, str) else len(field[1]) for field in column) for column in zip(*output)) for row in output: print(' '.join((value.ljust(width) if isinstance(value, str) else ''.join((value[0], value[1], value[2], ' ' * (width - len(value[1]))))) for value, width in zip(row, widths))) else: for row in output: print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))