Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions test/producer/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,38 @@
from kafka.partitioner import Partitioner, StickyPartitioner
from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch

from test.mock_broker import MockBroker


def _mock_producer(**configs):
    """Build a KafkaProducer backed by a new MockBroker (no real network).

    Unless the caller overrides them, ``api_version`` is ``(4, 3)`` and
    ``enable_idempotence`` is ``False`` (a non-idempotent producer, so no
    InitProducerId traffic). Use the result as a context manager so close()
    joins the Sender thread.
    """
    mock_broker = MockBroker(broker_version=(4, 3))
    # Only fill in values the caller did not supply explicitly.
    for option, fallback in (('api_version', (4, 3)),
                             ('enable_idempotence', False)):
        configs.setdefault(option, fallback)
    client = mock_broker.client_factory()
    bootstrap = ['%s:%d' % (mock_broker.host, mock_broker.port)]
    return KafkaProducer(
        kafka_client=client,
        bootstrap_servers=bootstrap,
        **configs)


def test_kafka_producer_thread_close():
    """Closing a producer explicitly returns the thread count to baseline."""
    # Explicit close() (rather than the context manager) is the behavior under
    # test here -- it must join the Sender thread and return the thread count
    # to baseline.
    threads = threading.active_count()
    # Exactly one producer is created: a second, un-closed producer would add
    # another Sender thread and break the "+ 1" assertion below.
    producer = _mock_producer()
    assert threading.active_count() == threads + 1
    producer.close()
    assert threading.active_count() == threads


def test_kafka_producer_context_manager_closes_on_exit():
threads = threading.active_count()
with KafkaProducer(api_version=(2, 1), enable_idempotence=False) as producer:
with _mock_producer() as producer:
assert threading.active_count() == threads + 1
assert producer._closed is False
assert producer._closed is True
Expand Down Expand Up @@ -72,8 +92,15 @@ def test_partition_explicit_partition_rejects_unknown_partition():

def _producer_for_send_test(partitioner):
"""Build a real KafkaProducer but replace the accumulator + sender
with mocks so ``send()`` doesn't try to actually push data."""
producer = KafkaProducer(api_version=(2, 1), partitioner=partitioner, enable_idempotence=False)
with mocks so ``send()`` doesn't try to actually push data.

__init__ already starts a real Sender thread; we stop and join it before
swapping in the mock so it isn't orphaned (close() would otherwise act on
the mock and leak the real daemon thread). MockBroker keeps it off the
real network."""
producer = _mock_producer(partitioner=partitioner)
producer._sender.initiate_close()
producer._sender.join(2)
producer._accumulator = MagicMock()
producer._sender = MagicMock()
producer._metadata = MagicMock()
Expand All @@ -99,11 +126,10 @@ def test_send_null_key_triggers_on_new_batch_via_abort_retry():
matching KafkaProducer.doSend's abort-for-new-batch retry path."""
partitioner = MagicMock(spec=StickyPartitioner)
partitioner.partition.side_effect = [3, 7] # initial pick, post-rotate
producer = _producer_for_send_test(partitioner)
abort = (None, False, False, True)
producer._accumulator.append.side_effect = [abort, _success_result()]

try:
with _producer_for_send_test(partitioner) as producer:
producer._accumulator.append.side_effect = [abort, _success_result()]
producer.send('t', value=b'msg')
# Initial pick + post-rotate re-pick.
assert partitioner.partition.call_count == 2
Expand All @@ -116,53 +142,45 @@ def test_send_null_key_triggers_on_new_batch_via_abort_retry():
second_call = producer._accumulator.append.call_args_list[1]
tp_arg = second_call.args[0]
assert tp_arg.partition == 7
finally:
producer.close(timeout=1)


def test_send_keyed_skips_on_new_batch():
    """Keyed records bypass the sticky abort-retry path - on_new_batch
    must not fire."""
    partitioner = MagicMock(spec=StickyPartitioner)
    partitioner.partition.return_value = 0

    # A single producer, closed by the context manager on exit; no separate
    # try/finally close is needed.
    with _producer_for_send_test(partitioner) as producer:
        producer._accumulator.append.return_value = _success_result()
        producer.send('t', key=b'k', value=b'v')
        partitioner.on_new_batch.assert_not_called()
        # Keyed records pass abort_on_new_batch=False directly - one append.
        assert producer._accumulator.append.call_count == 1
        kwargs = producer._accumulator.append.call_args.kwargs
        assert kwargs.get('abort_on_new_batch') is False


