为什么kafka 查看节点节点存在关闭时报没有kafka 查看节点服务

博客分类:
1 kafka集群搭建
1.zookeeper集群
搭建在110, 111,112
2.kafka使用3个节点110, 111,112
修改配置文件config/server.properties
broker.id=110
host.name=192.168.1.110
log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs
复制到其他两个节点,然后修改对应节点上的config/server.pro
3.启动,在三个节点分别执行
bin/kafka-server-start.sh
config/server.properties &/dev/null 2&&1 &
4 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
5 查看主题详细
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
--topic test
Topic:test
PartitionCount:3
ReplicationFactor:3
Topic: test
Partition: 0
Leader: 110
Replicas: 110,111,112
Isr: 110,111,112
Topic: test
Partition: 1
Leader: 111
Replicas: 111,112,110
Isr: 111,112,110
Topic: test
Partition: 2
Leader: 112
Replicas: 112,110,111
Isr: 112,110,111
6 去zk上看kafka集群
[zk: localhost:2181(CONNECTED) 5] ls /
[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 6] ls /brokers
----& 查看注册在zk内的kafka
[topics, ids]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[112, 110, 111]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions
[zk: localhost:2181(CONNECTED) 12]
kafka java调用:
2.1 java端生产数据, kafka集群消费数据:
1 创建maven工程,pom.xml中增加如下:
&dependency&
&groupId&org.apache.kafka&/groupId&
&artifactId&kafka_2.10&/artifactId&
&version&0.8.2.0&/version&
&/dependency&
2 java代码:
向主题test内写入数据
import java.util.P
import java.util.concurrent.TimeU
import kafka.javaapi.producer.P
import kafka.producer.KeyedM
import kafka.producer.ProducerC
import kafka.serializer.StringE
public class kafkaProducer extends Thread{
public kafkaProducer(String topic){
this.topic =
public void run() {
Producer producer = createProducer();
while(true){
producer.send(new KeyedMessage&Integer, String&(topic, "message: " + i++));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
private Producer createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.1.110:.1.111:.1.112:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.1.110:.1.111:.1.112:9094");// 声明kafka broker
return new Producer&Integer, String&(new ProducerConfig(properties));
public static void main(String[] args) {
new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test
kafka集群中消费主题test的数据:
[root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin
启动java代码,然后在看集群消费的数据如下:
message: 0
message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9
message: 10
message: 11
message: 12
message: 13
message: 14
message: 15
message: 16
message: 17
message: 18
message: 19
message: 20
message: 21
3 kafka 使用Java写消费者,这样 先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:
import java.util.HashM
import java.util.L
import java.util.M
import java.util.P
import kafka.consumer.C
import kafka.consumer.ConsumerC
import kafka.consumer.ConsumerI
import kafka.consumer.KafkaS
import kafka.javaapi.consumer.ConsumerC
* 接收数据
* 接收到: message: 10
接收到: message: 11
接收到: message: 12
接收到: message: 13
接收到: message: 14
* @author zm
public class kafkaConsumer extends Thread{
public kafkaConsumer(String topic){
this.topic =
public void run() {
ConsumerConnector consumer = createConsumer();
Map&String, Integer& topicCountMap = new HashMap&String, Integer&();
topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
Map&String, List&KafkaStream&byte[], byte[]&&&
messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream&byte[], byte[]& stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
ConsumerIterator&byte[], byte[]& iterator =
stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.1.110:.1.111:.1.112:2181");//声明zk
properties.put("group.id", "group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
public static void main(String[] args) {
new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test
代码见附件:
下载次数: 1322
浏览 95123
chengjianxiaoxue
浏览: 536567 次
来自: 北京
可以通过继承FileOutputFormat来简化相关代码
楼主,那hbase结合hive使用,这种架构稳定不?机器的内存 ...
最近在使用kylin的时候有点疑问,我安装这些都没问题 ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'博主最新文章
博主热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)kafka删除节点怎么删除_百度知道
kafka删除节点怎么删除
我有更好的答案
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中SparkStreaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。Kafka架构与安全首先,我们来了解下有关Kafka的几个基本概念:Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的TopicName标识。Producer:向Topic发布消息的进程称为Producer。Consumer:从Topic订阅消息的进程称为Consumer。Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在ConsumerGroup发生变化时进行relalance.Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。然而,分析Kafka框架,我们会发现以下严重的安全问题:1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。2.网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。Kafka安全设计基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。基于Kerberos的身份机制如下图所示:Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:1.Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出2.Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)3.Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl//,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。构建安全的Kafka服务首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:其中,authentication参数表示认证模式,可选配置项为simple,kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:publicclassSecureProducerextendsThread{privatefinalkafka.javaapi.producer.PprivatefinalSprivatefinalPropertiesprops=newProperties();publicSecureProducer(Stringtopic){AuthenticationManager.setAuthMethod(“kerberos”);AuthenticationManager.login(“producer1″,“/etc/producer1.keytab”);props.put(“serializer.class”,“kafka.serializer.StringEncoder”);props.put(“metadata.broker.list”,“172.16.1.190:.1.192:.1.193:9092″);//Userandompartitioner.Don’tneedthekeytype.JustsetittoInteger.//ThemessageisoftypeString.producer=newkafka.javaapi.producer.Producer(newProducerConfig(props));this.topic=}...Topic权限管理Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:其中,resetPermission(user,Permissions,topic)为重置user对topic的权限。grant(user,Permissions,topic)为赋予user对topic权限。revoke(user,Permissions,topic)为取消user对topic权限。isPermitted(user,Permissions,topic)为检查user对topic是否具有指定权限。调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeperKerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:publicclassAuthzTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.setProperty(“authentication”,“kerberos”);props.setProperty(“zookeeper.connect”,“172.16.2.116:.2.117:.2.118:2181″);props.setProperty(“principal”,“kafka/host1@TDH”);props.setProperty(“keytab”,“/usr/lib/kafka/config/kafka.keytab”);ZKConfigconfig=newZKConfig(props);AuthenticationManager.setAuthMethod(config.authentication());AuthenticationManager.login(config.principal(),config.keytab());AuthorizationManagerauthzManager=newAuthorizationManager(config);//resetpermissionREADandWRITEtoip172.16.1.87ontopictestauthzManager.resetPermission(“172.16.1.87″,newPermissions(Permissions.READ,Permissions.WRITE),“test”);//grantpermissionWRITEtoip172.16.1.87ontopictestauthzManager.grant(“172.16.1.87″,newPermissions(Permissions.CREATE),“test”);//revokepermissionREADfromip172.16.1.87ontopictestauthzManager.revoke(“172.16.1.87″,newPermissions(Permissions.READ),“test”);//committhepermissionsettingsauthzManager.commit();authzManager.close();}}ipaddress认证模式下,取消和赋予权限的操作如下所示:publicclassAuthzTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.setProperty(“authentication”,“ipaddress”);props.setProperty(“zookeeper.connect”,“172.16.1.87:.1.88:.1.89:2181″);ZKConfigconfig=newZKConfig(props);//newauthorizationmanagerAuthorizationManagerauthzManager=newAuthorizationManager(config);//resetpermissionREADandWRITEtoip172.16.1.87ontopictestauthzManager.resetPermission(“172.16.1.87″,newPermissions(Permissions.READ,Permissions.WRITE),“test”);//grantpermissionWRITEtoip172.16.1.87ontopictestauthzManager.grant(“172.16.1.87″,newPermissions(Permissions.CREATE),“test”);//revokepermissionREADfromip172.16.1.87ontopictestauthzManager.revoke(“172.16.1.87″,newPermissions(Permissions.READ),“test”);//committhepermissionsettingsauthzManager.commit();authzManager.close();}}总结与展望本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop&Spark生态系统,安全功能还存在很多问题,各组件的权限系统独立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能限制具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。
为您推荐:
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。Linux下安装并(单节点)配置启动Kafka_服务器应用_Linux公社-Linux系统门户网站
你好,游客
Linux下安装并(单节点)配置启动Kafka
来源:Linux社区&
作者:favccxx
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。也可以把它当作是分布式提交日志的发布-订阅消息,事实上Kafka官网上也是这么说明的。
关于Kafk你必须知道的几个关键术语
topics:Kafka接收的各种各样的消息
producers:发送消息到Kafka
consumers:从Kafka接收消息的订阅者
broker:一个或多个服务器组成的Kakfa集群
下图是一个生产者通过kafka集群发送给消费者的示例
Topics和Logs
一个Topic就是将发布的消息归类的过程,对于每一个topic,Kafka集群都会维护一个分区日志,如下图
每个分区都是有编号的,而且每个分区的消息也会根据提交的日志进行编号。分区中的消息会被分配一个唯一的编号,这个术语叫做offset,用以识别分区中的消息。
Kafka集群会保存所有的发布消息,无论这些消息在固定的时间内是否被消费者所消费。比如,消息日志设置的保存期间是2天,在消息发布的2天内,消费者可以消费,然后丢弃该条消息来释放空间。Kafka的性能跟数据空间无关,因此保存大量数据对于Kafka来说不是问题。
实际上,在日志中保存每个消费者位置的元数据才是&offset&。offset是由消费者控制的:一般来说,当消费者一行行读取消息时,offset才起作用。但实际上,消费者可以以任意他们想要的方式读取消息,因为消费者可以重置已存在的offset。
这种机制表明Kafka消费者是非常容易处理的-消息的处理对于集群或其它消费者来说几乎没有什么影响。比如,我们可以在命令行工具中使用&tail&topic来处理消息而不用改变已经存在的消费者。
日志分区有几种不同的目的。首先,能够避免一台服务器上的日志文件过大。每个独立的分区肯定位于同一台服务器上,并且在同一台服务器上处理,但是一个topic可能有多个分区,这样能够保证处理大量的数据。其次,分区可以作为并行处理的单元。
日志分布在Kafka集群中的不同分区上,每个服务器处理数据并请求共享分区。每个分区都是可以通过配置服务器的容错机制进行复制的。
每个分区都有一个服务器作为&leader(主节点)&,有0个或多个服务器作为&followers(从节点)&,主节点可以从分区中读写数据,但是从节点只能复制主节点的消息。如果主节点宕机,其中的一个从服务器会自动成为新的主服务器。主服务器处理一些分区的数据,从服务器处理其它服务器的数据,这样保存集群的平衡。
生产者(Producers)
生产者可以决定将消息发送到哪些topic,而且生产者可以选择将topic内的消息发送到哪个分区。这种简单的循环负载均衡方式能够在语义分区时完成。这种分区通常在1秒内完成。
消费者(Consumers)
传统的消息队列有两种处理方式:顺序处理和发布/订阅处理。在顺序处理方式时,消费者是按照消息进入消息队列的顺序进行读取的。发布/订阅方式则是将消息广播给所有的消费者。Kafka提供了一种抽象的方式-消费者分组(consumer group)来满足消息的以上两种处理方式。
每个消费者都有一个组名,只有订阅的消费者在对应的组中时,发布到topic中的消息才会传递给消费者对象。消费者对象可以在不同的进程或主机中存在。
如果所有的消费者对象的组名都相同,这就好比是传统的顺序队列,消费者平均分配这些消息。
如果所有的消费者对象的组名都不相同,这就好比是发布/订阅模式,消费者只接受订阅的消息。
通常来说,订阅某一主题(topic)的消费者在同一组的有多个,这是为了系统的稳定和容错。下图是一个具体的示例。
Kafka比传统的消息队列拥有更高的排序可靠性。
传统的消息队列在顺序保存消息到服务器时,如果有多个消费者从队列中读取消息,服务器会顺序发送消息。但是,尽管服务器是顺序发送消息的,但是消费者是异步接收消息的,因此消费者接收到的消息可能并不是顺序的,但消费者并不知道消息是乱序的。为避免这种情况,传统的消息队列通常只允许一个进程读取消息,这也就意味着消息的处理是单向的,而不是并行的。
Kafka在这方面有更好的处理方式,它通过在主题中使用分区完成了并行处理。Kafka既保证了顺序输出又实现了消费者之间的平衡。通过给主题分配分区,将消息分给同组内的消费者,确保每一分区内的消费者是唯一的,并且是顺序读取消息。由于是通过分区来实现多个消费者对象的负载均衡,所以同一消费者组的消费者是不能超过分区的。
Kafka仅仅实现了消息在一个分区内的排序,而不是同一主题不同分区内的排序。对于大多数应用而言,数据分区和分区内数据排序就足够了。如果你想要所有的消息都是顺序排列的,那就只能有一个分区,这意味着只能有一个消费者在一个消费者组内。这种情况下,消息的处理就不是并行的。
消息会以生产者发送的顺序追加到主题的分区。例如,一个生产者发送同一个消息两次分别称为M1,M2,M1先发,那么M1将会有一个更小的偏移量,并且也会比M2早出现在日志中。
消费者以存储在日志中的顺序看见消息。
对于复制N倍的主题,即便N-1台服务器出错,都不会使已经提交到日志的消息丢失
Kafka可以替代一些传统的消息代理。消息代理有很多使用场景,比如与数据处理程序解耦,缓存未处理的消息等等。和大多数消息处理系统相比,Kafka有更好的吞吐量,内建的分区,复制和容错能力,这使得Kafka能够很好的处理大规模消息应用。
Kafka最初用来提供实时追踪网站用户行为的相关数据的能力,例如统计PV,UV等。
Kafka经常被用来操作监控数据,比如从分布式的应用中汇总统计数据。
我们的服务通常部署在多台计算机上,服务器的运行日志也会分散打在各个机器上。Kafka通常被用来从各个服务器上收集日志,然后统一打到HDFS或者其他离线存储系统,比如Facebook的Scribe在收集日志时就是用了Kafka。
很多用户完成原始数据的阶段性汇总,加工等处理后,将操作结果转换为新的topic写入Kafka来进行更深入的处理。&比如,文章推荐程序完首先是用爬虫从RSS中爬取用户订阅的文本内容,然后把这些内容发布到articles&topic下。接下来的处理程序,将articles&topic下的内容格式化后,发布到format&topic下。最终的处理程序尝试将这些格式化的内容推荐给合适的用户。Storm和Samze是处理这种业务的流行框架。
业务状态的变化被按照时间顺序记录下来,这种程序设计方式被成为事件采集。Kafka支持大规模的日志数据存储,这使得Kafka成为事件采集程序理想的后端模块。
更多详情见请继续阅读下一页的精彩内容:
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计
Apache Kafka 代码实例
Apache Kafka 教程笔记
Apache kafka原理与特性(0.8V)&
Kafka部署与代码实例&
Kafka介绍和集群环境搭建&
相关资讯 & & &
& (10/30/:13)
& (08/30/:36)
& (11/02/:30)
& (10/14/:16)
& (05/02/:00)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款拒绝访问 | www.ggdoc.com | 百度云加速
请打开cookies.
此网站 (www.ggdoc.com) 的管理员禁止了您的访问。原因是您的访问包含了非浏览器特征(40f722da2fe2450b-ua98).
重新安装浏览器,或使用别的浏览器

我要回帖

更多关于 kafka集群节点数 的文章

 

随机推荐