Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
**BUG FIXES**
- Fix clustermgtd failing to detect compute node bootstrap timeouts, which prevented the cluster from entering protected mode.
- Fix an issue where compute nodes are incorrectly replaced when launching a large number of nodes due to eventual consistency.
- Fix an issue where starting the compute fleet may not reliably recover the cluster from protected mode.

3.15.0
------
Expand Down
38 changes: 38 additions & 0 deletions src/common/schedulers/slurm_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,44 @@ def update_all_partitions(state, reset_node_addrs_hostname):
return False


def reset_nodes_in_inactive_partitions():
"""
Reset nodeaddr/nodehostname and set to down the nodes belonging to INACTIVE partitions.

This is meant to be called when starting the compute fleet, before bringing partitions back UP, to clean up nodes
that an INACTIVE partition may have left behind (e.g. dynamic nodes that were powering up without a backing
instance when protected mode disabled the partition). If not reset, those nodes would be detected as bootstrap
failures right after start and send the cluster back into protected mode. Only INACTIVE partitions are considered,
so nodes of active partitions (which may be running jobs) are not affected.
"""
try:
# Collect the node names of INACTIVE partitions, skipping partitions with no nodes to avoid building a
# malformed (e.g. empty or with consecutive commas) node list.
inactive_partition_nodes = [
part.nodenames
for part in get_partitions_info()
if PartitionStatus(part.state) == PartitionStatus.INACTIVE and part.nodenames
]
if not inactive_partition_nodes:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a log line saying that there is no nodes in inactive partitions to reset?
I know it may seem a minor, but considering that we are adding this logic to fix a race condition, it may be useuful for the troubleshooting to know exactly what the daemons sees when it checks

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

log.info("No nodes in INACTIVE partitions, nothing to reset.")
return
# Reset only the nodes that actually need it (e.g. nodeaddr still set), mirroring clustermgtd's inactive
# cleanup.
nodes_to_reset = {
node.name for node in get_nodes_info(",".join(inactive_partition_nodes)) if node.needs_reset_when_inactive()
}
if nodes_to_reset:
log.info(
"Resetting nodeaddr/nodehostname and setting to down the following nodes of INACTIVE partitions: %s",
sorted(nodes_to_reset),
)
reset_nodes(nodes_to_reset, state="down", reason="inactive partition", raise_on_error=False)
else:
log.info("No nodes of INACTIVE partitions need to be reset.")
except Exception as e:
log.error("Failed when resetting nodes of INACTIVE partitions with error %s", e)


def _batch_attribute(attribute, batch_size, expected_length=None):
"""Parse an attribute into batches."""
if type(attribute) is str:
Expand Down
7 changes: 6 additions & 1 deletion src/slurm_plugin/fleet_status_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
from logging.config import fileConfig

from botocore.config import Config
from common.schedulers.slurm_commands import resume_powering_down_nodes, update_all_partitions
from common.schedulers.slurm_commands import (
reset_nodes_in_inactive_partitions,
resume_powering_down_nodes,
update_all_partitions,
)
from slurm_plugin.clustermgtd import ComputeFleetStatus, ComputeFleetStatusManager
from slurm_plugin.common import log_exception
from slurm_plugin.instance_manager import InstanceManager
Expand Down Expand Up @@ -91,6 +95,7 @@ def _manage_fleet_status_transition(config, computefleet_status_data_path):

def _start_partitions():
log.info("Setting slurm partitions to UP and resuming nodes...")
reset_nodes_in_inactive_partitions()
update_all_partitions(PartitionStatus.UP, reset_node_addrs_hostname=False)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We explicitly set reset_node_addrs_hostname=False. which seems the opposite of what we are doing by adding reset_nodes_in_inactive_partitions. Why do we think this is not a contradiction and it is fine?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_all_partitions(UP, reset_node_addrs_hostname=False) only flips the partition state and intentionally leaves node addrs alone. And the node reset is a separate, explicit step (reset_nodes_in_inactive_partitions) scoped only to INACTIVE partitions and executed before the partitions are brought back UP.

So not a contradiction.

