Would like to move to https://github.com/rug-cit-hpc/pg-playbooks but it has large files...
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

242 lines
6.5 KiB

#!/bin/env python3
from collections import namedtuple
import datetime
import json
import os
import re
import shlex
import smtplib
import subprocess
import time

import pynumparser
import requests
# Body of the notification mailed to the owner of a canceled job; the
# specific job line is appended after this text (see main()).
EMAIL_MESSAGE = """
Dear Peregrine user.
A script detected that one of your jobs on peregrine in the GPU queue is not using the GPU.
If they should be using the GPU could you please look into your code?
If you need help with that, we can recommend the walk in sessions of the support team.
https://wiki.hpc.rug.nl/peregrine/additional_information/walk-in_sessions
If the jobs are not supposed to use the gpu, please submit them to one of the other queues.
Yours sincerely,
Team HPC
ps: The following job was canceled:
"""
def line_to_job(line):
    """
    Convert a string containing '<job-id>,<time>,<node>,<user>'
    (one line of `squeue -o %i,%M,%N,%u` output) to a named tuple.

    The time field is squeue's elapsed format: [days-]h:mm:ss, where
    leading components may be missing (e.g. '12:34' means 12m34s).

    Args:
        line (string): line from squeue

    Returns:
        Job or None: Job(id, delta, node, user) with `delta` a
        datetime.timedelta, or None when the line does not have
        exactly four comma-separated fields.
    """
    Job = namedtuple('Job', ['id', 'delta', 'node', 'user'])
    params = line.split(',')
    timestring = params[1]
    if '-' in timestring:
        days, remainder = timestring.split('-')
        days = int(days)
    else:
        days = 0
        remainder = timestring
    # Left-pad with zeros so times without hours (or minutes) still
    # unpack cleanly into hours, minutes, seconds.
    values = [0, 0, 0] + [int(i) for i in remainder.split(':')]
    hours, minutes, seconds = values[-3:]
    params[1] = datetime.timedelta(
        days=days, hours=hours, minutes=minutes, seconds=seconds)
    try:
        return Job(*params)
    except TypeError:
        # Wrong number of fields for the namedtuple constructor.
        return None
def parse_gpu_string(node_string):
    """
    Expand a slurm node expression into individual hostnames.

    e.g. 'pg-gpu[1-3]' -> ['pg-gpu01', 'pg-gpu02', 'pg-gpu03'].

    Args:
        node_string (string): The GPU node(s) in slurm format.

    Returns:
        list: A list of hostnames.
    """
    # Raw string so that '\[' is a regex escape, not a (deprecated)
    # Python string escape.
    match = re.search(r'(.+\[)([0-9]|-|,)+?(?=\])', node_string)
    if match is None:
        # Plain hostname, nothing to expand.
        return [node_string]
    base, sequence = match.group().split('[')
    parser = pynumparser.NumberSequence(int)
    # Node numbers are zero-padded to two digits (pg-gpu01, ...).
    return ['{}{:02d}'.format(base, i) for i in parser.parse(sequence)]
def get_gpus_usage(nodes, start, end):
    """
    Calculate the average GPU usage between begin and end stamps
    for every GPU named by a slurm node expression.

    Args:
        nodes (list): The GPU node(s) in slurm format.
        start (int): start of measurements timestamp.
        end (int): end of measurements timestamp.

    Returns:
        list: A list of tuples [(<hostname>, <percentage>)]
    """
    usages = []
    for host in parse_gpu_string(nodes):
        usages.append((host, get_gpu_usage(host, start, end)))
    return usages
def get_gpu_usage(node, start, end):
    """
    Calculate the average GPU usage between begin and end stamps
    by querying the Prometheus range API.

    Args:
        node (string): The GPU node
        start (int): start of measurements timestamp.
        end (int): end of measurements timestamp.

    Returns:
        float: The average usage (%)

    Raises:
        requests.RequestException: when Prometheus is unreachable.
        KeyError / IndexError: when the response has no data for the node.
    """
    payload = {
        'query':
        'utilization_gpu{{env="peregrine",instance="{}:9101",job="gpu"}}'.
        format(node),
        'start': start,
        'end': end,
        'step': '60s'
    }
    # Timeout so a hung Prometheus does not stall the whole sweep.
    # (Previously a bare `except:` dropped into an ipdb breakpoint here,
    # which would hang the script when run unattended.)
    response = requests.get(
        'https://knyft.hpc.rug.nl:9091/api/v1/query_range',
        params=payload,
        timeout=30)
    values = response.json()['data']['result'][0]['values']
    # Each entry is [timestamp, value-as-string].
    return sum(int(v) for _, v in values) / len(values)
