image-20230922161858159

0、目标

  • 能够掌握kafka的各个组件及作用

  • 能够运用kafka的ui工具来管理kafka

  • 学会使用springboot来操作kafka

  • 实战运用,实现order记录的kafka传输

1、应用场景

1.1 kafka场景

Kafka最初是由LinkedIn公司采用Scala语言开发,致力于为各行各业的公司提供实时数据处理服务解决方案,基于ZooKeeper,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以 高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。

​ kafka目前支持多种客户端语言:java,python,c++,php等等。

​ Apache Kafka能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。

(1)日志收集:收集各种服务的log,通过kafka以统一接口服务的方式开放 给各种consumer,例如Hadoop、Hbase、Solr等;

1
2
3
其实开源产品有很多,包括Scribe、Apache Flume,很多人使用Kafka代替日志聚合(log aggregation)。
日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和副本机制确保了更强的耐用性保,并且端到端延迟更低。
类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。

(2)消息系统:==解耦==和生产者和消费者、缓存消息等;

1
2
3
4
5
Kafka被当作传统消息中间件的替代品。与大多数消息系统相比,Kafka具有更好的吞吐量,内置的分区,多副本和容错性,这使其成为大规模消息处理应用程序的良好解决方案。
在我们的经验中,消息的使用通常是相对较低的吞吐量,但可能需要较低的端到端延迟,并且通常需要强大的持久性保证,这些Kafka都能提供。
在这些要点中,Kafka可与传统消息系统(如ActiveMQ或RabbitMQ)媲美。

消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。

(3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,PV、UV等数据,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;

1
我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等

(4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

1
2
用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPS、QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka与ELK(ElasticSearch、Logstash和Kibana)整合构建应用服务监控系统。 
流水数据是所有站点对其网站使用情况做报表时都要用到的数据中最常用的一部分,流水数据包括PV,浏览内容信息以及搜索记录等。这些数据通常是先以日志文件的形式存在,然后有周期的去对这些日志文件进行统计分析处理,然后获得需要的KPI指标结果

(5)流式处理:实时领域中用于流式的数据处理工作,比如spark streaming和storm、flink.

1
2
例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中,后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。
从0.10.0.0版本开始,Apache Kafka提供了一个名为Kafka Streams的轻量级,但功能强大的流处理库,可执行如上所述的数据处理。除了Kafka Streams之外,替代开源流处理工具还包括Apache Storm和Apache Samza。

以上数据的特点

  • 数据不可变
  • 海量数据
  • 需要实时处理

传统消息系统并不能很好的支持。kafka可以搞定。

1.2 kafka特性

kafka以高吞吐量著称,主要有以下特性:

(1)高吞吐量、低延迟:即使是非常普通的硬件kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;

  • 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
  • zero-copy:减少IO操作步骤。
  • 支持数据批量发送和拉取。
  • 支持数据压缩。
  • Topic划分为多个partition,提高并行处理能力。

(2)可扩展性:kafka集群支持热扩展;

kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。

而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。

(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份replication防止数据丢失;

kafka(MQ)要实现从producer到consumer之间的可靠的消息传送和分发。传统的MQ系统通常都是通过broker和consumer间的确认(ack)机制实现的,并在broker保存消息分发的状态。

即使这样一致性也是很难保证。kafka的做法是由consumer自己保存状态,也不要任何确认。这样虽然consumer负担更重,但其实更灵活了。

因为不管consumer上任何原因导致需要重新处理消息,都可以再次从broker获得。

(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);

(5)高并发:支持数千个客户端同时读写;

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

  • 支持同步和异步复制两种HA

  • Consumer客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据

  • 消费状态保存在客户端

  • 消息存储顺序写

  • 数据迁移、扩容对用户透明

  • 支持Hadoop并行数据加载。

  • 支持online和offline的场景

  • scale out:无需停机即可扩展机器。

  • 定期删除机制,支持设定partitions的segment file保留时间。

Producer负载均衡和HA机制

  • producer根据用户指定的算法,将消息发送到指定的partition。
  • 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上。
  • 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over。
  • 通过zookeeper管理broker与consumer的动态加入与离开。

Consumer的pull机制

由于kafka broker会持久化数据,broker没有cahce压力,因此,consumer比较适合采取pull的方式消费数据,具体特别如下:

  • 简化kafka设计,降低了难度。
  • Consumer根据消费能力自主控制消息拉取速度。
  • consumer根据自身情况自主选择消费模式,例如批量,重复消费,从制定partition或位置(offset)开始消费等.

Consumer与topic关系以及机制

本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.

对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic.
如果所有的consumer都具有相同的group,这种情况和JMS queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是”发布-订阅”;消息将会广播给所有的消费者.

在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);每个group中consumer消息消费互相独立;我们可以认为一个group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息.

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的.事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的.

通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效,那么其消费的partitions将会有其他consumer自动接管.kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

Producer均衡算法

kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/“partitions leader列表”等信息.

当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”.事实上,消息被路由到哪个partition上,有producer客户端决定.
比如可以采用”random””key-hash””轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的.
在producer端的配置文件中,开发者可以指定partition路由的方式.

Consumer均衡算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力.

  1. 假如topic1,具有如下partitions: P0,P1,P2,P3
  2. 加入group中,有如下consumer: C0,C1
  3. 首先根据partition索引号对partitions排序: P0,P1,P2,P3
  4. 根据consumer.id排序: C0,C1
  5. 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
  6. 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

kafka broker集群内broker之间replica机制

kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);

备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个”consumer”,

消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower”落后”太多或者失效,leader将会把它从replicas同步列表中删除.

当所有的follower都将一条消息保存成功,此消息才被认为是”committed”,那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.

即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(备注:不同于其他分布式存储,比如hbase需要”多数派”存活才行)

kafka判定一个follower存活与否的条件有2个:

  1. follower需要和zookeeper保持良好的链接

  2. 它必须能够及时的跟进leader,不能落后太多.

如果同时满足上述2个条件,那么leader就认为此follower是”活跃的”.如果一个follower失效(server失效)或者落后太多,

leader将会把它从同步列表中移除[备注:如果此replicas落后太多,它将会继续从leader中fetch数据,直到足够up-to-date,然后再次加入到同步列表中;kafka不会更换replicas宿主!因为”同步列表”中replicas需要足够快,这样才能保证producer发布消息时接受到ACK的延迟较小。

当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个”up-to-date”的follower.kafka中leader选举并没有采用”投票多数派”的算法,

因为这种算法对于”网络稳定性”/“投票参与者数量”等条件有较高的要求,而且kafka集群的设计,还需要容忍N-1个replicas失效.对于kafka而言,每个partition中所有的replicas信息都可以在zookeeper中获得,那么选举leader将是一件非常简单的事情.选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.

在选举新leader,需要考虑到”负载均衡”,partition leader较少的broker将会更有可能成为新的leader.

在整几个集群中,只要有一个replicas存活,那么此partition都可以继续接受读写操作.

1.3 消息对比

  • 如果普通的业务消息解耦,消息传输,rabbitMq是首选,它足够简单,管理方便,性能够用。
  • 如果在上述,日志、消息收集、访问记录等高吞吐,实时性场景下,推荐kafka,它基于分布式,扩容便捷
  • 如果很重的业务,要做到极高的可靠性,考虑rocketMq,但是它太重。需要你有足够的了解

解耦:

image-20230418074335887

1.4 大厂应用

  • 京东通过kafka搭建数据平台,用于用户购买、浏览等行为的分析。成功抗住6.18的流量洪峰
  • 阿里借鉴kafka的理念,推出自己的rocketmq。在设计上参考了kafka的架构体系
  • 已经服务于LinkedIn、Netflix、Uber以及Verizon,并为此建立了实时信息处理平台

2、基础组件

2.1 角色

image-20230922161911776

image-20230922161919674

  • broker:节点,就是你看到的机器,一台搭建了kafka服务的机器。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • provider:消息生产者,就是向kafka broker发消息的客户端。一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过自定义分区器来实现。
  • consumer:消息消费者,向kafka broker取消息的客户端,读消息的(pull拉取,要考虑到消费者的消费能力。用推送的话,消费者可能会拒绝或者消息丢失)。消费者可以订阅一个或者多个主题,并按照消息生成的顺序来读取它们。消费者通过检查消息的偏移量 (offset) 来区分读取过的消息。偏移量是一个不断递增的数值,在创建消息时,Kafka 会把它添加到其中,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。
  • zookeeper:信息中心,记录kafka的各种信息的地方
  • controller:其中的一个broker,作为leader身份来负责管理整个集群。如果挂掉,借助zk重新选主
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。

2.2 逻辑组件

image-20230922161926617

  • topic:主题,一个消息的通道,收发总得知道消息往哪投,可以理解为一个队列

  • partition:分区/分片,每个主题可以有多个分区分担数据的传递,多条路并行,吞吐量大。为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

  • Replicas:副本,每个分区可以设置多个副本,副本之间数据一致。相当于备份,有备胎更可靠

  • leader & follower:主从,上面的这些副本里有1个身份为leader,其他的为follower。leader处理partition的所有读写请求

image-20230922161930765

​ 一个Topic会被归类为一则消息,每个Topic可以被分割为多个Partition,在每条消息中,它在文件中的位置称为Offset,用于标记唯一一条消息。在Kafka中,消息被消费后,消息仍然会被保留一定时间后在删除,比如在配置信息中,文件信息保留7天,那么7天后,不管Kafka中的消息是否被消费,都会被删除;以此来释放磁盘空间,减少磁盘的IO消耗。

  在Kafka中,一个Topic的多个分区,被分布在Kafka集群的多个Server上,每个Server负责分区中消息的读写操作。另外,Kafka还可以配置分区需要备份的个数,以便提高可用行。由于用到来ZK来协调,每个分区都有一个Server为Leader状态,服务对外响应(如读写操作),若该Leader宕机,会由其他的Follower来选举出新的Leader来保证集群的高可用性。

分片目的:

  • 提高读写的效率: 分片可以分布在不同节点上, 在进行读写的时候, 可以让多个节点一起参与(提高并行度)
  • 分布式存储: 解决了单台节点存储容量有限的问题

2.3 副本集合

副本:提高数据的可靠性, 防止数据丢失

副本的数量最多和集群节点数量保持一致, 但是一般设置为 2个 或者 3个

  • AR(assigned replica):所有副本的统称,AR=ISR+OSR
  • ISR(In-sync Replica):同步中的副本,可以参与leader选主。一旦落后太多(数量滞后和时间滞后两个维度)会被踢到OSR。
  • OSR(Out-Sync Relipcas):踢出同步的副本,一直追赶leader,追上后会进入ISR

Kafka必须提供数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并接收客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者换句话说,follower追赶leader数据。leader负责维护和跟踪ISR中所有follower滞后状态。当生产者发送一条消息到Broker,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower”落后”太多或者失效,leader将会把它从replicas从ISR移除。

partition的follower追上leader含义

Kafka中每个partition的follower没有“赶上”leader的日志可能会从同步副本列表中移除。下面用一个例子解释一下“追赶”到底是什么意思。

请看一个例子:主题名称为foo 1 partition 3 replicas。

假如partition的replication分布在Brokers 1、2和3上,并且Broker 3消息已经成功提交。同步副本列表中1为leader、2和3为follower。

假设replica.lag.max.messages设置为4,表明只要follower落后leader不超过3,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,表明只要follower向leader发送请求时间间隔不超过500 ms,就不会被标记为死亡,也不会从同步副本列中移除。

image-20230922161935528

下面看看,生产者发送下一条消息写入leader,与此同时follower Broker 3 GC暂停,如下图所示:

image-20230922161940060

直到follower Broker 3从同步副本列表中移除或追赶上leader log end offset,最新的消息才会认为提交。注意,因为follower Broker 3小于replica.lag.max.messages= 4落后于leader Broker 1,Kafka不会从同步副本列表中移除。在这种情况下,这意味着follower Broker 3需要迎头追赶上知道offset = 6,如果是,那么它完全“赶上” leader Broker 1 log end offset。让我们假设代理3出来的GC暂停在100 ms和追赶上领袖的日志结束偏移量。在这种状态下,下面partition日志会看起来像这样

是什么原因导致分区的副本与leader不同步

一个副本可以不同步Leader有如下几个原因

  • 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
  • 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
  • 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

一个partition的follower落后于leader足够多时,被认为不在同步副本列表或处于滞后状态。在Kafka-0.8.2.x中,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replicas响应partition leader的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,而后者是用来检测失效或死亡的副本

2.4 消息标记

image-20230922161947124

image-20230922161952042

image-20230922161958688

  • offset:偏移量,消息消费到哪一条了?每个消费者都有自己的偏移量.kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset,
确定它在分区日志中唯一的位置。

  • HW:(high watermark):副本的高水印值,客户端最多能消费到的位置,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的。
  • LEO:(log end offset):日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。

Kafka每个topic的partition有N个副本,其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。

那么这三者有什么关系呢?

比如在副本数等于3的情况下,消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW一般会小于LEO,即LEO>=HW。

具体的同步原理,下面章节会详细讲到

3、架构探索

3.1 发展历程

http://kafka.apache.org/downloads

image-20220414214313523

image-20220414214333638

3.1.1 版本命名

Kafka在1.0.0版本前的命名规则是4位,比如0.8.2.1,0.8是大版本号,2是小版本号,1表示打过1个补丁

现在的版本号命名规则是3位,格式是“大版本号”+“小版本号”+“修订补丁数”,比如2.5.0,前面的2代表的是大版本号,中间的5代表的是小版本号,0表示没有打过补丁

我们所看到的下载包,前面是scala编译器的版本,后面才是真正的kafka版本。

3.1.2 演进历史

0.7版本 只提供了最基础的消息队列功能

0.8版本 引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案。

0.9版本 增加权限和认证,使用Java重写了新的consumer API,Kafka Connect功能;不建议使用consumer API;

0.10版本 引入Kafka Streams功能,正式升级成分布式流处理平台;建议版本0.10.2.2;建议使用新版consumer API

0.11版本 producer API幂等,事务API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化

1.0和2.0版本 Kafka Streams改进;建议版本2.0

3.2 集群搭建(助学)

1)原生启动

kafka启动需要zookeeper,第一步启动zk:

1
docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13

原生安装:下载后解压启动即可 http://kafka.apache.org/downloads

1
2
3
4
5
6
7
8
9
10
11
12
13
# 启动命令
bin/kafka-server-start.sh config/server.properties
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

#server.properties配置说明
#表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
#brokder对外提供的服务入口地址,默认9092
listeners=PLAINTEXT://:9092
#设置存放消息日志文件的地址
log.dirs=/tmp/kafka/log
#Kafka所需Zookeeper集群地址,这里是关键!加入同一个zk的kafka为同一集群
zookeeper.connect=zookeeper:2181

3)推荐docker-compose 一键启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#参考资料中的kafka.yml
#注意hostname问题,ip地址:192.168.10.30,换成你自己服务器的
#docker-compose -f kafka.yml up -d 启动
version: '3'
services:
zookeeper:
image: zookeeper:3.4.13

