一 客户端开发
一般生产逻辑需要以下步骤:
- 配置生产者客户端参数,创建相应的生产者实例
- 构建待发送信息
- 发送信息
- 关闭生产者
生产者demo
public class KafkaProducerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static Properties initConfig() { Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("client.id", "producer.client.id.demo"); return props; } public static void main(String[] args) throws InterruptedException { // 1. 配置生产者客户端参数 Properties props = initConfig(); // 1. 创建相应的生产者实例 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 2. 构建待发送信息 ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!"); try { // 3. 发送消息 producer.send(record); } catch (Exception e) { e.printStackTrace(); } } }
1.1 必要参数
- bootstrap.servers:指定 kafka 的 broker 地址清单
- key.serializer & value.serializer:kafka 的 broker 接受数据为 byte[],所以数据需要进行序列化和反序列化,这部分没有默认值,需要填写序列器的全限定名
1.2 消息发送
构建信息发送对象:ProducerRecord
public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final Headers headers; private final K key; private final V value; private final Long timestamp; }
其中 topic 和 value为必填值,其他值都可以按需添加。如果添加 key,将会根据 key 分发到不同的Partition。
消息发送主要有三种方式:发后既忘(fire-and-forget)、同步(sync)、异步(async)。
常用的就是fire-and-forget 方式,这种方式性能最高,但是可靠性最差,在发生不可重试异常时信息会丢失。
producer.send(record)
1.2.1 异步方式
异步回调方式调用如下:
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println(metadata.partition() + ":" + metadata.offset()); } } });
metadata 和 exception两个参数为互斥的,即必有一个不为空。
异步即消息发送至RecordAccumulator后即返回,不关注发送到服务器是否成功。
1.2.2 同步方式
如果使用同步方式可以如下,使用 get 会阻塞等待 Kafka 响应。
producer.send(record).get();
如果需要更详细的发送消息,可以使用同步方法如下:
try { Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); System.out.println(metadata.topic() + ":" + metadata.partition() + ":" + metadata.offset()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); }
发送时的异常:
KafkaProducer 中一般会发生两种异常:可重试异常和不可重试异常
可重试包括网络异常等;不可重试包括发送消息体过大等,不会在重试发送,直接抛出异常。
props.put(ProducerConfig.RETRIES_CONFIG,10); // 可重试异常的重发次数
同步即消息发送至服务器后返回,关注发送到服务器是否成功。
1.3 拦截器
生产者拦截器可以实现对消息发送前的过滤、修改、或者对发送信息的统计工作。
自定义拦截器时需要实现 ProducerInterceptor接口。该接口包含3个方法:
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record),在序列化和分区器之前执行该方法。一般用于对value 进行修改
- public void onAcknowledgement(RecordMetadata recordMetadata, Exception e),在消息被应答之前或者消息发送失败时调用该方法,由于设定的 callback 执行。
- public void close(),用于关闭拦截器的资源清理。
以下 demo 为每条消息加入 pre 前缀,并计算发送成功率。如下:
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> { private volatile long sendSuccess = 0; private volatile long sendFailure = 0; @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { String modifiedValue = "prefix1-" + record.value(); return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (e == null) { sendSuccess++; } else { sendFailure++; } } @Override public void close() { double successRatio = (double) sendSuccess / (sendFailure + sendSuccess); System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%"); } @Override public void configure(Map<String, ?> map) { } }
使用时只需要修改 Producer 实例的 interceptor.classes属性即可。
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());
拦截器属性可配置多个拦截器,组成拦截器链。只需配置属性时,将多个拦截器类名通过”,”链接。
1.4 序列化
生产者和消费者对 value 的序列化和反序列化需要一致。对于 string、integer 等基本类型可以使用基本序列器,但是对于传递对象 value 时,需要自己实现序列化器。如下:
public class ProtostuffSerializer implements Serializer<Company> { public void configure(Map<String, ?> configs, boolean isKey) { } public byte[] serialize(String topic, Company data) { if (data == null) { return null; } Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass()); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); byte[] protostuff = null; try { protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } return protostuff; } public void close() { } }
使用时只需要修改 Producer 实例的 value 序列化属性即可。
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ProtostuffSerializer.class.getName());
1.5 分区器
消息调用 send 方法后,可能会经过拦截器、序列化器(必经)、分区器,然后发到 broker。
如果ProducerRecord未指定 Partition,那么需要经过分区器根据 key 计算 Partition。
- Key 不为 null:对key 进行哈希计算分区号
- Key 为 null:进行轮询方式发往各分区
如果不同分区数据量相差较大,则会出现问题
如果要自定义分区器,需要和 DefaultPartitioner 一样实现 Partitioner 接口即可。
public class DemoPartitioner implements Partitioner { private final AtomicInteger counter = new AtomicInteger(0); @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (null == keyBytes) { return counter.getAndIncrement() % numPartitions; } else return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
使用时只需要修改 Producer 实例的 partitioner.class 属性即可。
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DemoPartitioner.class.getName());
二 生产者-分区
2.1 分区策略
分区策略如下:
- 如果有定义的分区器,使用分区器进行分区
- 如果有定义消息的key,对key进行hash然后对partition数量取余确定发送分区号
- 如果都无,进行随机发送。但是会粘连发送,直到在partition的batch满,或者延时到达,再重新随机发送新的partition。
2.2 自定义分区器
见1.5 分区器
三 生产者-如何提高吞吐
优化RecordAccumulator到kafka集群的吞吐
优化参数:
batch.size: 批次大小,默认16k。可以修改为32k
linger.ms: 延时时间,默认0ms。可以修改为5-100ms
compression.type:压缩,常用snappy
RecordAccumulator:缓冲区大小。可以修改为64m
四 数据保障
4.1 数据可靠
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
数据完全可靠条件:acks=-1 && 分区副本≥2 && ISR里应答的最小副本数量≥2
4.2 数据重复问题
- 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 最多一次(At Most Once)= ACK级别设置为0
- 总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
- 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。
事务原理
开启事务前提是开启幂等性
4.3 数据顺序问题
如何保证单分区内数据不乱序:
(比如req3在发送时失败,在重新发送时候可以会在req4之后,造成乱序)
在1.x之后,可以设置req缓存为5以内。因为服务端会缓存最近的5个req元数据,根据幂等性中的单调递增的Sequence Number来判断是否为顺序的req,否则会等待。
四 原理分析
生产者客户端主要有两个线程,分别为主线程和 sender 线程。
主线程通过将信息经过各拦截器等缓存到 消息累加器中。使用该缓存主要是便于 sender 进行批量发送,减少网络传输资源以提升性能。该消息累加器大小可以通过参数’buffer.memory’配置,默认是32MB,如果生产消息的速度超过了发送到服务器速度,则会导致该累加器内存不足。可能会导致阻塞或者抛出异常。
消息累加器中为每个 Partition 都创建一个双向队列、即 Deque<ProducerBatch>。消息通过尾部写入,sender 线程消费头部进行发送 kafka。ProducerBatch不是ProducerRecord,他可能包含一个或多个ProducerRecord,是根据 Batch 的大小和生产消息的大小决定的。
当一个 ProducerRecord 进入到消息累加器中,先根据消息分区找到对应的双向队列(无则新建)。然后从队列尾部获取一个 ProducerBatch(无则新建)。查看ProducerBatch是否还能写下该信息,不能则新建一个 batch。创建 ProducerBatch 的时候判断该ProducerRecord的大小是否超过设置的 batch.size,如果没超过就放入到一个标准大小的ProducerBatch 中,并且该ProducerBatch交由 BufferPool 进行复用;如果超过就创建一个ProducerRecord大小的ProducerBatch,并不进行复用。(batch.size 默认16KB)。
在 sender 中先将<分区,Deque<ProducerBatch>>转换为<node,Deque<ProducerBatch>>,再转换为<node,Deque<Request>>,在从 sender 线程发送到 kafka 之前还会将该内容保存到 InFlightRequests 保存。缓存已经发出但是未收到响应的请求。如图中可以看出 node1负载压力较大(有较多的请求未响应),
三 重要的生产者参数
3.1 acks
改参数指定Partition 中必须有多少副本收到该消息才认为写入成功。涉及可靠性和吞吐量。该值为 String 类型。
acks=1:默认值为1,即只要 Partition 的 leader 副本成功接受,即认为发送成功。该情况在 leader崩溃重新选主等情况中可以捕获异常,重新发送。但是如果 leader 已经写入成功,在 follower 副本拉取前 leader 崩溃,则仍然会消息丢失。
acks=0:发送信息后不等待任何服务器响应。该方式吞吐量最大,但是可靠性最低。
acks=-1:需要等待所有副本成功接受,才认为发送成功。可靠性最高。但是如果副本为1,只有 leader 时,也就退化成了acks=1.
3.2 max.request.size
生产者客户端发送消息的最大值。默认为1MB。
不建议修改该值,请先考虑拆分发送的情况。
