Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions test/mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,20 @@ async def handle_request(self, api_key, api_version, correlation_id, request_byt
return queued_response
response = await self._resolve_response(
queued_response, api_key, api_version, correlation_id, request_bytes)
# A resolved response of None means "request handled, send
# nothing back" -- the real broker behavior for acks=0
# ProduceRequests, which the client does not expect a response to.
if response is None:
return None
return self._encode_response(response, api_version, correlation_id)

# Then persistent responses set via respond_always
if api_key in self._always_responses:
response = await self._resolve_response(
self._always_responses[api_key], api_key, api_version,
correlation_id, request_bytes)
if response is None:
return None
return self._encode_response(response, api_version, correlation_id)

# Fall back to auto-responses
Expand Down
144 changes: 143 additions & 1 deletion test/producer/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import io
import math
import threading
import time
from unittest.mock import call

Expand All @@ -18,9 +19,12 @@
from kafka.producer.sender import Sender
from kafka.protocol.producer import ProduceResponse
from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager
from kafka.protocol.metadata import MetadataResponse
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition

from test.mock_broker import MockBroker

_PartitionProduceResponse = ProduceResponse.TopicProduceResponse.PartitionProduceResponse


Expand Down Expand Up @@ -149,7 +153,6 @@ def test_create_produce_requests_negotiates_wire_version(
# Advertise three broker entries (all pointing at this single MockBroker)
# so ``manager.send(..., node_id=n)`` resolves for nodes 1 and 2 as well.
# Must happen *before* bootstrap so the metadata response carries them.
from kafka.protocol.metadata import MetadataResponse
Broker = MetadataResponse.MetadataResponseBroker
broker.set_metadata(brokers=[
Broker(node_id=n, host=broker.host, port=broker.port, rack=None)
Expand Down Expand Up @@ -179,6 +182,30 @@ def test_create_produce_requests_negotiates_wire_version(
% (node, produce_version, captured.get('api_version')))


@pytest.mark.parametrize("acks, expect_response", [
(0, False), # fire-and-forget: no response expected
(1, True), # leader ack
(-1, True), # all in-sync replicas ack (acks='all')
])
def test_produce_request_carries_acks(sender, acks, expect_response):
"""_produce_request faithfully stamps the configured acks level onto the
ProduceRequest, and acks=0 is the only level that does not expect a
response (the request is fire-and-forget at the wire level)."""
batch = producer_batch()
request = sender._produce_request(0, acks, 1000, [batch])
assert isinstance(request, ProduceRequest)
assert request.acks == acks
assert request.expect_response() is expect_response


def test_create_produce_requests_uses_configured_acks(client, accumulator, mocker):
"""_create_produce_requests stamps the sender's configured acks onto
each per-node ProduceRequest."""
sender = Sender(client, client.cluster, accumulator, acks=-1)
requests = sender._create_produce_requests({0: [producer_batch()]})
assert requests[0].acks == -1


def test_complete_batch_success(sender):
batch = producer_batch()
assert not batch.produce_future.is_done
Expand Down Expand Up @@ -1270,3 +1297,118 @@ def test_sender_loop_gates_on_bumping_state(self, sender, accumulator, mocker):

sender._send_producer_data.assert_not_called()
sender._client.poll.assert_called_once()


class TestProducerAcks:
"""End-to-end producer acks coverage through MockBroker.

These tests drive a real KafkaProducer (with its Sender thread) over a
MockBroker and verify, for each acks level, that:

- the ProduceRequest reaching the broker carries the configured acks, and
- the send future resolves correctly given how the broker answers.

The acks=0 case is the subtle one: ProduceRequest.expect_response() is
False, so the client never waits for (or receives) a response. The broker
still receives the request; the producer synthesizes a local success.
"""

_TOPIC = 'acks-topic'
_API_VERSION = (2, 5)
_BASE_OFFSET = 77

def _metadata_topic(self, version=8):
Topic = MetadataResponse.MetadataResponseTopic
Partition = Topic.MetadataResponsePartition
return Topic(version=version, error_code=0, name=self._TOPIC, is_internal=False,
partitions=[
Partition(version=version, error_code=0, partition_index=0,
leader_id=0, leader_epoch=0,
replica_nodes=[0], isr_nodes=[0], offline_replicas=[]),
])

def _produce_response(self, version, base_offset=_BASE_OFFSET, error_code=0):
Topic = ProduceResponse.TopicProduceResponse
Partition = Topic.PartitionProduceResponse
return ProduceResponse(
throttle_time_ms=0,
responses=[
Topic(name=self._TOPIC, partition_responses=[
Partition(index=0, error_code=error_code, base_offset=base_offset,
log_append_time_ms=-1, log_start_offset=0,
record_errors=[], error_message=None, current_leader=None),
]),
])

def _make_producer(self, broker, acks):
return KafkaProducer(
kafka_client=broker.client_factory(),
bootstrap_servers=['%s:%d' % (broker.host, broker.port)],
api_version=self._API_VERSION,
acks=acks,
enable_idempotence=False,
retry_backoff_ms=10,
request_timeout_ms=5000,
)

@pytest.mark.parametrize("acks", [1, -1])
def test_produce_acks_with_response(self, acks):
"""acks=1 (leader) and acks=-1 (all ISR) both expect a response. The
request carries the configured acks, and the send future resolves with
the broker-assigned offset."""
broker = MockBroker(broker_version=self._API_VERSION)
broker.set_metadata(topics=[self._metadata_topic()])
captured = {}

def on_produce(api_key, api_version, correlation_id, request_bytes):
request = ProduceRequest.decode(request_bytes, version=api_version, header=True)
captured['acks'] = request.acks
return self._produce_response(api_version)

broker.respond_fn(ProduceRequest, on_produce)

producer = self._make_producer(broker, acks)
try:
future = producer.send(self._TOPIC, value=b'payload', partition=0)
metadata = future.get(timeout=5)
assert future.succeeded()
assert metadata.topic == self._TOPIC
assert metadata.partition == 0
assert metadata.offset == self._BASE_OFFSET
finally:
producer.close(timeout=2)

assert captured.get('acks') == acks

def test_produce_acks_0_fire_and_forget(self):
"""acks=0: the broker receives the ProduceRequest but sends no response
(ProduceRequest.expect_response() is False). The producer must still
resolve the send future via a locally synthesized success."""
broker = MockBroker(broker_version=self._API_VERSION)
broker.set_metadata(topics=[self._metadata_topic()])
captured = {}
received = threading.Event()

def on_produce(api_key, api_version, correlation_id, request_bytes):
request = ProduceRequest.decode(request_bytes, version=api_version, header=True)
captured['acks'] = request.acks
received.set()
# No response: real broker behavior for acks=0.
return None

broker.respond_fn(ProduceRequest, on_produce)

producer = self._make_producer(broker, acks=0)
try:
future = producer.send(self._TOPIC, value=b'payload', partition=0)
# The future resolves locally without any broker response.
metadata = future.get(timeout=5)
assert future.succeeded()
assert metadata.topic == self._TOPIC
assert metadata.partition == 0
# The request really did reach the broker (the core worry:
# "no errors, but did it actually send?").
assert received.wait(timeout=5)
assert captured.get('acks') == 0
finally:
producer.close(timeout=2)
33 changes: 33 additions & 0 deletions test/test_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def _poll_for_future(client, future, timeout_ms=5000):
return future


def _run_coroutine(coro):
"""Drive a coroutine that is expected to complete without suspending,
returning its value. Used to unit-test MockBroker.handle_request for
synchronous (non-awaiting) respond_fns."""
try:
coro.send(None)
except StopIteration as stop:
return stop.value
raise AssertionError('coroutine suspended unexpectedly')


# ---------------------------------------------------------------------------
# MockBroker unit tests (no event loop, no connections)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -78,6 +89,28 @@ def test_respond_queues_in_order(self):
assert broker._response_queue[0][1] is r1
assert broker._response_queue[1][1] is r2

def test_respond_fn_returning_none_sends_no_response(self):
"""A respond_fn that resolves to None means 'request handled, send
nothing back' -- the real broker behavior for acks=0 ProduceRequests.
handle_request still consumes the queue entry and returns None so
MockTransport delivers no bytes."""
broker = MockBroker()
seen = []

def fn(api_key, api_version, correlation_id, request_bytes):
seen.append(correlation_id)
return None

broker.respond_fn(MetadataRequest, fn)
result = _run_coroutine(broker.handle_request(
MetadataRequest.API_KEY, 0, correlation_id=99, request_bytes=b''))

assert result is None
assert seen == [99]
# The queue entry was consumed even though no response was produced.
assert len(broker._response_queue) == 0
assert broker.requests_received == 1


# ---------------------------------------------------------------------------
# Integration tests: MockBroker + KafkaNetClient
Expand Down
Loading