kafka-1:
container_name: kafka-1
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10903:9092
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10903
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10904:9092
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10904
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper

3.2.1 基准测试

kafka的基准测试:

​ 主要指的安装完成Kafka集群后, 进行测试操作, 测试其是否承载多大的并发量(读写效率)

注意: 在进行Kafka的基准测试的时候, 受Topic的分片和副本的数量影响会比较大, 一般在测试的时候, 会构建多个topic, 每一个topic设置不同的分片和副本的数量, 比如: 一个设置分片多一些, 副本少一些, 一个设置分片少一些, 副本多一些, 要不设置分片多副本也多

  • 1- 创建一个Topic
1
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
  • 2- 测试写入的数据的效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
./kafka-producer-perf-test.sh --topic test02  --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

属性说明:
--num-records: 发送的总消息量
--throughput: 指定吞吐量(限流) -1 不限制
--record-size: 每条数据的大小(字节)
--producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 设置生产者的配置信息(连接地址, 消息确认方案)

写后的结果:
5000000 records sent, 134578.634296 records/sec (128.34 MB/sec), 239.83 ms avg latency, 1524.00 ms max latency, 45 ms 50th, 940 ms 95th, 1269 ms 99th, 1461 ms 99.9th.

需关注的信息:
5000000 records sent : 总计写入了多少条数据
134578.634296 records/sec: 每秒中可以处理多少条数据
128.34 MB/sec: 每秒钟可以处理的数据量是多大
  • 3- 测试读取数据的效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000

属性:
--fetch-size 1048576 : 每次从kafka端拉取的数据量
--messages: 测试的总消息量

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-30 21:08:26:464, 2023-03-30 21:08:42:373, 4768.3716, 299.7279, 5000000, 314287.5102, 1680181706706, -1680181690797, -0.0000, -0.0030

