Would like to move to https://github.com/rug-cit-hpc/pg-playbooks, but this repository has large files...
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
7.8 KiB

#!/usr/bin/env python3
"""
This script finds inefficient SLURM jobs for a given time range,
and reports them to Slack.

To determine if a job is inefficient, its CPU and memory usage
are calculated and compared to a given threshold.
"""
import argparse
import datetime
import json
import os
import re
import requests
import subprocess
# Regular expression for parse_time function.
# Matches "days-hh:mm:ss", where the "days-" and "hh:" parts are optional
# and the seconds may have a fractional part.
# NOTE(review): the continuation line was truncated in this copy; the second
# half is reconstructed from the group names used in parse_time below.
time_parts = re.compile(r'(((?P<days>\d+)-)?(?P<hours>\d\d):)?' +
                        r'(?P<minutes>\d\d):(?P<seconds>\d\d(\.\d+)?)')

message_header = '''The following job(s) wasted a lot of resources:'''


def parse_time(t):
    """Parse a time string containing "days-hh:mm:ss".

    Args:
        t (str): time string

    Returns:
        tuple: (seconds, minutes, hours, days); seconds is a float, the
        others are integers. All zeros if the string does not match.
    """
    m = time_parts.match(t)
    if m is None:
        return 0.0, 0, 0, 0
    ss = float(m.group('seconds'))
    mm = int(m.group('minutes'))
    # The hours and days groups are optional and may be missing.
    hh = int(m.group('hours') or '0')
    dd = int(m.group('days') or '0')
    return ss, mm, hh, dd


def time_to_seconds(elapsed):
    """Calculate total number of seconds for a time string containing "days-hh:mm:ss".

    Args:
        elapsed (str): time string

    Returns:
        float: total number of seconds represented by the given time string.
    """
    ss, mm, hh, dd = parse_time(elapsed)
    return dd * 24 * 60 * 60 + hh * 60 * 60 + mm * 60 + ss
# Fields requested from sacct and the type each raw string value is
# converted to. The two lists are index-aligned.
FIELDS = ['jobid', 'user', 'partition', 'elapsedraw', 'nnodes', 'totalcpu', 'reqmem', 'maxrss', 'ncpus']
FIELD_TYPES = [str, str, str, int, int, time_to_seconds, float, float, int]


def get_job_stats(start_time, end_time):
    """Find all jobs between the given start and end time using the sacct command,
    collect relevant information about the jobs, and return this as a dictionary.

    Args:
        start_time (str): start time as "<YYYY-MM-DD>T<HH:MM:SS>"
        end_time (str): end time as "<YYYY-MM-DD>T<HH:MM:SS>"

    Returns:
        dict: dictionary with all jobs, using the job ID as key.
    """
    fields = ','.join(FIELDS)
    # Only completed (CD) and timed-out (TO) jobs; memory sizes in GB.
    # check_output returns bytes by default, so decode explicitly.
    sacct = subprocess.check_output(
        ['sacct', f'--starttime={start_time}', f'--endtime={end_time}', '--allusers', f'--format={fields}',
         '--state=CD,TO', '--units=G', '--parsable2', '--noheader'],
        encoding='utf-8')
    maxrss_idx = FIELDS.index('maxrss')
    reqmem_idx = FIELDS.index('reqmem')
    jobs = {}
    for line in sacct.strip().split('\n'):
        # Skip empty lines (e.g. when sacct returned no jobs at all).
        if not line:
            continue
        values = line.split('|')
        jobid = values[0]
        # Remove trailing G from MaxRSS; an empty MaxRSS counts as 0.
        values[maxrss_idx] = values[maxrss_idx].split('G')[0] if values[maxrss_idx] else 0
        # ReqMem can be specified as Gn (GB per node) or Gc (GB per core);
        # calculate the total and remove the suffix.
        if values[reqmem_idx].endswith('n'):
            values[reqmem_idx] = values[reqmem_idx][:-2]
        elif values[reqmem_idx].endswith('c'):
            # Multiply by the (average) number of CPUs per node.
            # The real number of cores per node cannot be obtained by sacct.
            cpn = int(values[FIELDS.index('ncpus')]) / int(values[FIELDS.index('nnodes')])
            values[reqmem_idx] = float(values[reqmem_idx][:-2]) * cpn
        # Convert all values to the specified type.
        values = [t(v) for t, v in zip(FIELD_TYPES, values)]
        # If the job id contains a dot, it's a step of a job: fold its memory
        # usage into the parent job's maximum instead of recording the step as
        # a job of its own.
        if '.' in jobid:
            parent = jobid.split('.')[0]
            jobs[parent]['maxrss'] = max(values[maxrss_idx], jobs[parent]['maxrss'])
        else:
            jobs[jobid] = dict(zip(FIELDS[1:], values[1:]))
    return jobs
