diff --git a/src/common/schedulers/slurm_commands.py b/src/common/schedulers/slurm_commands.py index ee45b0a9..a46e4412 100644 --- a/src/common/schedulers/slurm_commands.py +++ b/src/common/schedulers/slurm_commands.py @@ -13,6 +13,11 @@ import logging import os import re +import shlex + +# A nosec comment is appended to the following line in order to disable the B404 check. +# In this file the input of the module subprocess is trusted. +import subprocess # nosec B404 from datetime import datetime, timezone from typing import Dict, List @@ -60,11 +65,25 @@ SCONTROL = f"sudo {SLURM_BINARIES_DIR}/scontrol" SINFO = f"{SLURM_BINARIES_DIR}/sinfo" -SCONTROL_OUTPUT_AWK_PARSER = ( - 'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | ' - + "grep -oP '^(NodeName=\\S+)|(NodeAddr=\\S+)|(NodeHostName=\\S+)|(? str: + """ + Run a `scontrol ` command as a standalone subprocess and return its stdout. + + scontrol's exit code, stderr and raw stdout are logged. When scontrol exits non-zero but still returns + output (e.g. one of the requested nodes does not exist: Slurm prints "Node not found" and exits 1 + while still returning the other nodes), the output is returned and a warning is logged. When it exits + non-zero with no output, an error is logged and, when raise_on_error, a CalledProcessError is raised. + """ + command = f"{SCONTROL} {scontrol_args}" + log.debug("Executing scontrol command: %s", command) + try: + # nosec B603: command is built from trusted/validated input and run without a shell. + result = subprocess.run( # nosec B603 + shlex.split(command), + timeout=command_timeout, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + except subprocess.TimeoutExpired: + log.error("scontrol command timed out after %s seconds: '%s'", command_timeout, command) + raise + except OSError as e: + log.error("Unable to execute scontrol command '%s'. Failed with exception: %s", command, e) + raise + + stdout = result.stdout or "" + stderr = (result.stderr or "").strip() + # Raw scontrol output is logged at DEBUG only, so it is not emitted at the default INFO level. + log.debug("scontrol command '%s' exited with code %s. Raw output:\n%s", command, result.returncode, stdout) + + if result.returncode != 0: + if stdout.strip(): + log.warning( + "scontrol command '%s' returned non-zero exit code %s but produced output; proceeding with " + "the output. stderr: '%s'", + command, + result.returncode, + stderr, + ) + else: + log.error( + "scontrol command '%s' failed with exit code %s and produced no output. stderr: '%s'", + command, + result.returncode, + stderr, + ) + if raise_on_error: + raise subprocess.CalledProcessError(result.returncode, command, output=stdout, stderr=result.stderr) + + return stdout + + +def _extract_scontrol_records(raw_output: str, field_regex) -> List[Dict[str, str]]: + """ + Split raw scontrol output into records and extract the relevant `key=value` fields of each record. + + Records are separated by blank lines. For each record, only the fields matched by field_regex are kept, + returned as a dict mapping the Slurm field name to its value. Records with no matching field are skipped. + """ + records = [] + for record in re.split(r"\n\n+", raw_output.strip()): + fields = {} + for match in field_regex.finditer(record): + key, _, value = match.group(0).partition("=") + fields[key] = value + if fields: + records.append(fields) + return records + + def get_nodes_info(nodes: str = "", command_timeout=DEFAULT_GET_INFO_COMMAND_TIMEOUT) -> List[SlurmNode]: """ Retrieve SlurmNode list from slurm nodelist notation. @@ -351,15 +447,11 @@ def get_nodes_info(nodes: str = "", command_timeout=DEFAULT_GET_INFO_COMMAND_TIM if nodes == "": nodes = _get_all_partition_nodes(",".join(PartitionNodelistMapping.instance().get_partitions())) - # Validation to sanitize the input argument and make it safe to use the function affected by B604 validate_subprocess_argument(nodes) - # awk is used to replace the \n\n record separator with '######\n' # Note: In case the node does not belong to any partition the Partitions field is missing from Slurm output - show_node_info_command = f"{SCONTROL} show nodes {nodes} | {SCONTROL_OUTPUT_AWK_PARSER}" - nodeinfo_str = check_command_output(show_node_info_command, timeout=command_timeout, shell=True) # nosec B604 - - return _parse_nodes_info(nodeinfo_str) + raw_node_info = _run_scontrol_command(f"show nodes {nodes}", command_timeout=command_timeout) + return _parse_nodes_info(_extract_scontrol_records(raw_node_info, SCONTROL_NODE_INFO_FIELD_REGEX)) def get_partitions_info(command_timeout=DEFAULT_GET_INFO_COMMAND_TIMEOUT) -> List[SlurmPartition]: @@ -368,16 +460,10 @@ def get_partitions_info(command_timeout=DEFAULT_GET_INFO_COMMAND_TIMEOUT) -> Lis This function considers only partitions managed by ParallelCluster. """ - partitions = list(PartitionNodelistMapping.instance().get_partitions()) - grep_filter = _get_partition_grep_filter(partitions) - show_partition_info_command = ( - f"{SCONTROL} show partitions -o {grep_filter} " + '| grep -oP "^PartitionName=\\K(\\S+)| State=\\K(\\S+)"' - ) - # It's safe to use the function affected by B604 since the command is fully built in this code - partition_info_str = check_command_output( - show_partition_info_command, timeout=command_timeout, shell=True # nosec B604 - ) - partitions_info = _parse_partition_name_and_state(partition_info_str) + partitions = set(PartitionNodelistMapping.instance().get_partitions()) + raw_partition_info = _run_scontrol_command("show partitions", command_timeout=command_timeout) + partition_records = _extract_scontrol_records(raw_partition_info, SCONTROL_PARTITION_INFO_FIELD_REGEX) + partitions_info = _parse_partitions_info(partition_records, managed_partitions=partitions or None) return [ SlurmPartition( partition_name, @@ -388,15 +474,6 @@ def get_partitions_info(command_timeout=DEFAULT_GET_INFO_COMMAND_TIMEOUT) -> Lis ] -def _get_partition_grep_filter(partitions: List[str]) -> str: - grep_filter = "" - if partitions: - grep_filter += " | grep" - for partition in partitions: - grep_filter += f' -e "PartitionName={partition}"' - return grep_filter - - def resume_powering_down_nodes(): """Resume nodes that are powering_down so that are set in power state right away.""" # TODO: This function was added due to Slurm ticket 12915. The bug is not reproducible and the ticket was then @@ -406,9 +483,23 @@ def resume_powering_down_nodes(): update_nodes(nodes=powering_down_nodes, state="resume", raise_on_error=False) -def _parse_partition_name_and_state(partition_info): - """Parse partition name and state from scontrol output.""" - return grouper(partition_info.splitlines(), 2) +def _parse_partitions_info(partition_records, managed_partitions=None): + """ + Extract (partition_name, partition_state) tuples from scontrol partition records. + + Only partitions in managed_partitions are returned; if managed_partitions is None, all partitions are + returned. Records missing the name or state are skipped. + """ + partitions_info = [] + for fields in partition_records: + partition_name = fields.get("PartitionName") + partition_state = fields.get("State") + if not partition_name or not partition_state: + continue + if managed_partitions is not None and partition_name not in managed_partitions: + continue + partitions_info.append((partition_name, partition_state)) + return partitions_info def _get_all_partition_nodes(partition_name, command_timeout=DEFAULT_GET_INFO_COMMAND_TIMEOUT): @@ -433,43 +524,14 @@ def _get_slurm_nodes(states=None, partition_name=None, command_timeout=DEFAULT_G return check_command_output(sinfo_command, timeout=command_timeout, shell=True).splitlines() # nosec B604 -def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]: - """Parse slurm node info into SlurmNode objects.""" - # [ec2-user@ip-10-0-0-58 ~]$ /opt/slurm/bin/scontrol show nodes compute-dy-c5xlarge-[1-3],compute-dy-c5xlarge-50001\ - # | awk 'BEGIN{{RS="\n\n" ; ORS="######\n";}} {{print}}' | grep -oP "^(NodeName=\S+)|(NodeAddr=\S+) - # |(NodeHostName=\S+)|(? List[SlurmNode]: + """ + Build SlurmNode objects from scontrol node records extracted by _extract_scontrol_records. + Each record is a dict mapping Slurm field names to their values, e.g.: + {"NodeName": "compute-dy-c5xlarge-1", "NodeAddr": "1.2.3.4", "NodeHostName": "compute-dy-c5xlarge-1", + "State": "IDLE+CLOUD+POWER", "Partitions": "compute,compute2", "SlurmdStartTime": "2023-01-26T09:57:15"} + """ map_slurm_key_to_arg = { "NodeName": "name", "NodeAddr": "nodeaddr", @@ -485,13 +547,12 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]: date_fields = ["SlurmdStartTime", "LastBusyTime"] - node_info = slurm_node_info.split("######\n") slurm_nodes = [] - for node in node_info: - lines = node.splitlines() + for fields in node_records: + if "NodeName" not in fields: + continue kwargs = {} - for line in lines: - key, value = line.split("=", 1) + for key, value in fields.items(): if key in date_fields: if value not in ["None", "Unknown"]: value = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").astimezone(tz=timezone.utc) @@ -501,15 +562,12 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]: # Slurm reports an unset InstanceId as "(null)" value = None kwargs[map_slurm_key_to_arg[key]] = value - if lines: - try: - if is_static_node(kwargs["name"]): - node = StaticNode(**kwargs) - slurm_nodes.append(node) - else: - node = DynamicNode(**kwargs) - slurm_nodes.append(node) - except InvalidNodenameError: - log.warning("Ignoring node %s because it has an invalid name", kwargs["name"]) + try: + if is_static_node(kwargs["name"]): + slurm_nodes.append(StaticNode(**kwargs)) + else: + slurm_nodes.append(DynamicNode(**kwargs)) + except InvalidNodenameError: + log.warning("Ignoring node %s because it has an invalid name", kwargs["name"]) return slurm_nodes diff --git a/src/common/schedulers/slurm_reservation_commands.py b/src/common/schedulers/slurm_reservation_commands.py index 78ee8741..afd71681 100644 --- a/src/common/schedulers/slurm_reservation_commands.py +++ b/src/common/schedulers/slurm_reservation_commands.py @@ -9,6 +9,7 @@ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and # limitations under the License. import logging +import re # A nosec comment is appended to the following line in order to disable the B404 check. # In this file the input of the module subprocess is trusted. @@ -16,7 +17,12 @@ from datetime import datetime from typing import List, Union -from common.schedulers.slurm_commands import DEFAULT_SCONTROL_COMMAND_TIMEOUT, SCONTROL +from common.schedulers.slurm_commands import ( + DEFAULT_SCONTROL_COMMAND_TIMEOUT, + SCONTROL, + _extract_scontrol_records, + _run_scontrol_command, +) from common.utils import ( SlurmCommandError, SlurmCommandErrorHandler, @@ -30,9 +36,11 @@ logger = logging.getLogger(__name__) -SCONTROL_SHOW_RESERVATION_OUTPUT_AWK_PARSER = ( - 'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | ' - + "grep -oP '^(ReservationName=\\S+)|(? List[SlurmReservation]: - """Parse slurm reservations info into SlurmReservation objects.""" - # $ /opt/slurm/bin/scontrol show reservations awk 'BEGIN{{RS="\n\n" ; ORS="######\n";}} {{print}}' | - # grep -oP '^(ReservationName=\S+)|(? List[SlurmReservation]: + """ + Build SlurmReservation objects from scontrol reservation records extracted by _extract_scontrol_records. + + Each record is a dict mapping Slurm field names to their values, e.g.: + {"ReservationName": "root_8", "Nodes": "queuep4d-dy-crp4d-[1-5]", "Users": "root", "State": "ACTIVE"} + """ map_slurm_key_to_arg = {"ReservationName": "name", "Nodes": "nodes", "Users": "users", "State": "state"} - reservation_info = slurm_reservations_info.split("######\n") slurm_reservations = [] - for reservation in reservation_info: - lines = reservation.splitlines() - kwargs = {} - for line in lines: - key, value = line.split("=") - kwargs[map_slurm_key_to_arg[key]] = value - if lines: - reservation = SlurmReservation(**kwargs) - slurm_reservations.append(reservation) + for fields in reservation_records: + kwargs = {map_slurm_key_to_arg[key]: value for key, value in fields.items()} + slurm_reservations.append(SlurmReservation(**kwargs)) return slurm_reservations diff --git a/tests/common/schedulers/test_slurm_commands.py b/tests/common/schedulers/test_slurm_commands.py index f27c59fd..e07e1877 100644 --- a/tests/common/schedulers/test_slurm_commands.py +++ b/tests/common/schedulers/test_slurm_commands.py @@ -9,23 +9,26 @@ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and # limitations under the License. import os.path -import platform +import subprocess from datetime import datetime, timezone -from typing import Dict, List +from typing import Dict from unittest.mock import call, patch import pytest from assertpy import assert_that from common.schedulers.slurm_commands import ( SCONTROL, - SCONTROL_OUTPUT_AWK_PARSER, + SCONTROL_NODE_INFO_FIELD_REGEX, + SCONTROL_PARTITION_INFO_FIELD_REGEX, SINFO, PartitionNodelistMapping, _batch_node_info, + _extract_scontrol_records, _get_all_partition_nodes, - _get_partition_grep_filter, _get_slurm_nodes, _parse_nodes_info, + _parse_partitions_info, + _run_scontrol_command, get_nodes_info, is_static_node, parse_nodename, @@ -39,7 +42,6 @@ update_nodes, update_partitions, ) -from common.utils import check_command_output from slurm_plugin.slurm_resources import DynamicNode, InvalidNodenameError, PartitionStatus, SlurmPartition, StaticNode @@ -92,23 +94,27 @@ def test_is_static_node(nodename, expected_is_static): @pytest.mark.parametrize( - "node_info, expected_parsed_nodes_output, invalid_name", + "node_records, expected_parsed_nodes_output, invalid_name", [ ( - "NodeName=multiple-st-c5xlarge-1\n" - "NodeAddr=172.31.10.155\n" - "NodeHostName=172-31-10-155\n" - "State=MIXED+CLOUD\n" - "Partitions=multiple\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - "######\n" - "NodeName=multiple-dy-c5xlarge-2\n" - "NodeAddr=172.31.7.218\n" - "NodeHostName=172-31-7-218\n" - "State=IDLE+CLOUD+POWER\n" - "Partitions=multiple\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - "######\n", + [ + { + "NodeName": "multiple-st-c5xlarge-1", + "NodeAddr": "172.31.10.155", + "NodeHostName": "172-31-10-155", + "State": "MIXED+CLOUD", + "Partitions": "multiple", + "SlurmdStartTime": "2023-01-23T17:57:07", + }, + { + "NodeName": "multiple-dy-c5xlarge-2", + "NodeAddr": "172.31.7.218", + "NodeHostName": "172-31-7-218", + "State": "IDLE+CLOUD+POWER", + "Partitions": "multiple", + "SlurmdStartTime": "2023-01-23T17:57:07", + }, + ], [ StaticNode( "multiple-st-c5xlarge-1", @@ -130,25 +136,29 @@ def test_is_static_node(nodename, expected_is_static): False, ), ( - "NodeName=queue1-st-crt2micro-1\n" - "NodeAddr=10.0.236.182\n" - "NodeHostName=queue1-st-crt2micro-1\n" - "State=IDLE+CLOUD+MAINTENANCE+RESERVED\n" - "Partitions=queue1\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - "LastBusyTime=2023-10-13T10:13:20\n" - "ReservationName=root_1\n" - "######\n" - "NodeName=queuep4d-dy-crp4d-1\n" - "NodeAddr=queuep4d-dy-crp4d-1\n" - "NodeHostName=queuep4d-dy-crp4d-1\n" - "State=DOWN+CLOUD+MAINTENANCE+POWERED_DOWN+RESERVED\n" - "Partitions=queuep4d\n" - "SlurmdStartTime=None\n" - "LastBusyTime=Unknown\n" - "Reason=test [slurm@2023-10-20T07:18:35]\n" - "ReservationName=root_6\n" - "######\n", + [ + { + "NodeName": "queue1-st-crt2micro-1", + "NodeAddr": "10.0.236.182", + "NodeHostName": "queue1-st-crt2micro-1", + "State": "IDLE+CLOUD+MAINTENANCE+RESERVED", + "Partitions": "queue1", + "SlurmdStartTime": "2023-01-23T17:57:07", + "LastBusyTime": "2023-10-13T10:13:20", + "ReservationName": "root_1", + }, + { + "NodeName": "queuep4d-dy-crp4d-1", + "NodeAddr": "queuep4d-dy-crp4d-1", + "NodeHostName": "queuep4d-dy-crp4d-1", + "State": "DOWN+CLOUD+MAINTENANCE+POWERED_DOWN+RESERVED", + "Partitions": "queuep4d", + "SlurmdStartTime": "None", + "LastBusyTime": "Unknown", + "Reason": "test [slurm@2023-10-20T07:18:35]", + "ReservationName": "root_6", + }, + ], [ StaticNode( "queue1-st-crt2micro-1", @@ -173,14 +183,17 @@ def test_is_static_node(nodename, expected_is_static): False, ), ( - "NodeName=multiple-dy-c5xlarge-3\n" - "NodeAddr=multiple-dy-c5xlarge-3\n" - "NodeHostName=multiple-dy-c5xlarge-3\n" - "State=IDLE+CLOUD+POWER\n" - "Partitions=multiple\n" - "Reason=some reason \n" - "SlurmdStartTime=None\n" - "######\n", + [ + { + "NodeName": "multiple-dy-c5xlarge-3", + "NodeAddr": "multiple-dy-c5xlarge-3", + "NodeHostName": "multiple-dy-c5xlarge-3", + "State": "IDLE+CLOUD+POWER", + "Partitions": "multiple", + "Reason": "some reason ", + "SlurmdStartTime": "None", + }, + ], [ DynamicNode( "multiple-dy-c5xlarge-3", @@ -195,14 +208,17 @@ def test_is_static_node(nodename, expected_is_static): False, ), ( - "NodeName=multiple-dy-c5xlarge-3\n" - "NodeAddr=multiple-dy-c5xlarge-3\n" - "NodeHostName=multiple-dy-c5xlarge-3\n" - "State=IDLE+CLOUD+POWER\n" - "Partitions=multiple\n" - "Reason=some reason containing key=value entries \n" - "SlurmdStartTime=None\n" - "######\n", + [ + { + "NodeName": "multiple-dy-c5xlarge-3", + "NodeAddr": "multiple-dy-c5xlarge-3", + "NodeHostName": "multiple-dy-c5xlarge-3", + "State": "IDLE+CLOUD+POWER", + "Partitions": "multiple", + "Reason": "some reason containing key=value entries ", + "SlurmdStartTime": "None", + }, + ], [ DynamicNode( "multiple-dy-c5xlarge-3", @@ -217,22 +233,26 @@ def test_is_static_node(nodename, expected_is_static): False, ), ( - "NodeName=multiple-dy-c5xlarge-5\n" - "NodeAddr=multiple-dy-c5xlarge-5\n" - "NodeHostName=multiple-dy-c5xlarge-5\n" - "State=IDLE+CLOUD+POWER\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - "LastBusyTime=2023-01-23T17:57:07\n" - # missing partitions - "######\n" - # Invalid node name - "NodeName=test-no-partition\n" - "NodeAddr=test-no-partition\n" - "NodeHostName=test-no-partition\n" - "State=IDLE+CLOUD+POWER\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - # missing partitions - "######\n", + [ + { + "NodeName": "multiple-dy-c5xlarge-5", + "NodeAddr": "multiple-dy-c5xlarge-5", + "NodeHostName": "multiple-dy-c5xlarge-5", + "State": "IDLE+CLOUD+POWER", + "SlurmdStartTime": "2023-01-23T17:57:07", + "LastBusyTime": "2023-01-23T17:57:07", + # missing partitions + }, + # Invalid node name + { + "NodeName": "test-no-partition", + "NodeAddr": "test-no-partition", + "NodeHostName": "test-no-partition", + "State": "IDLE+CLOUD+POWER", + "SlurmdStartTime": "2023-01-23T17:57:07", + # missing partitions + }, + ], [ DynamicNode( "multiple-dy-c5xlarge-5", @@ -248,22 +268,26 @@ def test_is_static_node(nodename, expected_is_static): ), # Test case: InstanceId is parsed from scontrol show nodes output; "(null)" is normalized to None ( - "NodeName=queue1-st-c5xlarge-1\n" - "NodeAddr=10.0.1.1\n" - "NodeHostName=queue1-st-c5xlarge-1\n" - "State=IDLE+CLOUD\n" - "Partitions=queue1\n" - "SlurmdStartTime=2023-01-23T17:57:07\n" - "InstanceId=i-0abc123def456\n" - "######\n" - "NodeName=queue1-dy-c5xlarge-2\n" - "NodeAddr=queue1-dy-c5xlarge-2\n" - "NodeHostName=queue1-dy-c5xlarge-2\n" - "State=IDLE+CLOUD+POWER\n" - "Partitions=queue1\n" - "SlurmdStartTime=None\n" - "InstanceId=(null)\n" - "######\n", + [ + { + "NodeName": "queue1-st-c5xlarge-1", + "NodeAddr": "10.0.1.1", + "NodeHostName": "queue1-st-c5xlarge-1", + "State": "IDLE+CLOUD", + "Partitions": "queue1", + "SlurmdStartTime": "2023-01-23T17:57:07", + "InstanceId": "i-0abc123def456", + }, + { + "NodeName": "queue1-dy-c5xlarge-2", + "NodeAddr": "queue1-dy-c5xlarge-2", + "NodeHostName": "queue1-dy-c5xlarge-2", + "State": "IDLE+CLOUD+POWER", + "Partitions": "queue1", + "SlurmdStartTime": "None", + "InstanceId": "(null)", + }, + ], [ StaticNode( "queue1-st-c5xlarge-1", @@ -288,8 +312,8 @@ def test_is_static_node(nodename, expected_is_static): ), ], ) -def test_parse_nodes_info(node_info, expected_parsed_nodes_output, invalid_name, caplog): - parsed_node_info = _parse_nodes_info(node_info) +def test_parse_nodes_info(node_records, expected_parsed_nodes_output, invalid_name, caplog): + parsed_node_info = _parse_nodes_info(node_records) assert_that(parsed_node_info).is_equal_to(expected_parsed_nodes_output) if invalid_name: assert_that(caplog.text).contains("Ignoring node test-no-partition because it has an invalid name") @@ -1206,7 +1230,7 @@ def test_get_all_partition_nodes( @pytest.mark.parametrize( - "nodes, cmd_timeout, partition_nodelist_mapping, expected_command", + "nodes, cmd_timeout, partition_nodelist_mapping, expected_scontrol_args", [ pytest.param( "node1 node2", @@ -1215,7 +1239,7 @@ def test_get_all_partition_nodes( "test": "test-st-cr1-[1-10],test-dy-cr2-[1-2]", "test2": "test2-st-cr1-[1-10],test2-dy-cr2-[1-2]", }, - f"{SCONTROL} show nodes node1 node2 | {SCONTROL_OUTPUT_AWK_PARSER}", + "show nodes node1 node2", id="Test with nodes provided by caller", ), pytest.param( @@ -1225,13 +1249,12 @@ def test_get_all_partition_nodes( "test": "test-st-cr1-[1-10],test-dy-cr2-[1-2]", "test2": "test2-st-cr1-[1-10],test2-dy-cr2-[1-2]", }, - f"{SCONTROL} show nodes test-st-cr1-[1-10],test-dy-cr2-[1-2],test2-st-cr1-[1-10],test2-dy-cr2-[1-2] | " - f"{SCONTROL_OUTPUT_AWK_PARSER}", + "show nodes test-st-cr1-[1-10],test-dy-cr2-[1-2],test2-st-cr1-[1-10],test2-dy-cr2-[1-2]", id="Test with nodes not provided by caller. Nodes are retrieved from PC-managed partitions ", ), ], ) -def test_get_nodes_info(nodes, cmd_timeout, partition_nodelist_mapping: Dict, expected_command, mocker): +def test_get_nodes_info(nodes, cmd_timeout, partition_nodelist_mapping: Dict, expected_scontrol_args, mocker): # Mock get_partitions() method of the PartitionNodelistMapping singleton used in get_nodes_info() mocker.patch( "common.schedulers.slurm_commands.PartitionNodelistMapping.get_partitions", @@ -1242,48 +1265,45 @@ def test_get_nodes_info(nodes, cmd_timeout, partition_nodelist_mapping: Dict, ex "common.schedulers.slurm_commands._get_all_partition_nodes", return_value=",".join([nodelist for partition, nodelist in partition_nodelist_mapping.items()]), ) - # Mock check_command_output call performed in get_nodes_info() - check_command_output_mocked = mocker.patch( - "common.schedulers.slurm_commands.check_command_output", + # Mock the standalone scontrol invocation performed in get_nodes_info() + run_scontrol_command_mocked = mocker.patch( + "common.schedulers.slurm_commands._run_scontrol_command", + return_value="", autospec=True, ) get_nodes_info(nodes, cmd_timeout) - check_command_output_mocked.assert_called_with(expected_command, timeout=cmd_timeout, shell=True) + run_scontrol_command_mocked.assert_called_with(expected_scontrol_args, command_timeout=cmd_timeout) @pytest.mark.parametrize( - "nodes, cmd_timeout, run_command_call, run_command_side_effect, expected_exception", + "nodes, cmd_timeout, expected_scontrol_args, expected_exception", [ ( "node1 node2", 30, - f"{SCONTROL} show nodes node1 node2 | {SCONTROL_OUTPUT_AWK_PARSER}", - None, + "show nodes node1 node2", None, ), ( "node1 & rm -rf / node2", None, None, - None, ValueError, ), ], ) -def test_get_nodes_info_argument_validation( - nodes, cmd_timeout, run_command_call, run_command_side_effect, expected_exception, mocker -): +def test_get_nodes_info_argument_validation(nodes, cmd_timeout, expected_scontrol_args, expected_exception, mocker): if expected_exception is ValueError: with pytest.raises(ValueError): get_nodes_info(nodes, cmd_timeout) else: - check_command_output_mocked = mocker.patch( - "common.schedulers.slurm_commands.check_command_output", - side_effect=run_command_side_effect, + run_scontrol_command_mocked = mocker.patch( + "common.schedulers.slurm_commands._run_scontrol_command", + return_value="", autospec=True, ) get_nodes_info(nodes, cmd_timeout) - check_command_output_mocked.assert_called_with(run_command_call, timeout=30, shell=True) + run_scontrol_command_mocked.assert_called_with(expected_scontrol_args, command_timeout=30) @pytest.mark.parametrize( @@ -1351,49 +1371,203 @@ def test_get_nodes_info_argument_validation( " ReservationName=root_5\n" ), ( - "NodeName=queue1-st-compute-resource-1-1\nNodeAddr=192.168.123.191\n" - "NodeHostName=queue1-st-compute-resource-1-1\nState=DOWN+CLOUD+REBOOT_ISSUED\nPartitions=queue1\n" - "SlurmdStartTime=2023-01-26T09:57:15\n" - "LastBusyTime=2023-01-26T09:57:15\n" - "Reason=Reboot ASAP : reboot issued [slurm@2023-01-26T10:11:39]\n######\n" - "NodeName=queue1-st-compute-resource-1-2\nNodeAddr=192.168.123.192\n" - "NodeHostName=queue1-st-compute-resource-1-2\nState=DOWN+CLOUD+REBOOT_ISSUED\nPartitions=queue1\n" - "SlurmdStartTime=2023-01-26T09:57:16\n" - "LastBusyTime=Unknown\n" - "Reason=Reboot ASAP : reboot issued [slurm@2023-01-26T10:11:40]\n######\n" - "NodeName=queue1-st-crt2micro-1\nNodeAddr=10.0.236.182\n" - "NodeHostName=queue1-st-crt2micro-1\nState=IDLE+CLOUD+MAINTENANCE+RESERVED\nPartitions=queue1\n" - "SlurmdStartTime=2023-10-13T10:13:17\n" - "LastBusyTime=2023-10-13T10:13:20\nReservationName=root_5######\n" + [ + { + "NodeName": "queue1-st-compute-resource-1-1", + "NodeAddr": "192.168.123.191", + "NodeHostName": "queue1-st-compute-resource-1-1", + "State": "DOWN+CLOUD+REBOOT_ISSUED", + "Partitions": "queue1", + "SlurmdStartTime": "2023-01-26T09:57:15", + "LastBusyTime": "2023-01-26T09:57:15", + "Reason": "Reboot ASAP : reboot issued [slurm@2023-01-26T10:11:39]", + }, + { + "NodeName": "queue1-st-compute-resource-1-2", + "NodeAddr": "192.168.123.192", + "NodeHostName": "queue1-st-compute-resource-1-2", + "State": "DOWN+CLOUD+REBOOT_ISSUED", + "Partitions": "queue1", + "SlurmdStartTime": "2023-01-26T09:57:16", + "LastBusyTime": "Unknown", + "Reason": "Reboot ASAP : reboot issued [slurm@2023-01-26T10:11:40]", + }, + { + "NodeName": "queue1-st-crt2micro-1", + "NodeAddr": "10.0.236.182", + "NodeHostName": "queue1-st-crt2micro-1", + "State": "IDLE+CLOUD+MAINTENANCE+RESERVED", + "Partitions": "queue1", + "SlurmdStartTime": "2023-10-13T10:13:17", + "LastBusyTime": "2023-10-13T10:13:20", + "ReservationName": "root_5", + }, + ] ), ) ], ) -def test_scontrol_output_awk_parser(scontrol_output, expected_parsed_output): - # This test makes use of grep option -P that is only supported by GNU grep. - # To have the test passing on MacOS you have to install GNU Grep: brew install grep. - scontrol_awk_parser = ( - SCONTROL_OUTPUT_AWK_PARSER - if platform.system() != "Darwin" - else SCONTROL_OUTPUT_AWK_PARSER.replace("grep", "ggrep") - ) - parsed_output = check_command_output(f'echo "{scontrol_output}" | {scontrol_awk_parser}', shell=True) +def test_extract_scontrol_records(scontrol_output, expected_parsed_output): + # _extract_scontrol_records reproduces in pure Python the record splitting and field extraction that was + # previously done via an awk/grep shell pipeline, so no external awk/grep is required. + parsed_output = _extract_scontrol_records(scontrol_output, SCONTROL_NODE_INFO_FIELD_REGEX) assert_that(parsed_output).is_equal_to(expected_parsed_output) @pytest.mark.parametrize( - "partitions, expected_grep_filter", + "raw_partition_info, expected_records", [ pytest.param( - ["queue1", "queue2", "queue3"], - ' | grep -e "PartitionName=queue1" -e "PartitionName=queue2" -e "PartitionName=queue3"', - id="Regular list of partitions", + "PartitionName=queue1\n" + " AllocNodes=ALL Default=YES QoS=N/A\n" + " State=UP TotalCPUs=10 TotalNodes=5\n" + "\n" + "PartitionName=queue2\n" + " AllocNodes=ALL Default=NO QoS=N/A\n" + " State=INACTIVE TotalCPUs=20 TotalNodes=10\n", + [ + {"PartitionName": "queue1", "State": "UP"}, + {"PartitionName": "queue2", "State": "INACTIVE"}, + ], + id="Multi-line partitions split on blank lines", ), - pytest.param([], "", id="Empty list of partitions (it should not be possible as of 3.7.0)"), + pytest.param("", [], id="Empty scontrol output"), ], ) -def test_grep_partition_filter(partitions: List[str], expected_grep_filter: str): - assert_that(_get_partition_grep_filter(partitions)).is_equal_to(expected_grep_filter) +def test_extract_scontrol_records_partitions(raw_partition_info, expected_records): + records = _extract_scontrol_records(raw_partition_info, SCONTROL_PARTITION_INFO_FIELD_REGEX) + assert_that(records).is_equal_to(expected_records) + + +@pytest.mark.parametrize( + "partition_records, managed_partitions, expected_partitions_info", + [ + pytest.param( + [ + {"PartitionName": "queue1", "State": "UP"}, + {"PartitionName": "queue2", "State": "INACTIVE"}, + {"PartitionName": "queue3", "State": "UP"}, + ], + {"queue1", "queue3"}, + [("queue1", "UP"), ("queue3", "UP")], + id="Filter to managed partitions only", + ), + pytest.param( + [ + {"PartitionName": "queue1", "State": "UP"}, + {"PartitionName": "queue2", "State": "INACTIVE"}, + ], + None, + [("queue1", "UP"), ("queue2", "INACTIVE")], + id="No filter returns all partitions", + ), + pytest.param( + [{"State": "UP"}, {"PartitionName": "queue1"}], + None, + [], + id="Records missing name or state are skipped", + ), + pytest.param([], {"queue1"}, [], id="No records"), + ], +) +def test_parse_partitions_info(partition_records, managed_partitions, expected_partitions_info): + assert_that(_parse_partitions_info(partition_records, managed_partitions)).is_equal_to(expected_partitions_info) + + +def _completed_process(returncode, stdout, stderr): + return subprocess.CompletedProcess(args=["scontrol"], returncode=returncode, stdout=stdout, stderr=stderr) + + +def test_run_scontrol_command_success_with_output(mocker): + subprocess_run_mocked = mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + return_value=_completed_process(0, "NodeName=q1-st-c5-1\n", ""), + ) + output = _run_scontrol_command("show nodes q1-st-c5-1", command_timeout=15) + assert_that(output).is_equal_to("NodeName=q1-st-c5-1\n") + # scontrol is run as a standalone command (no shell), split into args + args, kwargs = subprocess_run_mocked.call_args + assert_that(args[0]).is_equal_to(SCONTROL.split() + ["show", "nodes", "q1-st-c5-1"]) + assert_that(kwargs["shell"] if "shell" in kwargs else False).is_false() + assert_that(kwargs["timeout"]).is_equal_to(15) + + +def test_run_scontrol_command_success_empty_output_returns_empty(mocker): + # scontrol exiting 0 with no output is returned as-is (no error). In practice scontrol prints a + # "No in the system" line for empty results, so this is a defensive case. + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + return_value=_completed_process(0, "", ""), + ) + assert_that(_run_scontrol_command("show nodes q1-st-c5-1")).is_equal_to("") + + +def test_run_scontrol_command_nonzero_no_output_raises_and_logs_stderr(mocker, caplog): + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + return_value=_completed_process(1, "", "sudo: a password is required"), + ) + with pytest.raises(subprocess.CalledProcessError): + _run_scontrol_command("show nodes q1-st-c5-1") + # The failure must be logged with scontrol's exit code and stderr so it is diagnosable. + assert_that(caplog.text).contains("failed with exit code 1") + assert_that(caplog.text).contains("sudo: a password is required") + + +def test_run_scontrol_command_nonzero_no_output_no_raise(mocker, caplog): + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + return_value=_completed_process(1, "", "some error"), + ) + output = _run_scontrol_command("show reservations", raise_on_error=False) + assert_that(output).is_equal_to("") + assert_that(caplog.text).contains("failed with exit code 1") + + +def test_run_scontrol_command_nonzero_with_output_warns_and_returns(mocker, caplog): + # Slurm prints valid data for existing nodes but exits 1 when a requested node is not found. This must be + # tolerated (output returned) and only warned about, distinct from a hard failure. + stdout = "NodeName=q1-st-c5-1 State=IDLE+CLOUD\nNode q1-st-c5-2 not found\n" + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + return_value=_completed_process(1, stdout, ""), + ) + output = _run_scontrol_command("show nodes q1-st-c5-1,q1-st-c5-2") + assert_that(output).is_equal_to(stdout) + assert_that(caplog.text).contains("returned non-zero exit code 1 but produced output") + + +def test_run_scontrol_command_timeout_raises_and_logs(mocker, caplog): + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="scontrol", timeout=30), + ) + with pytest.raises(subprocess.TimeoutExpired): + _run_scontrol_command("show nodes q1-st-c5-1", command_timeout=30) + assert_that(caplog.text).contains("timed out after 30 seconds") + + +def test_run_scontrol_command_oserror_raises_and_logs(mocker, caplog): + # scontrol/sudo binary missing or not executable: subprocess.run raises an OSError before any exit code. + mocker.patch( + "common.schedulers.slurm_commands.subprocess.run", + side_effect=FileNotFoundError("No such file or directory: 'sudo'"), + ) + with pytest.raises(OSError): + _run_scontrol_command("show nodes q1-st-c5-1") + assert_that(caplog.text).contains("Unable to execute scontrol command") + + +def test_get_nodes_info_parsing_failure_propagates(mocker): + # If scontrol returns a field the parser cannot interpret (e.g. an unparseable date), the error surfaces + # instead of being silently masked by the (former) shell pipeline. + mocker.patch( + "common.schedulers.slurm_commands._run_scontrol_command", + return_value="NodeName=q1-st-c5-1\nSlurmdStartTime=not-a-valid-date\n", + autospec=True, + ) + with pytest.raises(ValueError): + get_nodes_info("q1-st-c5-1") class TestPartitionNodelistMapping: diff --git a/tests/common/schedulers/test_slurm_reservation_commands.py b/tests/common/schedulers/test_slurm_reservation_commands.py index a3651284..e600056b 100644 --- a/tests/common/schedulers/test_slurm_reservation_commands.py +++ b/tests/common/schedulers/test_slurm_reservation_commands.py @@ -16,7 +16,6 @@ from assertpy import assert_that from common.schedulers.slurm_commands import DEFAULT_SCONTROL_COMMAND_TIMEOUT, SCONTROL from common.schedulers.slurm_reservation_commands import ( - SCONTROL_SHOW_RESERVATION_OUTPUT_AWK_PARSER, _add_param, _create_or_update_reservation, _parse_reservations_info, @@ -474,38 +473,50 @@ def test_is_slurm_reservation( def test_get_slurm_reservations_info(mocker): - # Mock check_command_output call performed in get_slurm_reservations_info() - check_command_output_mocked = mocker.patch( - "common.schedulers.slurm_reservation_commands.check_command_output", autospec=True + # scontrol is now run as a standalone subprocess (no shell pipeline) and parsed in Python. + raw_reservations_output = ( + "ReservationName=root_8 StartTime=2023-10-25T09:46:49 EndTime=2024-10-24T09:46:49 Duration=365-00:00:00\n" + " Nodes=queuep4d-dy-crp4d-[1-5] NodeCnt=5 CoreCnt=480 Features=(null) PartitionName=(null) Flags=MAINT\n" + " TRES=cpu=480\n" + " Users=root Groups=(null) Accounts=(null) Licenses=(null) State=ACTIVE BurstBuffer=(null) Watts=n/a\n" + " MaxStartDelay=(null)\n" ) - get_slurm_reservations_info() - expected_cmd = f"{SCONTROL} show reservations | {SCONTROL_SHOW_RESERVATION_OUTPUT_AWK_PARSER}" - check_command_output_mocked.assert_called_with( - expected_cmd, raise_on_error=True, timeout=DEFAULT_SCONTROL_COMMAND_TIMEOUT, shell=True + run_scontrol_command_mocked = mocker.patch( + "common.schedulers.slurm_reservation_commands._run_scontrol_command", + return_value=raw_reservations_output, + autospec=True, ) + reservations = get_slurm_reservations_info() + run_scontrol_command_mocked.assert_called_with( + "show reservations", command_timeout=DEFAULT_SCONTROL_COMMAND_TIMEOUT, raise_on_error=True + ) + assert_that(reservations).is_equal_to([SlurmReservation("root_8", "ACTIVE", "queuep4d-dy-crp4d-[1-5]", "root")]) + + +def test_get_slurm_reservations_info_no_reservations(mocker): + # When there are no reservations, scontrol prints "No reservations in the system" and exits 0; this must + # parse to an empty list rather than raising. + mocker.patch( + "common.schedulers.slurm_reservation_commands._run_scontrol_command", + return_value="No reservations in the system\n", + autospec=True, + ) + assert_that(get_slurm_reservations_info()).is_equal_to([]) @pytest.mark.parametrize( - "reservations_info, expected_parsed_reservations_output", + "reservation_records, expected_parsed_reservations_output", [ - ("######\n", []), + ([], []), ( - "ReservationName=root_8\nNodes=queuep4d-dy-crp4d-[1-5]\nUsers=root\nState=ACTIVE\n######\n", + [{"ReservationName": "root_8", "Nodes": "queuep4d-dy-crp4d-[1-5]", "Users": "root", "State": "ACTIVE"}], [SlurmReservation("root_8", "ACTIVE", "queuep4d-dy-crp4d-[1-5]", "root")], ), ( - ( - "ReservationName=root_8\n" - "Nodes=queuep4d-dy-crp4d-[1-5]\n" - "Users=root\n" - "State=ACTIVE\n" - "######\n" - "ReservationName=root_9\n" - "Nodes=queue1-st-crt2micro-1\n" - "Users=root\n" - "State=ACTIVE\n" - "######\n" - ), + [ + {"ReservationName": "root_8", "Nodes": "queuep4d-dy-crp4d-[1-5]", "Users": "root", "State": "ACTIVE"}, + {"ReservationName": "root_9", "Nodes": "queue1-st-crt2micro-1", "Users": "root", "State": "ACTIVE"}, + ], [ SlurmReservation("root_8", "ACTIVE", "queuep4d-dy-crp4d-[1-5]", "root"), SlurmReservation("root_9", "ACTIVE", "queue1-st-crt2micro-1", "root"), @@ -513,6 +524,6 @@ def test_get_slurm_reservations_info(mocker): ), ], ) -def test_parse_reservations_info(reservations_info, expected_parsed_reservations_output): - parsed_info = _parse_reservations_info(reservations_info) +def test_parse_reservations_info(reservation_records, expected_parsed_reservations_output): + parsed_info = _parse_reservations_info(reservation_records) assert_that(parsed_info).is_equal_to(expected_parsed_reservations_output)