def test_send_with_explicit_partition_skips_on_new_batch():
    """Explicit partition overrides the partitioner entirely - no
    rotation hook should fire."""
    partitioner = MagicMock(spec=StickyPartitioner)

    # A single producer, closed by the context manager on exit; no separate
    # try/finally close is needed.
    with _producer_for_send_test(partitioner) as producer:
        producer._accumulator.append.return_value = _success_result()
        producer.send('t', value=b'v', partition=1)
        partitioner.partition.assert_not_called()
        partitioner.on_new_batch.assert_not_called()
        # Explicit partition also goes straight to abort_on_new_batch=False.
        kwargs = producer._accumulator.append.call_args.kwargs
        assert kwargs.get('abort_on_new_batch') is False


def test_idempotent_producer_reset_producer_id(cluster):
transaction_manager = TransactionManager(
transactional_id=None,
transaction_timeout_ms=1000,
retry_backoff_ms=100,
api_version=(0, 11),
api_version=(4, 3),
metadata=cluster,
)

Expand Down
121 changes: 70 additions & 51 deletions test/producer/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from kafka.producer.producer_batch import ProducerBatch
from kafka.producer.record_accumulator import RecordAccumulator
from kafka.producer.sender import Sender
from kafka.protocol.producer import ProduceResponse
from kafka.protocol.producer import (
InitProducerIdRequest, InitProducerIdResponse, ProduceResponse,
)
from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager
from kafka.protocol.metadata import MetadataResponse
from kafka.record.memory_records import MemoryRecordsBuilder
Expand Down Expand Up @@ -89,12 +91,18 @@ def producer_batch(topic='foo', partition=0, magic=2):


@pytest.fixture
def transaction_manager(request, cluster):
    """Defaults to a current broker version. Override for version-specific
    behavior via indirect parametrize, e.g.:

        @pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"])
    """
    # ``request.param`` only exists when the fixture is parametrized
    # indirectly; otherwise fall back to the (4, 3) default.
    api_version = getattr(request, 'param', (4, 3))
    return TransactionManager(
        transactional_id=None,
        transaction_timeout_ms=60000,
        retry_backoff_ms=100,
        api_version=api_version,
        metadata=cluster)


Expand Down Expand Up @@ -335,7 +343,12 @@ def test_fail_batch(sender, accumulator, transaction_manager, mocker):
batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY)


def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, transaction_manager, mocker):
@pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"])
def test_out_of_order_sequence_number_reset_producer_id_pre_kip360(sender, accumulator, transaction_manager, mocker):
# Pre-KIP-360 (broker < 2.5): an idempotent producer recovers from a
# non-retriable sequence error by resetting the producer id. On 2.5+ this
# path bumps the producer epoch instead -- see
# TestKip360SenderIntegration.test_out_of_order_sequence_triggers_epoch_bump.
sender._transaction_manager = transaction_manager
assert transaction_manager.transactional_id is None # this test is for idempotent producer only
mocker.patch.object(TransactionManager, 'reset_producer_id')
Expand All @@ -361,7 +374,7 @@ def test_transaction_aborted_error_on_user_abort_with_undrained_batches(client,
transactional_id='txn-id',
transaction_timeout_ms=60000,
retry_backoff_ms=100,
api_version=(2, 1),
api_version=(4, 3),
metadata=cluster)
tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0))
# Move to READY without going through InitProducerIdHandler
Expand Down Expand Up @@ -953,35 +966,47 @@ def test_end_to_end_split_and_complete(self, accumulator):
assert future.value.partition == 0


def _mock_producer(idempotent, **configs):
    """A KafkaProducer wired to a fresh MockBroker (no real network).

    Returned as a live producer; use it as a context manager so close()
    joins the Sender thread instead of leaking it (close(timeout=0) does
    not join, and a leaked Sender daemon can log into pytest's torn-down
    capture streams under load).

    For idempotent producers the Sender eagerly sends InitProducerId; we
    script a success so it resolves in-memory rather than spinning against
    the mock or falling back to a real bootstrap.

    ``idempotent`` is passed through as ``enable_idempotence``. Any other
    keyword arguments are forwarded to KafkaProducer; ``api_version``
    defaults to ``(4, 3)`` when not supplied.
    """
    # Keep the mock broker's version in step with the default api_version
    # below so the producer and broker agree on the protocol level.
    broker = MockBroker(broker_version=(4, 3))
    if idempotent:
        broker.respond_always(InitProducerIdRequest, InitProducerIdResponse(
            throttle_time_ms=0, error_code=0, producer_id=1, producer_epoch=0))
    configs.setdefault('api_version', (4, 3))
    return KafkaProducer(
        kafka_client=broker.client_factory(),
        bootstrap_servers=['%s:%d' % (broker.host, broker.port)],
        enable_idempotence=idempotent,
        **configs)


