Kafka
角色
- producer:生产者,发送消息的
- broker:kafka服务
- topic:主题,消息的类别
- partition:分区,每个topic包含一个或多个分区
- replica:副本,一个分区拥有1个leader 0或多个follower
- consumer:消费者,从kafka服务读取消息的客户端
- consumer group:消费者群组,每个消费者属于一个特定的群组
- offset:偏移量,kafka用来确定消息是否被消费者消费过的标识
如何实现高吞吐
一个主题可以设置多个分区,将主题的消息打散到多个分区上面,实现了生产者和消费者消息处理的高吞吐量
如何实现高可用
每个分区可以设置leader和follower,每个消息发送时都是发送到对应分区的leader上面,follower再从leader同步,follower只对leader的数据进行备份,当leader发生故障时,从follower中进行选举切换为leader,实现了高可用
生产者发送流程原理
producer调用发送消息的方法后,经过 拦截器 -> 序列换器 -> 分区器 -> 缓冲队列 ->发送线程Sender;
拦截器:主要用来对消息进行拦截或者修改;
序列化器:对消息的key和value进行序列化;
分区器:决定将消息发送到哪个分区,主要有三种策略:
如果在发消息的时候指定了分区,则消息投递到指定的分区
如果没有指定分区,但是消息的key不为空,采用key的哈希值来选择一个分区
如果既没有指定分区,也没有key的话,会采用粘性分区器(Sticky Partition),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,再随机一个分区进行使用;
缓冲队列(RecordAccumulator 默认32M):消息进入队列并不会马上进行发送,而是需要满足条件后才会由Sender线程发送;
发送条件:
消息累计达到batch.size,默认是16kb
等待时间达到linger.ms,默认是0毫秒
发送线程Sender(Sender -> 创建Request -> NetworkClient -> Selector)
Request:Sender从缓冲队列中获取到ProducerBatch数据,原来的ProducerBatch队列是List结构的数据,现将ProducerBatch的数据取出来跟ProducerBatch一起组成Map的数据格式,定义完请求方式和回调方式,然后通过NetworkClient发送出去;
NetworkClient :一个网络客户端实现,它可以用于生产者发送消息,也可以用于消费者消费消息以及服务器端broker之间的通信,每个NetworkClient 默认缓存5个请求;
Selector:发送给kafka集群以及获取kafka回调的消息
消费者与分区关系
同一个消费者组消费者数=分区数:每个消费者消费一个分区的消息;
同一个消费者组消费者数消费者数<分区数:某些消费者会处理多个分区的消息;
同一个消费者组消费者数消费者数>分区数:多余的消费者将空等,无法处理消息;
消费者与分区的分配策略
RangeAssignor:
将消费者按照member.id(如果用户指定了group.instance.id 则用该id作为member.id,否则随机生成)进行字典排序,一个主题下的分区数除以消费者数量,不能整除向上取整,然后一次向每个消费者分配指定的分区;
RoundRobinAssignor:
将所有的消费者按照member.id进行排序,所有分区按照数值排序,然后分区轮询分配给消费者,如果当前消费者没有订阅当前主题则跳过;有消费者挂掉了进行分区重新分配
StickyAssignor:
核心思想是保证分区均衡、发生分区重分配尽量保留现有的分配结果,初次分配跟RoundRobinAssignor策略一样,当有其他的消费者挂掉了,其他存活的消费者保持现有的分配不懂,挂掉的消费者的分区优先分给持有分区少的消费者。
Producer如何保证消息不丢失和幂等性
ack配置
- ack=0:生产者只管发送消息,不管kafka是否接收到消息了;
- ack=1(默认):生产者发送消息后,kafka接收到消息后且消息写入到leader副本里面才会回应生产者,如果此时leader副本发生故障也会导致消息丢失;
- ack=-1:生产者发送消息后,kafka接收到消息后且消息写入到所有副本中才会回应生产者;
retries配置
重试策略,可以配置重试次数,生产者投递消息失败了可以进行重试操作;
幂等性如何保证:每个生产者在初始化的时候会分配一个ProducerId,消息投递的时候还会带上SequenceNumber(类似消费者的偏移量),消息在kafka上落盘成功了就会使SequenceNumber+1,生产者因为其他原因导致没有收到ACK回应会进行重复,重发的消息中会包含SequenceNumber,但是消费的SequenceNumber跟kafka的SequenceNumber对不上认为是重复消息;