diff --git a/gpu_detector.yml b/gpu_detector.yml
new file mode 100644
index 0000000..502024c
--- /dev/null
+++ b/gpu_detector.yml
@@ -0,0 +1,5 @@
+---
+- hosts: scheduler
+  become: true
+  roles:
+    - gpu_detector
diff --git a/roles/gpu_detector/files/warn_gpu_not_used b/roles/gpu_detector/files/warn_gpu_not_used
new file mode 100755
index 0000000..e6ba0a9
--- /dev/null
+++ b/roles/gpu_detector/files/warn_gpu_not_used
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+
+from collections import namedtuple
+import datetime
+import json
+import os
+import pynumparser
+import re
+import requests
+import smtplib
+import subprocess
+import time
+
+EMAIL_MESSAGE = """
+Dear Peregrine user,
+
+A script detected that one of your jobs in the GPU queue on Peregrine is not using the GPU.
+
+If the job should be using the GPU, could you please look into your code?
+If you need help with that, we recommend the walk-in sessions of the support team:
+https://wiki.hpc.rug.nl/peregrine/additional_information/walk-in_sessions
+
+If the job is not supposed to use the GPU, please submit it to one of the other queues.
+
+Yours sincerely,
+
+Team HPC
+
+PS: The following job was canceled:
+
+"""
+
+
+def line_to_job(line):
+    """
+    Convert a string '<job id>,<elapsed time>,<node>,<user>' as printed
+    by squeue to a named tuple, with the elapsed time parsed into a
+    datetime.timedelta.
+
+    Args:
+        line (string): line from squeue.
+    Returns:
+        Job or None
+    """
+    Job = namedtuple('Job', ['id', 'delta', 'node', 'user'])
+    params = line.split(',')
+    timestring = params[1]
+    # Elapsed time is formatted as [days-]hours:minutes:seconds,
+    # where the leading fields may be omitted.
+    if '-' in timestring:
+        days, remainder = timestring.split('-')
+        days = int(days)
+    else:
+        days = 0
+        remainder = timestring
+
+    # Left-pad with zeros so that 'MM:SS' and 'HH:MM:SS' both parse.
+    values = [0, 0, 0] + [int(i) for i in remainder.split(':')]
+    hours, minutes, seconds = values[-3:]
+
+    delta = datetime.timedelta(
+        days=days, hours=hours, minutes=minutes, seconds=seconds)
+    params[1] = delta
+    try:
+        return Job(*params)
+    except TypeError:
+        # The line did not have the expected number of fields.
+        return None
+
+
+def parse_gpu_string(node_string):
+    """
+    Parse a node string in slurm format, e.g. pg-gpu[1-3] or
+    pg-gpu[2,4,5], into a list of hostnames.
+
+    Args:
+        node_string (string): The GPU node(s) in slurm format.
+    Returns:
+        list: A list of hostnames.
+    """
+    match = re.search(r'(.+\[)([0-9]|-|,)+?(?=\])', node_string)
+    if match is None:
+        return [node_string]
+
+    base, sequence = match.group().split('[')
+    parser = pynumparser.NumberSequence(int)
+    return ['{}{:02d}'.format(base, i) for i in parser.parse(sequence)]
+
+
+def get_gpus_usage(nodes, start, end):
+    """
+    Calculate the average GPU usage of a sequence of GPU nodes
+    between the start and end timestamps.
+
+    Args:
+        nodes (string): The GPU node(s) in slurm format.
+        start (int): start of measurements timestamp.
+        end (int): end of measurements timestamp.
+    Returns:
+        list: A list of tuples [(<node>, <usage>)]
+    """
+    usages = [(gpu, get_gpu_usage(gpu, start, end))
+              for gpu in parse_gpu_string(nodes)]
+    return usages
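+
+
+# The per-node usage below is read from the Prometheus query_range API on
+# knyft.hpc.rug.nl:9091, using the utilization_gpu metric sampled every 60s
+# over the runtime of the job. For example, parse_gpu_string('pg-gpu[1-3]')
+# expands to ['pg-gpu01', 'pg-gpu02', 'pg-gpu03'], and each of those hosts
+# is then queried separately.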
+def get_gpu_usage(node, start, end):
+    """
+    Calculate the average GPU usage of a node between the start and
+    end timestamps.
+
+    Args:
+        node (string): The GPU node.
+        start (int): start of measurements timestamp.
+        end (int): end of measurements timestamp.
+    Returns:
+        float: The average usage (%)
+    """
+    payload = {
+        'query': 'utilization_gpu{{env="peregrine",instance="{}:9101",job="gpu"}}'.format(node),
+        'start': start,
+        'end': end,
+        'step': '60s'
+    }
+    data = requests.get(
+        'https://knyft.hpc.rug.nl:9091/api/v1/query_range', params=payload)
+    data.raise_for_status()
+
+    # The first (and only) result holds the [timestamp, value] pairs for this node.
+    values = json.loads(data.content.decode())['data']['result'][0]['values']
+    average = sum([int(i[1]) for i in values]) / len(values)
+
+    return average
+
+
+def gpu_load(job):
+    """
+    Calculate the GPU load of a RUNNING job.
+
+    Args:
+        job (Job)
+    Returns:
+        list: A list of tuples [(<node>, <usage>)]
+    """
+    end = time.time()
+    start = end - job.delta.total_seconds()
+    return get_gpus_usage(job.node, start, end)
+
+
+def post_to_slack(message: str, slack_url: str):
+    """
+    Post a message to slack.
+
+    Args:
+        message (str): Message to post.
+        slack_url (str): url to post the message to.
+    """
+    data = json.dumps({
+        'channel': '#peregrine-alerts',
+        'username': 'kill-hogs',
+        'text': message,
+        'icon_emoji': ':scales:'
+    }).encode('utf-8')
+    requests.post(
+        slack_url, data=data, headers={'Content-Type': 'application/json'})
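+
+
+# finger prints the account's GECOS field after "Name: " on its first output
+# line; on this cluster that field is assumed to hold the user's email
+# address, which is what find_email extracts below.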
+def find_email(username):
+    """
+    Return the email address of <username> as reported by finger.
+
+    Args:
+        username (string): the username of the account.
+
+    Returns:
+        string: email address or None
+    """
+    finger = subprocess.run(
+        'finger {} -l -m'.format(username),
+        shell=True,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE)
+    try:
+        data = finger.stdout.decode("utf-8").split('\n')
+        # Basic check: exactly one `@` and at least one `.` after the `@`.
+        match = re.search(r'(?<=Name: )[^@]+@[^@]+\.[^@]+$', data[0])
+        return match.group(0)
+    except (IndexError, AttributeError):
+        # No output from finger, or the Name field is not an email address.
+        return None
+
+
+def send_mail(sender: str, receiver: str, message: str, port: int = 25):
+    """
+    Send a message to a user whose job has been canceled.
+    """
+    message = f"""From: "(Kill Hogs)" <{sender}>
+To: <{receiver}>
+Subject: GPU job canceled.
+
+{message}
+"""
+
+    try:
+        smtpObj = smtplib.SMTP('localhost', port=port)
+        smtpObj.sendmail(sender, [receiver], message)
+        print(f"Successfully sent email to {receiver}.")
+    except Exception as e:
+        print("Error: unable to send email.\nThe error was:\n{}".format(e))
+
+
+def main():
+    jobs = subprocess.check_output([
+        'squeue', '--partition', 'gpu', '--state=R', '-h', '-o', '%i,%M,%N,%u'
+    ])
+    jobs = [line_to_job(line) for line in jobs.decode().split('\n') if line != '']
+    jobs = list(filter(lambda a: a is not None, jobs))
+    # Only consider jobs that have been running for more than an hour.
+    long_jobs = filter(lambda j: j.delta.total_seconds() > 3600, jobs)
+    message = []
+    for job in long_jobs:
+        gpus_usage = gpu_load(job)
+        for gpu, usage in gpus_usage:
+            job_info = f'Job id: {job.id:10} User: {job.user:9} Gpu usage: {usage:5.1f} ({gpu})'
+            print(job_info)
+
+            if usage == 0.0:
+                message.append(job_info)
+                subprocess.check_output(['scancel', str(job.id)])
+                send_mail(
+                    sender='root@peregrine.hpc.rug.nl',
+                    receiver=find_email(job.user),
+                    message=EMAIL_MESSAGE + job_info)
+
+    slack_url = os.environ.get('SLACK_URL')
+    if message != [] and slack_url is not None:
+        message = ['The following jobs are canceled:'] + message
+        post_to_slack(message='\n'.join(message), slack_url=slack_url)
+        print(message)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/roles/gpu_detector/tasks/main.yml b/roles/gpu_detector/tasks/main.yml
new file mode 100644
index 0000000..e6b6401
--- /dev/null
+++ b/roles/gpu_detector/tasks/main.yml
@@ -0,0 +1,27 @@
+---
+- name: Install yum dependencies
+  yum:
+    state: latest
+    update_cache: yes
+    disable_gpg_check: yes
+    name:
+      - python3-pynumparser
+
+- name: Install the script.
+  copy:
+    src: files/warn_gpu_not_used
+    dest: /sbin/warn_gpu_not_used
+    mode: 0755
+
+- cron:
+    name: Cancel gpu jobs not using any gpu.
+    minute: '*/15'
+    user: root
+    job: 'SLACK_URL={{ slack_url }} /usr/bin/python3 /sbin/warn_gpu_not_used'
+    cron_file: warn_gpu_not_used
+
+# Don't buzz the whole team... yet
+- cronvar:
+    name: MAILTO
+    value: e.m.a.rijpkema@rug.nl
+    cron_file: warn_gpu_not_used
diff --git a/roles/gpu_detector/vars/main.yml b/roles/gpu_detector/vars/main.yml
new file mode 100644
index 0000000..c48c2c7
--- /dev/null
+++ b/roles/gpu_detector/vars/main.yml
@@ -0,0 +1,10 @@
+$ANSIBLE_VAULT;1.1;AES256
+64653062613335636139663930383861366163626366393034383436663264626266326138353461
+3236633634366335663530643934333634376264663330660a356161333937356464353635363165
+31613530313765373435383334393335666632666661636538393361303862663562383832383565
+3962363636663437620a633039343431623138376162656336396666316530376466633363323734
+33373137336233346261636366343264316636623732313965616535356366633139313939343966
+34386366323863396162366630353537353061306233313531663236613131346334376432346535
+30613937356565396131373462323762363935343764373264666661313065626538643832663435
+39303931656166646362386636373466323333313363323234323866333662373830363365616663
+3236