支付宝网站怎么登录是多少啊

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的。
(Ps: External DataSource使用篇地址: )一、Sources包核心Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。
在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:
1、DDLParser
专门负责解析外部数据源SQL的SqlParser,解析create temporary table xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。protected lazy val createTable: Parser[LogicalPlan] =
CREATE ~ TEMPORARY ~ TABLE ~& ident ~ (USING ~& className) ~ (OPTIONS ~& options) ^^ {
case tableName ~ provider ~ opts =&
CreateTableUsing(tableName, provider, opts)
2、CreateTableUsing
一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。private[sql] case class CreateTableUsing(
tableName: String,
provider: String,
// org.apache.spark.sql.json
options: Map[String, String]) extends RunnableCommand {
def run(sqlContext: SQLContext) = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch { //do reflection
case cnf: java.lang.ClassNotFoundException =&
try loader.loadClass(provider + ".DefaultSource") catch {
case cnf: java.lang.ClassNotFoundException =&
sys.error(s"Failed to load class for data source: $provider")
val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSource
val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelation
sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册
2、DataSourcesStrategy
在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。
最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。private[sql] object DataSourceStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =&
pruneFilterProjectRaw(
projectList,
(a, f) =& t.buildScan(a, f)) :: Nil
case l @ LogicalRelation(t: TableScan) =&
execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
case _ =& Nil
3、interfaces.scala
该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait RelationProvider 和 BaseRelation,下文会详细介绍。
4、filters.scala
该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。
5、LogicalRelation
封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。
二、External DataSource注册流程用spark sql下sql/json来做示例, 画了一张流程图,如下:注册外部数据源的表的流程:1、提供一个外部数据源文件,比如json文件。2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。3、引入SQLContext,使用DDL创建表,如create temporary table xxx using options (key 'value', key 'value') 4、External Datasource的DDLParser将对该SQL进行Parse5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。三、External DataSource解析流程先看图,图如下:Spark SQL解析SQL流程如下:1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0)
3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。四、External Datasource Interfaces在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。BaseRelation:是外部数据源的抽象,里面存放了schema的映射,和如何scan数据的规则。abstract class BaseRelation {
def sqlContext: SQLContext
def schema: StructTypeabstract class PrunedFilteredScan extends BaseRelation {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。
1、TableScan:
默认的Scan策略。
2、PrunedScan:
这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。
3、PrunedFilterScan:
在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。
4、CatalystScan:
Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。RelationProvider:我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。trait RelationProvider {
* Returns a new base relation with the given parameters.
* Note: the parameters' keywords are case insensitive and this insensitivity is enforced
* by the Map that is passed to the function.
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}五、External Datasource定义示例在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。下面以json的外部数据源定义为示例,说明是如何实现的:1、JsonRelation定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
@transient val sqlContext: SQLContext)
extends TableScan {
private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json file
override val schema =
JsonRDD.inferSchema(
// jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。
samplingRatio,
sqlContext.columnNameOfCorruptRecord)
override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row
}2、DefaultSourceparameters中可以获取到options中传入的path等自定义参数。这里接受传入的参数,来狗仔JsonRelation。private[sql] class DefaultSource extends RelationProvider {
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
JSONRelation(fileName, samplingRatio)(sqlContext)
External DataSource源码分析下来,可以总结为3部分。
1、外部数据源的注册流程
2、外部数据源Table查询的计划解析流程
3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。
External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。——EOF——原创文章,转载请注明:转载自:,作者: 本文链接地址:
注:本文基于协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。
如果您想留下此文,您可以将其发送至您的邮箱(将同时以邮件内容&PDF形式发送)
相关文章推荐
(Ctrl+Enter提交) &&
已有0人在此发表见解
&在& 01:12收藏到了
&&在信息爆炸的时代,您的知识需要整理,沉淀,积累!Lai18为您提供一个简单实用的文章整理收藏工具,在这里您可以收藏对您有用的技术文章,自由分门别类,在整理的过程中,用心梳理自己的知识!相信,用不了多久,您收藏整理的文章将是您一生的知识宝库!
· 蜀ICP备号-11593人阅读
大数据技术与系统(76)
http://blog.csdn.net/oopsoom/article/details/一、Spark SQL External DataSource简介& 随着Spark1.2的发布,Spark SQL开始正式支持外部数据源。Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现。& 这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式。只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL。之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成。&二、External DataSource& 拿Spark1.2的json为例,它支持已经改为了实现了外部数据源的接口方式。所以除了先前我们操作json的API,又多了一种DDL创建外部数据源的方式。&& parquetFile的操作方式也如下类似,就不一一列举了。2.1 SQL方式&CREATE TEMPORARY TABLE USING OPTIONS在Spark1.2之后,支持了一种CREATE TEMPORARY TABLE USING OPTIONS的DDL语法来创建外部数据源的表。[sql]&1、操作示例:我们拿example下people.json文件来做示例。[java]&2、DDL创建外部数据源表jsonTable:[java]&我们来看下该schemaRDD:[java]&ExecutedCommand来取把数据用spark.sql.json的方式从path加载到jsonTable中。涉及到得类是CreateTableUsing,后续源码分析会讲到。各阶段执行计划情况:[java]&至此,创建加载外部数据源到Spark SQL已经完成。我们可以使用任何我们希望的方式来查询:3、SQL查询方式:[java]&执行查询:scala& sqlContext.sql(&select * from jsonTable&).collect()
res1: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])2.2 API方式sqlContext.jsonFile[java]&& 总的来说,Spark SQL 在努力的向各种数据源靠拢,希望让Spark SQL能和其它许多类型的数据源的集成。& Spark SQL提供的了一种创建加载外部数据源表的DDL语法:CREATE TEMPORARY TABLE USING OPTIONS& Spark SQL对外开放了一系列的扩展接口,能够通过实现这些接口,来实现对不同的数据源接入,如avro, csv, parquet,json, etc三、Sources包核心& & Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。& & 在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:& &&1、DDLParser&& & 专门负责解析外部数据源SQL的SqlParser,解析create&temporary&table xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。[java]&& &&2、CreateTableUsing& &一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。[java]&& & 2、DataSourcesStrategy& & 在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。& & 最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。[java]&& &3、interfaces.scala&& & 该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait&RelationProvider 和&BaseRelation,下文会详细介绍。& &&4、filters.scala& & 该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。& &5、LogicalRelation& &封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。& &&& &&四、External DataSource注册流程用spark sql下sql/json来做示例, 画了一张流程图,如下:注册外部数据源的表的流程:1、提供一个外部数据源文件,比如json文件。2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。3、引入SQLContext,使用DDL创建表,如create&temporary&table xxx using options (key 'value', key 'value')&4、External Datasource的DDLParser将对该SQL进行Parse5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。五、External DataSource解析流程先看图,图如下:Spark SQL解析SQL流程如下:1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0) &3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。六、External Datasource Interfaces在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。BaseRelation:是外部数据源的抽象,里面存放了schema的映射,和如何scan数据的规则。[java]&[java]&1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。& &1、TableScan:& & & & & 默认的Scan策略。& &2、PrunedScan:& & & & & 这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。& &3、PrunedFilterScan:& & & & & 在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。& &4、CatalystScan:& & & & & &Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。RelationProvider:我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。[java]&七、External Datasource定义示例在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。下面以json的外部数据源定义为示例,说明是如何实现的:1、JsonRelation定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。[java]&2、DefaultSourceparameters中可以获取到options中传入的path等自定义参数。这里接受传入的参数,来狗仔JsonRelation。[java]&八、总结&&External DataSource源码分析下来,可以总结为3部分。& 1、外部数据源的注册流程& 2、外部数据源Table查询的计划解析流程& 3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。&&& External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。——EOF——原创文章,转载请注明:转载自:,作者:&本文链接地址:&&注:本文基于协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:936888次
积分:10159
积分:10159
排名:第1335名
原创:180篇
转载:200篇
评论:1151条
中国科学院博士,代码洁癖重度患者,10年以上Java Web架构、开发经验,非单一语言爱好者,熟悉C++/MFC/java/Scala开发技术,著有《标准C++开发入门与编程实践》、《把脉VC++》,以及“白乔原创”系列技术文章多篇。
开源贡献,欢迎star:> 博客详情
摘要: SparkSQL默认不支持hbase数据源, 可以通过自定义外部数据源(External DataSource)的方式来访问hbase. 本文在/archives/.htm的代码基础上作少许改进.
包:&sparksql.hbase
HBaseRelation.scala
package&sparksql.hbase
import&java.io.Serializable
import&org.apache.spark.sql._
import&org.apache.spark.sql.sources.TableScan
import&org.apache.hadoop.hbase.client.{Result}
import&org.apache.spark.sql._
import&org.apache.hadoop.hbase.util.Bytes
import&org.apache.hadoop.hbase.HBaseConfiguration
import&org.apache.hadoop.hbase.mapreduce.TableInputFormat
import&scala.collection.JavaConversions._
import&scala.collection.JavaConverters._
import&scala.collection.mutable.ArrayBuffer
import&org.apache.spark.sql.types.StructType
import&org.apache.spark.sql.types.DataType
import&org.apache.spark.sql.types.StructField
import&org.apache.spark.sql.types.LongType
import&org.apache.spark.sql.types.IntegerType
import&org.apache.spark.sql.types.StringType
import&org.apache.spark.sql.sources.BaseRelation
import&sparksql.hbase.hbase._
object&Resolver&extends&&Serializable&{&
&&def&resolve&(hbaseField:&HBaseSchemaField,&result:&Result&):&Any&=&{
&&&&val&cfColArray&=&hbaseField.fieldName.split(":",-1)
&&&&val&cfName&=&cfColArray(0)
&&&&val&colName&=&&cfColArray(1)
&&&&var&fieldRs:&Any&=&null
&&&&//resolve&row&key&otherwise&resolve&column
&&&&if(cfName==""&&&&colName=="key")&{
&&&&&&fieldRs&=&resolveRowKey(result,&hbaseField.fieldType)
&&&&}&else&{
&&&&&&fieldRs&=&&resolveColumn(result,&cfName,&colName,hbaseField.fieldType)
&&&&fieldRs
&&def&resolveRowKey&(result:&Result,&resultType:&String):&Any&=&{
&&&&&val&rowkey&=&resultType&match&{
&&&&&&case&"string"&=&
&&&&&&&&result.getRow.map(_.toChar).mkString
&&&&&&case&"int"&=&
&&&&&&&&result&&.getRow.map(_.toChar).mkString.toInt
&&&&&&case&"long"&=&
&&&&&&&&result.getRow.map(_.toChar).mkString.toLong
&&&&rowkey
&&def&resolveColumn&(result:&Result,&columnFamily:&String,&columnName:&String,&resultType:&String):&Any&=&{
&&&&val&column&=&result.containsColumn(columnFamily.getBytes,&columnName.getBytes)&match{
&&&&&&&&case&true&=&{
&&&&&&&&&&&&resultType&match&{
&&&&&&&&&&&&&&case&"string"&=&
&&&&&&&&&&&&&&&&Bytes.toString(result.getValue(columnFamily.getBytes,columnName.getBytes))
&&&&&&&&&&&&&&&&//result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
&&&&&&&&&&&&&&case&"int"&=&
&&&&&&&&&&&&&&&&Bytes.toInt(result.getValue(columnFamily.getBytes,columnName.getBytes))
&&&&&&&&&&&&&&case&"long"&=&
&&&&&&&&&&&&&&&&Bytes.toLong(result.getValue(columnFamily.getBytes,columnName.getBytes))
&&&&&&&&&&&&&&case&"float"&=&
&&&&&&&&&&&&&&&&Bytes.toFloat(result.getValue(columnFamily.getBytes,columnName.getBytes))
&&&&&&&&&&&&&&case&"double"&=&
&&&&&&&&&&&&&&&&Bytes.toDouble(result.getValue(columnFamily.getBytes,columnName.getBytes))
&&&&&&&&&&&&&}
&&&&&&&&case&_&=&&{
&&&&&&&&&&&&resultType&match&{
&&&&&&&&&&&&&&case&"string"&=&
&&&&&&&&&&&&&&&&""
&&&&&&&&&&&&&&case&"int"&=&
&&&&&&&&&&&&&&&&0
&&&&&&&&&&&&&&case&"long"&=&
&&&&&&&&&&&&&&&&0
&&&&&&&&&&&&&}
&&&&column
&&&val&hbaseDDL&=&s"""
&&&&&&|CREATE&TEMPORARY&TABLE&hbase_people
&&&&&&|USING&com.shengli.spark.hbase
&&&&&&|OPTIONS&(
&&&&&&|&&sparksql_table_schema&&&'(row_key&string,&name&string,&age&int,&job&string)',
&&&&&&|&&&hbase_table_name&&&&&'people',
&&&&&&|&hbase_table_schema&'(:key&,&profile:name&,&profile:age&,&career:job&)'
&&&&&&|)""".stripMargin
case&class&HBaseRelation(@transient&val&hbaseProps:&Map[String,String])(@transient&val&sqlContext:&SQLContext)&extends&BaseRelation&with&Serializable&with&TableScan{
&&val&hbaseTableName&=&&hbaseProps.getOrElse("hbase_table_name",&sys.error("not&valid&schema"))
&&val&hbaseTableSchema&=&&hbaseProps.getOrElse("hbase_table_schema",&sys.error("not&valid&schema"))
&&val&registerTableSchema&=&hbaseProps.getOrElse("sparksql_table_schema",&sys.error("not&valid&schema"))
&&val&rowRange&=&hbaseProps.getOrElse("row_range",&"-&")
&&//get&star&row&and&end&row
&&val&range&=&rowRange.split("-&",-1)
&&val&startRowKey&=&range(0).trim
&&val&endRowKey&=&range(1).trim
&&val&tempHBaseFields&=&extractHBaseSchema(hbaseTableSchema)&//do&not&use&this,&a&temp&field
&&val&registerTableFields&=&extractRegisterSchema(registerTableSchema)
&&val&tempFieldRelation&=&tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
&&val&hbaseTableFields&=&feedTypes(tempFieldRelation)
&&val&fieldsRelations&=&&tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
&&val&queryColumns&=&&getQueryTargetCloumns(hbaseTableFields)
&&def&&feedTypes(&mapping:&Map[HBaseSchemaField,&RegisteredSchemaField])&:&&Array[HBaseSchemaField]&=&{
&&&&&&&&&val&hbaseFields&=&mapping.map{
&&&&&&&&&&&case&(k,v)&=&
&&&&&&&&&&&&&&&val&field&=&k.copy(fieldType=v.fieldType)
&&&&&&&&&&&&&&&field
&&&&&&&&hbaseFields.toArray
&&def&isRowKey(field:&HBaseSchemaField)&:&Boolean&=&{
&&&&val&cfColArray&=&field.fieldName.split(":",-1)
&&&&val&cfName&=&cfColArray(0)
&&&&val&colName&=&&cfColArray(1)
&&&&if(cfName==""&&&&colName=="key")&true&else&false
&&//eg:&f1:col1&&f1:col2&&f1:col3&&f2:col1
&&def&getQueryTargetCloumns(hbaseTableFields:&Array[HBaseSchemaField]):&String&=&{
&&&&var&str&=&ArrayBuffer[String]()
&&&&hbaseTableFields.foreach{&field=&
&&&&&&&&&if(!isRowKey(field))&{
&&&&&&&&&&&str&+=&&field.fieldName
&&&&&&&&&}
&&&&str.mkString("&")
&&lazy&val&schema&=&{
&&&&val&fields&=&hbaseTableFields.map{&field=&
&&&&&&&&val&name&&=&fieldsRelations.getOrElse(field,&sys.error("table&schema&is&not&match&the&definition.")).fieldName
&&&&&&&&val&relatedType&=&&field.fieldType&match&&{
&&&&&&&&&&case&"string"&=&
&&&&&&&&&&&&SchemaType(StringType,nullable&=&false)
&&&&&&&&&&case&"int"&=&
&&&&&&&&&&&&SchemaType(IntegerType,nullable&=&false)
&&&&&&&&&&case&"long"&=&
&&&&&&&&&&&&SchemaType(LongType,nullable&=&false)
&&&&&&&&StructField(name,relatedType.dataType,relatedType.nullable)
&&&&StructType(fields)
&&def&tableSchemaFieldMapping(&externalHBaseTable:&Array[HBaseSchemaField],&&registerTable&:&Array[RegisteredSchemaField]):&Map[HBaseSchemaField,&RegisteredSchemaField]&=&{
&&&&&&&if(externalHBaseTable.length&!=&registerTable.length)&sys.error("columns&size&not&match&in&definition!")
&&&&&&&val&rs&=&externalHBaseTable.zip(registerTable)
&&&&&&&rs.toMap
&&&&&*&spark&sql&schema&will&be&register
&&&&&*&&&registerTableSchema&&&'(rowkey&string,&value&string,&column_a&string)'
&&def&extractRegisterSchema(registerTableSchema:&String)&:&Array[RegisteredSchemaField]&=&{
&&&&&&&&&val&fieldsStr&=&registerTableSchema.trim.drop(1).dropRight(1)
&&&&&&&&&val&fieldsArray&=&fieldsStr.split(",").map(_.trim)
&&&&&&&&&fieldsArray.map{&fildString&=&
&&&&&&&&&&&val&splitedField&=&fildString.split("\\s+",&-1)
&&&&&&&&&&&RegisteredSchemaField(splitedField(0),&splitedField(1))
&&&&&&&&&}
&&//externalTableSchema&'(:key&,&f1:col1&)'
&&def&extractHBaseSchema(externalTableSchema:&String)&:&Array[HBaseSchemaField]&=&{
&&&&&&&&val&fieldsStr&=&externalTableSchema.trim.drop(1).dropRight(1)
&&&&&&&&val&fieldsArray&=&fieldsStr.split(",").map(_.trim)
&&&&&&&&fieldsArray.map(fildString&=&&HBaseSchemaField(fildString,""))
&&//&By&making&this&a&lazy&val&we&keep&the&RDD&around,&amortizing&the&cost&of&locating&splits.
&&lazy&val&buildScan&=&{
&&&&val&hbaseConf&=&HBaseConfiguration.create()
&&&&hbaseConf.set("hbase.zookeeper.quorum",&"zookeeper-name")
&&&&hbaseConf.set(TableInputFormat.INPUT_TABLE,&hbaseTableName)
&&&&hbaseConf.set(TableInputFormat.SCAN_COLUMNS,&queryColumns);
&&&&hbaseConf.set(TableInputFormat.SCAN_ROW_START,&startRowKey);
&&&&hbaseConf.set(TableInputFormat.SCAN_ROW_STOP,&endRowKey);
&&&&val&hbaseRdd&=&sqlContext.sparkContext.newAPIHadoopRDD(
&&&&&&hbaseConf,
&&&&&&classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
&&&&&&classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
&&&&&&classOf[org.apache.hadoop.hbase.client.Result]
&&&&val&rs&=&hbaseRdd.map(tuple&=&&tuple._2).map(result&=&&{
&&&&&&var&values&=&new&ArrayBuffer[Any]()
&&&&&&hbaseTableFields.foreach{field=&
&&&&&&&&values&+=&Resolver.resolve(field,result)
&&&&&&Row.fromSeq(values.toSeq)
&&private&case&class&SchemaType(dataType:&DataType,&nullable:&Boolean)
DefaultSource.scala
package&sparksql.hbase
import&org.apache.spark.sql.SQLContext
import&org.apache.spark.sql.sources.RelationProvider
class&DefaultSource&extends&RelationProvider&{
&&def&createRelation(sqlContext:&SQLContext,&parameters:&Map[String,&String])&=&{
&&&&HBaseRelation(parameters)(sqlContext)
package.scala
package&sparksql.hbase
import&org.apache.spark.sql.SQLContext
import&scala.collection.immutable.HashMap&
package&object&hbase&{
&&abstract&class&SchemaField&extends&Serializable
&&&case&class&RegisteredSchemaField(fieldName:&String,&fieldType:&String)&&extends&&SchemaField&&with&Serializable
&&&case&class&HBaseSchemaField(fieldName:&String,&fieldType:&String)&&extends&&SchemaField&&with&Serializable
&&&case&class&Parameter(name:&String)
&&protected&&val&SPARK_SQL_TABLE_SCHEMA&=&Parameter("sparksql_table_schema")
&&protected&&val&HBASE_TABLE_NAME&=&Parameter("hbase_table_name")
&&protected&&val&HBASE_TABLE_SCHEMA&=&Parameter("hbase_table_schema")
&&protected&&val&ROW_RANGE&=&Parameter("row_range")
&&&*&Adds&a&method,&`hbaseTable`,&to&SQLContext&that&allows&reading&data&stored&in&hbase&table.
&&implicit&class&HBaseContext(sqlContext:&SQLContext)&{
&&&&def&hbaseTable(sparksqlTableSchema:&String,&hbaseTableName:&String,&hbaseTableSchema:&String,&rowRange:&String&=&"-&")&=&{
&&&&&&var&params&=&new&HashMap[String,&String]
&&&&&&params&+=&(&SPARK_SQL_TABLE_SCHEMA.name&-&&sparksqlTableSchema)
&&&&&&params&+=&(&HBASE_TABLE_NAME.name&-&&hbaseTableName)
&&&&&&params&+=&(&HBASE_TABLE_SCHEMA.name&-&&hbaseTableSchema)
&&&&&&//get&star&row&and&end&row
&&&&&&params&+=&(&ROW_RANGE.name&-&&rowRange)
&&&&&&sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
package&test
import&org.apache.spark.SparkConf
import&org.apache.spark.sql.SQLContext
import&org.apache.spark.SparkContext
object&SparkSqlHbaseTest&{
&&def&main(args:&Array[String]):&Unit&=&{
&&&&val&sparkConf&=&new&SparkConf().setMaster("local[*]").setAppName("spark&sql&hbase&test")
&&&&val&sc&=&new&SparkContext(sparkConf)
&&&&val&sqlContext&=&new&SQLContext(sc)
&&&&var&hbasetable&=&sqlContext.read.format("sparksql.hbase").options(Map(
&&&&"sparksql_table_schema"&-&&"(key&string,&title&string,&url&string)",
&&&&"hbase_table_name"&-&&"crawled",
&&&&"hbase_table_schema"&-&&"(:key&,&data:title&,&basic:url)"
&&&&)).load()
&&&&hbasetable.printSchema()
&&&&hbasetable.registerTempTable("crawled")
&&&&var&records&=&sqlContext.sql("SELECT&*&from&crawled&limit&10").collect
人打赏支持
码字总数 38680
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥
& 开源中国(OSChina.NET) |
开源中国社区(OSChina.net)是工信部
指定的官方社区

我要回帖

更多关于 支付宝官方网站 的文章

 

随机推荐