一 调优
1. Kafka 硬件配置选择
1.1 场景说明
100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。
1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。
每条日志大小:0.5k - 2k(取 1k)。
1150 条/每秒钟 * 1k ≈ 1m/s 。
高峰期每秒钟:1150 条 * 20 倍 = 23000 条。
每秒多少数据量:20MB/s。
1.2 服务器选择
服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1
= 2 * (20m/s * 2 / 100) + 1
= 3 台
建议 3 台服务器。
1.3 磁盘选择
kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。
建议选择普通的机械硬盘。
每天总数据量:1 亿条 * 1k ≈ 100g
100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T
建议三台服务器硬盘总大小,大于等于 1T
1.4 内存选择
Kafka 内存组成:堆内存 + 页缓存
1)堆内存:建议每个节点:10g ~ 15g
2)页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中
25%的数据在内存中就好。
每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g
建议服务器内存大于等于 11G。
1.5 cpu选择
num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。
num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。
num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。 建议 32 个 cpu core。
1.6 网络选择
网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。
100Mbps 单位是 bit;10M/s 单位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。
一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。
2. 生产者调优
2.1 生产者原理
参数名称 | 描述 |
bootstrap.servers | 生产者连接集群所需的 broker 地址清单。 例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性
要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。
如果设置了重试,还想保证消息的有序性,需要设置
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
2.2 生产者如何提高吞吐量
参数名称 | 描述 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可
以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time
之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
2.3 数据可靠性
参数名称 | 描述 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all是等价的。 |
• 至少一次(At Least Once)= ACK 级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2
2.4 数据去重
参数名称 | 描述 |
enable.idempotence | 是否开启幂等性,默认 true,表示开启幂等性。 |
幂等性+事务
2.5 数据有序
单分区内,有序(有条件的,不能乱序);
多分区,分区与分区间无序
2.6 数据乱序
参数名称 | 描述 |
enable.idempotence | 是否开启幂等性,默认 true,表示开启幂等性。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
3. Kafka Broker
核心参数
参数名称 | 描述 |
replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出ISR。
该时间阈值,默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。建议关闭。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
自动创建主题
如果 broker 端配置参数auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为 1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为 false。
4. 消费者调优
4.1 核心参数
参数名称 | 描述 |
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全
类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了
消费者偏移量向Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | consumer_offsets 的分区数,默认是 50 个分区。不建议修改。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。
该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。不建议修改。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超
过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
4.2 消费者再平衡
参数名称 | 描述 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
partition.assignment.strategy | 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可以选择的策略包括: Range 、RoundRobin 、Sticky 、CooperativeSticky |
4.3 消费者如何提高吞吐量
增加分区 或者:
参数名称 | 描述 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条 |
5. Kafka总体(重点)
5.1 如何提高吞吐量
- 提升生产吞吐量
- buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。
- batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
- linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
- compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的CPU 开销。
- 增加分区
- 消费者提高吞吐量
- 调整 fetch.max.bytes 大小,默认是 50m。
- 调整 max.poll.records 大小,默认是 500 条。
- 增加下游消费者处理能力
5.2 数据精准一次
- 生产者角度
- acks 设置为-1 (acks=-1)。
- 幂等性(enable.idempotence = true) + 事务 。
- broker 服务端角度
- 分区副本大于等于 2 (--replication-factor 2)。
- ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。
- 消费者
- 事务 + 手动提交 offset (enable.auto.commit = false)。
- 消费者输出的目的地必须支持事务(MySQL、Kafka)。
5.3 合理设置分区数
- 创建一个只有 1 个分区的 topic。
- 测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
- 假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。
- 然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;分区数 = 100 / 20 = 5 分区
分区数一般设置为:3-10 个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。
5.4 单条日志大于 1m
参数名称 | 描述 |
message.max.bytes | 默认 1m,broker 端接收每个批次消息最大值。 |
max.request.size | 默认 1m,生产者发往 broker 每个请求消息最大值。针对 topic级别设置消息体的大小。 |
replica.fetch.max.bytes | 默认 1m,副本同步数据,每个批次消息最大值。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
6. 压测
6.1 生产者压测
测试:
1、batch.size=16384 linger.ms=0 9.76 MB/sec
2、batch.size=32768 linger.ms=0 9.76 MB/sec
3、batch.size=4096 linger.ms=0 3.81 MB/sec
4、batch.size=4096 linger.ms=50 3.83 MB/sec
5、batch.size=4096 linger.ms=50 compression.type=snappy 3.77 MB/sec
6、batch.size=4096 linger.ms=50 compression.type=zstd 5.68 MB/sec
7、batch.size=4096 linger.ms=50 compression.type=gzip 5.90 MB/sec
8、batch.size=4096 linger.ms=50 compression.type=lz4 3.72 MB/sec
9、batch.size=4096 linger.ms=50 buffer.memory=67108864 3.76 MB/sec