start.time: 2023-03-30 21:08:26:464 启动时间
end.time: 2023-03-30 21:08:42:373 结束时间
data.consumed.in.MB: 4768.3716 总大小
MB.sec: 299.7279 每秒中可以处理的大小
data.consumed.in.nMsg: 5000000 总消息量
nMsg.sec: 314287.5102 每秒钟可以处理的数据

总结:

1
2
3
4
5
假设Kafka的节点数量是无限多的:
topic的分片数量越多, 理论上读写效率越高
topic的副本数量越多, 理论上写入的效率变差

一般可以将分片的数量设置为节点数量的三倍左右, 副本数量为1, 基本上可以测试出最佳性能

3.3 组件探秘

命令行工具是管理kafka集群最直接的工具。官方自带,不需要额外安装。

3.2.1 主题创建

image-20230922162010903

1
2
3
4
5
6
#进入容器
docker exec -it kafka-1 sh
#进入bin目录
cd /opt/kafka/bin
#创建
kafka-topics.sh --zookeeper zookeeper:2181 --bootstrap-server localhost:9092 --create --topic test --partitions 2 --replication-factor 1
  • 如何修改Topic:
1
2
3
4
Topic 仅允许增大分片(负载均衡时用), 不允许减少分片, 同时也不支持修改副本数量

增大分区:
./kafka-topics.sh --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 5
  • 如何删除Topic
1
2
3
4
5
6
7
8
9
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01

注意:
默认情况下, 删除一个topic 仅仅是标记删除, 主要原因: kafka担心误删数据, 一般需要用户手动删除

如果想执行删除的时候, 直接将topic完整的删除掉: 此时需要在server.properties 配置中修改一个配置为true
delete.topic.enable=true

如果topic中的数据量非常少, 或者说没有任何的数据的时候, 此时topic会自动先执行逻辑删除, 然后再物理删除, 不管是否配置了delete.topic.enable=true

3.2.2 查看主题

1
kafka-topics.sh --zookeeper zookeeper:2181 --list

3.2.3 主题详情

1
2
3
4
5
6
7
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
#分析输出:
Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1


我的测试:

image-20230922163754614

3.2.4 消息收发

1
2
3
4
5
6
7
8
9
10
11
12
#使用docker连接任意集群中的一个容器
docker exec -it kafka-1 sh

#进入kafka的容器内目录
cd /opt/kafka/bin

#客户端监听
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

#另起一个终端,验证发送
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

image-20220414220642328

我的测试:

image-20220524142254970

3.2.5 分组消费

发布订阅模式下,能否实现订阅者负载均衡消费呢?当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。

同一个订阅组会消费topic所有消息,每条消息只会被同一个订阅组的一个消费节点消费,同一个订阅组内不同消费节点会消费不同消息

1
2
3
4
5
6
#启动两个consumer时,如果不指定group信息,消息被广播
#指定相同的group,让多个消费者分工消费(画图:group原理)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#结果:在发送方,连续发送 1-4 ,4条消息,同一group下的两台consumer交替消费,并发执行

我的测试:

image-20220524143822304

注意!!!

这是在消费者和分区数相等(都是2)的情况下。 如果同一group下的 ( 消费者数量 > 分区数量 ) 那么就会有消费者闲置。

验证方式:

可以再多启动几个消费者试一试,会发现,超出2个的时候,有的始终不会消费到消息。 停掉可以消费到的,那么闲置的会被激活,进入工作状态

image-20220524144117440

闲置的会激活

image-20220524144246504

如果只是一个组,组名不同的消费者:

image-20220524144847315

3.2.6 指定分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#指定分区通过参数 --partition,注意!需要去掉上面的group
#指定分区的意义在于,保障消息传输的顺序性(画图:kafka顺序性原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1

#结果:发送1-4条消息,交替出现。说明消息被均分到各个分区中投递

> 这样就有问题,从不同的分区消费,无法保障顺序 => 指定key消费


#默认的发送是没有指定key的
#要指定分区发送,就需要定义key。那么相同的key被路由到同一个分区
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true

#携带key再发送,注意key和value之间用tab分割(hash(key) % 2 = 0 1 )
>1 1111
>1 2222
>2 3333
>2 4444

#查看consumer的接收情况
#结果:相同的key被同一个consumer消费掉

我的测试:

image-20220524154815984

3.2.7 偏移量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#偏移量决定了消息从哪开始消费,支持:开头,还是末尾

# earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

# 注意点!!!有提交偏移量的话,仍然以提交的为主,即便使用earliest,比提交点更早的也不会被提取

#--offset [earliest|latest(默认)] , 或者 --from-beginning
#新起一个终端,指定offset位置
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning

#结果:之前发送的消息,从头又消费了一遍!

我的测试:

earliest:把消息从最早到现在消费一遍

image-20220524155233368

3.4 zk探秘

前面说过,zk存储了kafka集群的相关信息,本节来探索内部的秘密。

Zooinspector工具也可以很方便的看:

image-20220524160045512

命令的方式:

kafka的信息记录在zk中,进入zk容器,查看相关节点和信息

1
2
3
4
5
6
7
docker exec -it kafka-zookeeper-1 sh

>./bin/zkCli.sh

>ls /

#结果:得到以下配置信息

image-20230922162149091

3.4.1 broker信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]

#机器broker信息
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.10.30:10903"],"jmx_port":-1,"host":"192.168.10.30","timestamp":"1609825245500","port":10903,"version":4}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0

3.4.2 主题与分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#分区节点路径
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
[state]

#分区信息,leader所在的机器id,isr列表等
[zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

3.4.3 消费者与偏移量

1
2
3
4
5
[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
#空的???
#那么,消费者以及它的偏移记在哪里呢???

kafka 消费者记录 group 的消费 偏移量 有两种方式 :

1)kafka 自维护 (新)

2)zookpeer 维护 (旧) ,已经逐渐被废弃

查看方式:

上面的消费用的是控制台工具,这个工具使用–bootstrap-server,不经过zk,也就不会记录到/consumers下。

其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面

1
2
3
4
5
6
7
8
9
10
#先起一个消费端,指定group
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#使用控制台工具查看消费者及偏移量情况
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa

#查看偏移量详情
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa

当前与LEO保持一致,说明消息都完整的被消费过

image-20210105174238632

停掉consumer后,往provider中再发几条记录,offset开始滞后:

image-20210105174424989

重新启动consumer,消费到最新的消息,同时再返回看偏移量,消息得到同步:

image-20210105174658407

3.4.4 controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#当前集群中的主控节点是谁
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0

3.5 km

3.5.1 启动

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源。提供可视化kafka集群操作

官网:https://github.com/yahoo/kafka-manager/releases

注意它的版本,docker社区的镜像版本滞后于kafka,我们自己来打镜像。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#Dockerfile
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]

#打包,注意将kafka-manager-2.0.0.2放到同一目录
docker build -t km:2002 .

# 还可以直接拉取
docker pull liggdocker/km:2002
# 修改镜像标签为km:2002
docker tag imageId km:2002
#启动:在上面的yml里,services节点下加一段
#参考资料:km.yml
#执行: docker-compose -f km.yml up -d
km:
image: liggdocker/km:2002
ports:
- 10906:9000
depends_on:
- zookeeper

完整的km.yml内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3'
services:
zookeeper:
image: zookeeper:3.4.13

kafka-1:
container_name: kafka-1
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10903:9092
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10903
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10904:9092
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10904
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
km:
image: liggdocker/km:2002
ports:
- 10906:9000
depends_on:
- zookeeper

3.5.2 使用

使用km可以方便的查看以下信息:

  • cluster:创建集群,填写zk地址,选中jmx,consumer信息等选项
  • brokers:列表,机器信息
  • topic:主题信息,主题内的分区信息。创建新的主题,增加分区
  • cosumers: 消费者信息,偏移量等

也可以使用kafkatool工具

4、深入应用

4.0 Java API的操作

  • 1- 创建一个Maven的项目, 导入相关的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>

  • 2- 创建两个包目录: com.itheima.kafka.producer 和 com.itheima.kafka.consumer

image-20230922162304811

如何将数据生产到Kafka

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {
public static void main(String[] args) {

// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类

Producer<String, String> producer = new KafkaProducer<>(props);

// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));
producer.send(producerRecord);
}

// 第三步: 关闭生产者对象
producer.close();
}

}