class TestIdempotentProducerMaxInFlight:
def test_idempotent_config_allows_max_in_flight_up_to_5(self):
    """Idempotent producer allows max_in_flight 1-5."""
    for max_in_flight in (1, 2, 3, 4, 5):
        # One mock-backed producer per value; the context manager closes it
        # before the next iteration.
        with _mock_producer(idempotent=True,
                            max_in_flight_requests_per_connection=max_in_flight) as p:
            assert p.config['max_in_flight_requests_per_connection'] == max_in_flight

def test_idempotent_config_rejects_max_in_flight_above_5(self):
    """Idempotent producer rejects max_in_flight > 5."""
    # The helper call itself is expected to raise, so no producer is
    # returned that would need closing.
    with pytest.raises(Errors.KafkaConfigurationError, match="max_in_flight_requests_per_connection=6"):
        _mock_producer(idempotent=True, max_in_flight_requests_per_connection=6)

def test_idempotent_default_max_in_flight(self):
    """Idempotent producer defaults to max_in_flight=5 (no longer overridden to 1)."""
    # No max_in_flight override is passed, so the asserted value is the
    # producer's own default.
    with _mock_producer(idempotent=True) as p:
        assert p.config['max_in_flight_requests_per_connection'] == 5

def test_idempotent_producer_forces_guarantee_message_order(self):
"""guarantee_message_order is forced True when idempotence is enabled,
Expand All @@ -992,34 +1017,22 @@ def test_idempotent_producer_forces_guarantee_message_order(self):
Java's producer enforces this for the same reason.
"""
for max_in_flight in (1, 2, 3, 4, 5):
p = KafkaProducer(
enable_idempotence=True,
max_in_flight_requests_per_connection=max_in_flight,
api_version=(0, 11),
)
assert p._sender.config['guarantee_message_order'] is True, (
'idempotence should force guarantee_message_order=True (max_in_flight=%d)'
% max_in_flight)
p.close(timeout=0)
with _mock_producer(idempotent=True,
max_in_flight_requests_per_connection=max_in_flight) as p:
assert p._sender.config['guarantee_message_order'] is True, (
'idempotence should force guarantee_message_order=True (max_in_flight=%d)'
% max_in_flight)

def test_non_idempotent_guarantee_message_order_only_when_max_in_flight_1(self):
    """For non-idempotent producers, guarantee_message_order is only True
    when max_in_flight == 1 (the original Java behavior)."""
    # max_in_flight == 1: ordering is guaranteed.
    with _mock_producer(idempotent=False,
                        max_in_flight_requests_per_connection=1) as p1:
        assert p1._sender.config['guarantee_message_order'] is True

    # max_in_flight > 1 without idempotence: ordering is not guaranteed.
    with _mock_producer(idempotent=False,
                        max_in_flight_requests_per_connection=5) as p5:
        assert p5._sender.config['guarantee_message_order'] is False

def _setup_drain(self, client, transaction_manager, tp):
"""Helper to set up cluster and transaction_manager for drain tests."""
Expand Down Expand Up @@ -1214,8 +1227,11 @@ def test_retention_based_unknown_producer_id_retries(self, sender, accumulator,
# last_acked_offset is also cleared by reset_sequence_for_partition
assert transaction_manager.last_acked_offset(tp) == -1

def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, transaction_manager, mocker):
"""UnknownProducerIdError with log_start_offset <= last_acked_offset is fatal."""
@pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"])
def test_real_data_loss_unknown_producer_id_fails_pre_kip360(self, sender, accumulator, transaction_manager, mocker):
"""Pre-KIP-360 (broker < 2.5): UnknownProducerIdError with
log_start_offset <= last_acked_offset is fatal. On 2.5+ this recovers
via an epoch bump instead."""
sender._transaction_manager = transaction_manager
mocker.patch.object(accumulator, 'reenqueue')

Expand All @@ -1240,8 +1256,11 @@ def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, tra
assert future.failed()
assert isinstance(future.exception, Errors.UnknownProducerIdError)

def test_unknown_producer_id_without_log_start_offset_fails(self, sender, accumulator, transaction_manager, mocker):
"""UnknownProducerIdError without log_start_offset info (old broker) falls through to failure."""
@pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"])
def test_unknown_producer_id_without_log_start_offset_fails_pre_kip360(self, sender, accumulator, transaction_manager, mocker):
"""Pre-KIP-360 (broker < 2.5): UnknownProducerIdError without
log_start_offset info (old broker) falls through to failure. On 2.5+
this recovers via an epoch bump instead."""
sender._transaction_manager = transaction_manager
mocker.patch.object(accumulator, 'reenqueue')

Expand Down Expand Up @@ -1436,7 +1455,7 @@ class TestProducerAcks:
"""

_TOPIC = 'acks-topic'
_API_VERSION = (2, 5)
_API_VERSION = (4, 3)
_BASE_OFFSET = 77

def _metadata_topic(self, version=8):
Expand Down
Loading