三星c5手机忽然弄成这样,怎么去掉

后使用快捷导航没有帐号?
查看: 635|回复: 5
RDD的持久化需要自己写还是自动缓存?
新手上路, 积分 12, 距离下一级还需 38 积分
论坛徽章:1
RDD的持久化需要自己写还是自动缓存?
中会有很多数据重用的场景,数据缓存是什么样的机制?
新手上路, 积分 16, 距离下一级还需 34 积分
论坛徽章:2
持久化代码必须自己写,框架内部也会有一些持久化代码,曾经看到过
中级会员, 积分 227, 距离下一级还需 273 积分
论坛徽章:3
RDD有present指定缓存到磁盘的方法可以做吧,一般来说,一个RDD,后续需要频繁使用到,会做一下缓存,这样就不需要重复运算前面的逻辑了。
论坛徽章:21
Spark Streaming:构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+),虽然比不上专门的流式数据处理软件,也可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),一部分窄依赖的RDD数据集可以从源数据重新计算达到容错处理目的。
论坛徽章:21
本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。
论坛徽章:21
Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+),虽然比不上专门的流式数据处理软件,也可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),一部分窄依赖的RDD数据集可以从源数据重新计算达到容错处理目的。
扫一扫加入本版微信群基于Spark的数据库增量准实时同步
2016年微型机与应用第19期
作者:王浩,葛昂,赵晴
  王浩,葛昂,赵晴  (华北计算机系统工程研究所,北京 100083)& & & &摘要:为了实现将传统关系型数据库中的增量数据快速导入同构或者异构目的库,在使用已有的增量提取方法的基础上,提出了通过增加并行度和的方法加快同步速度。此方法不仅支持插入、更新和删除的增量数据同步,而且可以抽取出数据库表结构信息动态支持表结构变更。与传统单点抽取方式相比,大大提高了目的库数据的新鲜度。  关键词:; ; 流式计算0引言  随着大数据技术的发展,越来越多的企业开始构建大数据平台进行数据处理。然而如何将保存在关系型数据库中的数据快速同步到大数据平台组件(例如HBase、HDFS)中,正成为很多企业面临的问题。Sqoop是常用的数据同步工具,其实质是MapReduce任务,延时较高,而且需要通过定时任务来达到自动化流程效果。本文在触发器记录数据变化的基础上,提出了一种使用Spark Streaming将增量数据抽取出来,然后根据需要写入到不同的目的库的方法。由于只提取增量数据,所以较Sqoop减少了数据量。另外由于是流式处理方式,降低了延时。1增量提取  1.1增量提取的概念  增量提取是针对上一次提取而言,将上一次提取时间点到现在数据库中插入、更新、删除的数据提取出来[1]。  1.2常用的增量提取方法  1.2.1基于业务系统日志  在业务中将数据库DML(Data Manipulation Language)语句输出以日志的方式存储,然后通过解析日志将DML语句在目的库中重放以达到目的。此方法需要侵入业务系统,对于已经成型的业务系统不适用。  1.2.2基于数据库日志  解析数据库日志也能达到增量提取的目的,但是各大数据库厂商不对外开放数据库系统的日志格式,这就使得解析日志变成了问题。而且各数据库的日志格式还不尽相同,难以达到通用性。  1.2.3基于触发器  基于触发器的方式,目前被广泛运用于数据库增量提取。它通过在源表上建立插入、更新、删除触发器来记录对数据的操作。每当有数据变化时,就会触发相应的触发器,然后运行触发器定义的逻辑,将变化记录到增量表。  1.3基于触发器方法的具体实现  由于触发器方法具有实现逻辑简单,对业务无入侵,数据库通用等优点,所以本文采用了基于触发器方式的增量提取方法。具体实现方法如下:  (1)创建名为dml_log的数据库表,字段为id、table_name、record_id、execute_date、dml_type。其中id为自增id,table_name存储要同步的源表表名称,record_id是源表中发生变化的记录的唯一标识,execute_date为触发器执行时的时间戳,dml_type为I、U、D分别代表insert、update、delete操作。  (2)在源表上创建插入、更新、删除类型的触发器。创建语句在此省略。2构建Spark Streaming程序  2.1Spark Streaming  Spark是目前大数据处理领域比较常用的计算框架。它将中间计算结果维护在内存中,这样不仅可以做到中间结果的重用,而且减少了磁盘IO,大大加快了计算速度。Spark Streaming是构建于Spark core之上的流式处理模块。其原理是将流式数据切分成一个个小的片段,以mini batch的形式来处理这一小部分数据,从而模拟流式计算达到准实时的效果。  2.2JdbcRDD  弹性分布式数据集(Resilient Distributed Datasets,RDD),它是Spark数据抽象的基石。RDD是一个只读的分区记录集合,分区分散在各个计算节点[2]。RDD提供了transformation和action两类操作,其中transformation是lazy级别的,主要对数据处理流程进行标记,而不立即进行运算。action操作会触发作业的提交,然后进行回溯导致transformation操作进行运算。  JdbcRDD扩展自RDD,是RDD的子类。内部通过JDBC(Java Data Base Connectivity)操作以数据库为源头构建RDD。其构造函数签名为:  class JdbcRDD[T: ClassTag](  sc: SparkContext,  getConnection:()=& Connection,  sql: String,  lowerBound: Long,  upperBound: Long,  numPartitions: Int,  mapRow:(ResultSet) =& T =  JdbcRDD.resultSetToObjectArray _)  extends RDD[T](sc, Nil) with Logging {…}  2.3具体实现  Spark官方提供用于构建Spark Streaming的数据源没有对数据库进行支持,所以本文自己实现对数据库的支持。编写继承自InputDStream类的DirectJdbcInputDStream类,其签名为:  class DirectJdbcInputDStream[T: ClassTag](  @transient ssc_ : StreamingContext,  param: JdbcParam) extends  InputDStream[Row] (ssc_) with Logging {…}  对start()、compute()和stop()方法进行重写。  (1)在start函数中注册JDBC驱动,用于JDBC获取初始化信息(构造JdbcRDD时的参数);  (2)compute函数会被框架间隔指定的时间反复调用,其实质是如何返回一个JdbcRDD。首先通过JDBC获取本次需要拉取的trigger记录的id的上下界以及表的Schema信息;然后以这些信息为参数生成提取真实数据的SQL,其逻辑为用选中的trigger表中的记录和原表在record_id上进行左连接;最后使用该SQL当做参数构建JdbcRDD。值得说明的是,构建JdbcRDD时是可以指定并行度的,每个worker节点都会建立到数据库的JDBC连接,由多个节点并行去数据库拉取属于自己的那一部分数据,这就大大增加了提取和处理速度。  (3)在stop函数中关闭JDBC连接。总体来看,就是在driver程序中执行的JDBC程序获取初始化参数,在executor中执行的JDBC程序拉取真实的数据。  (4)编写driver程序:  val sc = new SparkContext(new SparkConf)  val ssc = new StreamingContext(sc, Seconds(30))  val directStream = new DirectJdbcInputDStream[Row](ssc, jdbcParam)  directStream.foreachRDD(rdd =& {  …//对数据进行处理  })  2.4限流  假设当前时间点到上次提取的时间点之间新增数据量太大,就会导致在新一次作业提交时,上一次的作业仍然没有完成,可能会因此造成作业积压使得系统不稳定。本文使用了基于规则的限流方法,综合考虑集群处理能力以及间隔时间,可以配置化设置每次最大提取条数。如果当前需要提取的数据条数大于最大提取条数,则本次就只提取最大条数,剩下的延时到下次再进行提取。这样做的好处是削减了峰流对系统造成的影响。3测试分析  测试环境:VMware虚拟机,处理器设置为4核心,2 GB内存, 64位CentOS 6.5操作系统,Spark 1.5.1,Oracle 11g。使用4台虚拟机搭建成Spark集群,1台为Master,3台为Worker。数据库表分别设置为20、40个字段,每次最大抽取记录数分别设置为10 000、50 000、500 000。将抽取出来的数据写成parquet格式的文件存储到hdfs上。测试结果如表1所示。4结束语  本文在基于数据库触发器记录数据变化的基础上,通过自己构造DirectJdbcStream类提供Spark Streaming对数据库的支持,达到准实时从数据库中抽取出增量数据的目的。并且可以对抽取出来的数据进行过滤、清洗等操作,根据需求灵活地写入到不同的目的库。  参考文献  [1] 郭亮. 基于MD5与HASH的数据库增量提取算法及其应用[D]. 长沙:湖南大学,2013.  [2] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault tolerant abstraction for in memory cluster computing[C]. Usenix Conference on Networked Systems Design & Implementation, ):141?146.  [3] DEAN J, GHEMAWAT S. MapReduce: simplified dataprocessing on large clusters[C]. USENIX Association OSDI′04: 6th Symposium on Operating Systems Design and Implementation, .  [4] MARTIN O. Programming in scala[M]. California: Artima Press,2010.  [5] YADAV R. Spark cookbook[M]. UK: Packt Publishing Ltd, 2015.  [6] KARAU H. Learning spark[M]. America: O’Reilly Media, Inc. 2015.  [7] 梁刚. 企业大数据管理解决方案[J]. 微型机与应用,):7 10,13.
继续阅读>>
热门关键词spark所支持的文件格式
1.文本文件
在 Spark 中读写文本文件很容易。
当我们将一个文本文件读取为 RDD 时,输入的每一行 都会成为 RDD 的 一个元素。
也可以将多个完整的文本文件一次性读取为一个 pair RDD, 其中键是文件名,值是文件内容。
&在 Scala 中读取一个文本文件
val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val textFile = sc.textFile(inputFile)
&在 Scala 中读取给定目录中的所有文件
val input = sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count")
&保存文本文件,Spark 将传入的路径作为目录对待,会在那个目录下输出多个文件
textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback")
//textFile.coalesce(1).saveAsTextFile 就能保存成一个文件
对于dataFrame文件,先使用.toJavaRDD 转换成RDD,然后再使用 &coalesce(1).saveAsTextFile
JSON 是一种使用较广的半结构化数据格式。
读取JSON,书中代码有问题所以找了另外的一段读取JSON的代码
&build.sbt
"org.json4s" %% "json4s-jackson" % "3.2.11"
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
* Created by common on 17-4-3.
case class Person(firstName: String, lastName: String, address: List[Address]) {
override def toString = s"Person(firstName=$firstName, lastName=$lastName, address=$address)"
case class Address(line1: String, city: String, state: String, zip: String) {
override def toString = s"Address(line1=$line1, city=$city, state=$state, zip=$zip)"
object WordCount {
def main(args: Array[String]) {
val inputJsonFile = "file:///home/common/coding/coding/Scala/word-count/test.json"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val input5 = sc.textFile(inputJsonFile)
val dataObjsRDD = input5.map { myrecord =&
implicit val formats = DefaultFormats
// Workaround as
DefaultFormats is not serializable
val jsonObj = parse(myrecord)
//val addresses = jsonObj \ "address"
//println((addresses(0) \ "city").extract[String])
jsonObj.extract[Person]
dataObjsRDD.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/test1.json")
&读取的JSON文件
{"firstName":"John","lastName":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"1 main street","city":"sunnyvale","state":"CA","zip":"94000"}]}
{"firstName":"Tim","lastName":"Williams","address":[{"line1":"1 main street","city":"Mountain View","state":"CA","zip":"94300"},{"line1":"1 main street","city":"San Jose","state":"CA","zip":"92000"}]}
&输出的文件
Person(firstName=John, lastName=Smith, address=List(Address(line1=1 main street, city=San Francisco, state=CA, zip=94101), Address(line1=1 main street, city=sunnyvale, state=CA, zip=94000)))
Person(firstName=Tim, lastName=Williams, address=List(Address(line1=1 main street, city=Mountain View, state=CA, zip=94300), Address(line1=1 main street, city=San Jose, state=CA, zip=92000)))
3.逗号分割值与制表符分隔值
逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件,即 TSV 文 件中用制表符隔开)。
如果恰好CSV 的所有数据字段均没有包含换行符,你也可以使用 textFile() 读取并解析数据,
"au.com.bytecode" % "opencsv" % "2.4"
3.1 读取CSV文件
import java.io.StringReader
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
.bytecode.opencsv.CSVReader
* Created by common on 17-4-3.
object WordCount {
def main(args: Array[String]) {
val input = sc.textFile("/home/common/coding/coding/Scala/word-count/sample_map.csv")
val result6 = input.map{ line =&
val reader = new CSVReader(new StringReader(line));
reader.readNext();
for(result &- result6){
for(re &- result){
println(re)
&CSV文件内容
Front Left
/usr/share/alsa/samples/Front_Left.wav
Front Right
/usr/share/alsa/samples/Front_Right.wav
如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段。如果每个文件都很大,读取和解析的过程可能会很不幸地成为性能瓶颈。
import java.io.StringReader
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConversions._
.bytecode.opencsv.CSVReader
* Created by common on 17-4-3.
case class Data(index: String, title: String, content: String)
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")
val result = input.flatMap { case (_, txt) =&
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x =& Data(x(0), x(1), x(2)))
for(res &- result){
println(res)
Data(0,Front Left,/usr/share/alsa/samples/Front_Left.wav)
Data(1,Front Right,/usr/share/alsa/samples/Front_Right.wav)
import java.io.StringReader
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConversions._
.bytecode.opencsv.CSVReader
* Created by common on 17-4-3.
case class Data(index: String, title: String, content: String)
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")  //wholeTextFiles读出来是一个RDD(String,String)
val result = input.flatMap { case (_, txt) =&
val reader = new CSVReader(new StringReader(txt));
//reader.readAll().map(x =& Data(x(0), x(1), x(2)))
reader.readAll()
result.collect().foreach(x =& {
x.foreach(println); println("======")
Front Left
/usr/share/alsa/samples/Front_Left.wav
Front Right
/usr/share/alsa/samples/Front_Right.wav
3.2 保存CSV
import java.io.{StringReader, StringWriter}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConversions._
.bytecode.opencsv.{CSVReader, CSVWriter}
* Created by common on 17-4-3.
case class Data(index: String, title: String, content: String)
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val inputRDD = sc.parallelize(List(Data("index", "title", "content")))
inputRDD.map(data =& List(data.index, data.title, data.content).toArray)
.mapPartitions { data =&
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(data.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile("/home/common/coding/coding/Scala/word-count/sample_map_out")
"index","title","content"
4.SequenceFile 是由没有相对关系结构的键值对文件组成的常用 Hadoop 格式。
SequenceFile 文件有同步标记, Spark 可 以用它来定位到文件中的某个点,然后再与记录的边界对齐。这可以让 Spark 使 用多个节点高效地并行读取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作 业中常用的输入输出格式,所以如果你在使用一个已有的 Hadoop 系统,数据很有可能是以 S equenceFile 的格式供你使用的。
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
* Created by common on 17-4-6.
object SparkRDD {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
//写sequenceFile,
val rdd = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
rdd.saveAsSequenceFile("output")
//读sequenceFile
val output = sc.sequenceFile("output", classOf[Text], classOf[IntWritable]).
map{case (x, y) =& (x.toString, y.get())}
output.foreach(println)
5.对象文件
对象文件看起来就像是对 SequenceFile 的简单封装,它允许存储只包含值的 RDD。和 SequenceFile 不一样的是,对象文件是使用 Java 序列化写出的。
如果你修改了你的类——比如增减了几个字段——已经生成的对象文件就不再可读了。
读取文件——用 SparkContext 中的 objectFile() 函数接收一个路径,返回对应的 RDD。
写入文件——要 保存对象文件, 只需在 RDD 上调用 saveAsObjectFile
6.Hadoop输入输出格式
除了 Spark 封装的格式之外,也可以与任何 Hadoop 支持的格式交互。Spark 支持新旧两套Hadoop 文件 API,提供了很大的灵活性。
旧的API:hadoopFile,使用旧的 API 实现的 Hadoop 输入格式
新的API:newAPIHadoopFile 接收一个路径以及三个类。第一个类是“格式”类,代表输入格式。第二个类是键的类,最后一个类是值的类。如果需要设定额外的 H adoop 配置属性,也可以传入一个 conf 对象。
KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取键值对数据。每一行都会被独立处理,键和值之间用制表符隔开。
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd._
* Created by common on 17-4-6.
object SparkRDD {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
//使用老式 API 读取 KeyValueTextInputFormat(),以JSON文件为例子
//注意引用的包是org.apache.hadoop.mapred.KeyValueTextInputFormat
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("input/test.json").map {
case (x, y) =& (x.toString, y.toString)
input.foreach(println)
// 读取文件,使用新的API,注意引用的包是org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
val job = new Job()
val data = sc.newAPIHadoopFile("input/test.json" ,
classOf[KeyValueTextInputFormat],
classOf[Text],
classOf[Text],
job.getConfiguration)
data.foreach(println)
//保存文件,注意引用的包是org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
data.saveAsNewAPIHadoopFile(
"input/test1.json",
classOf[Text],
classOf[Text],
classOf[TextOutputFormat[Text,Text]],
job.getConfiguration)
Hadoop 的非文件系统数据源
除 了 hadoopFile() 和 saveAsHadoopFile() 这 一 大 类 函 数, 还 可 以 使 用 hadoopDataset/saveAsHadoopDataSet 和 newAPIHadoopDataset/ saveAsNewAPIHadoopDataset 来访问 Hadoop 所支持的非文件系统的存储格式。例如,许多像 HBase 和 MongoDB 这样的键值对存储都提供了用来直接读取 Hadoop 输入格式的接口。我们可以在 Spark 中很方便地使用这些格式。
7.文件压缩
Spark 原生的输入方式( textFile 和 sequenceFile)可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。 这些压缩选项只适用于支持压缩的 Hadoop 格式,也就是那些写出到文件系统的格式。写入数据库的 Hadoop 格式一般没有实现压缩支持。如果数据库中有压缩过的记录,那应该是数据库自己配置的。
阅读(...) 评论()2831人阅读
spark+scala(7)
spark对mysql提供了一些基本的读写操作,今天这边文章主要从读写两个方面来讲。
一、spark读取mysql数据库
1、通过JdbcRdd来读取
& & & 首先看一下官方文档是如何介绍的
1、getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
2、sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:&select title, author from books where ? & = id and id &= ?&
3、lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
4、mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。
代码示例:
val sc = new SparkContext(&local&, &mysql&)
val rdd = new JdbcRDD(
Class.forName(&com.mysql.jdbc.Driver&).newInstance()
DriverManager.getConnection(&jdbc:mysql://localhost:3306/db&, &root&, &123456&)
&SELECT content FROM mysqltest WHERE ID &= ? AND ID &= ?&,
1, 100, 3,
r =& r.getString(1)).cache()
print(rdd.filter(_.contains(&success&)).count())
只能用于查找符合ID &= ? AND ID &= ?这种形式的数据
2、通过SQLContext.read.jdbc来读取
看一下官方文档介绍
翻译一下:
url:database的url,格式:jdbc:subprotocol:subname
table:表名
predicates:where条件
connectionProperties:数据库连接属性
代码示例:
val url=&jdbc:mysql://localhost:3306/my_db&
val prop = new java.util.Properties
prop.setProperty(&user&,&root&)
prop.setProperty(&password&,&123456)
#指定读取条件,这里 Array(&gender=1&) 是where过滤条件
val cnFlight = sqlContext.read.jdbc(url,&gps_location&,Array(&gender=1&),prop).select(&id&,&name&)
二、spark写入mysql数据库
spark1.3添加了DataFrame,可以方便的对表和数据进行操作。
createJDBCTable介绍:
url:数据库信息
table:表名
overwrite:是否覆盖之前数据
insertIntoJDBC介绍:
url:数据库信息
table:表名
allowExisting:如果表存在,是否删除之前表
看文档,我们可以看到,这个方法会执行一个create table和insert into 的过程,如果allowdExisting为true,则会删除之前的表,
示例如下:
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
* class description :
* project_name:test.mysql
* author:lucaifang
* createTime: 17:11
* updateTime: 17:11
object JdbcTest {
def main(args: Array[String]) {
val sc = new SparkContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val data = sc.parallelize(List((1,&name1&),(2,&name2&),(3,&name3&),(4,&name4&))).
map(item=&Row.apply(item._1,item._2))
val schema = StructType(StructField(&id&, IntegerType)::StructField(&name&, StringType):: Nil)
val df = sqlContext.createDataFrame(data,schema)
val url=&jdbc:mysql://172.16.3.66:3306/data_kanban?user=root&password=dataS@baihe!&
df.createJDBCTable(url, &table1&, false)//创建表并插入数据
df.insertIntoJDBC(url,&table1&,false)//插入数据
false为不覆盖之前数据
文章整理来自:/archives/1290
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:890724次
积分:6072
积分:6072
排名:第3862名
原创:86篇
转载:57篇
评论:107条
(1)(3)(10)(1)(16)(5)(9)(7)(1)(8)(1)(13)(22)(6)(20)(20)(4)

我要回帖

 

随机推荐