def load_machines(machine_info=None, host_info=None, ncores=None):
    """Construct a machine list from the allocated machines.

    The first available source wins, checked in this order: ``machine_info``,
    ``host_info``, then the environment variables ``PE_HOSTFILE`` (UGE),
    ``LSB_MCPU_HOSTS`` (LSF), ``PBS_NODEFILE`` (PBS), ``SLURM_JOB_NODELIST``
    (SLURM) and ``CCP_NODES`` (Windows HPC/CCS), and finally the local host
    when only ``ncores`` is given.

    Parameters
    ----------
    machine_info : list[dict[str, int]], optional
        List of machines provided by the caller. Must be of the form:
        [{'machine-name' : NAME, 'core-count' : COUNT},
         {'machine-name' : NAME, 'core-count' : COUNT},
         ... ]
    host_info : str, optional
        Host file name or list of machines and cores as a string separated by
        commas and colons as follows:
        Example 1: 'M0:3,M1:2'
        Example 2: 'M0,M0,M0,M1,M1'
    ncores : int, optional
        Total core count.
        If provided without any other machine source, sets the core count for
        local parallel. If a machine list is found and ``ncores`` is provided,
        the machine list is limited to ``ncores`` cores.

    Returns
    -------
    MachineList or list
        The machines that were found. An empty ``list`` is returned when no
        machine source is available and ``ncores`` is not given.

    Notes
    -----
    On UGE the PE_HOSTFILE variable is used to find machines, LSB_MCPU_HOSTS
    list for LSF, PBS_NODEFILE for PBS and SLURM_JOB_NODELIST on SLURM.
    Unsupported job schedulers may provide alternative ways of providing a list
    of machines, in that case the list must be pre-parsed and provided via the
    ``machine_info`` parameter.

    Depending on the SLURM environment, the hostnames contained within the
    SLURM_JOB_NODELIST variable may not be valid to ssh to. In that case we
    cannot pass these names to the solver. So, in the SLURM branch there is a
    test to check if we can ssh to the first host, if not, get 'actual' machine
    names using scontrol.
    """
    machine_list = []

    if machine_info:
        machine_list = _construct_machine_list_manual(machine_info)
    elif host_info:
        machine_list = _parse_host_info(host_info)
    elif "PE_HOSTFILE" in os.environ:
        machine_list = _construct_machine_list_uge(os.environ["PE_HOSTFILE"])
    elif "LSB_MCPU_HOSTS" in os.environ:
        machine_list = _construct_machine_list_lsf(os.environ["LSB_MCPU_HOSTS"])
    elif "PBS_NODEFILE" in os.environ:
        machine_list = _construct_machine_list_pbs(os.environ["PBS_NODEFILE"])
    elif "SLURM_JOB_NODELIST" in os.environ:
        node_list = os.environ["SLURM_JOB_NODELIST"]
        machine_list = _construct_machine_list_slurm(node_list)
        # The SLURM node names are only usable if we can actually ssh to them;
        # probing the first one is taken as representative for the whole job.
        if machine_list.machines and not _ssh_reachable(
            str(machine_list.machines[0].host_name)
        ):
            addresses = _slurm_node_addresses(node_list)
            # Keep the SLURM names when scontrol gave us nothing to use,
            # rather than building a list with an empty host name.
            if addresses:
                machine_list = _construct_machine_list_slurm(",".join(addresses))
    elif "CCP_NODES" in os.environ:
        machine_list = _construct_machine_list_ccs(os.environ["CCP_NODES"])
    elif ncores:
        machine_list = _get_local_machine(ncores)

    if machine_list and ncores:
        # If both machine list and number of cores are provided, edit the
        # machine list to use exactly the number of cores indicated.
        machine_list = _restrict_machines_to_core_count(machine_list, ncores)

    return machine_list


