The little things give you away... A collection of various small helper stuff
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 

351 řádky
16 KiB

  1. #!/usr/bin/env python3
import argparse
import datetime
import errno
import itertools
import json
import math
import os
import re
import sys
import time
import urllib.parse
import urllib.request
  12. # Column definitions
  13. columns = {
  14. 'jobid': (lambda job, pipelines: job['job_data']['ident'], ()),
  15. 'url': (lambda job, pipelines: job['job_data']['url'], ('truncatable',)),
  16. 'user': (lambda job, pipelines: job['job_data']['started_by'], ()),
  17. 'pipenick': (lambda job, pipelines: pipelines[job['job_data']['pipeline_id']] if job['job_data']['pipeline_id'] in pipelines else 'unknown', ()),
  18. 'queued': (lambda job, pipelines: job['job_data']['queued_at'], ('date', 'numeric')),
  19. 'started': (lambda job, pipelines: job['job_data']['started_at'], ('date', 'numeric')),
  20. 'last active': (lambda job, pipelines: int(job['ts']), ('date', 'coloured', 'numeric')),
  21. 'dl urls': (lambda job, pipelines: job['job_data']['items_downloaded'], ('numeric',)),
  22. 'dl size': (lambda job, pipelines: job['job_data']['bytes_downloaded'], ('size', 'numeric')),
  23. 'queue': (lambda job, pipelines: job['job_data']['items_queued'] - job['job_data']['items_downloaded'], ('numeric',)),
  24. 'eta': (lambda job, pipelines: int((curTime := time.time()) + (job['job_data']['items_queued'] - job['job_data']['items_downloaded']) / (job['job_data']['items_downloaded'] / (curTime - job['job_data']['started_at']))) if job['job_data']['items_downloaded'] > 0 else 0, ('date', 'numeric')),
  25. # Overwritten below if --grafana-eta is given
  26. 'con': (lambda job, pipelines: job['job_data']['concurrency'], ('numeric',)),
  27. 'delay min': (lambda job, pipelines: int(job['job_data']['delay_min']), ('hidden', 'numeric')),
  28. 'delay max': (lambda job, pipelines: int(job['job_data']['delay_max']), ('hidden', 'numeric')),
  29. 'delay': (lambda job, pipelines: str(int(job['job_data']['delay_min'])) + '-' + str(int(job['job_data']['delay_max'])) if job['job_data']['delay_min'] != job['job_data']['delay_max'] else str(int(job['job_data']['delay_min'])), ()),
  30. }
  31. defaultSort = 'jobid'
  32. # Validate
  33. if any('truncatable' in colDef[1] and any(x in colDef[1] for x in ('date', 'coloured', 'size')) for colDef in columns.values()):
  34. # Truncation code can't handle renderers
  35. raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable')
  36. # Filter function
  37. def make_field_filter(column, op, value, caseSensitive = True):
  38. compFunc = {
  39. '=': lambda a, b: a == b,
  40. '<': lambda a, b: a < b,
  41. '>': lambda a, b: a > b,
  42. '^': lambda a, b: a.startswith(b),
  43. '*': lambda a, b: b in a,
  44. '$': lambda a, b: a.endswith(b),
  45. '~': lambda a, b: re.search(b, a) is not None,
  46. }[op]
  47. transform = {
  48. True: (lambda x: x),
  49. False: (lambda x: x.lower() if isinstance(x, str) else x)
  50. }[caseSensitive]
  51. return (lambda job: compFunc(transform(job[column]), transform(value)))
  52. # Parse arguments
  53. class FilterAction(argparse.Action):
  54. def __call__(self, parser, namespace, values, optionString = None):
  55. if optionString == '--pyfilter':
  56. try:
  57. func = compile(values[0], '<pyfilter>', 'eval')
  58. except Exception as e:
  59. parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}')
  60. setattr(namespace, self.dest, lambda job: eval(func, {**{k: v for k, v in globals().items() if k in ('datetime', 'math', 're', 'time')}, 'job': job}))
  61. return
  62. global columns
  63. match = re.match(r'^(?P<column>[A-Za-z ]+)(?P<op>[=<>^*$~])(?P<value>.*)$', values[0])
  64. if not match:
  65. parser.error('Invalid filter')
  66. filterDict = match.groupdict()
  67. filterDict['column'] = filterDict['column'].lower()
  68. assert filterDict['column'] in columns
  69. if 'numeric' in columns[filterDict['column']][1]:
  70. filterDict['value'] = float(filterDict['value'])
  71. if 'date' in columns[filterDict['column']][1] and filterDict['value'] < 0:
  72. filterDict['value'] = time.time() + filterDict['value']
  73. setattr(namespace, self.dest, make_field_filter(filterDict['column'], filterDict['op'], filterDict['value'], caseSensitive = (optionString in ('--filter', '-f'))))
  74. def parse_sort(value):
  75. global columns
  76. sortDesc = value.startswith('-')
  77. if sortDesc:
  78. value = value[1:]
  79. value = value.lower()
  80. if value not in columns:
  81. parser.error('Invalid column name')
  82. return (value, sortDesc)
  83. class SortAction(argparse.Action):
  84. def __call__(self, parser, namespace, values, optionString = None):
  85. result = parse_sort(values[0])
  86. if getattr(namespace, self.dest, None) is None:
  87. setattr(namespace, self.dest, [])
  88. getattr(namespace, self.dest).append(result)
  89. parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter)
  90. parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = '\n'.join([
  91. 'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.',
  92. 'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE',
  93. ' = means the value must be exactly as specified.',
  94. ' < and > mean it must be less/greater than the specified.',
  95. ' ^ and $ mean it must start/end with the specified.',
  96. ' * means it must contain the specified.',
  97. ' ~ means it must match the specified regex.',
  98. ]))
  99. parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive')
  100. parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`')
  101. parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.")
  102. parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format', 'atdash'), default = 'table', help = '\n'.join([
  103. 'Output modes:',
  104. ' table: print a table of the matched jobs',
  105. ' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter',
  106. ' con-d-commands: print !con and !d commands for the current settings',
  107. ' format: print some output for each job, separated by newlines; this requires the --format option',
  108. ' atdash: print the URL for displaying the matched jobs on atdash',
  109. ]))
  110. parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)")
  111. parser.add_argument('--no-table', action = 'store_true', help = 'Raw non-columnised output; columns are separated by tabs. (Table mode only)')
  112. parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)')
  113. parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)')
  114. parser.add_argument('--grafana-eta', action = 'store_true', help = 'Enable fetching data from Grafana for a better ETA on long-running jobs. (Table mode only)')
  115. parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
  116. parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')
  117. parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)')
  118. args = parser.parse_args()
  119. if args.mode == 'format' and not args.format:
  120. print('Error: when using format mode, --format is required.', file = sys.stderr)
  121. sys.exit(1)
  122. if not args.sort:
  123. args.sort = [parse_sort(defaultSort)]
  124. if args.mode == 'con-d-commands':
  125. args.mode = 'format'
  126. args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}'
  127. else:
  128. args.replace_concurrency = None
  129. args.replace_delay = None
  130. # Retrieve
  131. def fetch(url):
  132. req = urllib.request.Request(url)
  133. req.add_header('Accept', 'application/json')
  134. with urllib.request.urlopen(req) as f:
  135. if f.getcode() != 200:
  136. raise RuntimeError('Could not fetch job data')
  137. return json.load(f)
  138. jobdata = fetch('http://archivebot.com/logs/recent?count=1')
  139. pipelinedata = fetch('http://archivebot.com/pipelines')
  140. currentTime = time.time()
  141. # Process
  142. pipelines = {p['id']: p['nickname'] for p in pipelinedata['pipelines']}
  143. jobs = []
  144. for job in jobdata:
  145. jobs.append({column: columnFunc(job, pipelines) for column, (columnFunc, _) in columns.items()})
  146. if not jobs:
  147. # Nothing to do
  148. sys.exit(0)
  149. # Filter
  150. if args.filter:
  151. jobs = [job for job in jobs if args.filter(job)]
  152. if not jobs:
  153. sys.exit(0)
  154. # Retrieve Grafana ETA if appropriate
  155. if args.grafana_eta and args.mode == 'table':
  156. def populate_grafana_eta(jobs):
  157. idents = {job['jobid'] for job in jobs}
  158. if not idents:
  159. return
  160. # Request
  161. for i, timeFilter in enumerate(('time>=now()-10m', 'time>=now()-24h-10m AND time<=now()-24h+10m')):
  162. req = urllib.request.Request('https://atdash.meo.ws/api/datasources/proxy/1/query?db=ateam&epoch=s')
  163. req.add_header('Content-Type', 'application/x-www-form-urlencoded')
  164. query = 'SELECT mean("items_queued")-mean("items_downloaded") FROM "grabsite" WHERE ('
  165. query += ' OR '.join(f""""ident"='{job["jobid"]}'""" for job in jobs)
  166. query += ')'
  167. query += f' AND {timeFilter}'
  168. query += ' GROUP BY time(1m), * fill(none)'
  169. query = f'q={urllib.parse.quote(query)}'
  170. req.data = query.encode('utf-8')
  171. with urllib.request.urlopen(req) as f:
  172. if f.getcode() != 200:
  173. raise RuntimeError('Could not fetch Grafana data')
  174. if i == 0:
  175. dataNow = json.load(f)
  176. else:
  177. data24hAgo = json.load(f)
  178. # Restructure data
  179. dataNow = {x['tags']['ident']: x['values'][-1] for x in dataNow['results'][0]['series']}
  180. data24hAgo = {x['tags']['ident']: x['values'][len(x['values']) // 2] for x in data24hAgo['results'][0]['series']}
  181. # Calculate ETA
  182. for job in jobs:
  183. if job['jobid'] not in dataNow or job['jobid'] not in data24hAgo: # Job not started yet 24 hours ago or no datapoint for whatever reason
  184. job['eta'] = 0
  185. continue
  186. nowTs, nowTodo = dataNow[job['jobid']]
  187. prevTs, prevTodo = data24hAgo[job['jobid']]
  188. if nowTodo < 0 or prevTodo < 0: # Negative queue size due to AB's buggy queue counting
  189. job['eta'] = 0
  190. continue
  191. if nowTodo >= prevTodo: # Queue hasn't shrunk
  192. job['eta'] = 0
  193. continue
  194. job['eta'] = nowTs + nowTodo / ((prevTodo - nowTodo) / (nowTs - prevTs))
  195. populate_grafana_eta(jobs)
  196. # Sort
  197. class reversor: # https://stackoverflow.com/a/56842689
  198. def __init__(self, obj):
  199. self.obj = obj
  200. def __eq__(self, other):
  201. return other.obj == self.obj
  202. def __lt__(self, other):
  203. return other.obj < self.obj
  204. sortColumns = tuple((column, descending, columns[column]) for column, descending in args.sort)
  205. if not args.dates:
  206. # Reverse sorting order for columns which have a date attribute since the column will have elapsed time
  207. sortColumns = tuple((column, not descending if 'date' in columnInfo[1] else descending, columnInfo) for column, descending, columnInfo in sortColumns)
  208. jobs = sorted(jobs, key = lambda job: tuple(job[column] if not descending else reversor(job[column]) for column, descending, _ in sortColumns))
  209. # Concurrency and delay overrides if specified and relevant
  210. if args.replace_concurrency is not None or args.replace_delay is not None:
  211. for job in jobs:
  212. if args.replace_concurrency is not None:
  213. job['con'] = args.replace_concurrency[0]
  214. if args.replace_delay is not None:
  215. job['delay min'] = args.replace_delay[0]
  216. job['delay max'] = args.replace_delay[1]
  217. # Non-table output modes
  218. if args.mode == 'dashboard-regex':
  219. print('^(' + '|'.join(re.escape(job['url']) for job in jobs) + ')$')
  220. sys.exit(0)
  221. elif args.mode == 'format':
  222. for job in jobs:
  223. print(args.format.format(**{key.replace(' ', '_'): value for key, value in job.items()}))
  224. sys.exit(0)
  225. elif args.mode == 'atdash':
  226. print('https://atdash.meo.ws/d/nipgvEwmk/archivebot?orgId=1&' + '&'.join(f'var-ident={job["jobid"]}' for job in jobs))
  227. sys.exit(0)
  228. # Renderers
  229. def render_date(ts, coloured = False):
  230. global args, currentTime
  231. diff = currentTime - ts
  232. colourStr = f'\x1b[{0 if diff < 6 * 3600 else 7};31m' if coloured and diff >= 300 else ''
  233. colourEndStr = '\x1b[0m' if colourStr else ''
  234. if ts == 0:
  235. return 'N/A'
  236. if args.dates:
  237. return (colourStr, datetime.datetime.fromtimestamp(ts).isoformat(sep = ' '), colourEndStr)
  238. if diff < -86400:
  239. return (colourStr, f'in {-diff // 86400:.0f}d {(-diff % 86400) // 3600:.0f}h', colourEndStr)
  240. elif diff < -60:
  241. return (colourStr, 'in ' + (f'{-diff // 3600:.0f}h ' if diff <= -3600 else '') + f'{(-diff % 3600) // 60:.0f}mn', colourEndStr)
  242. elif diff < 0:
  243. return 'in <1 min'
  244. elif diff == 0:
  245. return 'now'
  246. elif diff < 60:
  247. return '<1 min ago'
  248. elif diff < 86400:
  249. return (colourStr, (f'{diff // 3600:.0f}h ' if diff >= 3600 else '') + f'{(diff % 3600) // 60:.0f}mn ago', colourEndStr)
  250. else:
  251. return (colourStr, f'{diff // 86400:.0f}d {(diff % 86400) // 3600:.0f}h ago', colourEndStr)
  252. def render_size(size):
  253. units = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
  254. unitIdx = min(int(math.log(size, 1024)), len(units) - 1) if size >= 1 else 0
  255. if unitIdx == 0:
  256. return f'{size} B' # No decimal places
  257. return f'{size / 1024 ** unitIdx:.1f} {units[unitIdx]}'
  258. renderers = {}
  259. for column, (_, columnAttr) in columns.items():
  260. if 'date' in columnAttr:
  261. if 'coloured' in columnAttr:
  262. renderers[column] = lambda x: render_date(x, coloured = not args.no_colours)
  263. else:
  264. renderers[column] = render_date
  265. elif 'size' in columnAttr:
  266. renderers[column] = render_size
  267. elif isinstance(jobs[0][column], (int, float)):
  268. renderers[column] = str
  269. for job in jobs:
  270. for column in renderers:
  271. job[column] = renderers[column](job[column])
  272. # Truncate if applicable
  273. printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]}
  274. if not args.no_table and not args.no_truncate:
  275. widthsD = {column: max(itertools.chain((len(column),), (len(job[column]) if isinstance(job[column], str) else len(job[column][1]) for job in jobs))) for column in printableColumns}
  276. minWidthsD = {column: len(column) for column in printableColumns}
  277. try:
  278. termWidth = os.get_terminal_size().columns
  279. except OSError as e:
  280. if e.errno == 25:
  281. # Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping)
  282. # Silently ignore this and don't truncate
  283. termWidth = float('Inf')
  284. else:
  285. raise
  286. overage = sum(x + 2 for x in widthsD.values()) - 2 - termWidth
  287. if overage > 0:
  288. if sum((widthsD[column] if 'truncatable' not in colDef[1] else minWidthsD[column]) + 2 for column, colDef in printableColumns.items()) - 2 > termWidth:
  289. # Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally
  290. print('Sorry, cannot truncate columns to terminal width', file = sys.stderr)
  291. else:
  292. # Distribute overage to truncatable columns proportionally to each column's length over the minimum
  293. truncatableColumns = {column: colDef for column, colDef in columns.items() if 'truncatable' in colDef[1]}
  294. totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns)
  295. trWidthsD = {column: math.floor(widthsD[column] - (widthsD[column] - minWidthsD[column]) / totalOverMin * overage) for column in truncatableColumns}
  296. if sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns) - overage == 1:
  297. # Truncated one more character than necessary due to the flooring; add it again to the shortest column
  298. trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1
  299. for job in jobs:
  300. for column in truncatableColumns:
  301. if len(job[column]) > trWidthsD[column]:
  302. job[column] = job[column][:trWidthsD[column] - 1] + '…'
  303. # Print
  304. output = []
  305. output.append(tuple(column.upper() for column in columns if 'hidden' not in columns[column][1]))
  306. for job in jobs:
  307. output.append(tuple(job[column] for column in columns if 'hidden' not in columns[column][1]))
  308. if not args.no_table:
  309. widths = tuple(max(len(field) if isinstance(field, str) else len(field[1]) for field in column) for column in zip(*output))
  310. for row in output:
  311. print(' '.join((value.ljust(width) if isinstance(value, str) else ''.join((value[0], value[1], value[2], ' ' * (width - len(value[1]))))) for value, width in zip(row, widths)))
  312. else:
  313. for row in output:
  314. print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))