From 192273850fbd8dc0fe4d7724bf0800513903bae4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 11 Jun 2026 09:50:43 -0700 Subject: [PATCH 1/2] docs: updates to install/tests/usage for 3.0 release --- docs/install.rst | 72 ++++------ docs/tests.rst | 10 +- docs/usage.rst | 309 +++++++++++++++++++++++------------------ kafka/admin/_topics.py | 2 +- 4 files changed, 205 insertions(+), 188 deletions(-) diff --git a/docs/install.rst b/docs/install.rst index 074fb36bb..ede1e90a9 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -19,75 +19,51 @@ Bleeding-Edge .. code:: bash - git clone https://github.com/dpkp/kafka-python - pip install ./kafka-python + pip install git+https://github.com/dpkp/kafka-python.git -Optional crc32c install -*********************** -Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python` -uses a new message protocol version, that requires calculation of `crc32c`, -which differs from the `zlib.crc32` hash implementation. By default `kafka-python` -calculates it in pure python, which is quite slow. To speed it up we optionally -support https://pypi.python.org/pypi/crc32c package if it's installed. +Optional Installs +***************** + +crc32c +====== +Highly recommended for performance optimization. By default `kafka-python` +calculates record checksums in pure python, but the calculation is somewhat +CPU intensive. As throughput increases this can become a bottleneck. Installing +the optional ``crc32c`` dependency reduces the CPU cost of each check using +an optimized C library. See https://pypi.python.org/pypi/crc32c . .. 
code:: bash pip install 'kafka-python[crc32c]' -Optional ZSTD install -********************* +zstd +==== -To enable ZSTD compression/decompression, install python-zstandard: +To enable ZSTD compression/decompression, install `python-zstandard`: >>> pip install 'kafka-python[zstd]' -Optional LZ4 install -******************** +lz4 +=== -To enable LZ4 compression/decompression, install python-lz4: +To enable LZ4 compression/decompression, install `python-lz4`: >>> pip install 'kafka-python[lz4]' -Optional Snappy install -*********************** - -Install Development Libraries -============================= - -Download and build Snappy from https://google.github.io/snappy/ - -Ubuntu: - -.. code:: bash - - apt-get install libsnappy-dev - -OSX: +snappy +====== -.. code:: bash - - brew install snappy - -From Source: +To enable Snappy compression/decompression, install `python-snappy`: .. code:: bash - wget https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz - tar xzvf snappy-1.1.3.tar.gz - cd snappy-1.1.3 - ./configure - make - sudo make install - -Install Python Module -===================== - -Install the `python-snappy` module + pip install 'kafka-python[snappy]' -.. code:: bash - pip install 'kafka-python[snappy]' +Note that python-snappy generally does not publish pre-compiled wheels, +so installation may require building the snappy library from source. +See https://google.github.io/snappy/ . diff --git a/docs/tests.rst b/docs/tests.rst index e03ff0970..7776de79b 100644 --- a/docs/tests.rst +++ b/docs/tests.rst @@ -4,19 +4,17 @@ Tests .. image:: https://img.shields.io/github/actions/workflow/status/dpkp/kafka-python/python-package.yml :target: https://github.com/dpkp/kafka-python/actions/workflows/python-package.yml -The test suite is run via pytest. - -Linting is run via pylint. - -Test coverage details are currently published as an html build artifact. +Testing uses pytest and pylint. 
The test suite includes unit tests that mock network interfaces, mock broker tests that simulate request/receive network messaging, as well as integration tests that setup and teardown kafka broker (and zookeeper where required) fixtures. +Test coverage details are currently published as an html build artifact. + Unit tests ------------------- +---------- To run the tests locally, install test dependencies: diff --git a/docs/usage.rst b/docs/usage.rst index f8d9f4ac7..bef44cac8 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -2,101 +2,12 @@ Usage ***** -CLI -=== - -The kafka module provides a simple command-line interface for consumer, producer, -and admin apis. - -python -m kafka.consumer ------------------------- - -.. code:: bash - - > python -m kafka.consumer --help - usage: python -m kafka.consumer [-h] -b BOOTSTRAP_SERVERS -t TOPICS -g GROUP [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] [--encoding ENCODING] - - Kafka console consumer - - options: - -h, --help show this help message and exit - -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap servers - -t TOPICS, --topic TOPICS - subscribe to topic - -g GROUP, --group GROUP - consumer group - -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG - additional configuration properties for kafka consumer - -l LOG_LEVEL, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - -f FORMAT, --format FORMAT - output format: str|raw|full - --encoding ENCODING encoding to use for str output decode() - - -python -m kafka.producer ------------------------- - -.. 
code:: bash - - > python -m kafka.producer --help - usage: python -m kafka.producer [-h] -b BOOTSTRAP_SERVERS -t TOPIC [-c EXTRA_CONFIG] [-l LOG_LEVEL] [--encoding ENCODING] - - Kafka console producer - - options: - -h, --help show this help message and exit - -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap servers - -t TOPIC, --topic TOPIC - publish to topic - -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG - additional configuration properties for kafka producer - -l LOG_LEVEL, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - --encoding ENCODING byte encoding for produced messages - - -python -m kafka.admin ---------------------- - -.. code:: bash - - > python -m kafka.admin --help - usage: python -m kafka.admin [-h] -b BOOTSTRAP_SERVERS [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] {cluster,configs,log-dirs,topics,consumer-groups} ... - - Kafka admin client - - positional arguments: - {cluster,configs,log-dirs,topics,consumer-groups} - subcommands - cluster Manage Kafka Cluster - configs Manage Kafka Configuration - log-dirs Manage Kafka Topic/Partition Log Directories - topics List/Describe/Create/Delete Kafka Topics - consumer-groups Manage Kafka Consumer Groups - - options: - -h, --help show this help message and exit - -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap servers - -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG - additional configuration properties for admin client - -l LOG_LEVEL, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - -f FORMAT, --format FORMAT - output format: raw|json - - KafkaConsumer ============== .. 
code:: python - from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition - import json - import msgpack + from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition, JsonSerializer, DefaultSerializer # To consume latest messages and auto-commit offsets consumer = KafkaConsumer('my-topic', @@ -130,10 +41,13 @@ KafkaConsumer KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False) # consume json messages - KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii'))) + KafkaConsumer(value_deserializer=JsonSerializer()) - # consume msgpack - KafkaConsumer(value_deserializer=msgpack.unpackb) + # consume utf-8 + KafkaConsumer(value_deserializer=DefaultSerializer()) + + # consume utf-16 + KafkaConsumer(value_deserializer=DefaultSerializer('utf-16')) # StopIteration if no message after 1sec KafkaConsumer(consumer_timeout_ms=1000) @@ -142,8 +56,8 @@ KafkaConsumer consumer = KafkaConsumer() consumer.subscribe(pattern='^awesome.*') - # Use multiple consumers in parallel w/ 0.9 kafka brokers - # typically you would run each on a different server / process / CPU + # Use multiple consumers in parallel + # (run each on a different server / process / CPU) consumer1 = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers='my.server.com') @@ -161,10 +75,8 @@ KafkaProducer .. 
code:: python - from kafka import KafkaProducer + from kafka import KafkaProducer, JsonSerializer, DefaultSerializer from kafka.errors import KafkaError - import msgpack - import json producer = KafkaProducer(bootstrap_servers=['broker1:1234']) @@ -185,14 +97,18 @@ KafkaProducer print (record_metadata.offset) # produce keyed messages to enable hashed partitioning - producer.send('my-topic', key=b'foo', value=b'bar') + producer.send('keyed-topic', key=b'foo', value=b'bar') + + # encode str with utf-8 encoding + producer = KafkaProducer(value_serializer=DefaultSerializer()) + producer.send('utf-8-topic', 'value_str') - # encode objects via msgpack - producer = KafkaProducer(value_serializer=msgpack.dumps) - producer.send('msgpack-topic', {'key': 'value'}) + # encode str with utf-16 encoding + producer = KafkaProducer(value_serializer=DefaultSerializer('utf-16')) + producer.send('utf-16-topic', '懂不懂') # produce json messages - producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii')) + producer = KafkaProducer(value_serializer=JsonSerializer()) producer.send('json-topic', {'key': 'value'}) # produce asynchronously @@ -218,53 +134,180 @@ KafkaProducer producer = KafkaProducer(retries=5) -ClusterMetadata -=============== +KafkaAdminClient +================ .. 
code:: python - from kafka.cluster import ClusterMetadata + from kafka import KafkaAdminClient - clusterMetadata = ClusterMetadata(bootstrap_servers=['broker1:1234']) + admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) - # get all brokers metadata - print(clusterMetadata.brokers()) + # create topics with defaults (requires kafka 2.4+) + admin.create_topics(['testtopic1'], timeout_ms=None, validate_only=False) + # create a new topic with details: {topic_name: {...}} + new_topics = {'testtopic2': { + 'num_partitions': 1, + 'replication_factor': 1, + 'assignments': {0: [1]}, # assign partition 0 to broker id 1 + 'configs': {'max.message.bytes': '1000000'}, # set non-default configs + }} + admin.create_topics(new_topics, timeout_ms=None, validate_only=False) - # get specific broker metadata - print(clusterMetadata.broker_metadata('bootstrap-0')) + # delete a topic + admin.delete_topics(['testtopic1']) - # get all partitions of a topic - print(clusterMetadata.partitions_for_topic("topic")) + # list consumer groups + print(admin.list_groups()) - # list topics - print(clusterMetadata.topics()) + # get consumer group details + print(admin.describe_groups(['cft-plt-qa.connect'])) + # get consumer group offsets + print(admin.list_group_offsets(['cft-plt-qa.connect'])) -KafkaAdminClient -================ -.. code:: python +CLI +=== - from kafka import KafkaAdminClient - from kafka.admin import NewTopic +The kafka module provides a simple command-line interface for consumer, producer, +and admin apis. - admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) +kafka-python consumer +--------------------- - # create a new topic - topics_list = [] - topics_list.append(NewTopic(name="testtopic", num_partitions=1, replication_factor=1)) - admin.create_topics(topics_list,timeout_ms=None, validate_only=False) +.. 
code:: bash - # delete a topic - admin.delete_topics(['testtopic']) + > kafka-python consumer --help - # list consumer groups - print(admin.list_consumer_groups()) + usage: kafka-python consumer [-h] -b BOOTSTRAP_SERVERS [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] -t TOPICS [-g GROUP] [-i GROUP_INSTANCE_ID] [-f FORMAT] + [--encoding ENCODING] - # get consumer group details - print(admin.describe_consumer_groups('cft-plt-qa.connect')) + Kafka console consumer - # get consumer group offset - print(admin.list_consumer_group_offsets('cft-plt-qa.connect')) + options: + -h, --help show this help message and exit + + connection: + -b, --bootstrap-servers BOOTSTRAP_SERVERS + host:port for cluster bootstrap server. Can be provided multiple times. + -S, --security-protocol SECURITY_PROTOCOL + PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + -M, --sasl-mechanism SASL_MECHANISM + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 + -U, --sasl-user SASL_USER + -P, --sasl-password SASL_PASSWORD + + logging: + -l, --log-level LOG_LEVEL + logging level, passed to logging.basicConfig + -L, --enable-logger ENABLE_LOGGER + enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled + -D, --disable-logger DISABLE_LOGGER + disable a specific logger. Can be provided multiple times. + + extended: + -C, --extra-config EXTRA_CONFIG + additional configuration properties for client in "key=val" format. Can be provided multiple times. + + consumer options: + -t, --topic TOPICS subscribe to topic + -g, --group GROUP consumer group + -i, --group-instance-id GROUP_INSTANCE_ID + static group membership identifier + -f, --format FORMAT output format: str|raw|full + --encoding ENCODING encoding to use for str output decode() + + +kafka-python producer +--------------------- + +.. 
code:: bash + + > kafka-python producer --help + + usage: kafka-python producer [-h] -b BOOTSTRAP_SERVERS [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] -t TOPIC [--encoding ENCODING] + + Kafka console producer + + options: + -h, --help show this help message and exit + + connection: + -b, --bootstrap-servers BOOTSTRAP_SERVERS + host:port for cluster bootstrap server. Can be provided multiple times. + -S, --security-protocol SECURITY_PROTOCOL + PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + -M, --sasl-mechanism SASL_MECHANISM + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 + -U, --sasl-user SASL_USER + -P, --sasl-password SASL_PASSWORD + + logging: + -l, --log-level LOG_LEVEL + logging level, passed to logging.basicConfig + -L, --enable-logger ENABLE_LOGGER + enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled + -D, --disable-logger DISABLE_LOGGER + disable a specific logger. Can be provided multiple times. + + extended: + -C, --extra-config EXTRA_CONFIG + additional configuration properties for client in "key=val" format. Can be provided multiple times. + + producer options: + -t, --topic TOPIC publish to topic + --encoding ENCODING byte encoding for produced messages + + +kafka-python admin +------------------ + +.. code:: bash + + > kafka-python admin --help + + usage: kafka-python admin [-h] [-b BOOTSTRAP_SERVERS] [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] [--format FORMAT] GROUP ... + + Kafka Admin Client + + options: + -h, --help show this help message and exit + + connection: + -b, --bootstrap-servers BOOTSTRAP_SERVERS + host:port for cluster bootstrap server. Can be provided multiple times. 
+ -S, --security-protocol SECURITY_PROTOCOL + PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + -M, --sasl-mechanism SASL_MECHANISM + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 + -U, --sasl-user SASL_USER + -P, --sasl-password SASL_PASSWORD + + logging: + -l, --log-level LOG_LEVEL + logging level, passed to logging.basicConfig + -L, --enable-logger ENABLE_LOGGER + enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled + -D, --disable-logger DISABLE_LOGGER + disable a specific logger. Can be provided multiple times. + + extended: + -C, --extra-config EXTRA_CONFIG + additional configuration properties for client in "key=val" format. Can be provided multiple times. + + output: + --format FORMAT output format: raw|json + + Available command groups: + GROUP + acls Manage Kafka ACLs + cluster Manage Kafka Cluster + configs Manage Kafka Configuration + topics Manage Kafka Topics + partitions Manage Kafka Partitions + groups Manage Kafka Groups + transactions Inspect and recover from hanging transactions (KIP-664) + users Manage Kafka Users diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index 18bb165b1..b983393e9 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -99,7 +99,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ {topic_name: {num_partitions: int (default -1), replication_factor: int (default -1), - assignments: {partition: [broker_ids]}, + assignments: {partition_id: [broker_ids]}, configs: {key: value}}} List of NewTopic objects is deprecated. 
From a15f85655392b77d30c5e36ead9ffb9e69915876 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 11 Jun 2026 09:54:14 -0700 Subject: [PATCH 2/2] update .unicode_files --- .unicode_files | 1 + 1 file changed, 1 insertion(+) diff --git a/.unicode_files b/.unicode_files index 1b3cf3167..efef05618 100644 --- a/.unicode_files +++ b/.unicode_files @@ -1,6 +1,7 @@ # Files that have known unicode chars; all others should be ascii-only .github/workflows/codeql-analysis.yml AUTHORS.md +docs/usage.rst test/protocol/old/test_compact.py test/protocol/old/test_types.py test/protocol/schemas/test_codec_types.py