# TODO: This function was added due to Slurm ticket 12915. The bug is not reproducible and the ticket was then
# closed. This operation may now be useless: we need to check this.
Expand Down
69 changes: 69 additions & 0 deletions tests/common/schedulers/test_slurm_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
get_nodes_info,
is_static_node,
parse_nodename,
reset_nodes_in_inactive_partitions,
resume_powering_down_nodes,
set_nodes_down,
set_nodes_drain,
Expand Down Expand Up @@ -976,6 +977,74 @@ def test_update_all_partitions(
update_partitions_spy.assert_called_with(partitions_to_update, state)


@pytest.mark.parametrize(
("mock_partitions", "expected_get_nodes_arg", "inactive_nodes_info", "expected_reset_nodes"),
[
# Only INACTIVE partitions are considered; the UP partition's nodes are not queried/reset. Among the INACTIVE
# nodes, only those needing reset (nodeaddr still set) are reset.
(
[
SlurmPartition("queue1", "queue1-dy-c5xlarge-1,queue1-dy-c5xlarge-2", "INACTIVE"),
SlurmPartition("queue2", "queue2-dy-c5xlarge-1", "UP"),
SlurmPartition("queue3", "queue3-dy-c5xlarge-1", "INACTIVE"),
],
"queue1-dy-c5xlarge-1,queue1-dy-c5xlarge-2,queue3-dy-c5xlarge-1",
[
# nodeaddr set (dirty) -> needs reset
DynamicNode("queue1-dy-c5xlarge-1", "1.2.3.4", "1.2.3.4", "IDLE+CLOUD+POWERING_UP", "queue1"),
# nodeaddr already equal to name and powered down (clean) -> does not need reset
DynamicNode(
"queue1-dy-c5xlarge-2", "queue1-dy-c5xlarge-2", "queue1-dy-c5xlarge-2", "DOWN+CLOUD", "queue1"
),
DynamicNode("queue3-dy-c5xlarge-1", "1.2.3.5", "1.2.3.5", "IDLE+CLOUD+POWERING_UP", "queue3"),
],
{"queue1-dy-c5xlarge-1", "queue3-dy-c5xlarge-1"},
),
# No INACTIVE partition: get_nodes_info / reset_nodes are not called at all.
(
[
SlurmPartition("queue1", "queue1-dy-c5xlarge-1", "UP"),
SlurmPartition("queue2", "queue2-dy-c5xlarge-1", "UP"),
],
None,
[],
None,
),
# INACTIVE partition with no nodes: skipped so the node list is never malformed, nothing is queried/reset.
(
[
SlurmPartition("queue1", "", "INACTIVE"),
],
None,
[],
None,
),
],
)
def test_reset_nodes_in_inactive_partitions(
mock_partitions, expected_get_nodes_arg, inactive_nodes_info, expected_reset_nodes, mocker
):
reset_nodes_spy = mocker.patch("common.schedulers.slurm_commands.reset_nodes", autospec=True)
get_nodes_info_spy = mocker.patch(
"common.schedulers.slurm_commands.get_nodes_info", return_value=inactive_nodes_info, autospec=True
)
mocker.patch("common.schedulers.slurm_commands.get_partitions_info", return_value=mock_partitions, autospec=True)

reset_nodes_in_inactive_partitions()

if expected_get_nodes_arg is None:
get_nodes_info_spy.assert_not_called()
reset_nodes_spy.assert_not_called()
else:
get_nodes_info_spy.assert_called_once_with(expected_get_nodes_arg)
if expected_reset_nodes:
reset_nodes_spy.assert_called_once_with(
expected_reset_nodes, state="down", reason="inactive partition", raise_on_error=False
)
else:
reset_nodes_spy.assert_not_called()


def test_resume_powering_down_nodes(mocker):
get_slurm_nodes_mocked = mocker.patch("common.schedulers.slurm_commands._get_slurm_nodes", autospec=True)
update_nodes_mocked = mocker.patch("common.schedulers.slurm_commands.update_nodes", autospec=True)
Expand Down
25 changes: 21 additions & 4 deletions tests/slurm_plugin/test_fleet_status_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,30 @@ def test_get_computefleet_status(test_datadir, config_file, expected_status):


def test_start_partitions(mocker):
update_all_partitions_mocked = mocker.patch("slurm_plugin.fleet_status_manager.update_all_partitions")
resume_powering_down_nodes_mocked = mocker.patch("slurm_plugin.fleet_status_manager.resume_powering_down_nodes")
# Attach all mocks to a single manager so we can assert the order in which they are called.
manager = mocker.MagicMock()
manager.attach_mock(
mocker.patch("slurm_plugin.fleet_status_manager.reset_nodes_in_inactive_partitions"),
"reset_nodes_in_inactive_partitions",
)
manager.attach_mock(
mocker.patch("slurm_plugin.fleet_status_manager.update_all_partitions"), "update_all_partitions"
)
manager.attach_mock(
mocker.patch("slurm_plugin.fleet_status_manager.resume_powering_down_nodes"), "resume_powering_down_nodes"
)

_start_partitions()

update_all_partitions_mocked.assert_called_once_with(PartitionStatus.UP, reset_node_addrs_hostname=False)
resume_powering_down_nodes_mocked.assert_called_once()
# INACTIVE partition nodes must be reset BEFORE partitions are brought back UP, otherwise nodes left behind by
# protected mode would be re-detected as bootstrap failures right after start and re-trigger protected mode.
assert_that(manager.mock_calls).is_equal_to(
[
mocker.call.reset_nodes_in_inactive_partitions(),
mocker.call.update_all_partitions(PartitionStatus.UP, reset_node_addrs_hostname=False),
mocker.call.resume_powering_down_nodes(),
]
)


def test_stop_partitions(mocker):
Expand Down
Loading