如何从Kafka中消费数据

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// 演示消费端的代码实现
public class KafkaConsumerTest {

public static void main(String[] args) {
// 第一步: 创建消费者的核心对象: KafkaConsumer 并添加配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 指定kafka的集群地址
props.setProperty("group.id", "test"); // 指定消费者组
props.setProperty("enable.auto.commit", "true"); // 是否开启自动提交消息偏移量
props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交消息偏移量的间隔时间
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的反序列化类型
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value反序列化类型

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 第二步: 指定监听那些topic中数据 (支持多个一起监听)
consumer.subscribe(Arrays.asList("test01"));


while (true) { // 不断监听, 持续一直监听
// 第三步: 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候,等待的超时时间, 如果过了这个等待超时时间, 返回 空对象(对象是存在的, 但是内部没有任何的数据, 相当于空容器)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset(); // 获取消息偏移量
String key = record.key(); // 获取消息的key值
String value = record.value(); // 获取消息的value值
int partition = record.partition(); // 获取消息从那个分片来

System.out.println("消息的偏移量为:"+ offset +"; 消息所属分片:"+partition +";消息的key值:"+key+";消息的value:"+value);
}
}
}
}

可能出现,控制台 不打印 也不报错(底层是保存, 缺少log4j.properties日志配置):

发现本地windows的hosts文件配置有问题: C:\Windows\System32\drivers\etc\hosts 文件

1
2
3
4
5
6
必须配置以下内容:
192.168.88.161 node1 node1.itcast.cn
192.168.88.162 node2 node2.itcast.cn
192.168.88.163 node3 node3.itcast.cn

报这个问题同学, 大概率是只配置 node1 node2和node3 并没有配置全称

4.1 springboot-kafka

1)配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kafka:
bootstrap-servers: 192.168.10.30:10903,192.168.10.30:10904
producer: # producer 生产者
retries: 0 # 重试次数
acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 # 一次最多发送数据量
buffer-memory: 33554432 # 生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer: # consumer消费者
group-id: javagroup # 默认的消费组ID
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
auto-offset-reset: latest #earliest,latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2)启动信息

image-20210106143550399

4.2 消息发送

4.2.1 发送类型

KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法

详细代码参考:AsyncProducer.java

消费者使用:KafkaConsumer.java

1)同步发送

1
2
3
4
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
//注意,可以设置等待时间,超出后,不再等候结果
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
logger.info("send result:{}",result.getProducerRecord().value());

通过swagger发送,控制台可以正常打印send result

swagger访问地址:http://localhost:8080/doc.html

2)阻断

在服务器上,将kafka暂停服务

1
docker-compose -f km.yml pause kafka-1 kafka-2

在swagger发送消息

调同步发送:请求被阻断,一直等待,超时后返回错误

image-20210106155900539

而调异步发送的(默认发送接口),请求立刻返回。

image-20210106160001017

那么,异步发送的消息怎么确认发送情况呢???往下看!

3)注册监听

代码参考: KafkaListener.java (释放注解)

可以给kafkaTemplate设置Listener来监听消息发送情况,实现内部的对应方法

1
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

1
com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}

启动kafka

1
docker-compose unpause kafka-1 kafka-2

再次发送消息时,同步异步均可以正常收发,并且监听进入success回调

1
2
com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}
com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]

可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息。

4.2.2 序列化

消费者使用:KafkaConsumer.java

1)序列化详解

  • 前面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
  • 基本上,可以满足绝大多数场景

2)自定义序列化

自己实现,实现对应的接口即可,有以下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}

//理论上,只实现这个即可正常运行
byte[] serialize(String var1, T var2);

//默认调上面的方法
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}

default void close() {
}
}

案例,参考: MySerializer.java

在yaml中配置自己的编码器

1
value-serializer: com.itheima.demo.config.MySerializer

重新发送,发现:消息发送端编码回调一切正常。但是消费端消息内容不对!

1
2
com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}
com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"

怎么办?

3)解码

发送端有编码并且我们自己定义了编码,那么接收端自然要配备对应的解码策略

代码参考:MyDeserializer.java,实现方式与编码器几乎一样!

在yaml中配置自己的解码器

1
value-deserializer: com.itheima.demo.config.MyDeserializer

再次收发,消息正常

1
2
com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}
com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}

4.2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定的分区里面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,直接轮循进行分区
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规则

发送者代码参考:PartitionProducer.java

消费者代码使用:PartitionConsumer.java

通过swagger访问setKey:

image-20210108134831705

看控制台:

image-20210108134950158

再访问setPartition来设置分区号0来发送

image-20210108135041543

看控制台:

image-20210108135148546

2)自定义分区

你想自己定义规则,根据我的要求,把消息投放到对应的分区去? 可以!

参考代码:MyPartitioner.java , MyPartitionTemplate.java ,

发送使用:MyPartitionProducer.java

使用swagger,发送0开头和非0开头两种key试一试!

image-20210108152145108

备注:

自己定义config参数,比较麻烦,需要打破默认的KafkaTemplate设置

可以将KafkaConfiguration.java中的getTemplate加上@Bean注解来覆盖系统默认bean

这里为了避免混淆,采用@Autowire注入

4.3 消息消费

4.3.1 消息组别

发送者使用:KafkaProducer.java

1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

image-20210106142254297

2)启动

image-20210106143956316

3)通过swagger发送2条消息

image-20210106144226686

  • 同一group下的两个消费者,在group1均分消息
  • group2下只有一个消费者,得到全部消息

4)消费端闲置

注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置,浪费资源!

验证方式:

停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。

重新发送两条消息,试一试

image-20210205171103705

解析:

group2可以消费到1、2两条消息

group1下有两个消费者,但是只分配给了 -1 , -2这个进程被闲置

4.3.2 位移提交

1)自动提交

前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

1
2
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)

2)手动提交

有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。

下面我们自己定义配置,覆盖上面的参数

代码参考:MyOffsetConfig.java

通过在消费端的Consumer来提交偏移量,有如下几种方式:

代码参考:MyOffsetConsumer.java

同步提交、异步提交:manualCommit() ,同步异步的差别,下面会详细讲到。

指定偏移量提交:offset()

3)重复消费问题

如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

代码参考和对比:manualCommit() , noCommit()

验证过程:

用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:

image-20210113120859072

但是!重启试试:

image-20210113121017561

无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量试试:

image-20210113121449839

4)经验与总结

1
2
3
4
5
6
7
8
9
commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。

这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
详细代码参考:MyOffsetConsumer.manualOffset()

5、高级特性

5.1 扩展性

5.1.1 broker扩容

1)在yaml中复制kafka-2,拷贝为新的节点,注意以下标注修改的地方!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#修改后的内容参考:cluster.yml

kafka-3: #改
container_name: kafka-3 #改
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10905:9092 #改
environment:
KAFKA_BROKER_ID: 3 #改
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10905 #改
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper

完整的 cluster.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
version: '3'
services:
zookeeper:
image: zookeeper:3.4.13

kafka-1:
container_name: kafka-1
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10903:9092
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10903
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10904:9092
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10904
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
km:
image: liggdocker/km:2002
ports:
- 10906:9000
depends_on:
- zookeeper
kafka-3: #改
container_name: kafka-3 #改
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10905:9092 #改
environment:
KAFKA_BROKER_ID: 3 #改
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10905 #改
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper

2)更新docker集群信息

1
2
3
4
5
6
7
8
docker-compose -f cluster.yml up -d
#启动消息

kafka_zookeeper_1 is up-to-date
kafka_km_1 is up-to-date
kafka-1 is up-to-date
kafka-2 is up-to-date
Creating kafka-3 ... done

3)进命令行,或打开km查看新的broker信息

image-20210114123502766

5.1.2 分区扩容

1)使用km对test主题增加分区到3个,看分区分配机器情况

image-20210114123626581

可以指定新分区数量,及分配到的机器

image-20210114123856288

2)注意问题

新加分区或重新调整分区,已经启动的客户端会动态更新对应的分配信息,不需要重启。

但是!!!

在同步变更消息的过程中有可能会丢失消息!想想为什么?(答案在下面)

(注意!以下场景不保证100%会重现!)

image-20210114130806920

答案:

回顾一下消费偏移量的默认提交配置:latest,因为新分区没有任何offset提交记录

所以会在重新分配分区后从末尾开始消费!

那么分配前的那些消息就不会消费到。而分配后再发送的不会受影响,可以正常消费

分区分配正常后,查看偏移量提交信息,没问题:

image-20210114125922113

km的Consumer页签里也可以查看偏移量信息:

image-20210114161441582

5.2 高可用

以上动态扩容操作是怎么实现的呢?集群中必然有一个节点协调了相关操作。

