51 changes: 36 additions & 15 deletions README.rst
@@ -15,11 +15,13 @@ kafka-python
 kafka-python is a pure-python client library for Apache Kafka, the distributed
 stream processing engine. It has no external dependencies and no Cython/C/rust
 core, making installation across a wide variety of environments simple and easy
-to manage.
+to manage. It provides high-level class components for consumer, producer, and
+admin clients, as well as CLI scripts for quick interactive tasks.

-kafka-python can also be used as a simple alternative to the apache kafka admin
-scripts, which require an installed/compatible jvm. A simple CLI interface for
-admin commands is provided as ``kafka-python admin`` / ``python -m kafka.admin``.
+``kafka-python admin`` serves as a simple alternative to the apache kafka bin/
+scripts, particularly if/when you do not have easy access to an
+installed/compatible jvm. The CLI interface for admin commands is provided as
+``kafka-python admin`` and ``python -m kafka.admin``.

 Users looking to add more raw throughput can ``pip install crc32c`` as
 an optional dependency, offloading one of the most CPU intensive subsystems
@@ -85,30 +87,37 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 .. code-block:: python

     # manually assign the partition list for the consumer
-    from kafka import TopicPartition
+    from kafka import KafkaConsumer, TopicPartition
     consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
     consumer.assign([TopicPartition('foobar', 2)])
     msg = next(consumer)

+Keys and Values returned by KafkaConsumer will be raw bytes by default. Use a
+``value_deserializer`` to automatically decode into something else. Helpers
+are available for simple utf-8 string decoding (``DefaultSerializer``) and
+json (``JsonSerializer``).
+
 .. code-block:: python

-    # Deserialize msgpack-encoded values
-    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
-    consumer.subscribe(['msgpackfoo'])
+    # Deserialize json-encoded values
+    from kafka import KafkaConsumer, JsonSerializer
+    consumer = KafkaConsumer(value_deserializer=JsonSerializer())
+    consumer.subscribe(['json-foo'])
     for msg in consumer:
         assert isinstance(msg.value, dict)

 .. code-block:: python

-    # Access record headers. The returned value is a list of tuples
-    # with str, bytes for key and value
+    # Access record headers. The returned value is a list of
+    # (str, bytes) tuples, representing the header key and value.
     for msg in consumer:
         print (msg.headers)

 .. code-block:: python

     # Read only committed messages from transactional topic
-    consumer = KafkaConsumer(isolation_level='read_committed')
+    from kafka import KafkaConsumer, IsolationLevel
+    consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED)
     consumer.subscribe(['txn_topic'])
     for msg in consumer:
         print(msg)
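
The hunk above says consumer values arrive as raw bytes unless a `value_deserializer` is configured. As a rough illustration of what such a callable does, here is a minimal standard-library sketch. The function name `json_value_deserializer` is hypothetical, and this is not the implementation of the `JsonSerializer` helper the diff introduces; it only shows the bytes-to-object step.

```python
import json


def json_value_deserializer(raw: bytes) -> object:
    """Hypothetical stand-in: decode UTF-8 bytes, then parse them as JSON."""
    return json.loads(raw.decode("utf-8"))


# Simulate the raw bytes a consumer would hand to the deserializer.
raw_value = json.dumps({"foo": "bar"}).encode("utf-8")
decoded = json_value_deserializer(raw_value)
```

Any callable with this shape (bytes in, object out) could be passed as `value_deserializer`, which is how the removed `msgpack.loads` line worked.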
@@ -132,12 +141,14 @@ for more details.
     from kafka import KafkaProducer
     producer = KafkaProducer(bootstrap_servers='localhost:1234')
     for _ in range(100):
+        # Fire-and-forget: send() is async and returns before delivery
         producer.send('foobar', b'some_message_bytes')

 .. code-block:: python

-    # Block until a single message is sent (or timeout)
+    # To check the status of an async message delivery, use .get()
     future = producer.send('foobar', b'another_message')
+    # future.get() will block until it can return the result or raise on error
     result = future.get(timeout=60)

 .. code-block:: python
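
The comments added in this hunk contrast fire-and-forget sends with blocking on the returned future. The pattern can be tried without a broker using the standard library's `concurrent.futures`, as in the sketch below. This is only an analogy: the `deliver` function is hypothetical, and the stdlib future exposes `.result(timeout=...)` where the producer's future in the diff uses `.get(timeout=...)`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def deliver(payload: bytes) -> int:
    """Hypothetical stand-in for a network send; returns bytes 'delivered'."""
    time.sleep(0.05)  # pretend the broker round trip takes a moment
    return len(payload)


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns immediately, before the work has finished
    future = pool.submit(deliver, b"another_message")
    # Block until the result is available, or raise on error or timeout
    result = future.result(timeout=5)
```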
@@ -147,6 +158,9 @@ for more details.
     # only useful if you configure internal batching using linger_ms
     producer.flush()

+Message keys are used to hash messages with the same key to the same
+partition. Both keys and values should be raw bytes unless a serializer is configured.
+
 .. code-block:: python

     # Use a key for hashed-partitioning
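
The paragraph added in this hunk states that messages with the same key hash to the same partition. A minimal sketch of that idea follows, assuming a fixed partition count. The function `pick_partition` and the use of `zlib.crc32` are illustrative assumptions only; the client's real partitioner uses its own hash function, so actual partition numbers will differ.

```python
import zlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Hypothetical partitioner: hash the key bytes, then take the modulo."""
    # crc32 is a stand-in hash chosen for illustration only
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition for a fixed partition count.
partitions = [pick_partition(b"ping", 6) for _ in range(3)]
```

Because the mapping depends on the partition count, changing the number of partitions on a topic would generally change where a given key lands.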
@@ -155,23 +169,30 @@ for more details.
 .. code-block:: python

     # Serialize json messages
-    import json
-    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    from kafka import KafkaProducer, JsonSerializer
+    producer = KafkaProducer(value_serializer=JsonSerializer())
     producer.send('fizzbuzz', {'foo': 'bar'})

 .. code-block:: python

     # Serialize string keys
-    producer = KafkaProducer(key_serializer=str.encode)
+    from kafka import KafkaProducer, DefaultSerializer
+    producer = KafkaProducer(key_serializer=DefaultSerializer())
     producer.send('flipflap', key='ping', value=b'1234')

+Compression can be used to reduce message size on the wire. Gzip is supported
+via python stdlib. For other compression types you must install optional
+dependencies.
+
 .. code-block:: python

     # Compress messages
     producer = KafkaProducer(compression_type='gzip')
     for i in range(1000):
         producer.send('foobar', b'msg %d' % i)

+KafkaProducer also supports transactions and message headers when needed.
+
 .. code-block:: python

     # Use transactions
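
The compression paragraph added in this hunk notes that gzip support comes from the Python standard library. The sketch below shows, under simplified assumptions, why compressing a batch of similar messages reduces its size. It joins the payloads into one buffer by hand, which is an illustration of the effect and not how the producer batches or frames records internally.

```python
import gzip

# Build a batch of repetitive payloads, similar to the 'msg %d' loop in the diff.
raw_batch = b"".join(b"msg %d\n" % i for i in range(1000))

# Compress the whole batch at once, then confirm it round-trips losslessly.
compressed = gzip.compress(raw_batch)
restored = gzip.decompress(compressed)
```

Comparing `len(compressed)` with `len(raw_batch)` shows the saving for this repetitive input; highly varied or already-compressed payloads would shrink much less.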
52 changes: 37 additions & 15 deletions docs/index.rst
@@ -15,11 +15,13 @@ kafka-python
 kafka-python is a pure-python client library for Apache Kafka, the distributed
 stream processing engine. It has no external dependencies and no Cython/C/rust
 core, making installation across a wide variety of environments simple and easy
-to manage.
+to manage. It provides high-level class components for consumer, producer, and
+admin clients, as well as CLI scripts for quick interactive tasks.

-kafka-python can also be used as a simple alternative to the apache kafka admin
-scripts, which require an installed/compatible jvm. A simple CLI interface for
-admin commands is provided as ``kafka-python admin`` / ``python -m kafka.admin``.
+``kafka-python admin`` serves as a simple alternative to the apache kafka bin/
+scripts, particularly if/when you do not have easy access to an
+installed/compatible jvm. The CLI interface for admin commands is provided as
+``kafka-python admin`` and ``python -m kafka.admin``.

 Users looking to add more raw throughput can ``pip install crc32c`` as
 an optional dependency, offloading one of the most CPU intensive subsystems
@@ -85,30 +87,37 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 .. code-block:: python

     # manually assign the partition list for the consumer
-    from kafka import TopicPartition
+    from kafka import KafkaConsumer, TopicPartition
     consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
     consumer.assign([TopicPartition('foobar', 2)])
     msg = next(consumer)

+Keys and Values returned by KafkaConsumer will be raw bytes by default. Use a
+``value_deserializer`` to automatically decode into something else. Helpers
+are available for simple utf-8 string decoding (``DefaultSerializer``) and
+json (``JsonSerializer``).
+
 .. code-block:: python

-    # Deserialize msgpack-encoded values
-    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
-    consumer.subscribe(['msgpackfoo'])
+    # Deserialize json-encoded values
+    from kafka import KafkaConsumer, JsonSerializer
+    consumer = KafkaConsumer(value_deserializer=JsonSerializer())
+    consumer.subscribe(['json-foo'])
     for msg in consumer:
         assert isinstance(msg.value, dict)

 .. code-block:: python

-    # Access record headers. The returned value is a list of tuples
-    # with str, bytes for key and value
+    # Access record headers. The returned value is a list of
+    # (str, bytes) tuples, representing the header key and value.
     for msg in consumer:
         print (msg.headers)

 .. code-block:: python

     # Read only committed messages from transactional topic
-    consumer = KafkaConsumer(isolation_level='read_committed')
+    from kafka import KafkaConsumer, IsolationLevel
+    consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED)
     consumer.subscribe(['txn_topic'])
     for msg in consumer:
         print(msg)
@@ -131,12 +140,14 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
     from kafka import KafkaProducer
     producer = KafkaProducer(bootstrap_servers='localhost:1234')
     for _ in range(100):
+        # Fire-and-forget: send() is async and returns before delivery
         producer.send('foobar', b'some_message_bytes')

 .. code-block:: python

-    # Block until a single message is sent (or timeout)
+    # To check the status of an async message delivery, use .get()
     future = producer.send('foobar', b'another_message')
+    # future.get() will block until it can return the result or raise on error
     result = future.get(timeout=60)

 .. code-block:: python
@@ -146,6 +157,9 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
     # only useful if you configure internal batching using linger_ms
     producer.flush()

+Message keys are used to hash messages with the same key to the same
+partition. Both keys and values should be raw bytes unless a serializer is configured.
+
 .. code-block:: python

     # Use a key for hashed-partitioning
@@ -154,23 +168,30 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
 .. code-block:: python

     # Serialize json messages
-    import json
-    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    from kafka import KafkaProducer, JsonSerializer
+    producer = KafkaProducer(value_serializer=JsonSerializer())
     producer.send('fizzbuzz', {'foo': 'bar'})

 .. code-block:: python

     # Serialize string keys
-    producer = KafkaProducer(key_serializer=str.encode)
+    from kafka import KafkaProducer, DefaultSerializer
+    producer = KafkaProducer(key_serializer=DefaultSerializer())
     producer.send('flipflap', key='ping', value=b'1234')

+Compression can be used to reduce message size on the wire. Gzip is supported
+via python stdlib. For other compression types you must install optional
+dependencies.
+
 .. code-block:: python

     # Compress messages
     producer = KafkaProducer(compression_type='gzip')
     for i in range(1000):
         producer.send('foobar', b'msg %d' % i)

+KafkaProducer also supports transactions and message headers when needed.
+
 .. code-block:: python

     # Use transactions
@@ -263,4 +284,5 @@ See https://docs.python.org/3/howto/logging.html for overview / howto.
    compatibility
    support
    license
+   Upgrading to 3.0 <upgrade_to_3_0>
    changelog