spark 提交python脚本streaming怎么提交脚本

博主最新文章
博主热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博主最新文章
博主热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)Spark 应用提交指南 - 为程序员服务
Spark 应用提交指南
Spark bin文件夹下的spark-submit脚本用来启动集群中的应用。 它使用统一的提交接口支持各种类型的集群服务器, 这样你你就不必为每种类型都配置自己的应用.
绑定应用的依赖
如果你的代码依赖其它的项目,为了发布到集群中,你需要为你的应用打包它们。 这样来做: 创建一个装配assembly jar (或者 “uber” jar) 来包含你的代码和依赖.
和 都有assembly 插件. 当用插件创建assembly jar时,需要将Spark 和 Hadoop设置为provided依赖; 不需要将它们打包进你的jar, 这是因为集群管理器在运行时会提供它们. 一旦你打包好,你可以调用bin/spark-submit脚本, 将你的jar作为参数.
如果使用Python, 你可以使用—py-files参数增加 .py, .zip 或者 .egg 文件. 如果你依赖多个Python文件, 我们推荐你将它们打包成一个.zip 或者 .egg.
使用spark-submit启动应用
一旦打包好,就可以使用bin/spark-submit脚本启动应用了. 这个脚本负责设置spark使用的classpath和依赖,支持不同类型的集群管理器和发布模式:
./bin/spark-submit \
--class &main-class&
--master &master-url& \
--deploy-mode &deploy-mode& \
--conf &key&=&value& \
... # other options
&application-jar& \
[application-arguments]
一些常用选项:
—class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
—master: 集群的master URL (如 spark://23.195.26.187:7077)
—deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
—conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”.
application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,
如果是 file:// path, 那么所有的节点的path都包含同样的jar.
application-arguments: 传给main()方法的参数
一个通用发布策略是从一个网管机器提交你的应用, 这台机器物理的和你的worker机器在一起(比如在一个独立EC2集群中的Master节点). 在这种部署模式下, 适合采用client mode模式. 如果设置client模式, 驱动直接在spark-submit进程中启动,输入输出都可以显示在控制台. 所以这种模式特别适合(读取-求值-输出循环), 比如Spark shell.
另外一种情况, 如果你的应用从离worker机器很远的机器提交,比如你本地笔记本,通常采用集群模式减小驱动和executor之间网络延迟。注意集群模式当前并不支持standalone clusters, Mesos clusters, 或者 python应用.
对于Python应用, 简单把.py文件取代 的JAR, 并将Python .zip, .egg 或者 .py 文件增加到搜索路径中(—py-files).
使用—help列出所有的选项. 这里有一个例子:
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
# Run on a Spark standalone cluster
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
# can also be `yarn-client` for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
# Run a Python application on a cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
Master URL
master URL可以是以下格式:
Master URL
本地以一个worker线程运行(例如非并行的情况).
本地以K worker 线程 (理想情况下, K设置为你机器的CPU核数).
本地以本机同样核数的线程运行.
spark://HOST:PORT
连接到指定的Spark standalone cluster master. 端口是你的master集群配置的端口,缺省值为7077.
mesos://HOST:PORT
连接到指定的Mesos 集群. Port是你配置的mesos端口, 缺省是5050. 或者如果Mesos使用ZOoKeeper,格式为 mesos://zk://….
yarn-client
以client模式连接到YARN cluster. 集群的位置基于HADOOP_CONF_DIR 变量找到.
yarn-cluster
以cluster模式连接到YARN cluster. 集群的位置基于HADOOP_CONF_DIR 变量找到.
从文件中加载配置
spark-submit脚本可以从一个属性文件中加载, 并把它们传给你的应用. 在默认情况下它会从Spark文件夹的conf/spark-defaults.conf读取参数,更多细节请看 。
加载缺省配置可以避免在spark-submit设置部分参数. 举例来讲, 如果配置了spark.master, 你就可以不用在调用spark-submit脚本时设置—master参数. 通常SparkConf配置值具有最高的优先级, 然后是spark-submit的参数, 其次才是缺省文件中的值.
如果你不清楚配置项来自哪里,你可以使用—verbose打印详细的调试信息。
先进的依赖管理
当使用spark-submit, 你的应用jar以及其它通过—jars包含的jar文件将自动传送给集群。 Spark使用下面的URL scheme 为不同的策略分发jar文件:
file: -绝对路径和 file:/ URIs 由驱动的HTTP文件服务器提供。 每个executor从驱动的http服务器拉取文件.
hdfs:, http:, https:, ftp: - 从期望URI地址拉取文件
local: - local:/ 期望存在所有的worker节点的本地文件系统中. 这意味着没有网络I/O发生,适合已经发布到各worker节点的大的文件/jar, 或者共享的文件系统 NFS, GlusterFS等.
注意JAR和文件被复制到每个executor节点的相应的SparkContext的工作文件夹下。 长期运行会占用大量的磁盘空间,需要清理。YARN会自动清理, 而Spark standalone通过设置spark.worker.cleanup.appDataTtl来配置自动清理.
对于python, t等价的—py-files参数可以用来发布.egg, .zip 和 .py 库到executor.
一旦你发布了你的应用,描述了分布执行的组件,以及如何监控和调试应用.
Spark bin文件夹下的spark-submit脚本用来启动集群中的应用。 它使用统一的提交接口支持各种类型的集群服务器, 这样你你就不必为每种类型都配置自己的应用.
大道至简 衍化至繁
原文地址:, 感谢原作者分享。
您可能感兴趣的代码官方文档地址:http://spark.apache.org/docs/latest/streaming-programming-guide.htmlSpark Streaming是spark api的扩展能实现可扩展,高吞吐,可容错,的流式处理从外接数据源接受数据流,处理数据流使用的是复杂的高度抽象的算法函数map reduce join window等输出的数据可以存储到文件系统和数据库甚至是直接展示在命令行也可以应用ml 和graph processing在这些数据流上spark streaming本质还是spark只是实现了所谓的微批量&spark streaming中连续数据流用DStream表示,DStream可以从输入数据创建,也可以从其他的DStream转化来本质上DStream是一组RDD组成的序列一个迅速上手的例子:# coding: utf-8# In[ ]:from pyspark import SparkContextfrom pyspark.streaming import StreamingContext# In[ ]:#创建两个工作线程,将这两个线程喂给StreamingContext,时间间隔是1秒#这里有个错误Cannot run multiple SparkContexts at once#参考:http://stackoverflow.com/questions//how-to-create-multiple-sparkcontexts-in-a-console#先要尝试关闭sc才能创建多个SparkContexttry:
sc.stop()except:
passsc = SparkContext("local[2]", "NetworkWordCount")ssc = StreamingContext(sc, 1)#sc.stop()# In[ ]:#创建一个DStream,从本机的这个端口取数据lines = ssc.socketTextStream("localhost", 9999)# In[ ]:#lines中的数据记录是一行行的文本,下面将文本切割成字words = lines.flatMap(lambda line: line.split(" "))#这里的flatMap是一对多DStream操作,生成一个新的DStream,就是变成了字流了# In[ ]:#下面数一数每一批次的字的个数# Count each word in each batchpairs = words.map(lambda word: (word, 1))wordCounts = pairs.reduceByKey(lambda x, y: x + y)# In[ ]:# 打印DStream中的每个RDD的前十个元素wordCounts.pprint()# In[ ]:ssc.start()
# 开始计算ssc.awaitTermination()
#等待计算停止# In[ ]:#将这个文件的py脚本提交计算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999#在命令行输入nc -lk 9999 然后模拟输入字符串文本,那么在pyspark命令行会打印出每秒钟输入的数据的统计结果基本概念& & 要想写自己的streaming程序,首先要添加maven或者sbt的依赖& &&
org.apache.spark
spark-streaming_2.11
2.0.0对于外部输入流的依赖现在不在核心api中了,需要单独添加依赖。初始化StreamingContext可以从SparkContext对象中创建from pyspark import SparkContextfrom pyspark.streaming import StreamingContextsc = SparkContext(master, appName)ssc = StreamingContext(sc, 1)appName是程序名字可以在UI中显示master是Spark,Mesos,YARN cluster URL & &或者是 声明的Local[*]字符串使得运行在本地模式当运行在集群上的时候,不要写死在代码里面,而是要从spark-submit启动,传递进去。对于本地测试或者单元测试,可以传递local[*]在context被定义之后,要做下面的事情:1. 通过创建DStream去定义输入资源2. 通过对DStream的转换和输出操作定义流的计算3. 使用streamingContext.start()开始接收数据并处理数据4. 使用streamingContext.awaitTermination()等待处理过程停止(手动或者因为错误)5. 可以使用streamingContext.stop()手动停止处理过程要点:1. 一旦一个context被启动,不能再添加任何新的流进去了2. 一旦context被停止,就不能重启了3. 在JVM中只能有一个StreamingContext被激活4. 在streamingContext上使用stop()也会停止SparkContext(),要想单独停止前者,设置stop()的可选的参数 stopSparkContext()参数为false5. 一个sparkContext可以被重复使用,去创建多个StreamingContext,只要前一个StreamingContext被单独停止,下一个就可以接着创建。Discretized Streams(DStreams)是Spark Streaming的基本的抽象,代表了一个连续的数据流可以是从数据源接受的输入数据流,也可以是转换输入数据流得到的数据流一个DStream代表一串RDD,RDD是不可分割的基本数据单元抽象。每一个在DStream中的RDD包含特定时间间隔的数据&如果时间是1秒的话,从0-1秒的很多RDD,与从1-2的RDD等,组成了DStream任何对DStream的操作,都会被翻译成对底层的RDD的操作,例如,将Lines转换成words的操作&对DStream的操作,隐藏了很多细节,给开发者提供高度抽象的API输入DStreams和Receivers每一个输出DStream(除了file strem)都关联一个Receiver对象,这个对象从数据源接受数据存储在spark的内存中等待处理。Spark Streaming支持两种类型的内建数据源& & 1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections& & 2. Advances Sources:从外部工具类中调过来的例如Kafka,Flume,Kinesis等,需要链接一些外部依赖。关键点:& & 1. 注意在本地运行SparkStreaming的时候,不要使用local或者local[1]作为主机的URL,因为这些都是意味着开一个线程,因为如果只是输入一个数据源,那么这个单一的线程会用来运行receiver,那么没有线程去处理接收到的数据了。所以本地运行的时候,参数local[n]中的n最好大于运行中的receiver。& & 2. 相应的在集群上运行的时候,分配的核心数要比接收者的数目多,否则的话系统能接收数据,但是不能处理。Basic Sources 基础数据源在基础例子中已经看到过ssc.socketTextStream(),下面看file streamsstreamingContext.textFileStream(dataDirectory)可以创建一个DStreamspark会监控这个路径,处理在那个路径中的任何文件注意:& & 1. 路径中的文件要有相同的数据格式& & 2. 文件必须通过自动专业或者重命名进入到这个路径的& & 3. 一旦进入,这些文件不能被改变,所以如果文件被连续附加,那么新的数据不能被读取的针对简单的文本文件,有个简单的方法streamingContext.textFileStream(dataDirectory)因为file stream不需要运行receiver所以不需要分配核心或者线程去处理Python API不支持fileStream只是支持textFileStream可以使用RDD的queue去创建DStream,使用streamingContext.queueStream(queueofRDDs)Advanced Souces 高级数据源As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.因为这些高级的数据源的支持比较复杂,需要依赖单独的包,现在被转移出了核心的API,所以不能再shell中使用,也就不能在shell中测试这些数据源。如果非要的话,需要下载对应的maven jar包,和对应的依赖,然后添加到classpathCustom Sources自定义数据源现在python还不支持,但是要想从自定义的数据源创建DStream,就要自己实现用户定义的receiver,这可以接受自定义的数据,并且发送到spark中Receiver Reliability 接收者的可靠性按照可靠性可以把数据分为两种,有的数据源例如Kafka和Flume运行传送被回复的数据、如果系统正确接受到这些要被确认的数据,可以保证不会因为某种失败而导致数据丢失。这导致两种类型的接收者。& & 1. 可靠的接收者:当数据被接受并存储到spark之后,必须回复确认消息给可靠数据源。& & 2. 不可靠的接收者:不用回复确认,针对没有确认机制的数据源,或者有确认机制但是不需要执行复杂确认机制的数据源。Transformations on DStreams DStream的转换map(fun)这个函数将输入的DStream的每一个元素传递给func得到一个新的DStreamflatMap(func)同上,只是每个输入可以map到多个输出项filter(func)选择func返回结果为true的DStream中的记录组成新的DStreamreparitition(numPartitions)通过改变划分去改变DStream的并行水平union(otherStream)合并count()返回一个新的DStream,是原始的DStream中的每个RDD的元素的数目reduce(func)使用函数func聚合原始数据汇总的每个RDD得到一个新的单一元素RDD组成的DStreamcountByValue()调用类型K的DStream时候返回一个新的DStream有(K,long)对,其中long是k在每个RDD中出现的频率reduceByKey(func,[numTasks])将(k,v)中的v按照k使用func进行聚合join(otherStream,[numTasks])(k,v)(k,w)得到(k,(v,w))cogroup(otherStream,[numTasks])(k,v)(k,w)得到(k,Seq[V],Seq[W])tansform(func)作用在每个RDD上得到新的RDD组成的DStreamupdateStateByKey(func)每个键都是通过将给定的函数作用在其值上得到的新的DStream下面是对某些转换的详细的讨论UpdateStateByKey Operation允许当使用新的信息连续更新的时候,维护任意的状态& & 1. 定义状态,这个状态可以是任意的数据类型& & 2. 定义状态的更新函数不管有没有数据,spark都会更新状态,如果更新函数返回为none那么键值对就会被消除假设想要维持一个运行时数目,那么运行时数目就是一个状态,是个整数,下面是一个更新函数def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount)
# add the new values with the previous running count to get the new count假设使用前面的paris DStream包含(word,1)对runningCounts = pairs.updateStateByKey(updateFunction)转换操作:tranform操作允许任意的RDD-to-RDD函数应用到DStream,下面是一个例子:spamInfoRDD = sc.pickleFile(...) # RDD containing spam information# join data stream with spam information to do data cleaningcleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))窗口操作:& &允许我们应用transformation到一个滑动窗口的数据上&上面的例子说明了每个窗口操作要声明下面的两个参数& & windows length:窗口的长度,上面的例子是3& & sliding interval:窗口被执行的时间间隔,例子中的书2上面的两个参数都应该是元素DStream批间隔(上面的间隔是1)的整数倍下面是一个窗口操作的例子,假设我们想生成过去的30秒的数据的wordcounts,每10秒钟一次# Reduce last 30 seconds of data, every 10 secondswindowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)下面是一个常见的窗口操作的描述,所有的操作都传递两个参数,窗口的长度和时间间隔window(长度,间隔)原来的DStream按照新的指定窗口进行切分返回新的DStreamcountByWindow(长度,间隔)返回滑动窗口的元素个数reduceByWindow(func, windowLength, slideInterval)读原来的DStream数据进行聚合得到新的DStreamreduceByKeyAndWindow(func,长度,间隔,[numtasks])(k,v)中的k被函数合并得到新的DStreamreduceByKeyAndWindow(func,invFunc,长度,间隔,[numtasks])比上面的更高效,对窗口内的数据增量聚合和逐步移去得到聚合后新的DStreamcountByValueAndWindow(windowLength, slideInterval, [numTasks])根据窗口计算每个元素的频次Join Operations下面是简单的流的joinstream1 = ...stream2 = ...joinedStream = stream1.join(stream2)You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin下面是基于窗口的流的JoinwindowedStream1 = stream1.window(20)windowedStream2 = stream2.window(60)joinedStream = windowedStream1.join(windowedStream2)sream-dataset的join流和数据集的join操作是使用lambda表达式实现的dataset = ... # some RDDwindowedStream = stream.window(20)joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))DStream的输出操作print()前十个元素打印出来saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本方式保存成文件,每次批处理间隔内产生的文件按照prefix-TIME_IN_MS[.suffix]命名saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile格式保存,每次批处理间隔文件按照上面的命名saveAsHadoopFiles(prefix, [suffix])
将DStream中的内容按对象序列化并且以hadoop格式保存,每次批处理间隔文件按照上面的命名
foreachRDD(func)对每个RDD应用这个函数,将RDD保存在外部文件中Design Patterns for using foreachRDDforeachRDD的设计模式dstream.foreachRDD非常强大,但是容易出错将数据写到外部系统需要创建一个连接对象,使用这个对象例如Tcp Connection发送数据到远程的系统开发者可能会错误的连接到Spark Driver,然后试图在worker中使用将数据保存到RDD中例如:def sendRecord(rdd):
connection = createNewConnection()
# executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()dstream.foreachRDD(sendRecord)这是错误的,因为这要求连接对象序列化并且从driver发送到worker这样的连接对象很少能跨机器转让正确的做法是在worker中创建连接对象def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))通常的,创建一个对象需要时间和资源的管理费用,因此,为每个记录创建和摧毁连接对象可能会带来不必要的管理费用,这可能会显著降低系统的吞吐量,一个更好的解决方案是使用rdd.foreachPartition去创造唯一连接对象,并且用这个对象发送所有的RDD。def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))最终的优化是,跨RDD或者批次,重用连接对象程序员可以维护一个连接对象的静态的池。def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
阅读(...) 评论()

我要回帖

更多关于 spark streaming 提交 的文章

 

随机推荐