def gpu_load(job):
    """
    Calculate the GPU load of a RUNNING job over its whole runtime.

    Args:
        job (Job)

    Returns:
        list: A list of tuples [(<hostname>, <percentage>)]
    """
    now = time.time()
    started = now - job.delta.total_seconds()
    return get_gpus_usage(job.node, started, now)
def post_to_slack(message: str, slack_url: str):
    """
    Post a message to the #peregrine-alerts slack channel.

    Args:
        message (str): Message to post
        slack_url (str): url to post message to
    """
    data = json.dumps({
        'channel': '#peregrine-alerts',
        'username': 'kill-hogs',
        'text': message,
        'icon_emoji': ':scales:'
    }).encode('utf-8')
    # Timeout so a slow webhook cannot hang the script; the response is
    # deliberately ignored (best-effort notification).
    requests.post(
        slack_url,
        data=data,
        headers={'Content-Type': 'application/json'},
        timeout=30)
def find_email(username):
    """
    Return the email address of <username> as reported by finger.

    Args:
        username (string): the username of the account.

    Returns:
        string: email address or None when finger output has no
        parsable address (or finger is unavailable).
    """
    # shlex.quote guards against shell metacharacters in the username.
    finger = subprocess.run(
        'finger {} -l -m'.format(shlex.quote(username)),
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    data = finger.stdout.decode("utf-8").split('\n')
    # Basic check: exactly one `@` and at least one `.` after the `@`.
    match = re.search(r'(?<=Name: )[^@]+@[^@]+\.[^@]+$', data[0])
    if match is None:
        # Previously this raised AttributeError (only IndexError was
        # caught, which split('\n') can never produce).
        return None
    return match.group(0)
def send_mail(sender: str, receiver: str, message: str, port: int = 25):
    """
    Send a message to a user whose processes have been killed.

    Failures are printed, never raised, so one bad address cannot
    abort the rest of the sweep.

    Args:
        sender (str): From address.
        receiver (str): To address.
        message (str): body of the mail.
        port (int): SMTP port on localhost (default 25).
    """
    message = f"""From: "(Kill Hogs)" <{sender}>
To: <{receiver}>
Subject: GPU job canceled.
{message}
"""
    try:
        # Context manager guarantees the SMTP connection is closed;
        # previously it was left open.
        with smtplib.SMTP('localhost', port=port) as smtp:
            smtp.sendmail(sender, [receiver], message)
        print(f"Successfully sent email to {receiver}.")
    except Exception as e:
        print("Error: unable to send email.\nThe error was:\n{}".format(e))
def main():
    """
    Scan RUNNING jobs in the gpu partition; cancel, mail and report
    (to slack) every job older than an hour whose GPU usage is 0%.
    """
    raw = subprocess.check_output([
        'squeue', '--partition', 'gpu', '--state=R', '-h', '-o', '%i,%M,%N,%u'
    ])
    jobs = [line_to_job(l) for l in raw.decode().split('\n') if l != '']
    jobs = [job for job in jobs if job is not None]
    # Ignore jobs younger than an hour so startup/loading time does not
    # trigger a false positive.
    long_jobs = filter(lambda j: j.delta.total_seconds() > 3600, jobs)
    message = []
    for job in long_jobs:
        for gpu, usage in gpu_load(job):
            job_info = f'Job id: {job.id:10} User: {job.user:9} Gpu usage: {usage:5.1f} ({gpu})'
            print(job_info)
            if usage == 0.0:
                message.append(job_info)
                subprocess.check_output(['scancel', str(job.id)])
                receiver = find_email(job.user)
                # find_email returns None when no address is found;
                # skip the mail instead of passing None to sendmail.
                if receiver is not None:
                    send_mail(
                        sender='root@peregrine.hpc.rug.nl',
                        receiver=receiver,
                        message=EMAIL_MESSAGE + job_info)
    slack_url = os.environ.get('SLACK_URL')
    if message != [] and slack_url is not None:
        message = ['The following jobs are canceled:'] + message
        post_to_slack(message='\n'.join(message), slack_url=slack_url)
        print(message)


if __name__ == "__main__":
    main()