这台协调者,就是controller节点。

controller节点是其中的一台broker,所有broker都有可能成为controller

当前controller宕机后,其他就会参与竞争,选出新的controller,保持集群对外的高可用

5.2.1 节点选举

1)查找controller,找到它所在的broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#查找docker进程,找到zookeeper的容器
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps --format "table{{.ID}}\t{{.Names}}\t{{.Ports}}"
CONTAINER ID NAMES PORTS
75318748caab kafka-3 0.0.0.0:10905->9092/tcp
4807d188a180 kafka_km_1 0.0.0.0:10906->9000/tcp
4453eb0b2a36 kafka-2 0.0.0.0:10904->9092/tcp
d6fd814a0851 kafka-1 0.0.0.0:10903->9092/tcp
8c1fc2cc6e9a kafka_zookeeper_1 2181/tcp, 2888/tcp, 3888/tcp

#进入容器,连上zk
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker exec -it kafka_zookeeper_1 sh
/zookeeper-3.4.13 #
/zookeeper-3.4.13 # zkCli.sh
Connecting to localhost:2181

#查询当前controller是哪个节点,发现是2号机器(有可能是其他节点,找到这个brokerid,下面要用!)
[zk: localhost:2181(CONNECTED) 6] get /controller
{"version":1,"brokerid":2,"timestamp":"1610500701187"}

#controller变更的次数
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch
1

2)docker-compose停掉它!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#docker pause 暂停容器的服务,注意是上面找到的那台broker
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker pause kafka-2
kafka-2

#查看状态,发现(Paused)
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps | grep kafka-2
4453eb0b2a36 wurstmeister/kafka:2.12-2.2.2 "start-kafka.sh" 2 days ago Up 2 days (Paused) 0.0.0.0:10904->9092/tcp kafka-2

#再次按 1)的步骤进入zk容器,查看当前controller,已经变为3号
[zk: localhost:2181(CONNECTED) 0] get /controller
{"version":1,"brokerid":3,"timestamp":"1610679583216"}

#变更次数加了1
[zk: localhost:2181(CONNECTED) 1] get /controller_epoch
2

5.2.2 原理剖析

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的/controller临时节点就会被清除。

Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建它。

第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。

每个新选举出来的控制器,会在Zookeeper系统中递增controller_epoch的值。

附:详细流程图

image-20230922162208579

核心机制

Topic的分片和副本机制

什么是分片呢?

1
2
3
4
5
6
7
8
分片:  逻辑概念
相当于将一个Topic(大容器)拆分为N多个小容器, 多个小的容器构建为一个Topic

目的:
1- 提高读写的效率: 分片可以分布在不同节点上, 在进行读写的时候, 可以让多个节点一起参与(提高并行度)
2- 分布式存储: 解决了单台节点存储容量有限的问题

分片的数量:分片是可以创建N多个, 理论上没有任何的限制

什么是副本呢?

1
2
3
4
5
6
7
副本: 物理的概念
针对每个分片的数据, 可以设置备份, 可以将其备份多个

目的:
提高数据的可靠性, 防止数据丢失

副本的数量: 副本的数量最多和集群节点数量保持一致, 但是一般设置为 2个 或者 3个

如何保证数据不丢失

生产端是如何保证数据不丢失

image-20230922162313291

1
2
3
4
5
6
7
8
9
10
11
12
13
       当生产者将数据生产到Broker后, Broker应该给予一个ack确认响应,在Kafka中, 主要提供了三种ack的方案:

0: 生产者只管发送数据, 不关心不接收broker给予的响应

1: 生产者将数据发送到Broker端, 需要等待Broker端对应的topic上的对应的分片的主副本接收到消息后, 才认为发送成功了

-1(ALL): 生产者将数据发送到Broker端, 需要等待Broker端对应的topic上的对应的分片的所有的副本接收到消息后, 才认为发送成功了

效率角度: 0 > 1 > -1
安全角度: -1 > 1 > 0

思考: 在实际使用中, 一般使用什么方案呢? 三种都有可能
一般要根据消息的重要程度, 来选择采用什么方案, 如果数据非常的重要, 不能丢失, 一般设置为 -1

相关的思考的点:

1
2
3
4
5
6
7
8
9
10
11
思考1:  如果Broker迟迟没有给予ACK响应, 如何解决呢?

解决方案: 设置超时时间, 如果超时触发重试策略, 如果多次重试依然无法给予响应, 此时程序报异常

思考2: 每发送一次,Broker就要给予一次响应, 请问这样是否会对网络带宽产生影响呢? 如果产生, 如何解决呢?

解决方案: 会, 引入缓存池, 满足了一批数据后, 异步发送给Broker端, Broker端只需要针对这一批数据给予一次响应即可

思考3:通过一批一批的异步发送方式, 如果Broker端对这一批数据没有给予响应, 但是缓存池中数据已经满了, 如何解决?

解决方案: 选择清空缓存池 / 不清空, 如果数据是可重复读的,那么直接让程序报错即可, 通知处理, 处理后, 重新获取发送即可, 如果数据是不可重复读,为了避免此种问题, 我们可以数据先在某个其他位置保存(备份), 当数据生产成功, 删除对应的数据, 生产不成功, 后续直接从保存的位置中获取生产即可

相关的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
buffer.memory : 缓存池的大小 
默认值: 33554432(32M)

retries: 重试次数
默认值: 2147483647 (此参数并不代表最终的重试次数, 取决于超时相关参数)

delivery.timeout.ms: 一次发送数据总超时时间
默认值: 120000(120s)

request.timeout.ms: 一次请求超时时间
默认值: 30000(30s)

一批数据的阈值: 时间 和 大小
batch.size : 一批数据大小
默认值: 16384 (16kb)

linger.ms : 每一批次的间隔时间
默认值: 0

代码演示: 如何模拟同步发送数据 和 异步发送数据

同步方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// 模拟同步的发送方式
public class KafkaProducerTestSync {
public static void main(String[] args) {

// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类

Producer<String, String> producer = new KafkaProducer<>(props);

// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));
try {
producer.send(producerRecord).get(); // get() 会等待响应
// 发送成功了...
}catch (Exception e) {
// 发送失败了: 此处失败指的是重试后的失败 一旦发送失败, 就会抛出异常

// 在此处, 就可以编写发送失败后的, 业务逻辑代码

e.printStackTrace();
}
}

// 第三步: 关闭生产者对象
producer.close();
}

}

异步方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

// 模拟异步有返回值的发送方式
public class KafkaProducerTestAsync {
public static void main(String[] args) {

// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("linger.ms",1000); // 设置每批数据的间隔时间
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类

Producer<String, String> producer = new KafkaProducer<>(props);

// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));

producer.send(producerRecord, new Callback() { // 异步有返回值发送方式
// 回调函数: 底层在异步发送数据的时候, 发送一次, 就会调用一次回调函数, 如果 Exception 不为Null 说明发送成功了, 否则认为发送失败了
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {

if(exception != null){
// 认为数据发送失败了
// 在此处编写处理失败的业务逻辑
}

// 发送成功了

}
});


}

// 第三步: 关闭生产者对象
producer.close();
}

}

Broker端如何保证数据不丢失

保证方案: 磁盘存储 + 多副本 + ack为-1

消费端如何保证数据不丢失

image-20230922162321190

1
2
3
4
5
6
7
8
9
10
第一步: 当Consumer启动后, 连接Kafka集群, 根据group.id 到Kafka中寻找上一次消费到了什么位置(偏移量)

第二步:
如果consumer找到了上次消费位置, 接着从这个位置开始消费数据

如果没有找到上一次消费的位置, 说明第一次来, 这个时候默认从当前时刻开始消费数据, 消费的位置也会从当前这个消息的偏移量位置开始消费

第三步: 消费者开始消费数据, 在消费的过程中, 每消费完数据后, 都要和kafka集群进行汇报, 汇报当前消费到了那一个偏移量信息

汇报方式: 自动 / 手动

**思考: 请问在这种方式下是否可以保证消费端不会发送数据丢失的问题呢? **

可以保证, 但是可能会存在重复消费的问题

1
2
3
4
5
6
7
思考: 消费者消费的消息偏移量信息是存储在哪里呢? 

0.8.x版本之前, 消费者的消息偏移量信息是被记录在zookeeper中

0.8.x版本之后, 将消费者的消息偏移量信息记录在kafka集群上, 通过一个topic来记录: __consumer_offsets

此topic默认有50个分片 1个副本

