微信代运营公司微保的保险产品正规吗?

spark(14)
我们都知道SparkStreaming程序是一个长服务,一旦运转起来不会轻易停掉,那么如果我们想要停掉正在运行的程序应该怎么做呢?
如果运行的是spark on yarn模式直接使用
yarn application -kill
暴力停掉sparkstreaming是有可能出现问题的,比如你的数据源是kafka,已经加载了一批数据到sparkstreaming中正在处理,如果中途停掉,这个批次的数据很有可能没有处理完,就被强制stop了,下次启动时候会重复消费或者部分数据丢失。
如何解决?
1.4之前的版本,需要一个钩子函数:
sys.ShutdownHookThread
log.info(&Gracefully stopping Spark Streaming Application&)
ssc.stop(true, true)
log.info(&Application stopped&)
1.4之后的版本,比较简单,只需要在SparkConf里面设置下面的参数即可:
sparkConf.set(&spark.streaming.stopGracefullyOnShutdown&,&true&)
然后,如果需要停掉sparkstreaming程序时:
(1)登录spark ui页面在executors页面找到driver程序所在的机器
(2)使用ssh命令登录这台机器上,执行下面的命令通过端口号找到主进程然后kill掉
ss -tanlp |
grep 55197|awk '{print $6}'|awk
-F, '{print $2}'|xargs kill -15
注意上面的操作执行后,sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
此外还有一个问题是,spark on yarn模式下,默认的情况driver程序的挂了,会自动再重启一次,作为高可用,也就是上面的操作 你可能要执行两次,才能真能的停掉程序,当然我们也可以设置驱动程序一次挂掉之后,就真的挂掉了,这样就没有容灾机制了,需要慎重考虑:
--conf spark.yarn.maxAppAttempts=1
上面的步骤还是有点复杂的,当然在网上有朋友提出在HDFS上建立一个文件,通过程序主动扫描来判断是否应该停止,这样的话不需要经历前面停止的繁琐的方式,后面有机会可以尝试一下。
参考文章:
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:157081次
积分:2660
积分:2660
排名:第13174名
原创:132篇
评论:21条
大数据群:
搜索技术群1:
搜索技术群2:
公众号:woshigcs
(3)(6)(3)(8)(6)(3)(4)(6)(7)(2)(7)(8)(3)(5)(6)(3)(2)(2)(2)(6)(4)(2)(1)(7)(1)(8)(11)(5)(5)(3)您所在的位置: &
Storm与Spark:谁才是我们的实时处理利器
Storm与Spark:谁才是我们的实时处理利器
核子可乐译
实时商务智能目前已经逐步迈入主流,而Storm与Spark开源项目的支持无疑在其中起到了显著的推动作用。那么问题来了:实时处理到底哪家强?
实时商务智能这一构想早已算不得什么新生事物(早在2006年维基百科中就出现了关于这一概念的页面)。然而尽管人们多年来一直在对此类方案进行探讨,我却发现很多企业实际上尚未就此规划出明确发展思路、甚至没能真正意识到其中蕴含的巨大效益。
为什么会这样?一大原因在于目前市场上的实时商务智能与分析工具仍然非常有限。传统数据仓库环境针对的主要是批量处理流程,这类方案要么延迟极高、要么成本惊人&&当然,也可能二者兼具。
然而已经有多款强大而且易于使用的开源平台开始兴起,欲彻底扭转目前的不利局面。其中最值得关注的两大项目分别为Apache Storm与Apache Spark,它们都能为广大潜在用户提供良好的实时处理能力。两套方案都归属于Apache软件基金会,而且除了在功能方面的一部分交集之外、两款工具还各自拥有着独特的特性与市场定位。
Storm:实时处理领域的Hadoop
作为一套专门用于事件流处理的分布式计算框架,Storm的诞生可以追溯到当初由BackType公司开发的项目&&这家市场营销情报企业于2011年被Twitter所收购。Twitter旋即将该项目转为开源并推向GitHub平台,不过Storm最终还是加入了Apache孵化器计划并于2014年9月正式成为Apache旗下的顶级项目之一。
Storm有时候也被人们称为实时处理领域的Hadoop。Storm项目的说明文档看起来对这种称呼也表示认同:&Storm大大简化了面向庞大规模数据流的处理机制,从而在实时处理领域扮演着Hadoop之于批量处理领域的重要角色。&
为了达成上述目标,Storm在设计思路中充分考虑到大规模可扩展能力、利用一套&故障快速、自动重启&方案为处理提供容错性支持、从而有力地保证了每个元组都能切实得到处理。Storm项目默认为消息采取&至少一次&的处理覆盖保障,但用户也能够根据需要实现&仅为一次&的处理方式。
Storm项目主要利用Clojure编写而成,且既定设计目标在于支持将&流&(例如输入流)与&栓&(即处理与输出模块)结合在一起并构成一套有向无环图(简称DAG)拓扑结构。Storm的拓扑结构运行在集群之上,而Storm调度程序则根据具体拓扑配置将处理任务分发给集群当中的各个工作节点。
大家可以将拓扑结构大致视为MapReduce在Hadoop当中所扮演的角色,只不过Storm的关注重点放在了实时、以流为基础的处理机制身上,因此其拓扑结构默认永远运行或者说直到手动中止。一旦拓扑流程启动,挟带着数据的流就会不断涌入系统并将数据交付给栓(而数据仍将在各栓之间循流程继续传递),而这也正是整个计算任务的主要实现方式。随着处理流程的推进,一个或者多个栓会把数据写入至数据库或者文件系统当中,并向另一套外部系统发出消息或者将处理获得的计算结果提供给用户。
Storm生态系统的一大优势在于其拥有丰富的流类型组合,足以从任何类型的来源处获取数据。虽然大家也可以针对某些具备高度特殊性的应用程序编写定制化流,但基本上我们总能从庞大的现有源类型中找到适合需要的方案&&从Twitter流API到Apache Kafka再到JMS broker,一切尽皆涵盖于其中。
适配器的存在使其能够轻松与HDFS文件系统进行集成,这意味着Storm可以在必要时与Hadoop间实现互操作。Storm的另一大优势在于它对多语言编程方式的支持能力。尽管Storm本身基于Clojure且运行在JVM之上,其流与栓仍然能够通过几乎所有语言进行编写,其中包括那些能够充分发挥在标准输入/输出基础上使用JSON、并由此实现组件间通信协议优势的非JVM语言。
总体而言,Storm是一套极具可扩展能力、快速惊人且具备容错能力的开源分布计算系统,其高度专注于流处理领域。Storm在事件处理与增量计算方面表现突出,能够以实时方式根据不断变化的参数对数据流进行处理。尽管Storm同时提供原语以实现通用性分布RPC并在理论上能够被用于任何分布式计算任务的组成部分,但其最为根本的优势仍然表现在事件流处理方面。
Spark:适用于一切的分布式处理方案
作为另一个专门面向实时分布式计算任务的项目,Spark最初由加州大学伯克利分校的APMLab实验室所打造,而后又加入到Apache孵化器项目并最终于2014年2月成为其中的顶尖项目之一。与Storm类似,Spark也支持面向流的处理机制,不过这是一套更具泛用性的分布式计算平台。
有鉴于此,我们不妨将Spark视为Hadoop当中一套足以取代MapReduce的潜在备选方案&&二者的区别在于,Spark能够运行在现有Hadoop集群之上,但需要依赖于YARN对于资源的调度能力。除了Hadoop YARN之外,Spark还能够以Mesos为基础实现同样的资源调度或者利用自身内置调度程度作为独立集群运行。值得注意的是,如果不将Spark与Hadoop配合使用,那么运行在集群之上时某些网络/分布式文件系统(包括NFS、AFS等)仍然必要,这样每个节点才能够切实访问底层数据。
Spark项目由Scala编写而成,而且与Storm一样都支持多语言编程&&不过Spark所提供的特殊API只支持Scala、Java以及Python。Spark并不具备&流&这样的特殊抽象机制,但却拥有能够与存储在多种不同数据源内的数据实现协作的适配器&&具体包括HDFS文件、Cassandra、HBase以及S3。
Spark项目的最大亮点在于其支持多处理模式以及支持库。没错,Spark当然支持流模式,但这种支持能力仅源自多个Spark模块之一,其预设模块除了流处理之外还支持SQL访问、图形操作以及机器学习等。
Spark还提供一套极为便利的交互shell,允许用户利用Scala或者Python API以实时方式快速建立起原型及探索性数据分析机制。在使用这套交互shell时,大家会很快发现Spark与Storm之间的另一大差异所在:Spark明显表现出一种偏&功能&的取向,在这里大部分API使用都是由面向原始操作的连续性方法调用来实现的&&这与Storm遵循的模式完全不同,后者更倾向于通过创建类与实现接口来完成此类任务。先不论两种方案孰优孰劣,单单是风格的巨大差异已经足以帮助大家决定哪款系统更适合自己的需求了。
与Storm类似,Spark在设计当中同样高度重视大规模可扩展能力,而且Spark团队目前已经拥有一份大型用户文档、其中列出的系统方案都运行着包含成千上万个节点的生产性集群。除此之外,Spark还在最近的2014年Daytona GraySort竞赛当中获得了优胜,成为目前承载100TB级别数据工作负载的最佳选择。Spark团队还保留了多份文档,其中记录着Spark ETL如何负责数PB级别生产工作负载的运营。
Spark是一套快速出色、可扩展能力惊人且极具灵活性的开源分布式计算平台,与Hadoop以及Mesos相兼容并且支持多川计算模式,其中包括流、以图形为核心的操作、SQL访问外加分布式机器学习等。Spark的实际扩展记录令人满意,而且与Storm一样堪称构建实时分析与商务智能系统的卓越平台。
您会如何选择
那么大家又该如何在Storm与Spark之间做出选择呢?
如果大家的需求主要集中在流处理与CEP(即复杂事件处理)式处理层面,而且需要从零开始为项目构建一套目标明确的集群设施,那么我个人更倾向于选择Storm&&特别是在现有Storm流机制能够确切满足大家集成需求的情况下。这一结论并不属于硬性要求或者强制规则,但上述因素的存在确实更适合由Storm出面打理。
在另一方面,如果大家打算使用现有Hadoop或者Mesos集群,而且/或者既定流程需要涉及与图形处理、SQL访问或者批量处理相关的其它实质性要求,那么Spark则值得加以优先考虑。
另一个需要考量的因素是两套系统对于多语言的支持能力,举例来说,如果大家需要使用由R语言或者其它Spark无法原生支持的语言所编写的代码,那么Storm无疑在语言支持宽泛性方面占据优势。同理可知,如果大家必须利用交互式shell通过API调用实现数据探索,那么Spark也能带来Storm所不具备的优秀能力。
最后,大家可能希望在做出决定前再对两套平台进行一番详尽分析。我建议大家先利用这两套平台各自建立一个小规模概念验证项目&&而后运行自己的基准工作负载,借此在最终选择前亲身体验二者的工作负载处理能力是否与预期相一致。
当然,大家也不一定非要从二者之中选择其一。根据各位工作负载、基础设施以及具体要求的不同,我们可能会找出一种将Storm与Spark加以结合的理想方案&&其它同样可能发挥作用的工具还包括Kafka、Hadoop以及Flume等等。而这正是开源机制的最大亮点所在。
无论大家选择哪一套方案,这些工具的存在都切实表明实时商务智能市场的游戏规则已经发生了变化。曾经只能为少数精英所掌握的强大选项如今已经进入寻常百姓家&&或者说,至少适用于多数中等规模或者大型企业。不要浪费资源,充分享受由此带来的便利吧。
英文:【编辑推荐】【责任编辑: TEL:(010)】
关于&&的更多文章
Apache Spark是立足于内存计算的一种快速数据分析方案。其性能已
随着云计算、物联网、大数据、移动互联网的大发展,你应该知道这些。
讲师: 1人学习过讲师: 1人学习过讲师: 8人学习过
从2006年12月份Sun发布Java 6后,经过五年多的不懈努
借助Google的三大论文,Hadoop打开了低成本海量数据处
春运大军前天正式启动了。昨天的新闻有几条不怎么好的
本书论述了软件开发价值增加的思维方式。这一思维方式构成了VSTS的基础,包括VSTS的指导思想,为什么这些指导思想会以某些方式表
51CTO旗下网站spark(15)
-------------------------------------------------------AdClickedStreamingStatus -------------------------------------------------------------------------------------------------
import java.sql.C
import java.sql.DriverM
import java.sql.PreparedS
import java.sql.ResultS
import java.sql.SQLE
import java.util.ArrayL
import java.util.HashM
import java.util.HashS
import java.util.I
import java.util.L
import java.util.M
import java.util.S
import java.util.concurrent.LinkedBlockingQ
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.api.java.function.F
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairF
import org.apache.spark.api.java.function.VoidF
import org.apache.spark.streaming.D
import org.apache.spark.streaming.api.java.JavaDS
import org.apache.spark.streaming.api.java.JavaPairDS
import org.apache.spark.streaming.api.java.JavaPairInputDS
import org.apache.spark.streaming.api.java.JavaStreamingC
import org.apache.spark.streaming.kafka.KafkaU
import mon.base.O
import kafka.serializer.StringD
import scala.Tuple2;
&* 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city
public class AdClickedStreamingStatus {
& &public static void main(String[] args) {
& & & SparkConf conf = new SparkConf()
& & & .setMaster(&local[5]&)
& & & //.setMaster(&spark://master:7077&)
& & & .setAppName(&AdClickedStreamingStats&);
& & & /*SparkConf conf = new SparkConf().setMaster(&spark://Master:7077&).
& & & & & & setAppName(&SparkStreamingOnKafkaReceiver&);*/
& & & JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
& & & jsc.checkpoint(&d:/checkpoint&);
& & & &* 创建Kafka元数据,来让Spark Streaming这个Kafka Consumer利用
& & & Map&String, String& kafkaParameters = new HashMap&String, String&();
& & & kafkaParameters.put(&metadata.broker.list&,&
& & & & & & &master:9092,slave1:9092,slave2:9092&);
& & & Set&String& topics = &new HashSet&String&();
& & & topics.add(&AdClicked&);
& & & JavaPairInputDStream&String, String& adClickedStreaming = KafkaUtils.createDirectStream(jsc,&
& & & & & & String.class, String.class,&
& & & & & & StringDecoder.class, StringDecoder.class,
& & & & & & kafkaParameters,&
& & & & & & topics);
& & & &* 因为要对黑名单进行在线过滤,而数据是在RDD中的,所以必然使用transform这个函数;
& & & &* 但是在这里我们必须使用transformToPair,原因是读取进来的Kafka的数据是Pair&String,String&类型的,另外
& & & &* 一个原因是过滤后的数据要进行进一步处理,所以必须是读进来的Kafka数据的原始类型DStream&String, String&
& & & &* 在此:再次说明每个Batch Duration中实际上讲输入的数据就是被一个且仅仅被一个RDD封装的,你可以有多个
& & & &* InputDstream,但是其实在产生Job的时候,这些不同的InputDstream在Batch Duration中就相当于Spark基于
& & & &* HDFS数据操作的不同文件来源而已罢了。
& & & JavaPairDStream&String, String& filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function&JavaPairRDD&String,String&, JavaPairRDD&String,String&&() {
& & & & &@Override
& & & & &public JavaPairRDD&String, String& call(JavaPairRDD&String, String& rdd) throws Exception {
& & & & & & /**
& & & & & & &* 在线黑名单过滤思路步骤:
& & & & & & &* 1,从数据库中获取黑名单转换成RDD,即新的RDD实例封装黑名单数据;
& & & & & & &* 2,然后把代表黑名单的RDD的实例和Batch Duration产生的rdd进行join操作,准确的说是进行
& & & & & & &* leftOuterJoin操作,也就是说使用Batch Duration产生的rdd和代表黑名单的RDD的实例进行
& & & & & & &* leftOuterJoin操作,如果两者都有内容的话,就会是true,否则的话就是false;
& & & & & & &*&
& & & & & & &* 我们要留下的是leftOuterJoin操作结果为false;
& & & & & & &*&
& & & & & & &*/
& & & & & &&
& & & & & & List&String& blackListNames = new ArrayList&String&();
& & & & & & JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
/* & & & & & &jdbcWrapper.doQuery(&SELECT * FROM blacklisttable&, null, new ExecuteCallBack(){
& & & & & & & &@Override
& & & & & & & &public void resultCallBack(ResultSet result) throws Exception {
& & & & & & & & &&
& & & & & & & & & while(result.next()){
& & & & & & & & & & &blackListNames.add(result.getString(1));
& & & & & & & & & }
& & & & & & & &}
& & & & & & & &
& & & & & & });*/
& & & & & &&
& & & & & & List&Tuple2&String, Boolean&& blackListTuple = new ArrayList&Tuple2&String, Boolean&&();
& & & & & &&
& & & & & & for (String name : blackListNames){
& & & & & & & &blackListTuple.add(new Tuple2&String,Boolean&(name, true));
& & & & & & }
& & & & & &&
& & & & & & List&Tuple2&String, Boolean&& blackListFromDB = blackListT //数据来自于查询的黑名单表并且映射成为&String, Boolean&
& & & & & &&
& & & & & & JavaSparkContext jsc = new JavaSparkContext(rdd.context());
& & & & & &&
& & & & & & /**
& & & & & & &* 黑名单的表中只有userID,但是如果要进行join操作的话,就必须是Key-Value,所以
& & & & & & &* 在这里我们需要基于数据表中的数据产生Key-Value类型的数据集合;
& & & & & & &*/
& & & & & & JavaPairRDD&String, Boolean& blackListRDD = jsc.parallelizePairs(blackListFromDB);
& & & & & &&
& & & & & & & &
& & & & & & /**
& & & & & & &* 进行操作的时候肯定是基于userID进行join的,所以必须把传入的rdd进行mapToPair操作转化成为符合
& & & & & & &* 格式的rdd
& & & & & & &*&
& & & & & & &* 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city
& & & & & & &*/
& & & & & &&
& & & & & & JavaPairRDD&String, Tuple2&String, String&& &rdd2Pair = rdd.mapToPair(new PairFunction&Tuple2&String,String&, String, Tuple2&String,String&&() {
& & & & & & & &@Override
& & & & & & & &public Tuple2&String, Tuple2&String, String&& call(Tuple2&String, String& t) throws Exception {
& & & & & & &System.out.println(t._2.split(&/t&)[0]);
& & & & & & & & & String userID = t._2.split(&/t&)[2];
& & & & & & & & & System.out.println(&userID=& + userID);
& & & & & & & & & return new Tuple2&String, Tuple2&String, String&&(userID, t);
& & & & & & & &}
& & & & & & });
& & & & & &&
& & & & & & &JavaPairRDD&String, Tuple2&Tuple2&String, String&, Optional&Boolean&&& joined = rdd2Pair.leftOuterJoin(blackListRDD);
& & & & & &&
& & & & & & &JavaPairRDD&String, String& result = joined.filter(new Function&Tuple2&String,
& & & & & & & & & &Tuple2&Tuple2&String,String&,Optional&Boolean&&&, Boolean&() {
& & & & & & & & & & &@Override
& & & & & & & & & & &public Boolean call(Tuple2&String, Tuple2&Tuple2&String, String&, Optional&Boolean&&& v1)
& & & & & & & & & & & & & &throws Exception {
& & & & & & & & & & & & Optional&Boolean& optional = v1._2._2;
& & & & & & & & & & & &&
& & & & & & & & & & & & if (optional.isPresent() && optional.get()){
& & & & & & & & & & & & & &
& & & & & & & & & & & & } else {
& & & & & & & & & & & & & &
& & & & & & & & & & & & }
& & & & & & & & & & & &&
& & & & & & & & & & &}
& & & & & & }).mapToPair(new PairFunction&Tuple2&String,Tuple2&Tuple2&String,String&,Optional&Boolean&&&, String, String&() {
& & & & & & & &@Override
& & & & & & & &public Tuple2&String, String& call(
& & & & & & & & & & &Tuple2&String, Tuple2&Tuple2&String, String&, Optional&Boolean&&& t) throws Exception {
& & & & & & & & & // TODO Auto-generated method stub
& & & & & & & & & return t._2._1;
& & & & & & & &}
& & & & & & });
& & & & & & &&
& & & & & &
& & & & &}
& & & &* 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体
& & & &* 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!!
& & & &*对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
& & & & * & & 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city
& & & & */
& & & JavaPairDStream&String, Long& pairs = filteredadClickedStreaming.mapToPair(new PairFunction&Tuple2&String,String&, String, Long&() {
& & & & &@Override
& & & & &public Tuple2&String, Long& call(Tuple2&String, String& t) throws Exception {
& & & & & & String[] splited = t._2.split(&_&);
& & & & & &&
& & & & & & String timestamp = splited[0]; //yyyy-MM-dd
& & & & & & String ip = splited[1];
& & & & & & String userID = splited[2];
& & & & & & String adID = splited[3];
& & & & & & String province = splited[4];
& & & & & & String city = splited[5];
& & & & & &&
& & & & & & String clickedRecord = timestamp + &_& + ip + &_& + userID + &_& + adID + &_&&
& & & & & & & & & + province + &_& +
& & & & & &&
& & & & & & return new Tuple2&String, Long&(clickedRecord, 1L);
& & & & &}
& & & & & * 第四步:对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
& & & & & * & 计算每个Batch Duration中每个User的广告点击量
& & & & & */
& & & JavaPairDStream&String, Long& adClickedUsers = pairs.reduceByKey(new Function2&Long, Long, Long&(){
& & & & &@Override
& & & & &public Long call(Long v1, Long v2) throws Exception {
& & & & & & // TODO Auto-generated method stub
& & & & & & return v1 + v2;
& & & & &}
& & & & & &&
& & & &* 计算出什么叫有效的点击?
& & & &* 1,复杂化的一般都是采用机器学习训练好模型直接在线进行过滤;
& & & &* 2,简单的?可以通过一个Batch Duration中的点击次数来判断是不是非法广告点击,但是实际上讲非法广告
& & & &* 点击程序会尽可能模拟真实的广告点击行为,所以通过一个Batch来判断是 不完整的,我们需要对例如一天(也可以是每一个小时)
& & & &* 的数据进行判断!
& & & &* 3,比在线机器学习退而求次的做法如下:
& & & &* & & & &例如:一段时间内,同一个IP(MAC地址)有多个用户的帐号访问;
& & & &* & & & &例如:可以统一一天内一个用户点击广告的次数,如果一天点击同样的广告操作50次的话,就列入黑名单;
& & & &* 黑名单有一个重点的特征:动态生成!!!所以每一个Batch Duration都要考虑是否有新的黑名单加入,此时黑名单需要存储起来
& & & &* 具体存储在什么地方呢,存储在DB/Redis中即可;
& & & &* 例如邮件系统中的“黑名单”,可以采用Spark Streaming不断的监控每个用户的操作,如果用户发送邮件的频率超过了设定的值,可以
& & & &* 暂时把用户列入“黑名单”,从而阻止用户过度频繁的发送邮件。
& & & JavaPairDStream&String, Long& &filteredClickInBatch = adClickedUsers.filter(new Function&Tuple2&String,Long&, Boolean&() {
& & & & &@Override
& & & & &public Boolean call(Tuple2&String, Long& v1) throws Exception {
& & & & & & if ( 1 & v1._2){
& & & & & & & &//更新一下黑名单的数据表
& & & & & & & &
& & & & & & } else {
& & & & & & & &
& & & & & & }
& & & & & &&
& & & & &}
& & & // Todo。。。。
& & & &* 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark Streaming
& & & &* 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的
& & & &* 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作,
& & & &* output Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个
& & & &* 方法是foraeachRDD,因为Spark Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD
& & & &* 主要就是用用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!!
& & filteredClickInBatch.print();
& & & filteredClickInBatch.foreachRDD(new Function&JavaPairRDD&String,Long&, Void&() {
& & & & &@Override
& & & & &public Void call(JavaPairRDD&String, Long& rdd) throws Exception {
& & & & & & rdd.foreachPartition(new VoidFunction&Iterator&Tuple2&String,Long&&&() {
& & & & & & & &
& & & & & & & &@Override
& & & & & & & &public void call(Iterator&Tuple2&String, Long&& partition) throws Exception {
& & & & & & & & & /**
& & & & & & & & & &* 在这里我们使用数据库连接池的高效读写数据库的方式把数据写入数据库MySQL;
& & & & & & & & & &* 由于传入的参数是一个Iterator类型的集合,所以为了更加高效的操作我们需要批量处理
& & & & & & & & & &* 例如说一次性插入1000条Record,使用insertBatch或者updateBatch类型的操作;
& & & & & & & & & &* 插入的用户信息可以只包含:timestamp、ip、userID、adID、province、city
& & & & & & & & & &* 这里面有一个问题:可能出现两条记录的Key是一样的,此时就需要更新累加操作
& & & & & & & & & &*/
& & & & & & & & &&
& & & & & & & & & List&UserAdClicked& userAdClickedList = new ArrayList&UserAdClicked&();
& & & & & & & & &&
& & & & & & & & & while (partition.hasNext()){
& & & & & & & & & & &Tuple2&String, Long& record = partition.next();
& & & & & & & & & & &String[] splited = record._1.split(&_&);
& & & & & & & & & & &
& & & & & & & & & & &UserAdClicked userClicked = new UserAdClicked();
& & & & & & & & & & &userClicked.setTimestamp(splited[0]);
& & & & & & & & & & &userClicked.setIp(splited[1]);
& & & & & & & & & & &userClicked.setUserID(splited[2]);
& & & & & & & & & & &userClicked.setAdID(splited[3]);
& & & & & & & & & & &userClicked.setProvince(splited[4]);
& & & & & & & & & & &userClicked.setCity(splited[5]);
& & & & & & & & & & &userAdClickedList.add(userClicked);
& & & & & & & & & & &
& & & & & & & & & }
& & & & & & & & &&
& & & & & & & & & List&UserAdClicked& inserting &= new ArrayList&UserAdClicked&();
& & & & & & & & & List&UserAdClicked& updating &= new ArrayList&UserAdClicked&();
& & & & & & & & &&
& & & & & & & & & JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
& & & & & & & & &&
& & & & & & & & & //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
& & & & & & & & & for (UserAdClicked clicked : userAdClickedList){
& & & & & & & & & & &jdbcWrapper.doQuery(&SELECT count(1) FROM adclicked WHERE &
& & & & & & & & & & & & & &+ & timestamp = ? AND userID = ? AND adID = ?&,
& & & & & & & & & & & & & &new Object[]{clicked.getTimestamp(), clicked.getUserID(), clicked.getAdID()},
& & & & & & & & & & & & & &new ExecuteCallBack() {
& & & & & & & & & & & & & & &&
& & & & & & & & & & & & & & & @Override
& & & & & & & & & & & & & & & public void resultCallBack(ResultSet result) throws Exception {
& & & & & & & & & & & & & & & & &if(result.next()){
& & & & & & & & & & & & & & & & & & long count = result.getLong(1);
& & & & & & & & & & & & & & & & & & clicked.setClickedCount(count);
& & & & & & & & & & & & & & & & & & updating.add(clicked);
& & & & & & & & & & & & & & & & &} else {
& & & & & & & & & & & & & & & & & & inserting.add(clicked);
& & & & & & & & & & & & & & & & &}
& & & & & & & & & & & & & & & & &
& & & & & & & & & & & & & & & }
& & & & & & & & & & & & & &});
& & & & & & & & & }
& & & & & & & &//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
& & & & & & & &ArrayList&Object[]& insertParametersList = new ArrayList&Object[]&();
& & & & & & & &for(UserAdClicked inserRecord : inserting){
& & & & & & & & & insertParametersList.add(new Object[]{
& & & & & & & & & & & & inserRecord.getTimestamp(),
& & & & & & & & & & & & inserRecord.getIp(),
& & & & & & & & & & & & inserRecord.getUserID(),
& & & & & & & & & & & & inserRecord.getAdID(),
& & & & & & & & & & & & inserRecord.getProvince(),
& & & & & & & & & & & & inserRecord.getCity(),
& & & & & & & & & & & & inserRecord.getClickedCount()
& & & & & & & & & });
& & & & & & & &}
& & & & & & & &jdbcWrapper.doBatch(&INSERT INTO adclicked VALUES(?,?,?,?,?,?,?)&, insertParametersList);
& & & & & & & &
& & & & & & & &
& & & & & & & &
& & & & & & & &//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
& & & & & & & &ArrayList&Object[]& updateParametersList = new ArrayList&Object[]&();
& & & & & & & &for(UserAdClicked updateRecord : updating){
& & & & & & & & & updateParametersList.add(new Object[]{
& & & & & & & & & & & & updateRecord.getTimestamp(),
& & & & & & & & & & & & updateRecord.getIp(),
& & & & & & & & & & & & updateRecord.getUserID(),
& & & & & & & & & & & & updateRecord.getAdID(),
& & & & & & & & & & & & updateRecord.getProvince(),
& & & & & & & & & & & & updateRecord.getCity(),
& & & & & & & & & & & & updateRecord.getClickedCount()
& & & & & & & & & });
& & & & & & & &}
& & & & & & & &jdbcWrapper.doBatch(&UPDATE adclicked set clickedCount = ? WHERE &
& & & & & & & & & & & & & &+ & timestamp = ? AND ip = ? AND userID = ? AND adID = ? AND province = ? &
& & & & & & & & & & & & & &+ &AND city = ? &, updateParametersList);
& & & & & & & &
& & & & & & & &
& & & & & & & &
& & & & & & & &}
& & & & & & });
& & & & & &
& & & & &}
& & & JavaPairDStream&String, Long& blackListBasedOnHistory = filteredClickInBatch.filter(new Function&Tuple2&String,Long&, Boolean&() {
& & & & &@Override
& & & & &public Boolean call(Tuple2&String, Long& v1) throws Exception {
& & & & & & //广告点击的基本数据格式:timestamp、ip、userID、adID、province、city
& & & & & & String[] splited = v1._1.split(&_&);
& & & & & &&
& & & & & & String date = splited[0];
& & & & & & String userID = splited[2];
& & & & & & String adID = splited[3];
& & & & & &&
& & & & & & /**
& & & & & & &* 接下来根据date、userID、adID等条件去查询用户点击广告的数据表,获得总的点击次数
& & & & & & &* 这个时候基于点击次数判断是否属于黑名单点击 & & & & & & *&
& & & & & & &*/
& & & & & &&
& & & & & & int clickedCountTotalToday = 81;
& & & & & & & & &&
& & & & & & if (clickedCountTotalToday & 50)
& & & & & & {
& & & & & & & &
& & & & & & } else {
& & & & & & & &
& & & & & & }
& & & & & & & & & & &
& & & & &}
& & & &* 必须对黑名单的整个RDD进行去重操作!!!
& & & JavaDStream&String& blackListuserIDtBasedOnHistory = blackListBasedOnHistory.map(new Function&Tuple2&String,Long&, String&() {
& & & & &@Override
& & & & &public String call(Tuple2&String, Long& v1) throws Exception {
& & & & & & // TODO Auto-generated method stub
& & & & & & return v1._1.split(&_&)[2];
& & & & &}
& & & JavaDStream&String& blackListUniqueuserIDtBasedOnHistory = blackListuserIDtBasedOnHistory.transform(new Function&JavaRDD&String&, JavaRDD&String&&() {
& & & & &@Override
& & & & &public JavaRDD&String& call(JavaRDD&String& rdd) throws Exception {
& & & & & & // TODO Auto-generated method stub
& & & & & & return rdd.distinct();
& & & & &}
& & & //下一步写入黑名单数据表中
& & & blackListUniqueuserIDtBasedOnHistory.foreachRDD(new Function&JavaRDD&String&, Void&() {
& & & & &@Override
& & & & &public Void call(JavaRDD&String& rdd) throws Exception {
& & & & & & rdd.foreachPartition(new VoidFunction&Iterator&String&&() {
& & & & & & & &
& & & & & & & &@Override
& & & & & & & &public void call(Iterator&String& t) throws Exception {
& & & & & & & & & /**
& & & & & & & & & &* 在这里我们使用数据库连接池的高效读写数据库的方式把数据写入数据库MySQL;
& & & & & & & & & &* 由于传入的参数是一个Iterator类型的集合,所以为了更加高效的操作我们需要批量处理
& & & & & & & & & &* 例如说一次性插入1000条Record,使用insertBatch或者updateBatch类型的操作;
& & & & & & & & & &* 插入的用户信息可以只包含:useID
& & & & & & & & & &* 此时直接插入黑名单数据表即可。
& & & & & & & & & &*/
& & & & & & & & &&
& & & & & & & & & List&Object[]& blackList = new ArrayList&Object[]&();
& & & & & & & & &&
& & & & & & & & & while(t.hasNext()){
& & & & & & & & & & &blackList.add(new Object[]{(Object)t.next()});
& & & & & & & & & }
& & & & & & & & & JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
& & & & & & & & & jdbcWrapper.doBatch(&INSERT INTO blacklisttable VALUES (?) &, blackList);
& & & & & & & &}
& & & & & & });
& & & & & &
& & & & &}
& & & &* 广告点击累计动态更新,每个updateStateByKey都会在Batch Duration的时间间隔的基础上进行更高点击次数的更新,
& & & &* 更新之后我们一般都会持久化到外部存储设备上,在这里我们存储到MySQL数据库中;
& & & filteredadClickedStreaming.mapToPair(new PairFunction&Tuple2&String,String&, String, Long&() {
& & & & &@Override
& & & & &public Tuple2&String, Long& call(Tuple2&String, String& t) throws Exception {
& & & & & & String[] splited = t._2.split(&\t&);
& & & & & &&
& & & & & & String timestamp = splited[0]; //yyyy-MM-dd
& & & & & & String ip = splited[1];
& & & & & & String userID = splited[2];
& & & & & & String adID = splited[3];
& & & & & & String province = splited[4];
& & & & & & String city = splited[5];
& & & & & &&
& & & & & & String clickedRecord = timestamp + &_& + &adID + &_&&
& & & & & & & & & + province + &_& +
& & & & & &&
& & & & & & return new Tuple2&String, Long&(clickedRecord, 1L);
& & & & &}
& & & }).updateStateByKey(new Function2&List&Long&, Optional&Long&, Optional&Long&&() {
& & & & &@Override
& & & & &public Optional&Long& call(List&Long& v1, Optional&Long& v2) throws Exception {
& & & & & & /**在历史的数据的基础上进行更新
& & & & & & &* v1:代表是当前的key在当前的Batch Duration中出现次数的集合,例如{1,1,1,1,1,1}
& & & & & & &* v2:代表当前key在以前的Batch Duration中积累下来的结果;我们要再v2的基础上不断加v1的值
& & & & & & &*/
& & & & & & Long clickedTotalHistory = 0L;
& & & & & & if(v2.isPresent()) {//如果v2存在
& & & & & & & &clickedTotalHistory = v2.get();//拿v2的值
& & & & & & }
& & & & & & //不用reduceBykey是因为会产生很多shuffle,shuffle里面有很多内容的。updateStateByKey可以算过去一天,1年
& & & & & & for(Long one : v1){//循环v1
& & & & & & & &clickedTotalHistory +=//一直在基础上进行累加
& & & & & & }
& & & & & &&
& & & & & & return Optional.of(clickedTotalHistory);
& & & & &}
& & & }).foreachRDD(new Function&JavaPairRDD&String,Long&, Void&() {
& & & & &@Override
& & & & &public Void call(JavaPairRDD&String, Long& rdd) throws Exception {
& & & & & & rdd.foreachPartition(new VoidFunction&Iterator&Tuple2&String,Long&&&() {
& & & & & & & &
& & & & & & & &@Override
& & & & & & & &public void call(Iterator&Tuple2&String, Long&& partition) throws Exception {
& & & & & & & & & /**
& & & & & & & & & &* 在这里我们使用数据库连接池的高效读写数据库的方式把数据写入数据库MySQL;
& & & & & & & & & &* 由于传入的参数是一个Iterator类型的集合,所以为了更加高效的操作我们需要批量处理
& & & & & & & & & &* 例如说一次性插入1000条Record,使用insertBatch或者updateBatch类型的操作;
& & & & & & & & & &* 插入的用户信息可以只包含:timestamp、adID、province、city
& & & & & & & & & &* 这里面有一个问题:可能出现两条记录的Key是一样的,此时就需要更新累加操作
& & & & & & & & & &*/
& & & & & & & & &&
& & & & & & & & & List&AdClicked& adClickedList = new ArrayList&AdClicked&();
& & & & & & & & &&
& & & & & & & & & while (partition.hasNext()){
& & & & & & & & & & &Tuple2&String, Long& record = partition.next();
& & & & & & & & & & &String[] splited = record._1.split(&_&);
& & & & & & & & & & &
& & & & & & & & & & &AdClicked adClicked = new AdClicked();
& & & & & & & & & & &adClicked.setTimestamp(splited[0]);
& & & & & & & & & & &adClicked.setAdID(splited[1]);
& & & & & & & & & & &adClicked.setProvince(splited[2]);
& & & & & & & & & & &adClicked.setCity(splited[3]);
& & & & & & & & & & &adClicked.setClickedCount(record._2);
& & & & & & & & &&
& & & & & & & & & & &adClickedList.add(adClicked);
& & & & & & & & & & &
& & & & & & & & & }
& & & & & & & & &&
& & & & & & & & &&
& & & & & & & & &&
& & & & & & & &JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
& & & & & & & & &&
& & & & & & & & &&
& & & & & & & &List&AdClicked& inserting &= new ArrayList&AdClicked&();
& & & & & & & &List&AdClicked& updating &= new ArrayList&AdClicked&();
& & & & & & & &
& & & & & & & &//adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
& & & & & & & &for (AdClicked clicked : adClickedList){
& & & & & & & & & jdbcWrapper.doQuery(&SELECT count(1) FROM adclickedcount WHERE &
& & & & & & & & & & & & + & timestamp = ? AND adID = ? AND province = ? AND city = ? &,
& & & & & & & & & & & & new Object[]{clicked.getTimestamp(), clicked.getAdID(), clicked.getProvince(),clicked.getCity()},
& & & & & & & & & & & & new ExecuteCallBack() {
& & & & & & & & & & & & & &
& & & & & & & & & & & & & &@Override
& & & & & & & & & & & & & &public void resultCallBack(ResultSet result) throws Exception {
& & & & & & & & & & & & & & & if(result.next()){
& & & & & & & & & & & & & & & & &long count = result.getLong(1);
& & & & & & & & & & & & & & & & &clicked.setClickedCount(count);
& & & & & & & & & & & & & & & & &updating.add(clicked);
& & & & & & & & & & & & & & & } else {
& & & & & & & & & & & & & & & & &inserting.add(clicked);
& & & & & & & & & & & & & & & }
& & & & & & & & & & & & & & &&
& & & & & & & & & & & & & &}
& & & & & & & & & & & & });
& & & & & & & &}
& & & & & & //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
& & & & & & ArrayList&Object[]& insertParametersList = new ArrayList&Object[]&();
& & & & & & for(AdClicked inserRecord : inserting){
& & & & & & & &insertParametersList.add(new Object[]{
& & & & & & & & & & &inserRecord.getTimestamp(),
& & & & & & & & & & &inserRecord.getAdID(),
& & & & & & & & & & &inserRecord.getProvince(),
& & & & & & & & & & &inserRecord.getCity(),
& & & & & & & & & & &inserRecord.getClickedCount()
& & & & & & & &});
& & & & & & }
& & & & & & jdbcWrapper.doBatch(&INSERT INTO adclickedcount VALUES(?,?,?,?,?)&, insertParametersList);
& & & & & &&
& & & & & &&
& & & & & &&
& & & & & & //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
&/* & & & & & ArrayList&Object[]& updateParametersList = new ArrayList&Object[]&();
& & & & & & for(AdClicked updateRecord : updating){
& & & & & & & &updateParametersList.add(new Object[]{
& & & & & & & & & & &updateRecord.getTimestamp(),
& & & & & & & & & & &updateRecord.getAdID(),
& & & & & & & & & & &updateRecord.getProvince(),
& & & & & & & & & & &updateRecord.getCity(),
& & & & & & & & & & &updateRecord.getClickedCount()
& & & & & & & &});
& & & & & & }
& & & & & & jdbcWrapper.doBatch(&UPDATE adclickedcount set clickedCount = ? WHERE &
& & & & & & & & & & & & + & timestamp = ? AND adID = ? AND province = ? AND city = ? &, updateParametersList);*/
& & & & & & & &
& & & & & & & &
& & & & & & & &}
& & & & & & });
& & & & & &
& & & & &}
& & & &* Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于
& & & &* 接受应用程序本身或者Executor中的消息;
& & & jsc.start();
& & & jsc.awaitTermination();
& & & jsc.close();
class JDBCWrapper {
& &private static JDBCWrapper jdbcInstance =
& &private static LinkedBlockingQueue&Connection& dbConnectionPool = new LinkedBlockingQueue&Connection& ();
& &static {
& & & try {
& & & & &Class.forName(&com.mysql.jdbc.Driver&);
& & & } catch (ClassNotFoundException e) {
& & & & &// TODO Auto-generated catch block
& & & & &e.printStackTrace();
& &public static JDBCWrapper getJDBCInstance(){
& & & if (jdbcInstance == null){
& & & & &synchronized(JDBCWrapper.class){
& & & & & & if (jdbcInstance == null){
& & & & & & & &jdbcInstance = new JDBCWrapper();
& & & & & & } & & & & &&
& & & & &}
& & & return jdbcI
& &private JDBCWrapper(){
& & & for (int i = 0; i & 10; i++){
& & & & & &&
& & & & &try {
& & & & & & Connection conn = DriverManager.getConnection(&jdbc:mysql://localhost:3306/sparkstreaming&,&root&,&root&);
& & & & & & dbConnectionPool.put(conn);
& & & & &} catch (Exception e) {
& & & & & & // TODO Auto-generated catch block
& & & & & & e.printStackTrace();
& & & & &}
& & & & & &&
& &public synchronized Connection getConnection(){
& & & while (0 == dbConnectionPool.size()){
& & & & &try {
& & & & & & Thread.sleep(20);
& & & & &} catch (InterruptedException e) {
& & & & & & // TODO Auto-generated catch block
& & & & & & e.printStackTrace();
& & & & &}
& & & return dbConnectionPool.poll();
& &public int[] doBatch(String sqlText, List&Object[]& paramsList) {
& & & Connection conn = getConnection();
& & & PreparedStatement preparedStatement =
& & & int[] result =
& & & try {
& & & & &conn.setAutoCommit(false);
& & & & &preparedStatement = conn.prepareStatement(sqlText);
& & & & &for (Object[] parameters : paramsList){
& & & & & & for(int i = 0; i & parameters. i++){
& & & & & & & &preparedStatement.setObject(i+1, parameters[i]);
& & & & & & }
& & & & & &&
& & & & & & preparedStatement.addBatch();
& & & & &}
& & & & &result = preparedStatement.executeBatch();
& & & & & & & &
& & & & &mit();
& & & } catch (Exception e) {
& & & & &// TODO Auto-generated catch block
& & & & &e.printStackTrace();
& & & } finally {
& & & & &if (preparedStatement != null){
& & & & & & try {
& & & & & & & &preparedStatement.close();
& & & & & & } catch (SQLException e) {
& & & & & & & &// TODO Auto-generated catch block
& & & & & & & &e.printStackTrace();
& & & & & & }
& & & & &}
& & & & &if (conn != null){
& & & & & & try {
& & & & & & & &dbConnectionPool.put(conn);
& & & & & & } catch (InterruptedException e) {
& & & & & & & &// TODO Auto-generated catch block
& & & & & & & &e.printStackTrace();
& & & & & & }
& & & & &}
public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callBack) {
& & & System.out.println(&sqlText=& + sqlText);
& & & Connection conn = getConnection();
& & & PreparedStatement preparedStatement =
& & & ResultSet result =
& & & try {
& & & & &preparedStatement = conn.prepareStatement(sqlText);
& & & & & & for(int i = 0; i & paramsList. i++){
& & & & & & & &preparedStatement.setObject(i+1, paramsList[i]);
& & & & & & }
& & & & & &&
& & & & &result = preparedStatement.executeQuery();
& & & & & & & &
& & & & &callBack.resultCallBack(result);
& & & } catch (Exception e) {
& & & & &// TODO Auto-generated catch block
& & & & &e.printStackTrace();
& & & } finally {
& & & & &if (preparedStatement != null){
& & & & & & try {
& & & & & & & &preparedStatement.close();
& & & & & & } catch (SQLException e) {
& & & & & & & &// TODO Auto-generated catch block
& & & & & & & &e.printStackTrace();
& & & & & & }
& & & & &}
& & & & &if (conn != null){
& & & & & & try {
& & & & & & & &dbConnectionPool.put(conn);
& & & & & & } catch (InterruptedException e) {
& & & & & & & &// TODO Auto-generated catch block
& & & & & & & &e.printStackTrace();
& & & & & & }
& & & & &}
interface ExecuteCallBack {
& &void resultCallBack(ResultSet result) throws E
class UserAdClicked {
& &private S
& &private S
& &private String userID;
& &private String adID;
& &private S
& &private S
& &private Long clickedC
& &public Long getClickedCount() {
& & & return clickedC
& &public void setClickedCount(Long clickedCount) {
& & & this.clickedCount = clickedC
& &public String getTimestamp() {
& &public void setTimestamp(String timestamp) {
& & & this.timestamp =
& &public String getIp() {
& &public void setIp(String ip) {
& & & this.ip =
& &public String getUserID() {
& & & return userID;
& &public void setUserID(String userID) {
& & & this.userID = userID;
& &public String getAdID() {
& & & return adID;
& &public void setAdID(String adID) {
& & & this.adID = adID;
& &public String getProvince() {
& &public void setProvince(String province) {
& & & this.province =
& &public String getCity() {
& &public void setCity(String city) {
& & & this.city =
class AdClicked{
& &private S
& &private String adID;
& &private S
& &private S
& &private Long clickedC
& &public String getTimestamp() {
& &public void setTimestamp(String timestamp) {
& & & this.timestamp =
& &public String getAdID() {
& & & return adID;
& &public void setAdID(String adID) {
& & & this.adID = adID;
& &public String getProvince() {
& &public void setProvince(String province) {
& & & this.province =
& &public String getCity() {
& &public void setCity(String city) {
& & & this.city =
& &public Long getClickedCount() {
& & & return clickedC
& &public void setClickedCount(Long clickedCount) {
& & & this.clickedCount = clickedC
-------------------------------------------------------------MockAdClickedStats&----------------------------------------------------------------------------------------------------------------------------------
import java.util.D
import java.util.HashM
import java.util.P
import java.util.R
import kafka.javaapi.producer.P
import kafka.producer.KeyedM
import kafka.producer.ProducerC
public class MockAdClickedStats {
& & public static void main(String[] args){
& & Random random = new Random();
& & & & String[] provinces = new String[]{&Guangdong&,&Zhejiang&,&Jiangsu&,&Fujian&};
& & & & HashMap&String,String[]& cityes = new HashMap&String,String[]&();
& & & & cityes.put(&Guangdong&,new String[]{&Guangzhou&,&Shenzhen&,&DongGuan&});
& & & & cityes.put(&Zhejiang&,new String[]{&Hangzhou&,&Wenzhou&,&Ninbo&});
& & & & cityes.put(&Jiangsu&,new String[]{&Nanjing&,&Suzhou&,&wuxi&});
& & & & cityes.put(&Fujian&,new String[]{&Fuzhou&,&Ximen&,&DongGuan&});
& & & & String[] ips = new String[]{
& & & & & & &192.168.112.240&,
& & & & & & &192.168.112.241&,
& & & & & & &192.168.112.242&,
& & & & & & &192.168.112.243&,
& & & & & & &192.168.112.244&,
& & & & & & &192.168.112.245&,
& & & & & & &192.168.112.246&,
& & & & & & &192.168.112.247&,
& & & & & & &192.168.112.248&,
& & & & & & &192.168.112.249&,
& & & & & & &192.168.112.250&,
& & & & & & &192.168.112.251&,
& & & & & & &192.168.112.252&,
& & & & & & &192.168.112.253&
& & & & };
& & & & Properties kafkaConf = new Properties();
& & & & kafkaConf.put(&serializer.class&,&kafka.serializer.StringEncoder&);
& & & & kafkaConf.put(&metadata.broker.list&,&master:9092,slave1:9092,slave2:9092&);
& & & & ProducerConfig producerConfig = new ProducerConfig(kafkaConf);
& & & & Producer&Integer,String& producer = new Producer&Integer,String&(producerConfig);
& & & & new Thread(new Runnable(){
& & & & & & @Override
& & public void run(){
& & & &while(true){
& & & & & &Long timestamp = new Date().getTime();
& & & & & &String ip = ips[random.nextInt(14)];
& & & & & &int userID = random.nextInt(10000);
& & & & & &int adID = random.nextInt(100);
& & & & & &String province = provinces[random.nextInt(4)];
& & & & & &String city = cityes.get(province)[random.nextInt(3)];
& & & & & &String clickedAd = timestamp + &\t& + ip + &\t& + userID + &\t& + adID + &\t& + province + &\t& +
& & & & & &producer.send(new KeyedMessage(&AdClicked&,clickedAd));
& & & & & &System.out.println(clickedAd);
& & & & & &try{
& & & & & & & &Thread.sleep(50);
& & & & & &}catch(InterruptedException e){
& & & & & & & &e.printStackTrace();
& & & & & &}
& & & & }).start();
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:108125次
积分:3716
积分:3716
排名:第8367名
原创:265篇
转载:26篇
(5)(1)(4)(19)(37)(19)(2)(5)(21)(47)(31)(8)(21)(4)(2)(27)(2)(2)(1)(2)(1)(1)(1)(2)(2)(2)(4)(1)(1)(2)(2)(1)(10)(1)(5)

我要回帖

更多关于 微信第三方开发平台 的文章

 

随机推荐