spark如何spark 入门 pdf?

聚合国内IT技术精华文章,分享IT技术精华,帮助IT从业人士成长
>> 技术文章 >> 正文
数据科学中的 Spark 入门
浏览: 44 次
Apache Spark 为数据科学提供了许多有价值的工具。随着 Apache Spark 1.3.1 技术预览版的发布,强大的 Data Frame API 也可以在 HDP 上使用数据科学家使用数据挖掘和可视化来帮助构造问题架构并对学习进行微调。Apache Zeppelin 正好能够帮他们做到这些。
Zeppelin 是一个基于 Web 的 notebook 服务器。它基于一个解释器的概念,这个解释器可以绑定到任何语言或数据处理后端。作为 Zeppelin 后端的一种,Zeppelin 实现了 Spark 解释器。其他解释器实现,如 Hive、Markdown、D3 等,也同样可以在 Zeppelin 中使用。
我们将通过一系列的博客文章来描述如何结合使用 Zeppelin、Spark SQL 和 MLLib 来使探索性数据科学简单化。作为这个系列的第一篇文章,我们描述了如何为 HDP2.2 安装/构建 Zeppelin,并揭示一些 Zeppelin 用来做数据挖掘的基本功能。
以下假设 HDP 2.2 和 Spark 已经安装在集群上。
Spark 可以使用 Ambari 2.0 安装成一个 service,或者按照这篇文章的描述下载和配置。
无论使用哪种方法安装,本文将 spark.home 代指 Spark 安装的根目录。
构建 Zeppelin
如果可以的话,在一个非 datanode 或 namenode 的集群节点上构建和运行 Zeppelin。这是为了确保在那个节点上 Zeppelin 有足够的计算资源。
从 github 获取 Zeppelin:
git clone /apache/incubator-zeppelin.git
cd incubator-zeppelin
使用如下命令构建 Spark 1.3.1 可用的 Zeppelin:
mvn clean install -DskipTests -Pspark-1.3 -Dspark.version=1.3.1 -Phadoop-2.6 -Pyarn
使用如下命令构建 Spark 1.2.1 可用的 Zeppelin:
mvn clean install -DskipTests -Pspark-1.2 -Phadoop-2.6 -Pyarn
在之前的步骤中,Zeppelin、Spark 1.3.1 和 Hadoop 2.6 已经构建好了。现在先确定正在使用的 HDP 的版本:
hdp-select status hadoop-client | sed 's/hadoop-client - (.*)/1/'
这个命令应该输出类似这样的版本号:
将这个参数记为 hdp.version。
编辑 conf/zeppelin-env.sh 文件添加以下几行:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export ZEPPELIN_PORT=10008
export ZEPPELIN_JAVA_OPTS=&-Dhdp.version=$hdp.version&
复制 /etc/hive/conf/hive-site.xml到conf/ 文件夹下。
为运行 Zeppelin(比如 zeppelin)的用户在 HDFS 上创建一个目录:
hdfs dfs -mkdir /user/hdfs dfs -chown zeppelin:hdfs /user/zeppelin&
使用以下命令运行 Zeppelin:
bin/zeppelin-daemon.sh start
这行命令会启动一个 notebook 服务器并通过端口 10008 提供一个 Web UI。
打开 http://$host:10008 访问 notebooks。点击 Interpreter 标签切换到 Interpreter 页面设置一些属性。
配置Zeppelin
为了在YARN客户端模式下运行解释器,需要在 $SPARK_HOME/conf/spark-defaults.conf 重写以下这些属性:
master yarn-client
spark.driver.extraJavaOptions -Dhdp.version=$hdp.version
spark.home $spark.home
spark.yarn.am.extraJavaOptions -Dhdp.version=$hdp.version
spark.yarn.jar $zeppelin.home/interpreter/spark/zeppelin-spark-0.5.0-SNAPSHOT.jar
一旦这些配置更新,Zeppelin 会弹框提醒重启解释器。确认重启后解释器会重新加载配置。
至此,准备工作完成,可以开始使用 Zeppelin notebook 了。
打开 http://$host:10008 你将看到像截图一样的界面:
点击 Create new note 来打开一个新的 notebook。
在Notebook中编写Scala
在任一 Ambari 管理的集群上,ambari-agent 日志都写在 /var/log/ambari-agent/ambari-agent.log。
我们将在 Zeppelin 上写一点 Scala 代码来可视化这些日志,从中抽取信息。
为了能看到这些日志的内容并随后处理他们,我们将从这个日志文件创建一个 RDD。
val ambariLogs = sc.textFile(&file:///var/log/ambari-agent/ambari-agent.log&)
上面的代码将文本文件的内容连结到一个由变量 ambariLogs 代表的 RDD 上。
为了能更好地看到日志的内容,使用以下代码 dump 几行文本到解释器终端看看:
ambariLogs.take(10).mkString(&n&)
这行代码的输出会像这样:
使用Spark SQL
为了进一步分析这些日志,最好将他们与一个 schema 连结起来,并使用 Spark 强大的 SQL 查询功能。
Spark SQL 有一个强大的功能,就是它能够以编程方式把 schema 连接到一个 Data Source,并映射到 Scala 条件类。Scala 条件类能够以类型安全的方式操纵和查询。
对于当前的分析,ambari 日志的每一行可以认为是由以空格隔开的四个基本组件组成的。
日志级别(INFO、DEBUG、WARN等)
日期(YYYY-mm-dd)
时间(HH:mm:ss,SSS格式)
创建一个条件类来连结这个 schema:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
import java.sql.Date
case class Log(level: String, date: Date, fileName: String)
注意:为了方便,这里将日期和时间合并到一个 Date 对象里。
import java.text.SimpleDateFormat
val df = new SimpleDateFormat(&yyyy-mm-dd HH:mm:ss,SSS&)
val ambari = ambariLogs.map { line =&
line.split(& &)
val logLevel = s(0)
val dateTime = df.parse(s(1) + & & + s(2))
val fileName = s(3).split(&:&)(0)
Log(logLevel,new Date(dateTime.getTime()), fileName)}.toDF()
ambari.registerTempTable(&ambari&)
初始化一个 dataframe 之后,我们可以使用 SQL 在上面做查询。Dataframes 是用来接收针对他们而写的 SQL 查询,并根据需要将查询优化成一系列的 Spark 任务。
比如,假设我们想要得到不同日志级别的事件数量,查询写成 SQL 会是这样的形式:
SELECT level, COUNT(1) from ambari GROUP BY level
但是使用Scala Data Frame API 可以写成:
ambari.groupBy(&level&).count()
这时,我们可以使用非常接近原生 SQL 的查询:
sqlContext.sql(&SELECT level, COUNT(1) from ambari group by level&)
这个查询返回的数据结构是根 DataFrame API 返回的是相同的。返回的数据结构本身是一个 data frame。
这个时候并没有任何操作被执行:data frames 上的操作都映射到 RDD 相应的操作(在这个例子中):
RDD.groupBy(...).aggregateByKey(...))
我们可以通过使用 collect() 强制执行这个任务,将结果发送到 driver 的内存中。
使用 Zeppelin 做可视化
Zeppelin Notebook 有一个强大的功能,那就是你可以在同一个框架里看到上一个片段的结果集。Zeppelin 的显示系统接通了标准输出。
任何以 %table、%img、%html 等解释器命令为开头,通过println输出到标准输出的字符串,都可以被 Zeppelin 的显示系统所解析。
在我们的例子中,我们想要将每种日志级别的日志个数输出成一个表,所以使用以下代码:
import org.apache.spark.sql.Row
val result = sqlContext.sql(&SELECT level, COUNT(1) from ambari group by level&).map {
case Row(level: String, count: Long) =& {
level + &t& + count
}.collect()
这段代码将 groupby 的输出整合成表解释器可以渲染的格式。
%table 要求每行数据都以 n(换行符)分隔,每一列均以 t(制表符)分开,如下所示:
println(&%table Log LeveltCountn& + result.mkString(&n&))
通过这行代码打印出来的结果会是:
数据科学家们使用许多种工具进行工作。Zeppelin 为他们提供了一个新工具来构建出更好的问题。在下一篇文章中,我们将深入讨论一个具体的数据科学问题,并展示如何使用 Zeppelin、Spark SQL 和 MLLib 来创建一个使用 HDP、Spark 和 Zeppelin 的数据科学项目。请问音质spark2和七彩虹c3比怎么样(感觉c3很强大啊)? 如果都用森海c200这样的入门耳机,效果区别大么?
内容为广告/垃圾,我要举报!
特聘专家具有协助内容审核的特权
举报后内容将不能在前台展示
错乱举报会导致该权利被剥夺
选择举报原因&
请高手详细解答?
已有2个回答
[见习专家]
专家星级&:&0.5星
问答堂专家综合评分
问题评分&:&0星
采纳、点赞&:&0星
二次回复率&:&5星
内容为广告/垃圾,我要举报!
特聘专家具有协助内容审核的特权
举报后内容将不能在前台展示
错乱举报会导致该权利被剥夺
选择举报原因×
性价比不错,音质秒杀苹果(包括ipc)如果预算不高的话可以说是十分理想的,毕竟这个价格可以超苹果,但是称HiFi有点过了,没有Lo输出,无法接耳放,关键是放大电路有点差,推力一般,推白牙36欧的便携耳机就稍显吃力。不要指望开大声音功率会上去,放大电路的确不行,声音大了,三频就混了,最大20单位音量三频才分得开,我也是相信测评才买的,现在有点悔不该……但是价格在这里,搭中低端耳机配这耳机还是可以,1500rmb以上就算了吧,还是买个好点的,不然会失望………另外操控超级不好,性格急、手笨的人还是先想好,如果想挑战自我还是可以试试,挑战自我……还有极度容易掉漆,金属外壳不知道是不是纯铝,超级容易变形,我用了一周,比我iPod用了两年还旧(当然我同学都说不太爱惜东西,再贵我也没性子去宠),该说的都说了,因为我用了所以把缺点都说了,大家选的时候根据自己耳机和预算好好考虑,c3也不是太差,这个价位应该是最好的了,比什么台电、艾利和、苹果、索尼这个价位的还是可以秒杀他们的……等明年毕业了果断进个m10或HiFiman 的机子(d50和c4就算了,我要求续航强的、便携的)呵呵!萝卜白菜你爱什么?
留下你的评论
[见习专家]
专家星级&:&0.5星
问答堂专家综合评分
问题评分&:&0星
采纳、点赞&:&0星
二次回复率&:&5星
内容为广告/垃圾,我要举报!
特聘专家具有协助内容审核的特权
举报后内容将不能在前台展示
错乱举报会导致该权利被剥夺
选择举报原因×
看了很多大神对C3这款机器的评价,看的心里实在是憋屈,为C3的憋屈!!!有人说C3的操作蛋疼,我想问问,你们真的用过C3么?真的感受过C3的触控操作么?没亲身体验过的,请不要在哪里唧唧歪歪!C3是触控屏幕,操作蛋疼的问题出在第一批货身上。而后面的货,已经解决了这些问题!而且,C3的炫灯,你们没有用过的人是感觉不到的!低调的华丽!有人说C3除了音质就没有其他的,说低音残废!我想说的是,低音是什么?低音就特么只是一个音而已,难道你不觉得低音量太多的话,听久了会头晕么?C3的音质是属于清淡型的,还原音质的类型!我们听音乐是为什么?不就是为了听到录音室的那种音乐么?还原了还不好?还非得要音染?????没错,SONY的低音重比较好,知道为什么?知道SONY不能跻身成为一流的媒体播放器的行列么?就是因为SONY的音染太重!没错,不排除很多播放器的音质很讨耳朵,但是,只要不是木耳的人,挺久了自然而然会产生一种厌倦感!而C3不会,致力还原最真实的音质,让我每一次都会有种震撼的感觉!再加上C3的沉金板,能很有效的隔绝干扰。这些,你们都知道么?所以,那些在骂C3的人,不要再2了!难道国产机器真的这么不如你的眼?难道你们看见洋鬼子的东西就觉得比中国的机器好?难道你们不是中国人?中国的机器好不容易出了这款音质在1000元以内无敌手的播放器,你们是不把它喷出翔誓不罢休是不是????借用某代名人的话:没有实践就没有发言权!!!!!
留下你的评论
悲剧啊,今天刚拿到的C3。试听之后,果断心灰意冷,原来我不是初烧,我只是木耳,听不出来他的素质啊!失望之下求出手,或换其他P3,有意私信!
微信公众账号ZOL问答堂
关注微信,随时随地解答您的疑惑
ZOL问答堂官方微博@ZOL问答堂
关注成功!该问题被回答后,将给您发送站内短信。
您也可以通过关注问答堂微信,及时获得您关注问题的回答。
微信关注问题方法“”今天看啥 热点:
sparkSQL1.1入门之二:sparkSQL运行架构,
& & & 在介绍sparkSQL之前,我们首先来看看,传统的关系型数据库是怎么运行的。当我们提交了一个很简单的查询:
可以看得出来,该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result--&Data Source--&Operation的次序来描述的。那么,SQL语句在实际的运行过程中是怎么处理的呢?一般的数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data
Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation--&Data
Source--&Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。
& & & 以上过程看上去非常简单,但实际上会包含很多复杂的操作细节在里面。而这些操作细节都和Tree有关,在数据库解析(Parse)SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理,如下面一个查询,会形成一个含有多个节点(TreeNode)的Tree,然后在后续的处理过程中对该Tree进行一系列的操作。
下图给出了对Tree的一些可能的操作细节,对于Tree的处理过程中所涉及更多的细节,可以查看相关的数据库论文。
&OK,上面简单介绍了关系型数据库的运行过程,那么,sparkSQL是不是也采用类似的方式处理呢?答案是肯定的。下面我们先来看看sparkSQL中的两个重要概念Tree和Rule、然后再介绍一下sparkSQL的两个分支sqlContext和hiveContext、最后再综合看看sparkSQL的优化器Catalyst。
1:Tree和Rule
& & & sparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。
Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/treesLogical Plans、Expressions、Physical Operators都可以使用Tree表示Tree的具体操作是通过TreeNode来实现的
sparkSQL定义了catalyst.trees的日志,通过这个日志可以形象的表示出树的结构TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)进行操作有了TreeNode,通过Tree中各个TreeNode之间的关系,可以对Tree进行遍历操作,如使用transformDown、transformUp将Rule应用到给定的树段,然后用结果替代旧的树段;也可以使用transformChildrenDown、transformChildrenUp对一个给定的节点进行操作,通过迭代将Rule应用到该节点以及子节点。
TreeNode可以细分成三种类型的Node:
UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand
Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
Rule在sparkSQL的Analyzer、Optimizer、SparkPlan等各个组件中都有应用到
Rule是一个抽象类,具体的Rule实现是通过RuleExecutor完成
Rule通过定义batch和batchs,可以简便的、模块化地对Tree进行transform操作
Rule通过定义Once和FixedPoint,可以对Tree进行一次操作或多次操作(如对某些Tree进行多次迭代操作的时候,达到FixedPoint次数迭代或达到前后两次的树结构没变化才停止操作,具体参看RuleExecutor.apply)
& & & 拿个简单的例子,在处理由解析器(SqlParse)生成的LogicPlan Tree的时候,在Analyzer中就定义了多种Rules应用到LogicPlan Tree上。
& & & 应用示意图:
& & &&Analyzer中使用的Rules,定义了batches,由多个batch构成,如MultiInstanceRelations、Resolution、Check Analysis、AnalysisOperators等构成;每个batch又有不同的rule构成,如Resolution由ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等构成;每个rule又有自己相对应的处理函数,可以具体参看Analyzer中的ResolveReferences
、ResolveRelations、ResolveSortReferences 、NewRelationInstances函数;同时要注意的是,不同的rule应用次数是不同的:如CaseInsensitiveAttributeReferences这个batch中rule只应用了一次(Once),而Resolution这个batch中的rule应用了多次(fixedPoint = FixedPoint(100),也就是说最多应用100次,除非前后迭代结果一致退出)。
在整个sql语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在sparkSQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。
知道了sparkSQL的各个过程的基本处理方式,下面来看看sparkSQL的运行过程。sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);hiveContext现在支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。关于sqlContext和hiveContext的具体应用请参看第六部分。
2:sqlContext的运行过程
& & & sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
def sql(sqlText: String): SchemaRDD = {
if (dialect == &sql&) {
new SchemaRDD(this, parseSql(sqlText))
//parseSql(sqlText)对sql语句进行语法解析
sys.error(s&Unsupported SQL dialect: $dialect&)
}sqlContext.sql的返回结果是SchemaRDD,调用了new
SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成Unresolved LogicalPlan。
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)类SchemaRDD继承自SchemaRDDLike
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLikeSchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成Unresolved LogicalPlan,这里的baseLogicalPlan就是指Unresolved
LogicalPlan。
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient val baseLogicalPlan: LogicalPlan
private[sql] def baseSchemaRDD: SchemaRDD
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
sqlContext.executePlan做了什么呢?它调用了QueryExecution类
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
QueryExecution类的定义:
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
protected abstract class QueryExecution {
def logical: LogicalPlan
//对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
//对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
lazy val optimizedPlan = optimizer(analyzed)
// 将optimized LogicalPlan转换成PhysicalPlan
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
// PhysicalPlan执行前的准备工作,生成可执行的物理计划
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
//执行可执行物理计划
lazy val toRdd: RDD[Row] = executedPlan.execute()
}sqlContext总的一个过程如下图所示:
SQL语句经过SqlParse解析成UnresolvedLogicalPlan;使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan;使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;使用SparkPlan将LogicalPlan转换成PhysicalPlan;使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;使用execute()执行可执行物理计划;生成SchemaRDD。
在整个运行过程中涉及到多个sparkSQL的组件,如SqlParse、analyzer、optimizer、SparkPlan等等,其功能和实现在下一章节中详解。
3:hiveContext的运行过程
& & & 在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。
& & & 从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
override def sql(sqlText: String): SchemaRDD = {
// 使用spark.sql.dialect定义采用的语法解析器
if (dialect == &sql&) {
super.sql(sqlText)
//如果使用sql解析器,则使用sqlContext的sql方法
} else if (dialect == &hiveql&) {
//如果使用和hiveql解析器,则使用HiveQl.parseSql
new SchemaRDD(this, HiveQl.parseSql(sqlText))
sys.error(s&Unsupported SQL dialect: $dialect.
Try 'sql' or 'hiveql'&)
}hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == &sql&则采用sqlContext的sql语法执行过程;如果是dialect == &hiveql&,则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new
SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = {
//非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
NativeCommand(sql)
nodeToPlan(tree) match {
case NativePlaceholder =& NativeCommand(sql)
case other =& other
//异常处理
因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:
首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache table、add jar等,将这些语句转换成command类型的LogicalPlan;如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成LogicalPlan。
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
/** * Returns the AST for the given SQL string.
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))和sqlContext一样,类SchemaRDD继承自SchemaRDDLike,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan =
optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
hiveContext的catalog,是指向 Hive Metastore:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
override def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {
LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
}hiveContext的analyzer,使用了新的catalog和functionRegistry:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false)
hiveContext的planner,使用新定义的hivePlanner:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@transient
override protected[sql] val planner = hivePlanner
所以hiveContext总的一个过程如下图所示:
SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan;使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;使用hivePlanner将LogicalPlan转换成PhysicalPlan;使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;使用execute()执行可执行物理计划;执行后,使用map(_.copy)将结果导入SchemaRDD。
hiveContxt还有很多针对hive的特性,更细节的内容参看源码。
4:catalyst优化器
& & & sparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:
core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;hive对hive数据的处理hive-ThriftServer提供CLI和JDBC/ODBC接口
& & & 在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是catalyst的一个设计图:
&其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:
sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;Analyzer,主要完成绑定工作,将不同来源的Unresolved LogicalPlan和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolved LogicalPlan;optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;Planner将LogicalPlan转换成PhysicalPlan;CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
这些组件的基本实现方法:
先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的属性而转换成resolved LogicalPlan;optimizer使用Optimization Rules,对resolved LogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimized LogicalPlan;Planner使用Planning Strategies,对optimized LogicalPlan
关于本篇中涉及到的相关概念和组件在下篇再详细介绍。
提供点思路:1、缺少你的用户名和密码没有问题。2、在myEclipse中有个database tools的视图,可以测试在myeclipse中链接sql server的服务器,从而验证myeclipse可以链接到sqlserver.3、注意jar包的导入,sqlserver2000的jar包和sqlserver2005的jar包不一样的。
你好哦楼主~很高兴看到你的问题。但是又很遗憾到现在还没有人回答你的问题。也可能你现在已经在别的地方找到了答案,那就得恭喜你啦。可能是你问的问题有些专业了,没人会。或者别人没有遇到或者接触过你的问题,所以帮不了你。建议你去问题的相关论坛去求助,那里的人通常比较多,也会比较热心,能快点帮你解决问题。希望我的回答能够帮到你!祝你好运。。
相关搜索:
相关阅读:
相关频道:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
云计算最近更新

我要回帖

更多关于 spark入门教程 的文章

 

随机推荐