diff --git a/test/mock_broker.py b/test/mock_broker.py index 97bc5590e..36ed7360a 100644 --- a/test/mock_broker.py +++ b/test/mock_broker.py @@ -413,6 +413,11 @@ async def handle_request(self, api_key, api_version, correlation_id, request_byt return queued_response response = await self._resolve_response( queued_response, api_key, api_version, correlation_id, request_bytes) + # A resolved response of None means "request handled, send + # nothing back" -- the real broker behavior for acks=0 + # ProduceRequests, which the client does not expect a response to. + if response is None: + return None return self._encode_response(response, api_version, correlation_id) # Then persistent responses set via respond_always @@ -420,6 +425,8 @@ async def handle_request(self, api_key, api_version, correlation_id, request_byt response = await self._resolve_response( self._always_responses[api_key], api_key, api_version, correlation_id, request_bytes) + if response is None: + return None return self._encode_response(response, api_version, correlation_id) # Fall back to auto-responses diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 148d361a9..dfe45459e 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -3,6 +3,7 @@ import collections import io import math +import threading import time from unittest.mock import call @@ -18,9 +19,12 @@ from kafka.producer.sender import Sender from kafka.protocol.producer import ProduceResponse from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager +from kafka.protocol.metadata import MetadataResponse from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition +from test.mock_broker import MockBroker + _PartitionProduceResponse = ProduceResponse.TopicProduceResponse.PartitionProduceResponse @@ -149,7 +153,6 @@ def test_create_produce_requests_negotiates_wire_version( # Advertise three broker entries (all pointing at this single MockBroker) # so ``manager.send(..., node_id=n)`` resolves for nodes 1 and 2 as well. # Must happen *before* bootstrap so the metadata response carries them. - from kafka.protocol.metadata import MetadataResponse Broker = MetadataResponse.MetadataResponseBroker broker.set_metadata(brokers=[ Broker(node_id=n, host=broker.host, port=broker.port, rack=None) @@ -179,6 +182,30 @@ def test_create_produce_requests_negotiates_wire_version( % (node, produce_version, captured.get('api_version'))) +@pytest.mark.parametrize("acks, expect_response", [ + (0, False), # fire-and-forget: no response expected + (1, True), # leader ack + (-1, True), # all in-sync replicas ack (acks='all') +]) +def test_produce_request_carries_acks(sender, acks, expect_response): + """_produce_request faithfully stamps the configured acks level onto the + ProduceRequest, and acks=0 is the only level that does not expect a + response (the request is fire-and-forget at the wire level).""" + batch = producer_batch() + request = sender._produce_request(0, acks, 1000, [batch]) + assert isinstance(request, ProduceRequest) + assert request.acks == acks + assert request.expect_response() is expect_response + + +def test_create_produce_requests_uses_configured_acks(client, accumulator, mocker): + """_create_produce_requests stamps the sender's configured acks onto + each per-node ProduceRequest.""" + sender = Sender(client, client.cluster, accumulator, acks=-1) + requests = sender._create_produce_requests({0: [producer_batch()]}) + assert requests[0].acks == -1 + + def test_complete_batch_success(sender): batch = producer_batch() assert not batch.produce_future.is_done @@ -1270,3 +1297,118 @@ def test_sender_loop_gates_on_bumping_state(self, sender, accumulator, mocker): sender._send_producer_data.assert_not_called() sender._client.poll.assert_called_once() + + +class TestProducerAcks: + """End-to-end producer acks coverage through MockBroker. + + These tests drive a real KafkaProducer (with its Sender thread) over a + MockBroker and verify, for each acks level, that: + + - the ProduceRequest reaching the broker carries the configured acks, and + - the send future resolves correctly given how the broker answers. + + The acks=0 case is the subtle one: ProduceRequest.expect_response() is + False, so the client never waits for (or receives) a response. The broker + still receives the request; the producer synthesizes a local success. + """ + + _TOPIC = 'acks-topic' + _API_VERSION = (2, 5) + _BASE_OFFSET = 77 + + def _metadata_topic(self, version=8): + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + return Topic(version=version, error_code=0, name=self._TOPIC, is_internal=False, + partitions=[ + Partition(version=version, error_code=0, partition_index=0, + leader_id=0, leader_epoch=0, + replica_nodes=[0], isr_nodes=[0], offline_replicas=[]), + ]) + + def _produce_response(self, version, base_offset=_BASE_OFFSET, error_code=0): + Topic = ProduceResponse.TopicProduceResponse + Partition = Topic.PartitionProduceResponse + return ProduceResponse( + throttle_time_ms=0, + responses=[ + Topic(name=self._TOPIC, partition_responses=[ + Partition(index=0, error_code=error_code, base_offset=base_offset, + log_append_time_ms=-1, log_start_offset=0, + record_errors=[], error_message=None, current_leader=None), + ]), + ]) + + def _make_producer(self, broker, acks): + return KafkaProducer( + kafka_client=broker.client_factory(), + bootstrap_servers=['%s:%d' % (broker.host, broker.port)], + api_version=self._API_VERSION, + acks=acks, + enable_idempotence=False, + retry_backoff_ms=10, + request_timeout_ms=5000, + ) + + @pytest.mark.parametrize("acks", [1, -1]) + def test_produce_acks_with_response(self, acks): + """acks=1 (leader) and acks=-1 (all ISR) both expect a response. The + request carries the configured acks, and the send future resolves with + the broker-assigned offset.""" + broker = MockBroker(broker_version=self._API_VERSION) + broker.set_metadata(topics=[self._metadata_topic()]) + captured = {} + + def on_produce(api_key, api_version, correlation_id, request_bytes): + request = ProduceRequest.decode(request_bytes, version=api_version, header=True) + captured['acks'] = request.acks + return self._produce_response(api_version) + + broker.respond_fn(ProduceRequest, on_produce) + + producer = self._make_producer(broker, acks) + try: + future = producer.send(self._TOPIC, value=b'payload', partition=0) + metadata = future.get(timeout=5) + assert future.succeeded() + assert metadata.topic == self._TOPIC + assert metadata.partition == 0 + assert metadata.offset == self._BASE_OFFSET + finally: + producer.close(timeout=2) + + assert captured.get('acks') == acks + + def test_produce_acks_0_fire_and_forget(self): + """acks=0: the broker receives the ProduceRequest but sends no response + (ProduceRequest.expect_response() is False). The producer must still + resolve the send future via a locally synthesized success.""" + broker = MockBroker(broker_version=self._API_VERSION) + broker.set_metadata(topics=[self._metadata_topic()]) + captured = {} + received = threading.Event() + + def on_produce(api_key, api_version, correlation_id, request_bytes): + request = ProduceRequest.decode(request_bytes, version=api_version, header=True) + captured['acks'] = request.acks + received.set() + # No response: real broker behavior for acks=0. + return None + + broker.respond_fn(ProduceRequest, on_produce) + + producer = self._make_producer(broker, acks=0) + try: + future = producer.send(self._TOPIC, value=b'payload', partition=0) + # The future resolves locally without any broker response. + metadata = future.get(timeout=5) + assert future.succeeded() + assert metadata.topic == self._TOPIC + assert metadata.partition == 0 + # The request really did reach the broker (the core worry: + # "no errors, but did it actually send?"). + assert received.wait(timeout=5) + assert captured.get('acks') == 0 + finally: + producer.close(timeout=2) diff --git a/test/test_mock_broker.py b/test/test_mock_broker.py index f8ebd5434..3be83667b 100644 --- a/test/test_mock_broker.py +++ b/test/test_mock_broker.py @@ -34,6 +34,17 @@ def _poll_for_future(client, future, timeout_ms=5000): return future +def _run_coroutine(coro): + """Drive a coroutine that is expected to complete without suspending, + returning its value. Used to unit-test MockBroker.handle_request for + synchronous (non-awaiting) respond_fns.""" + try: + coro.send(None) + except StopIteration as stop: + return stop.value + raise AssertionError('coroutine suspended unexpectedly') + + # --------------------------------------------------------------------------- # MockBroker unit tests (no event loop, no connections) # --------------------------------------------------------------------------- @@ -78,6 +89,28 @@ def test_respond_queues_in_order(self): assert broker._response_queue[0][1] is r1 assert broker._response_queue[1][1] is r2 + def test_respond_fn_returning_none_sends_no_response(self): + """A respond_fn that resolves to None means 'request handled, send + nothing back' -- the real broker behavior for acks=0 ProduceRequests. + handle_request still consumes the queue entry and returns None so + MockTransport delivers no bytes.""" + broker = MockBroker() + seen = [] + + def fn(api_key, api_version, correlation_id, request_bytes): + seen.append(correlation_id) + return None + + broker.respond_fn(MetadataRequest, fn) + result = _run_coroutine(broker.handle_request( + MetadataRequest.API_KEY, 0, correlation_id=99, request_bytes=b'')) + + assert result is None + assert seen == [99] + # The queue entry was consumed even though no response was produced. + assert len(broker._response_queue) == 0 + assert broker.requests_received == 1 + # --------------------------------------------------------------------------- # Integration tests: MockBroker + KafkaNetClient