#!/usr/bin/env python3 import argparse import datetime import itertools import json import math import os import re import sys import time import urllib.request # Column definitions columns = { 'jobid': (lambda job, pipelines: job['job_data']['ident'], ()), 'url': (lambda job, pipelines: job['job_data']['url'], ('truncatable',)), 'user': (lambda job, pipelines: job['job_data']['started_by'], ()), 'pipenick': (lambda job, pipelines: pipelines[job['job_data']['pipeline_id']] if job['job_data']['pipeline_id'] in pipelines else 'unknown', ()), 'queued': (lambda job, pipelines: job['job_data']['queued_at'], ('date', 'numeric')), 'started': (lambda job, pipelines: job['job_data']['started_at'], ('date', 'numeric')), 'last active': (lambda job, pipelines: int(job['ts']), ('date', 'coloured', 'numeric')), 'dl urls': (lambda job, pipelines: job['job_data']['items_downloaded'], ('numeric',)), 'dl size': (lambda job, pipelines: job['job_data']['bytes_downloaded'], ('size', 'numeric')), 'queue': (lambda job, pipelines: job['job_data']['items_queued'] - job['job_data']['items_downloaded'], ('numeric',)), 'eta': (lambda job, pipelines: int((curTime := time.time()) + (job['job_data']['items_queued'] - job['job_data']['items_downloaded']) / (job['job_data']['items_downloaded'] / (curTime - job['job_data']['started_at']))) if job['job_data']['items_downloaded'] > 0 else 0, ('date', 'numeric')), # Overwritten below if --grafana-eta is given 'con': (lambda job, pipelines: job['job_data']['concurrency'], ('numeric',)), 'delay min': (lambda job, pipelines: int(job['job_data']['delay_min']), ('hidden', 'numeric')), 'delay max': (lambda job, pipelines: int(job['job_data']['delay_max']), ('hidden', 'numeric')), 'delay': (lambda job, pipelines: str(int(job['job_data']['delay_min'])) + '-' + str(int(job['job_data']['delay_max'])) if job['job_data']['delay_min'] != job['job_data']['delay_max'] else str(int(job['job_data']['delay_min'])), ()), } defaultSort = 'jobid' # Validate if any('truncatable' in colDef[1] and any(x in colDef[1] for x in ('date', 'coloured', 'size')) for colDef in columns.values()): # Truncation code can't handle renderers raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable') # Filter function def make_field_filter(column, op, value, caseSensitive = True): compFunc = { '=': lambda a, b: a == b, '<': lambda a, b: a < b, '>': lambda a, b: a > b, '^': lambda a, b: a.startswith(b), '*': lambda a, b: b in a, '$': lambda a, b: a.endswith(b), '~': lambda a, b: re.search(b, a) is not None, }[op] transform = { True: (lambda x: x), False: (lambda x: x.lower() if isinstance(x, str) else x) }[caseSensitive] return (lambda job: compFunc(transform(job[column]), transform(value))) # Parse arguments class FilterAction(argparse.Action): def __call__(self, parser, namespace, values, optionString = None): if optionString == '--pyfilter': try: func = compile(values[0], '', 'eval') except Exception as e: parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}') setattr(namespace, self.dest, lambda job: eval(func, {}, {'job': job})) return global columns match = re.match(r'^(?P[A-Za-z ]+)(?P[=<>^*$~])(?P.*)$', values[0]) if not match: parser.error('Invalid filter') filterDict = match.groupdict() filterDict['column'] = filterDict['column'].lower() assert filterDict['column'] in columns if 'numeric' in columns[filterDict['column']][1]: filterDict['value'] = float(filterDict['value']) if 'date' in columns[filterDict['column']][1] and filterDict['value'] < 0: filterDict['value'] = time.time() + filterDict['value'] setattr(namespace, self.dest, make_field_filter(filterDict['column'], filterDict['op'], filterDict['value'], caseSensitive = (optionString in ('--filter', '-f')))) def parse_sort(value): global columns sortDesc = value.startswith('-') if sortDesc: value = value[1:] value = value.lower() if value not in columns: parser.error('Invalid column name') return (value, sortDesc) class SortAction(argparse.Action): def __call__(self, parser, namespace, values, optionString = None): result = parse_sort(values[0]) if getattr(namespace, self.dest, None) is None: setattr(namespace, self.dest, []) getattr(namespace, self.dest).append(result) parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter) parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = '\n'.join([ 'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.', 'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE', ' = means the value must be exactly as specified.', ' < and > mean it must be less/greater than the specified.', ' ^ and $ mean it must start/end with the specified.', ' * means it must contain the specified.', ' ~ means it must match the specified regex.', ])) parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive') parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`') parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.") parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format', 'atdash'), default = 'table', help = '\n'.join([ 'Output modes:', ' table: print a table of the matched jobs', ' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter', ' con-d-commands: print !con and !d commands for the current settings', ' format: print some output for each job, separated by newlines; this requires the --format option', ' atdash: print the URL for displaying the matched jobs on atdash', ])) parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)") parser.add_argument('--no-table', action = 'store_true', help = 'Raw non-columnised output; columns are separated by tabs. (Table mode only)') parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)') parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)') parser.add_argument('--grafana-eta', action = 'store_true', help = 'Enable fetching data from Grafana for a better ETA on long-running jobs. (Table mode only)') parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)') parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)') parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)') args = parser.parse_args() if args.mode == 'format' and not args.format: print('Error: when using format mode, --format is required.', file = sys.stderr) sys.exit(1) if not args.sort: args.sort = [parse_sort(defaultSort)] if args.mode == 'con-d-commands': args.mode = 'format' args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}' else: args.replace_concurrency = None args.replace_delay = None # Retrieve def fetch(url): req = urllib.request.Request(url) req.add_header('Accept', 'application/json') with urllib.request.urlopen(req) as f: if f.getcode() != 200: raise RuntimeError('Could not fetch job data') return json.load(f) jobdata = fetch('http://dashboard.at.ninjawedding.org/logs/recent?count=1') pipelinedata = fetch('http://dashboard.at.ninjawedding.org/pipelines') currentTime = time.time() # Process pipelines = {p['id']: p['nickname'] for p in pipelinedata['pipelines']} jobs = [] for job in jobdata: jobs.append({column: columnFunc(job, pipelines) for column, (columnFunc, _) in columns.items()}) if not jobs: # Nothing to do sys.exit(0) # Filter if args.filter: jobs = [job for job in jobs if args.filter(job)] if not jobs: sys.exit(0) # Retrieve Grafana ETA if appropriate if args.grafana_eta and args.mode == 'table': def populate_grafana_eta(jobs): idents = {job['jobid'] for job in jobs} if not idents: return # Request for i, timeFilter in enumerate(('time>=now()-10m', 'time>=now()-24h-10m AND time<=now()-24h+10m')): req = urllib.request.Request('https://atdash.meo.ws/api/datasources/proxy/1/query?db=ateam&epoch=s') req.add_header('Content-Type', 'application/x-www-form-urlencoded') query = 'SELECT mean("items_queued")-mean("items_downloaded") FROM "grabsite" WHERE (' query += ' OR '.join(f""""ident"='{job["jobid"]}'""" for job in jobs) query += ')' query += f' AND {timeFilter}' query += ' GROUP BY time(1m), * fill(none)' query = f'q={urllib.parse.quote(query)}' req.data = query.encode('utf-8') with urllib.request.urlopen(req) as f: if f.getcode() != 200: raise RuntimeError('Could not fetch Grafana data') if i == 0: dataNow = json.load(f) else: data24hAgo = json.load(f) # Restructure data dataNow = {x['tags']['ident']: x['values'][-1] for x in dataNow['results'][0]['series']} data24hAgo = {x['tags']['ident']: x['values'][len(x['values']) // 2] for x in data24hAgo['results'][0]['series']} # Calculate ETA for job in jobs: if job['jobid'] not in dataNow or job['jobid'] not in data24hAgo: # Job not started yet 24 hours ago or no datapoint for whatever reason job['eta'] = 0 continue nowTs, nowTodo = dataNow[job['jobid']] prevTs, prevTodo = data24hAgo[job['jobid']] if nowTodo < 0 or prevTodo < 0: # Negative queue size due to AB's buggy queue counting job['eta'] = 0 continue if nowTodo >= prevTodo: # Queue hasn't shrunk job['eta'] = 0 continue job['eta'] = nowTs + nowTodo / ((prevTodo - nowTodo) / (nowTs - prevTs)) populate_grafana_eta(jobs) # Sort class reversor: # https://stackoverflow.com/a/56842689 def __init__(self, obj): self.obj = obj def __eq__(self, other): return other.obj == self.obj def __lt__(self, other): return other.obj < self.obj sortColumns = tuple((column, descending, columns[column]) for column, descending in args.sort) if not args.dates: # Reverse sorting order for columns which have a date attribute since the column will have elapsed time sortColumns = tuple((column, not descending if 'date' in columnInfo[1] else descending, columnInfo) for column, descending, columnInfo in sortColumns) jobs = sorted(jobs, key = lambda job: tuple(job[column] if not descending else reversor(job[column]) for column, descending, _ in sortColumns)) # Concurrency and delay overrides if specified and relevant if args.replace_concurrency is not None or args.replace_delay is not None: for job in jobs: if args.replace_concurrency is not None: job['con'] = args.replace_concurrency[0] if args.replace_delay is not None: job['delay min'] = args.replace_delay[0] job['delay max'] = args.replace_delay[1] # Non-table output modes if args.mode == 'dashboard-regex': print('^(' + '|'.join(re.escape(job['url']) for job in jobs) + ')$') sys.exit(0) elif args.mode == 'format': for job in jobs: print(args.format.format(**{key.replace(' ', '_'): value for key, value in job.items()})) sys.exit(0) elif args.mode == 'atdash': print('https://atdash.meo.ws/d/nipgvEwmk/archivebot?orgId=1&' + '&'.join(f'var-ident={job["jobid"]}' for job in jobs)) sys.exit(0) # Renderers def render_date(ts, coloured = False): global args, currentTime diff = currentTime - ts colourStr = f'\x1b[{0 if diff < 6 * 3600 else 7};31m' if coloured and diff >= 300 else '' colourEndStr = '\x1b[0m' if colourStr else '' if ts == 0: return 'N/A' if args.dates: return (colourStr, datetime.datetime.fromtimestamp(ts).isoformat(sep = ' '), colourEndStr) if diff < -86400: return (colourStr, f'in {-diff // 86400:.0f}d {(-diff % 86400) // 3600:.0f}h', colourEndStr) elif diff < -60: return (colourStr, 'in ' + (f'{-diff // 3600:.0f}h ' if diff <= -3600 else '') + f'{(-diff % 3600) // 60:.0f}mn', colourEndStr) elif diff < 0: return 'in <1 min' elif diff == 0: return 'now' elif diff < 60: return '<1 min ago' elif diff < 86400: return (colourStr, (f'{diff // 3600:.0f}h ' if diff >= 3600 else '') + f'{(diff % 3600) // 60:.0f}mn ago', colourEndStr) else: return (colourStr, f'{diff // 86400:.0f}d {(diff % 86400) // 3600:.0f}h ago', colourEndStr) def render_size(size): units = ('B', 'KiB', 'MiB', 'GiB', 'TiB') unitIdx = min(int(math.log(size, 1024)), len(units) - 1) if size >= 1 else 0 if unitIdx == 0: return f'{size} B' # No decimal places return f'{size / 1024 ** unitIdx:.1f} {units[unitIdx]}' renderers = {} for column, (_, columnAttr) in columns.items(): if 'date' in columnAttr: if 'coloured' in columnAttr: renderers[column] = lambda x: render_date(x, coloured = not args.no_colours) else: renderers[column] = render_date elif 'size' in columnAttr: renderers[column] = render_size elif isinstance(jobs[0][column], (int, float)): renderers[column] = str for job in jobs: for column in renderers: job[column] = renderers[column](job[column]) # Truncate if applicable printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]} if not args.no_table and not args.no_truncate: widthsD = {column: max(itertools.chain((len(column),), (len(job[column]) if isinstance(job[column], str) else len(job[column][1]) for job in jobs))) for column in printableColumns} minWidthsD = {column: len(column) for column in printableColumns} try: termWidth = os.get_terminal_size().columns except OSError as e: if e.errno == 25: # Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping) # Silently ignore this and don't truncate termWidth = float('Inf') else: raise overage = sum(x + 2 for x in widthsD.values()) - 2 - termWidth if overage > 0: if sum((widthsD[column] if 'truncatable' not in colDef[1] else minWidthsD[column]) + 2 for column, colDef in printableColumns.items()) - 2 > termWidth: # Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally print('Sorry, cannot truncate columns to terminal width', file = sys.stderr) else: # Distribute overage to truncatable columns proportionally to each column's length over the minimum truncatableColumns = {column: colDef for column, colDef in columns.items() if 'truncatable' in colDef[1]} totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns) trWidthsD = {column: math.floor(widthsD[column] - (widthsD[column] - minWidthsD[column]) / totalOverMin * overage) for column in truncatableColumns} if sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns) - overage == 1: # Truncated one more character than necessary due to the flooring; add it again to the shortest column trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1 for job in jobs: for column in truncatableColumns: if len(job[column]) > trWidthsD[column]: job[column] = job[column][:trWidthsD[column] - 1] + '…' # Print output = [] output.append(tuple(column.upper() for column in columns if 'hidden' not in columns[column][1])) for job in jobs: output.append(tuple(job[column] for column in columns if 'hidden' not in columns[column][1])) if not args.no_table: widths = tuple(max(len(field) if isinstance(field, str) else len(field[1]) for field in column) for column in zip(*output)) for row in output: print(' '.join((value.ljust(width) if isinstance(value, str) else ''.join((value[0], value[1], value[2], ' ' * (width - len(value[1]))))) for value, width in zip(row, widths))) else: for row in output: print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))