def _ssh_reachable(host_name):
    """Return True when ``ssh host_name /bin/true`` exits with status 0.

    The command is run from an argument list, not through a shell, so the host
    name (which comes from the environment) is never interpreted by a shell.
    A missing ``ssh`` executable counts as "not reachable".
    """
    try:
        completed = subprocess.run(
            ["ssh", host_name, "/bin/true"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        return False
    return completed.returncode == 0


def _slurm_node_addresses(node_list):
    """Return the ``NodeAddr`` values that ``scontrol show node`` prints.

    For every output line containing ``NodeAddr=`` the first
    whitespace-separated field is taken and the text after its first ``=`` is
    used as the address. An empty list is returned when ``scontrol`` cannot be
    started or prints no such line.
    """
    try:
        completed = subprocess.run(
            ["scontrol", "show", "node", node_list], stdout=subprocess.PIPE
        )
    except OSError:
        return []

    addresses = []
    for line in completed.stdout.decode("utf-8").splitlines():
        if "NodeAddr=" not in line:
            continue
        first_field = line.split()[0]
        key_value = first_field.split("=")
        address = key_value[1] if len(key_value) > 1 else first_field
        if address:
            addresses.append(address)
    return addresses
def _parse_host_info(host_info):
    """Parse host information given as an inline machine list or a host file.

    Parameters
    ----------
    host_info : str
        Either a host file name or a comma-separated machine list such as
        ``'M0:3,M1:2'`` or ``'M0,M0,M1'``, optionally enclosed in square
        brackets. The string is treated as an inline list when it contains a
        colon or a comma and no path separator (slash or backslash); anything
        else is opened as a file with one entry per line.

    Returns
    -------
    MachineList
        The machines produced by ``_parse_machine_data`` for the entries.
    """
    looks_like_list = ":" in host_info or "," in host_info
    looks_like_path = "\\" in host_info or "/" in host_info

    if looks_like_list and not looks_like_path:
        # Filenames generally shouldn't have ':',
        # so assume it's a string list and parse accordingly
        entries = host_info
        if entries.startswith("["):
            entries = entries[1:]
            # Only drop the last character when it really is the closing
            # bracket, so an unbalanced "[" does not eat part of an entry.
            if entries.endswith("]"):
                entries = entries[:-1]
        machine_data = entries.split(",")
    else:
        # Read from the file
        with open(host_info, "r") as host_file:
            machine_data = host_file.read().splitlines()

    return _parse_machine_data(machine_data)
def _parse_machine_data(machine_data):
    """Parse the host machine data provided as a list of strings.

    Parameters
    ----------
    machine_data : list[str]
        The data can be a list of machines such as:
            ["M0","M1","M1"]
        or it can include cores as well:
            ["M0:1","M1:2"]
        or a combination thereof. An entry without a core count contributes
        one core; repeated machine names have their core counts summed.

    Returns
    -------
    MachineList
        One machine per distinct name, in order of first appearance.

    Raises
    ------
    RuntimeError
        If an entry has an empty machine name.
    ValueError
        If the text after the first colon of an entry is not an integer.
    """
    machine_list = MachineList()
    # Index of the machines added so far, so that merging a repeated name does
    # not require rescanning the whole list for every entry.
    machines_by_name = {}

    for datum in machine_data:
        # Parse machine name and core count
        if ":" in datum:
            # Machine and core given
            fields = datum.split(":")
            machine_name = fields[0].strip()
            ncores = int(fields[1])
        else:
            # Just machine name - assume one core
            machine_name = datum.strip()
            ncores = 1

        if machine_name == "":
            raise RuntimeError("Problem with machine list format.")

        machine = machines_by_name.get(machine_name)
        if machine is None:
            machine = Machine(machine_name, ncores)
            machines_by_name[machine_name] = machine
            machine_list.add(machine)
        else:
            # Add to existing machine if already in the list
            machine.number_of_cores += ncores

    return machine_list


def _get_local_machine(ncores):
    """Return a machine list holding only the local host with ``ncores`` cores.

    The host name is the one reported by ``socket.gethostname()``.
    """
    import socket

    local_machines = MachineList()
    local_machines.add(Machine(socket.gethostname(), ncores))
    return local_machines
def _restrict_machines_to_core_count(old_machine_list, ncores):
    """Limit the cores used per machine to a user-supplied total core count.

    Parameters
    ----------
    old_machine_list : MachineList
        List of machines to restrict. It is not modified.
    ncores : int
        Requested total core count.

    Returns
    -------
    MachineList
        ``old_machine_list`` itself when it has no more than ``ncores`` cores,
        otherwise a new MachineList constrained to the requested cores.

    Notes
    -----
    Every machine contributes cores to the new list until ncores is reached.
    However, the original machine order is preserved. This ensures that all
    machines are maximally utilized. Cores are handed out one at a time, round
    robin, visiting the machines from largest to smallest core count so that
    uneven distributions favor machines with more cores.
    """
    if ncores >= old_machine_list.number_of_cores:
        return old_machine_list

    machines = old_machine_list.machines

    # Indices ordering the machines from largest to smallest core count
    # (the sort is stable, so ties keep their original relative order).
    visit_order = sorted(
        range(len(machines)),
        key=lambda index: machines[index].number_of_cores,
        reverse=True,
    )

    new_machine_list = MachineList()
    for machine in machines:
        new_machine_list.add(
            Machine(machine.host_name, 0, machine.queue_name, machine.core_list)
        )

    # Track the unassigned cores in a private list instead of decrementing the
    # caller's Machine objects.
    available = [machine.number_of_cores for machine in machines]
    remaining = ncores
    # "> 0" rather than "!= 0" so a negative request cannot loop forever.
    while remaining > 0:
        for index in visit_order:
            if remaining <= 0:
                break
            if available[index] > 0:
                new_machine_list.machines[index].number_of_cores += 1
                available[index] -= 1
                remaining -= 1

    return new_machine_list


def _construct_machine_list_uge(host_filename):
    """Parse the UGE host file into a machine list.

    Each row is split on spaces. The first four fields are passed to
    ``Machine`` as host name, core count, queue name and core list; the queue
    name and core list are ``None`` when the row does not have those columns.
    Reading stops at the first empty row.
    """
    csv.register_dialect("pemachines", delimiter=" ", skipinitialspace=True)
    machine_list = MachineList()
    with open(host_filename, "r") as pe_file:
        for row in csv.reader(pe_file, dialect="pemachines"):
            if len(row) == 0:
                break
            # Only index the optional columns when they are present.
            queue_name = row[2] if len(row) > 2 else None
            core_list = row[3] if len(row) > 3 else None
            machine_list.add(Machine(row[0], int(row[1]), queue_name, core_list))
    return machine_list


def _construct_machine_list_lsf(host_list):
    """Parse the LSF host list into a machine list.

    ``host_list`` is split on whitespace and read as alternating host name and
    core count tokens.
    """
    machine_list = MachineList()
    tokens = host_list.split()
    for index in range(0, len(tokens), 2):
        machine_list.add(Machine(tokens[index], int(tokens[index + 1])))
    return machine_list
def _construct_machine_list_pbs(host_filename):
    """Parse the PBS host file into a machine list.

    PBS_NODE file has one machine name per line per core allocated on the
    machine. It's identical to a Fluent host file format. This code accumulates
    the total core count on each machine, keeping the machines in order of
    first appearance. Blank lines are ignored.
    """
    machines_by_host = {}
    with open(host_filename, "r") as pbs_file:
        for line in pbs_file:
            hostname = line.rstrip("\r\n")
            if not hostname.strip():
                # A blank line is not an allocated core; skipping it avoids
                # creating a machine with an empty host name.
                continue
            if hostname in machines_by_host:
                machines_by_host[hostname].number_of_cores += 1
            else:
                machines_by_host[hostname] = Machine(hostname, 1)

    # Convert accumulated dictionary to a MachineList
    machine_list = MachineList()
    for machine in machines_by_host.values():
        machine_list.add(machine)
    return machine_list
def _construct_machine_list_slurm(host_list):
    """Parse the SLURM host and task lists into a machine list.

    The SLURM system provides a comma separated list of host names. The host
    names may be listed individually or consecutive host names may have IDs that
    are provided by a set within brackets:

    SLURM_JOB_NODELIST = machinea[2-5,7,14-15],machineb,machinec[008-010,012,017-019],machined[099-101] ...

    Consecutive IDs may be prefixed (or pre-padded) with zeros so that the
    string representation of each machine ID always has the same length as the
    number of digits required to represent the last machine ID in the bracketed
    range.

    The core count of every machine is taken from SLURM_NTASKS_PER_NODE when
    that variable is set. Otherwise it defaults to 1 and, when
    SLURM_TASKS_PER_NODE is set, is overwritten per machine from that variable:

    SLURM_TASKS_PER_NODE = '10,3,12(x2),4,15(x5)'

    An (x#) after the core count indicates that the core count is repeated #
    times. The order is the same as SLURM_JOB_NODELIST.

    Parameters
    ----------
    host_list : str
        Node list in the SLURM_JOB_NODELIST format described above.

    Returns
    -------
    MachineList
        One machine per expanded host name, in node list order.

    Raises
    ------
    ValueError
        If a bracketed entry does not start with a numeric ID or ID range.
    """
    import re

    machine_list = MachineList()
    # Splitting on commas also cuts through bracketed sets, e.g.
    # "M[2-3,4,5-7]" becomes "M[2-3", "4", "5-7]"; the loop below stitches
    # those continuation entries back onto the preceding prefix.
    split_host_list = host_list.split(",")
    cores_per_machine = 1
    ntasks_per_node_set = False
    if "SLURM_NTASKS_PER_NODE" in os.environ:
        cores_per_machine = int(os.environ.get("SLURM_NTASKS_PER_NODE"))
        ntasks_per_node_set = True

    # Regular expression to identify if a host entry contains a single range of machines
    p_range = re.compile(r"\[.*\]")
    # Regular expressions to identify a single machine ID within brackets.
    # The optional "]" also accepts a complete one-ID set such as "M[5]".
    p_id_one = re.compile(r"^.*\[(\d*)\]?$")
    p_id_one_next = re.compile(r"^(\d*)")
    # Regular expressions to identify a range of machine IDs within brackets
    p_id_range_first = re.compile(r"^.*\[(\d*)-(\d*).*$")
    p_id_range_next = re.compile(r"^(\d*)-(\d*)")
    # Regular expressions to identify if the IDs in a range use zero padding
    p_ids_padded = re.compile(r"^.*\[(0\d*)-\d*.*$")
    p_ids_padded_next = re.compile(r"(0\d*)-\d*")
    # Regular expression to identify the machine name prefix for a range
    p_machine_prefix = re.compile(r"(^.*)\[")

    def add_single(prefix, id_text):
        # Converting to int and padding back to the original width keeps any
        # leading zeros of the ID.
        machine_name = prefix + str(int(id_text)).rjust(len(id_text), "0")
        machine_list.add(Machine(machine_name, cores_per_machine))

    def add_range(prefix, id_match, padded_match):
        # IDs are zero padded to the width of the first ID only when that
        # first ID itself starts with a zero.
        width = len(padded_match.group(1)) if padded_match else 0
        first_id = int(id_match.group(1))
        last_id = int(id_match.group(2))
        for node_id in range(first_id, last_id + 1):
            machine_name = prefix + str(node_id).rjust(width, "0")
            machine_list.add(Machine(machine_name, cores_per_machine))

    entry = 0
    while entry < len(split_host_list):
        hosts = split_host_list[entry]
        prefix_match = p_machine_prefix.match(hosts)
        # Machine has no brackets, just add to the list
        if not prefix_match:
            machine_list.add(Machine(hosts, cores_per_machine))
            entry += 1
            continue

        # Add all machines in the bracketed range if one is provided
        machine_prefix = prefix_match.group(1)
        # Check if first bracket entry is "M[a-b" or "M[a". Check for a range first.
        machine_ids = p_id_range_first.match(hosts)
        if machine_ids:
            add_range(machine_prefix, machine_ids, p_ids_padded.match(hosts))
        else:
            machine_ids = p_id_one.match(hosts)
            if machine_ids is None:
                raise ValueError("Unsupported SLURM node list entry: " + hosts)
            add_single(machine_prefix, machine_ids.group(1))

        entry += 1
        # If a host has more than one numbered range, process them. No closing
        # bracket in this entry means the set continues in the next entries.
        if len(p_range.findall(hosts)) == 0 and entry < len(split_host_list):
            hosts = split_host_list[entry]
            # Check if next entry is "a-b" or "a". Check for a range first.
            machine_ids = p_id_range_next.match(hosts)
            if machine_ids:
                single_id = False
            else:
                single_id = True
                machine_ids = p_id_one_next.match(hosts)
            while machine_ids:
                if single_id:
                    add_single(machine_prefix, machine_ids.group(0))
                else:
                    add_range(
                        machine_prefix, machine_ids, p_ids_padded_next.match(hosts)
                    )

                entry += 1
                if entry < len(split_host_list):
                    hosts = split_host_list[entry]
                    machine_ids = p_id_range_next.match(hosts)
                    if machine_ids:
                        single_id = False
                    else:
                        machine_ids = p_id_one_next.match(hosts)
                        if machine_ids and len(machine_ids.group(0)) > 0:
                            single_id = True
                        else:
                            # Not a numeric continuation: leave this entry for
                            # the outer loop to handle as a new host.
                            single_id = False
                            machine_ids = None
                else:
                    machine_ids = None

    if not ntasks_per_node_set and "SLURM_TASKS_PER_NODE" in os.environ:
        tasks_per_node = []
        for numcores in os.environ["SLURM_TASKS_PER_NODE"].split(","):
            beg = numcores.find("(x")
            if beg > 0:
                end = numcores.find(")")
                repeat = int(numcores[beg + 2 : end])
                tasks_per_node.extend([int(numcores[0:beg])] * repeat)
            else:
                tasks_per_node.append(int(numcores))
        for index, machine in enumerate(machine_list.machines):
            machine.number_of_cores = tasks_per_node[index]

    return machine_list
def _construct_machine_list_ccs(host_list):
    """Build a machine list from the Windows HPC/CCS host string.

    Parameters
    ----------
    host_list : str
        A single string with the following format:

        "#hosts host1 #cores1 host2 #cores2 host3 #cores3 ... hostN #coresN"

    Returns
    -------
    MachineList
        The first ``#hosts`` host/core pairs, in the order they are listed.
    """
    tokens = host_list.split()
    host_count = int(tokens[0])
    machines = MachineList()
    for pair_index in range(host_count):
        # Pairs start right after the leading count: name at 1, 3, 5, ...
        name_position = 1 + 2 * pair_index
        host_name = tokens[name_position]
        core_count = int(tokens[name_position + 1])
        machines.add(Machine(host_name, core_count))
    return machines


def _construct_machine_list_manual(machine_info):
    """Build a machine list from caller-supplied machine dictionaries.

    Each item of ``machine_info`` must provide the keys ``'machine-name'`` and
    ``'core-count'``.
    """
    machines = MachineList()
    for entry in machine_info:
        host_name = entry["machine-name"]
        core_count = entry["core-count"]
        machines.add(Machine(host_name, core_count))
    return machines
class Machine(object):
    """Provides an interface for a single machine allocated by a queue
    system."""

    def __init__(self, hostName, numberOfCores, queueName=None, coreList=None):
        """Constructs a machine from the information provided.

        Parameters
        ----------
        hostName : str
            Host name of the machine
        numberOfCores : int
            The number of cores allocated on the machine
        queueName : str
            Optionally specifies the queue the machine is executing in.
        coreList : list[int]
            Optionally provides the list of allocated core IDs.
        """
        self._hostName = hostName
        self._numberOfCores = numberOfCores
        self._queueName = queueName
        self._coreList = coreList

    def __repr__(self):
        """Returns a string representation for the machine."""
        # Formatting (rather than "+" concatenation) keeps this working when
        # the queue name is None, which is the constructor default.
        return "Hostname:{}, Cores: {}, Queue: {}".format(
            self._hostName, self._numberOfCores, self._queueName
        )

    @property
    def host_name(self):
        """Returns the hostname listed in the machine file."""
        return self._hostName

    @property
    def number_of_cores(self):
        """Returns the number of cores allocated on the machine."""
        return self._numberOfCores

    @number_of_cores.setter
    def number_of_cores(self, value):
        """Sets the number of cores allocated on the machine."""
        self._numberOfCores = value

    @property
    def queue_name(self):
        """Returns the name of the queue the machine is allocated in."""
        return self._queueName

    @property
    def core_list(self):
        """Returns a list of core IDs allocated on the machine."""
        return self._coreList
class MachineList(object):
    """Provides an interface to list of machines allocated by a queue
    system."""

    def __init__(self, machinesIn=None):
        """Constructs a machine list, optionally filled from ``machinesIn``.

        Parameters
        ----------
        machinesIn : iterable, optional
            Machines to start with. They are copied into a new internal list;
            the list is empty when this is omitted or ``None``.
        """
        self._machines = [] if machinesIn is None else list(machinesIn)

    def __iter__(self):
        """Iterates over the machines in their current order."""
        return iter(self._machines)

    def __deepcopy__(self, memo):
        """Returns a new MachineList holding deep copies of the machines."""
        return MachineList(copy.deepcopy(self._machines, memo))

    def reset(self):
        """Resets the machine file data to the initial values."""
        self._machines = []

    def add(self, m):
        """Appends machine ``m`` to the end of the list."""
        self._machines.append(m)

    def remove(self, m):
        """Removes the first occurrence of machine ``m`` from the list."""
        self._machines.remove(m)

    def sort_by_core_count(self):
        """Sorts the machines by decreasing core count, reordering the
        existing data."""
        self._machines.sort(key=lambda machine: machine.number_of_cores, reverse=True)

    def sort_by_core_count_ascending(self):
        """Sorts the machines by increasing core count, reordering the
        existing data."""
        self._machines.sort(key=lambda machine: machine.number_of_cores)

    def remove_empty_machines(self):
        """Removes all machines with 0 cores."""
        self._machines = [m for m in self._machines if m.number_of_cores > 0]

    def move_local_host_to_front(self):
        """Moves the local host machine to the front of the machine list.

        The list is left unchanged when no machine matches the local host
        name. When several machines match, the last one is moved.
        """
        import socket

        localHostNameComponents = socket.gethostname().split(".")
        localHostIndex = -1
        for im, m in enumerate(self._machines):
            # Check if hostName == localHostName, comparing as much of the name as possible
            hostNameComponents = m.host_name.split(".")
            imin = min(len(localHostNameComponents), len(hostNameComponents))
            if hostNameComponents[:imin] == localHostNameComponents[:imin]:
                localHostIndex = im
        # If the local host is in the list move it to the beginning
        if localHostIndex > -1:
            localMachine = self._machines.pop(localHostIndex)
            # Place the object in the front of the list
            self._machines.insert(0, localMachine)

    @property
    def machines(self):
        """Returns the entire list of machines."""
        return self._machines

    @property
    def num_machines(self):
        """Returns the total number of machines."""
        return len(self._machines)

    @property
    def number_of_cores(self):
        """Returns the total number of cores."""
        return sum(m.number_of_cores for m in self._machines)

    @property
    def max_cores(self):
        """Returns the maximum number of cores on any machine, or 0 when the
        list is empty."""
        return max((m.number_of_cores for m in self._machines), default=0)

    @property
    def min_cores(self):
        """Returns the minimum number of cores on any machine, or 0 when the
        list is empty."""
        return min((m.number_of_cores for m in self._machines), default=0)
class TestMachine(unittest.TestCase):
    """Checks the behavior of a single Machine object."""

    def test_initialize_host(self):
        """A Machine built from name and core count has no queue or core list."""
        host = Machine("machine", 20)
        self.assertEqual(host.host_name, "machine")
        self.assertEqual(host.number_of_cores, 20)
        self.assertEqual(host.queue_name, None)
        self.assertEqual(host.core_list, None)

    def test_modify_host(self):
        """The core count of a Machine can be assigned and updated in place."""
        host = Machine("machine", 20, "allq", "0:0")
        host.number_of_cores = 12
        self.assertEqual(host.number_of_cores, 12)
        host.number_of_cores = host.number_of_cores + 2
        host.number_of_cores += 1
        self.assertEqual(host.number_of_cores, 15)
        host.number_of_cores -= 3
        self.assertEqual(host.number_of_cores, 12)


class TestMachineList(unittest.TestCase):
    """Checks the behavior of the MachineList container."""

    def setUp(self):
        self._machineList = MachineList()

    def tearDown(self):
        self._machineList.reset()

    def _add_machines(self, *machine_args):
        """Add one Machine per argument tuple to the list under test."""
        for args in machine_args:
            self._machineList.add(Machine(*args))

    def _assert_same_as(self, other):
        """Compare host names and core counts pairwise with ``other``."""
        pairs = zip(self._machineList.machines, other.machines)
        for expected, actual in pairs:
            self.assertEqual(expected.host_name, actual.host_name)
            self.assertEqual(expected.number_of_cores, actual.number_of_cores)

    def test_initialize_machinelist(self):
        """A new MachineList is empty."""
        fresh = MachineList()
        self.assertIsInstance(fresh, MachineList)
        self.assertEqual(fresh.machines, [])
        self.assertEqual(fresh.num_machines, 0)

    def test_copy_machinelist(self):
        """A deep copy of the list matches the source list."""
        import copy

        self._assert_same_as(copy.deepcopy(self._machineList))

    def test_add_to_machinelist(self):
        """Machines can be added to a machine list."""
        self._add_machines(
            ("machine1", 20, "allq", "0:0"),
            ("machine2", 20, "allq", "0:0"),
        )
        self.assertEqual(self._machineList.num_machines, 2)

    def test_number_of_cores_and_machines(self):
        """Machine count and total, max and min core counts are reported."""
        self._add_machines(
            ("machine1", 20, "allq", "0:0"),
            ("machine2", 25, "allq", "0:0"),
            ("machine3", 15, "allq", "0:0"),
        )
        machines = self._machineList
        self.assertEqual(machines.num_machines, 3)
        self.assertEqual(machines.number_of_cores, 60)
        self.assertEqual(machines.max_cores, 25)
        self.assertEqual(machines.min_cores, 15)

    def test_sort_machine_list(self):
        """Machines can be sorted by ascending and descending core count."""
        self._add_machines(
            ("machine1", 15, "allq", "0:0"),
            ("machine2", 10, "allq", "0:0"),
            ("machine3", 5, "allq", "0:0"),
        )

        # Sort in ascending order
        self._machineList.sort_by_core_count_ascending()
        counts = [m.number_of_cores for m in self._machineList.machines]
        for previous, current in zip(counts, counts[1:]):
            self.assertLessEqual(previous, current)

        # Sort in descending order
        self._machineList.sort_by_core_count()
        counts = [m.number_of_cores for m in self._machineList.machines]
        for previous, current in zip(counts, counts[1:]):
            self.assertLessEqual(current, previous)

    def test_remote_empty_machines(self):
        """Machines without cores are dropped by remove_empty_machines."""
        self._add_machines(("machine1", 5), ("machine2", 0))
        self._machineList.remove_empty_machines()
        self.assertEqual(self._machineList.num_machines, 1)
        self.assertEqual(self._machineList.machines[0].host_name, "machine1")

    def test_move_local_host_to_front(self):
        """The local host moves to the front; the others keep their order."""
        import socket

        local_name = socket.gethostname()
        self._add_machines(("M0", 2), (local_name, 1), ("M1", 3))
        self._machineList.move_local_host_to_front()

        expected = [(local_name, 1), ("M0", 2), ("M1", 3)]
        for position, (name, cores) in enumerate(expected):
            machine = self._machineList.machines[position]
            self.assertEqual(machine.host_name, name)
            self.assertEqual(machine.number_of_cores, cores)

    def test_deep_copy_machinelist(self):
        """A deep copy of a populated list matches the source list."""
        import copy

        self._add_machines(
            ("wathpc-2-0.local", 23),
            ("wathpc-2-1.local", 23),
            ("wathpc-2-2.local", 23),
            ("wathpc-2-3.local", 23),
        )
        self._assert_same_as(copy.deepcopy(self._machineList))
class TestLoadMachines(unittest.TestCase):
    """Provide a test suite that checks that load_machines behaves properly.

    Every test runs with the scheduler environment variables removed, and the
    original environment is restored afterwards even when a test fails, so
    the tests neither depend on nor leak into the surrounding environment.
    """

    # Environment variables read by load_machines and the SLURM parser.
    _SCHEDULER_VARIABLES = (
        "PE_HOSTFILE",
        "LSB_MCPU_HOSTS",
        "PBS_NODEFILE",
        "SLURM_JOB_NODELIST",
        "SLURM_NTASKS_PER_NODE",
        "SLURM_TASKS_PER_NODE",
        "CCP_NODES",
    )

    def setUp(self):
        self._machineList = MachineList()
        # Remember and clear the scheduler variables for the duration of the test.
        self._savedEnvironment = {
            name: os.environ.pop(name, None) for name in self._SCHEDULER_VARIABLES
        }

    def tearDown(self):
        for name, value in self._savedEnvironment.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
        self._machineList.reset()

    def _check_restricted(self, host_info, ncores, expected_cores):
        """Load ``host_info`` limited to ``ncores`` and compare core counts."""
        machineList = load_machines(host_info=host_info, ncores=ncores)
        self.assertEqual(len(machineList.machines), len(expected_cores))
        for machine in machineList.machines:
            self.assertEqual(machine.number_of_cores, expected_cores[machine.host_name])
        # Ensure that the order is preserved
        self.assertEqual(machineList.machines[0].host_name, "M0")

    def _parse_slurm(self, node_list, **task_variables):
        """Set the SLURM variables and parse the node list."""
        os.environ["SLURM_JOB_NODELIST"] = node_list
        os.environ.update(task_variables)
        return _construct_machine_list_slurm(os.environ.get("SLURM_JOB_NODELIST"))

    def _check_host_names(self, machineList, names_by_index):
        """Check the host name at each given index of the machine list."""
        for index, name in names_by_index.items():
            self.assertEqual(machineList.machines[index].host_name, name)

    def test_constrain_machines1(self):
        self._check_restricted("M0:2,M1:3,M2:2", 4, {"M0": 1, "M1": 2, "M2": 1})

    def test_constrain_machines2(self):
        self._check_restricted("M0:2,M1:3,M2:2", 3, {"M0": 1, "M1": 1, "M2": 1})

    def test_overload_machines1(self):
        self._check_restricted("M0:2,M1:1", 10, {"M0": 2, "M1": 1})

    def test_overload_machines2(self):
        self._check_restricted("M0,M0,M1", 10, {"M0": 2, "M1": 1})

    def test_winhpc(self):
        os.environ["CCP_NODES"] = "3 M0 8 M1 8 M2 16"
        machineList = load_machines()
        self.assertEqual(machineList.num_machines, 3)
        self.assertEqual(machineList.number_of_cores, 32)
        self._check_host_names(machineList, {0: "M0", 1: "M1", 2: "M2"})

    def test_slurm_no_brackets(self):
        machineList = self._parse_slurm("M0,M1,M2", SLURM_NTASKS_PER_NODE="8")
        self.assertEqual(machineList.num_machines, 3)
        self.assertEqual(machineList.number_of_cores, 24)
        self._check_host_names(machineList, {0: "M0", 1: "M1", 2: "M2"})

    def test_slurm_no_padding(self):
        machineList = self._parse_slurm("M[0-2]", SLURM_NTASKS_PER_NODE="12")
        self.assertEqual(machineList.num_machines, 3)
        self.assertEqual(machineList.number_of_cores, 36)
        self._check_host_names(machineList, {0: "M0", 1: "M1", 2: "M2"})

    def test_slurm_hosts_with_dash(self):
        machineList = self._parse_slurm(
            "M-n50-[0-1],M-p50-[9-11]", SLURM_NTASKS_PER_NODE="12"
        )
        self.assertEqual(machineList.num_machines, 5)
        self.assertEqual(machineList.number_of_cores, 60)
        self._check_host_names(
            machineList,
            {0: "M-n50-0", 1: "M-n50-1", 2: "M-p50-9", 3: "M-p50-10", 4: "M-p50-11"},
        )

    def test_slurm_with_padding(self):
        machineList = self._parse_slurm(
            "MC[008-009,010,011,012-014]",
            SLURM_TASKS_PER_NODE="8,10(x2),12(x3),10",
        )
        self.assertEqual(machineList.num_machines, 7)
        self.assertEqual(machineList.number_of_cores, 74)
        expected = [
            ("MC008", 8),
            ("MC009", 10),
            ("MC010", 10),
            ("MC011", 12),
            ("MC012", 12),
            ("MC013", 12),
            ("MC014", 10),
        ]
        for index, (name, cores) in enumerate(expected):
            self.assertEqual(machineList.machines[index].host_name, name)
            self.assertEqual(machineList.machines[index].number_of_cores, cores)

    def test_slurm_with_padding_one_hostlist(self):
        machineList = self._parse_slurm("MD[099-101]", SLURM_NTASKS_PER_NODE="12")
        self.assertEqual(machineList.num_machines, 3)
        self.assertEqual(machineList.number_of_cores, 36)
        self._check_host_names(machineList, {0: "MD099", 1: "MD100", 2: "MD101"})

    def test_slurm_no_padding_commas(self):
        machineList = self._parse_slurm(
            "M[2-3,4,5-7,8-11,12-14,15-16]", SLURM_NTASKS_PER_NODE="12"
        )
        self.assertEqual(machineList.num_machines, 15)
        self.assertEqual(machineList.number_of_cores, 180)
        # Machines M2 .. M16 are expected at indices 0 .. 14.
        self._check_host_names(
            machineList, {index: "M" + str(index + 2) for index in range(15)}
        )

    def test_slurm_very_complex(self):
        machineList = self._parse_slurm(
            "M[2-3,4,5-7,8-11,12-14,15-16],MB,MC[008-009,010-011,012-014],MD[099-101]",
            SLURM_NTASKS_PER_NODE="24",
        )
        self.assertEqual(machineList.num_machines, 26)
        self.assertEqual(machineList.number_of_cores, 624)
        self.assertEqual(machineList.machines[0].number_of_cores, 24)
        self._check_host_names(
            machineList,
            {
                0: "M2",
                7: "M9",
                14: "M16",
                15: "MB",
                16: "MC008",
                19: "MC011",
                22: "MC014",
                24: "MD100",
            },
        )
_construct_machine_list_slurm(hostList) + self.assertEqual(machineList.num_machines, 26) + self.assertEqual(machineList.number_of_cores, 624) + self.assertEqual(machineList.machines[0].host_name, "M2") + self.assertEqual(machineList.machines[0].number_of_cores, 24) + self.assertEqual(machineList.machines[7].host_name, "M9") + self.assertEqual(machineList.machines[14].host_name, "M16") + self.assertEqual(machineList.machines[15].host_name, "MB") + self.assertEqual(machineList.machines[16].host_name, "MC008") + self.assertEqual(machineList.machines[19].host_name, "MC011") + self.assertEqual(machineList.machines[22].host_name, "MC014") + self.assertEqual(machineList.machines[24].host_name, "MD100") + del os.environ["SLURM_JOB_NODELIST"] + del os.environ["SLURM_NTASKS_PER_NODE"] + + +class TestMachineListCmdLine(unittest.TestCase): + """Provide a test suite that checks the machine list parser.""" + + def setUp(self): + self._expectedValues = {"M0": 2, "M1": 4} + + def tearDown(self): + pass + + def test_parse_machine_data(self): + machineDataList = [["M0:2", "M1:2", "M1:2"], ["M0", "M0", "M1", "M1:3"]] + + for machineData in machineDataList: + machineList = _parse_machine_data(machineData) + for machine in machineList.machines: + self.assertEqual( + machine.number_of_cores, self._expectedValues[machine.host_name] + ) + + def test_cmd_string(self): + hostLists = ["M0:2,M1:2,M1:2", "M0,M0,M1,M1:3"] + + for hostList in hostLists: + machineList = _parse_host_info(hostList) + for machine in machineList.machines: + self.assertEqual( + machine.number_of_cores, self._expectedValues[machine.host_name] + ) + + def test_host_file(self): + import os.path + + hostfile = "hosts.txt" + # This unit test only runs if the file exists + if os.path.isfile(hostfile): + machineList = _parse_host_info(hostfile) + for machine in machineList.machines: + self.assertEqual( + machine.number_of_cores, self._expectedValues[machine.host_name] + ) + + def test_file_not_found(self): + hostfile = 
"nonExistentFile.txt" + self.assertRaises(IOError, _parse_host_info, hostfile) + + +suite1 = unittest.TestLoader().loadTestsFromTestCase(TestMachine) +suite2 = unittest.TestLoader().loadTestsFromTestCase(TestMachineList) +suite3 = unittest.TestLoader().loadTestsFromTestCase(TestLoadMachines) +suite4 = unittest.TestLoader().loadTestsFromTestCase(TestMachineListCmdLine) +alltests = unittest.TestSuite([suite1, suite2, suite3, suite4]) +unittest.TextTestRunner(verbosity=2).run(alltests) + +if __name__ == "__main__": + unittest.main()