演示: 如何手动提交偏移量信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// 演示消费端的代码实现
public class KafkaConsumerTest2 {

public static void main(String[] args) {
// 第一步: 创建消费者的核心对象: KafkaConsumer 并添加配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 指定kafka的集群地址
props.setProperty("group.id", "test"); // 指定消费者组
props.setProperty("enable.auto.commit", "false"); // 是否开启自动提交消息偏移量
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的反序列化类型
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value反序列化类型

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 第二步: 指定监听那些topic中数据 (支持多个一起监听)
consumer.subscribe(Arrays.asList("test01"));


while (true) { // 不断监听, 持续一直监听
// 第三步: 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候,等待的超时时间, 如果过了这个等待超时时间, 返回 空对象(对象是存在的, 但是内部没有任何的数据, 相当于空容器)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset(); // 获取消息偏移量
String key = record.key(); // 获取消息的key值
String value = record.value(); // 获取消息的value值
int partition = record.partition(); // 获取消息从那个分片来

System.out.println("消息的偏移量为:"+ offset +"; 消息所属分片:"+partition +";消息的key值:"+key+";消息的value:"+value);
}

// 每消费完一批数据, 提交一次偏移量信息
// 注意: 一旦使用手动提交偏移量, 千万要注意, 必须写提交偏移量的代码. 否则会导致大量的数据重复消费
//consumer.commitSync(); // 同步提交
consumer.commitAsync(); // 异步提交

}
}
}

消息存储和查询机制

消息存储

image-20230922162329875

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
存儲路径:  /export/server/kafka/data

在此目录下,根据topic名称和分片编号创建一个目录,在此目录下存储对应分片的数据

1- Kafka中数据是存储在磁盘中, 通过分文件的方式来存储的, 一个log文件默认最大为1GB,当达到1GB后, 就会滚动形成一个新的log文件, 同时对应的index文件也会滚动形成一个新的文件

2- 每一个消息数据片段都是由两个文件组成的:
index文件: 对log文件中数据索引信息
log文件: 存储是真正消息数据

3- 文件名表示什么?
当前这个文件存储的消息起始偏移量


思考: kafka为啥要分文件的方式来存储数据呢? 如果吧数据放置到同一个文件中, 不更好嘛?

原因: kafka本质是消息队列的中间件产品, 当消息被消费者所消费后, 存储在kafka中数据其实也就没啥用了, 或者说随着时间的推移, 存储在kafka的消息数据, 其数据价值会越来越低的, 所以说kafka存储数据仅仅是一种临时存储

默认情况下, kafka会自动删除超过168小时(7天)的数据, 通过分文件的方式, kafka只需要查看文件最后修改时间, 如果超过7天, 自动将其删除即可

相关配置: server.properties

log.retention.hours=168 (小时)
log.segment.bytes=1073741824(1GB)

查询机制

image-20230922162337140

查询数据的步骤:

  • 1- 确定消息被存储在那个segment片段中
  • 2- 先去对应segment片段中index文件, 从这个索引文件中, 查询对应消息偏移量, 在log文件的什么位置上进行存储着
  • 3- 根据返回的log文件的具体的位置信息, 底层会基于磁盘顺序查询方式查询log文件, 找到对应位置上数据即可
1
2
3
4
扩展: 
磁盘的读写方式主要有二种: 顺序读写 和 随机读写

顺序读写的效率是远远高于随机读写的效率

生产者数据分发机制

​ 生产者将数据生产到kafka的某个Topic中, Topic可以被分为多个分片的,最终一条消息只能被其中一个分片所接收, 那么最终是有哪个分片来接收数据呢? 这就是生产者的分发机制

思考: 分发策略有那些呢?

1
2
3
4
5
6
7
1- 随机分发策略
2- 轮询分发策略
3- Hash取模分发策略
4- 指定分区策略
5- 范围分发策略
6- 自定义分区策略
....

思考: 在Kafka中支持有那些策略呢?

1
2
3
4
5
1- 轮询策略(2.4版本以下), 目前为 粘性分发策略    是Java客户端拥有的
2- Hash取模分发策略
3- 指定分发策略
4- 随机分发策略 (Python 客户端支持, Java 客户端不支持)
5- 自定义分区策略

如何使用不同的分发策略呢?

  • 1- 指定分区策略
1
2
3
4
5
6
7
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}

在生产端, 构建数据承载对象的时候, 采用此构造方式, 即可实现指定分区的策略

分区编号: 从 0 开始
  • 2- Hash 取模分发策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2.1 创建数据承载对象的时候, 必须使用仅传递 k 和 v的构造方法, 即可使用hash模式
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}

2.2 当执行Hash取模分区策略,底层是通过一个默认的分区类实现完成Hash取模: DefaultPartitioner
public class DefaultPartitioner implements Partitioner {

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
// 执行分区的核心方法, 返回内容表示将当前这条数据发送到那个分片上
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

说明: 在使用此种分发策略的时候, key值一定是可变的, 千万不要固定不变
  • 3- 粘性分区策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
3.1 创建生产者的数据承载对象的时候, 只需要传递value即可, 此时底层会采用粘性的分区策略
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}

3.2 当执行粘性分区策略,底层是通过一个默认的分区类实现完成Hash取模: DefaultPartitioner

public class DefaultPartitioner implements Partitioner {

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
// 执行分区的核心方法, 返回内容表示将当前这条数据发送到那个分片上
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 当key为null的时候, 执行的是粘性的分区策略
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

什么叫粘性分区策略:
当生产者发送数据的时候, 一般都是采用异步(批)发送方案,当发送一批数据到Broker端后, 首先会随机的选择其中一个分片, 然后尽可能黏上这个分区, 将这一批的数据全部都交给这一个分区即可


什么是轮询策略:
当生产者发送数据的时候, 一般都是采用异步(批)发送方案,当发送一批数据到Broker端后, 根据topic的分片的数量, 将一批数据拆分为N多个小的批次, 一个批次对应一个分片, 然后写入到topic的各个分片上

粘性分区的优势:
减少中间的这个切分的操作, 直接将一批数据全部写入到某一个分片上, 同时也减少了中间ack的响应的次数, 减少网络的带宽, 提升效率


但是如果生成的数量非常的块, 会导致大量的数据写入到同一个分片上, 无法解开
  • 4- 自定义分区策略: 在MR中自定义分区方案很相似的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
如何自定义分区呢? 抄   抄DefaultPartitioner

1- 创建一个类, 实现 Partitioner 接口

2- 重写接口中partition()方法 以及 close 方法, 主要核心重写: partition()
partition方法的参数列表:
String topic : 指定要写入到那个topic
Object key : 传入的key值
byte[] keyBytes: 传入的key的字节
Object value : 传入的value数据
byte[] valueBytes : 传入的value的字节
Cluster cluster : 集群的对象 可以帮助获取指定的topic有多少个分片

其返回值为 要将这个数据写入到那个分片的编号

3- 将自定义的分区类, 配置到生产者的代码的Properties配置信息中:
key: partitioner.class
value: 自定义类的权限类名

将key 和value的值添加到properties对象中

消费者的负载均衡机制

image-20230922162344790

1
2
3
4
Kafka的消费者负载均衡机制规定:
1- 在一个消费者组内, 消费者的数量最多和所监听的topic的分片数量是相等的, 如果有大于分片数量的消费者, 一定会有消费者处于闲置的状态

2- 在一个消费者组内, topic的一个分片的数据只能被一个消费者所接收, 不允许出现一个分片被多个消费者所接收的情况, 而一个消费者是可以接收多个分片的数据

思考:

1
2
3
如何模拟点对点消费模式: 让所有监听这个topic的消费者, 都处在同一个消费组内

如何模拟发布订阅模式: 让所有监听这个topic的消费者都不在同一个消费组内

通过命令的方式查看数据积压的问题

1
./kafka-consumer-groups.sh  --bootstrap-server node1:9092,node2:9092,node3:9092 --group test01 --describe

image-20230922162352797

工作中, 有时候运维工程师, 会将lag指标纳入监控范围, 当这个LAG 出现积压问题, 基于告警系统 进行告警

也可以用kafka-eagle工具在界面查看积压

配额限速

防止大量消息压爆服务器

1
2
3
4
5
# producer 配置
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

# consumer配置
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

6、底层架构

6.1 存储架构

6.1.1 分段存储

开篇讲过,kafka每个主题可以有多个分区,每个分区在它所在的broker上创建一个文件夹

每个分区又分为多个段,每个段两个文件,log文件里顺序存消息,index文件里存消息的索引

段的命名直接以当前段的第一条消息的offset为名

注意是偏移量,不是序号! 第几条消息 = 偏移量 + 1。类似数组长度和下标。

所以offset从0开始(可以开新队列新groupid消费第一条消息打印offset得到验证)

image-20210119150603724

例如:

0.log -> 有8条,offset为 0-7,[0, 8)

8.log -> 有两条,offset为 8-9,[8, 10)

10.log -> 有xx条,offset从10-xx,[10, 10 + xx)

image-20230922162217093

6.1.2 日志索引

每个log文件配备一个索引文件 *.index

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments –files test-0/00000000000000000000.index

image-20210115162535648

综合上述,来看一个消息的查找:

