spark 如何执行r语言在代码中执行spark

数据科学中的 Spark 入门 - 为程序员服务
数据科学中的 Spark 入门
Apache Spark 为数据科学提供了许多有价值的工具。随着 Apache Spark 1.3.1 技术预览版的发布,强大的 Data Frame API 也可以在 HDP 上使用数据科学家使用数据挖掘和可视化来帮助构造问题架构并对学习进行微调。Apache Zeppelin 正好能够帮他们做到这些。
Zeppelin 是一个基于 Web 的 notebook 服务器。它基于一个解释器的概念,这个解释器可以绑定到任何语言或数据处理后端。作为 Zeppelin 后端的一种,Zeppelin 实现了 Spark 解释器。其他解释器实现,如 Hive、Markdown、D3 等,也同样可以在 Zeppelin 中使用。
我们将通过一系列的博客文章来描述如何结合使用 Zeppelin、Spark SQL 和 MLLib 来使探索性数据科学简单化。作为这个系列的第一篇文章,我们描述了如何为 HDP2.2 安装/构建 Zeppelin,并揭示一些 Zeppelin 用来做数据挖掘的基本功能。
以下假设 HDP 2.2 和 Spark 已经安装在集群上。
Spark 可以使用 Ambari 2.0 安装成一个 service,或者按照这篇文章的描述下载和配置。
无论使用哪种方法安装,本文将
spark.home
代指 Spark 安装的根目录。
构建 Zeppelin
如果可以的话,在一个非 datanode 或 namenode 的集群节点上构建和运行 Zeppelin。这是为了确保在那个节点上 Zeppelin 有足够的计算资源。
从 github 获取 Zeppelin:
git clone /apache/incubator-zeppelin.git
cd incubator-zeppelin
使用如下命令构建 Spark 1.3.1 可用的 Zeppelin:
mvn clean install -DskipTests -Pspark-1.3 -Dspark.version=1.3.1 -Phadoop-2.6 -Pyarn
使用如下命令构建 Spark 1.2.1 可用的 Zeppelin:
mvn clean install -DskipTests -Pspark-1.2 -Phadoop-2.6 -Pyarn
在之前的步骤中,Zeppelin、Spark 1.3.1 和 Hadoop 2.6 已经构建好了。现在先确定正在使用的 HDP 的版本:
hdp-select status hadoop-client | sed 's/hadoop-client - (.*)/1/'
这个命令应该输出类似这样的版本号:
将这个参数记为
hdp.version
conf/zeppelin-env.sh
文件添加以下几行:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export ZEPPELIN_PORT=10008
export ZEPPELIN_JAVA_OPTS="-Dhdp.version=$hdp.version"
/etc/hive/conf/hive-site.xml
文件夹下。
为运行 Zeppelin(比如 zeppelin)的用户在 HDFS 上创建一个目录:
hdfs dfs -mkdir /user/hdfs dfs -chown zeppelin:hdfs /user/zeppelin&
使用以下命令运行 Zeppelin:
bin/zeppelin-daemon.sh start
这行命令会启动一个 notebook 服务器并通过端口 10008 提供一个 Web UI。
http://$host:10008
访问 notebooks。点击 Interpreter 标签切换到 Interpreter 页面设置一些属性。
配置Zeppelin
为了在YARN客户端模式下运行解释器,需要在
$SPARK_HOME/conf/spark-defaults.conf
重写以下这些属性:
master yarn-client
spark.driver.extraJavaOptions -Dhdp.version=$hdp.version
spark.home $spark.home
spark.yarn.am.extraJavaOptions -Dhdp.version=$hdp.version
spark.yarn.jar $zeppelin.home/interpreter/spark/zeppelin-spark-0.5.0-SNAPSHOT.jar
一旦这些配置更新,Zeppelin 会弹框提醒重启解释器。确认重启后解释器会重新加载配置。
至此,准备工作完成,可以开始使用 Zeppelin notebook 了。
http://$host:10008
你将看到像截图一样的界面:
Create new note
来打开一个新的 notebook。
在Notebook中编写Scala
在任一 Ambari 管理的集群上,ambari-agent 日志都写在
/var/log/ambari-agent/ambari-agent.log
我们将在 Zeppelin 上写一点 Scala 代码来可视化这些日志,从中抽取信息。
为了能看到这些日志的内容并随后处理他们,我们将从这个日志文件创建一个 RDD。
val ambariLogs = sc.textFile("file:///var/log/ambari-agent/ambari-agent.log")
上面的代码将文本文件的内容连结到一个由变量
ambariLogs
代表的 RDD 上。
为了能更好地看到日志的内容,使用以下代码 dump 几行文本到解释器终端看看:
ambariLogs.take(10).mkString("n")
这行代码的输出会像这样:
使用Spark SQL
为了进一步分析这些日志,最好将他们与一个 schema 连结起来,并使用 Spark 强大的 SQL 查询功能。
Spark SQL 有一个强大的功能,就是它能够以编程方式把 schema 连接到一个 Data Source,并映射到 Scala 条件类。Scala 条件类能够以类型安全的方式操纵和查询。
对于当前的分析,ambari 日志的每一行可以认为是由以空格隔开的四个基本组件组成的。
日志级别(INFO、DEBUG、WARN等)
日期(YYYY-mm-dd)
时间(HH:mm:ss,SSS格式)
创建一个条件类来连结这个 schema:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
import java.sql.Date
case class Log(level: String, date: Date, fileName: String)
:为了方便,这里将日期和时间合并到一个 Date 对象里。
import java.text.SimpleDateFormat
val df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss,SSS")
val ambari = ambariLogs.map { line =&
line.split(" ")
val logLevel = s(0)
val dateTime = df.parse(s(1) + " " + s(2))
val fileName = s(3).split(":")(0)
Log(logLevel,new Date(dateTime.getTime()), fileName)}.toDF()
ambari.registerTempTable("ambari")
初始化一个 dataframe 之后,我们可以使用 SQL 在上面做查询。Dataframes 是用来接收针对他们而写的 SQL 查询,并根据需要将查询优化成一系列的 Spark 任务。
比如,假设我们想要得到不同日志级别的事件数量,查询写成 SQL 会是这样的形式:
SELECT level, COUNT(1) from ambari GROUP BY level
但是使用Scala Data Frame API 可以写成:
ambari.groupBy("level").count()
这时,我们可以使用非常接近原生 SQL 的查询:
sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")
这个查询返回的数据结构是根 DataFrame API 返回的是相同的。返回的数据结构本身是一个 data frame。
这个时候并没有任何操作被执行:data frames 上的操作都映射到 RDD 相应的操作(在这个例子中):
RDD.groupBy(...).aggregateByKey(...))
我们可以通过使用
强制执行这个任务,将结果发送到 driver 的内存中。
使用 Zeppelin 做可视化
Zeppelin Notebook 有一个强大的功能,那就是你可以在同一个框架里看到上一个片段的结果集。Zeppelin 的显示系统接通了标准输出。
任何以 %table、%img、%html 等解释器命令为开头,通过println输出到标准输出的字符串,都可以被 Zeppelin 的显示系统所解析。
在我们的例子中,我们想要将每种日志级别的日志个数输出成一个表,所以使用以下代码:
import org.apache.spark.sql.Row
val result = sqlContext.sql("SELECT level, COUNT(1) from ambari group by level").map {
case Row(level: String, count: Long) =& {
level + "t" + count
}.collect()
这段代码将 groupby 的输出整合成表解释器可以渲染的格式。
%table 要求每行数据都以
(换行符)分隔,每一列均以
(制表符)分开,如下所示:
println("%table Log LeveltCountn" + result.mkString("n"))
通过这行代码打印出来的结果会是:
数据科学家们使用许多种工具进行工作。Zeppelin 为他们提供了一个新工具来构建出更好的问题。在下一篇文章中,我们将深入讨论一个具体的数据科学问题,并展示如何使用 Zeppelin、Spark SQL 和 MLLib 来创建一个使用 HDP、Spark 和 Zeppelin 的数据科学项目。
用程序师的眼光看世界
原文地址:, 感谢原作者分享。
您可能感兴趣的代码16:33 提问
WEB容器调用spark程序分析数据
请教各位大神,如何在web容器中(如tomcat)来调用spark程序啊,网上看到的都是自己把程序生成jar包,然后调用spark-submit来执行!!
按赞数排序
楼主的问题是怎么解决的,我们现在也要这么做,能不能给点指导,小弟在这先谢谢了
可以参考 一下 这个
5247关注|371收录
其他相似问题<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&运行spark程序的方式 - 推酷
运行spark程序的方式
本文主要讲述运行spark程序的几种方式,包括:本地测试、提交到集群运行、交互式运行 等。
在以下几种执行spark程序的方式中,都请注意master的设置,切记。
运行自带样例
可以用 run-example 执行spark自带样例程序,如下:
./bin/run-example org.apache.spark.examples.SparkPi
或者同样的:
run-example SparkPi
可以用 spark-shell 以交互方式执行spark代码,这些操作都将由spark自动控制并以分布式处理的形式完成。首先,进入spark shell:
./bin/spark-shell
然后就可以直接执行spark代码了。spark-shell非常适合学习API,初学的话多在里面敲敲很好的。
Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可,否则用户自己再初始化,就会出现端口占用问题,相当于启动两个上下文。
在使用spark-shell时,可以通过 –driver-class-path 选项来指定所依赖的jar文件,多个jar文件之间使用分号”:”分割。
如果觉得spark-shell的日志过多而影响观看结果,可以配置一下日志参数,将conf目录下的log4j.properties.template复制一个并命名为log4j.properties,并修改其中的日志等级就ok了。
本地运行测试
如果你是在windows上开发spark程序,然后提交到linux运行。那么本地测试将会方便开发。
本地测试spark程序,需要将master设置为local[n]。同时注意:sc.textFile()可以加载本地文件而不一定是hdfs文件,这对于开发测试是非常方便的。
本地运行测试spark程序,既可以在IDE中进行,也可以手动在命令行中执行,参见我的
linux下spark开发环境配置
windows本地测试时,需要用到hadoop的一个东东(winutils.exe),否则会出现异常。使用方法是:新建一个文件夹 D:\hadoop\bin\ 并将 winutils.exe 放入其中,并保证winutils.exe双击运行没有报*.dll缺失的错误,然后
System.setProperty(&hadoop.home.dir&, &D:\\hadoop\\&)
设置一下hadoop目录即可。
提交到集群
可以用 spark-submit 提交任务到集群执行,如下(这里我们指定了集群URL为spark standalone集群):
spark-submit \
--class 应用程序的类名 \
--master spark://master:7077 \
--jars 依赖的库文件,多个包之间用逗号&,&分割 \
--executor-memory 2G \
--total-executor-cores 20 \
spark应用程序的jar包 你的应用程序需要的参数(即main方法的参数)
参数指定集群URL,可以是独立集群、YARN集群、Mesos集群,甚至是本地模式。见下表:
master可选值
spark://host:port
spark standalone集群,默认端口为7077。
YARN集群,当在YARN上运行时,需设置环境变量HADOOP_CONF_DIR指向hadoop配置目录,以获取集群信息。
mesos://host:port
Mesos集群,默认端口为5050。
本地模式,使用1个核心。
本地模式,使用n个核心。
本地模式,使用尽可能多的核心。
如果jar包所需的依赖较少,通过
手动指定还可以,如果很多,最好使用构建工具打包。
需要注意的是,你的spark程序需要打包成jar包,spark-submit会将程序包分发到各个worker节点,同时这些上传到worker节点的文件,需要定时清理,否则会占用许多磁盘空间,如果运行于standalone模式,你可以设置 spark.worker.cleanup.appDataTtl 选项来让spark自动清理这些文件。
其实安装spark不需要安装scala,因为 spark-assembly-1.2.0-hadoop2.4.0.jar 中已经自带了scala库。spark/bin/compute-classpath.sh 会自动将spark自带的库文件(spark-assembly-1.2.0-hadoop2.4.0.jar等)添加到classpath中,因此即使classpath和你的spark应用程序中都没有指定spark库文件路径,你的spark应用程序照样可以执行。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致

我要回帖

更多关于 spark sql 执行计划 的文章

 

随机推荐