kafka-topics.sh 是 kafka 提供的一个命令行工具, 用于管理和操作 kafka 主题; 它允许用户创建、删除、修改和查看 kafka 主题的配置;

源码

该脚本的核心逻辑是调用 Kafka 的 Java 工具类 kafka.admin.TopicCommand (或 kafka.admin.TopicCommand$, 取决于 Kafka 版本), 并传递相应的参数;

kafka-topics.sh 的源码很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx256M"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafka-topics'}

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.admin.TopicCommand "$@"

说白了, 就是调用 kafka-run-class.sh, 将参数传给 kafka.admin.TopicCommand 类执行;

工作原理

zookeeper 时代

kafka.admin.TopicCommand 的 main 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def main(args: Array[String]): Unit = {
val opts = new TopicCommandOptions(args)
opts.checkArgs()

val topicService = if (opts.zkConnect.isDefined)
ZookeeperTopicService(opts.zkConnect)
else
AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)

var exitCode = 0
try {
if (opts.hasCreateOption)
topicService.createTopic(opts)
else if (opts.hasAlterOption)
topicService.alterTopic(opts)
else if (opts.hasListOption)
topicService.listTopics(opts)
else if (opts.hasDescribeOption)
topicService.describeTopic(opts)
else if (opts.hasDeleteOption)
topicService.deleteTopic(opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
topicService.close()
Exit.exit(exitCode)
}
}

可以看到, 上述代码最关键的点是创建了一个 ZookeeperTopicService, 这意味着 TopicCommand 类的核心及本质, 就是和 zookeeper 打交道, 控制配置的写入、修改或读取; 对于增删改请求, 真正做事情的其实背后监听 zookeeper 配置变化的 Controller;
ZookeeperTopicService 有几种方法:

  • createTopic
  • alterTopic
  • listTopics
  • describeTopic
  • deleteTopic

 
我们以 createTopic 为例, 详细描述一下创建 kafka 主题的完整过程, 假设用户执行了如下命令:

1
kafka-topics.sh --create --bootstrap-server <broker_address> --replication-factor 3 --partitions 2 --topic my.test

  1. 写入 zk:

    • 参数校验: 检查 Topic 名称、副本数、Partition 数等参数的合法性。
    • 计算 Partition 分配方案: 根据 Broker 列表和副本因子 (replication-factor), 生成副本分配执行计划:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      // Partition 0 的 Leader 是 Broker 1, 副本分布在 Broker 1、2、3
      // Partition 1 的 Leader 是 Broker 2,副本分布在 Broker 2、3、1
      {
      "version": 1,
      "partitions": {
      "0": [1, 2, 3],
      "1": [2, 3, 1]
      }
      }
    • 执行 zk 配置写入:

      • Topic 配置: 写入 /config/topics/<topic_name> (如 retention.mscleanup.policy 等);
      • Partition 分配方案: 写入 /brokers/topics/<topic_name> (上述 json 格式的副本分配信息);
  2. Controller 处理:
    Kafka Controller 通过 ZooKeeper 的 Watch 机制监听到 /brokers/topics/<topic_name> 节点的创建或变更后:

    • 读取 Partition 分配方案: 从 /brokers/topics/<topic_name> 读取 Partition 的副本分配方案;
    • 更新集群状态:
      • 在内存中更新集群元数据(PartitionState)。
      • 将 Leader 和 ISR 信息写入 ZooKeeper 的 /brokers/topics/<topic_name>/partitions/<partition_id>/state;
    • 通过 RPC 向 Broker 发送指令:
      • LeaderAndIsrRequest: 通知相关 Broker 成为指定 Partition 的 Leader 或 Follower;
      • UpdateMetadataRequest: 更新集群元数据缓存 (所有 Broker 都会同步该信息);
  3. 副本工作:

    • 被选为 Leader 的 Broker 开始接受生产者请求;
    • Follower Broker 从 Leader 拉取数据, 完成副本同步;

总结:

  1. 解耦客户端与 Broker
    kafka-topics.sh 仅需写入 ZooKeeper,无需直接联系 Broker,降低依赖;
  2. 事件驱动架构
    Controller 通过 ZooKeeper Watch 监听变更,实现异步、分布式的协调;
  3. 最终一致性
    Topic 的实际创建可能需要几秒到几分钟(依赖副本同步),但 ZooKeeper 的写入是即时生效的;

KRaft 时代

常见运维命令

1. 创建主题

1
kafka-topics.sh --create --bootstrap-server <broker_address> --replication-factor <replication_factor> --partitions <number_of_partitions> --topic <topic_name>
  • --bootstrap-server: 指定 Kafka broker 的地址。
  • --replication-factor: 指定主题的副本因子(即每个分区的副本数)。
  • --partitions: 指定主题的分区数。
  • --topic: 指定要创建的主题名称。

2. 列出所有主题

1
kafka-topics.sh --list --bootstrap-server <broker_address>
  • --list: 列出所有主题。

3. 查看主题详细信息

1
kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name>
  • --describe: 查看指定主题的详细信息,包括分区、副本、ISR(In-Sync Replicas)等。

4. 删除主题

1
kafka-topics.sh --delete --bootstrap-server <broker_address> --topic <topic_name>
  • --delete: 删除指定主题。

5. 修改主题配置

1
kafka-topics.sh --alter --bootstrap-server <broker_address> --topic <topic_name> --config <key=value>
  • --alter: 修改指定主题的配置。
  • --config: 指定要修改的配置项及其值。

6. 增加分区数

1
kafka-topics.sh --alter --bootstrap-server <broker_address> --topic <topic_name> --partitions <new_number_of_partitions>
  • --partitions: 指定新的分区数。

7. 查看主题的配置

1
kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name> --topics-with-overrides
  • --topics-with-overrides: 查看指定主题的配置覆盖情况。

8. 查看所有主题的配置

1
kafka-topics.sh --describe --bootstrap-server <broker_address> --topics-with-overrides
  • 查看所有主题的配置覆盖情况。

9. 查看主题的分区分布

1
kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name> --under-replicated-partitions
  • --under-replicated-partitions: 查看指定主题的未完全复制的分区。