From 63822ed323c45bfcd47da5d654bada04d4081502 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 12 Jun 2026 11:55:10 -0700 Subject: [PATCH 1/2] Add MockCluster for multi-broker testing --- test/mock_broker.py | 261 +++++++++++++++++++++++++++++++++++++-- test/test_mock_broker.py | 162 +++++++++++++++++++++++- 2 files changed, 414 insertions(+), 9 deletions(-) diff --git a/test/mock_broker.py b/test/mock_broker.py index ab5aca97d..97bc5590e 100644 --- a/test/mock_broker.py +++ b/test/mock_broker.py @@ -17,6 +17,15 @@ broker.attach(manager) # Or use the consumer/client factory fixtures for a one-liner setup. + +For multi-broker scenarios (coordinator failover, leader movement), use +MockCluster, which routes connections to the matching broker by (host, port):: + + cluster = MockCluster(num_brokers=2) + cluster.attach(manager) + cluster.set_coordinator('my-group', 0) # brokers answer FindCoordinator + cluster[0].stop() # kill broker 0: live connections abort, reconnects fail + cluster.set_coordinator('my-group', 1) # "election": broker 1 takes over """ import collections @@ -24,11 +33,16 @@ import logging import struct import time +import weakref import kafka.errors as Errors from kafka.protocol.admin import DescribeClusterRequest, DescribeClusterResponse from kafka.protocol.broker_version_data import BrokerVersionData -from kafka.protocol.metadata import ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse +from kafka.protocol.metadata import ( + ApiVersionsRequest, ApiVersionsResponse, CoordinatorType, + FindCoordinatorRequest, FindCoordinatorResponse, + MetadataRequest, MetadataResponse, +) class _MockBrokerFailure: @@ -73,6 +87,7 @@ def __init__(self, net, broker, node_id=0, host='localhost', port=9092): self._write_buffer = bytearray() self.last_read = time.monotonic() self.last_write = time.monotonic() + broker._register_transport(self) @property def last_activity(self): @@ -209,10 +224,20 @@ def __init__(self, node_id=0, host='localhost', port=9092, broker_version=(4, 2) # Scripted response queue: list of (api_key, response_object) pairs self._response_queue = collections.deque() + # Persistent responses checked after the scripted queue: api_key -> response/fn + self._always_responses = {} + + # Broker lifecycle: stop() flips this and aborts live transports + self.online = True + self._transports = weakref.WeakSet() + # Counters for debugging self.requests_received = 0 self.responses_sent = 0 + def _register_transport(self, transport): + self._transports.add(transport) + def set_broker_version(self, broker_version): self.broker_version = broker_version # Build the auto-response for ApiVersionsRequest @@ -299,6 +324,54 @@ def respond_fn(self, request_class, fn): """ self._response_queue.append((request_class.API_KEY, fn)) + def respond_always(self, request_class, response): + """Set a persistent response for all requests of the given type. + + Unlike :meth:`respond`, the response is not consumed -- every matching + request gets it. Scripted queue entries (:meth:`respond` / + :meth:`respond_fn` / :meth:`fail_next`) take precedence; built-in + auto-responses (ApiVersions / Metadata / DescribeCluster) are checked + after, so a persistent response can override them. Useful for periodic + traffic like heartbeats. + + Arguments: + request_class: The request class for API key matching. + response: A response object, or a callable / coroutine function + with the same signature as :meth:`respond_fn`. + """ + self._always_responses[request_class.API_KEY] = response + + def stop(self, error=None): + """Take the broker down, as if the process was killed. + + Aborts all live transports with ``error`` and refuses new connections + (an attached manager's ``_build_transport`` raises + ``KafkaConnectionError``) until :meth:`start` is called. This + exercises the client's real connection-lost, connect-failure, and + reconnect-backoff paths. + + Arguments: + error: Exception delivered to live transports' ``abort()``. + Defaults to ``Errors.KafkaConnectionError``. + """ + if error is None: + error = Errors.KafkaConnectionError('MockBroker stopped') + self.online = False + for transport in list(self._transports): + if transport.is_closing(): + continue + # abort() must run on the event loop: connection_lost mutates + # state the loop owns. call_soon_threadsafe works both when the + # loop runs on an IO thread and when a test drives poll() inline. + try: + transport._net.call_soon_threadsafe(lambda t=transport: t.abort(error)) + except RuntimeError: + pass # selector already closed; nothing left to abort + + def start(self): + """Bring a stopped broker back online. New connections succeed again.""" + self.online = True + def fail_next(self, request_class, error=None): """Enqueue a transport failure for the next request of the given type. @@ -338,15 +411,17 @@ async def handle_request(self, api_key, api_version, correlation_id, request_byt del self._response_queue[i] if isinstance(queued_response, _MockBrokerFailure): return queued_response - if callable(queued_response): - response = queued_response(api_key, api_version, correlation_id, request_bytes) - # Support both sync and async respond_fn callables - if hasattr(response, '__await__'): - response = await response - else: - response = queued_response + response = await self._resolve_response( + queued_response, api_key, api_version, correlation_id, request_bytes) return self._encode_response(response, api_version, correlation_id) + # Then persistent responses set via respond_always + if api_key in self._always_responses: + response = await self._resolve_response( + self._always_responses[api_key], api_key, api_version, + correlation_id, request_bytes) + return self._encode_response(response, api_version, correlation_id) + # Fall back to auto-responses if api_key == ApiVersionsRequest.API_KEY: if api_version > self._api_versions_max: @@ -389,6 +464,17 @@ async def handle_request(self, api_key, api_version, correlation_id, request_byt len(self._response_queue), [(k, type(r).__name__) for k, r in self._response_queue])) + @staticmethod + async def _resolve_response(response, api_key, api_version, correlation_id, request_bytes): + """Resolve a scripted entry to a response object, calling/awaiting + respond_fn-style callables as needed.""" + if callable(response): + response = response(api_key, api_version, correlation_id, request_bytes) + # Support both sync and async respond_fn callables + if hasattr(response, '__await__'): + response = await response + return response + def attach(self, manager): """Monkey-patch a KafkaConnectionManager to route all connections through this MockBroker. @@ -403,6 +489,10 @@ def attach(self, manager): broker = self async def _mock_build_transport(node, timeout_at=None): + if not broker.online: + raise Errors.KafkaConnectionError( + 'connect to %s:%s refused (MockBroker stopped)' + % (node.host, node.port)) return MockTransport( manager._net, broker, node_id=node.node_id, host=node.host, port=node.port) @@ -448,3 +538,158 @@ def _encode_response(response, api_version, correlation_id): r.API_VERSION = version r.with_header(correlation_id=correlation_id) return r.encode(header=True, framed=True) + + +class MockCluster: + """A set of MockBrokers routed by (host, port) for multi-broker tests. + + Broker ``i`` gets ``node_id=i`` and ``port=base_port + i``. All brokers + advertise the same broker list via :meth:`set_metadata`; individual + brokers can still be given divergent metadata with + ``cluster[i].set_metadata(...)`` to simulate stale views. + + Scripting is per-broker via the usual MockBroker API, and brokers can be + taken down and brought back with ``cluster[i].stop()`` / ``start()``. + + Arguments: + num_brokers (int): Number of brokers. Default 3. + broker_version (tuple): Version for all brokers. Default ``(4, 2)``. + host (str): Hostname shared by all brokers. Default 'localhost'. + base_port (int): Port of broker 0. Default 9092. + """ + + def __init__(self, num_brokers=3, broker_version=(4, 2), host='localhost', base_port=9092): + self.broker_version = broker_version + self.brokers = [ + MockBroker(node_id=i, host=host, port=base_port + i, + broker_version=broker_version) + for i in range(num_brokers) + ] + self._by_addr = {(b.host, b.port): b for b in self.brokers} + self._coordinators = {} + self.set_metadata() + + def __getitem__(self, node_id): + return self.brokers[node_id] + + def __iter__(self): + return iter(self.brokers) + + def __len__(self): + return len(self.brokers) + + def bootstrap_servers(self): + """Comma-separated host:port list covering all brokers, for client config.""" + return ','.join('%s:%d' % (b.host, b.port) for b in self.brokers) + + def set_metadata(self, topics=None): + """Configure a consistent MetadataRequest auto-response on every broker. + + Arguments: + topics: List of MetadataResponseTopic objects to include. + """ + Broker = MetadataResponse.MetadataResponseBroker + brokers = [Broker(node_id=b.node_id, host=b.host, port=b.port, rack=None) + for b in self.brokers] + for b in self.brokers: + b.set_metadata(topics=topics, brokers=brokers) + + def set_coordinator(self, key, broker_id, key_type=CoordinatorType.GROUP): + """Name a broker as coordinator for a group / transactional id. + + After the first call, every broker auto-answers + ``FindCoordinatorRequest`` from the cluster's coordinator map + (scripted ``respond()`` / ``respond_fn()`` entries still take + precedence, and a per-broker ``respond_always()`` for + FindCoordinator overrides the cluster handler on that broker). + Keys without a mapping are answered with COORDINATOR_NOT_AVAILABLE, + so tests can model election gaps. + + The map does not react to :meth:`MockBroker.stop`: like a real + cluster mid-failover, surviving brokers keep naming the dead + coordinator until the test re-points the key. + + Arguments: + key (str): group_id or transactional_id. + broker_id (int or None): Index of the coordinator broker. None + clears the mapping (subsequent lookups get + COORDINATOR_NOT_AVAILABLE). + key_type (CoordinatorType or int): GROUP (0), TRANSACTION (1), + or SHARE (2). Default: GROUP. + """ + if broker_id is None: + self._coordinators.pop((int(key_type), key), None) + else: + self._coordinators[(int(key_type), key)] = self.brokers[broker_id].node_id + for b in self.brokers: + b.respond_always(FindCoordinatorRequest, self._handle_find_coordinator) + + def _handle_find_coordinator(self, api_key, api_version, correlation_id, request_bytes): + request = FindCoordinatorRequest.decode( + request_bytes, version=api_version, header=True) + # v4+ (KIP-699) carry the keys in an array; v0-v3 a single key field. + keys = list(request.coordinator_keys) or [request.key] + Coordinator = FindCoordinatorResponse.Coordinator + coordinators = [] + for key in keys: + broker_id = self._coordinators.get((request.key_type, key)) + if broker_id is None: + coordinators.append(Coordinator( + key=key, node_id=-1, host='', port=-1, + error_code=Errors.CoordinatorNotAvailableError.errno, + error_message=None)) + else: + broker = self.brokers[broker_id] + coordinators.append(Coordinator( + key=key, node_id=broker.node_id, host=broker.host, + port=broker.port, error_code=0, error_message=None)) + # Top-level fields serve v0-v3 (single key); the array serves v4+. + first = coordinators[0] + return FindCoordinatorResponse( + throttle_time_ms=0, + error_code=first.error_code, error_message=first.error_message, + node_id=first.node_id, host=first.host, port=first.port, + coordinators=coordinators) + + def attach(self, manager): + """Monkey-patch a KafkaConnectionManager to route each new connection + to the cluster member matching the target node's (host, port). + + Routing by address (rather than node_id) also covers bootstrap + connections, which use synthetic node ids like 'bootstrap-0', and + synthesized coordinator ids like 'coordinator-1'. Connection attempts + to an unknown address or a stopped broker raise + ``KafkaConnectionError``, like a real connect to a dead host. + + Arguments: + manager: A ``KafkaConnectionManager`` instance. + """ + cluster = self + + async def _mock_build_transport(node, timeout_at=None): + broker = cluster._by_addr.get((node.host, node.port)) + if broker is None or not broker.online: + raise Errors.KafkaConnectionError( + 'connect to %s:%s refused' % (node.host, node.port)) + return MockTransport( + manager._net, broker, + node_id=node.node_id, host=node.host, port=node.port) + + manager._build_transport = _mock_build_transport + + def client_factory(self): + """Return a callable suitable for passing as ``kafka_client=...`` to + ``KafkaConsumer``, ``KafkaProducer``, or ``KafkaAdminClient``. + + Pass ``bootstrap_servers=cluster.bootstrap_servers()`` alongside so + the client's bootstrap addresses match the cluster's brokers. + """ + cluster = self + + def factory(**kwargs): + from kafka.net.compat import KafkaNetClient + client = KafkaNetClient(**kwargs) + cluster.attach(client._manager) + return client + + return factory diff --git a/test/test_mock_broker.py b/test/test_mock_broker.py index 3e8a4469c..f8ebd5434 100644 --- a/test/test_mock_broker.py +++ b/test/test_mock_broker.py @@ -16,7 +16,7 @@ from kafka.protocol.metadata import MetadataRequest, MetadataResponse from kafka.structs import TopicPartition -from test.mock_broker import MockBroker +from test.mock_broker import MockBroker, MockCluster def _poll_for_future(client, future, timeout_ms=5000): @@ -283,6 +283,35 @@ def test_api_version_negotiation(self): finally: client.close() + def test_respond_always(self): + """respond_always serves every matching request; scripted queue wins.""" + broker = MockBroker() + Broker = MetadataResponse.MetadataResponseBroker + always = MetadataResponse( + version=8, throttle_time_ms=0, + brokers=[Broker(node_id=0, host='localhost', port=9092, rack=None)], + cluster_id='always-cluster', controller_id=0, topics=[]) + scripted = MetadataResponse( + version=8, throttle_time_ms=0, + brokers=[Broker(node_id=0, host='localhost', port=9092, rack=None)], + cluster_id='scripted-cluster', controller_id=0, topics=[]) + broker.respond_always(MetadataRequest, always) + client = self._make_client(broker) + try: + client.check_version(timeout_ms=5000) + cluster = client.cluster + broker.respond(MetadataRequest, scripted) + future = cluster.request_update() + _poll_for_future(client, future) + assert cluster.cluster_id == 'scripted-cluster' + # Persistent response is served on every subsequent request + for _ in range(2): + future = cluster.request_update() + _poll_for_future(client, future) + assert cluster.cluster_id == 'always-cluster' + finally: + client.close() + def test_admin_client_with_mock(self): """KafkaAdminClient works through MockBroker for basic operations.""" broker = MockBroker() @@ -313,3 +342,134 @@ def test_admin_client_with_mock(self): assert described[0]['name'] == 'admin-topic' finally: admin.close() + + +# --------------------------------------------------------------------------- +# MockCluster: multi-broker routing and lifecycle +# --------------------------------------------------------------------------- + + +class TestMockCluster: + + def _make_client(self, cluster): + client = KafkaNetClient( + bootstrap_servers=cluster.bootstrap_servers(), + api_version=cluster.broker_version, + request_timeout_ms=5000, + metadata_max_age_ms=300000, + ) + cluster.attach(client._manager) + return client + + def test_default_construction(self): + cluster = MockCluster(num_brokers=3) + assert len(cluster) == 3 + assert [b.node_id for b in cluster] == [0, 1, 2] + assert cluster[1].port == 9093 + assert cluster.bootstrap_servers() == 'localhost:9092,localhost:9093,localhost:9094' + # All brokers advertise the full broker list + for b in cluster: + assert len(b._metadata_response.brokers) == 3 + + def test_bootstrap_and_per_node_routing(self): + """Requests to node N land on broker N, not on a shared broker.""" + cluster = MockCluster(num_brokers=2) + client = self._make_client(cluster) + try: + client.check_version(timeout_ms=5000) + assert {b.node_id for b in client.cluster.brokers()} == {0, 1} + + for node_id in (0, 1): + before = cluster[node_id].requests_received + client.await_ready(node_id, timeout_ms=5000) + future = client.send(node_id, MetadataRequest(max_version=9)) + _poll_for_future(client, future) + assert future.succeeded() + assert cluster[node_id].requests_received > before + finally: + client.close() + + def test_set_coordinator(self): + """set_coordinator answers FindCoordinator per (key_type, key), with + COORDINATOR_NOT_AVAILABLE for unmapped keys, and re-points live.""" + import kafka.errors as Errors + from kafka.protocol.metadata import CoordinatorType, FindCoordinatorRequest + + cluster = MockCluster(num_brokers=2) + cluster.set_coordinator('my-group', 1) + cluster.set_coordinator('my-txn', 0, key_type=CoordinatorType.TRANSACTION) + client = self._make_client(cluster) + try: + client.check_version(timeout_ms=5000) + node_id = client.least_loaded_node(bootstrap_fallback=True) + client.await_ready(node_id, timeout_ms=5000) + + def find(key, key_type): + future = client.send(node_id, FindCoordinatorRequest( + key=key, key_type=key_type, coordinator_keys=[key])) + _poll_for_future(client, future) + assert future.succeeded() + response = future.value + return response.coordinators[0] if response.coordinators else response + + result = find('my-group', CoordinatorType.GROUP) + assert result.error_code == 0 + assert (result.node_id, result.port) == (1, 9093) + + # The same key maps independently per key_type. + result = find('my-txn', CoordinatorType.TRANSACTION) + assert (result.node_id, result.port) == (0, 9092) + result = find('my-txn', CoordinatorType.GROUP) + assert result.error_code == Errors.CoordinatorNotAvailableError.errno + + # Re-point after a simulated election. + cluster.set_coordinator('my-group', 0) + result = find('my-group', CoordinatorType.GROUP) + assert (result.node_id, result.port) == (0, 9092) + + # Clearing the mapping models an election gap. + cluster.set_coordinator('my-group', None) + result = find('my-group', CoordinatorType.GROUP) + assert result.error_code == Errors.CoordinatorNotAvailableError.errno + finally: + client.close() + + def test_stop_and_start(self): + """stop() aborts live connections and refuses new ones; start() recovers.""" + import kafka.errors as Errors + + cluster = MockCluster(num_brokers=2) + client = self._make_client(cluster) + try: + client.check_version(timeout_ms=5000) + client.await_ready(0, timeout_ms=5000) + client.await_ready(1, timeout_ms=5000) + + cluster[0].stop() + # In-flight request on the existing connection fails when the + # scheduled abort lands. + future = client.send(0, MetadataRequest(max_version=9)) + _poll_for_future(client, future) + assert future.failed() + assert not client.is_ready(0) + + # New connection attempts are refused while stopped. + with pytest.raises(Errors.KafkaConnectionError): + client.await_ready(0, timeout_ms=200) + + # Broker 1 is unaffected. + future = client.send(1, MetadataRequest(max_version=9)) + _poll_for_future(client, future) + assert future.succeeded() + + # After start(), the client reconnects once backoff expires. + cluster[0].start() + deadline = time.monotonic() + 5 + while time.monotonic() < deadline and not client.is_ready(0): + try: + client.await_ready(0, timeout_ms=500) + except Errors.KafkaConnectionError: + client.poll(timeout_ms=50) + assert client.is_ready(0) + finally: + client.close() From 21b09d27eec4f44d8cc3e4482d3730cd25a011af Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 12 Jun 2026 11:55:32 -0700 Subject: [PATCH 2/2] Test coordinator failover on CoordinatorNotAvailableError --- test/consumer/test_coordinator.py | 147 +++++++++++++++++++++++++++++- 1 file changed, 146 insertions(+), 1 deletion(-) diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 45f042a38..38facda21 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -19,16 +19,23 @@ import kafka.errors as Errors from kafka.future import Future from kafka.coordinator.base import UnjoinedGroupException +from kafka.net.compat import KafkaNetClient +from kafka.net.manager import KafkaConnectionManager from kafka.protocol.consumer import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, JoinGroupRequest, JoinGroupResponse, SyncGroupRequest, SyncGroupResponse, + HeartbeatRequest, HeartbeatResponse, +) +from kafka.protocol.metadata import ( + FindCoordinatorRequest, FindCoordinatorResponse, MetadataResponse, ) -from kafka.protocol.metadata import MetadataResponse from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod +from test.mock_broker import MockCluster + @pytest.fixture def coordinator(broker, client, metrics): @@ -2442,3 +2449,141 @@ def test_close_no_autocommit_still_revokes(self, mocker, coordinator): coordinator._maybe_auto_commit_offsets_sync.assert_not_called() listener.on_partitions_revoked.assert_called_once_with( {TopicPartition('t', 0)}) + + +# --------------------------------------------------------------------------- +# Heartbeat coordinator failover +# (https://github.com/dpkp/kafka-python/issues/1134) +# --------------------------------------------------------------------------- + + +def _seed_stable_group(coordinator, coordinator_id=0): + coordinator._subscription.subscribe(topics=['foobar']) + coordinator.coordinator_id = coordinator_id + coordinator._generation = Generation(1, 'member-1', b'') + coordinator.state = MemberState.STABLE + coordinator.rejoin_needed = False + + +def _find_coordinator_response(group_id, broker): + """A successful FindCoordinatorResponse naming `broker` as coordinator. + + Top-level fields cover v0-v3; the coordinators array covers v4+ + (whichever version the connection negotiates).""" + Coordinator = FindCoordinatorResponse.Coordinator + return FindCoordinatorResponse( + throttle_time_ms=0, error_code=0, error_message=None, + node_id=broker.node_id, host=broker.host, port=broker.port, + coordinators=[Coordinator( + key=group_id, node_id=broker.node_id, + host=broker.host, port=broker.port, + error_code=0, error_message=None)]) + + +def _run_heartbeat_loop_until(coordinator, condition, timeout=5.0): + """Run the real _heartbeat_loop coroutine until condition() or timeout, + then shut the loop down cleanly.""" + net = coordinator._manager._net + coordinator._maybe_start_heartbeat_loop() + coordinator._enable_heartbeat() + deadline = time.monotonic() + timeout + while time.monotonic() < deadline and not condition(): + net.poll(timeout_ms=10) + coordinator._close_heartbeat() + deadline = time.monotonic() + 2 + while time.monotonic() < deadline and not coordinator._heartbeat_loop_future.is_done: + net.poll(timeout_ms=10) + + +def test_heartbeat_coordinator_not_available_recovers(mocker, broker, client, metrics): + """A heartbeat answered with CoordinatorNotAvailableError must mark the + coordinator dead, rediscover it via FindCoordinator, and resume + heartbeating -- not retry forever against the dead coordinator.""" + coordinator = ConsumerCoordinator( + client, SubscriptionState(), metrics=metrics, + api_version=broker.broker_version, + heartbeat_interval_ms=10, retry_backoff_ms=10) + try: + client._manager.bootstrap(timeout_ms=5000) + _seed_stable_group(coordinator) + dead_spy = mocker.spy(coordinator, 'coordinator_dead') + + # First heartbeat: the broker reports the coordinator gone. + broker.respond(HeartbeatRequest, HeartbeatResponse( + throttle_time_ms=0, + error_code=Errors.CoordinatorNotAvailableError.errno)) + # Rediscovery: this broker is (still) the coordinator. + broker.respond_always( + FindCoordinatorRequest, + _find_coordinator_response(coordinator.group_id, broker)) + # All heartbeats after the scripted failure succeed. + recovered = [] + def ok_heartbeat(api_key, api_version, correlation_id, request_bytes): + recovered.append(correlation_id) + return HeartbeatResponse(throttle_time_ms=0, error_code=0) + broker.respond_always(HeartbeatRequest, ok_heartbeat) + + _run_heartbeat_loop_until(coordinator, lambda: recovered) + + assert recovered, 'heartbeat never resumed after coordinator rediscovery' + assert isinstance(dead_spy.call_args_list[0][0][0], + Errors.CoordinatorNotAvailableError) + # Rediscovered via FindCoordinator (synthesized coordinator node id). + assert coordinator.coordinator_id == 'coordinator-0' + # The error does not invalidate the generation or group membership. + assert coordinator.state is MemberState.STABLE + assert not coordinator._generation.is_lost() + finally: + coordinator._close_heartbeat() + coordinator.reset_generation() # skip LeaveGroup wire traffic in close() + coordinator.close(timeout_ms=0) + + +def test_heartbeat_coordinator_broker_failover(mocker, net, metrics): + """When the coordinator broker dies (connections dropped and refused), + the consumer must fail the heartbeat at the transport level, rediscover + the coordinator on a surviving broker, and resume heartbeating against + the newly elected coordinator.""" + mock_cluster = MockCluster(num_brokers=2) + manager = KafkaConnectionManager( + net, + bootstrap_servers=mock_cluster.bootstrap_servers(), + api_version=mock_cluster.broker_version, + request_timeout_ms=5000) + mock_cluster.attach(manager) + client = KafkaNetClient(net=net, manager=manager) + coordinator = ConsumerCoordinator( + client, SubscriptionState(), metrics=metrics, + api_version=mock_cluster.broker_version, + heartbeat_interval_ms=10, retry_backoff_ms=10) + try: + manager.bootstrap(timeout_ms=5000) + _seed_stable_group(coordinator, coordinator_id=0) + dead_spy = mocker.spy(coordinator, 'coordinator_dead') + + # Broker 0 (the coordinator) dies before the next heartbeat, and the + # cluster elects broker 1: surviving brokers answer FindCoordinator + # with the new coordinator. + mock_cluster[0].stop() + mock_cluster.set_coordinator(coordinator.group_id, 1) + recovered = [] + def ok_heartbeat(api_key, api_version, correlation_id, request_bytes): + recovered.append(correlation_id) + return HeartbeatResponse(throttle_time_ms=0, error_code=0) + mock_cluster[1].respond_always(HeartbeatRequest, ok_heartbeat) + + _run_heartbeat_loop_until(coordinator, lambda: recovered) + + assert recovered, 'heartbeat never resumed on the new coordinator' + # The failed send to the dead broker marked the coordinator dead... + assert dead_spy.call_count >= 1 + # ...and rediscovery pointed at broker 1. + assert coordinator.coordinator_id == 'coordinator-1' + assert coordinator.state is MemberState.STABLE + assert not coordinator._generation.is_lost() + finally: + coordinator._close_heartbeat() + coordinator.reset_generation() + coordinator.close(timeout_ms=0) + client.close() + manager.close()