spring cloud kafka中怎么持续从kafka中消费数据

消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):
(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
(2)指定"auto.offset.reset"参数的值为earliest;
对应的spring-kafka消费者客户端配置参数为:
&!-- 指定消费组名 --&
&entry key="group.id" value="fg11"/&
&!-- 从何处开始消费,latest 表示消费最新消息,earliest 表示从头开始消费,none表示抛出异常,默认latest --&
&entry key="auto.offset.reset" value="earliest"/&
注意:从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上。
比如我们为消费者A指定了消费者组(group.id)为fg11,那么可以使用如下命令查看消费者组的消费情况:
bin/kafka-consumer-groups.sh --bootstrap-server 172.17.6.10:9092 --describe --group fg11
显示结果如下:
CURRENT-OFFSET
LOG-END-OFFSET
CONSUMER-ID
consumer-1-08c856a3-ae39-4f73-a2da-4de
/192.168.207.127
consumer-1
consumer-1-08c856a3-ae39-4f73-a2da-4de
/192.168.207.127
consumer-1
consumer-1-08c856a3-ae39-4f73-a2da-4de
/192.168.207.127
consumer-1
其实friend这个topic共有3个分区,消息总数为12条,其实在消费者A启动之前,这12条消息已经被其他某个组的消费者消费过了。而我们虽然为消费者A指定了一个全新的group.id为fg11,但是如果我们在启动消费者A之前,指定的"auto.offset.reset"参数的值是latest而不是earliest的话(就算你停止消费者,然后改为earliest也是没有用的),启动之后它将不会消费以前的消息,除非friend这个topic的分区中有了新的消息它才会消费。
所以一定要在消费者启动之前就保证group.id是全新的,而且要指定earliest而不是latest。
阅读(...) 评论()Kafka之Consumer获取消费数据全过程图解 - 简书
Kafka之Consumer获取消费数据全过程图解
这篇文章是作为: 的补充材料,看过我们之前源码分析的同学可能知道。本文将从客户端程序如何调用Consumer获取到最终Kafka消息的全过程以图解的方式作一个源码级别的梳理。
废话不多说,请图看
Business Process Model.jpg
文章短小的目的是便于大家快速找到内容的核心加以理解,避免文章又臭又长抓不住重点。对于Kafka技术,如果大家对此有任何疑问,请给我留言,我们可以深入探讨。
清晰的UML时序图在这里可以看:
十二年JAVA开发经验,专注电商、金融、支付行业,对微服务、大数据、分布式有浓厚兴趣
个人主页:http://blog.csdn.net/u
微博地址:/u//
推荐干货社区:本帖子已过去太久远了,不再提供回复功能。拒绝访问 |
| 百度云加速
请打开cookies.
此网站 () 的管理员禁止了您的访问。原因是您的访问包含了非浏览器特征(663a-ua98).
重新安装浏览器,或使用别的浏览器

我要回帖

更多关于 spring kafka maven 的文章

 

随机推荐