  • consumer发起请求要求从offset=6的消息开始消费
  • kafka直接根据文件名大小,发现6号消息在00000.log这个文件里
  • 那文件找到了,它在文件的哪个位置呢?
  • 根据index文件,发现 (6 , 9807),说明消息藏在这里!
  • 从log文件的 9807 位置开始读取。
  • 那读多长呢?简单,读到下一条消息的偏移量停止就可以了

6.1.3 日志删除

Kafka作为消息中间件,数据需要按照一定的规则删除,否则数据量太大会把集群存储空间占满。

删除数据方式:

  • 按照时间,超过一段时间后删除过期消息
  • 按照消息大小,消息数量超过一定大小后删除最旧的数据

Kafka删除数据的最小单位:segment,也就是直接干掉文件!一删就是一个log和index文件

6.1.4 存储验证

1)数据准备

将broker 2和3 停掉,只保留1

1
docker pause kafka-2 kafka-3

2)删掉test主题,通过km新建一个test主题,加2个分区

新建时,注意下面的选项:

segment.bytes = 1000 ,即:每个log文件到达1000byte时,开始创建新文件

删除策略:

retention.bytes = 2000,即:超出2000byte的旧日志被删除

retention.ms = 60000,即:超出1分钟后的旧日志被删除

以上任意一条满足,就会删除。

3)进入kafka-1这台容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
docker exec -it kafka-1 sh

#查看容器中的文件信息
/ # ls /
bin dev etc home kafka lib lib64 media mnt opt proc root run sbin srv sys tmp usr var

/ # cd /kafka/

/kafka # ls
kafka-logs-d0b9c75080d6

/kafka # cd kafka-logs-d0b9c75080d6/
/kafka/kafka-logs-d0b9c75080d6 # ls -l | grep test
drwxr-xr-x 2 root root 4096 Jan 15 14:35 test-0
drwxr-xr-x 2 root root 4096 Jan 15 14:35 test-1

#2个分区的日志文件清单,注意当前还没有任何消息写进来
#timeindex:日志的时间信息
#leader-epoch,下面会讲到
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 4
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 0 Jan 15 14:35 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 4
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 0 Jan 15 14:35 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

4)往里灌数据。启动项目通过swagger发送消息

注意!边发送边查看上一步的文件列表信息!

image-20210115144735864

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#先发送2条,消息开始进来,log文件变大!消息在两个分区之间逐个增加。
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 875 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 875 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

#继续逐条发送,返回再来看文件,大小为1000,到达边界!
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

#继续发送消息!1号分区的log文件开始分裂
#说明第8条消息已经进入了第二个log
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 20
-rw-r--r-- 1 root root 0 Jan 15 14:46 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r-- 1 root root 125 Jan 15 14:46 00000000000000000008.log #第二个log文件!
-rw-r--r-- 1 root root 10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

#持续发送,另一个分区也开始分离
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 20
-rw-r--r-- 1 root root 0 Jan 15 15:55 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 15:55 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 15:55 00000000000000000008.index
-rw-r--r-- 1 root root 625 Jan 15 15:55 00000000000000000008.log
-rw-r--r-- 1 root root 10 Jan 15 15:55 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 15:55 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 20
-rw-r--r-- 1 root root 0 Jan 15 14:46 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r-- 1 root root 750 Jan 15 15:55 00000000000000000008.log
-rw-r--r-- 1 root root 10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint


#持续发送消息,分区越来越多。
#过一段时间后再来查看,清理任务将会执行,超出的日志被删除!(默认调度间隔5min)
#log.retention.check.interval.ms 参数指定

/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r-- 1 root root 0 Jan 15 19:12 00000000000000000119.log
-rw-r--r-- 1 root root 10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r-- 1 root root 10 Jan 15 19:12 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r-- 1 root root 0 Jan 15 19:12 00000000000000000119.log
-rw-r--r-- 1 root root 10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r-- 1 root root 10 Jan 15 19:12 leader-epoch-checkpoint

6.2 零拷贝

Kafka 在执行消息的写入和读取这么快,其中的一个原因是零拷贝(Zero-copy)技术

6.2.1 传统文件读写

image-20230922162105833

传统读写,涉及到 4 次数据的复制。但是这个过程中,数据完全没有变化,我们仅仅是想从磁盘把数据送到网卡。

那有没有办法不绕这一圈呢?让磁盘和网卡之类的外围设备直接访问内存,而不经过cpu?

有! 这就是DMA(Direct Memory Access 直接内存访问)。

6.2.2 DMA

DMA其实是由DMA芯片(硬件支持)来控制的。通过DMA控制芯片,可以让网卡等外部设备直接去读取内存,而不是由cpu来回拷贝传输。这就是所谓的零拷贝

目前计算机主流硬件基本都支持DMA,就包括我们的硬盘和网卡。

kafka就是调取操作系统的sendfile,借助DMA来实现零拷贝数据传输的

image-20210115210450506

6.2.3 java实现

为加深理解,类比为java中的零拷贝:

  • 在Java中的零拷贝是通过java.nio.channels.FileChannel中的transferTo方法来实现的

  • transferTo方法底层通过native调操作系统的sendfile

  • 操作系统sendfile负责把数据从某个fd(linux file descriptor)传输到另一个fd

    备注:linux下所有的设备都是一个文件描述符fd

代码参考:

1
2
3
4
5
6
7
8
File file = new File("0.log");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
//文件通道,来源
FileChannel fileChannel = raf.getChannel();
//网络通道,去处
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("1.1.1.1", 1234));
//对接上,通过transfer直接送过去
fileChannel.transferTo(0, fileChannel.size(), socketChannel);

6.3 分区一致性

6.3.1 水位值

1)先回顾两个值:

image-20230922162241810

2)再看下几个值的存储位置:

注意!分区是有leader和follower的,最新写的消息会进入leader,follower从leader不停的同步

无论leader还是follower,都有自己的HW和LEO,存储在各自分区所在的磁盘上

leader多一个Remote LEO,它表示针对各个follower的LEO,leader又额外记了一份!

3)为什么这么做呢?

leader会拿这些remote值里最小的来更新自己的hw,具体过程我们详细往下看

6.3.2 同步原理

image-20230922162245987

我们来看这几个值是如何更新的:

1)leader.LEO

这个很简单,每次producer有新消息发过来,就会增加

2)其他值

另外的4个值初始化都是 0

他们的更新由follower的fetch(同步消息线程)得到的数据来决定!

如果把fetch看做是leader上提供的方法,由follower远程请求调用,那么它的伪代码大概是这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//java伪代码!
//follower端的操作,不停的请求从leader获取最新数据
class Follower{
private List<Message> messages;
private HW hw;
private LEO leo;

@Schedule("不停的向leader发起同步请求")
void execute(){
//向leader发起fetch请求,将自己的leo传过去
//leader返回leo之后最新的消息,以及leader的hw
LeaderReturn lr = leader.fetch(this.leo) ;

//存消息
this.messages.addAll(lr.newMsg);
//增加follower的leo值
this.leo = this.leo + lr.newMsg.length;
//比较自己的leo和leader的hw,取两者小的,作为follower的hw
this.hw = min(this.leo , lr.leaderHW);
}
}



//leader返回的报文
class LeaderReturn{
//新增的消息
List<Messages> newMsg;
//leader的hw
HW leaderHW;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//leader在接到follower的fetch请求时,做的逻辑
class Leader{
private List<Message> messages;
private LEO leo;
private HW hw;
//Leader比follower多了个Remote!
//注意!如果有多个副本,那么RemoteLEO也有多个,每个副本对应一个
private RemoteLEO remoteLEO;

//接到follower的fetch请求时,leader做的事情
LeaderReturn fetch(LEO followerLEO){
//根据follower传过来的leo,来更新leader的remote
this.remoteLEO = followerLEO ;
//然后取ISR(所有可用副本)的最小leo作为leader的hw
this.hw = min(this.leo , this.remoteLEO) ;

//从leader的消息列表里,查找大于follower的leo的所有新消息
List<Message> newMsg = queryMsg(followerLEO) ;

//将最新的消息(大于follower leo的那些),以及leader的hw返回给follower
LeaderReturn lr = new LeaderReturn(newMsg , this.hw)
return lr;
}

}

6.3.3 Leader Epoch

1)产生的背景

