From e8ded9944e99747d3d23707c1f04704390396bb4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 12 Jun 2026 14:14:37 -0700 Subject: [PATCH] Fill-in empty sender tests --- test/producer/test_sender.py | 130 +++++++++++++++++++++++++++++++++-- 1 file changed, 126 insertions(+), 4 deletions(-) diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 148d361a9..3ccbe75bc 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -363,8 +363,49 @@ def test_transaction_aborted_error_on_user_abort_with_undrained_batches(client, assert isinstance(future.exception, Errors.TransactionAbortedError) -def test_handle_produce_response(): - pass +def test_handle_produce_response(sender, mocker): + """Each partition response is routed to its batch by (topic, partition).""" + mocker.patch.object(sender, '_complete_batch') + batches = [producer_batch('foo', 0), producer_batch('foo', 1), producer_batch('bar', 0)] + + _Topic = ProduceResponse.TopicProduceResponse + response = ProduceResponse( + throttle_time_ms=0, + responses=[ + _Topic(name='foo', partition_responses=[ + _partition_response(index=0, base_offset=10), + _partition_response(index=1, error_cls=Errors.NotLeaderForPartitionError), + ]), + _Topic(name='bar', partition_responses=[ + _partition_response(index=0, base_offset=66), + ]), + ]) + + sender._handle_produce_response(0, time.monotonic(), batches, response) + + assert sender._complete_batch.call_count == 3 + routed = {} + for c in sender._complete_batch.call_args_list: + batch, partition_response = c.args + routed[batch.topic_partition] = partition_response + assert routed[TopicPartition('foo', 0)].error_code == 0 + assert routed[TopicPartition('foo', 0)].base_offset == 10 + assert routed[TopicPartition('foo', 1)].error_code == Errors.NotLeaderForPartitionError.errno + assert routed[TopicPartition('bar', 0)].base_offset == 66 + + +def test_handle_produce_response_acks_0(sender, mocker): + """acks=0 requests get no response data; every batch in the request is + completed with a synthesized success.""" + mocker.patch.object(sender, '_complete_batch') + batches = [producer_batch('foo', 0), producer_batch('foo', 1)] + + sender._handle_produce_response(0, time.monotonic(), batches, None) + + assert sender._complete_batch.call_count == 2 + for c, batch in zip(sender._complete_batch.call_args_list, batches): + assert c.args[0] is batch + assert c.args[1].error_code == 0 def test_failed_produce(sender, mocker): @@ -378,8 +419,89 @@ def test_failed_produce(sender, mocker): ]) -def test_run_once(): - pass +def test_run_once(sender, mocker): + """The plain (non-transactional) iteration: drain pending topic adds into + cluster metadata, send producer data, and poll the client with the + timeout _send_producer_data returned.""" + mocker.patch.object(sender, '_client') + mocker.patch.object(sender, '_send_producer_data', return_value=42) + spy_add_topic = mocker.spy(sender._metadata, 'add_topic') + sender.add_topic('foo-topic') + + sender.run_once() + + spy_add_topic.assert_called_once_with('foo-topic') + assert not sender._topics_to_add + sender._send_producer_data.assert_called_once() + sender._client.poll.assert_called_once_with(timeout_ms=42) + + +def test_run_once_gates_on_transactional_request(sender, transaction_manager, mocker): + """An idempotent producer without a producer_id enqueues InitProducerId + and waits on the transactional request instead of sending produce data.""" + sender._transaction_manager = transaction_manager + mocker.patch.object(sender, '_client') + mocker.patch.object(sender, '_send_producer_data') + mocker.patch.object(sender, '_maybe_send_pending_request', return_value=True) + spy_init = mocker.spy(transaction_manager, 'init_producer_id') + assert not transaction_manager.has_producer_id() + + sender.run_once() + + spy_init.assert_called_once() + sender._maybe_send_pending_request.assert_called_once() + sender._send_producer_data.assert_not_called() + sender._client.poll.assert_called_once_with( + timeout_ms=sender.config['retry_backoff_ms']) + + +def test_run_once_fatal_error_aborts_batches(sender, transaction_manager, mocker): + """A fatal transaction state aborts incomplete batches and gates produce.""" + sender._transaction_manager = transaction_manager + transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0)) + error = Errors.ProducerFencedError() + transaction_manager.transition_to_fatal_error(error) + mocker.patch.object(sender, '_client') + mocker.patch.object(sender, '_send_producer_data') + mocker.patch.object(sender, '_maybe_abort_batches') + + sender.run_once() + + sender._maybe_abort_batches.assert_called_once_with(error) + sender._send_producer_data.assert_not_called() + sender._client.poll.assert_called_once_with( + timeout_ms=sender.config['retry_backoff_ms']) + + +def test_run_once_abortable_error_aborts_undrained_batches(client, mocker): + """An abortable error fails the undrained batches but does not gate + produce -- the loop keeps running so the abort/EndTxn path can finish.""" + cluster = client.cluster + tm = TransactionManager( + transactional_id='txn-id', + transaction_timeout_ms=60000, + retry_backoff_ms=100, + api_version=(2, 1), + metadata=cluster) + tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0)) + from kafka.producer.transaction_manager import TransactionState + tm._transition_to(TransactionState.INITIALIZING) + tm._transition_to(TransactionState.READY) + tm.begin_transaction() + error = Errors.TopicAuthorizationFailedError('foo') + tm.transition_to_abortable_error(error) + + accumulator = RecordAccumulator(transaction_manager=tm) + sender = Sender(client, cluster, accumulator, transaction_manager=tm) + mocker.patch.object(sender, '_client') + mocker.patch.object(sender, '_send_producer_data', return_value=0) + spy_abort = mocker.spy(accumulator, 'abort_undrained_batches') + + sender.run_once() + + spy_abort.assert_called_once_with(error) + sender._send_producer_data.assert_called_once() + sender._client.poll.assert_called_once_with(timeout_ms=0) def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker):