spark-submit --class com.ones.soc.cf.KMeansClustering --master yarn --num-executors 3 --driver-memory 5g --executor-memory 4g /root/bigData.jar /ones/mldata/test1 /ones/mldata/test2 8 30 3 /ones/result/12345
##############################################
package com.ones.soc.cf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
/**
* Created by tom
*/
object KMeansClustering {
def main(args: Array[String]) {
if(args.length < 6){
println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes outpath")
sys.exit(1)
}
val confighdfs = new Configuration();
val fs=FileSystem.get(confighdfs) ;
if(args(5) != null && args(5).trim().length > 1){
val output = new Path(args(5));
if(fs.exists(output)){ //删除输出目录
fs.delete(output, true);
}
}
val conf = new SparkConf().setAppName("K-Means")
val sc = new SparkContext(conf)
val rawTrainingData = sc.textFile(args(0))
val parsedTrainingData =
rawTrainingData.filter(!isColumnNameLine(_)).map(line => {
Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
}).cache()
//Cluster the data into two classes using KMeans
val numClusters = args(2).toInt
val numIterations = args(3).toInt
val runTimes = args(4).toInt
var clusterIndex: Int = 0
val clusters: KMeansModel = KMeans.train(parsedTrainingData, numClusters, numIterations, runTimes)
println("Cluster Number:" + clusters.clusterCenters.length)
println("Cluster Centers Information Overview:")
clusters.clusterCenters.foreach(
x => {
println("Center Point of Cluster " + clusterIndex + ":")
println(x)
clusterIndex += 1
})
//begin to check which cluster each test data belongs to based on the clustering result
val rawTestData = sc.textFile(args(1))
val parsedTestData = rawTestData.map(line => {
Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
})
val sb=new StringBuilder()
parsedTestData.collect().foreach(testDataLine => {
val predictedClusterIndex:
Int = clusters.predict(testDataLine)
println("The data " + testDataLine.toString + " belongs to cluster " +predictedClusterIndex)
sb.append(testDataLine.toString).append("\t").append("belongs to cluster ").append(predictedClusterIndex).append("\r\n")
})
outputHdfs(fs,sb.toString(),args(5))
println("Spark MLlib K-means clustering test finished.")
}
private def isColumnNameLine(line: String): Boolean = {
if (line != null &&
line.contains("Channel")) true
else false
}
def outputHdfs(fs:FileSystem,text:String,textdir:String):Unit={
try{
val fsDataOutputStream = fs.create(new Path(textdir+"/result.txt"), true);
val s=text.getBytes("UTF-8")
fsDataOutputStream.write(s,0,s.length)
fsDataOutputStream.hflush();
}catch{
case e:Exception =>
}
}
}
分享到:
相关推荐
Spark中机器学期(Machine Learning)之KMeans算法完整代码讲解
包含两种平台上运行的kmeans算法:一种是在Hadoop系统上的并行化kmeans算法,支持读文件,执行聚类算法,输出质心文件,将每个数据的聚类信息输出到控制台上;另一种是串行的聚类算法,支持读文件数据,执行kmeans...
KMeans算法是一种常用的无监督学习算法,用于将数据集分成K个簇或类别。并行和分布式的KMeans算法针对大规模数据集提供了高效的实现方式。并行化可以加速算法的计算过程,而分布式实现则可以处理更大规模的数据集。 ...
毕业设计: 基于Spark的Kmeans聚类算法优化
不到一百行的代码教你在spark平台中使用scala实现kmeans算法。简单易懂,大量注释。适合初学者参考理解。本程序在intelliJ IDEA2016.1.1 中编程,运行在spark1.6.1 scala2.10.4本地模式下运行成功。 数据集:(其实...
SparkKmeans 毕业设计源码-基于Spark的Kmeans聚类算法优化时间:2016-07-18内容: 发布内容到Github。 (2)ML聚类程序:利用Spark的机器学习库的聚类函数进行聚类测试。(3)MD聚类程序: (4)数据库操作程序:
毕业设计源码-基于Spark的Kmeans聚类算法优化.zip
Spark下K-Means算法的Scala工程,代码不是特别长。对应的可以参考我的博客。
1、资源内容:毕业设计源码-基于Spark的Kmeans聚类算法优化+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功,功能ok的...
精品--毕业设计源码-基于Spark的Kmeans聚类算法优化
毕业设计&课设--毕业设计源码-基于Spark的Kmeans聚类算法优化
毕业设计 基于Spark的Kmeans聚类算法优化源码+详细文档+全部数据资料 高分项目.zip 【备注】 1、该项目是个人高分项目源码,已获导师指导认可通过,答辩评审分达到95分 2、该资源内项目代码都经过测试运行成功,功能...
#资源达人分享计划#