Apache Kafka 如何通过全局和针对特定 Topic 设置消息的保留时长
在使用 Kafka 作为消息队列缓冲数据时,在某些业务使用场景中,我们可能需要根据实际情况调整消息在 Kafka 的保留时长。例如,对于用户行为埋点日志的数据,因为数据量较大,而且也没有必要保留 7 天(Kafka 默认消息保留时长),为减少 Kafka 集群的压力,此时就可以通过针对保存该类消息的 Topic 设置消息保留时长为 3 天。本文介绍如何通过全局和针对特定 Topic 设置 Kafka 的消息保留时长。
前提条件
本文的演示示例基于 CentOS 7 操作系统,使用的是 Apache Kafka,版本为 2.2.1,Scala 版本为 2.11,安装目录位于 /opt/kafka。
Kafka 全局消息保留时长设置
首先,进入 Kafka 的配置文件目录,通常位于 Kafka 安装目录下的 config 目录下:
1 | cd /opt/kafka/config |
打开 Kafka 服务端的配置文件 server.properties:
1 | vim server.properties |
找到配置项 log.retention.hours
,可以看到该配置项默认为 168 小时,即 7 天,将其修改为 72 小时,保存并退出,接着重启 Kafka 以使刚刚的修改生效。
修改如下:
1 | log.retention.hours=72 |
需要注意的是,通过该方式设置消息保留时长是对所有的 Topic 都生效。
说明:设置 Kafka 消息保留时长除了
log.retention.hours
这一配置项外,还有log.retention.minutes
和log.retention.ms
这两个配置项。这三个配置项都是控制一条消息的保留时长,只是它们的控制粒度不同,分别为小时级别、分钟级别和毫秒级别。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
针对特定 Topic 消息保留时长设置
实际上,我们可以在不重启 Kafka 实例的情况下,针对 Kafka 中数据量大的 Topic 进行单独设置消息保留时长。
下面以名称是 test_a 的 Topic 为例,全局默认消息保留时长为 7 天,现在将其调整为 3 天。
首先,进入 Kafka 安装目录:
1 | cd /opt/kafka |
执行以下命令,查看 Topic test_a 的动态配置:
1 | ./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --describe |
结果输出如下:
1 | Configs for topic 'test_a' are cleanup.policy=compact |
设置 Topic 的消息保留时长为 3 天,即 259200000 毫秒:
1 | ./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --alter --add-config retention.ms=259200000 |
结果如下:
1 | Completed Updating config for entity: topic 'test_a'. |
再次查看该 Topic 的动态配置:
1 | ./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --describe |
输出结果为:
1 | Configs for topic 'test_a' are cleanup.policy=compact,retention.ms=259200000 |
可以看到,该 Topic 的消息保留时长已成功设置为 259200000 毫秒。
小结
本文介绍了通过全局和针对特定 Topic 两种方法设置 Kafka 的消息保留时长。并具体示例演示了具体的操作方法。需要注意的是,不同的 Kafka 版本命令可能会有所不同。
(END)