From 1a68ed8bfa90ed055ee6af64b41d2b40d85efc09 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 13 Jun 2026 09:41:05 -0700 Subject: [PATCH] Use 4.3 api_version for producer tests; fixup inadvertent sender thread leak in test --- test/producer/test_producer.py | 58 ++++++++++------ test/producer/test_sender.py | 121 +++++++++++++++++++-------------- 2 files changed, 108 insertions(+), 71 deletions(-) diff --git a/test/producer/test_producer.py b/test/producer/test_producer.py index 8337b4947..d8d92b996 100644 --- a/test/producer/test_producer.py +++ b/test/producer/test_producer.py @@ -9,10 +9,30 @@ from kafka.partitioner import Partitioner, StickyPartitioner from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch +from test.mock_broker import MockBroker + + +def _mock_producer(**configs): + """A KafkaProducer wired to a fresh MockBroker (no real network). + + Defaults to a non-idempotent producer (no InitProducerId traffic). Use + as a context manager so close() joins the Sender thread. + """ + broker = MockBroker(broker_version=(4, 3)) + configs.setdefault('api_version', (4, 3)) + configs.setdefault('enable_idempotence', False) + return KafkaProducer( + kafka_client=broker.client_factory(), + bootstrap_servers=['%s:%d' % (broker.host, broker.port)], + **configs) + def test_kafka_producer_thread_close(): + # Explicit close() (rather than the context manager) is the behavior under + # test here -- it must join the Sender thread and return the thread count + # to baseline. 
threads = threading.active_count() - producer = KafkaProducer(api_version=(2, 1), enable_idempotence=False) + producer = _mock_producer() assert threading.active_count() == threads + 1 producer.close() assert threading.active_count() == threads @@ -20,7 +40,7 @@ def test_kafka_producer_thread_close(): def test_kafka_producer_context_manager_closes_on_exit(): threads = threading.active_count() - with KafkaProducer(api_version=(2, 1), enable_idempotence=False) as producer: + with _mock_producer() as producer: assert threading.active_count() == threads + 1 assert producer._closed is False assert producer._closed is True @@ -72,8 +92,15 @@ def test_partition_explicit_partition_rejects_unknown_partition(): def _producer_for_send_test(partitioner): """Build a real KafkaProducer but replace the accumulator + sender - with mocks so ``send()`` doesn't try to actually push data.""" - producer = KafkaProducer(api_version=(2, 1), partitioner=partitioner, enable_idempotence=False) + with mocks so ``send()`` doesn't try to actually push data. + + __init__ already starts a real Sender thread; we stop and join it before + swapping in the mock so it isn't orphaned (close() would otherwise act on + the mock and leak the real daemon thread). 
MockBroker keeps it off the + real network.""" + producer = _mock_producer(partitioner=partitioner) + producer._sender.initiate_close() + producer._sender.join(2) producer._accumulator = MagicMock() producer._sender = MagicMock() producer._metadata = MagicMock() @@ -99,11 +126,10 @@ def test_send_null_key_triggers_on_new_batch_via_abort_retry(): matching KafkaProducer.doSend's abort-for-new-batch retry path.""" partitioner = MagicMock(spec=StickyPartitioner) partitioner.partition.side_effect = [3, 7] # initial pick, post-rotate - producer = _producer_for_send_test(partitioner) abort = (None, False, False, True) - producer._accumulator.append.side_effect = [abort, _success_result()] - try: + with _producer_for_send_test(partitioner) as producer: + producer._accumulator.append.side_effect = [abort, _success_result()] producer.send('t', value=b'msg') # Initial pick + post-rotate re-pick. assert partitioner.partition.call_count == 2 @@ -116,8 +142,6 @@ def test_send_null_key_triggers_on_new_batch_via_abort_retry(): second_call = producer._accumulator.append.call_args_list[1] tp_arg = second_call.args[0] assert tp_arg.partition == 7 - finally: - producer.close(timeout=1) def test_send_keyed_skips_on_new_batch(): @@ -125,36 +149,30 @@ def test_send_keyed_skips_on_new_batch(): must not fire.""" partitioner = MagicMock(spec=StickyPartitioner) partitioner.partition.return_value = 0 - producer = _producer_for_send_test(partitioner) - producer._accumulator.append.return_value = _success_result() - try: + with _producer_for_send_test(partitioner) as producer: + producer._accumulator.append.return_value = _success_result() producer.send('t', key=b'k', value=b'v') partitioner.on_new_batch.assert_not_called() # Keyed records pass abort_on_new_batch=False directly - one append. 
assert producer._accumulator.append.call_count == 1 kwargs = producer._accumulator.append.call_args.kwargs assert kwargs.get('abort_on_new_batch') is False - finally: - producer.close(timeout=1) def test_send_with_explicit_partition_skips_on_new_batch(): """Explicit partition overrides the partitioner entirely - no rotation hook should fire.""" partitioner = MagicMock(spec=StickyPartitioner) - producer = _producer_for_send_test(partitioner) - producer._accumulator.append.return_value = _success_result() - try: + with _producer_for_send_test(partitioner) as producer: + producer._accumulator.append.return_value = _success_result() producer.send('t', value=b'v', partition=1) partitioner.partition.assert_not_called() partitioner.on_new_batch.assert_not_called() # Explicit partition also goes straight to abort_on_new_batch=False. kwargs = producer._accumulator.append.call_args.kwargs assert kwargs.get('abort_on_new_batch') is False - finally: - producer.close(timeout=1) def test_idempotent_producer_reset_producer_id(cluster): @@ -162,7 +180,7 @@ def test_idempotent_producer_reset_producer_id(cluster): transactional_id=None, transaction_timeout_ms=1000, retry_backoff_ms=100, - api_version=(0, 11), + api_version=(4, 3), metadata=cluster, ) diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 84b69966a..e127251f8 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -17,7 +17,9 @@ from kafka.producer.producer_batch import ProducerBatch from kafka.producer.record_accumulator import RecordAccumulator from kafka.producer.sender import Sender -from kafka.protocol.producer import ProduceResponse +from kafka.protocol.producer import ( + InitProducerIdRequest, InitProducerIdResponse, ProduceResponse, +) from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager from kafka.protocol.metadata import MetadataResponse from kafka.record.memory_records import MemoryRecordsBuilder @@ -89,12 +91,18 @@ def 
producer_batch(topic='foo', partition=0, magic=2): @pytest.fixture -def transaction_manager(cluster): +def transaction_manager(request, cluster): + """Defaults to a current broker version. Override for version-specific + behavior via indirect parametrize, e.g.: + + @pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"]) + """ + api_version = getattr(request, 'param', (4, 3)) return TransactionManager( transactional_id=None, transaction_timeout_ms=60000, retry_backoff_ms=100, - api_version=(2, 1), + api_version=api_version, metadata=cluster) @@ -335,7 +343,12 @@ def test_fail_batch(sender, accumulator, transaction_manager, mocker): batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY) -def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, transaction_manager, mocker): +@pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"]) +def test_out_of_order_sequence_number_reset_producer_id_pre_kip360(sender, accumulator, transaction_manager, mocker): + # Pre-KIP-360 (broker < 2.5): an idempotent producer recovers from a + # non-retriable sequence error by resetting the producer id. On 2.5+ this + # path bumps the producer epoch instead -- see + # TestKip360SenderIntegration.test_out_of_order_sequence_triggers_epoch_bump. 
sender._transaction_manager = transaction_manager assert transaction_manager.transactional_id is None # this test is for idempotent producer only mocker.patch.object(TransactionManager, 'reset_producer_id') @@ -361,7 +374,7 @@ def test_transaction_aborted_error_on_user_abort_with_undrained_batches(client, transactional_id='txn-id', transaction_timeout_ms=60000, retry_backoff_ms=100, - api_version=(2, 1), + api_version=(4, 3), metadata=cluster) tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0)) # Move to READY without going through InitProducerIdHandler @@ -953,35 +966,47 @@ def test_end_to_end_split_and_complete(self, accumulator): assert future.value.partition == 0 +def _mock_producer(idempotent, **configs): + """A KafkaProducer wired to a fresh MockBroker (no real network). + + Returned as a live producer; use it as a context manager so close() + joins the Sender thread instead of leaking it (close(timeout=0) does + not join, and a leaked Sender daemon can log into pytest's torn-down + capture streams under load). + + For idempotent producers the Sender eagerly sends InitProducerId; we + script a success so it resolves in-memory rather than spinning against + the mock or falling back to a real bootstrap. 
+ """ + broker = MockBroker(broker_version=(4, 3)) + if idempotent: + broker.respond_always(InitProducerIdRequest, InitProducerIdResponse( + throttle_time_ms=0, error_code=0, producer_id=1, producer_epoch=0)) + configs.setdefault('api_version', (4, 3)) + return KafkaProducer( + kafka_client=broker.client_factory(), + bootstrap_servers=['%s:%d' % (broker.host, broker.port)], + enable_idempotence=idempotent, + **configs) + + class TestIdempotentProducerMaxInFlight: def test_idempotent_config_allows_max_in_flight_up_to_5(self): """Idempotent producer allows max_in_flight 1-5.""" for max_in_flight in (1, 2, 3, 4, 5): - p = KafkaProducer( - enable_idempotence=True, - max_in_flight_requests_per_connection=max_in_flight, - api_version=(0, 11), - ) - assert p.config['max_in_flight_requests_per_connection'] == max_in_flight - p.close(timeout=0) + with _mock_producer(idempotent=True, + max_in_flight_requests_per_connection=max_in_flight) as p: + assert p.config['max_in_flight_requests_per_connection'] == max_in_flight def test_idempotent_config_rejects_max_in_flight_above_5(self): """Idempotent producer rejects max_in_flight > 5.""" with pytest.raises(Errors.KafkaConfigurationError, match="max_in_flight_requests_per_connection=6"): - KafkaProducer( - enable_idempotence=True, - max_in_flight_requests_per_connection=6, - api_version=(0, 11), - ) + _mock_producer(idempotent=True, max_in_flight_requests_per_connection=6) def test_idempotent_default_max_in_flight(self): """Idempotent producer defaults to max_in_flight=5 (no longer overridden to 1).""" - p = KafkaProducer( - enable_idempotence=True, - api_version=(0, 11), - ) - assert p.config['max_in_flight_requests_per_connection'] == 5 - p.close(timeout=0) + with _mock_producer(idempotent=True) as p: + assert p.config['max_in_flight_requests_per_connection'] == 5 def test_idempotent_producer_forces_guarantee_message_order(self): """guarantee_message_order is forced True when idempotence is enabled, @@ -992,34 +1017,22 @@ def 
test_idempotent_producer_forces_guarantee_message_order(self): Java's producer enforces this for the same reason. """ for max_in_flight in (1, 2, 3, 4, 5): - p = KafkaProducer( - enable_idempotence=True, - max_in_flight_requests_per_connection=max_in_flight, - api_version=(0, 11), - ) - assert p._sender.config['guarantee_message_order'] is True, ( - 'idempotence should force guarantee_message_order=True (max_in_flight=%d)' - % max_in_flight) - p.close(timeout=0) + with _mock_producer(idempotent=True, + max_in_flight_requests_per_connection=max_in_flight) as p: + assert p._sender.config['guarantee_message_order'] is True, ( + 'idempotence should force guarantee_message_order=True (max_in_flight=%d)' + % max_in_flight) def test_non_idempotent_guarantee_message_order_only_when_max_in_flight_1(self): """For non-idempotent producers, guarantee_message_order is only True when max_in_flight == 1 (the original Java behavior).""" - p1 = KafkaProducer( - enable_idempotence=False, - max_in_flight_requests_per_connection=1, - api_version=(0, 11), - ) - assert p1._sender.config['guarantee_message_order'] is True - p1.close(timeout=0) + with _mock_producer(idempotent=False, + max_in_flight_requests_per_connection=1) as p1: + assert p1._sender.config['guarantee_message_order'] is True - p5 = KafkaProducer( - enable_idempotence=False, - max_in_flight_requests_per_connection=5, - api_version=(0, 11), - ) - assert p5._sender.config['guarantee_message_order'] is False - p5.close(timeout=0) + with _mock_producer(idempotent=False, + max_in_flight_requests_per_connection=5) as p5: + assert p5._sender.config['guarantee_message_order'] is False def _setup_drain(self, client, transaction_manager, tp): """Helper to set up cluster and transaction_manager for drain tests.""" @@ -1214,8 +1227,11 @@ def test_retention_based_unknown_producer_id_retries(self, sender, accumulator, # last_acked_offset is also cleared by reset_sequence_for_partition assert transaction_manager.last_acked_offset(tp) 
== -1 - def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, transaction_manager, mocker): - """UnknownProducerIdError with log_start_offset <= last_acked_offset is fatal.""" + @pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"]) + def test_real_data_loss_unknown_producer_id_fails_pre_kip360(self, sender, accumulator, transaction_manager, mocker): + """Pre-KIP-360 (broker < 2.5): UnknownProducerIdError with + log_start_offset <= last_acked_offset is fatal. On 2.5+ this recovers + via an epoch bump instead.""" sender._transaction_manager = transaction_manager mocker.patch.object(accumulator, 'reenqueue') @@ -1240,8 +1256,11 @@ def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, tra assert future.failed() assert isinstance(future.exception, Errors.UnknownProducerIdError) - def test_unknown_producer_id_without_log_start_offset_fails(self, sender, accumulator, transaction_manager, mocker): - """UnknownProducerIdError without log_start_offset info (old broker) falls through to failure.""" + @pytest.mark.parametrize("transaction_manager", [(2, 1)], indirect=True, ids=["broker2.1"]) + def test_unknown_producer_id_without_log_start_offset_fails_pre_kip360(self, sender, accumulator, transaction_manager, mocker): + """Pre-KIP-360 (broker < 2.5): UnknownProducerIdError without + log_start_offset info (old broker) falls through to failure. On 2.5+ + this recovers via an epoch bump instead.""" sender._transaction_manager = transaction_manager mocker.patch.object(accumulator, 'reenqueue') @@ -1436,7 +1455,7 @@ class TestProducerAcks: """ _TOPIC = 'acks-topic' - _API_VERSION = (2, 5) + _API_VERSION = (4, 3) _BASE_OFFSET = 77 def _metadata_topic(self, version=8):