diff --git a/inefficient_jobs_detector.yml b/inefficient_jobs_detector.yml
new file mode 100644
index 0000000..a704e4f
--- /dev/null
+++ b/inefficient_jobs_detector.yml
@@ -0,0 +1,5 @@
+---
+- hosts: scheduler
+  become: true
+  roles:
+    - inefficient_jobs_detector
diff --git a/roles/inefficient_jobs_detector/files/find_inefficient_jobs b/roles/inefficient_jobs_detector/files/find_inefficient_jobs
new file mode 100755
index 0000000..fe4110d
--- /dev/null
+++ b/roles/inefficient_jobs_detector/files/find_inefficient_jobs
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+
+"""
+This script finds inefficient SLURM jobs for a given time range
+and reports them to Slack.
+
+To determine whether a job is inefficient, its CPU and memory usage
+are calculated and compared to given thresholds.
+"""
+
+import argparse
+import datetime
+import json
+import os
+import re
+import subprocess
+
+import requests
+
+# Regular expression for the parse_time function.
+time_parts = re.compile(r'(((?P<days>\d+)-)?(?P<hours>\d\d):)?' +
+                        r'(?P<minutes>\d\d):(?P<seconds>\d\d(\.\d+)?)')
+
+message_header = '''The following job(s) wasted a lot of resources:'''
+
+
+def parse_time(t):
+    """
+    Parse a time string of the form "days-hh:mm:ss".
+
+    Args:
+        t (str): time string
+
+    Returns:
+        tuple: (seconds, minutes, hours, days); seconds as a float, the rest as integers.
+    """
+
+    m = time_parts.match(t)
+    if m is None:
+        return 0.0, 0, 0, 0
+    ss = float(m.group('seconds'))
+    mm = int(m.group('minutes'))
+    hh = int(m.group('hours') or '0')
+    dd = int(m.group('days') or '0')
+
+    return ss, mm, hh, dd
+
+
+def time_to_seconds(elapsed):
+    """
+    Calculate the total number of seconds for a time string of the form "days-hh:mm:ss".
+
+    Args:
+        elapsed (str): time string
+
+    Returns:
+        float: total number of seconds represented by the given time string.
+    """
+
+    ss, mm, hh, dd = parse_time(elapsed)
+    return dd * 24 * 60 * 60 + hh * 60 * 60 + mm * 60 + ss
+
+
+FIELDS = ['jobid', 'user', 'partition', 'elapsedraw', 'nnodes', 'totalcpu', 'reqmem', 'maxrss', 'ncpus']
+FIELD_TYPES = [str, str, str, int, int, time_to_seconds, float, float, int]
+
+
+def get_job_stats(start_time, end_time):
+    """
+    Find all jobs between the given start and end time using the sacct command,
+    collect relevant information about the jobs, and return this as a dictionary.
+
+    Args:
+        start_time (str): start time as "YYYY-MM-DDTHH:MM:SS"
+        end_time (str): end time as "YYYY-MM-DDTHH:MM:SS"
+
+    Returns:
+        dict: dictionary with all jobs, using the job ID as key.
+    """
+
+    fields = ','.join(FIELDS)
+    sacct = subprocess.check_output(
+        ['sacct', f'--starttime={start_time}', f'--endtime={end_time}', '--allusers', f'--format={fields}',
+         '--state=CD,TO', '--units=G', '--parsable2', '--noheader']
+    ).decode()
+    lines = sacct.strip().split('\n')
+
+    jobs = {}
+
+    for l in lines:
+        values = l.split('|')
+        jobid = values[0]
+
+        # Remove the trailing G from MaxRSS.
+        values[FIELDS.index('maxrss')] = values[FIELDS.index('maxrss')].split('G')[0] if values[FIELDS.index('maxrss')] else 0
+        # ReqMem can be specified as Gn (GB per node) or Gc (GB per core); calculate the total and remove the suffix.
+        if values[FIELDS.index('reqmem')][-1] == 'n':
+            values[FIELDS.index('reqmem')] = values[FIELDS.index('reqmem')][:-2]
+        elif values[FIELDS.index('reqmem')][-1] == 'c':
+            # Multiply by the (average) number of CPUs per node; the real number of cores per node cannot be obtained from sacct.
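+            # Illustrative example (hypothetical numbers, not taken from sacct output):
+            # a request of "2Gc" for a job with 24 CPUs on 2 nodes gives
+            # cpn = 24 / 2 = 12, so reqmem becomes 2 * 12 = 24 GB per node.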
+            cpn = int(values[FIELDS.index('ncpus')]) / int(values[FIELDS.index('nnodes')])
+            values[FIELDS.index('reqmem')] = float(values[FIELDS.index('reqmem')][:-2]) * cpn
+
+        # Convert all values to the specified type.
+        values = [t(v) for t, v in zip(FIELD_TYPES, values)]
+
+        # If the job id contains a dot, it is a step of a job. In that case, check whether
+        # its memory usage is higher than the current maximum memory usage of the job.
+        if '.' in values[0]:
+            jobs[jobid.split('.')[0]]['maxrss'] = max(values[FIELDS.index('maxrss')], jobs[jobid.split('.')[0]]['maxrss'])
+            continue
+
+        job = dict(zip(FIELDS[1:], values[1:]))
+
+        jobs[values[0]] = job
+    return jobs
+
+
+def find_inefficient_jobs(jobs, min_walltime=600, cpueff_threshold=25, memeff_threshold=25, min_reqmem=10):
+    """
+    Find and return the inefficient jobs in a dictionary of jobs.
+
+    Args:
+        jobs (dict): all jobs to be considered.
+        min_walltime (int): jobs with a lower walltime will be ignored.
+        cpueff_threshold (int): jobs with a lower CPU efficiency percentage will be reported.
+        memeff_threshold (int): jobs with a lower memory efficiency percentage will be reported.
+        min_reqmem (int): jobs with a lower memory request (in GB) will be ignored.
+
+    Returns:
+        dict: dictionary with all inefficient jobs, using the job ID as key.
+    """
+
+    inefficient_jobs = {}
+    for id, stats in jobs.items():
+        # Only take jobs with some minimum length into account;
+        # for shorter jobs the statistics may not be accurate.
+        if stats['elapsedraw'] > min_walltime:
+            cpueff = 100 * stats['totalcpu'] / (stats['elapsedraw'] * stats['ncpus'])
+            memeff = 100 * stats['maxrss'] / stats['reqmem']
+            # Find jobs with a low memory or CPU efficiency.
+            # For the latter, we exclude jobs from the GPU partition, since they
+            # might still use the GPU, which could lead to low CPU utilization.
+            if (stats['reqmem'] > min_reqmem and memeff < memeff_threshold) or \
+               (cpueff < cpueff_threshold and stats['partition'] != 'gpu'):
+                inefficient_jobs[id] = stats
+                inefficient_jobs[id]['cpueff'] = cpueff
+                inefficient_jobs[id]['memeff'] = memeff
+
+    return inefficient_jobs
+
+
+def post_to_slack(message: str, slack_url: str):
+    """
+    Post a message to Slack.
+
+    Args:
+        message (str): message to post
+        slack_url (str): URL to post the message to
+    """
+    data = json.dumps({
+        'channel': '#kill-hogs',
+        'username': 'Deputy_Enos',
+        'text': message,
+        'icon_emoji': ':enos:'
+    }).encode('utf-8')
+    requests.post(
+        slack_url, data=data, headers={'Content-Type': 'application/json'})
+
+
+def parse_args():
+    """
+    Parse the command-line parameters.
+
+    Returns:
+        Namespace: populated namespace with the parsed values.
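+
+    Example (illustrative invocation; the dates are placeholders):
+        find_inefficient_jobs --start 2021-01-01T00:00:00 --end 2021-01-08T00:00:00 --memeff-threshold 10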
+ """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--start", + type=str, + default=datetime.datetime.now().strftime("%Y-%m-%dT00:00:00"), + help="start time as YYYY-MM-DDTHH:MM:SS") + parser.add_argument( + "--end", + type=str, + default=datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), + help="end time as YYYY-MM-DDTHH:MM:SS") + parser.add_argument( + "--min-walltime", + type=int, + default=600, + help="jobs with a lower walltime will be ignored") + parser.add_argument( + "--min-reqmem", + type=int, + default=10, + help="jobs with a lower memory request (in GBs) will be ignored") + parser.add_argument( + "--cpueff-threshold", + type=int, + default=25, + help="jobs with a lower CPU efficiency percentage will be reported") + parser.add_argument( + "--memeff-threshold", + type=int, + default=25, + help="jobs with a lower memory efficiency percentage will be reported") + + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + + jobstats = get_job_stats(args.start, args.end) + ineff_jobs = find_inefficient_jobs( + jobstats, + min_walltime = args.min_walltime, + min_reqmem = args.min_reqmem, + cpueff_threshold = args.cpueff_threshold, + memeff_threshold = args.memeff_threshold + ) + + if ineff_jobs: + slack_url = os.environ.get('SLACK_URL') + message = [message_header] + for jobid, job in ineff_jobs.items(): + message.append(f'Job id: {jobid:10} User: {job["user"]:9} CPU efficiency: {job["cpueff"]:4.1f}% Memory efficiency: {job["memeff"]:4.1f}%') + if slack_url: + post_to_slack('\n'.join(message), slack_url) + else: + print('\n'.join(message)) diff --git a/roles/inefficient_jobs_detector/tasks/main.yml b/roles/inefficient_jobs_detector/tasks/main.yml new file mode 100644 index 0000000..bc66836 --- /dev/null +++ b/roles/inefficient_jobs_detector/tasks/main.yml @@ -0,0 +1,21 @@ +--- +- name: install the script. + copy: + src: files/find_inefficient_jobs + dest: /sbin/find_inefficient_jobs + mode: 0755 + +- cron: + name: Report inefficient jobs. + weekday: '0' + minute: "59" + hour: "23" + user: root + job: 'SLACK_URL={{ slack_url }} /usr/bin/python3 /sbin/find_inefficient_jobs --min-walltime 3600 --memeff-threshold 10 --cpueff-threshold 10 --min-reqmem 24 --start `date --date="1 week ago" +'%Y-%m-%dT%H:%M'` --end `date +'%Y-%m-%dT%H:%M'`' + cron_file: find_inefficient_jobs + +# Don't buzz the whole team... yet +- cronvar: + name: MAILTO + value: b.e.droge@rug.nl + cron_file: find_inefficient_jobs diff --git a/roles/inefficient_jobs_detector/vars/main.yml b/roles/inefficient_jobs_detector/vars/main.yml new file mode 100644 index 0000000..b6b9157 --- /dev/null +++ b/roles/inefficient_jobs_detector/vars/main.yml @@ -0,0 +1,10 @@ +$ANSIBLE_VAULT;1.1;AES256 +38323261346639393130636237633131323663353035663964623138643335666332343735346566 +6662643730613634396265396234306239343438653034330a353538326330386235323232656363 +34386361643566376163616232346237306235613962313630353437623136393532333165656364 +3736306636633861640a336430316665386435613663363634336363616231643437303034663864 +63383864333166383730396364613036626665366464633661346235613537663731363464373831 +36326565616130396132373931336137373064663033333037346434613438633463663163656137 +38376231343432376436363234353934626534303063306365386639663938643639316130326638 +62383366663834626638626531333136636335383962626635383966613237356333666561623834 +37353066333235356630396131336162336239363035333065626561313336356166