定制化开发生产者发送消息给,定制化开发消息被追加值日志文件定制化开发并保留一定周期(基于配置)。定制化开发本文探讨对Kafk定制化开发主题配置消息保留时间。
定制化开发基于时间保留
定制化开发通过保留期属性,定制化开发消息就有了TTL(time to live 生存时间)。到期后,定制化开发消息被标记为删除,定制化开发从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。
接下来我们将学习如何通过代理配置属性进行调整,以设置新主题的保留周期,并通过主题级配置在运行时控制它。
服务器级配置
Kafka支持服务器级配置保留策略,我们可以通过配置以下三个基于时间的配置属性中的一个来进行优化:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
注意:Kafka用更高精度值覆盖低精度值,所以log.retention.ms具有最高的优先级。
查看默认值
首先让我们检查保留时间的缺省值,在kafka目录下执行下面命令:
$ grep -i 'log.retention.[hms].*\=' config/server.propertieslog.retention.hours=168
- 1
- 2
显示默认周期为7天。如果要设置消息保留周期为10分钟,可以通过config/server.properties配置文件的log.retention.minutes 属性进行配置。
log.retention.minutes=10
- 1
配置新主题
kafka提供了几个Shell脚本用于执行管理任务,利用它们创建工具脚本functions.sh。下面增加两个函数,分别为创建主题、展示配置:
function create_topic { topic_name="$1" bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \ --partitions 1 --replication-factor 1 \ --zookeeper localhost:2181}function describe_topic_config { topic_name="$1" ./bin/kafka-configs.sh --describe --all \ --bootstrap-server=0.0.0.0:9092 \ --topic ${topic_name}}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
现在创建两个独立脚本,create-topic.sh、get-topic-retention-time.sh:
bash-5.1# cat create-topic.sh#!/bin/bash../functions.shtopic_name="$1"create_topic "${topic_name}"exit $?
- 1
- 2
- 3
- 4
- 5
- 6
bash-5.1# cat get-topic-retention-time.sh#!/bin/bash../functions.shtopic_name="$1"describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'exit $?
- 1
- 2
- 3
- 4
- 5
- 6
简单解释下脚本的特殊符号:
$?-表示上一个命令执行状态.
$0-当前脚本的文件名称.
$#-在脚本中使用参数,如$1,$2分别表示第一个参数和第二参数.
$$-当前脚本的进程号,就是当前执行脚本的进程ID.
需要说明的是:describe_topic_config列出给定主题的所有属性配置,因此必须使用awk进行过滤,找出retention.ms property属性值。
现在可以启动kafka环境并验证retention.ms property属性配置:
bash-5.1# ./create-topic.sh test-topicCreated topic test-topic.bash-5.1# ./get-topic-retention-time.sh test-topicretention.ms=600000
- 1
- 2
- 3
- 4
通过脚本创建主题,列出描述,可以看到retention.ms 是 600000 (10分钟)。这是默认值,从之前设置server.properties文件中读出来的。
主题级配置
一旦kafka代理已经启动,log.retention.{hours|minutes|ms} 服务器级属性为只读属性。我们获得 retention.ms,但可以通过主题级参数进行调整。我们继续在functions.sh 脚本中增加方法配置主题属性:
function alter_topic_config { topic_name="$1" config_name="$2" config_value="$3" ./bin/kafka-configs.sh --alter \ --add-config ${config_name}=${config_value} \ --bootstrap-server=0.0.0.0:9092 \ --topic ${topic_name}}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
然后在alter-topic-config.sh 脚本使用它:
#!/bin/sh../functions.shalter_topic_retention_config $1 $2 $3exit $?
- 1
- 2
- 3
- 4
- 5
最后设置test-topic主题保存周期为5分钟,然后查看验证:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000Completed updating config for topic test-topic.bash-5.1# ./get-topic-retention-time.sh test-topicretention.ms=300000
- 1
- 2
- 3
- 4
- 5
验证
我们已经配置kafka主题的消息保留周期。现在来验证在超时后消息确实过期。
生产-消费
在 functions.sh脚本增加两个produce_message 和 consume_message 函数,其内部分别使用kafka-console-producer.sh 和 kafka-console-consumer.sh,分别用于产生/消费消息:
function produce_message { topic_name="$1" message="$2" echo "${message}" | ./bin/kafka-console-producer.sh \ --bootstrap-server=0.0.0.0:9092 \ --topic ${topic_name}}function consume_message { topic_name="$1" timeout="$2" ./bin/kafka-console-consumer.sh \ --bootstrap-server=0.0.0.0:9092 \ --from-beginning \ --topic ${topic_name} \ --max-messages 1 \ --timeout-ms $timeout}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
我们看到消费总是从头开始读消息,因为我们需要消费者读主题中任何有效的消息。
下面创建独立的生产者函数:
bash-5.1# cat producer.sh#!/bin/sh../functions.shtopic_name="$1"message="$2"produce_message ${topic_name} ${message}exit $?
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
最后创建消费者函数:
bash-5.1# cat consumer.sh#!/bin/sh../functions.shtopic_name="$1"timeout="$2"consume_message ${topic_name} $timeoutexit $?
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
消息过期
我们已经准备了工具函数,开始产生单个消息,然后消费两次:
bash-5.1# ./producer.sh "test-topic-2" "message1"bash-5.1# ./consumer.sh test-topic-2 10000message1Processed a total of 1 messagesbash-5.1# ./consumer.sh test-topic-2 10000message1Processed a total of 1 messages
- 1
- 2
- 3
- 4
- 5
- 6
- 7
我们看到消费者重复消费所有有效消息。现在引入延迟机制延迟5分钟,然后再次消费消息:
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)org.apache.kafka.common.errors.TimeoutExceptionProcessed a total of 0 messages
- 1
- 2
- 3
- 4
与我们期望一致,消费者没有发现任何消息,因为消息已经超过了它的保存周期。
限制
在Kafka Broker内部,维护另一个名为log.retention.check.interval.ms的属性,用于决定检查消息是否过期的频率。因此,为了保持保留策略的有效性,必须确保log.retention.check.interval.ms的值低于retention.ms 的属性值。对于任何给定的主题都一样。
总结
本文探索了Apache Kafka消息基于时间的保留策略。通过创建简单的shell脚本来简化管理过程,接着我们创建了独立的消费者和生产者,以验证在保留期之后消息的过期场景。