Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 146 additions & 1 deletion test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.coordinator.base import UnjoinedGroupException
from kafka.net.compat import KafkaNetClient
from kafka.net.manager import KafkaConnectionManager
from kafka.protocol.consumer import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse,
JoinGroupRequest, JoinGroupResponse,
SyncGroupRequest, SyncGroupResponse,
HeartbeatRequest, HeartbeatResponse,
)
from kafka.protocol.metadata import (
FindCoordinatorRequest, FindCoordinatorResponse, MetadataResponse,
)
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod

from test.mock_broker import MockCluster


@pytest.fixture
def coordinator(broker, client, metrics):
Expand Down Expand Up @@ -2442,3 +2449,141 @@ def test_close_no_autocommit_still_revokes(self, mocker, coordinator):
coordinator._maybe_auto_commit_offsets_sync.assert_not_called()
listener.on_partitions_revoked.assert_called_once_with(
{TopicPartition('t', 0)})


# ---------------------------------------------------------------------------
# Heartbeat coordinator failover
# (https://github.com/dpkp/kafka-python/issues/1134)
# ---------------------------------------------------------------------------


def _seed_stable_group(coordinator, coordinator_id=0):
coordinator._subscription.subscribe(topics=['foobar'])
coordinator.coordinator_id = coordinator_id
coordinator._generation = Generation(1, 'member-1', b'')
coordinator.state = MemberState.STABLE
coordinator.rejoin_needed = False


def _find_coordinator_response(group_id, broker):
"""A successful FindCoordinatorResponse naming `broker` as coordinator.

Top-level fields cover v0-v3; the coordinators array covers v4+
(whichever version the connection negotiates)."""
Coordinator = FindCoordinatorResponse.Coordinator
return FindCoordinatorResponse(
throttle_time_ms=0, error_code=0, error_message=None,
node_id=broker.node_id, host=broker.host, port=broker.port,
coordinators=[Coordinator(
key=group_id, node_id=broker.node_id,
host=broker.host, port=broker.port,
error_code=0, error_message=None)])


def _run_heartbeat_loop_until(coordinator, condition, timeout=5.0):
"""Run the real _heartbeat_loop coroutine until condition() or timeout,
then shut the loop down cleanly."""
net = coordinator._manager._net
coordinator._maybe_start_heartbeat_loop()
coordinator._enable_heartbeat()
deadline = time.monotonic() + timeout
while time.monotonic() < deadline and not condition():
net.poll(timeout_ms=10)
coordinator._close_heartbeat()
deadline = time.monotonic() + 2
while time.monotonic() < deadline and not coordinator._heartbeat_loop_future.is_done:
net.poll(timeout_ms=10)


def test_heartbeat_coordinator_not_available_recovers(mocker, broker, client, metrics):
"""A heartbeat answered with CoordinatorNotAvailableError must mark the
coordinator dead, rediscover it via FindCoordinator, and resume
heartbeating -- not retry forever against the dead coordinator."""
coordinator = ConsumerCoordinator(
client, SubscriptionState(), metrics=metrics,
api_version=broker.broker_version,
heartbeat_interval_ms=10, retry_backoff_ms=10)
try:
client._manager.bootstrap(timeout_ms=5000)
_seed_stable_group(coordinator)
dead_spy = mocker.spy(coordinator, 'coordinator_dead')

# First heartbeat: the broker reports the coordinator gone.
broker.respond(HeartbeatRequest, HeartbeatResponse(
throttle_time_ms=0,
error_code=Errors.CoordinatorNotAvailableError.errno))
# Rediscovery: this broker is (still) the coordinator.
broker.respond_always(
FindCoordinatorRequest,
_find_coordinator_response(coordinator.group_id, broker))
# All heartbeats after the scripted failure succeed.
recovered = []
def ok_heartbeat(api_key, api_version, correlation_id, request_bytes):
recovered.append(correlation_id)
return HeartbeatResponse(throttle_time_ms=0, error_code=0)
broker.respond_always(HeartbeatRequest, ok_heartbeat)

_run_heartbeat_loop_until(coordinator, lambda: recovered)

assert recovered, 'heartbeat never resumed after coordinator rediscovery'
assert isinstance(dead_spy.call_args_list[0][0][0],
Errors.CoordinatorNotAvailableError)
# Rediscovered via FindCoordinator (synthesized coordinator node id).
assert coordinator.coordinator_id == 'coordinator-0'
# The error does not invalidate the generation or group membership.
assert coordinator.state is MemberState.STABLE
assert not coordinator._generation.is_lost()
finally:
coordinator._close_heartbeat()
coordinator.reset_generation() # skip LeaveGroup wire traffic in close()
coordinator.close(timeout_ms=0)


def test_heartbeat_coordinator_broker_failover(mocker, net, metrics):
"""When the coordinator broker dies (connections dropped and refused),
the consumer must fail the heartbeat at the transport level, rediscover
the coordinator on a surviving broker, and resume heartbeating against
the newly elected coordinator."""
mock_cluster = MockCluster(num_brokers=2)
manager = KafkaConnectionManager(
net,
bootstrap_servers=mock_cluster.bootstrap_servers(),
api_version=mock_cluster.broker_version,
request_timeout_ms=5000)
mock_cluster.attach(manager)
client = KafkaNetClient(net=net, manager=manager)
coordinator = ConsumerCoordinator(
client, SubscriptionState(), metrics=metrics,
api_version=mock_cluster.broker_version,
heartbeat_interval_ms=10, retry_backoff_ms=10)
try:
manager.bootstrap(timeout_ms=5000)
_seed_stable_group(coordinator, coordinator_id=0)
dead_spy = mocker.spy(coordinator, 'coordinator_dead')

# Broker 0 (the coordinator) dies before the next heartbeat, and the
# cluster elects broker 1: surviving brokers answer FindCoordinator
# with the new coordinator.
mock_cluster[0].stop()
mock_cluster.set_coordinator(coordinator.group_id, 1)
recovered = []
def ok_heartbeat(api_key, api_version, correlation_id, request_bytes):
recovered.append(correlation_id)
return HeartbeatResponse(throttle_time_ms=0, error_code=0)
mock_cluster[1].respond_always(HeartbeatRequest, ok_heartbeat)

_run_heartbeat_loop_until(coordinator, lambda: recovered)

assert recovered, 'heartbeat never resumed on the new coordinator'
# The failed send to the dead broker marked the coordinator dead...
assert dead_spy.call_count >= 1
# ...and rediscovery pointed at broker 1.
assert coordinator.coordinator_id == 'coordinator-1'
assert coordinator.state is MemberState.STABLE
assert not coordinator._generation.is_lost()
finally:
coordinator._close_heartbeat()
coordinator.reset_generation()
coordinator.close(timeout_ms=0)
client.close()
manager.close()
Loading
Loading