如何在Hadoop平台上实现K-kmeans算法matlab实现

mapreduce(14)
在我们阅读的时候,我们首先知道什么是KMeans:
K-means算法是最为经典的基于划分的聚类方法,是十大经典数据挖掘算法之一。K-means算法的基本思想是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。
虽然已经发展到了hadoop2.4,但是对于一些算法只要明白其中的含义,是和语言无关的,无论是使用Java、C++、python等,
本文以Hadoop1.0.3为例。
从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点);在Reducer中属于同一个质心的样本点在一个链表中,方便我们计算新的中心,然后Emit(质心编号,质心)。但是技术上的事并没有理论层面那么简单。
Mapper和Reducer都要用到K个中心(我习惯称之为质心),Mapper要读这些质心,Reducer要写这些质心。另外Mapper还要读存储样本点的数据文件。我先后尝试以下3种方法,只有第3种是可行的,如果你不想被我误导,请直接跳过前两种。
一、用一个共享变量在存储K个质心
由于K很小,所以我们认为用一个Vector&Sample&来存储K个质心是没有问题的。以下代码是错误的:
class MyJob extends Tool{
  static Vector&Sample& centers=new Vector&Sample&(K);
  static class MyMapper extends Mapper{
    //read centers
  static class MyMapper extends Reducer{
    //update centers
  void run(){
    until ( convergence ){
      map();
      reduce();
发生这种错误是因为对hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。
详细的流程是这样的:
首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。
在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。
二、用分布式缓存文件存储K个质心
既然不能通过共享外部类变量的方式,那我们通过文件在map和reduce之间传递数据总可以吧,Mapper从文件中读取质心,Reducer把更新后的质心再写入这个文件。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。虽然有MutipleInputs可以指定map()的输入文件有多个,并可以为每个输入文件分别指定解析方式,但是MutipleInputs不能保证每条记录从不同文件中传给map()的顺序。在我们的KMeans中,我们希望质心文件全部被读入后再逐条读入样本数据。
于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之间共享数据。DistributedCacheFile是缓存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式读取它。于是我又有了一个错误的思路:
class MyMaper{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void setup(){
& && &&&//读取cacheFile,给centers赋值
& & void map(){
& && &&&//计算样本离哪个质心最近
class MyReducer{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void reduce(){
& && &&&//更新centers
& & void cleanup(){
& && &&&//把centers写回cacheFile
错因:DistributedCacheFile是只读的,在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这是单向的复制,是不能写回的。试想在分布式环境下,如果不同的mapper和reducer可以把缓存文件写回的话,那岂不又需要一套复杂的文件共享机制,严重地影响hadoop执行效率。
三、用分布式缓存文件存储样本数据
其实DistributedCache还有一个特点,它更适合于“大文件”(各节点内存容不下)缓存在本地。仅存储了K个质心的文件显然是小文件,与之相比样本数据文件才是大文件。
此时我们需要2个质心文件:一个存放上一次的质心prevCenterFile,一个存放reducer更新后的质心currCenterFile。Mapper从prevCenterFile中读取质心,Reducer把更新后有质心写入currCenterFile。在Driver中读入prevCenterFile和currCenterFile,比较前后两次的质心是否相同(或足够地接近),如果相同则停止迭代,否则就用currCenterFile覆盖prevCenterFile(使用fs.rename),进入下一次的迭代。
这时候Mapper就是这样的:
class MyMaper{
& & Vector&Sample& centers=new Vector&Sample&(K);
& & void map(){
& && &&&//逐条读取质心,给centers赋值
& & void cleanup(){
& && &&&//逐行读取cacheFile,计算每个样本点离哪个质心最近
& && &&&//然后Emit(样本点所属的簇编号,样本点)
试验数据是在Mahout项目中作为example提供的,600个样本点,每个样本是一个60维的浮点向量。&&(118.04
KB, 下载次数: 1)&
为样本数据建立一个类Sample.java。
import java.io.DataI
import java.io.DataO
import java.io.IOE
import mons.logging.L
import mons.logging.LogF
import org.apache.hadoop.io.W
public class Sample implements Writable{
& & private static final Log log=LogFactory.getLog(Sample.class);
& & public static final int DIMENTION=60;
& & public double arr[];
& & public Sample(){
& && &&&arr=new double[DIMENTION];
& & public static double getEulerDist(Sample vec1,Sample vec2){
& && &&&if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){
& && && && &log.error(&vector's dimention is not &+DIMENTION);
& && && && &System.exit(1);
& && &&&double dist=0.0;
& && &&&for(int i=0;i&DIMENTION;++i){
& && && && &dist+=(vec1.arr[i]-vec2.arr[i])*(vec1.arr[i]-vec2.arr[i]);
& && &&&return Math.sqrt(dist);
& & public void clear(){
& && &&&for(int i=0;i&arr.i++)
& && && && &arr[i]=0.0;
& & @Override
& & public String toString(){
& && &&&String rect=String.valueOf(arr[0]);
& && &&&for(int i=1;i&DIMENTION;i++)
& && && && &rect+=&\t&+String.valueOf(arr[i]);
& & @Override
& & public void readFields(DataInput in) throws IOException {
& && &&&String str[]=in.readUTF().split(&\\s+&);
& && &&&for(int i=0;i&DIMENTION;++i)
& && && && &arr[i]=Double.parseDouble(str[i]);
& & @Override
& & public void write(DataOutput out) throws IOException {
& && &&&out.writeUTF(this.toString());
KMeans.java
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.io.InputStreamR
import java.util.V
import mons.logging.L
import mons.logging.LogF
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileS
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
public class KMeans extends Configured implements Tool{
& & private static final Log log = LogFactory.getLog(KMeans2.class);
& & private static final int K = 10;
& & private static final int MAXITERATIONS = 300;
& & private static final double THRESHOLD = 0.01;
& & public static boolean stopIteration(Configuration conf) throws IOException{
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&Path pervCenterFile=new Path(&/user/orisun/input/centers&);
& && &&&Path currentCenterFile=new Path(&/user/orisun/output/part-r-00000&);
& && &&&if(!(fs.exists(pervCenterFile) && fs.exists(currentCenterFile))){
& && && && &(&两个质心文件需要同时存在&);
& && && && &System.exit(1);
& && &&&//比较前后两次质心的变化是否小于阈值,决定迭代是否继续
& && &&&boolean stop=
& && &&&String line1,line2;
& && &&&FSDataInputStream in1=fs.open(pervCenterFile);
& && &&&FSDataInputStream in2=fs.open(currentCenterFile);
& && &&&InputStreamReader isr1=new InputStreamReader(in1);
& && &&&InputStreamReader isr2=new InputStreamReader(in2);
& && &&&BufferedReader br1=new BufferedReader(isr1);
& && &&&BufferedReader br2=new BufferedReader(isr2);
& && &&&Sample prevCenter,currC
& && &&&while((line1=br1.readLine())!=null && (line2=br2.readLine())!=null){
& && && && &prevCenter=new Sample();
& && && && &currCenter=new Sample();
& && && && &String []str1=line1.split(&\\s+&);
& && && && &String []str2=line2.split(&\\s+&);
& && && && &assert(str1[0].equals(str2[0]));
& && && && &for(int i=1;i&=Sample.DIMENTION;i++){
& && && && && & prevCenter.arr[i-1]=Double.parseDouble(str1[i]);
& && && && && & currCenter.arr[i-1]=Double.parseDouble(str2[i]);
& && && && &}
& && && && &if(Sample.getEulerDist(prevCenter, currCenter)&THRESHOLD){
& && && && && & stop=
& && && && && &
& && && && &}
& && &&&//如果还要进行下一次迭代,就用当前质心替代上一次的质心
& && &&&if(stop==false){
& && && && &fs.delete(pervCenterFile,true);
& && && && &if(fs.rename(currentCenterFile, pervCenterFile)==false){
& && && && && & log.error(&质心文件替换失败&);
& && && && && & System.exit(1);
& && && && &}
& & public static class ClusterMapper extends Mapper&LongWritable, Text, IntWritable, Sample& {
& && &&&Vector&Sample& centers = new Vector&Sample&();
& && &&&@Override
& && &&&//清空centers
& && &&&public void setup(Context context){
& && && && &for (int i = 0; i & K; i++) {
& && && && && & centers.add(new Sample());
& && && && &}
& && &&&@Override
& && &&&//从输入文件读入centers
& && &&&public void map(LongWritable key, Text value, Context context)
& && && && && & throws IOException, InterruptedException {
& && && && &String []str=value.toString().split(&\\s+&);
& && && && &if(str.length!=Sample.DIMENTION+1){
& && && && && & log.error(&读入centers时维度不对&);
& && && && && & System.exit(1);
& && && && &}
& && && && &int index=Integer.parseInt(str[0]);
& && && && &for(int i=1;i&str.i++)
& && && && && & centers.get(index).arr[i-1]=Double.parseDouble(str[i]);
& && &&&@Override
& && &&&//找到每个数据点离哪个质心最近
& && &&&public void cleanup(Context context) throws IOException,InterruptedException {
& && && && &Path []caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
& && && && &if(caches==null || caches.length&=0){
& && && && && & log.error(&data文件不存在&);
& && && && && & System.exit(1);
& && && && &}
& && && && &BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
& && && && &S
& && && && &S
& && && && &while((line=br.readLine())!=null){
& && && && && & sample=new Sample();
& && && && && & String []str=line.split(&\\s+&);
& && && && && & for(int i=0;i&Sample.DIMENTION;i++)
& && && && && && &&&sample.arr[i]=Double.parseDouble(str[i]);
& && && && && &&&
& && && && && & int index=-1;
& && && && && & double minDist=Double.MAX_VALUE;
& && && && && & for(int i=0;i&K;i++){
& && && && && && &&&double dist=Sample.getEulerDist(sample, centers.get(i));
& && && && && && &&&if(dist&minDist){
& && && && && && && && &minDist=
& && && && && && && && &index=i;
& && && && && && &&&}
& && && && && & }
& && && && && & context.write(new IntWritable(index), sample);
& && && && &}
& & public static class UpdateCenterReducer extends Reducer&IntWritable, Sample, IntWritable, Sample& {
& && &&&int prev=-1;
& && &&&Sample center=new Sample();;
& && &&&int count=0;
& && &&&@Override
& && &&&//更新每个质心(除最后一个)
& && &&&public void reduce(IntWritable key,Iterable&Sample& values,Context context) throws IOException,InterruptedException{
& && && && &while(values.iterator().hasNext()){
& && && && && & Sample value=values.iterator().next();
& && && && && & if(key.get()!=prev){
& && && && && && &&&if(prev!=-1){
& && && && && && && && &for(int i=0;i&center.arr.i++)
& && && && && && && && && & center.arr[i]/=& && &&
& && && && && && && && &context.write(new IntWritable(prev), center);
& && && && && && &&&}
& && && && && && &&&center.clear();
& && && && && && &&&prev=key.get();
& && && && && && &&&count=0;
& && && && && & }
& && && && && & for(int i=0;i&Sample.DIMENTION;i++)
& && && && && && &&&center.arr[i]+=value.arr[i];
& && && && && & count++;
& && && && &}
& && &&&@Override
& && &&&//更新最后一个质心
& && &&&public void cleanup(Context context) throws IOException,InterruptedException{
& && && && &for(int i=0;i&center.arr.i++)
& && && && && & center.arr[i]/=
& && && && &context.write(new IntWritable(prev), center);
& & @Override
& & public int run(String[] args) throws Exception {
& && &&&Configuration conf=getConf();
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&Job job=new Job(conf);
& && &&&job.setJarByClass(KMeans.class);
& && &&&//质心文件每行的第一个数字是索引
& && &&&FileInputFormat.setInputPaths(job, &/user/orisun/input/centers&);
& && &&&Path outDir=new Path(&/user/orisun/output&);
& && &&&fs.delete(outDir,true);
& && &&&FileOutputFormat.setOutputPath(job, outDir);
& && &&&job.setInputFormatClass(TextInputFormat.class);
& && &&&job.setOutputFormatClass(TextOutputFormat.class);
& && &&&job.setMapperClass(ClusterMapper.class);
& && &&&job.setReducerClass(UpdateCenterReducer.class);
& && &&&job.setOutputKeyClass(IntWritable.class);
& && &&&job.setOutputValueClass(Sample.class);
& && &&&return job.waitForCompletion(true)?0:1;
& & public static void main(String[] args) throws Exception {
& && &&&Configuration conf = new Configuration();
& && &&&FileSystem fs=FileSystem.get(conf);
& && &&&//样本数据文件中每个样本不需要标记索引
& && &&&Path dataFile=new Path(&/user/orisun/input/data&);
& && &&&DistributedCache.addCacheFile(dataFile.toUri(), conf);
& && &&&int iteration = 0;
& && &&&int success = 1;
& && &&&do {
& && && && &success ^= ToolRunner.run(conf, new KMeans(), args);
& && && && &(&iteration &+iteration+& end&);
& && &&&} while (success == 1 && iteration++ & MAXITERATIONS
& && && && && & && (!stopIteration(conf)));
& && &&&(&Success.Iteration=& + iteration);
& && &&&//迭代完成后再执行一次mapper,输出每个样本点所属的分类--在/user/orisun/output2/part-m-00000中
& && &&&//质心文件保存在/user/orisun/input/centers中
& && &&&Job job=new Job(conf);
& && &&&job.setJarByClass(KMeans.class);
& && &&&FileInputFormat.setInputPaths(job, &/user/orisun/input/centers&);
& && &&&Path outDir=new Path(&/user/orisun/output2&);
& && &&&fs.delete(outDir,true);
& && &&&FileOutputFormat.setOutputPath(job, outDir);
& && &&&job.setInputFormatClass(TextInputFormat.class);
& && &&&job.setOutputFormatClass(TextOutputFormat.class);
& && &&&job.setMapperClass(ClusterMapper.class);
& && &&&job.setNumReduceTasks(0);
& && &&&job.setOutputKeyClass(IntWritable.class);
& && &&&job.setOutputValueClass(Sample.class);
& && &&&job.waitForCompletion(true);
注意在Driver中创建Job实例时一定要把Configuration类型的参数传递进去,否则在Mapper或Reducer中调用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就为null。因为空构造函数的Job采用的Configuration是从hadoop的配置文件中读出来的(使用new Configuration()创建的Configuration就是从hadoop的配置文件中读出来的),请注意在main()函数中有一句:DistributedCache.addCacheFile(dataFile.toUri(),
conf);即此时的Configuration中多了一个DistributedCacheFile,所以你需要把这个Configuration传递给Job构造函数,如果传递默认的Configuration,那在Job中当然不知道DistributedCacheFile的存在了。
方案三还是不如人意,质心文件是很小的(因为质心总共就没几个),用map()函数仅仅是来读一个质心文件根本就没有发挥并行的作用,而且在map()中也没有调用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中来完成,这样就不需要Mapper了,或者说上面设计的就不是MapReduce程序,跟平常的单线程串行程序是一样的。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:17054次
排名:千里之外
转载:62篇
(1)(5)(2)(2)(1)(1)(1)(1)(7)(6)(40)(2)您所在位置: &
&nbsp&&nbsp&nbsp&&nbsp
大数据经典算法Kmeans要点详解.ppt33页
本文档一共被下载:
次 ,您可全文免费在线阅读后下载本文档。
文档加载中...广告还剩秒
需要金币:350 &&
你可能关注的文档:
··········
··········
一个实验 所有实验都是在实验室搭建的Hadoop平台 上运行的.平台有5 台机器,都是四核Intel Corei3处理器,4GB内存.Hadoop版本0.20.2, java版本1.6.25.每台机器之间用千兆以太网 卡,通过交换机连接.实验所用的数据是人工数 据,维度是48维.为了测试算法的性能,实验中构 造了分别含有10^4,10^5,10^6,2*10^6 条 记录的数据来进行测试.由于KMeans算法中有 随机初始化中心点的操作,因此对每一组实验重 复执行25次,取其平均执行时间作为最终实验结 果 算法改进后的实效 可以看出:基于MapReduce的KMeans算法 的运行效率要远远高于传统的KMeans算法 Q&A The
algorithm of Kmeans 主要内容: Kmeans实战 聚类算法简介 Kmeans算法详解 Kmeans算法的缺陷及若干改进 Kmeans的单机实现与分布式实现策略
聚类算法简介 1 2 3 聚类的目标:将一组向量分成若干组,组内数据是相似的,而组间数据是有较明显差异。 与分类区别:分类与聚类最大的区别在于分类的目标事先已知,聚类也被称为无监督机器学习 聚类手段:传统聚类算法 ①划分法
②层次方法
③基于密度方法
④基于网络方法
⑤基于模型方法 什么是Kmeans算法? Q1:K是什么?A1:k是聚类算法当中类的个数。 Summary:Kmeans是用均值算法把数据分成K个类的算法!
Q2:means是什么?A2:means是均值算法。 Kmeans算法详解(1) 步骤一:取得k个初始初始中心点 Kmeans算法详解(2) Min
three due to
the EuclidDistance 步骤二:把每个点划分进相应的簇 Kmeans算法详解(3) Min
three due to
the EuclidDistance 步骤三:重新计算中心点 Kmeans算法详解(4) 步骤四:迭代计算中心点
正在加载中,请稍后...数据挖掘--kmeans聚类算法mapreduce实现 代码
==================cluster.txt===========================A&&& 2&&& 2B&&& 2&&& 4C&&& 4&&& 2D&&& 4&&& 4E&&& 6&&& 6F&&& 6&&& 8G&&& 8&&& 6H&&& 8&&& 8==================cluster.center.conf===========================K1&&& 3&&& 2K2&&& 6&&& 2====================================================================================package com.mahout.//二维坐标的点public class DmRecord {&&& private S&&& public String getName() {&&& &&&&&& }&&& public void setName(String name) {&&& &&& this.name =&&& }&&& priv&&& priv&&& &&& public DmRecord(){&&& &&& &&& }&&& &&& public DmRecord(String name,double x,double y){&&& &&& this.name =&&& &&& this.xpodouble =&&& &&& this.ypodouble =&&& }&&& public double getXpoint() {&&& &&&&&& }&&& public void setXpoint(double xpodouble) {&&& &&& this.xpodouble =&&& }&&& public double getYpoint() {&&& &&&&&& }&&& public void setYpoint(double ypodouble) {&&& &&& this.ypodouble =&&& }&&& &&& public& double distance(DmRecord record){&&& &&& return Math.sqrt(Math.pow(this.xpodouble-record.xpodouble, 2)+Math.pow(this.ypodouble-record.ypodouble, 2));&&& }}==============================================================================package com.mahout.import java.io.BufferedRimport java.io.Fimport java.io.FileInputSimport java.io.IOEimport java.io.InputStreamRimport java.util.HashMimport java.util.Mimport org.apache.hadoop.io.IOUpublic class DmRecordParser {&&& private Map&String,DmRecord& urlMap = new HashMap&String,DmRecord&();&&& & &&& & /**&&& && * 读取配置文件记录,生成对象&&& && */&&& & public void initialize(File file) throws IOException {&&& &&& BufferedReader in =&&& &&& try {&&& &&&&& in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));&&& &&&&& S&&& &&&&& while ((line = in.readLine()) != null) {&&& &&&&&&& String [] strKey = line.split(&\t&);&&& &&&&&&& urlMap.put(strKey[0],parse(line));&&& &&&&& }&&& &&& } finally {&&& &&&&& IOUtils.closeStream(in);&&& &&& }&&& & }&&& & &&& & /**&&& && * 生成坐标对象&&& && */&&& & public DmRecord parse(String line){&&& &&& String [] strPlate = line.split(&\t&);&&& &&& DmRecord Dmurl = new DmRecord(strPlate[0],Integer.parseInt(strPlate[1]),Integer.parseInt(strPlate[2]));&&& &&& return D&&& & }&&& & &&& & /**&&& && * 获取分类中心坐标&&& && */&&& & public DmRecord getUrlCode(String cluster){&&& &&& & DmRecord returnCode =&&& &&& DmRecord dmUrl = (DmRecord)urlMap.get(cluster);&&& &&& if(dmUrl == null){&&& &&&&& //35&&&& 6&&& &&& &&& returnCode = &&& &&& }else{&&& &&& &&& returnCode =dmU&&& &&& }&&& &&& return returnC&&& & }}==============================================================================package com.mahout.import java.io.Fimport java.io.IOEimport java.util.Iimport org.apache.hadoop.conf.Cimport org.apache.hadoop.fs.Pimport org.apache.hadoop.io.LongWimport org.apache.hadoop.io.Timport org.apache.pressionCimport org.apache.hadoop.mapred.FileInputFimport org.apache.hadoop.mapred.FileOutputFimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.MapReduceBimport org.apache.hadoop.mapred.Mimport org.apache.hadoop.mapred.OutputCimport org.apache.hadoop.mapred.Rimport org.apache.hadoop.mapred.Rimport org.apache.hadoop.mapred.TextInputFimport org.apache.hadoop.mapred.TextOutputFimport org.apache.hadoop.util.Timport org.apache.hadoop.util.ToolRimport com.mahout.test.StringStringPairApublic class Kmeans& extends Configured implements Tool {&& &public static class KmeansMapper extends MapReduceBase implements&& &&& &&& &Mapper&LongWritable, Text, Text, Text& {&& &&& &private DmRecordP&& &&& &private String clusterNode = &K&;&& &&& &private DmRecord record0 =&& &&& &private DmRecord record1 = new DmRecord();&& &&& &private double Min_distance = 9999;&& &&& &private int tmpK = 0;&& &&& &private Text tKey = new Text();&& &&& &private Text tValue = new Text();&& &&& &&& &&& &//获取聚类中心坐标&& &&& &@Override&& &&& &public void configure(JobConf conf) {&& &&& &&& &drp = new DmRecordParser();&& &&& &&& &try {&& &&& &&& &&& &drp.initialize(new File(&cluster.center.conf&));&& &&& &&& &} catch (IOException e) {&& &&& &&& &&& &throw new RuntimeException(e);&& &&& &&& &}&& &&& &}&& &&& &&& &&& &//根据聚类坐标,把文件中的点进行类别划分&& &&& &@Override&& &&& &public void map(LongWritable key, Text value,&& &&& &&& &&& &OutputCollector&Text, Text& output, Reporter arg3)&& &&& &&& &&& &throws IOException {&& &&& &&& &String [] strArr = value.toString().split(&\t&);&& &&& &&& &&& &&& &&& &for(int i=1; i &= 2; i++){&& &&& &&& &&& &record0 = drp.getUrlCode(&K&+i);&& &&& &&& &&& &record1.setName(strArr[0]);&& &&& &&& &&& &record1.setXpoint(Double.parseDouble(strArr[1]));&& &&& &&& &&& &record1.setXpoint(Integer.parseInt(strArr[2]));&& &&& &&& &&& &&& &&& &&& &&& &if(record0.distance(record1) & Min_distance){&& &&& &&& &&& &&& &tmpK =&& &&& &&& &&& &&& &Min_distance = record0.distance(record1);&& &&& &&& &&& &}&& &&& &&& &}&& &&& &&& &&& &&& &&& &&& &&& &&& &tKey.set(&C&+tmpK);&& &&& &&& &output.collect(tKey, value);&& &&& &}&& &}&& &&& &//计算新的聚类中心&& &public static class KmeansReducer extends MapReduceBase implements&& &&& &&& &Reducer&Text, Text, Text, Text& {&& &&& &&& &&& &private Text tKey = new Text();&& &&& &private Text tValue = new Text();&& &&& &&& &&& &@Override&& &&& &public void reduce(Text key, Iterator&Text& value,&& &&& &&& &&& &OutputCollector&Text, Text& output, Reporter arg3)&& &&& &&& &&& &throws IOException {&& &&& &&& &double avgX=0;&& &&& &&& &double avgY=0;&& &&& &&& &double sumX=0;&& &&& &&& &double sumY=0;&& &&& &&& &int count=0;&& &&& &&& &String [] strValue =&& &&& &&& &&& &&& &&& &while(value.hasNext()){&& &&& &&& &&& &count++;&& &&& &&& &&& &strValue = value.next().toString().split(&\t&);&& &&& &&& &&& &sumX = sumX + Integer.parseInt(strValue[1]);&& &&& &&& &&& &sumY = sumY + Integer.parseInt(strValue[1]);&& &&& &&& &}&& &&& &&& &&& &&& &&& &avgX = sumX/&& &&& &&& &avgY = sumY/&& &&& &&& &tKey.set(&K&+key.toString().substring(1,2));&& &&& &&& &tValue.set(avgX + &\t& + avgY);&& &&& &&& &System.out.println(&K&+key.toString().substring(1,2)+&\t&+avgX + &\t& + avgY);&& &&& &&& &output.collect(tKey, tValue);&& &&& &}&& &}&& &&& &@Override&&& public int run(String[] args) throws Exception {&& &&& &JobConf conf = new JobConf(getConf(), Kmeans.class);&&&&&&& conf.setJobName(&Kmeans&);&&&&&&& //conf.setNumMapTasks(200);&&&&&&& // 设置Map输出的key和value的类型&&&&&&& conf.setMapOutputKeyClass(Text.class);&&&&&&& conf.setMapOutputValueClass(Text.class);&&&&&&& // 设置Reduce输出的key和value的类型&&&&&&& conf.setOutputKeyClass(Text.class);&&&&&&& conf.setOutputValueClass(Text.class);&&&&&&& // 设置Mapper和Reducer&&&&&&& conf.setMapperClass(KmeansMapper.class);&&&&&&& conf.setReducerClass(KmeansReducer.class);&&&&&& &&&&&&&& conf.setInputFormat(TextInputFormat.class);&&&&&&& conf.setOutputFormat(TextOutputFormat.class);&&&&&&& // 设置输入输出目录&&&&&&& FileInputFormat.setInputPaths(conf, new Path(args[0]));&&&&&&& FileOutputFormat.setOutputPath(conf, new Path(args[1]));&&&&&&& JobClient.runJob(conf);&&&&&&& return 0;&&& }&&& public static void main(String[] args) throws Exception {&&&&&&& int exitCode = ToolRunner.run(new Kmeans(), args);&&&&&&& System.exit(exitCode);&&& }}
&&最后修改于
请各位遵纪守法并注意语言文明

我要回帖

更多关于 hadoop实现kmeans 的文章

 

随机推荐