为什么晚上咳嗽厉害白天没事 百度拇指医生

kafka(5)
写了一个kafka的demo,kafka生产者和消费者,消费者用线程池创建多个消费者,并且创建的消费者大于或者等于小于partition个数,验证了kafka消费端负载的算法,算法见:
创建一个maven工程,程序的结构如下:
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&
&kafka-new&
&kafka-new&
&0.0.1-SNAPSHOT&
&org.apache.kafka&
&kafka_2.11&
&0.10.1.1&
&org.apache.hadoop&
&hadoop-common&
&org.apache.hadoop&
&hadoop-hdfs&
&org.apache.hadoop&
&hadoop-client&
&org.apache.hbase&
&hbase-client&
&org.apache.hbase&
&hbase-server&
&org.apache.hadoop&
&hadoop-hdfs&
&jdk.tools&
&jdk.tools&
&${JAVA_HOME}/lib/tools.jar&
&org.apache.httpcomponents&
&httpclient&
&org.apache.maven.plugins&
&maven-compiler-plugin&
MyProducer类
package com.lijie.
import java.util.P
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaP
import org.apache.kafka.clients.producer.ProducerR
public class MyProducer {
public static void main(String[] args) throws Exception {
produce();
public static void produce() throws Exception {
String topic = "mytopic";
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.80.123:9092");
properties.put("value.serializer","org.mon.serialization.StringSerializer");
properties.put("key.serializer", "org.mon.serialization.StringSerializer");
KafkaProducer&String, String& pro = new KafkaProducer&&(properties);
while (true) {
String value = UUID.randomUUID().toString();
ProducerRecord&String, String& pr = new ProducerRecord&String, String&(topic, value);
pro.send(pr);
Thread.sleep(1000);
MyConsumer类
package com.lijie.
import java.util.HashM
import java.util.L
import java.util.M
import java.util.P
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import kafka.consumer.C
import kafka.consumer.ConsumerC
import kafka.consumer.KafkaS
import kafka.javaapi.consumer.ConsumerC
public class MyConsumer {
public static void main(String[] args) {
consumer();
public static void consumer() {
String topic = "mytopic";
Properties properties = new Properties();
properties.put("group.id", "lijieGroup");
properties.put("zookeeper.connect", "192.168.80.123:2181");
properties.put("auto.offset.reset", "largest");
properties.put("mit.interval.ms", "1000");
ConsumerConfig config = new ConsumerConfig(properties);
ConsumerConnector conn = Consumer.createJavaConsumerConnector(config);
Map&String, Integer& map = new HashMap&String, Integer&();
map.put(topic, 3);
Map&String, List&KafkaStream&byte[], byte[]&&& createMessageStreams = conn
.createMessageStreams(map);
List&KafkaStream&byte[], byte[]&& list = createMessageStreams.get(topic);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i & list.size(); i++) {
executor.execute(new ConsumerThread("消费者" + (i + 1), list.get(i)));
ConsumerThread类
package com.lijie.
import kafka.consumer.ConsumerI
import kafka.consumer.KafkaS
import kafka.message.MessageAndM
public class ConsumerThread implements Runnable {
private String consumerN
private KafkaStream&byte[], byte[]&
public ConsumerThread(String consumerName, KafkaStream&byte[], byte[]& stream) {
this.consumerName = consumerN
this.stream =
public void run() {
ConsumerIterator&byte[], byte[]& iterator = stream.iterator();
while (iterator.hasNext()) {
MessageAndMetadata&byte[], byte[]& next = iterator.next();
String topic = next.topic();
int partitionNum = next.partition();
long offset = next.offset();
String message = new String(next.message());
System.out
.println("consumerName: "+ consumerName + "topic: " + topic + " ,partitionNum: "
+ partitionNum + " ,offset: " + offset + " ,message: " + message);
执行结果:
按照上面的结果可以发现,我创建的topic是mytopic,并且创建的3个partition,消费者1消费partition0,消费者2消费partition1,消费者3消费partition2,一个消费者对应一个partition
如果我把消费者改成4个,但是我的partition还是只有3个,那么回怎么样呢?
结果和上面一样,而消费者4没有效果
但是如果我把消费者设置为小于partition的个数,比如2个,又会怎样么?
可以看到消费者1会消费两个partition,分别是partition1和partition0,而消费者2只会消费partition2
这个现象正好验证了我上一篇博客,kafka消费者和partition个数的负载算法,详情见:
当然,在kafka里面,默认的生产者生产数据会均衡的放到各个partition当中,如果我们需要指定特定消息到特定的partition里面,我们需要自定义partition
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:41459次
积分:1517
积分:1517
排名:千里之外
原创:109篇
评论:18条
(4)(14)(18)(10)(8)(19)(36)(1)(2)(5)Kafka技术内幕: 生产者_架构师_传送门
你是真实用户吗(Are you a robot)?
我们怀疑你不是真实用户,已对你的访问做了限制。如果您是真实用户,非常抱歉我们的误判对您造成的影响,您可以通过QQ()或电子邮件()反馈给我们,并在邮件和QQ请求信息里注明您的IP地址:220.177.198.53,我们会尽快恢复您的正常访问权限。另外,如果您不是在访问的当前页面,我们建议您移步
或者 在浏览器中输入以下地址:http://chuansong.me/n/ 访问,您所访问的网站是从抓取的数据,请直接访问,会有更好的体验和更及时的更新。We suspect you are a robot.We are really sorry if you are not,and you can email us () with your current IP address: 220.177.198.53 to get full access to .If you are not accessing
for the current page,you'd better visit
for better performance,as the current website you are accessing is just spam.
觉得不错,分享给更多人看到
架构师 微信二维码
分享这篇文章
5月30日 20:56
架构师 最新头条文章
架构师 热门头条文章

我要回帖

更多关于 一到晚上咳嗽厉害 的文章

 

随机推荐