diff --git a/.unicode_files b/.unicode_files index efef05618..98fdd9d53 100644 --- a/.unicode_files +++ b/.unicode_files @@ -1,7 +1,7 @@ # Files that have known unicode chars; all others should be ascii-only .github/workflows/codeql-analysis.yml AUTHORS.md -docs/usage.rst +docs/high_level.rst test/protocol/old/test_compact.py test/protocol/old/test_types.py test/protocol/schemas/test_codec_types.py diff --git a/docs/cli/admin.rst b/docs/cli/admin.rst new file mode 100644 index 000000000..ab59326d6 --- /dev/null +++ b/docs/cli/admin.rst @@ -0,0 +1,113 @@ +kafka-python admin +****************** + +``kafka-python admin`` exposes :class:`~kafka.KafkaAdminClient` +operations as a command-line tool. Commands are grouped by the kind of +resource they act on (topics, partitions, configs, ...). Each group +contains one or more subcommands. + +Output is printed to stdout. ``--format raw`` (the default) uses +``pprint``; ``--format json`` emits a single JSON document, useful for +piping to ``jq`` or other tooling. + +.. code-block:: bash + + kafka-python admin -b localhost:9092 cluster describe + kafka-python admin -b localhost:9092 --format json topics list + python -m kafka.admin -b localhost:9092 topics create -t foo --num-partitions 3 + + +Global options +============== + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :nosubcommands: + + +acls +==== + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: acls + + +cluster +======= + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: cluster + + +configs +======= + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: configs + + +topics +====== + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: topics + + +partitions +========== + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: partitions + + +groups +====== + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: groups + + +transactions +============ + +KIP-664 administrative tools for inspecting and recovering from hanging +transactions. ``list``, ``describe``, and ``describe-producers`` require +broker >= 3.0 (``describe-producers`` works against broker >= 2.8). + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: transactions + + +users +===== + +SCRAM credential management. See KIP-554. + +.. argparse:: + :module: kafka.cli.admin + :func: main_parser + :prog: kafka-python admin + :path: users diff --git a/docs/cli/consumer.rst b/docs/cli/consumer.rst new file mode 100644 index 000000000..e745c294a --- /dev/null +++ b/docs/cli/consumer.rst @@ -0,0 +1,33 @@ +kafka-python consumer +********************* + +A line-oriented console consumer. Subscribes to one or more topics and +prints each received record to standard output. + +Examples +======== + +.. code-block:: bash + + # Read every record on a topic until interrupted + kafka-python consumer -b localhost:9092 -t my-topic + + # Join a consumer group and commit offsets automatically + kafka-python consumer -b localhost:9092 -g my-group -t my-topic + + # Read from the beginning, then exit after 1s of idle + kafka-python consumer -b localhost:9092 -t my-topic \ + -C auto_offset_reset=earliest \ + -C consumer_timeout_ms=1000 + + # Print full ConsumerRecord (topic, partition, offset, key, value, ...) + kafka-python consumer -b localhost:9092 -t my-topic -f full + + +Reference +========= + +.. argparse:: + :module: kafka.cli.consumer + :func: main_parser + :prog: kafka-python consumer diff --git a/docs/cli/index.rst b/docs/cli/index.rst new file mode 100644 index 000000000..be3da1bd7 --- /dev/null +++ b/docs/cli/index.rst @@ -0,0 +1,83 @@ +Command-Line Interface +********************** + +kafka-python ships simple command-line interfaces for consumer, producer, +and admin clients. They can be invoked either as the ``kafka-python`` +console script or as module entry points: + +.. code-block:: bash + + kafka-python consumer -b localhost:9092 -t my-topic + kafka-python producer -b localhost:9092 -t my-topic + kafka-python admin -b localhost:9092 cluster describe + + # equivalent module invocations + python -m kafka.consumer -b localhost:9092 -t my-topic + python -m kafka.producer -b localhost:9092 -t my-topic + python -m kafka.admin -b localhost:9092 cluster describe + +The ``kafka-python admin`` command, in particular, is a convenient +alternative to the apache kafka ``bin/`` scripts when a compatible JVM is +not available. + +.. toctree:: + :maxdepth: 2 + + consumer + producer + admin + + +Common Options +============== + +All three commands share a common set of connection, logging, and +configuration options. They are documented in full on the individual +command pages; the summary below highlights the most commonly used +flags. + +Connection +---------- + +``-b/--bootstrap-servers HOST:PORT`` + One or more bootstrap servers used to discover the rest of the + cluster. May be supplied multiple times. + +``-S/--security-protocol`` + One of ``PLAINTEXT``, ``SSL``, ``SASL_PLAINTEXT``, ``SASL_SSL``. + Defaults to ``PLAINTEXT``. + +``-M/--sasl-mechanism`` + One of ``PLAIN``, ``GSSAPI``, ``OAUTHBEARER``, ``SCRAM-SHA-256``, + ``SCRAM-SHA-512``. Defaults to ``PLAIN``. + +``-U/--sasl-user`` / ``-P/--sasl-password`` + Credentials for SASL ``PLAIN`` and ``SCRAM-*`` mechanisms. + +Logging +------- + +``-l/--log-level`` + Python ``logging`` level (``DEBUG``, ``INFO``, ...). Defaults to + ``CRITICAL`` so the CLI is quiet by default. + +``-L/--enable-logger`` / ``-D/--disable-logger`` + Selectively turn on or off a single logger by name. Both flags may + be supplied multiple times. + +Extended Configuration +---------------------- + +``-C/--extra-config key=value`` + Pass arbitrary keyword arguments through to the underlying client + constructor (:class:`~kafka.KafkaConsumer`, + :class:`~kafka.KafkaProducer`, or + :class:`~kafka.KafkaAdminClient`). Values that parse as ``int``, + ``True``, ``False``, or ``None`` are converted; everything else is + passed through as a string. May be supplied multiple times. + + .. code-block:: bash + + kafka-python consumer -b localhost:9092 -t foo \ + -C auto_offset_reset=earliest \ + -C consumer_timeout_ms=1000 diff --git a/docs/cli/producer.rst b/docs/cli/producer.rst new file mode 100644 index 000000000..a6739b21c --- /dev/null +++ b/docs/cli/producer.rst @@ -0,0 +1,25 @@ +kafka-python producer +********************* + +A line-oriented console producer. Reads lines from standard input and +publishes each one as a record to the configured topic. + +Examples +======== + +.. code-block:: bash + + # Publish a single message + echo "hello kafka" | kafka-python producer -b localhost:9092 -t my-topic + + # Stream lines from a file + kafka-python producer -b localhost:9092 -t my-topic < messages.txt + + +Reference +========= + +.. argparse:: + :module: kafka.cli.producer + :func: main_parser + :prog: kafka-python producer diff --git a/docs/conf.py b/docs/conf.py index 668ed4a8b..ae24f6e10 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,6 +20,10 @@ # documentation root, use os.path.abspath to make it absolute, like shown here. sys.path.insert(0, os.path.abspath('../')) +# Suppress ANSI color escapes from argparse 3.14+ help/usage output captured +# by sphinx-argparse. +os.environ['NO_COLOR'] = '1' + # -- General configuration ------------------------------------------------ # If your documentation needs a minimal Sphinx version, state it here. @@ -34,6 +38,7 @@ 'sphinx.ext.viewcode', 'sphinx.ext.napoleon', 'sphinx_rtd_theme', + 'sphinxarg.ext', ] # Add any paths that contain templates here, relative to this directory. diff --git a/docs/high_level.rst b/docs/high_level.rst new file mode 100644 index 000000000..73f5e00f9 --- /dev/null +++ b/docs/high_level.rst @@ -0,0 +1,167 @@ +High-Level Clients +****************** + + +KafkaConsumer +============== + +.. code:: python + + from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition, JsonSerializer, DefaultSerializer + + # To consume latest messages and auto-commit offsets + consumer = KafkaConsumer('my-topic', + group_id='my-group', + bootstrap_servers=['localhost:9092']) + for message in consumer: + # message value and key are raw bytes -- decode if necessary! + # e.g., for unicode: `message.value.decode('utf-8')` + print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, + message.offset, message.key, + message.value)) + + # Manually commit offsets (disable auto-commit) + consumer = KafkaConsumer('my-topic', + group_id='my-group', + enable_auto_commit=False, + bootstrap_servers=['localhost:9092']) + for message in consumer: + # process message + process_message(message) + # TopicPartition for this record + tp = TopicPartition(message.topic, message.partition) + # Note: When committing offsets manually, commit the next offset the consumer + # should read. For example, after successfully processing a message at + # offset 42, commit offset 43. + consumer.commit({ + tp: OffsetAndMetadata(message.offset + 1, '', -1) + }) + + # consume earliest available messages, don't commit offsets + KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False) + + # consume json messages + KafkaConsumer(value_deserializer=JsonSerializer()) + + # consume utf-8 + KafkaConsumer(value_deserializer=DefaultSerializer()) + + # consume utf-16 + KafkaConsumer(value_deserializer=DefaultSerializer('utf-16')) + + # StopIteration if no message after 1sec + KafkaConsumer(consumer_timeout_ms=1000) + + # Subscribe to a regex topic pattern + consumer = KafkaConsumer() + consumer.subscribe(pattern='^awesome.*') + + # Use multiple consumers in parallel + # (run each on a different server / process / CPU) + consumer1 = KafkaConsumer('my-topic', + group_id='my-group', + bootstrap_servers='my.server.com') + consumer2 = KafkaConsumer('my-topic', + group_id='my-group', + bootstrap_servers='my.server.com') + + +There are many configuration options for the consumer class. See +:class:`~kafka.KafkaConsumer` API documentation for more details. + + +KafkaProducer +============== + +.. code:: python + + from kafka import KafkaProducer, JsonSerializer, DefaultSerializer + from kafka.errors import KafkaError + + producer = KafkaProducer(bootstrap_servers=['broker1:1234']) + + # Asynchronous by default + future = producer.send('my-topic', b'raw_bytes') + + # Block for 'synchronous' sends + try: + record_metadata = future.get(timeout=10) + except KafkaError: + # Decide what to do if produce request failed... + log.exception() + pass + + # Successful result returns assigned partition and offset + print (record_metadata.topic) + print (record_metadata.partition) + print (record_metadata.offset) + + # produce keyed messages to enable hashed partitioning + producer.send('keyed-topic', key=b'foo', value=b'bar') + + # encode str with utf-8 encoding + producer = KafkaProducer(value_serializer=DefaultSerializer()) + producer.send('utf-8-topic', 'value_str') + + # encode str with utf-16 encoding + producer = KafkaProducer(value_serializer=DefaultSerializer('utf-16')) + producer.send('utf-16-topic', '懂不懂') + + # produce json messages + producer = KafkaProducer(value_serializer=JsonSerializer()) + producer.send('json-topic', {'key': 'value'}) + + # produce asynchronously + for _ in range(100): + producer.send('my-topic', b'msg') + + def on_send_success(record_metadata): + print(record_metadata.topic) + print(record_metadata.partition) + print(record_metadata.offset) + + def on_send_error(excp): + log.error('I am an errback', exc_info=excp) + # handle exception + + # produce asynchronously with callbacks + producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error) + + # block until all async messages are sent + producer.flush() + + # configure multiple retries + producer = KafkaProducer(retries=5) + + +KafkaAdminClient +================ + +.. code:: python + + from kafka import KafkaAdminClient + + admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) + + # create topics with defaults (requires kafka 2.4+) + admin.create_topics(['testtopic1'], timeout_ms=None, validate_only=False) + # create a new topic with details + new_topics = { + 'num_partitions': 1, + 'replication_factor': 1, + 'assignments': {0: [1]}, # assign partition 0 to broker id 1 + 'configs': {'max_message_bytes': '1000000'}, # set non-default configs + } + admin.create_topics(new_topics, timeout_ms=None, validate_only=False) + + # delete a topic + admin.delete_topics(['testtopic1']) + + # list consumer groups + print(admin.list_groups()) + + # get consumer group details + print(admin.describe_groups(['cft-plt-qa.connect'])) + + # get consumer group offset + print(admin.list_group_offsets(['cft-plt-qa.connect'])) diff --git a/docs/index.rst b/docs/index.rst index 94bee4973..dce909411 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -224,7 +224,7 @@ Module CLI Interface kafka-python also provides simple command-line interfaces for consumer, producer, and admin clients. Access via ``python -m kafka.consumer``, ``python -m kafka.producer``, and ``python -m kafka.admin``. -See `Usage `_ for more details. +See `CLI `_ for the full reference. Compression @@ -277,7 +277,8 @@ See https://docs.python.org/3/howto/logging.html for overview / howto. :hidden: :maxdepth: 2 - Usage Overview + High-level Clients + Command Line Interface API install tests diff --git a/docs/requirements.txt b/docs/requirements.txt index 61a675cab..aeb09b21d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,5 +1,6 @@ sphinx==8.1.3 sphinx_rtd_theme==3.0.2 +sphinx-argparse==0.5.2 # Install kafka-python in editable mode # This allows the sphinx autodoc module diff --git a/docs/upgrade_to_3_0.rst b/docs/upgrade_to_3_0.rst index 4a40db220..bd4f4ea09 100644 --- a/docs/upgrade_to_3_0.rst +++ b/docs/upgrade_to_3_0.rst @@ -70,7 +70,7 @@ disruptions. Set ``session_timeout_ms=10000`` to restore the old default. ``api_version_auto_timeout_ms`` renamed to ``bootstrap_timeout_ms`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The config previously named ``api_version_auto_timeout_ms`` is now ``bootstrap_timeout_ms``, applies to the entire bootstrap process @@ -311,7 +311,7 @@ A few additions that may be useful: - **CLI.** A single ``kafka-python`` entry point wraps the admin / consumer / producer subcommands (also runnable as - ``python -m kafka.admin`` etc.). See :doc:`usage` for examples. + ``python -m kafka.admin`` etc.). See :doc:`cli/index` for examples. - **New public exports.** ``OffsetSpec`` and ``IsolationLevel`` are now importable directly from ``kafka``. - **HTTP CONNECT proxy support.** Pass ``proxy_url='http://...'`` to diff --git a/docs/usage.rst b/docs/usage.rst deleted file mode 100644 index bef44cac8..000000000 --- a/docs/usage.rst +++ /dev/null @@ -1,313 +0,0 @@ -Usage -***** - - -KafkaConsumer -============== - -.. code:: python - - from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition, JsonSerializer, DefaultSerializer - - # To consume latest messages and auto-commit offsets - consumer = KafkaConsumer('my-topic', - group_id='my-group', - bootstrap_servers=['localhost:9092']) - for message in consumer: - # message value and key are raw bytes -- decode if necessary! - # e.g., for unicode: `message.value.decode('utf-8')` - print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, - message.offset, message.key, - message.value)) - - # Manually commit offsets (disable auto-commit) - consumer = KafkaConsumer('my-topic', - group_id='my-group', - enable_auto_commit=False, - bootstrap_servers=['localhost:9092']) - for message in consumer: - # process message - process_message(message) - # TopicPartition for this record - tp = TopicPartition(message.topic, message.partition) - # Note: When committing offsets manually, commit the next offset the consumer - # should read. For example, after successfully processing a message at - # offset 42, commit offset 43. - consumer.commit({ - tp: OffsetAndMetadata(message.offset + 1, '', -1) - }) - - # consume earliest available messages, don't commit offsets - KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False) - - # consume json messages - KafkaConsumer(value_deserializer=JsonSerializer()) - - # consume utf-8 - KafkaConsumer(value_deserializer=DefaultSerializer()) - - # consume utf-16 - KafkaConsumer(value_deserializer=DefaultSerializer('utf-16')) - - # StopIteration if no message after 1sec - KafkaConsumer(consumer_timeout_ms=1000) - - # Subscribe to a regex topic pattern - consumer = KafkaConsumer() - consumer.subscribe(pattern='^awesome.*') - - # Use multiple consumers in parallel - # (run each on a different server / process / CPU) - consumer1 = KafkaConsumer('my-topic', - group_id='my-group', - bootstrap_servers='my.server.com') - consumer2 = KafkaConsumer('my-topic', - group_id='my-group', - bootstrap_servers='my.server.com') - - -There are many configuration options for the consumer class. See -:class:`~kafka.KafkaConsumer` API documentation for more details. - - -KafkaProducer -============== - -.. code:: python - - from kafka import KafkaProducer, JsonSerializer, DefaultSerializer - from kafka.errors import KafkaError - - producer = KafkaProducer(bootstrap_servers=['broker1:1234']) - - # Asynchronous by default - future = producer.send('my-topic', b'raw_bytes') - - # Block for 'synchronous' sends - try: - record_metadata = future.get(timeout=10) - except KafkaError: - # Decide what to do if produce request failed... - log.exception() - pass - - # Successful result returns assigned partition and offset - print (record_metadata.topic) - print (record_metadata.partition) - print (record_metadata.offset) - - # produce keyed messages to enable hashed partitioning - producer.send('keyed-topic', key=b'foo', value=b'bar') - - # encode str with utf-8 encoding - producer = KafkaProducer(value_serializer=DefaultSerializer()) - producer.send('utf-8-topic', 'value_str') - - # encode str with utf-16 encoding - producer = KafkaProducer(value_serializer=DefaultSerializer('utf-16')) - producer.send('utf-16-topic', '懂不懂') - - # produce json messages - producer = KafkaProducer(value_serializer=JsonSerializer()) - producer.send('json-topic', {'key': 'value'}) - - # produce asynchronously - for _ in range(100): - producer.send('my-topic', b'msg') - - def on_send_success(record_metadata): - print(record_metadata.topic) - print(record_metadata.partition) - print(record_metadata.offset) - - def on_send_error(excp): - log.error('I am an errback', exc_info=excp) - # handle exception - - # produce asynchronously with callbacks - producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error) - - # block until all async messages are sent - producer.flush() - - # configure multiple retries - producer = KafkaProducer(retries=5) - - -KafkaAdminClient -================ - -.. code:: python - - from kafka import KafkaAdminClient - - admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) - - # create topics with defaults (requires kafka 2.4+) - admin.create_topics(['testtopic1'], timeout_ms=None, validate_only=False) - # create a new topic with details - new_topics = { - 'num_partitions': 1, - 'replication_factor': 1, - 'assignments': {0: [1]}, # assign partition 0 to broker id 1 - 'configs': {'max_message_bytes': '1000000'}, # set non-default configs - } - admin.create_topics(new_topics, timeout_ms=None, validate_only=False) - - # delete a topic - admin.delete_topics(['testtopic1']) - - # list consumer groups - print(admin.list_groups()) - - # get consumer group details - print(admin.describe_groups(['cft-plt-qa.connect'])) - - # get consumer group offset - print(admin.list_group_offsets(['cft-plt-qa.connect'])) - - -CLI -=== - -The kafka module provides a simple command-line interface for consumer, producer, -and admin apis. - -kafka-python consumer ---------------------- - -.. code:: bash - - > kafka-python consumer --help - - usage: kafka-python consumer [-h] -b BOOTSTRAP_SERVERS [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] -t TOPICS [-g GROUP] [-i GROUP_INSTANCE_ID] [-f FORMAT] - [--encoding ENCODING] - - Kafka console consumer - - options: - -h, --help show this help message and exit - - connection: - -b, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap server. Can be provided multiple times. - -S, --security-protocol SECURITY_PROTOCOL - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL - -M, --sasl-mechanism SASL_MECHANISM - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 - -U, --sasl-user SASL_USER - -P, --sasl-password SASL_PASSWORD - - logging: - -l, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - -L, --enable-logger ENABLE_LOGGER - enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled - -D, --disable-logger DISABLE_LOGGER - disable a specific logger. Can be provided multiple times. - - extended: - -C, --extra-config EXTRA_CONFIG - additional configuration properties for client in "key=val" format. Can be provided multiple times. - - consumer options: - -t, --topic TOPICS subscribe to topic - -g, --group GROUP consumer group - -i, --group-instance-id GROUP_INSTANCE_ID - static group membership identifier - -f, --format FORMAT output format: str|raw|full - --encoding ENCODING encoding to use for str output decode() - - -kafka-python producer ---------------------- - -.. code:: bash - - > kafka-python producer --help - - usage: kafka-python producer [-h] -b BOOTSTRAP_SERVERS [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] -t TOPIC [--encoding ENCODING] - - Kafka console producer - - options: - -h, --help show this help message and exit - - connection: - -b, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap server. Can be provided multiple times. - -S, --security-protocol SECURITY_PROTOCOL - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL - -M, --sasl-mechanism SASL_MECHANISM - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 - -U, --sasl-user SASL_USER - -P, --sasl-password SASL_PASSWORD - - logging: - -l, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - -L, --enable-logger ENABLE_LOGGER - enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled - -D, --disable-logger DISABLE_LOGGER - disable a specific logger. Can be provided multiple times. - - extended: - -C, --extra-config EXTRA_CONFIG - additional configuration properties for client in "key=val" format. Can be provided multiple times. - - producer options: - -t, --topic TOPIC publish to topic - --encoding ENCODING byte encoding for produced messages - - -kafka-python admin ------------------- - -.. code:: bash - - > kafka-python admin --help - - usage: kafka-python admin [-h] [-b BOOTSTRAP_SERVERS] [-S SECURITY_PROTOCOL] [-M SASL_MECHANISM] [-U SASL_USER] [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER] [-D DISABLE_LOGGER] [-C EXTRA_CONFIG] [--format FORMAT] GROUP ... - - Kafka Admin Client - - options: - -h, --help show this help message and exit - - connection: - -b, --bootstrap-servers BOOTSTRAP_SERVERS - host:port for cluster bootstrap server. Can be provided multiple times. - -S, --security-protocol SECURITY_PROTOCOL - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL - -M, --sasl-mechanism SASL_MECHANISM - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 - -U, --sasl-user SASL_USER - -P, --sasl-password SASL_PASSWORD - - logging: - -l, --log-level LOG_LEVEL - logging level, passed to logging.basicConfig - -L, --enable-logger ENABLE_LOGGER - enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled - -D, --disable-logger DISABLE_LOGGER - disable a specific logger. Can be provided multiple times. - - extended: - -C, --extra-config EXTRA_CONFIG - additional configuration properties for client in "key=val" format. Can be provided multiple times. - - output: - --format FORMAT output format: raw|json - - Available command groups: - GROUP - acls Manage Kafka ACLs - cluster Manage Kafka Cluster - configs Manage Kafka Configuration - topics Manage Kafka Topics - partitions Manage Kafka Partitions - groups Manage Kafka Groups - transactions Inspect and recover from hanging transactions (KIP-664) - users Manage Kafka Users - - diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index e6449ec55..d753fa122 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -16,6 +16,17 @@ from kafka.errors import BrokerResponseError +DEFAULT_COMMAND_GROUPS = ( + ACLsCommandGroup, ClusterCommandGroup, ConfigsCommandGroup, + TopicsCommandGroup, PartitionsCommandGroup, GroupsCommandGroup, + TransactionsCommandGroup, UsersCommandGroup, +) + + +def main_parser(prog=None): + return build_parser(DEFAULT_COMMAND_GROUPS, prog=prog) + + def build_parser(groups=(), prog=None): parser = argparse.ArgumentParser( prog=prog or 'python -m kafka.admin', @@ -39,11 +50,7 @@ def build_parser(groups=(), prog=None): def run_cli(args=None, prog=None): - parser = build_parser([ - ACLsCommandGroup, ClusterCommandGroup, ConfigsCommandGroup, - TopicsCommandGroup, PartitionsCommandGroup, GroupsCommandGroup, - TransactionsCommandGroup, UsersCommandGroup, - ], prog=prog) + parser = main_parser(prog=prog) config = parser.parse_args(args) if not config.group: parser.print_help()