Would like to move to https://github.com/rug-cit-hpc/pg-playbooks
but has large files...
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
149 lines
4.5 KiB
149 lines
4.5 KiB
#!/usr/bin/env python3 |
|
|
|
""" |
|
This tool is to be used from pg-login and to be run in a screen. |
|
It will update the fstab for /apps on all nodes and issue a remount. |
|
It will take care of draining and resuming a node. |
|
""" |
|
|
|
|
|
from collections import namedtuple |
|
from subprocess import run |
|
import logging |
|
import os |
|
import paramiko |
|
import sys |
|
import time |
|
import warnings |
|
import yaml |
|
|
|
# Module-level logger. Pass the __name__ variable, not the string literal
# '__name__', so records are labelled with the real module name.
logger = logging.getLogger(__name__)
|
|
|
|
|
def execute_remote(address: str, command: str):
    """
    Use paramiko to execute a command remotely.

    Args:
        address: Host to connect to over SSH.
        command: Command line to execute on that host.

    Returns:
        A tuple ``(stdout, stderr)``: stdout is the result of ``readlines()``
        on the remote stdout, stderr the result of ``read()`` on the remote
        stderr. When an IOError occurs while connecting or executing, both
        elements are the string "host not up".
    """
    logger.debug(f'executing on {address} , running : {command}')

    client = paramiko.SSHClient()
    try:
        client.load_system_host_keys()
        # Hosts that are missing from known_hosts are accepted automatically.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        client.connect(address)
        _, ss_stdout, ss_stderr = client.exec_command(command)
        r_out, r_err = ss_stdout.readlines(), ss_stderr.read()
    except IOError:
        logger.warning(".. host " + address + " is not up")
        return "host not up", "host not up"
    finally:
        # Always release the connection, also when connect/exec failed.
        client.close()

    logger.debug(r_err)
    # More than a few bytes on stderr is reported as an error.
    if len(r_err) > 5:
        logger.error(r_err)
    else:
        logger.debug(r_out)

    return r_out, r_err
|
|
|
|
|
def setup_logging():
    """
    Configure the module logger to write every record from DEBUG level
    upwards to stdout, prefixed with timestamp, logger name and level.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(log_format))
    stdout_handler.setLevel(logging.DEBUG)

    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)
|
|
|
|
|
def get_node_states():
    """
    Retrieve the state of all nodes in the cluster.

    Runs ``sinfo -h -o %n,%T`` and converts every non-empty output line
    into a ``Node(hostname, state)`` named tuple.

    Returns: list (list of Node objects)

    Raises: subprocess.CalledProcessError when sinfo exits non-zero.
    """
    Node = namedtuple('Node', ['hostname', 'state'])

    sinfo = run(['sinfo', '-h', '-o', '%n,%T'], capture_output=True)
    sinfo.check_returncode()

    nodes = []
    for line in sinfo.stdout.decode().split('\n'):
        if line == '':
            continue
        # Each line holds the comma separated fields requested from sinfo.
        fields = line.split(',')
        nodes.append(Node(*fields))

    return nodes
|
|
|
|
|
def update_mountpoint(node):
    """
    Remount /apps on a node from the new export.

    Unmounts /apps, replaces 172.23.56.1:/software by 172.23.56.2:/software
    in /etc/fstab and mounts /apps again. The results of the remote
    commands are not inspected.

    Args:
        node: object with a ``hostname`` attribute naming the host to update.
    """
    rewrite_fstab = 'sed -i s@172.23.56.1:/software@172.23.56.2:/software@ /etc/fstab'

    # The order matters: unmount first, fix fstab, then mount from fstab.
    for remote_command in ('umount /apps', rewrite_fstab, 'mount /apps'):
        execute_remote(node.hostname, remote_command)
|
|
|
|
|
if __name__ == '__main__':
    # Migrate /apps on every node: drain nodes one at a time (with at most
    # 10 draining concurrently), remount /apps on each node once it is
    # drained, resume it, and persist the remaining work in nodes_todo.yml
    # so that an interrupted run can be continued.

    setup_logging()
    warnings.filterwarnings(action='ignore', module='.*paramiko.*')
    nodes = get_node_states()

    try:
        with open('nodes_todo.yml') as f:
            # safe_load: yaml.load() without an explicit Loader is deprecated
            # since PyYAML 5.1 and raises a TypeError since 6.0. The file
            # only holds a plain list of hostnames.
            nodes_todo = yaml.safe_load(f)
        logger.info('Resuming from previous run')

    except FileNotFoundError:
        nodes_todo = [node.hostname for node in nodes]

    # get_node_states() asks sinfo for %T, the extended state names
    # ('mixed', 'allocated'); the compact names are kept for compatibility.
    drainable_states = ('idle', 'mix', 'mixed', 'alloc', 'allocated')

    while len(nodes_todo) > 0:
        nodes = get_node_states()

        # We set max 10 nodes to draining at the same time,
        if sum(node.state == 'draining' for node in nodes) < 10:

            for node in nodes:
                if node.hostname not in nodes_todo:
                    continue

                # filter unreachable nodes (state ends with '*').
                if node.state.endswith('*'):
                    logger.info(f'Skipping unreachable node: {node.hostname}')
                    nodes_todo.remove(node.hostname)

                elif node.state in drainable_states:
                    # Set the node to draining.
                    proc = run([
                        'scontrol', 'update',
                        f'nodename={node.hostname}', 'state=drain',
                        'reason="migrate to bigger /apps"'
                    ])
                    proc.check_returncode()
                    # Only one new node is put into draining per iteration.
                    break

        logger.debug("sleeping 10 seconds")
        time.sleep(10)

        # remount /apps on the drained nodes.
        for drained_node in (n for n in nodes if n.state == 'drained'):
            if drained_node.hostname not in nodes_todo:
                # Maybe drained for another reason.
                logger.info(f'Drained node {drained_node.hostname} not in nodes_todo.')
                continue

            update_mountpoint(drained_node)
            proc = run([
                'scontrol', 'update',
                f'nodename={drained_node.hostname}', 'state=resume'
            ])
            proc.check_returncode()

            nodes_todo.remove(drained_node.hostname)

            with open('nodes_todo_tmp.yml', 'w') as f:
                f.write(yaml.dump(nodes_todo))
            # A rename is atomic ensuring an uncorrupted nodes_todo.yaml.
            os.rename('nodes_todo_tmp.yml', 'nodes_todo.yml')
            logger.info(f'Updated {drained_node.hostname}')
|
|
|