Kafka学习笔记
Kafka名字来源
Kafka作者之一Jay Kreps曾经谈及过命名的原因:
因为 Kafka 系统的写性能很强,所以找了个作家的名字来命名似乎是一个好主意。大学期间我上了很多文学课,非常喜欢 Franz Kafka 这个作家,另外为开源软件起这个名字听上去很酷。
Kafka的定位
Kafka 是 LinkedIn 公司内部孵化的项目。
Kafka 在设计之初就旨在提供三个方面的特性:
- 提供一套 API 实现生产者和消费者;
- 降低网络传输和磁盘存储开销;
- 实现高伸缩性架构。
Kafka既是消息引擎系统,也是一个分布式流处理平台。
Kafka作为流处理平台和其他主流计算框架相比的优势
- 更容易实现端到端的正确性(精确一次处理语义)
- Kafka Streams宣称自己是一个用于搭建流处理的客户端库,而非完整功能系统,有利于区别其他框架的目标市场(瞄准小公司)
Kafka术语
一张图说明Kafka术语

- 消息(Record):消息引擎处理的主要对象
- 主题(Topic):承载消息的逻辑容器,具体使用中用于区分业务
- 分区(Partition):一个有序的消息队列,一个主题下可以有多个分区
- 消息位移(Offset):表示分区中每条消息的位置,是一个单调递增不变的值
- 副本(Replica):Kafka中一条消息可以被复制到多个地方实现数据冗余,这些地方就是副本。副本还分leader副本和follower副本。副本在分区的层级下,一个分区可以配置多个副本实现数据高可靠
- 生产者(Producer):向主题发布新消息的应用程序
- 消费者(Consumer):从主题订阅新消息的应用程序
- 消费者位移(Consumer Offset):表征消费者的消费进度,每个消费者都有自己的消费者位移
- 消费者组(Consumer Group):多个消费者实例共同构成一个组,同时消费多个分区实现高吞吐
- 重平衡(Rebalance):消费者组内一个消费者实例挂掉后,其他消费者自动重新分配订阅主题分区的过程。Rebalance是Kafka实现消费者端高可用的手段
不能保持默认值的参数
Broker端参数
- log.dirs:没有默认值,必须手动指定。CVS格式指定多个路径,且多个路径最好分布在不同磁盘上,提供读写性能和提高可用性
- listeners:监听器,告诉外部连接者要用什么协议访问主机和端口开放的Kafka服务
- advertised.listeners:这组监听器是Broker用于对外发布的
- auto.create.topics.enable:配置为true时表示,当收到消息,发现所属主题不存在时则自动创建,建议生产配置false
- unclean.leader.election.enable:是否允许unclean leader选举。是指落后最新消息的partition被选举为leader的许可。若配置为true,则有可能导致消息丢失
- auto.leader.rebalance.enable:是否允许定期重新选举leader,没必要,且对生产环境影响非常大,建议配置false
- log.retention.{hour|minutes|ms}:三个配置都是用来控制一条消息被保存多长时间,ms优先级最高,hour最低
- log.retention.bytes:指Broker为消息保存的总磁盘容量大小,默认值-1,标识存多少都行
- message.max.bytes:Broker能接收的最大消息大小
Topic级参数
- retention.ms:规定了该topic消息被保存的时长,默认7天,优先级高于Broker配置
- retention.bytes:规定了该topic预留多少磁盘空间
- message.max.bytes:该topic下单条消息最大尺寸,可用于不同业务不同配置
Producer端参数
Consumer端参数
JVM参数
设置环境变量,提高JVM堆大小,业界公认最佳配置为6G,修改GC算法
- KAFKA_HEAP_OPTS:堆大小
- KAFKA_JVM_PERFORMANCE_OPTS:GC算法
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
操作系统参数
- 文件描述符限制调大
- 文件系统类型:根据官网的测试报告,XFS 的性能要强于 ext4,ZFS性能貌似更好
- Swap修改成1:改成0容易导致内存溢出来不及处理
- 文件刷盘时间间隔:消息写到页缓存上即算成功,减小刷盘间隔可以提高可用性,但是降低吞吐量
实现无消息丢失
配置最佳实践
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
- 设置 acks = all。acks 是 Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。