玩牌有什么承载30吨的转运工具小工具

Client:220.177.198.53 Node:bc0008c Time: 19:16:52
您的浏览器
工作正常
知道创宇云安全节点
工作正常
线路
连接超时
服务器状态未知Spark Streaming kafka 实现数据零丢失的几种方式 - 简书
Spark Streaming kafka 实现数据零丢失的几种方式
在使用spark streaming消费kafka数据时,程序异常中断下发现会有数据丢失的情况。下文将说明如何避免这种情况。
Definitions
问题开始之前先解释下流处理中的一些概念:
At most once - 每条数据最多被处理一次(0次或1次)
At least once - 每条数据最少被处理一次 (1次或更多)
Exactly once - 每条数据只会被处理一次(没有数据会丢失,并且没有数据会被多次处理)
1.High Level API
如果不做容错,将会带来数据丢失因为receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),executor突然挂掉(或是driver挂掉通知executor关闭),缓存在其中的数据就会丢失。因为这个问题,Spark1.2开始加入了WAL(Write ahead log)开启 WAL,将receiver获取数据的存储级别修改为StorageLevel.MEMORY_AND_DISK_SER
val conf = new SparkConf()conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
val sc= new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("walDir") val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
开启WAL后,依旧存在数据丢失问题即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]
WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。
sys.addShutdownHook({
ssc.stop(true,true
调用的方法为:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
WAL带来的问题
WAL实现的是At-least-once语义。如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,降低了程序的吞吐量。
2.Kafka Direct API
Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍。例如如下的调用方式:
messages.foreachRDD(rdd=&{
val message = rdd.map(_._2)
//对数据进行一些操作
message.map(method)
//更新zk上的offset (自己实现)
updateZKOffsets(rdd)
两种方式的具体数据处理流程可以参考该文章:
邮箱:breeze_
csdn博客 :http://blog.csdn.net/lsshlsw《Spark+Kafka构建实时分析Dashboard案例——步骤三:Spark Streaming实时处理数据》
开发团队:厦门大学数据库实验室 联系人:林子雨老师ziyulin@
版权声明:版权归厦门大学数据库实验室所有,请勿用于商业用途;未经授权,其他网站请勿转载
本教程介绍大数据课程实验案例“Spark+Kafka构建实时分析Dashboard”的第三个步骤,Spark Streaming实时处理数据。在本篇博客中,将介绍如何利用Spark Streaming实时接收处理Kafka数据以及将处理后的结果发给的Kafka。
所需知识储备
会使用Scala编写Spark Streaming程序,Kafka原理。
编写Spark Streaming程序,熟悉Spark操作Kafka。
Spark Streaming实时处理Kafka数据;
将处理后的结果发送给Kafka;
本案例在于实时统计每秒中男女生购物人数,而Spark Streaming接收的数据为1,1,0,2...,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。
因此利用Spark Streaming接口reduceByKeyAndWindow,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。
配置Spark开发Kafka环境
kafka的安装可以参考。下面主要介绍配置Spark开发Kafka环境。首先到下载Spark连接Kafka的代码库。然后把下载的代码库放到目录/usr/local/spark/jars目录下,命令如下:
sudo mv ~/下载/spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars
然后在/usr/local/spark/jars目录下新建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下
cd /usr/local/spark/jars
mkdir kafka
cp /usr/local/kafka/libs/* .
执行上述步骤之后,Spark开发Kafka环境即已配置好,下面介绍如何编码实现。
建立Spark项目
之前有很多教程都有说明如何创建Spark项目,这里再次说明。首先在/usr/local/spark/mycode新建项目主目录
cd /usr/local/spark/mycode
mkdir kafka
然后在kafka目录下新建scala文件存放目录以及scala工程文件
mkdir -p src/main/scala
接着在src/main/scala文件下创建两个文件,一个是用于设置日志,一个是项目工程主文件,设置日志文件为StreamingExamples.scala
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
这个文件不做过多解释,因为这只是一个辅助文件,下面着重介绍工程主文件,文件名为KafkaTest.scala
package org.apache.spark.examples.streaming
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
implicit val formats = DefaultFormats//数据格式化时需要
def main(args: Array[String]): Unit={
if (args.length & 4) {
System.err.println("Usage: KafkaWordCount &zkQuorum& &group& &topics& &numThreads&")
System.exit(1)
StreamingExamples.setStreamingLogLevels()
/* 输入的四个参数分别代表着
* 1. zkQuorum 为zookeeper地址
* 2. group为消费者所在的组
* 3. topics该消费者所消费的topics
* 4. numThreads开启消费topic线程的个数
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
// 将topics转换成topic--&numThreads的哈稀表
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// 创建连接Kafka的消费者链接
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))//将输入的每行用空格分割成一个个word
// 对每一秒的输入数据进行reduce,然后将reduce后的数据发送给Kafka
val wordCounts = words.map(x =& (x, 1L))
.reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd =& {
if(rdd.count !=0 ){
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.mon.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.mon.serialization.StringSerializer")
// 实例化一个Kafka生产者
val producer = new KafkaProducer[String, String](props)
// rdd.colect即将rdd中数据转化为数组,然后write函数将rdd内容转化为json格式
val str = write(rdd.collect)
// 封装成Kafka消息,topic为"result"
val message = new ProducerRecord[String, String]("result", null, str)
// 给Kafka发送消息
producer.send(message)
ssc.start()
ssc.awaitTermination()
上述代码注释已经也很清楚了,下面在简要说明下:
1. 首先按每秒的频率读取Kafka消息;
2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
3. 最后将上述结果封装成json发送给K
编写好程序之后,下面介绍下如何打包运行程序。在/usr/local/spark/mycode/kafka目录下新建文件simple.sbt,输入如下内容:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"
然后,即可编译打包程序,输入如下命令
/usr/local/sbt/sbt package
打包成功之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1
其中最后四个为输入参数,含义如下
1. 127.0.0.1:2181为zookeeper地址
2. 1 为consumer group标签
3. sex为消费者接收的topic
4. 1 为消费者线程数
最后在/usr/local/spark/mycode/kafka目录下,运行如下命令即可执行刚编写好的Spark Streaming程序
sh startup.sh
程序运行成功之后,下面通过之前的KafkaProducer和KafkaConsumer来检测程序。
下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为
from kafka import KafkaConsumer
consumer = KafkaConsumer('result')
for msg in consumer:
print((msg.value).decode('utf8'))
在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。
下篇文章链接为查看: 28528|回复: 3
spark streaming 接收 kafka 数据示例
主题帖子积分
高级会员, 积分 3212, 距离下一级还需 1788 积分
高级会员, 积分 3212, 距离下一级还需 1788 积分
1.本文是基于什么环境?
2.如何实现数据转发?
3.spark streaming 处理的代码是如何实现的?
曾经试过了用 spark streaming 读取 logstash 启动的 TCP Server 的数据。不过如果你有多台 logstash 的时候,这种方式就比较难办了 —— 即使你给 logstash 集群申请一个 VIP,也很难确定说转发完全符合。所以一般来说,更多的选择是采用 kafka 等队列方式由 spark streaming 去作为订阅者获取数据。
环境部署这里只讲 kafka 单机的部署。只是示例嘛:cd kafka_2.10-0.8.2.0/bin/
./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh --daemon ../config/server.properties
数据转发保持跟之前示例的连贯性,这里继续用 logstash 发送数据到 kafka。首先创建一个 kafka 的 topic:./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash
然后到 logstash 里,修改配置为:[Java] 纯文本查看 复制代码input {
file { path =& &/var/log/*.log& }
code =& &event['lineno'] = 100 * rand(Math::E..Math::PI)&
broker_list =& &127.0.0.1:9092&
topic_id =& &logstash&
spark streaming 处理的代码:处理效果跟之前示例依然保持一致,就不重复贴冗余的函数了,只贴最开始的处理部分:[Java] 纯文本查看 复制代码import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._
object LogStash {
implicit val formats = DefaultFormats
case class LogStashV1(message:String, path:String, host:String, lineno:Double, `@timestamp`:String)
def main(args: Array[String]) {
val Array(zkQuorum, group, topics, numThreads) = args
val topicMap = topics.split(&,&).map((_,numThreads.toInt)).toMap
val sparkConf = new SparkConf().setMaster(&local[2]&).setAppName(&LogStash&)
= new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.map(line =& {
val json = parse(line)
json.extract[LogStashV1]
}).print()
ssc.start()
ssc.awaitTermination()
这里面有一些跟网上常见资料不一样的地方。第一个,import org.apache.spark.streaming.kafka._ 并不会导出 KafkaUtils,必须明确写明才行。第二个,之前示例里用了 scala 核心自带的 JSON 模块。但是这次我把 lineno 字段从整数改成浮点数后,发现 JSON.parseFull() 有问题。虽然我在 scala 的 repl 里测试没问题,但是写在 spark 里的时候,它并不像文档所说的”总是尝试解析成 Double 类型”,而是一直尝试用Integer.parseInteger() 方法来解析。哪怕我明确定义 JSON.globalNumberParser = {input:String =& Float.parseFloat(input)} 都不起作用。
所以,最后这里改用了 。据称这也是 scala 里性能和功能最好的 JSON 库。json4s 库默认解析完后,不是标准的 Map、List 等对象,而是它自己的 JObject、JList、JString 等。想要转换成标准 scala 对象,需要调用 .values 才对。不过我这个示例里没有这么麻烦,而是直接采用 .extract 就变成了 cast class 对象了。非常简便。
另一个需要点出来的变动是:因为采用 .extract,所以 cast class 里的参数命名必须跟 JSON 里的 key 完全对应上。而我们都知道 logstash 里有几个特殊的字段,叫 @timestamp 和 @version 。这个 “@” 是不能直接裸字符的,所以要用反引号(`)包括起来。
sbt 打包sbt 打包也需要有所变动。spark streaming 的核心代码中,并不包含 kafka 的代码。还跟之前那样 sbt package 的话,就得另外指定 kafka 的 jar 地址才能运行了。更合适的办法,是打包一个完全包含的 jar 包。这就用到 。
刚刚收到的消息,spark 1.3 版发布 beta 了,spark streaming 会内置对 kafka 的底层直接支持。或许以后不用这么麻烦?sbt-assembly 使用起来特别简单,尤其是当你使用的 sbt 版本比较新(大于 0.13.6) 的时候。
在项目的 project/ 目录下创建一个 plugins.sbt 文件,内容如下:[Java] 纯文本查看 复制代码addSbtPlugin(&com.eed3si9n& % &sbt-assembly& % &0.13.0&)
具体的版本选择,看官方 README 的 。
添加新增依赖模块
现在可以去修改我们项目的 build.sbt 了:[Java] 纯文本查看 复制代码name := &LogStash&
version := &1.0&
scalaVersion := &2.10.4&
libraryDependencies ++= Seq(
&org.apache.spark& %% &spark-core& % &1.2.0& % &provided&,
&org.apache.spark& %% &spark-sql& % &1.2.0& % &provided&,
&org.apache.spark& %% &spark-streaming& % &1.2.0& % &provided&,
&org.apache.spark& %% &spark-streaming-kafka& % &1.2.0&,
&org.json4s& %% &json4s-native& % &3.2.10&,
&org.json4s& %% &json4s-jackson& % &3.2.10&
是的。新版本的 sbt-assembly 完全不需要单独修改 build.sbt 了。需要注意,因为我们这次是需要把各种依赖全部打包到一起,这个可能会导致一些文件相互有冲突。比如我们用 spark-submit 提交任务,有关 spark 的核心文件,本身里面就已经有了的,那么就需要额外通过 % &provided& 指明这部分会另外提供,不需要打进去。这样运行的时候就不会有问题了。
采用 sbt-assembly 后的打包命令是:sbt assembly。注意输出的结果,会是直接读取 build.sbt里的 name 变量,不做处理。,我们之前定义的叫 “LogStash Project”,sbt package 命令自动会转换成全小写且空格改成中横线的格式 logstash-project_2.10-1.0.jar。但是 sbt assembly 就会打包成 LogStash Project-assembly-1.0.jar 包。这个空格在走 spark-submit 提交的时候是有问题的。所以这里需要把 name 改成一个不会中断的字符串。。。
欢迎加入about云群 、 ,云计算爱好者群,关注
主题帖子积分
中级会员, 积分 567, 距离下一级还需 433 积分
中级会员, 积分 567, 距离下一级还需 433 积分
借楼主宝地,请教个问题:使用kafka和spark streaming,环境是三台虚拟机(4核,6G内存),安装了cdh5.3.3,我有三个streaming作业,配合使用
①如果在三台机器每台机器上各跑一个作业,且master指定为local[3](即本地模式),是没有问题的
②master为spark://ip:7077,跑三个作业,有时这三个作业正常运行,有时就不动了
③master为yarn-cluster,只有第一个作业分配了applicationmaster,两外两个作业一直是未分配的状态
我想问下,是机器原因导致吗?
主题帖子积分
高级会员, 积分 1333, 距离下一级还需 3667 积分
高级会员, 积分 1333, 距离下一级还需 3667 积分
主题帖子积分
中级会员, 积分 300, 距离下一级还需 700 积分
中级会员, 积分 300, 距离下一级还需 700 积分
借楼主宝地,请教个问题:使用kafka和spark streaming,环境是三台虚拟机(4核,6G内存),安装了cdh5.3.3 ...
楼主,你 spark streaming 怎么跑yarn-cluster 模式的啊?我跑standalone没问题,跑 on yarn 就报错了[Shell] 纯文本查看 复制代码ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
积极上进,爱好学习
经常参与各类话题的讨论,发帖内容较有主见
经常帮助其他会员答疑
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by

我要回帖

更多关于 小c转运 的文章

 

随机推荐