0.11版本之前的kafka,完全借助hw作为消息的基准,不管leo。

发生故障后的规则:

  • follower故障再次恢复后,从磁盘读取hw的值并从hw开始剔除后面的消息,并同步leader消息
  • leader故障后,新当选的leader的hw作为新的分区hw,其余节点按照此hw进行剔除数据,并重新同步
  • 上述根据hw进行数据恢复会出现数据丢失和不一致的情况,下面分开来看

假设:

我们有两个副本:leader(A),follower(B)

场景一:丢数据

image-20210127152137598

  • 某个时间点B挂了。当它恢复后,以挂之前的hw为准,设置 leo = hw
  • 这就造成一个问题:现实中,leo 很可能是 大于 hw的。leo被回退了!
  • 如果这时候,恰恰A也挂掉了。kafka会重选leader,B被选中。
  • 过段时间,A恢复后变成follower,从B开始同步数据。
  • 问题来了!上面说了,B的数据是被回退过的,以它为基准会有问题
  • 最终结果:两者的数据都发生丢失,没有地方可以找回!

场景二:数据不一致

image-20230922162256389

  • 这次假设AB全挂了。比较惨
  • B先恢复。但是它的hw有可能挂之前没从A同步过来(原来A是leader)
  • 我们假设,A.hw = 2 , B.hw = 1
  • B恢复后,集群里只有它自己,所以被选为leader,开始接受新消息
  • B.hw上涨,变成2
  • 然后,A恢复,原来A.hw = 2 ,恢复后以B的hw,也就是2为基准开始同步。
  • 问题来了!B当leader后新接到的2号消息是不会同步给A的,A一直保留着它当leader时的旧数据
  • 最终结果:数据不一致了!

2)改进思路

0.11之后,kafka改进了hw做主的规则,这就是leader epoch

leader epoch给leader节点带了一个版本号,类似于乐观锁的设计。

它的思想是,一旦发生机器故障,重启之后,不再机械的将leo退回hw

而是借助epoch的版本信息,去请求当前leader,让它去算一算leo应该是什么

3)实现原理

对比上面丢数据的问题:

image-20210117160544546

  • A为(leo=2 , hw=2),B为(leo=2 , hw=1)
  • B重启,但是B不再着急将leo打回hw,而是发起一个Epoch请求给当前leader,也就是A
  • A收到LE=0后,发现和自己的LE一样,说明B在挂掉前后,leader没变,都是A自己
  • 那么A就将自己的leo值返回给B,也就是数字2
  • B收到2后和自己的leo比对取较小值,发现也是2,那么不再退回到hw的1
  • 没有回退,也就是信息1的位置没有被覆盖,最大程度的保护了数据
  • 如果和上面一样的场景,A挂掉,B被选为leader

image-20210117161903544

  • 那么A再次启动时后,从B开始同步数据

  • 因为B之前没有回退,1号信息得到了保留

  • 同时,B的LE(epoch号码)开始增加,从0变成1,offset记录为B当leader时的位置,也就是2

  • A传过来的epoch为0,B是1,不相等。那么取大于0的所有epoch里最小的

    (现实中可能发生了多次重新选主,有多条epoch)

  • 其实就是LE=1的那条。现实中可能有多条。并找到它对应的offset(也就是2)给A返回去

  • 最终A得到了B同步过来的数据

再来看一致性问题的解决:

image-20210117163116528

  • 还是上面的场景,AB同时挂掉,但是hw还没同步,那么A.hw=2 , B.hw=1

  • B先启动被选成了leader,新leader选举后,epoch加了一条记录(参考下图,LE=1,这时候offset=1)

  • 表示B从1开始往后继续写数据,新来了条信息,内容为m3,写到1号位

  • A启动前,集群只有B自己,消息被确认,hw上涨到2,变成下面的样子

    image-20210117163347006

  • A开始恢复,启动后向B发送epoch请求,将自己的LE=0告诉leader,也就是B

  • B发现自己的LE不同,同样去大于0的LE里最小的那条,也就是1 , 对应的offset也是1,返回给A

  • A从1开始同步数据,将自己本地的数据截断、覆盖,hw上升到2

  • 那么最新的写入的m3从B给同步到了A,并覆盖了A上之前的旧数据m2

  • 结果:数据保持了一致

附:epochRequest的详细流程图

image-20230922162141615

7、业务实战

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

image-20230922162128230

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

image-20230922162118830

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

3)通过swagger请求

image-20210120135200524

先按不同的id发送,查看控制台日志,id被正确分发到对应的队列

image-20210120141510550

同一个key分配到同一个queue,顺序性得到保障

image-20210120141712904

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

image-20230922162230466

7.2.3 部署实现

1)mysql部署

注意,需要打开binlog,8.0 默认处于开启状态

1
2
#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0

连上mysql,执行以下sql,添加canal用户

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

创建订单表

1
2
3
4
5
CREATE TABLE `orders` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
);

2)canal部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#canal.properties
#附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: '2'
services:
canal:
image: canal/canal-server
container_name: canal
restart: always
ports:
- "10908:11111"
environment:
#mysql的链接信息
canal.instance.master.address: 192.168.10.30:3306
canal.instance.dbUsername: canal
canal.instance.dbPassword: canal
#投放到kafka的哪个主题?要提前准备好!
canal.mq.topic: canal
volumes:
- "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"

3)数据通道验证

进入kafka容器,用上面3.2.4里的命令行方式监听canal队列

1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal

在mysql上创建orders表,增删数据试一下

1
2
mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)

在kafka控制台,可以看到同步的消息

1
{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}

数据通道已打通,还缺少的是druid作为消费端来接收消息

4)druid部署

1
2
3
4
#druid.yml
#在附带资料里有
#随便找个目录,执行
docker-compose -f druid.yml up -d

5)验证

配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

image-20210126185051954

注:

关于druid的详细使用,在大数据篇章里会详细讲解。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

image-20230922162158414

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#注意ip地址:192.168.10.30,全部换成你自己服务器的

version: '3'
services:
zookeeper:
image: zookeeper:3.4.13

kafka-1:
container_name: kafka-1
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10903:9092
- 10913:10913
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10903
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"
JMX_PORT: 10913
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10904:9092
- 10914:10914
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 192.168.10.30
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
KAFKA_ADVERTISED_PORT: 10904
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"
JMX_PORT: 10914
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
eagle:
image: gui66497/kafka_eagle
container_name: ke
restart: always
depends_on:
- kafka-1
- kafka-2
ports:
- "10907:8048"
environment:
ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

image-20210127105615559

与km到底选哪个呢?根据自己习惯,个人认为:

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

Kafka Cluster

zookeeper集群环境

由于Kafka Cluster需要依赖ZooKeeper(后面简称ZK)集群来协同管理,所以这里我们需要事先搭建好ZK集群

Kafka集群环境

由于Kafka已经贡献到Apache基金会了,我们可以到Apache的官方网站上去下载Kafka的基础安装包,下载地址如下所示:

Kafka Cluster的部署

  首先,我们将下载好的Kafka基础安装包解压,命令如下所示:

  • 解压Kafka
1
[hadoop@dn1 ~]$ tar -zxvf kafka_2.9.1-0.8.2.1.tgz
  • 进入到Kafka解压目录
1
[hadoop@dn1 ~]$ cd kafka_2.9.1-0.8.2.1
  • 配置环境变量
1
2
3
[hadoop@dn1 ~]$ vi /etc/profile
export KAFKA_HOME=/home/hadoop/kafka_2.11-0.8.2.1
export PATH=$PATH:$KAFKA_HOME/bin
  • 配置Kafka的zookeeper.properties
1
2
3
4
5
6
# the directory where the snapshot is stored.
dataDir=/home/hadoop/data/zk
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
  • 配置server.properties
1
2
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

  注:这里配置broker的时候,每台机器上的broker保证唯一,从0开始。如:在另外2台机器上分别配置broker.id=1,broker.id=2

  • 配置producer.properties
1
2
3
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
  • 配置consumer.properties
1
2
3
4
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=dn1:2181,dn2:2181,dn3:2181

  至此,Kafka Cluster部署完成。