Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 126 additions & 4 deletions test/producer/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,49 @@ def test_transaction_aborted_error_on_user_abort_with_undrained_batches(client,
assert isinstance(future.exception, Errors.TransactionAbortedError)


def test_handle_produce_response():
pass
def test_handle_produce_response(sender, mocker):
"""Each partition response is routed to its batch by (topic, partition)."""
mocker.patch.object(sender, '_complete_batch')
batches = [producer_batch('foo', 0), producer_batch('foo', 1), producer_batch('bar', 0)]

_Topic = ProduceResponse.TopicProduceResponse
response = ProduceResponse(
throttle_time_ms=0,
responses=[
_Topic(name='foo', partition_responses=[
_partition_response(index=0, base_offset=10),
_partition_response(index=1, error_cls=Errors.NotLeaderForPartitionError),
]),
_Topic(name='bar', partition_responses=[
_partition_response(index=0, base_offset=66),
]),
])

sender._handle_produce_response(0, time.monotonic(), batches, response)

assert sender._complete_batch.call_count == 3
routed = {}
for c in sender._complete_batch.call_args_list:
batch, partition_response = c.args
routed[batch.topic_partition] = partition_response
assert routed[TopicPartition('foo', 0)].error_code == 0
assert routed[TopicPartition('foo', 0)].base_offset == 10
assert routed[TopicPartition('foo', 1)].error_code == Errors.NotLeaderForPartitionError.errno
assert routed[TopicPartition('bar', 0)].base_offset == 66


def test_handle_produce_response_acks_0(sender, mocker):
"""acks=0 requests get no response data; every batch in the request is
completed with a synthesized success."""
mocker.patch.object(sender, '_complete_batch')
batches = [producer_batch('foo', 0), producer_batch('foo', 1)]

sender._handle_produce_response(0, time.monotonic(), batches, None)

assert sender._complete_batch.call_count == 2
for c, batch in zip(sender._complete_batch.call_args_list, batches):
assert c.args[0] is batch
assert c.args[1].error_code == 0


def test_failed_produce(sender, mocker):
Expand All @@ -378,8 +419,89 @@ def test_failed_produce(sender, mocker):
])


def test_run_once():
pass
def test_run_once(sender, mocker):
"""The plain (non-transactional) iteration: drain pending topic adds into
cluster metadata, send producer data, and poll the client with the
timeout _send_producer_data returned."""
mocker.patch.object(sender, '_client')
mocker.patch.object(sender, '_send_producer_data', return_value=42)
spy_add_topic = mocker.spy(sender._metadata, 'add_topic')
sender.add_topic('foo-topic')

sender.run_once()

spy_add_topic.assert_called_once_with('foo-topic')
assert not sender._topics_to_add
sender._send_producer_data.assert_called_once()
sender._client.poll.assert_called_once_with(timeout_ms=42)


def test_run_once_gates_on_transactional_request(sender, transaction_manager, mocker):
"""An idempotent producer without a producer_id enqueues InitProducerId
and waits on the transactional request instead of sending produce data."""
sender._transaction_manager = transaction_manager
mocker.patch.object(sender, '_client')
mocker.patch.object(sender, '_send_producer_data')
mocker.patch.object(sender, '_maybe_send_pending_request', return_value=True)
spy_init = mocker.spy(transaction_manager, 'init_producer_id')
assert not transaction_manager.has_producer_id()

sender.run_once()

spy_init.assert_called_once()
sender._maybe_send_pending_request.assert_called_once()
sender._send_producer_data.assert_not_called()
sender._client.poll.assert_called_once_with(
timeout_ms=sender.config['retry_backoff_ms'])


def test_run_once_fatal_error_aborts_batches(sender, transaction_manager, mocker):
"""A fatal transaction state aborts incomplete batches and gates produce."""
sender._transaction_manager = transaction_manager
transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0))
error = Errors.ProducerFencedError()
transaction_manager.transition_to_fatal_error(error)
mocker.patch.object(sender, '_client')
mocker.patch.object(sender, '_send_producer_data')
mocker.patch.object(sender, '_maybe_abort_batches')

sender.run_once()

sender._maybe_abort_batches.assert_called_once_with(error)
sender._send_producer_data.assert_not_called()
sender._client.poll.assert_called_once_with(
timeout_ms=sender.config['retry_backoff_ms'])


def test_run_once_abortable_error_aborts_undrained_batches(client, mocker):
"""An abortable error fails the undrained batches but does not gate
produce -- the loop keeps running so the abort/EndTxn path can finish."""
cluster = client.cluster
tm = TransactionManager(
transactional_id='txn-id',
transaction_timeout_ms=60000,
retry_backoff_ms=100,
api_version=(2, 1),
metadata=cluster)
tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0))
from kafka.producer.transaction_manager import TransactionState
tm._transition_to(TransactionState.INITIALIZING)
tm._transition_to(TransactionState.READY)
tm.begin_transaction()
error = Errors.TopicAuthorizationFailedError('foo')
tm.transition_to_abortable_error(error)

accumulator = RecordAccumulator(transaction_manager=tm)
sender = Sender(client, cluster, accumulator, transaction_manager=tm)
mocker.patch.object(sender, '_client')
mocker.patch.object(sender, '_send_producer_data', return_value=0)
spy_abort = mocker.spy(accumulator, 'abort_undrained_batches')

sender.run_once()

spy_abort.assert_called_once_with(error)
sender._send_producer_data.assert_called_once()
sender._client.poll.assert_called_once_with(timeout_ms=0)


def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker):
Expand Down
Loading