云计算云商之家和云数贸关系???什么关系

1188人阅读
数据挖掘(5)
mahout源码(6)
Canopy算法的实现在包org.apache.mahout.clustering.canopy中。
一、算法基本思想
算法基本思想如下:
(1)、将数据集向量化得到一个list后放入内存,选择两个距离阈值:T1和T2,其中T1 & T2,T1和T2的值可以用交叉校验来确定;
(2)、从list中任取一点P,用低计算成本方法快速计算点P与所有Canopy之间的距离(如果当前不存在Canopy,则把点P作为一个Canopy),如果点P与某个Canopy距离在T1以内,则将点P加入到这个Canopy;
(3)、如果点P曾经与某个Canopy的距离在T2以内,则需要把点P从list中删除(不过在在新的mahout采用的 不加入新的Collection 这样后面处理的时候就不包含点P),这一步是认为点P此时与这个Canopy已经够近了,因此它不可以再做其它Canopy的中心了;
(4)、重复步骤2、3,直到list为空结束。
算法思想参考博客
二、源码分析
下面我们来看一下这个包里面的类。
CanopyConfigKeys
CanopyConfigKeys类主要定义了Canopy算法中需要使用的一些配置参数的名称。具体每个参数的作用如下:
String T1_KEY = &org.apache.mahout.clustering.canopy.t1&;
String T2_KEY = &org.apache.mahout.clustering.canopy.t2&;
String T3_KEY = &org.apache.mahout.clustering.canopy.t3&;
String T4_KEY = &org.apache.mahout.clustering.canopy.t4&;
T1和T2为算法设定的阈值,具体参见前面的算法思想,T3和T4也是,不同之处在与,在单机跑canopy算法时mahout只使用了T1和T2,在运行Map-Reduce版本的Canopys算法时,Map阶段使用的T1和T2,Reduce阶段使用的T3和T4,也就是说Map和Reduce阶段的阈值可以不一样。
String CANOPY_PATH_KEY = &org.apache.mahout.clustering.canopy.path&;//这个好像没有见到怎么用的
String DISTANCE_MEASURE_KEY = &org.apache.mahout.clustering.canopy.measure&;//canopy算法需要计算每个向量与canopy的距离,使用的就是这个
String CF_KEY = &org.apache.mahout.clustering.canopy.canopyFilter&;//这个是一个阈值,当一个canopy里面的样本数目大于这个阈值的时候才算作一个canopy
Canopy类继承了类DistanceMeasureCluster,没有做什么事情,真正实现canopy算法的是类CanopyClusterer。
2.3CanopyClusterer
CanopyClusterer的构造函数需要传入一个距离计算类和两个double类型的参数。
public CanopyClusterer(DistanceMeasure measure, double t1, double t2) {
this.t1 = t1;
this.t2 = t2;
this.t3 = t1;
this.t4 = t2;
this.measure =
我们也可以通过传入一个Configuration类型的变量来构造一个CanopyClusterer,其中Configuration是hadoop中的类,我们可以在这个Configuration对象中设定距离计算类,T1,T2,T3和T4等参数,
CanopyClusterer的构造函数会利用Configuration初始化类的属性。
public CanopyClusterer(Configuration config) {
this.configure(config);
} public void configure(Configuration configuration) {
// 初始化距离计算公式
measure = ClassUtils.instantiateAs(
configuration.get(CanopyConfigKeys.DISTANCE_MEASURE_KEY),
DistanceMeasure.class);
measure.configure(configuration);
t1 = Double.parseDouble(configuration.get(CanopyConfigKeys.T1_KEY));
t2 = Double.parseDouble(configuration.get(CanopyConfigKeys.T2_KEY));
String d = configuration.get(CanopyConfigKeys.T3_KEY);
if (d != null) {
t3 = Double.parseDouble(d);
d = configuration.get(CanopyConfigKeys.T4_KEY);
if (d != null) {
t4 = Double.parseDouble(d);
nextCanopyId = 0;
我们来看看怎么利用向量创建canopy以及怎么将一个向量添加到已有的canopy里面,这涉及到两个函数createCanopies和addPointToCanopies,我们分别来解析。
创建canopy的步骤是:
1.从原始的List中取出一个向量用来创建一个canopy,暂时记作canopy-A,将向量从原始的list中删除并将canopy-A加入我们创建的Canopy的List中,此List需要作为最后的输出。
2..利用距离计算类计算List中其他向量B与此向量A的距离,当距离小于T1时,将向量B添加到canopy-A中,当距离小于T2时将向量B从List中删除。
public static List&Canopy& createCanopies(List&Vector& points,
DistanceMeasure measure, double t1, double t2) {
List&Canopy& canopies = Lists.newArrayList();
int nextCanopyId = 0;
while (!points.isEmpty()) {
// 从points取出第一个元素并将其在List中删除
Iterator&Vector& ptIter = points.iterator();
Vector p1 = ptIter.next();
ptIter.remove();
// 创建一个canopy,将器加入List&Canopy&
Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
canopies.add(canopy);
// 计算points中其他的向量与此canopy的距离,当距离小于T1时,将器加入这个canopy,当距离小于t2时,将其删除,以免其他的canopy将其添加进去
while (ptIter.hasNext()) {
Vector p2 = ptIter.next();
double dist = measure.distance(p1, p2);
// Put all points that are within distance threshold T1 into the
if (dist & t1) {
canopy.observe(p2);
// Remove from the list all points that are within distance
// threshold T2
if (dist & t2) {
ptIter.remove();
// 更新每个canopy的S0,S1,S2
for (Canopy c : canopies) {
c.computeParameters();
添加向量到canopy的步骤:遍历canopy的List,计算向量与canopy中心的距离,当距离小于T1时,将此向量加入此canopy。当距离大于T2时就新建一个canopy。
public void addPointToCanopies(Vector point, Collection&Canopy& canopies) {
boolean pointStronglyBound =
for (Canopy canopy : canopies) {
// 遍历所有的Canopy,当向量与Canopy的距离小于T1时,将向量加入这个Canopy
double dist = measure.distance(canopy.getCenter()
.getLengthSquared(), canopy.getCenter(), point);
if (dist & t1) {
if (log.isDebugEnabled()) {
log.debug(&Added point: {} to canopy: {}&,
AbstractCluster.formatVector(point, null),
canopy.getIdentifier());
canopy.observe(point);
pointStronglyBound = pointStronglyBound || dist & t2;
// 当canopies为空时将新建canopy,这个用的真奇妙!!
if (!pointStronglyBound) {
// 当距离大于T2时,新建一个canopy
if (log.isDebugEnabled()) {
log.debug(&Created new Canopy:{} at center:{}&, nextCanopyId,
AbstractCluster.formatVector(point, null));
canopies.add(new Canopy(point, nextCanopyId++, measure));
2.4 CanopyDriver
CanopyDriver类继承AbstractJob,AbstractJob又实现了Tool接口,所以运行的时候是运行其重载的run函数,即函数public int run(String[] args)。真正执行的run函数如下:
public static void run(Configuration conf, Path input, Path output,
DistanceMeasure measure, double t1, double t2, double t3,
double t4, int clusterFilter, boolean runClustering,
double clusterClassificationThreshold, boolean runSequential)
throws IOException, InterruptedException, ClassNotFoundException {
// 先将每个簇的编号和其聚类中心计算出来,如果runClustering为真就将输入的每个向量的归属簇计算出来
Path clustersOut = buildClusters(conf, input, output, measure, t1, t2,
t3, t4, clusterFilter, runSequential);
if (runClustering) {
clusterData(conf, input, clustersOut, output,
clusterClassificationThreshold, runSequential);
此函数第一步是将每个簇的聚类中心计算出来,第二部是将原始样本的归属簇计算出来。
2.4.1簇的聚类中心计算
我们先讲第一步,跟踪函数buildClusters,进入其函数体
public static Path buildClusters(Configuration conf, Path input,
Path output, DistanceMeasure measure, double t1, double t2,
double t3, double t4, int clusterFilter, boolean runSequential)
throws IOException, InterruptedException, ClassNotFoundException {
(&Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}&,
new Object[] { input, output, measure, t1, t2 });
// 查看是否使用Map-Reduce进行聚类,单机时只有T1,T2,Map-Reduce还有T3,T4
if (runSequential) {
return buildClustersSeq(input, output, measure, t1, t2,
clusterFilter);
return buildClustersMR(conf, input, output, measure, t1, t2, t3,
t4, clusterFilter);
}我们可以看到我们可以设定是进行单机版的聚类还是运用Map-Reduce进行聚类。
首先我们来看单机版的聚类:
private static Path buildClustersSeq(Path input, Path output,
DistanceMeasure measure, double t1, double t2, int clusterFilter)
throws IOException {
CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
Collection&Canopy& canopies = Lists.newArrayList();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(input.toUri(), conf);
// 将每个向量都添加到canopies中
for (VectorWritable vw : new SequenceFileDirValueIterable&VectorWritable&(
input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
clusterer.addPointToCanopies(vw.get(), canopies);
Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0'
+ Cluster.FINAL_ITERATION_SUFFIX);
Path path = new Path(canopyOutputDir, &part-r-00000&);
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
Text.class, ClusterWritable.class);
ClusterWritable clusterWritable = new ClusterWritable();
for (Canopy canopy : canopies) {
puteParameters();
if (log.isDebugEnabled()) {
log.debug(
&Writing Canopy:{} center:{} numPoints:{} radius:{}&,
new Object[] {
canopy.getIdentifier(),
AbstractCluster.formatVector(
canopy.getCenter(), null),
canopy.getNumObservations(),
AbstractCluster.formatVector(
canopy.getRadius(), null) });
// 当canopy里面的样本个数大于阈值时,将其输出到文件
if (canopy.getNumObservations() & clusterFilter) {
clusterWritable.setValue(canopy);
writer.append(new Text(canopy.getIdentifier()),
clusterWritable);
} finally {
Closeables.closeQuietly(writer);
return canopyOutputD
}我们可以看到单机版的聚类主要包含以下两个步骤:
1.利用我们前面讲的addPointToCanopies方法将向量添加到clusterer中。其中序列化文件存的是向量。
2.对于每一个canopy,先更新里面的参数,也就是簇里面的S0,S1和S2。对于簇里面样本个数大于阈值的就输出。
注意:输出的SequenceFile文件中key为每个簇的id,value为将canopy封装了的一个ClusterWritable对象。
看完单机版,我们就来看Map-Reduce版本的,我们可以看到,里面的阈值参数有4个,分别是T1,T2,T3和T4。
private static Path buildClustersMR(Configuration conf, Path input,
Path output, DistanceMeasure measure, double t1, double t2,
double t3, double t4, int clusterFilter) throws IOException,
InterruptedException, ClassNotFoundException {
conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
.getName());
conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter));
Job job = new Job(conf,
&Canopy Driver running buildClusters over input: & + input);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(CanopyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setReducerClass(CanopyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ClusterWritable.class);
job.setNumReduceTasks(1);
job.setJarByClass(CanopyDriver.class);
FileInputFormat.addInputPath(job, input);
Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0'
+ Cluster.FINAL_ITERATION_SUFFIX);
FileOutputFormat.setOutputPath(job, canopyOutputDir);
if (!job.waitForCompletion(true)) {
throw new InterruptedException(&Canopy Job failed processing &
return canopyOutputD
} 我们主要来看看里面的map和reduce函数。
CanopyMapper类
1.在其setup阶段,通过context对象内部的Configuration构造了一个CanopyClusterer,并设定了簇内部个数的阈值,簇里面元素个数大于这个值的簇才输出。
2.在map阶段,只是将样本向量添加到CanopyClusterer中。
3.在cleanup阶段,只是简单的将样本向量添加到CanopyClusterer中,在cleanup阶段才更新每个canopy的参数,并将簇样本个数大于阈值的簇输出。
注意map输出的key和value,key为统一的Text类型的centroid,value为每个簇的中心。
CanopyReducer类:
1.setup阶段,和map几乎一样,只是使用了T3和T4.
2.reduce阶段,因为map阶段输出的key都一样,而且我们在任务设定的时候,指定的reduce个数为1(参见前面Map-Reduce),所以,所有的数据都会传递给唯一的一个reduce。首先将所有的向量添加到CanopyClusterer中,再来更新每个canopy,再将样本数目大于阈值的簇输出。
注意:reduce的输出,其中key为每个簇的id,value为将canopy封装了的一个ClusterWritable对象。
说到这里,我们可以将map阶段看作一个归并,其将样本合并成一个簇,只输出这个簇中心,reduce阶段是将这些簇的中心当作样本又进行了一个归并,得到最后的结果。
2.4.2归属簇计算
归属簇计算是通过函数clusterData计算的,其中分两步,1.将聚类策略写入文件。2,计算向量归属簇。我们重点介绍第二步,第一步只是简单的将CanopyClusteringPolicy写入文件,主要就是那个参数T1,T2。
private static void clusterData(Configuration conf, Path points,
Path canopies, Path output, double clusterClassificationThreshold,
boolean runSequential) throws IOException, InterruptedException,
ClassNotFoundException {
// 将聚类策略写入文件
ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
// 开始计算向量归属的簇
ClusterClassificationDriver.run(points, output, new Path(output,
CLUSTERED_POINTS_DIRECTORY), clusterClassificationThreshold,
true, runSequential);
} 计算归属簇的时候,又可以使用单机版本或者Map-Reduce版本。具体见ClusterClassificationDriver类的run方法。
public static void run(Path input, Path clusteringOutputPath, Path output,
Double clusterClassificationThreshold, boolean emitMostLikely,
boolean runSequential) throws IOException, InterruptedException,
ClassNotFoundException {
Configuration conf = new Configuration();
// 是否采用Map-Reduce
if (runSequential) {
classifyClusterSeq(conf, input, clusteringOutputPath, output,
clusterClassificationThreshold, emitMostLikely);
classifyClusterMR(conf, input, clusteringOutputPath, output,
clusterClassificationThreshold, emitMostLikely);
先看单机版本,先将文件中的聚类策略ClusteringPolicy反序列化,构建一个ClusterClassifier对象。
private static void classifyClusterSeq(Configuration conf, Path input,
Path clusters, Path output, Double clusterClassificationThreshold,
boolean emitMostLikely) throws IOException {
List&Cluster& clusterModels = populateClusterModels(clusters, conf);
// 读取聚类策略
ClusteringPolicy policy = ClusterClassifier
.readPolicy(finalClustersPath(conf, clusters));
ClusterClassifier clusterClassifier = new ClusterClassifier(
clusterModels, policy);
selectCluster(input, clusterModels, clusterClassifier, output,
clusterClassificationThreshold, emitMostLikely);
}在函数selectCluster中,遍历每个原始向量,
1.计算ClusterClassifier中的每个Cluster对与此向量的分类结果,结果为一个向量,向量的维度与ClusterClassifier中Cluster个数一样。
2.计算pdfPerCluster中最大值与我们先前设定的阈值的关系,如果最大值都小于阈值,就不能将此向量分到任何一个簇中。
3.classifyAndWrite中将向量分类并写入文件。在此有两种方式,一种,只写pdfPerCluster中最大值的簇,第二种将pdfPerCluster中大于阈值的簇都写如文件。这里就是一个样本能分到一个簇还是多个簇的问题。
private static void selectCluster(Path input, List&Cluster& clusterModels,
ClusterClassifier clusterClassifier, Path output,
Double clusterClassificationThreshold, boolean emitMostLikely)
throws IOException {
Configuration conf = new Configuration();
SequenceFile.Writer writer = new SequenceFile.Writer(
input.getFileSystem(conf), conf,
new Path(output, &part-m-& + 0), IntWritable.class,
WeightedVectorWritable.class);
for (VectorWritable vw : new SequenceFileDirValueIterable&VectorWritable&(
input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
// 给向量进行分类
Vector pdfPerCluster = clusterClassifier.classify(vw.get());
if (shouldClassify(pdfPerCluster, clusterClassificationThreshold)) {
classifyAndWrite(clusterModels, clusterClassificationThreshold,
emitMostLikely, writer, vw, pdfPerCluster);
writer.close();
在Map-Reduce版本中,我们就看看其map类,因为其没有设定reduce。
private static void classifyClusterMR(Configuration conf, Path input,
Path clustersIn, Path output,
Double clusterClassificationThreshold, boolean emitMostLikely)
throws IOException, InterruptedException, ClassNotFoundException {
conf.setFloat(OUTLIER_REMOVAL_THRESHOLD,
clusterClassificationThreshold.floatValue());
conf.setBoolean(EMIT_MOST_LIKELY, emitMostLikely);
conf.set(CLUSTERS_IN, clustersIn.toUri().toString());
Job job = new Job(conf,
&Cluster Classification Driver running over input: & + input);
job.setJarByClass(ClusterClassificationDriver.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(ClusterClassificationMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(WeightedVectorWritable.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
throw new InterruptedException(
&Cluster Classification Driver Job failed processing &
在类的map阶段跟在单机版一样,也是先计算pdfPerCluster,再写入文件。具体参见前面
protected void map(WritableComparable&?& key, VectorWritable vw,
Context context) throws IOException, InterruptedException {
if (!clusterModels.isEmpty()) {
Vector pdfPerCluster = clusterClassifier.classify(vw.get());
if (shouldClassify(pdfPerCluster)) {
if (emitMostLikely) {
int maxValueIndex = pdfPerCluster.maxValueIndex();
write(vw, context, maxValueIndex);
writeAllAboveThreshold(vw, context, pdfPerCluster);
canopy算法就这么多吧,后面将把mahout的所有源码讲一次。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:6185次
排名:千里之外1205人阅读
mahout(1)
转自:/vivounicorn/archive//2186483.html
&&&&& 聚类是机器学习里很重要的一类方法,基本原则是将“性质相似”(这里就有相似的标准问题,比如是基于概率分布模型的相似性又或是基于距离的相似性)的对象尽可能的放在一个Cluster中而不同Cluster中对象尽可能不相似。对聚类算法而言,有三座大山需要爬过去:(1)、a large number of clusters,(2)、a high feature dimensionality,(3)、a
large number of data points。在这三种情况下,尤其是三种情况都存在时,聚类的计算代价是非常高的,有时候聚类都无法进行下去,于是出现一种简单而又有效地方法:Canopy Method,说简单是因为它不用什么高深的理论或推导就可以理解,说有效是因为它的实际表现确实可圈可点。
一、基本思想
&&&& 1、基于Canopy Method的聚类算法将聚类过程分为两个阶段
&&&&&&Stage1、聚类最耗费计算的地方是计算对象相似性的时候,Canopy Method在第一阶段选择简单、计算代价较低的方法计算对象相似性,将相似的对象放在一个子集中,这个子集被叫做Canopy ,通过一系列计算得到若干Canopy,Canopy之间可以是重叠的,但不会存在某个对象不属于任何Canopy的情况,可以把这一阶段看做数据预处理;
&&&&&&Stage2、在各个Canopy&内使用传统的聚类方法(如K-means),不属于同一Canopy 的对象之间不进行相似性计算。
&&&&& 从这个方法起码可以看出两点好处:首先,Canopy 不要太大且Canopy 之间重叠的不要太多的话会大大减少后续需要计算相似性的对象的个数;其次,类似于K-means这样的聚类方法是需要人为指出K的值的,通过Stage1得到的Canopy 个数完全可以作为这个K值,一定程度上减少了选择K的盲目性。
&&&& 2、聚类精度
&&&&& 对传统聚类来说,例如K-means、Expectation-Maximization、Greedy Agglomerative Clustering,某个对象与Cluster的相似性是该点到Cluster中心的距离,那么聚类精度能够被很好保证的条件是:
&&&&& 对于每个Cluster都存在一个Canopy,它包含所有属于这个Cluster的元素。
&&&&& 如果这种相似性的度量为当前点与某个Cluster中离的最近的点的距离,那么聚类精度能够被很好保证的条件是:
&&&&& 对于每个Cluster都存在若干个Canopy,这些Canopy之间由Cluster中的元素连接(重叠的部分包含Cluster中的元素)。
&&&&& 数据集的Canopy划分完成后,类似于下图:
二、单机生成Canopy的算法
&&&&& (1)、将数据集向量化得到一个list后放入内存,选择两个距离阈值:T1和T2,其中T1 & T2,对应上图,实线圈为T1,虚线圈为T2,T1和T2的值可以用交叉校验来确定;
&&&&& (2)、从list中任取一点P,用低计算成本方法快速计算点P与所有Canopy之间的距离(如果当前不存在Canopy,则把点P作为一个Canopy),如果点P与某个Canopy距离在T1以内,则将点P加入到这个Canopy;
&&&&& (3)、如果点P曾经与某个Canopy的距离在T2以内,则需要把点P从list中删除,这一步是认为点P此时与这个Canopy已经够近了,因此它不可以再做其它Canopy的中心了;
&&&&& (4)、重复步骤2、3,直到list为空结束。
三、并行策略
&&&&& 并行点是比较明显的,就是生成Canopy的过程可以并行,第一阶段,各个slave可以依据存储在本地的数据,各自在本地用上述算法生成若干Canopy,最后在master机器将这些Canopy用相同算法汇总后得到最终的Canopy集合,第二阶段聚类操作就利用最终的Canopy集合进行。
&&&&& 用map-reduce描述就是:datanode在map阶段,利用上述算法在本地生成若干Canopy,之后通过reduce操作得到最终的Canopy集合。
四、Mahout源码安装
&&&&& 正式使用Mahout之前需要做以下准备工作:
&&&&& 1、在下载最新的Mahout 0.5源码包;
&&&&& 2、安装mvn,可以在终端输入:sudo apt-get install maven2具体方法可以参照:;
&&&&& 3、安装Mahout源码,可以参照这里的方法进行:;
&&&&& 4、打开eclipse,在“Help”菜单下单击“Install New Software...”,在地址栏添加:,之后把复选框勾上,然后一路Next即可。
&&&&& 5、最后在eclipse的“File”菜单单击“Import...”,选择“Existing Maven Projects”,Next后选择Mahout源码所在目录,将感兴趣的项目勾上,最后完成步骤即可。mahout-core、mahout-examples和mahout-math是下一步我们需要的。
五、Mahout的Canopy Clustering
&&&&& mahout实现了一个Canopy Clustering,大致思路与前两节用的方法一样,用了两个map操作和一个reduce操作,首先用一个map和一个reduce生成全局Canopy集合,最后用一个map操作进行聚类。可以在mahout-core下的src/main/java中的package:org.apache.mahout.clustering.canopy中找到相关代码:
1、数据模型
&&&&& Mahout聚类算法将对象以Vector的方式表示,它同时支持dense vector和sparse vector,一共有三种表示方式(它们拥有共同的基类AbstractVector,里面实现了有关Vector的很多操作):
&&&&& (1)、DenseVector
&&&&& 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它实现的时候用一个double数组表示Vector(private double[] values), 对于dense data可以使用它;
&&&&& (2)、RandomAccessSparseVector
&&&&& 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它用来表示一个可以随机访问的sparse vector,只存储非零元素,数据的存储采用hash映射:OpenIntDoubleHashM
&&&&& 关于OpenIntDoubleHashMap,其key为int类型,value为double类型,解决冲突的方法是double hashing,可能是我获取的源码问题,没有在0.5中找到它的source code,可以从中查看0.3中代码和较详细注释;
&&&&& (3)、SequentialAccessSparseVector
&&&&& 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它用来表示一个顺序访问的sparse vector,同样只存储非零元素,数据的存储采用顺序映射:OrderedIntDoubleM
&&&&& 关于OrderedIntDoubleMapping,其key为int类型,value为double类型,存储的方式让我想起了Libsvm数据表示的形式:非零元素索引:非零元素的值,这里用一个int数组存储indices,用double数组存储非零元素,要想读写某个元素,需要在indices中查找offset,由于indices应该是有序的,所以查找操作用的是二分法。
2、如何抽象Canopy?
&&&&& 可以从Canopy.java文件及其父类中找到答案,Mahout在实现时候还是很巧妙的,一个Canopy包含的字段信息主要有:
&&&&& 1)、 #Canopy的id
&&&&& 2)、private long numP #Canopy中包含点的个数,这里的点都是Vector
&&&&& 3)、private V #Canopy的重心
&&&&& 4)、private Vector R #Canopy的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,它的计算依赖下面要说的s0、s1和s2。
&&&&& 它并不会真的去用一个list去存储其包含的点,因为将来的计算并不关心这些点是什么,而是与由这些点得到的三个值有关,这里用三个变量来表示:
&&&&& 5)、private double s0; #表示Canopy包含点的权重之和,
&&&&&&6)、private Vector s1; #表示各点的加权和,
&&&&&&7)、private Vector s2; #表示各点平方的加权和,
&&&&& 以下是它的核心操作:
&&&&& 8)、public void computeParameters(); #根据s0、s1、s2计算numPoints、center和Radius,其中numPoints=(int)s0,center=s1/s0,Radius=sqrt(s2*s0-s1*s1)/s0,简单点来,假设所有点权重都是1,那么:
&&&&&&&&&&&&&&&&&&&&&&&&&&,其中
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&,其中
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&& 9)、public void observe(VectorWritable x, double weight); #每当有一个新的点加入当前Canopy时都需要更新s0、s1、s2的值,这个比较简单。
3、Canopy Clustering的Map-Reduce实现
&&&&& Canopy Clustering的实现包含单机版和MR两个版本,单机版就不多说了,MR版用了两个map操作和一个reduce操作,当然是通过两个不同的job实现的,map和reduce阶段执行顺序是:CanopyMapper –& CanopyReducer –& ClusterMapper,我想对照下面这幅图来理解:
&&&&& (1)、首先是InputFormat,这是从HDFS读取文件后第一个要考虑的问题,mahout中提供了三种方式,都继承于FileInputFormat&K,V&:
Description
TextInputFormat
D reads lines of text files (默认格式,按行读取文件且不进行解析操作,基于行的文件比较有效)
The byte offset of the line(行的字节偏移量)
The line contents (整个行的内容)
KeyValueInputFormat
Parses lines into key, val pairs (同样是按照行读取,但会搜寻第一个tab字符,把行拆分为(Key,Value) pair)
Everything up to the first tab character(第一个tab字符前的所有字符)
The remainder of the line (该行剩下的内容)
SequenceFileInputFormat
A Hadoop-specific high-performance binary format (Hadoop定义的高性能二进制格式)
user-defined (用户自定义)
user-defined (用户自定义)
&&&&& 在这里,由于使用了很多自定义的类型,如:表示vector的VectorWritable类型,表示canopy的canopy类型,且需要进行高效的数据处理,所以输入输出文件选择SequenceFileInputFormat格式。由job对象的setInputFormatClass方法来设置,如:job.setInputFormatClass(SequenceFileInputFormat.class),一般在执行聚类算法前需要调用一个job专门处理原始文件为合适的格式,比如用InputDriver,这点后面再说。
&&&&& (2)、Split
&&&&& 一个Split块为一个map任务提供输入数据,它是InputSplit类型的,默认情况下hadoop会把文件以64MB为基数拆分为若干Block,这些Block分散在各个节点上,于是一个文件就可以被多个map并行的处理,也就是说InputSplit定义了文件是被如何切分的。
&&&&& (3)、RR
&&&&& RecordReader类把由Split传来的数据加载后转换为适合mapper读取的(Key,Value) pair,RecordReader实例是由InputFormat决定,RR被反复调用直到Split数据处理完,RR被调用后接着就会调用Mapper的map()方法。
&&&&& “RecordReader实例是由InputFormat决定”这句话怎么理解呢?比如,在Canopy Clustering中,使用的是SequenceFileInputFormat,它会提供一个 SequenceFileRecordReader类型,利用SequenceFile.Reader将Key和Value读取出来,这里Key和Value的类型对应Mapper的map函数的Key和Value的类型,Sequence File的存储根据不同压缩策略分为:NONE:不压缩、RECORD:仅压缩每一个record中的value值、BLOCK:将一个block中的所有records压缩在一起,有以下存储格式:
Uncompressed SequenceFile&
Record length&
Key length&
A sync-marker every few 100 bytes or so.
Record-Compressed SequenceFile&
Record length&
Key length&
Compressed Value&
A sync-marker every few 100 bytes or so.
Block-Compressed SequenceFile Format&
Record Block
Compressed key-lengths block-size&
Compressed key-lengths block&
Compressed keys block-size&
Compressed keys block&
Compressed value-lengths block-size&
Compressed value-lengths block&
Compressed values block-size&
Compressed values block&
A sync-marker every few 100 bytes or so.
具体可参见:&&&&&
&&&&& (4)、CanopyMapper
1: class CanopyMapper extends Mapper&WritableComparable&?&, VectorWritable, Text, VectorWritable& {
private final Collection&Canopy& canopies = new ArrayList&Canopy&();
private CanopyClusterer canopyC
protected void map(WritableComparable&?& key, VectorWritable point, Context context)
throws IOException, InterruptedException {
canopyClusterer.addPointToCanopies(point.get(), canopies);
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
canopyClusterer = new CanopyClusterer(context.getConfiguration());
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Canopy canopy : canopies) {
context.write(new Text(&centroid&), new puteCentroid()));
super.cleanup(context);
&&&&& CanopyMapper类里面定义了一个Canopy集合,用来存储通过map操作得到的本地Canopy。
&&&&& setup方法在map操作执行前进行必要的初始化工作;
&&&&& 它的map操作很直白,就是将传来的(Key,Value) pair(以后就叫“点”吧,少写几个字)按照某种策略加入到某个Canopy中,这个策略在CanopyClusterer类里说明;&&&&&
&&&&& 在map操作执行完后,调用cleanup操作,将中间结果写入上下文,注意这里的Key是一个固定的字符串“centroid”,将来reduce操作接收到的数据就只有这个Key,写入的value是所有Canopy的中心点(是个Vector哦)。
&&&&& (5)、Combiner
&&&&& 可以看做是一个local的reduce操作,接受前面map的结果,处理完后发出结果,可以使用reduce类或者自己定义新类,这里的汇总操作有时候是很有意义的,因为它们都是在本地执行,最后发送出得数据量比直接发出map结果的要小,减少网络带宽的占用,对将来shuffle操作也有益。在Canopy Clustering中不需要这个操作。
&&&&& (6)、Partitioner & Shuffle
&&&&& 当有多个reducer的时候,partitioner决定由mapper或combiner传来的(Key,Value) Pair会被发送给哪个reducer,接着Shuffle操作会把所有从相同或不同mapper或combiner传来的(Key,Value) Pair按照Key进行分组,相同Key值的点会被放在同一个reducer中,我觉得如何提高Shuffle的效率是hadoop可以改进的地方。在Canopy Clustering中,因为map后的数据只有一个Key值,也就没必要有多个reducer了,也就不用partition了。关于Partitioner可以参考:
&&&&& (7)、CanopyReducer
1: public class CanopyReducer extends Reducer&Text, VectorWritable, Text, Canopy& {
private final Collection&Canopy& canopies = new ArrayList&Canopy&();
private CanopyClusterer canopyC
CanopyClusterer getCanopyClusterer() {
return canopyC
protected void reduce(Text arg0, Iterable&VectorWritable& values,
Context context) throws IOException, InterruptedException {
for (VectorWritable value : values) {
Vector point = value.get();
canopyClusterer.addPointToCanopies(point, canopies);
for (Canopy canopy : canopies) {
puteParameters();
context.write(new Text(canopy.getIdentifier()), canopy);
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
canopyClusterer = new CanopyClusterer(context.getConfiguration());
canopyClusterer.useT3T4();
&&&&& CanopyReducer 类里面同样定义了一个Canopy集合,用来存储全局Canopy。
&&&&& setup方法在reduce操作执行前进行必要的初始化工作,这里与mapper不同的地方是可以对阈值T1、T2(T1&T2)重新设置(这里用T3、T4表示),也就是说map阶段的阈值可以与reduce阶段的不同;
&&&&& reduce操作用于map操作一样的策略将局部Canopy的中心点做重新划分,最后更新各个全局Canopy的numPoints、center、radius的信息,将(Canopy标示符,Canopy对象) Pair写入上下文中。
&&&&& (8)、OutputFormat
&&&&& 它与InputFormat类似,Hadoop会利用OutputFormat的实例把文件写在本地磁盘或HDFS上,它们都是继承自FileOutputFormat类。各个reducer会把结果写在HDFS某个目录下的单独的文件内,命名规则是part-r-xxxxx,这个是依据hadoop自动命名的,此外还会在同一目录下生成一个_SUCCESS文件,输出文件夹用FileOutputFormat.setOutputPath() 设置。&
&&&&& 到此为止构建Canopy的job结束。即CanopyMapper –& CanopyReducer 阶段结束。
&&&&& (9)、ClusterMapper
&&&&& 最后聚类阶段比较简单,只有一个map操作,以上一阶段输出的Sequence File为输入,setup方法做一些初始化工作并从上一阶段输出目录读取文件,重建Canopy集合信息并存储在一个Canopy集合中,map操作就调用CanopyClusterer的emitPointToClosestCanopy方法实现聚类,将最终结果输出到一个Sequence File中。
&&&&& (10)、CanopyClusterer
&&&&& 这个类是实现Canopy算法的核心,其中:
&&&&& 1)、addPointToCanopies方法用来决定当前点应该加入到哪个Canopy中,在CanopyMapper和CanopyReducer&中用到,流程如下:
&&&&& 2)、emitPointToClosestCanopy方法查找与当前点距离最近的Canopy,并将(Canopy的标示符,当前点Vector表示)输出,这个方法在聚类阶段ClusterMapper中用到。
&&&&& 3)、createCanopies方法用于单机生成Canopy,算法一样,实现也较简单,就不多说了。
&&&&& (11)、CanopyDriver
&&&&& 一般都会定义这么一个driver,用来定义和配置job,组织job执行,同时提供单机版和MR版。job执行顺序是:buildClusters –& clusterData。
&&&&& CanopyMapper的输入需要是(WritableComparable&?&, VectorWritable) Pair,因此,一般情况下,需要对数据集进行处理以得到相应的格式,比如,在源码的/mahout-examples目录下的package org.apache.mahout.clustering.syntheticcontrol.canopy中有个Job.java文件提供了对Canopy Clustering的一个版本:
1: private static void run(Path input, Path output, DistanceMeasure measure,
double t1, double t2) throws IOException, InterruptedException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
Path directoryContainingConvertedInput = new Path(output,
DIRECTORY_CONTAINING_CONVERTED_INPUT);
InputDriver.runJob(input, directoryContainingConvertedInput,
&org.apache.mahout.math.RandomAccessSparseVector&);
CanopyDriver.run(new Configuration(), directoryContainingConvertedInput,
output, measure, t1, t2, true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
&clusters-0&), new Path(output, &clusteredPoints&));
clusterDumper.printClusters(null);
&&&&& 利用InputDriver对数据集进行处理,将(Text, VectorWritable) Pair 以sequence file形式存储,供CanopyDriver使用。InputDriver中的作业配置如下:
1: public static void runJob(Path input, Path output, String vectorClassName)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set(&vector.implementation.class.name&, vectorClassName);
Job job = new Job(conf, &Input Driver running over input: & + input);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VectorWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(InputMapper.class);
job.setNumReduceTasks(0);
job.setJarByClass(InputDriver.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
job.waitForCompletion(true);
5、实例说明
&&&&& 可以用源码生成相关Jar文件,例如:
&&&&& (1)、准备若干数据集data,要求不同feature之间用空格隔开;
&&&&& (2)、在master的终端敲入命令:hadoop namenode –start-all.用于初始化namenode和启动hadoop;
&&&&& (3)、在HDFS上建立testdata文件夹,聚类算法会去这个文件夹加载数据集,在终端输入:hadoop dfs –mkdir testdata;
&&&&& (4)、然后将各个datanode上的数据集data上传到HDFS,在终端输入hadoop dfs –put data testdata/
&&&&& (5)、进入mahout的那些Jar文件所在路径,在终端敲入:hadoop jar mahout-examples-0.5-job.jar org.apache.mahout.clustering.syntheticcontrol.canopy.J
&&&&& (6)、在localhost:50030查看作业执行情况,例如:
&&&&& 可以看到,第一个作业由InputDriver发起,输入目录是testdata,一共做了一个map操作但没有做reduce操作,第二个作业由CanopyDriver发起,做了一对mapreduce操作,这里对应Canopy生成过程,最后一个作业也由CanopyDriver发起,做了一个map操作,对应Canopy Clustering过程。
&&&&& (7)、将执行结果抓到本地文件夹,在终端执行:hadoop dfs –get output output,得到目录如下:
其中聚类结果保存在第一个文件夹中,当然,结果是Sequence
File,不能直接双击打开来看。
&&&&& Mahout中对Canopy Clustering的实现是比较巧妙的,整个聚类过程用2个map操作和1个reduce操作就完成了,Canopy构建的过程可以概括为:遍历给定的点集S,设置两个阈值:T1、T2且T1&T2,选择一个点,用低成本算法计算它与其它Canpoy中心的距离,如果距离小于T1则将该点加入那个Canopy,如果距离小于T2则该点不会成为某个Canopy的中心,重复整个过程,直到S为空。
六、参考资料
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:150759次
积分:1873
积分:1873
排名:第16960名
原创:32篇
转载:12篇
评论:35条
(1)(1)(4)(1)(3)(1)(1)(2)(8)(7)(6)(1)(3)(1)(1)(3)

我要回帖

更多关于 云数贸张健和中央领导 的文章

 

随机推荐