def find_inefficient_jobs(jobs, min_walltime=600, cpueff_threshold=25, memeff_threshold=25, min_reqmem=10):
    """Find and return the inefficient jobs in a dictionary of jobs.

    Note: the returned entries are the same dicts as in the input; the keys
    'cpueff' and 'memeff' are added to each reported job.

    Args:
        jobs (dict): all jobs to be considered.
        min_walltime (int): jobs with a lower walltime will be ignored.
        cpueff_threshold (int): jobs with a lower CPU efficiency percentage will be reported.
        memeff_threshold (int): jobs with a lower memory efficiency percentage will be reported.
        min_reqmem (int): jobs with a lower memory request (in GBs) will be ignored.

    Returns:
        dict: dictionary with all inefficient jobs, using the ID as key.
    """
    inefficient_jobs = {}
    for job_id, stats in jobs.items():
        # Only take jobs with some minimum length into account;
        # for shorter jobs the statistics may not be accurate.
        if stats['elapsedraw'] <= min_walltime:
            continue
        cpueff = 100 * stats['totalcpu'] / (stats['elapsedraw'] * stats['ncpus'])
        # Guard against jobs without a (parsable) memory request, which would
        # otherwise raise ZeroDivisionError.
        memeff = 100 * stats['maxrss'] / stats['reqmem'] if stats['reqmem'] else 0.0
        # Find jobs with a low memory or CPU efficiency.
        # For the latter, we exclude jobs from the GPU partition, since they
        # might still use the GPU, which could lead to low CPU utilization.
        if (stats['reqmem'] > min_reqmem and memeff < memeff_threshold) or \
                (cpueff < cpueff_threshold and stats['partition'] != 'gpu'):
            inefficient_jobs[job_id] = stats
            inefficient_jobs[job_id]['cpueff'] = cpueff
            inefficient_jobs[job_id]['memeff'] = memeff
    return inefficient_jobs
def post_to_slack(message: str, slack_url: str):
    """Post a message to slack.

    Args:
        message (str): Message to post
        slack_url (str): url to post message to

    Raises:
        requests.HTTPError: if Slack rejects the message.
        requests.Timeout: if Slack does not answer within the timeout.
    """
    data = json.dumps({
        'channel': '#kill-hogs',
        'username': 'Deputy_Enos',
        'text': message,
        'icon_emoji': ':enos:'
    })
    # requests has no default timeout; without one a hung Slack endpoint
    # would block this script forever.
    response = requests.post(
        slack_url, data=data, headers={'Content-Type': 'application/json'},
        timeout=30)
    # Fail loudly instead of silently discarding a delivery error.
    response.raise_for_status()
def parse_args(argv=None):
    """Parse the command-line parameters.

    Args:
        argv (list | None): arguments to parse; defaults to sys.argv[1:].

    Returns:
        Namespace: populated namespace with the parsed values.
    """
    # NOTE(review): the add_argument calls were truncated in this copy; the
    # argument names are reconstructed from the attributes used in __main__,
    # with defaults matching find_inefficient_jobs.
    parser = argparse.ArgumentParser()
    parser.add_argument('start',
                        help="start time as YYYY-MM-DDTHH:MM:SS")
    parser.add_argument('end',
                        help="end time as YYYY-MM-DDTHH:MM:SS")
    parser.add_argument('--min_walltime', type=int, default=600,
                        help="jobs with a lower walltime will be ignored")
    parser.add_argument('--min_reqmem', type=int, default=10,
                        help="jobs with a lower memory request (in GBs) will be ignored")
    parser.add_argument('--cpueff_threshold', type=int, default=25,
                        help="jobs with a lower CPU efficiency percentage will be reported")
    parser.add_argument('--memeff_threshold', type=int, default=25,
                        help="jobs with a lower memory efficiency percentage will be reported")
    return parser.parse_args(argv)
if __name__ == '__main__':
    args = parse_args()
    jobstats = get_job_stats(args.start, args.end)
    ineff_jobs = find_inefficient_jobs(
        jobstats,
        min_walltime=args.min_walltime,
        min_reqmem=args.min_reqmem,
        cpueff_threshold=args.cpueff_threshold,
        memeff_threshold=args.memeff_threshold)
    if ineff_jobs:
        slack_url = os.environ.get('SLACK_URL')
        # Build one report line per inefficient job.
        message = [message_header]
        for jobid, job in ineff_jobs.items():
            message.append(
                f'Job id: {jobid:10} User: {job["user"]:9} '
                f'CPU efficiency: {job["cpueff"]:4.1f}% '
                f'Memory efficiency: {job["memeff"]:4.1f}%')
        # Only report when a Slack webhook URL is configured.
        if slack_url:
            post_to_slack('\n'.join(message), slack_url)