diff --git a/README.rst b/README.rst index b81a3fe43..3681d71ca 100644 --- a/README.rst +++ b/README.rst @@ -15,11 +15,13 @@ kafka-python kafka-python is a pure-python client library for Apache Kafka, the distributed stream processing engine. It has no external dependencies and no Cython/C/rust core, making installation across a wide variety of environments simple and easy -to manage. +to manage. It provides high-level class components for consumer, producer, and +admin clients, as well as CLI scripts for quick interactive tasks. -kafka-python can also be used as a simple alternative to the apache kafka admin -scripts, which require an installed/compatible jvm. A simple CLI interface for -admin commands is provided as ``kafka-python admin`` / ``python -m kafka.admin``. +``kafka-python admin`` serves as a simple alternative to the apache kafka bin/ +scripts, particularly if/when you do not have easy access to an +installed/compatible jvm. The CLI interface for admin commands is provided as +``kafka-python admin`` and ``python -m kafka.admin``. Users looking to add more raw throughput can ``pip install crc32c`` as an optional dependency, offloading one of the most CPU intensive subsystems @@ -85,30 +87,37 @@ that expose basic message attributes: topic, partition, offset, key, and value: .. code-block:: python # manually assign the partition list for the consumer - from kafka import TopicPartition + from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer(bootstrap_servers='localhost:1234') consumer.assign([TopicPartition('foobar', 2)]) msg = next(consumer) +Keys and Values returned by KafkaConsumer will be raw bytes by default. Use a +``value_deserializer`` to automatically decode into something else. Helpers +are available for simple utf-8 string decoding (``DefaultSerializer``) and +json (``JsonSerializer``). + .. code-block:: python - # Deserialize msgpack-encoded values - consumer = KafkaConsumer(value_deserializer=msgpack.loads) - consumer.subscribe(['msgpackfoo']) + # Deserialize json-encoded values + from kafka import KafkaConsumer, JsonSerializer + consumer = KafkaConsumer(value_deserializer=JsonSerializer()) + consumer.subscribe(['json-foo']) for msg in consumer: assert isinstance(msg.value, dict) .. code-block:: python - # Access record headers. The returned value is a list of tuples - # with str, bytes for key and value + # Access record headers. The returned value is a list of + # (str, bytes) tuples, representing the header key and value. for msg in consumer: print (msg.headers) .. code-block:: python # Read only committed messages from transactional topic - consumer = KafkaConsumer(isolation_level='read_committed') + from kafka import KafkaConsumer, IsolationLevel + consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED) consumer.subscribe(['txn_topic']) for msg in consumer: print(msg) @@ -132,12 +141,14 @@ for more details. from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:1234') for _ in range(100): + # Fire-and-forget: send() is async and returns before delivery producer.send('foobar', b'some_message_bytes') .. code-block:: python - # Block until a single message is sent (or timeout) + # To check the status of an async message delivery, use .get() future = producer.send('foobar', b'another_message') + # future.get() will block until it can return the result or raise on error result = future.get(timeout=60) .. code-block:: python @@ -147,6 +158,9 @@ for more details. # only useful if you configure internal batching using linger_ms producer.flush() +Message keys are used to hash messages with the same key to the same +partition. Both keys and values should be raw bytes unless a serializer is configured. + .. code-block:: python # Use a key for hashed-partitioning @@ -155,16 +169,21 @@ for more details. .. code-block:: python # Serialize json messages - import json - producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + from kafka import KafkaProducer, JsonSerializer + producer = KafkaProducer(value_serializer=JsonSerializer()) producer.send('fizzbuzz', {'foo': 'bar'}) .. code-block:: python # Serialize string keys - producer = KafkaProducer(key_serializer=str.encode) + from kafka import KafkaProducer, DefaultSerializer + producer = KafkaProducer(key_serializer=DefaultSerializer()) producer.send('flipflap', key='ping', value=b'1234') +Compression can be used to reduce message size on the wire. Gzip is supported +via python stdlib. For other compression types you must install optional +dependencies. + .. code-block:: python # Compress messages @@ -172,6 +191,8 @@ for more details. for i in range(1000): producer.send('foobar', b'msg %d' % i) +KafkaProducer also supports transactions and message headers when needed. + .. code-block:: python # Use transactions diff --git a/docs/index.rst b/docs/index.rst index 050b293ed..0401a1055 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -15,11 +15,13 @@ kafka-python kafka-python is a pure-python client library for Apache Kafka, the distributed stream processing engine. It has no external dependencies and no Cython/C/rust core, making installation across a wide variety of environments simple and easy -to manage. +to manage. It provides high-level class components for consumer, producer, and +admin clients, as well as CLI scripts for quick interactive tasks. -kafka-python can also be used as a simple alternative to the apache kafka admin -scripts, which require an installed/compatible jvm. A simple CLI interface for -admin commands is provided as ``kafka-python admin`` / ``python -m kafka.admin``. +``kafka-python admin`` serves as a simple alternative to the apache kafka bin/ +scripts, particularly if/when you do not have easy access to an +installed/compatible jvm. The CLI interface for admin commands is provided as +``kafka-python admin`` and ``python -m kafka.admin``. Users looking to add more raw throughput can ``pip install crc32c`` as an optional dependency, offloading one of the most CPU intensive subsystems @@ -85,30 +87,37 @@ that expose basic message attributes: topic, partition, offset, key, and value: .. code-block:: python # manually assign the partition list for the consumer - from kafka import TopicPartition + from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer(bootstrap_servers='localhost:1234') consumer.assign([TopicPartition('foobar', 2)]) msg = next(consumer) +Keys and Values returned by KafkaConsumer will be raw bytes by default. Use a +``value_deserializer`` to automatically decode into something else. Helpers +are available for simple utf-8 string decoding (``DefaultSerializer``) and +json (``JsonSerializer``). + .. code-block:: python - # Deserialize msgpack-encoded values - consumer = KafkaConsumer(value_deserializer=msgpack.loads) - consumer.subscribe(['msgpackfoo']) + # Deserialize json-encoded values + from kafka import KafkaConsumer, JsonSerializer + consumer = KafkaConsumer(value_deserializer=JsonSerializer()) + consumer.subscribe(['json-foo']) for msg in consumer: assert isinstance(msg.value, dict) .. code-block:: python - # Access record headers. The returned value is a list of tuples - # with str, bytes for key and value + # Access record headers. The returned value is a list of + # (str, bytes) tuples, representing the header key and value. for msg in consumer: print (msg.headers) .. code-block:: python # Read only committed messages from transactional topic - consumer = KafkaConsumer(isolation_level='read_committed') + from kafka import KafkaConsumer, IsolationLevel + consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED) consumer.subscribe(['txn_topic']) for msg in consumer: print(msg) @@ -131,12 +140,14 @@ client. See `KafkaProducer `_ for more details. from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:1234') for _ in range(100): + # Fire-and-forget: send() is async and returns before delivery producer.send('foobar', b'some_message_bytes') .. code-block:: python - # Block until a single message is sent (or timeout) + # To check the status of an async message delivery, use .get() future = producer.send('foobar', b'another_message') + # future.get() will block until it can return the result or raise on error result = future.get(timeout=60) .. code-block:: python @@ -146,6 +157,9 @@ client. See `KafkaProducer `_ for more details. # only useful if you configure internal batching using linger_ms producer.flush() +Message keys are used to hash messages with the same key to the same +partition. Both keys and values should be raw bytes unless a serializer is configured. + .. code-block:: python # Use a key for hashed-partitioning @@ -154,16 +168,21 @@ client. See `KafkaProducer `_ for more details. .. code-block:: python # Serialize json messages - import json - producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + from kafka import KafkaProducer, JsonSerializer + producer = KafkaProducer(value_serializer=JsonSerializer()) producer.send('fizzbuzz', {'foo': 'bar'}) .. code-block:: python # Serialize string keys - producer = KafkaProducer(key_serializer=str.encode) + from kafka import KafkaProducer, DefaultSerializer + producer = KafkaProducer(key_serializer=DefaultSerializer()) producer.send('flipflap', key='ping', value=b'1234') +Compression can be used to reduce message size on the wire. Gzip is supported +via python stdlib. For other compression types you must install optional +dependencies. + .. code-block:: python # Compress messages @@ -171,6 +190,8 @@ client. See `KafkaProducer `_ for more details. for i in range(1000): producer.send('foobar', b'msg %d' % i) +KafkaProducer also supports transactions and message headers when needed. + .. code-block:: python # Use transactions @@ -263,4 +284,5 @@ See https://docs.python.org/3/howto/logging.html for overview / howto. compatibility support license + Upgrading to 3.0 changelog diff --git a/docs/upgrade_to_3_0.rst b/docs/upgrade_to_3_0.rst new file mode 100644 index 000000000..4a40db220 --- /dev/null +++ b/docs/upgrade_to_3_0.rst @@ -0,0 +1,349 @@ +Upgrading from 2.3 to 3.0 +========================= + +kafka-python 3.0 is a major release with several breaking changes. This +guide walks through everything an application upgrading from 2.3 will +typically need to look at, in roughly the order of how likely it is to +matter. + +Most applications using the public ``KafkaProducer`` / ``KafkaConsumer`` / +``KafkaAdminClient`` APIs at default settings should still work without +code changes. The biggest change is that kafka-python no longer works on +Python 2. Python 3.8 is now the minimum supported python interpreter. +Other use cases that will need changes are: catching +``NoBrokersAvailableError`` (error removed), implementing a custom +``'kafka_client'`` (internals have changed substantially), using a +non-default ``Serializer`` / ``Deserializer``, producer ``Partitioner``, +or consumer ``AbstractPartitionAssignor`` (minor abstract interface changes), +or using the ``'sasl_oauth_token_provider'`` configuration (import rename). + +A full list of changes is in the :doc:`changelog`. This page covers only +the user-visible breaking changes and the most useful additions. + + +Python compatibility +-------------------- + +**Python 2 is no longer supported.** 3.0 requires Python 3.8 or newer. If +you still need Python 2.7, pin to ``kafka-python<3.0``; the 2.3.x line +remains compatible. + + +Configuration changes +--------------------- + +Producer defaults +^^^^^^^^^^^^^^^^^ + +``KafkaProducer`` now enables idempotence and full acks by default +(`KIP-679 `_): + +- ``enable_idempotence`` now defaults to ``True`` (was ``False``). +- ``acks`` now effectively defaults to ``'all'`` (``-1``) (was ``1``). + +These together mean the producer waits for full ISR acknowledgment and +deduplicates retries on the broker side, giving exactly-once-per-broker +delivery semantics out of the box. + +To restore 2.3 behavior, pass ``enable_idempotence=False`` and let +``acks`` default, or set ``acks=1`` explicitly:: + + KafkaProducer( + bootstrap_servers=..., + enable_idempotence=False, + acks=1, + ) + +If you explicitly pass ``acks=0`` or ``acks=1`` (or +``max_in_flight_requests_per_connection > 5``) without also passing +``enable_idempotence=False``, ``KafkaProducer`` raises +``KafkaConfigurationError`` rather than silently dropping idempotence. + +Consumer ``session_timeout_ms`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The default consumer ``session_timeout_ms`` is now **45000** (45s), +up from 10000 (10s), tracking the upstream change in +`KIP-735 `_. +This reduces spurious rebalances under transient broker / network +disruptions. Set ``session_timeout_ms=10000`` to restore the old +default. + +``api_version_auto_timeout_ms`` renamed to ``bootstrap_timeout_ms`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The config previously named ``api_version_auto_timeout_ms`` is now +``bootstrap_timeout_ms``, applies to the entire bootstrap process +(not just the API-version probe), and defaults to 30000 (30s). +The old name is no longer accepted. + +``sasl_oauth_token_provider`` abstract baseclass / ``kafka.sasl`` module moved +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``kafka.sasl`` module has been moved to ``kafka.net.sasl``. Users that have +implemented an ``AbstractTokenProvider`` (or implemented a custom SASL mechanism) +will need to modify imports:: + + from kafka.net.sasl.oauth import AbstractTokenProvider + +``buffer_memory`` removed +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``buffer_memory`` config has been removed from ``KafkaProducer``; +the deprecation warning emitted in 2.x is now a hard error. Use +``max_request_size`` (per-request bound) and let the accumulator manage +in-flight memory naturally. + + +Serializer / Deserializer interface +----------------------------------- + +The ``Serializer`` and ``Deserializer`` abstract interfaces now receive +``headers`` alongside the topic and the payload:: + + # old + class MySerializer(Serializer): + def serialize(self, topic, data): + ... + + # new + class MySerializer(Serializer): + def serialize(self, topic, headers, data): + ... + +Old single-argument callables (or two-argument ``(topic, data)`` +serializers) still work - ``KafkaProducer`` / ``KafkaConsumer`` wrap +them automatically - but emit a ``DeprecationWarning``. Update your +classes to accept the ``headers`` arg. + +Two helper classes are now shipped: ``DefaultSerializer`` (a UTF-8 +serializer/deserializer) and ``JsonSerializer``. Importable from +``kafka.serializer``. Both serialize/deserialize None<->None to +maintain expected key partitioning behavior. + + +Partitioner interface +--------------------- + +The partitioner contract changed shape. In 2.3 a partitioner was a +callable invoked as ``partitioner(serialized_key, all_partitions, available)``. +In 3.0 partitioners are instances of the ``kafka.partitioner.Partitioner`` +ABC with a ``partition(...)`` method that receives the topic, both the +raw and serialized key / value, and the live ``cluster`` snapshot:: + + # old (2.3) - a callable + class MyPartitioner: + def __call__(self, key, all_partitions, available): + ... + + # new (3.0) + from kafka.partitioner import Partitioner + + class MyPartitioner(Partitioner): + def partition(self, topic, key, serialized_key, value, serialized_value, cluster): + partitions = sorted(cluster.partitions_for_topic(topic)) + available = list(cluster.available_partitions_for_topic(topic)) + ... + +Legacy callables that match the old shape still work (with a +``DeprecationWarning``); subclasses of the new ABC must implement the +new signature. + +The default partitioner is still ``DefaultPartitioner`` (murmur2 hash +on the serialized key; null keys go to a random available partition) - +**unchanged** routing behavior for callers using the default. + +A **sticky partitioner** +(`KIP-480 `_) +is now shipped and is opt-in: for records with a null key, it sticks to +one partition per topic until the current batch fills, then rotates. +Enable it by passing an instance:: + + from kafka.partitioner import StickyPartitioner + + KafkaProducer( + bootstrap_servers=..., + partitioner=StickyPartitioner(), + ) + +Keyed-record routing is unchanged under either partitioner. + + +Admin client API +---------------- + +Response shapes +^^^^^^^^^^^^^^^ + +Admin client methods now return plain ``dict`` / ``list`` structures +derived from the protocol response, instead of namedtuples or custom +response classes. If your code accesses fields by attribute +(``response.topics[0].name``), switch to dict access +(``response['topics'][0]['name']``). + +``create_topics`` / ``create_partitions`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``create_topics`` now accepts any of the following: + +- a list of topic name strings (uses broker defaults for partitions / + replication on broker >= 2.4) +- a dict ``{name: {num_partitions, replication_factor, assignments, configs}}`` +- a list of ``NewTopic`` instances (deprecated) + +The dict form is the recommended shape going forward. ``NewTopic`` and +``NewPartitions`` are still exported and still work, but emit +deprecation warnings. + +Group APIs renamed +^^^^^^^^^^^^^^^^^^ + +The group management APIs were renamed and expanded; consult the +:doc:`apidoc/KafkaAdminClient` reference for the current method names. +The most common 2.x names (``describe_consumer_groups``, +``list_consumer_groups``, ``delete_consumer_groups``) continue to work, +but were renamed to ``describe_groups``, ``list_groups``, etc... +New: ``list_group_offsets``, ``list_group_members``, +``reset_group_offsets`` (with extended options), +``remove_group_members``. + + +Consumer API +------------ + +``partition_assignment_strategy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Assignor classes passed as ``partition_assignment_strategy`` are now +always instantiated before use. If you have a custom AbstractPartitionAssignor +class that uses @classmethod you will need to drop the decorators and update +to instance method definitions. + +Incremental cooperative rebalance +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +KIP-429 cooperative rebalancing is now supported. Existing +``ConsumerRebalanceListener`` callbacks (``on_partitions_revoked`` / +``on_partitions_assigned``) continue to work. New: +``on_partitions_lost`` hook, ``AsyncConsumerRebalanceListener`` base +class for listeners that need to ``await`` rather than block, and +``RebalanceListener`` is now called on ``consumer.close()``. + +Exceptions raised from a rebalance listener now propagate, instead of +being swallowed. Audit any custom listener code for accidental raises. + + +Error hierarchy +--------------- + +- ``KafkaError`` now subclasses ``Exception`` (was ``RuntimeError``). + Code that catches ``RuntimeError`` to handle kafka-python errors will + no longer match; catch ``KafkaError`` (or ``Exception``). +- ``IncompatibleBrokerVersion`` is now a subclass of + ``UnsupportedVersionError``. Code catching either will keep working; + code that distinguished them by type will not. +- ``NoBrokersAvailableError`` has been **removed**. Connection-failure + scenarios that previously raised it now raise ``KafkaTimeoutError`` + (during bootstrap) or specific connection / authentication errors at + send time. +- ``KafkaProtocolError`` is no longer marked retriable; if you used + ``err.retriable`` to drive retry loops, this changes behavior on + protocol-level errors. +- New base classes ``RetriableError`` and ``InvalidMetadataError`` are + available for catching whole categories of errors at once. + + +Broker version checks +--------------------- + +kafka-python now relies exclusively on ApiVersionsRequest to determine +broker verions. For early brokers older than 0.10 you must now pass +an explicit ``api_version`` in order to connect. + +In addition the configuration ``api_version_auto_timeout_ms=`` which +previously was used to manage timeouts during the broker version check +process has been removed. It has been replaced with the more general +``bootstrap_timeout_ms`` (see above). + + +Removed and relocated internals +------------------------------- + +If your code imported from kafka-python internals (not the public top- +level API), several modules have moved or been removed: + +- ``kafka.client_async`` and most of ``kafka.conn`` are **removed**. + Their responsibilities now live in ``kafka.net`` (selector / event + loop, connection pool, transport). A compatibility shim + ``kafka.net.compat.KafkaNetClient`` exposes the legacy + ``client.poll()`` / ``client.send()`` shape for callers that have + not migrated. +- Users that implemented a custom ``kafka_client`` will need to evaluate + independently. Due to the migration to ``kafka.net`` most internals + no longer directly rely on the ``kafka_client`` interface, instead + using the simpler ``kafka.net.manager`` connection manager and + ``kafka.net.selector`` IO event loop. +- ``HeartbeatThread`` is gone; consumer heartbeats now run as an + ``async def`` coroutine on a shared IO thread. +- The hand-written protocol classes in ``kafka.protocol.*`` have been + replaced by classes generated from the upstream Apache Kafka JSON + schemas. The legacy hand-written protocol types are still available + under ``kafka.protocol.old`` for the small number of places that + reach into protocol internals. + +Version probes for pre-0.10 brokers have been removed. ``kafka-python`` +3.0 always sends ``ApiVersionsRequest`` on connect and uses the result +to negotiate per-API versions. Brokers older than 0.10 no longer +auto-detect; pass ``api_version=(0, 9)`` (or your specific version) +explicitly if you need to talk to one. + + +New conveniences +---------------- + +A few additions that may be useful: + +- **Context managers.** ``KafkaProducer``, ``KafkaConsumer``, and + ``KafkaAdminClient`` all now support ``with`` syntax:: + + with KafkaProducer(bootstrap_servers=...) as producer: + producer.send('my-topic', b'hello') + +- **CLI.** A single ``kafka-python`` entry point wraps the admin / + consumer / producer subcommands (also runnable as + ``python -m kafka.admin`` etc.). See :doc:`usage` for examples. +- **New public exports.** ``OffsetSpec`` and ``IsolationLevel`` are + now importable directly from ``kafka``. +- **HTTP CONNECT proxy support.** Pass ``proxy_url='http://...'`` to + any client to tunnel through an HTTP CONNECT proxy + (`RFC 7231 S4.3.6 `_). +- **TCP keepalive** is now enabled by default on broker sockets. +- **TLS minimum version** is now TLS 1.2; the default ``SSLContext`` + uses ``PROTOCOL_TLS_CLIENT``. + + +Upgrade checklist +----------------- + +In order, the things most likely to bite an upgrading 2.3 application: + +1. Make sure you are using Python 3.8+. +2. If you pass ``buffer_memory=`` to ``KafkaProducer``, remove it. +3. If you pass ``api_version_auto_timeout_ms=``, rename to + ``bootstrap_timeout_ms``. +4. If your application can't tolerate idempotent / ``acks=all`` + semantics - typically because you explicitly want ``acks=0`` / + ``acks=1`` - pass ``enable_idempotence=False`` to ``KafkaProducer``. +5. If you catch ``NoBrokersAvailableError``, replace it with + ``KafkaTimeoutError``. +6. If you catch ``RuntimeError`` from kafka-python code, switch to + ``KafkaError`` (or ``Exception``). +7. If you implement a custom ``Serializer`` / ``Deserializer``, update + the signature to ``(self, topic, headers, data)``. +8. If you implement a custom ``Partitioner``, update the signature to + ``(self, topic, key, serialized_key, value, serialized_value, cluster)``. +9. If you consume admin client responses by attribute access, switch to + dict access. +10. If you import from ``kafka.client_async`` or ``kafka.conn``, + migrate to ``kafka.net`` (or use the ``KafkaNetClient`` compat + shim).