kafka-topics.sh
是 kafka 提供的一个命令行工具, 用于管理和操作 kafka 主题; 它允许用户创建、删除、修改和查看 kafka 主题的配置;
源码
该脚本的核心逻辑是调用 Kafka 的 Java 工具类 kafka.admin.TopicCommand
(或 kafka.admin.TopicCommand$
, 取决于 Kafka 版本), 并传递相应的参数;
kafka-topics.sh 的源码很简单:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx256M"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafka-topics'}
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.admin.TopicCommand "$@"
说白了, 就是调用 kafka-run-class.sh
, 将参数传给 kafka.admin.TopicCommand 类执行;
工作原理
zookeeper 时代
kafka.admin.TopicCommand 的 main 方法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31def main(args: Array[String]): Unit = {
val opts = new TopicCommandOptions(args)
opts.checkArgs()
val topicService = if (opts.zkConnect.isDefined)
ZookeeperTopicService(opts.zkConnect)
else
AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)
var exitCode = 0
try {
if (opts.hasCreateOption)
topicService.createTopic(opts)
else if (opts.hasAlterOption)
topicService.alterTopic(opts)
else if (opts.hasListOption)
topicService.listTopics(opts)
else if (opts.hasDescribeOption)
topicService.describeTopic(opts)
else if (opts.hasDeleteOption)
topicService.deleteTopic(opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
topicService.close()
Exit.exit(exitCode)
}
}
可以看到, 上述代码最关键的点是创建了一个 ZookeeperTopicService
, 这意味着 TopicCommand 类的核心及本质, 就是和 zookeeper 打交道, 控制配置的写入、修改或读取; 对于增删改请求, 真正做事情的其实背后监听 zookeeper 配置变化的 Controller;
ZookeeperTopicService 有几种方法:
- createTopic
- alterTopic
- listTopics
- describeTopic
- deleteTopic
我们以 createTopic 为例, 详细描述一下创建 kafka 主题的完整过程, 假设用户执行了如下命令:1
kafka-topics.sh --create --bootstrap-server <broker_address> --replication-factor 3 --partitions 2 --topic my.test
写入 zk:
- 参数校验: 检查 Topic 名称、副本数、Partition 数等参数的合法性。
计算 Partition 分配方案: 根据 Broker 列表和副本因子 (replication-factor), 生成副本分配执行计划:
1
2
3
4
5
6
7
8
9// Partition 0 的 Leader 是 Broker 1, 副本分布在 Broker 1、2、3
// Partition 1 的 Leader 是 Broker 2,副本分布在 Broker 2、3、1
{
"version": 1,
"partitions": {
"0": [1, 2, 3],
"1": [2, 3, 1]
}
}执行 zk 配置写入:
- Topic 配置: 写入
/config/topics/<topic_name>
(如retention.ms
、cleanup.policy
等); - Partition 分配方案: 写入
/brokers/topics/<topic_name>
(上述 json 格式的副本分配信息);
- Topic 配置: 写入
Controller 处理:
Kafka Controller 通过 ZooKeeper 的 Watch 机制监听到/brokers/topics/<topic_name>
节点的创建或变更后:- 读取 Partition 分配方案: 从
/brokers/topics/<topic_name>
读取 Partition 的副本分配方案; - 更新集群状态:
- 在内存中更新集群元数据(
PartitionState
)。 - 将 Leader 和 ISR 信息写入 ZooKeeper 的
/brokers/topics/<topic_name>/partitions/<partition_id>/state
;
- 在内存中更新集群元数据(
- 通过 RPC 向 Broker 发送指令:
- LeaderAndIsrRequest: 通知相关 Broker 成为指定 Partition 的 Leader 或 Follower;
- UpdateMetadataRequest: 更新集群元数据缓存 (所有 Broker 都会同步该信息);
- 读取 Partition 分配方案: 从
副本工作:
- 被选为 Leader 的 Broker 开始接受生产者请求;
- Follower Broker 从 Leader 拉取数据, 完成副本同步;
总结:
- 解耦客户端与 Broker
kafka-topics.sh 仅需写入 ZooKeeper,无需直接联系 Broker,降低依赖; - 事件驱动架构
Controller 通过 ZooKeeper Watch 监听变更,实现异步、分布式的协调; - 最终一致性
Topic 的实际创建可能需要几秒到几分钟(依赖副本同步),但 ZooKeeper 的写入是即时生效的;
KRaft 时代
常见运维命令
1. 创建主题
1 | kafka-topics.sh --create --bootstrap-server <broker_address> --replication-factor <replication_factor> --partitions <number_of_partitions> --topic <topic_name> |
--bootstrap-server
: 指定 Kafka broker 的地址。--replication-factor
: 指定主题的副本因子(即每个分区的副本数)。--partitions
: 指定主题的分区数。--topic
: 指定要创建的主题名称。
2. 列出所有主题
1 | kafka-topics.sh --list --bootstrap-server <broker_address> |
--list
: 列出所有主题。
3. 查看主题详细信息
1 | kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name> |
--describe
: 查看指定主题的详细信息,包括分区、副本、ISR(In-Sync Replicas)等。
4. 删除主题
1 | kafka-topics.sh --delete --bootstrap-server <broker_address> --topic <topic_name> |
--delete
: 删除指定主题。
5. 修改主题配置
1 | kafka-topics.sh --alter --bootstrap-server <broker_address> --topic <topic_name> --config <key=value> |
--alter
: 修改指定主题的配置。--config
: 指定要修改的配置项及其值。
6. 增加分区数
1 | kafka-topics.sh --alter --bootstrap-server <broker_address> --topic <topic_name> --partitions <new_number_of_partitions> |
--partitions
: 指定新的分区数。
7. 查看主题的配置
1 | kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name> --topics-with-overrides |
--topics-with-overrides
: 查看指定主题的配置覆盖情况。
8. 查看所有主题的配置
1 | kafka-topics.sh --describe --bootstrap-server <broker_address> --topics-with-overrides |
- 查看所有主题的配置覆盖情况。
9. 查看主题的分区分布
1 | kafka-topics.sh --describe --bootstrap-server <broker_address> --topic <topic_name> --under-replicated-partitions |
--under-replicated-partitions
: 查看指定主题的未完全复制的分区。