
class MyJob extends Tool{
  static Vector&Sample& centers=new Vector&Sample&(K);
  static class MyMapper extends Mapper{
    //read centers
  static class MyMapper extends Reducer{
    //update centers
  void run(){
    until ( convergence ){
发生这种错误是因为对hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。
首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。
在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。
于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之间共享数据。DistributedCacheFile是缓存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式读取它。于是我又有了一个错误的思路:
class MyMaper{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void setup(){
& && &&&//读取cacheFile,给centers赋值
& & void map(){
& && &&&//计算样本离哪个质心最近
class MyReducer{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void reduce(){
& && &&&//更新centers
& & void cleanup(){
& && &&&//把centers写回cacheFile
class MyMaper{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void map(){
& && &&&//逐条读取质心,给centers赋值
& & void cleanup(){
& && &&&//逐行读取cacheFile,计算每个样本点离哪个质心最近
& && &&&//然后Emit(样本点所属的簇编号,样本点)
public class KMeans extends Configured implements Tool{
& & private static final Log log = LogFactory.getLog(KMeans2.class);
& & private static final int K = 10;
& & private static final int MAXITERATIONS = 300;
& & private static final double THRESHOLD = 0.01;
& & public static boolean stopIteration(Configuration conf) throws IOException{
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&Path pervCenterFile=new Path(&/user/orisun/input/centers&);
& && &&&Path currentCenterFile=new Path(&/user/orisun/output/part-r-00000&);
& && &&&if(!(fs.exists(pervCenterFile) && fs.exists(currentCenterFile))){
& && && && &(&两个质心文件需要同时存在&);
& && && && &System.exit(1);
& && &&&//比较前后两次质心的变化是否小于阈值,决定迭代是否继续
& && &&&boolean stop=
& && &&&String line1,line2;
& && &&&FSDataInputStream in1=fs.open(pervCenterFile);
& && &&&FSDataInputStream in2=fs.open(currentCenterFile);
& && &&&InputStreamReader isr1=new InputStreamReader(in1);
& && &&&InputStreamReader isr2=new InputStreamReader(in2);
& && &&&BufferedReader br1=new BufferedReader(isr1);
& && &&&BufferedReader br2=new BufferedReader(isr2);
& && &&&Sample prevCenter,currC
& && &&&while((line1=br1.readLine())!=null && (line2=br2.readLine())!=null){
& && && && &prevCenter=new Sample();
& && && && &currCenter=new Sample();
& && && && &String []str1=line1.split(&\\s+&);
& && && && &String []str2=line2.split(&\\s+&);
& && && && &assert(str1[0].equals(str2[0]));
& && && && &for(int i=1;i&=Sample.DIMENTION;i++){
& && && && && & prevCenter.arr[i-1]=Double.parseDouble(str1[i]);
& && && && && & currCenter.arr[i-1]=Double.parseDouble(str2[i]);
& && && && &}
& && && && &if(Sample.getEulerDist(prevCenter, currCenter)&THRESHOLD){
& && && && && & stop=
& && && && && &
& && && && &}
& && &&&//如果还要进行下一次迭代,就用当前质心替代上一次的质心
& && &&&if(stop==false){
& && && && &fs.delete(pervCenterFile,true);
& && && && &if(fs.rename(currentCenterFile, pervCenterFile)==false){
& && && && && & log.error(&质心文件替换失败&);
& && && && && & System.exit(1);
& && && && &}
& & public static class ClusterMapper extends Mapper&LongWritable, Text, IntWritable, Sample& {
& && &&&Vector&Sample& centers = new Vector&Sample&();
& && &&&@Override
& && &&&//清空centers
& && &&&public void setup(Context context){
& && && && &for (int i = 0; i & K; i++) {
& && && && && & centers.add(new Sample());
& && && && &}
& && &&&@Override
& && &&&//从输入文件读入centers
& && &&&public void map(LongWritable key, Text value, Context context)
& && && && && & throws IOException, InterruptedException {
& && && && &String []str=value.toString().split(&\\s+&);
& && && && &if(str.length!=Sample.DIMENTION+1){
& && && && && & log.error(&读入centers时维度不对&);
& && && && && & System.exit(1);
& && && && &}
& && && && &int index=Integer.parseInt(str[0]);
& && && && &for(int i=1;i&str.i++)
& && && && && & centers.get(index).arr[i-1]=Double.parseDouble(str[i]);
& && &&&@Override
& && &&&//找到每个数据点离哪个质心最近
& && &&&public void cleanup(Context context) throws IOException,InterruptedException {
& && && && &Path []caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
& && && && &if(caches==null || caches.length&=0){
& && && && && & log.error(&data文件不存在&);
& && && && && & System.exit(1);
& && && && &}
& && && && &BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
& && && && &S
& && && && &S
& && && && &while((line=br.readLine())!=null){
& && && && && & sample=new Sample();
& && && && && & String []str=line.split(&\\s+&);
& && && && && & for(int i=0;i&Sample.DIMENTION;i++)
& && && && && && &&&sample.arr[i]=Double.parseDouble(str[i]);
& && && && && &&&
& && && && && & int index=-1;
& && && && && & double minDist=Double.MAX_VALUE;
& && && && && & for(int i=0;i&K;i++){
& && && && && && &&&double dist=Sample.getEulerDist(sample, centers.get(i));
& && && && && && &&&if(dist&minDist){
& && && && && && && && &minDist=
& && && && && && && && &index=i;
& && && && && && &&&}
& && && && && & }
& && && && && & context.write(new IntWritable(index), sample);
& && && && &}
& & public static class UpdateCenterReducer extends Reducer&IntWritable, Sample, IntWritable, Sample& {
& && &&&int prev=-1;
& && &&&Sample center=new Sample();;
& && &&&int count=0;
& && &&&@Override
& && &&&//更新每个质心(除最后一个)
& && &&&public void reduce(IntWritable key,Iterable&Sample& values,Context context) throws IOException,InterruptedException{
& && && && &while(values.iterator().hasNext()){
& && && && && & Sample value=values.iterator().next();
& && && && && & if(key.get()!=prev){
& && && && && && &&&if(prev!=-1){
& && && && && && && && &for(int i=0;i&center.arr.i++)
& && && && && && && && && & center.arr[i]/=& && &&
& && && && && && && && &context.write(new IntWritable(prev), center);
& && && && && && &&&}
& && && && && && &&&center.clear();
& && && && && && &&&prev=key.get();
& && && && && && &&&count=0;
& && && && && & }
& && && && && & for(int i=0;i&Sample.DIMENTION;i++)
& && && && && && &&&center.arr[i]+=value.arr[i];
& && && && && & count++;
& && && && &}
& && &&&@Override
& && &&&//更新最后一个质心
& && &&&public void cleanup(Context context) throws IOException,InterruptedException{
& && && && &for(int i=0;i&center.arr.i++)
& && && && && & center.arr[i]/=
& && && && &context.write(new IntWritable(prev), center);
& & @Override
& & public int run(String[] args) throws Exception {
& && &&&Configuration conf=getConf();
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&Job job=new Job(conf);
& && &&&job.setJarByClass(KMeans.class);
& && &&&//质心文件每行的第一个数字是索引
& && &&&FileInputFormat.setInputPaths(job, &/user/orisun/input/centers&);
& && &&&Path outDir=new Path(&/user/orisun/output&);
& && &&&fs.delete(outDir,true);
& && &&&FileOutputFormat.setOutputPath(job, outDir);
& && &&&job.setInputFormatClass(TextInputFormat.class);
& && &&&job.setOutputFormatClass(TextOutputFormat.class);
& && &&&job.setMapperClass(ClusterMapper.class);
& && &&&job.setReducerClass(UpdateCenterReducer.class);
& && &&&job.setOutputKeyClass(IntWritable.class);
& && &&&job.setOutputValueClass(Sample.class);
& && &&&return job.waitForCompletion(true)?0:1;
& & public static void main(String[] args) throws Exception {
& && &&&Configuration conf = new Configuration();
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&//样本数据文件中每个样本不需要标记索引
& && &&&Path dataFile=new Path(&/user/orisun/input/data&);
& && &&&DistributedCache.addCacheFile(dataFile.toUri(), conf);
& && &&&int iteration = 0;
& && &&&int success = 1;
& && &&&do {
& && && && &success ^= ToolRunner.run(conf, new KMeans(), args);
& && && && &(&iteration &+iteration+& end&);
& && &&&} while (success == 1 && iteration++ & MAXITERATIONS
& && && && && & && (!stopIteration(conf)));
& && &&&(&Success.Iteration=& + iteration);
& && &&&//迭代完成后再执行一次mapper,输出每个样本点所属的分类--在/user/orisun/output2/part-m-00000中
& && &&&//质心文件保存在/user/orisun/input/centers中
& && &&&Job job=new Job(conf);
& && &&&job.setJarByClass(KMeans.class);
& && &&&FileInputFormat.setInputPaths(job, &/user/orisun/input/centers&);
& && &&&Path outDir=new Path(&/user/orisun/output2&);
& && &&&fs.delete(outDir,true);
& && &&&FileOutputFormat.setOutputPath(job, outDir);
& && &&&job.setInputFormatClass(TextInputFormat.class);
& && &&&job.setOutputFormatClass(TextOutputFormat.class);
& && &&&job.setMapperClass(ClusterMapper.class);
& && &&&job.setNumReduceTasks(0);
& && &&&job.setOutputKeyClass(IntWritable.class);
& && &&&job.setOutputValueClass(Sample.class);
& && &&&job.waitForCompletion(true);
注意在Driver中创建Job实例时一定要把Configuration类型的参数传递进去,否则在Mapper或Reducer中调用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就为null。因为空构造函数的Job采用的Configuration是从hadoop的配置文件中读出来的(使用new Configuration()创建的Configuration就是从hadoop的配置文件中读出来的),请注意在main()函数中有一句:DistributedCache.addCacheFile(dataFile.toUri(),
次 ,您可全文免费在线阅读后下载本文档。
需要金币:350 &&
一个实验 所有实验都是在实验室搭建的Hadoop平台 上运行的.平台有5 台机器,都是四核Intel Corei3处理器,4GB内存.Hadoop版本0.20.2, java版本1.6.25.每台机器之间用千兆以太网 卡,通过交换机连接.实验所用的数据是人工数 据,维度是48维.为了测试算法的性能,实验中构 造了分别含有10^4,10^5,10^6,2*10^6 条 记录的数据来进行测试.由于KMeans算法中有 随机初始化中心点的操作,因此对每一组实验重 复执行25次,取其平均执行时间作为最终实验结 果 算法改进后的实效 可以看出:基于MapReduce的KMeans算法 的运行效率要远远高于传统的KMeans算法 Q&A The
algorithm of Kmeans 主要内容: Kmeans实战 聚类算法简介 Kmeans算法详解 Kmeans算法的缺陷及若干改进 Kmeans的单机实现与分布式实现策略
聚类算法简介 1 2 3 聚类的目标:将一组向量分成若干组,组内数据是相似的,而组间数据是有较明显差异。 与分类区别:分类与聚类最大的区别在于分类的目标事先已知,聚类也被称为无监督机器学习 聚类手段:传统聚类算法 ①划分法
⑤基于模型方法 什么是Kmeans算法? Q1:K是什么?A1:k是聚类算法当中类的个数。 Summary:Kmeans是用均值算法把数据分成K个类的算法!
Q2:means是什么?A2:means是均值算法。 Kmeans算法详解(1) 步骤一:取得k个初始初始中心点 Kmeans算法详解(2) Min
three due to
the EuclidDistance 步骤二:把每个点划分进相应的簇 Kmeans算法详解(3) Min
three due to
the EuclidDistance 步骤三:重新计算中心点 Kmeans算法详解(4) 步骤四:迭代计算中心点
正在加载中,请稍后...数据挖掘--kmeans聚类算法mapreduce实现 代码
==================cluster.txt===========================A&&& 2&&& 2B&&& 2&&& 4C&&& 4&&& 2D&&& 4&&& 4E&&& 6&&& 6F&&& 6&&& 8G&&& 8&&& 6H&&& 8&&& 8==================cluster.center.conf===========================K1&&& 3&&& 2K2&&& 6&&& 2====================================================================================package com.mahout.//二维坐标的点public class DmRecord {&&& private S&&& public String getName() {&&& &&&&&& }&&& public void setName(String name) {&&& &&& this.name =&&& }&&& priv&&& priv&&& &&& public DmRecord(){&&& &&& &&& }&&& &&& public DmRecord(String name,double x,double y){&&& &&& this.name =&&& &&& this.xpodouble =&&& &&& this.ypodouble =&&& }&&& public double getXpoint() {&&& &&&&&& }&&& public void setXpoint(double xpodouble) {&&& &&& this.xpodouble =&&& }&&& public double getYpoint() {&&& &&&&&& }&&& public void setYpoint(double ypodouble) {&&& &&& this.ypodouble =&&& }&&& &&& public& double distance(DmRecord record){&&& &&& return Math.sqrt(Math.pow(this.xpodouble-record.xpodouble, 2)+Math.pow(this.ypodouble-record.ypodouble, 2));&&& }}==============================================================================package com.mahout.import java.io.BufferedRimport java.io.Fimport java.io.FileInputSimport java.io.IOEimport java.io.InputStreamRimport java.util.HashMimport java.util.Mimport org.apache.hadoop.io.IOUpublic class DmRecordParser {&&& private Map&String,DmRecord& urlMap = new HashMap&String,DmRecord&();&&& & &&& & /**&&& && * 读取配置文件记录,生成对象&&& && */&&& & public void initialize(File file) throws IOException {&&& &&& BufferedReader in =&&& &&& try {&&& &&&&& in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));&&& &&&&& S&&& &&&&& while ((line = in.readLine()) != null) {&&& &&&&&&& String [] strKey = line.split(&\t&);&&& &&&&&&& urlMap.put(strKey[0],parse(line));&&& &&&&& }&&& &&& } finally {&&& &&&&& IOUtils.closeStream(in);&&& &&& }&&& & }&&& & &&& & /**&&& && * 生成坐标对象&&& && */&&& & public DmRecord parse(String line){&&& &&& String [] strPlate = line.split(&\t&);&&& &&& DmRecord Dmurl = new DmRecord(strPlate[0],Integer.parseInt(strPlate[1]),Integer.parseInt(strPlate[2]));&&& &&& return D&&& & }&&& & &&& & /**&&& && * 获取分类中心坐标&&& && */&&& & public DmRecord getUrlCode(String cluster){&&& &&& & DmRecord returnCode =&&& &&& DmRecord dmUrl = (DmRecord)urlMap.get(cluster);&&& &&& if(dmUrl == null){&&& &&&&& //35&&&& 6&&& &&& &&& returnCode = &&& &&& }else{&&& &&& &&& returnCode =dmU&&& &&& }&&& &&& return returnC&&& & }}==============================================================================package com.mahout.import java.io.Fimport java.io.IOEimport java.util.Iimport org.apache.hadoop.conf.Cimport org.apache.hadoop.fs.Pimport org.apache.hadoop.io.LongWimport org.apache.hadoop.io.Timport org.apache.pressionCimport org.apache.hadoop.mapred.FileInputFimport org.apache.hadoop.mapred.FileOutputFimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.MapReduceBimport org.apache.hadoop.mapred.Mimport org.apache.hadoop.mapred.OutputCimport org.apache.hadoop.mapred.Rimport org.apache.hadoop.mapred.Rimport org.apache.hadoop.mapred.TextInputFimport org.apache.hadoop.mapred.TextOutputFimport org.apache.hadoop.util.Timport org.apache.hadoop.util.ToolRimport com.mahout.test.StringStringPairApublic class Kmeans& extends Configured implements Tool {&& &public static class KmeansMapper extends MapReduceBase implements&& &&& &&& &Mapper&LongWritable, Text, Text, Text& {&& &&& &private DmRecordP&& &&& &private String clusterNode = &K&;&& &&& &private DmRecord record0 =&& &&& &private DmRecord record1 = new DmRecord();&& &&& &private double Min_distance = 9999;&& &&& &private int tmpK = 0;&& &&& &private Text tKey = new Text();&& &&& &private Text tValue = new Text();&& &&& &&& &&& &//获取聚类中心坐标&& &&& &@Override&& &&& &public void configure(JobConf conf) {&& &&& &&& &drp = new DmRecordParser();&& &&& &&& &try {&& &&& &&& &&& &drp.initialize(new File(&cluster.center.conf&));&& &&& &&& &} catch (IOException e) {&& &&& &&& &&& &throw new RuntimeException(e);&& &&& &&& &}&& &&& &}&& &&& &&& &&& &//根据聚类坐标,把文件中的点进行类别划分&& &&& &@Override&& &&& &public void map(LongWritable key, Text value,&& &&& &&& &&& &OutputCollector&Text, Text& output, Reporter arg3)&& &&& &&& &&& &throws IOException {&& &&& &&& &String [] strArr = value.toString().split(&\t&);&& &&& &&& &&& &&& &&& &for(int i=1; i &= 2; i++){&& &&& &&& &&& &record0 = drp.getUrlCode(&K&+i);&& &&& &&& &&& &record1.setName(strArr[0]);&& &&& &&& &&& &record1.setXpoint(Double.parseDouble(strArr[1]));&& &&& &&& &&& &record1.setXpoint(Integer.parseInt(strArr[2]));&& &&& &&& &&& &&& &&& &&& &&& &if(record0.distance(record1) & Min_distance){&& &&& &&& &&& &&& &tmpK =&& &&& &&& &&& &&& &Min_distance = record0.distance(record1);&& &&& &&& &&& &}&& &&& &&& &}&& &&& &&& &&& &&& &&& &&& &&& &&& &tKey.set(&C&+tmpK);&& &&& &&& &output.collect(tKey, value);&& &&& &}&& &}&& &&& &//计算新的聚类中心&& &public static class KmeansReducer extends MapReduceBase implements&& &&& &&& &Reducer&Text, Text, Text, Text& {&& &&& &&& &&& &private Text tKey = new Text();&& &&& &private Text tValue = new Text();&& &&& &&& &&& &@Override&& &&& &public void reduce(Text key, Iterator&Text& value,&& &&& &&& &&& &OutputCollector&Text, Text& output, Reporter arg3)&& &&& &&& &&& &throws IOException {&& &&& &&& &double avgX=0;&& &&& &&& &double avgY=0;&& &&& &&& &double sumX=0;&& &&& &&& &double sumY=0;&& &&& &&& &int count=0;&& &&& &&& &String [] strValue =&& &&& &&& &&& &&& &&& &while(value.hasNext()){&& &&& &&& &&& &count++;&& &&& &&& &&& &strValue = value.next().toString().split(&\t&);&& &&& &&& &&& &sumX = sumX + Integer.parseInt(strValue[1]);&& &&& &&& &&& &sumY = sumY + Integer.parseInt(strValue[1]);&& &&& &&& &}&& &&& &&& &&& &&& &&& &avgX = sumX/&& &&& &&& &avgY = sumY/&& &&& &&& &tKey.set(&K&+key.toString().substring(1,2));&& &&& &&& &tValue.set(avgX + &\t& + avgY);&& &&& &&& &System.out.println(&K&+key.toString().substring(1,2)+&\t&+avgX + &\t& + avgY);&& &&& &&& &output.collect(tKey, tValue);&& &&& &}&& &}&& &&& &@Override&&& public int run(String[] args) throws Exception {&& &&& &JobConf conf = new JobConf(getConf(), Kmeans.class);&&&&&&& conf.setJobName(&Kmeans&);&&&&&&& //conf.setNumMapTasks(200);&&&&&&& // 设置Map输出的key和value的类型&&&&&&& conf.setMapOutputKeyClass(Text.class);&&&&&&& conf.setMapOutputValueClass(Text.class);&&&&&&& // 设置Reduce输出的key和value的类型&&&&&&& conf.setOutputKeyClass(Text.class);&&&&&&& conf.setOutputValueClass(Text.class);&&&&&&& // 设置Mapper和Reducer&&&&&&& conf.setMapperClass(KmeansMapper.class);&&&&&&& conf.setReducerClass(KmeansReducer.class);&&&&&& &&&&&&&& conf.setInputFormat(TextInputFormat.class);&&&&&&& conf.setOutputFormat(TextOutputFormat.class);&&&&&&& // 设置输入输出目录&&&&&&& FileInputFormat.setInputPaths(conf, new Path(args[0]));&&&&&&& FileOutputFormat.setOutputPath(conf, new Path(args[1]));&&&&&&& JobClient.runJob(conf);&&&&&&& return 0;&&& }&&& public static void main(String[] args) throws Exception {&&&&&&& int exitCode = ToolRunner.run(new Kmeans(), args);&&&&&&& System.exit(exitCode);&&& }}


