Welcome to kafka-tools’ Documentation!

kafka-tools is a collection of various tools for managing Apache Kafka.

Getting Started

This document describes how to install and configure kafka-tools.

Prerequisites

Software that must be installed:

In addition, you will need to run it on a host that has:

  • Access to the ZooKeeper ensemble for the Kafka cluster.
  • SSH access to the Kafka brokers (with credentials preferably loaded into ssh-agent).

Quick Install

  1. Download the Kafka binaries from https://kafka.apache.org/downloads.html.
  2. Use pip to install the kafka-tools package from PyPI.

Source Install

  1. Download the Kafka binaries from https://kafka.apache.org/downloads.html.
  2. Clone the kafka-tools repository from https://github.com/linkedin/kafka-tools
  3. Run the tests using tox.
  4. Install kafka-tools using setup.py.

Command List

kafka-assigner

kafka-assigner is used for performing partition reassignments and preferred replica elections. It uses the admin CLI utilities provided with Kafka and layers on additional logic to perform tasks like removing a broker, rebalancing partitions, fixing partition replication factors, and performing preferred replica elections.

Contribution guide

We’re always open to fixes and new features! Please open a PR for any changes that you have and someone will review and merge it. If you’re not up for writing the code, open an issue for any problems or requests.

Bug fixes

If you find a bug in “kafka-tools” we welcome patches to help fix the problem.

A few things to keep in mind when submitting bug fixes:

  • Please include details on reproducing the bug.
  • Explain how your patch resolves the issue.
  • Test cases for the fix are extremely helpful.
  • Would your patch affect existing expected behaviors? For example, catching an exception may fix an issue for your code, but another caller may expect that exception in order to handle the error case differently.

Code guidelines

Please follow these Python best practices:

  • Provide documentation for new or modified APIs
  • Provide test cases for new or modified APIs

API Documentation

kafka.tools

kafka.tools.exceptions

exception kafka.tools.exceptions.AssignerException(custom_errstr=None)
exception kafka.tools.exceptions.BalanceException(custom_errstr=None)
exception kafka.tools.exceptions.ClientError(custom_errstr=None)
exception kafka.tools.exceptions.ClusterConsistencyException(custom_errstr=None)
exception kafka.tools.exceptions.ConfigurationError(custom_errstr=None)
exception kafka.tools.exceptions.ConfigurationException(custom_errstr=None)
exception kafka.tools.exceptions.ConnectionError(custom_errstr=None)
exception kafka.tools.exceptions.GroupError(custom_errstr=None)
exception kafka.tools.exceptions.KafkaToolsException(custom_errstr=None)
exception kafka.tools.exceptions.NotEnoughReplicasException(custom_errstr=None)
exception kafka.tools.exceptions.OffsetError(custom_errstr=None)
exception kafka.tools.exceptions.ProgrammingException(custom_errstr=None)
exception kafka.tools.exceptions.ReassignmentFailedException(custom_errstr=None)
exception kafka.tools.exceptions.ReplicaNotFoundException(custom_errstr=None)
exception kafka.tools.exceptions.TopicError(custom_errstr=None)
exception kafka.tools.exceptions.UnknownBrokerException(custom_errstr=None)
exception kafka.tools.exceptions.ZookeeperException(custom_errstr=None)

kafka.tools.models.broker

kafka.tools.models.cluster

kafka.tools.models.partition

kafka.tools.models.topic

kafka.tools.modules

kafka.tools.utilities

kafka.tools.utilities.check_java_home()

Make sure that JAVA_HOME in the current environment is specified and is valid.

Raises:ConfigurationException if JAVA_HOME is not set or does not contain java
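The check can be sketched in a few lines. This is a minimal sketch, not kafka-tools' own code: it assumes the java binary is expected at $JAVA_HOME/bin/java, and it defines a local ConfigurationException as a stand-in for kafka.tools.exceptions.ConfigurationException so that it runs on its own.

```python
import os


class ConfigurationException(Exception):
    """Stand-in for kafka.tools.exceptions.ConfigurationException."""


def check_java_home():
    """Raise ConfigurationException unless JAVA_HOME points at an executable java.

    Assumption: the binary is expected at $JAVA_HOME/bin/java.
    """
    java_home = os.environ.get("JAVA_HOME")
    if not java_home:
        raise ConfigurationException("JAVA_HOME is not set")
    java = os.path.join(java_home, "bin", "java")
    # Require a regular file that the current user is allowed to execute.
    if not (os.path.isfile(java) and os.access(java, os.X_OK)):
        raise ConfigurationException("JAVA_HOME does not contain java: " + java)
```

Raising rather than returning a flag lets callers fail fast before invoking any of the Java-based Kafka admin scripts.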
kafka.tools.utilities.find_path_containing(fname)

Search the PATH for the given executable filename.

Parameters:fname – the filename to check
Returns:the path that contains the filename
Raises:ConfigurationException if the filename cannot be found, or if it is not executable
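A PATH search like the one described can be sketched as below. It is an illustrative sketch rather than the library's implementation; the local ConfigurationException is a stand-in, and the choice to keep scanning past a non-executable match (raising only when nothing usable is found) is an assumption.

```python
import os


class ConfigurationException(Exception):
    """Stand-in for kafka.tools.exceptions.ConfigurationException."""


def find_path_containing(fname):
    """Return the first PATH directory that holds an executable named fname."""
    for directory in os.environ.get("PATH", "").split(os.pathsep):
        if not directory:
            continue  # skip empty PATH entries such as a trailing separator
        candidate = os.path.join(directory, fname)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return directory
    # Reached when fname is absent everywhere or present but not executable.
    raise ConfigurationException("Cannot find executable {0} in PATH".format(fname))
```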
kafka.tools.utilities.get_tools_path(tools_path=None)

Find the Kafka admin utilities, either from the provided arg or the PATH.

Parameters:tools_path – the path to use for locating the Kafka admin utilities.
Returns:the path that contains Kafka admin utilities
Raises:ConfigurationException if the path cannot be determined
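The lookup order (explicit argument first, then the PATH) might look like the following sketch. The marker script name kafka-reassign-partitions.sh is an assumption about which Kafka admin utility identifies the tools directory, ConfigurationException is a local stand-in, and shutil.which (Python 3.3+) replaces a hand-written PATH scan.

```python
import os
import shutil


class ConfigurationException(Exception):
    """Stand-in for kafka.tools.exceptions.ConfigurationException."""


# Assumption: this script is used as the marker for the Kafka admin utilities.
TOOLS_SCRIPT = "kafka-reassign-partitions.sh"


def _is_exec_file(fname):
    return os.path.isfile(fname) and os.access(fname, os.X_OK)


def get_tools_path(tools_path=None):
    """Return the directory holding the Kafka admin utilities."""
    if tools_path is not None:
        # An explicit argument wins, but it must actually contain the script.
        if not _is_exec_file(os.path.join(tools_path, TOOLS_SCRIPT)):
            raise ConfigurationException(
                "{0} is not an executable in {1}".format(TOOLS_SCRIPT, tools_path))
        return tools_path
    # Otherwise fall back to searching the PATH.
    found = shutil.which(TOOLS_SCRIPT)
    if found is None:
        raise ConfigurationException("Cannot find {0} in PATH".format(TOOLS_SCRIPT))
    return os.path.dirname(found)
```

Validating the explicit argument, instead of trusting it, surfaces a mistyped tools path at startup rather than when the first reassignment runs.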
kafka.tools.utilities.is_exec_file(fname)

Check if the given filename is a regular file and is executable.

Parameters:fname – the filename to check.
Returns:True if the filename given exists and is executable, False otherwise
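The two conditions map directly onto standard-library calls. A minimal sketch, not necessarily how kafka-tools writes it:

```python
import os


def is_exec_file(fname):
    """Return True if fname is a regular file that the current user may execute."""
    # os.path.isfile follows symlinks and is False for directories and missing paths.
    return os.path.isfile(fname) and os.access(fname, os.X_OK)
```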
kafka.tools.utilities.json_loads(json_str)

Load the provided string as JSON data, trying both the Python 2 and the Python 3 approaches.

Parameters:json_str – The JSON encoded string
Returns:The decoded JSON object
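One way to "try both ways" is to call json.loads directly and fall back to decoding bytes when that fails. This sketch assumes UTF-8 input; it relies on json.loads accepting bytes only from Python 3.6 onward.

```python
import json


def json_loads(json_str):
    """Decode a JSON document passed as text or as UTF-8 encoded bytes."""
    try:
        # Python 2, and Python 3.6+, accept this argument directly.
        return json.loads(json_str)
    except TypeError:
        # Python 3 releases before 3.6 reject bytes, so decode them first.
        return json.loads(json_str.decode("utf-8"))
```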
kafka.tools.utilities.synchronized(item)

Decorator that serializes calls to the decorated instance method by holding the lock stored in the instance's _lock attribute. The lock must already exist on the instance.
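A decorator with this behavior can be sketched as follows. The Counter class is a hypothetical example showing the required _lock attribute; the decorator body is an illustration, not the library's source.

```python
import functools
import threading


def synchronized(item):
    """Run the decorated instance method while holding self._lock."""
    @functools.wraps(item)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return item(self, *args, **kwargs)
    return wrapper


class Counter(object):
    """Hypothetical example class; it must create _lock before use."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    @synchronized
    def increment(self):
        # The read-modify-write below is safe because the lock is held.
        self.value += 1
```

Keeping the lock on the instance means separate objects do not contend with each other; only calls on the same object are serialized.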

kafka.tools.assigner

kafka.tools.assigner.actions.balance

kafka.tools.assigner.actions.balancemodules.count

kafka.tools.assigner.actions.balancemodules.even

kafka.tools.assigner.actions.balancemodules.leader

kafka.tools.assigner.actions.balancemodules.rackaware

kafka.tools.assigner.actions.balancemodules.rackaware.check_partition_swappable(replicas_a, replicas_b, pos)

Check if the broker at position pos in the first replica list can be swapped with the replica at position pos in the second list:

  1. replicas_a[pos] must not be in the replicas_b list.
  2. replicas_b[pos] must not be in the replicas_a list.
  3. replicas_a[pos] must have a different rack than the replicas in replicas_b (except for replicas_b[pos]).
  4. replicas_b[pos] must have a different rack than the replicas in replicas_a (except for replicas_a[pos]).

Parameters:
  • replicas_a – the first replica list
  • replicas_b – the second replica list
  • pos – the position in the replica list to be replaced
Returns:True if the broker can be swapped into this replica list, False otherwise
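The four conditions can be checked in order, as in this sketch. Broker here is a hypothetical namedtuple standing in for kafka.tools.models.broker.Broker, assuming only its identity and a rack attribute matter for the check.

```python
from collections import namedtuple

# Hypothetical stand-in for the Broker model: only id and rack are used here.
Broker = namedtuple("Broker", ["id", "rack"])


def _racks_except(replicas, pos):
    """Racks of every replica in the list except the one at index pos."""
    return set(broker.rack for i, broker in enumerate(replicas) if i != pos)


def check_partition_swappable(replicas_a, replicas_b, pos):
    broker_a = replicas_a[pos]
    broker_b = replicas_b[pos]
    # Conditions 1 and 2: neither broker may already be in the other list.
    if broker_a in replicas_b or broker_b in replicas_a:
        return False
    # Condition 3: broker_a must not share a rack with the rest of replicas_b.
    if broker_a.rack in _racks_except(replicas_b, pos):
        return False
    # Condition 4: broker_b must not share a rack with the rest of replicas_a.
    if broker_b.rack in _racks_except(replicas_a, pos):
        return False
    return True
```

Excluding the replica at pos from each rack set matters because that replica is the one being swapped out, so its rack no longer constrains the list.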
kafka.tools.assigner.actions.balancemodules.rackaware.difference_in_size_to_last_partition(partition, partitions)

Return the difference in size between the specified Partition and the last Partition in the provided list. If the list is empty, return infinity.

Parameters:
  • partition – a Partition object to use for calculating the difference
  • partitions – a list of Partition objects
Returns:The difference in size between partition and the last Partition in the partitions list, or infinity
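A sketch of the calculation follows. Partition is a hypothetical namedtuple standing in for kafka.tools.models.partition.Partition, and returning the absolute value is an assumption: the description above does not state a sign convention.

```python
from collections import namedtuple

# Hypothetical stand-in for the Partition model: only size is used here.
Partition = namedtuple("Partition", ["name", "size"])


def difference_in_size_to_last_partition(partition, partitions):
    if not partitions:
        return float("inf")  # nothing to compare against
    # Assumption: only the magnitude of the difference matters, so use abs().
    return abs(partition.size - partitions[-1].size)
```

Returning infinity for an empty list lets a caller that looks for the smallest difference treat "no candidate" as the worst possible match without a special case.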
kafka.tools.assigner.actions.balancemodules.rackaware.racks_for_replica_list(replicas, pos=None)

Return the set of racks of the given replicas, skipping the replica at position pos if specified.

Parameters:
  • replicas – a list of Broker objects
  • pos – a replica position to skip, or None to not skip a replica
Returns:a set of racks
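The behavior fits in a single set comprehension. Broker is again a hypothetical namedtuple standing in for the library's Broker model, assuming it exposes a rack attribute.

```python
from collections import namedtuple

# Hypothetical stand-in for the Broker model: only rack is used here.
Broker = namedtuple("Broker", ["id", "rack"])


def racks_for_replica_list(replicas, pos=None):
    # When pos is None, no index equals it, so every replica is included.
    return set(broker.rack for i, broker in enumerate(replicas) if i != pos)
```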

kafka.tools.assigner.actions.balancemodules.size

kafka.tools.assigner.actions.clone

kafka.tools.assigner.actions.elect

kafka.tools.assigner.actions.remove

kafka.tools.assigner.actions.reorder

kafka.tools.assigner.actions.setrf

kafka.tools.assigner.actions.trim

kafka.tools.assigner.arguments

class kafka.tools.assigner.arguments.CSVAction(option_strings, dest, nargs=None, **kwargs)
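The signature matches the standard pattern for a custom argparse.Action. The sketch below assumes, from the class name alone, that CSVAction stores a comma-separated option value as a list. The splitting and whitespace handling, and the --topics option, are hypothetical illustrations rather than documented behavior.

```python
import argparse


class CSVAction(argparse.Action):
    """Sketch: store a comma-separated option value as a list of strings."""

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        # The value is a single comma-separated token, so nargs is rejected.
        if nargs is not None:
            raise ValueError("nargs not allowed")
        super(CSVAction, self).__init__(option_strings, dest, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # Split on commas and drop empty items, e.g. from a trailing comma.
        items = [item.strip() for item in values.split(",")]
        setattr(namespace, self.dest, [item for item in items if item])


parser = argparse.ArgumentParser()
parser.add_argument("--topics", action=CSVAction)  # hypothetical option name
args = parser.parse_args(["--topics", "foo, bar,"])
```

With this sketch, args.topics is ["foo", "bar"], and omitting the option leaves it as None.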

kafka.tools.assigner.batcher

kafka.tools.assigner.models.reassignment

kafka.tools.assigner.models.replica_election

kafka.tools.assigner.sizers.ssh

kafka.tools

kafka.tools.protocol.arguments

kafka.tools.protocol.errors

kafka.tools.protocol.help

kafka.tools.protocol.types.bytebuffer

kafka.tools.protocol.requests.api_versions_v0

kafka.tools.protocol.requests.controlled_shutdown_v1

kafka.tools.protocol.requests.create_topics_v0

kafka.tools.protocol.requests.delete_topics_v0

kafka.tools.protocol.requests.describe_groups_v0

kafka.tools.protocol.requests.group_coordinator_v0

kafka.tools.protocol.requests.heartbeat_v0

kafka.tools.protocol.requests.join_group_v0

kafka.tools.protocol.requests.leader_and_isr_v0

kafka.tools.protocol.requests.leave_group_v0

kafka.tools.protocol.requests.list_groups_v0

kafka.tools.protocol.requests.list_offset_v0

kafka.tools.protocol.requests.offset_commit_v0

kafka.tools.protocol.requests.offset_commit_v1

kafka.tools.protocol.requests.offset_commit_v2

kafka.tools.protocol.requests.offset_fetch_v0

kafka.tools.protocol.requests.offset_fetch_v1

kafka.tools.protocol.requests.sasl_handshake_v0

kafka.tools.protocol.requests.stop_replica_v0

kafka.tools.protocol.requests.sync_group_v0

kafka.tools.protocol.requests.topic_metadata_v0

kafka.tools.protocol.requests.topic_metadata_v1

kafka.tools.protocol.requests.update_metadata_v0

kafka.tools.protocol.responses.api_versions_v0

kafka.tools.protocol.responses.controlled_shutdown_v1

kafka.tools.protocol.responses.create_topics_v0

kafka.tools.protocol.responses.delete_topics_v0

kafka.tools.protocol.responses.describe_groups_v0

kafka.tools.protocol.responses.group_coordinator_v0

kafka.tools.protocol.responses.heartbeat_v0

kafka.tools.protocol.responses.join_group_v0

kafka.tools.protocol.responses.leader_and_isr_v0

kafka.tools.protocol.responses.leave_group_v0

kafka.tools.protocol.responses.list_groups_v0

kafka.tools.protocol.responses.list_offset_v0

kafka.tools.protocol.responses.metadata_v0

kafka.tools.protocol.responses.metadata_v1

kafka.tools.protocol.responses.offset_commit_v0

kafka.tools.protocol.responses.offset_commit_v1

kafka.tools.protocol.responses.offset_commit_v2

kafka.tools.protocol.responses.offset_fetch_v0

kafka.tools.protocol.responses.offset_fetch_v1

kafka.tools.protocol.responses.sasl_handshake_v0

kafka.tools.protocol.responses.stop_replica_v0

kafka.tools.protocol.responses.sync_group_v0

kafka.tools.protocol.responses.update_metadata_v0
