在使用 Kafka 作为消息队列缓冲数据时,在某些业务使用场景中,我们可能需要根据实际情况调整消息在 Kafka 的保留时长。例如,对于用户行为埋点日志的数据,因为数据量较大,而且也没有必要保留 7 天(Kafka 默认消息保留时长),为减少 Kafka 集群的压力,此时就可以通过针对保存该类消息的 Topic 设置消息保留时长为 3 天。本文介绍如何通过全局和针对特定 Topic 设置 Kafka 的消息保留时长。

前提条件

本文的演示示例基于 CentOS 7 操作系统,使用的是 Apache Kafka,版本为 2.2.1,Scala 版本为 2.11,安装目录位于 /opt/kafka

Kafka 全局消息保留时长设置

首先,进入 Kafka 的配置文件目录,通常位于 Kafka 安装目录下的 config 目录下:

1
cd /opt/kafka/config

打开 Kafka 服务端的配置文件 server.properties

1
vim server.properties

找到配置项 log.retention.hours,可以看到该配置项默认为 168 小时,即 7 天,将其修改为 72 小时,保存并退出,接着重启 Kafka 以使刚刚的修改生效。

修改如下:

1
log.retention.hours=72

需要注意的是,通过该方式设置消息保留时长是对所有的 Topic 都生效。

说明:设置 Kafka 消息保留时长除了 log.retention.hours 这一配置项外,还有 log.retention.minuteslog.retention.ms 这两个配置项。这三个配置项都是控制一条消息的保留时长,只是它们的控制粒度不同,分别为小时级别、分钟级别和毫秒级别。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。

针对特定 Topic 消息保留时长设置

实际上,我们可以在不重启 Kafka 实例的情况下,针对 Kafka 中数据量大的 Topic 进行单独设置消息保留时长。

下面以名称是 test_a 的 Topic 为例,全局默认消息保留时长为 7 天,现在将其调整为 3 天。

首先,进入 Kafka 安装目录:

1
cd /opt/kafka

执行以下命令,查看 Topic test_a 的动态配置:

1
./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --describe

结果输出如下:

1
Configs for topic 'test_a' are cleanup.policy=compact

设置 Topic 的消息保留时长为 3 天,即 259200000 毫秒:

1
./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --alter --add-config retention.ms=259200000

结果如下:

1
Completed Updating config for entity: topic 'test_a'.

再次查看该 Topic 的动态配置:

1
./bin/kafka-configs.sh --zookeeper zk1:2181/kafka --entity-type topics --entity-name test_a --describe

输出结果为:

1
Configs for topic 'test_a' are cleanup.policy=compact,retention.ms=259200000

可以看到,该 Topic 的消息保留时长已成功设置为 259200000 毫秒。

小结

本文介绍了通过全局和针对特定 Topic 两种方法设置 Kafka 的消息保留时长。并具体示例演示了具体的操作方法。需要注意的是,不同的 Kafka 版本命令可能会有所不同。

(END)