diff --git a/tools/kal-test/README.md b/tools/kal-test/README.md
index 83ff55b07c64a6dedaa9a36bd7c392a6db7462cd..3e0579c7085dd4323411891016f1b40f03a839f0 100644
--- a/tools/kal-test/README.md
+++ b/tools/kal-test/README.md
@@ -13,9 +13,9 @@ The Kunpeng algorithm library test tool can be used to test machine learning and
#### Procedure
1. Go to the Spark-ml-algo-lib/tools/kal-test directory in the compilation environment.
2. Install the dependencies.
- Take spark 2.3.2 as an example, the install command is as follows:
- mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-graph-kernel-client_2.11 -Dversion=2.2.0 -Dclassifier=spark2.3.2 -Dfile=boostkit-graph-kernel-client_2.11-2.2.0-spark2.3.2.jar -Dpackaging=jar -DgeneratePom=true
- mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-ml-kernel-client_2.11 -Dversion=2.2.0 -Dclassifier=spark2.3.2 -Dfile=boostkit-ml-kernel-client_2.11-2.2.0-spark2.3.2.jar -Dpackaging=jar -DgeneratePom=true
+ Take Spark 3.3.1 as an example; the install commands are as follows:
+ mvn install:install-file -DgroupId=org.apache.spark.graphx.lib -DartifactId=boostkit-graph-kernel-client_2.12 -Dversion=3.0.0 -Dclassifier=spark3.3.1 -Dfile=boostkit-graph-kernel-client_2.12-3.0.0-spark3.3.1.jar -Dpackaging=jar -DgeneratePom=true
+ mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-ml-kernel-client_2.12 -Dversion=3.0.0 -Dclassifier=spark3.3.1 -Dfile=boostkit-ml-kernel-client_2.12-3.0.0-spark3.3.1.jar -Dpackaging=jar -DgeneratePom=true
3. Run the compile command:
mvn clean install
4. View the kal-test_2.11-0.1.jar file generated in Spark-ml-algo-lib/tools/kal-test/target.
diff --git a/tools/kal-test/pom.xml b/tools/kal-test/pom.xml
index f0d483c1edb82b5af0eeac113a541017482cb27f..f35a20ee9fd56cc11e6c1c439a95031a7b6018eb 100644
--- a/tools/kal-test/pom.xml
+++ b/tools/kal-test/pom.xml
@@ -14,7 +14,7 @@
UTF-8
2.12
3.0.0
- 3.1.1
+ 3.3.1
@@ -30,9 +30,19 @@
-            <groupId>com.microsoft.ml.spark</groupId>
-            <artifactId>mmlspark_2.12_spark3.1.2</artifactId>
-            <version>0.0.0+79-09152193</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.12.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.12.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.12.3</version>
         </dependency>
         <dependency>
             <groupId>ai.h2o</groupId>
diff --git a/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala b/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala
deleted file mode 100644
index e90896c0cf8c79e7e1d75278ef4ffd881192bce3..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.compare.graph
-
-import java.io.InputStreamReader
-
-import scala.collection.Map
-import scala.util.Try
-import com.bigdata.utils.Utils
-import com.bigdata.graph.{DeepWalkConfig, DeepWalkParams}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.apache.spark.graphx.VertexId
-import org.apache.spark.ml.linalg.{DenseVector, Vector}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext}
-import org.yaml.snakeyaml.representer.Representer
-import smile.math.MathEx.cos
-import smile.validation.AUC
-
-object DeepWalkVerify {
-
- def readEdgeListFromHDFS(
- sc: SparkContext,
- filePath: String,
- split: String,
- partition: Int): RDD[(VertexId, VertexId, Double)] = {
- sc.textFile(filePath, partition)
- .flatMap(line => {
- if (line.startsWith("#")) {
- Iterator.empty
- }
- else {
- val x = line.trim.split(split)
- if (x.length < 2) {
- Iterator.empty
- }
- else {
- var w = x(2).toDouble
- Iterator.single((x(0).toLong, x(1).toLong, w))
- }
- }
- })
- }
-
- def readUndirectEdgeFromHDFS(
- sc: SparkContext,
- filePath: String,
- split: String,
- partition: Int): RDD[(Long, Long)] = {
- sc.textFile(filePath, partition)
- .flatMap(line => {
- if (line.startsWith("#")) {
- Iterator.empty
- } else {
- val x = line.split(split)
- if (x.length < 2) {
- Iterator.empty
- } else {
- val node1 = x(0).toLong
- val node2 = x(1).toLong
- Iterator.single((node1, node2))
- }
- }
- })
- }
-
- def readNode2VecModel(sc: SparkContext, input: String): RDD[(Long, Vector)] = {
- val rdd: RDD[(Long, Vector)] = sc
- .textFile(input)
- .mapPartitions(it => {
- val regexp = "([0-9]+) \\((.*)\\)".r
- it.map { case regexp(u, emb) => (u.toLong, new DenseVector(emb.split(",")
- .map(_.toDouble)): Vector)
- }
- }).cache()
- rdd
- }
-
- def get(modelRDD: RDD[(Long, Vector)]): Map[Long, Vector] = {
- modelRDD.collectAsMap()
- }
-
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName) = (modelConfSplit(0), modelConfSplit(1))
- val graphPath = args(1)
- val negEdgePath = args(2)
- val embeddingPath = args(3)
- val isRaw = args(4)
-
- val representer = new Representer
- representer.addClassTag(classOf[DeepWalkParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/deepwalk/deepwalk.yml")
- val yaml = new Yaml(new Constructor(classOf[DeepWalkConfig]), representer, options)
- val description = new TypeDescription(classOf[DeepWalkParams])
- yaml.addTypeDescription(description)
- val config: DeepWalkConfig = yaml.load(stream).asInstanceOf[DeepWalkConfig]
- val paramsMap =
- config.deepwalk.get(datasetName).get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- })
-
- val params = new DeepWalkParams()
-
- params.setDatasetName(datasetName)
- params.setPartitions(paramsMap.get("partitions").toString.toInt)
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setAlgorithmName("DeepWalk")
- params.setTestcaseType(s"DeepWalk_${datasetName}")
-
- val sparkConf = new SparkConf().setAppName("DeepwalkVerify")
- val sc = SparkContext.getOrCreate(sparkConf)
-
- val edgesRDD = readEdgeListFromHDFS(sc, graphPath, params.getSplitGraph, params.getPartitions)
- val negativeEdgesRDD = readUndirectEdgeFromHDFS(sc, negEdgePath, ",", params.getPartitions)
-
- val nvModel: collection.Map[Long, Vector] = get(readNode2VecModel(sc, embeddingPath))
-
- val nvModelBC = sc.broadcast(nvModel)
- edgesRDD.foreachPartition(_ => nvModelBC.value)
-
- val positiveEdgesScores: Array[Double] = edgesRDD
- .flatMap({ case (src, dst, weight) =>
- Try(Iterator.single(cos(nvModelBC.value(src).toArray, nvModelBC.value(dst).toArray)))
- .getOrElse(Iterator.empty)
- })
- .filter(score => !score.isInfinity && !score.isNaN)
- .collect()
-
- val negativeEdgesScores: Array[Double] = negativeEdgesRDD
- .flatMap({ case (src, dst) =>
- Try(Iterator.single(cos(nvModelBC.value(src).toArray, nvModelBC.value(dst).toArray)))
- .getOrElse(Iterator.empty)
- })
- .filter(score => !score.isInfinity && !score.isNaN)
- .collect()
-
- val truths = Array.fill(positiveEdgesScores.length)(1) ++ Array.fill(negativeEdgesScores.length)(0)
- val auc = AUC.of(truths, (positiveEdgesScores ++ negativeEdgesScores))
- println(s"Link Prediction AUC Score = $auc")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
\ No newline at end of file
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala
deleted file mode 100644
index ad82c115b3d0e24f2ef9dfbbb963a6934c8bc575..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-
-import com.bigdata.utils.Utils
-import org.apache.spark.graphx.lib.{DeepWalk, Parameters}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import scala.beans.BeanProperty
-import java.io.{File, FileWriter, InputStreamReader}
-import java.util
-
-class DeepWalkConfig extends Serializable {
- @BeanProperty var deepwalk: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class DeepWalkParams extends Serializable {
- @BeanProperty var inputPath: String = _
- @BeanProperty var outputPath: String = _
- @BeanProperty var splitGraph: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var partitions: Int = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var walkLength: Int = _
- @BeanProperty var numWalks: Int = _
- @BeanProperty var iteration: Int = _
- @BeanProperty var dimension: Int = _
- @BeanProperty var windowSize: Int = _
- @BeanProperty var negativeSample: Int = _
-}
-
-object DeepWalkRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName) = (modelConfSplit(0), modelConfSplit(1))
- val inputPath = args(1)
- val outputPath = args(2)
- val isRaw = args(3)
-
- val representer = new Representer
- representer.addClassTag(classOf[DeepWalkParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/deepwalk/deepwalk.yml")
- val yaml = new Yaml(new Constructor(classOf[DeepWalkConfig]), representer, options)
- val description = new TypeDescription(classOf[DeepWalkParams])
- yaml.addTypeDescription(description)
- val config: DeepWalkConfig = yaml.load(stream).asInstanceOf[DeepWalkConfig]
-
- val paramsMap =
- config.deepwalk.get(datasetName).get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).asInstanceOf[util.HashMap[String, Object]]
-
- val params = new DeepWalkParams()
-
- params.setDatasetName(datasetName)
- params.setInputPath(inputPath)
- params.setOutputPath(outputPath)
- params.setPartitions(paramsMap.get("partitions").toString.toInt)
- params.setWalkLength(paramsMap.get("walkLength").toString.toInt)
- params.setNumWalks(paramsMap.get("numWalks").toString.toInt)
- params.setNegativeSample(paramsMap.get("negativeSample").toString.toInt)
- params.setIteration(paramsMap.get("iteration").toString.toInt)
- params.setDimension(paramsMap.get("dimension").toString.toInt)
- params.setWindowSize(paramsMap.get("windowSize").toString.toInt)
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setAlgorithmName("DeepWalk")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val sc = new SparkContext(conf)
-
- val startTime = System.currentTimeMillis()
-
- val edgeRDD = Util.readCommFromHDFS(sc, inputPath, params.getSplitGraph, params.getPartitions)
-
- val deepwalkParams = Parameters(params.getWalkLength, params.getNumWalks, params.getIteration, params.getDimension, params.getWindowSize, params.getNegativeSample)
-
- val deepwalkModel = DeepWalk.run(edgeRDD,deepwalkParams)
- Util.saveNode2VecModel(deepwalkModel, params.getOutputPath)
-
- val costTime = (System.currentTimeMillis() - startTime) / 1000.0
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/deepWalk_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala
deleted file mode 100644
index d430981a3048a6f6924b10e4955205dcacae3435..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala
+++ /dev/null
@@ -1,326 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-import com.bigdata.utils.Utils
-
-import org.apache.spark.graphx.lib.{Fraudar, Parameters}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.storage.StorageLevel
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.io.{BufferedWriter, File, FileWriter, InputStreamReader}
-import java.util
-import scala.beans.BeanProperty
-import scala.collection.mutable
-import scala.collection.mutable.Map
-
-class FraudarConfig extends Serializable {
- @BeanProperty var fraudar: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class FraudarParams extends Serializable {
- @BeanProperty var splitGraph: String = _
- @BeanProperty var partitions: Int = _
-
- @BeanProperty var iSetOutPath: String = _
- @BeanProperty var jSetOutPath: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
-}
-
-
-object FraudarRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val iSetOutPath = args(2)
- val jSetOutPath = args(3)
- val representer = new Representer
- representer.addClassTag(classOf[FraudarParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/fraudar/fraudar.yml")
- val yaml = new Yaml(new Constructor(classOf[FraudarConfig]), representer, options)
- val description = new TypeDescription(classOf[FraudarParams])
- yaml.addTypeDescription(description)
- val config: FraudarConfig = yaml.load(stream).asInstanceOf[FraudarConfig]
-
- val params = new FraudarParams()
- val paramsMap =
- config.fraudar.get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).get(datasetName).asInstanceOf[util.HashMap[String, Object]]
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setPartitions(paramsMap.get("partitions").toString.toInt)
- params.setDatasetName(datasetName)
- params.setDataPath(dataPath)
- params.setJSetOutPath(jSetOutPath)
- params.setISetOutPath(iSetOutPath)
- params.setIsRaw(isRaw)
- params.setAlgorithmName("Fraudar")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val costTime = isRaw match {
- case "no" => new FraudarKernel().runOptJob(spark, params)
- case "yes" => new FraudarKernel().runRawJob(spark, params)
- }
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}s")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class FraudarKernel {
- def runOptJob(spark: SparkSession, params: FraudarParams): Double = {
- val sc = spark.sparkContext
- sc.setLogLevel("WARN")
- val startTime = System.currentTimeMillis()
- val bipartGraph = Util.readUndirectDataFromHDFS(sc, params.dataPath, params.splitGraph, params.partitions)
- .map(f => (f._1.toLong, f._2.toLong))
- .persist(StorageLevel.MEMORY_ONLY_SER)
- bipartGraph.foreachPartition(f => {})
-
- val res = Fraudar.runFraudar(bipartGraph)
- res.map(f => f._1).distinct().saveAsTextFile(params.iSetOutPath)
- res.map(f => f._2).distinct().saveAsTextFile(params.jSetOutPath)
-
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
- costTime
- }
-
- def runRawJob(spark: SparkSession, params: FraudarParams): Double = {
- val sc = spark.sparkContext
- sc.setLogLevel("WARN")
- val startTime = System.currentTimeMillis()
- val bipartGraph = Util.readUndirectDataFromHDFS(sc, params.dataPath, params.splitGraph, params.partitions)
- .map(f => (f._1.toLong, f._2.toLong))
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- bipartGraph.foreachPartition(f => {})
-
- val res = runFraudar(sc, bipartGraph)
- outputResult(params.iSetOutPath, params.jSetOutPath, res.toSet)
-
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
- costTime
- }
-
- /**
- * Open source reference:
- * https://github.com/XinyaZhao/Social-Network-Fraud-Detection/blob/master/Analytics%20Code/Code_to_Analyze/frauder_v4.scala
- */
- /* get degree from a key-values pair */
- def getDegree(s: Tuple2[Long, Iterable[(Long, Long)]]): List[String] = {
- var myList = List[String]()
- for (e <- s._2) {
- val crt = "i" + e._2 + " j" + s._1 + " " + s._2.size // i: dst vertex, j: src vertex, then the out-degree of src
- myList = crt :: myList
- }
- myList.reverse
- }
-
- /* cost function for column-weighting */
- def getCost(s: String): Double = {
- val degree = s.toDouble
- 1 / (math.log(degree + 5))
- }
-
- def flat_degree(s: String): List[(String, Double)] = {
- var l = List[(String, Double)]()
- val i = s.split(" ")(0)
- val j = s.split(" ")(1)
- val cij = s.split(" ")(2)
- l = (i, getCost(cij)) :: l
- l = (j, getCost(cij)) :: l
- l.reverse
- }
-
- /* get a scala mutable map from a RDD which stores the "i, j, degree of ij" */
- def getCostMap(rdd: RDD[String]): RDD[(String, Double)] = {
- rdd.flatMap(flat_degree).reduceByKey((sum, n) => (sum + n))
- }
-
- /* Calculate the f value of the whole set */
- def getSetValue(c: RDD[(String, Double)]): Double = {
- if (c.count != 0) {
- val v = c.reduce((a, b) => (" ", (a._2 + b._2)))._2
- v / 2
- }
- else {
- 0.00
- }
- }
-
- /* get the vertex with minimum cost */
- def getDeleted(c: RDD[(String, Double)]): String = {
- if (c.count != 0) {
- val deleted = c.min()(new Ordering[Tuple2[String, Double]]() {
- override def compare(x: (String, Double), y: (String, Double)): Int =
- Ordering[Double].compare(x._2, y._2)
- })._1
- //println("deleted:------------------------- " + deleted)
- deleted
- }
- else {
- " "
- }
- }
-
- /* update each line with a deleted vertex */
- def update(sc:SparkContext, degree: RDD[String], d: String): RDD[String] = {
- var new_array = degree.collect;
- var tmp = new_array.to[mutable.ArrayBuffer]
- if (d.contains("j")) {
- new_array = new_array.filterNot(s => s.split(" ")(1) == d)
- tmp = new_array.to[mutable.ArrayBuffer]
- }
- if (d.contains("i")) {
- var update_j = List[String]()
- for (s <- new_array) {
- if (s.split(" ")(0) == d) {
- update_j = s.split(" ")(1) :: update_j
- tmp -= s
- }
- }
-
- val tmp_buffer = tmp.toArray
- // need a temporary buffer so we can delete from tmp while iterating
- for (j <- update_j) {
- for (s <- tmp_buffer) {
- if (s.split(" ")(1) == j) {
- tmp -= s
- val iszero = s.split(" ")(2).toInt - 1
- if (iszero != 0) {
- val new_line = s.split(" ")(0) + " " + s.split(" ")(1) + " " + (s.split(" ")(2).toInt - 1).toString
- tmp -= s
- tmp += new_line
- }
- }
- }
- }
- }
- sc.parallelize(tmp)
- }
-
- /* get graph from cost array*/
- def getGraph(degree: RDD[String]): List[String] = {
- var g = List[String]()
- for (c <- degree.collect) {
- if (!g.contains(c.split(" ")(0))) {
- g = c.split(" ")(0) :: g
- }
- if (!g.contains(c.split(" ")(1))) {
- g = c.split(" ")(1) :: g
- }
- }
- g
- }
-
- /* iterative delete a vertex */
- def greedyDecreasing(sc:SparkContext, degree: RDD[String]): List[String] = {
- val cost = getCostMap(degree)
- val value = getSetValue(cost) / cost.count
- var valueMap = Map[Int, Double]()
- valueMap += (0 -> value)
- var graph = List[List[String]]()
- graph = getGraph(degree) :: graph
- var new_degree = degree
- var c = cost
- var a = 0
- while (c.count != 0) { // not cost size, need to be the number of vertex
- val iter1 = System.currentTimeMillis()
- println("c.count : " + c.count)
- a = a + 1
- val d = getDeleted(c)
- new_degree = update(sc, new_degree, d)
- //newDegree = update(deleted) //update the degree of remaining i and j based on the deteled vertex
- c = getCostMap(new_degree)
- graph = getGraph(new_degree) :: graph
- val value = getSetValue(c) / c.count // the set value should be divided by |C|
- //println("value : " + value)
- //new_degree.foreach(println)
- //println(getGraph(c))
- valueMap += (a -> value)
-
- val iter2 = System.currentTimeMillis()
- println(s"iterNum:${a}, updatetime: "+ ((iter2 - iter1) / 1000.0) + " sec")
- }
- var max_index = -1
- var max_Value = -1.000
- for (s <- valueMap) {
- if (s._2 > max_Value) {
- max_index = s._1
- max_Value = s._2
- }
- }
- //println("maxvalue" + " " + max_Value + " index:" + max_index)
- //graph.reverse.foreach(println)
- val objectGraph = graph.reverse(max_index)
-
- //objectGraph.foreach(f=>println(f))
- objectGraph
- }
-
- /* get the most density graph*/
- def getFinalSet(cst: Map[String, Double], dgr: List[String]): Set[String] = {
- var set = Set[String]()
- for (e <- cst) {
- set += (e._1)
- }
- set -- dgr.toSet
- }
-
- def outputResult(iset_out:String, jset_out:String, set: Set[String]): Unit = {
- val ibf = new BufferedWriter(new FileWriter(iset_out));
- val jbf = new BufferedWriter(new FileWriter(jset_out));
- val sorted_list = set.toList.sortWith(_.substring(1).toLong < _.substring(1).toLong)
- for (s <- sorted_list) {
- if (s.contains("i")) {
- ibf.write(s + "\n");
- }
- else {
- jbf.write(s + "\n");
- }
- }
- ibf.flush()
- jbf.flush()
- ibf.close()
- jbf.close()
- }
-
- def runFraudar(sc:SparkContext, bipartGraph: RDD[(Long, Long)]):List[String] = {
- val pairs = bipartGraph.map(x => (x._2, x._1))
- val group = pairs.groupBy(x => x._1)
- val degree = group.flatMap(getDegree)
- greedyDecreasing(sc, degree)
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala
deleted file mode 100644
index b04798e961487a5794ae89e9aabb9cdda1c5fb08..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-
-import java.io.{File, FileWriter, InputStreamReader}
-import java.util
-import com.bigdata.utils.Utils
-import org.apache.spark.graphx.lib.{IncConnectedComponents, Parameters}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.storage.StorageLevel
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import scala.beans.BeanProperty
-
-class IncCCConfig extends Serializable {
- @BeanProperty var inccc: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class IncCCParams extends Serializable {
- @BeanProperty var splitGraph: String = _
- @BeanProperty var partitions: Int = _
-
- @BeanProperty var orgCCPath: String = _
- @BeanProperty var outputPath: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
-}
-
-
-object IncConnectedComponentsRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val outputPath = args(2)
- val orgCCPath = args(3)
- val incGraphPath = args(4)
- val representer = new Representer
- representer.addClassTag(classOf[IncCCParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/inccc/inccc.yml")
- val yaml = new Yaml(new Constructor(classOf[IncCCConfig]), representer, options)
- val description = new TypeDescription(classOf[IncCCParams])
- yaml.addTypeDescription(description)
- val config: IncCCConfig = yaml.load(stream).asInstanceOf[IncCCConfig]
-
- val params = new IncCCParams()
- val paramsMap =
- config.inccc.get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).get(datasetName).asInstanceOf[util.HashMap[String, Object]]
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setPartitions(paramsMap.get("partitions").toString.toInt)
- params.setOrgCCPath(orgCCPath)
- params.setDatasetName(datasetName)
- params.setDataPath(dataPath)
- params.setOutputPath(outputPath)
- params.setIsRaw(isRaw)
- params.setAlgorithmName("IncCC")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val sc = spark.sparkContext
-
- val startTime = System.currentTimeMillis()
- val historyCC = Util.readUndirectDataFromHDFS(sc, orgCCPath, params.splitGraph, params.partitions)
- .map(f => (f._1.toLong, f._2.toLong))
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- historyCC.foreachPartition(f => {})
- val incGraph = Util.readUndirectDataFromHDFS(sc, incGraphPath, params.splitGraph, params.partitions)
- .map(f => (f._1.toLong, f._2.toLong))
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- incGraph.foreachPartition(f => {})
- val res = IncConnectedComponents.run(incGraph, historyCC)
- res.map(f => f._1 + "," + f._2).saveAsTextFile(outputPath)
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala
deleted file mode 100644
index f444cdd1f5a225cf46034d5590bb59bc86c0e7e6..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-import com.bigdata.utils.Utils
-
-import org.apache.spark.graphx.lib.{KatzCentrality, Parameters}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.io.{File, FileWriter, InputStreamReader}
-import java.util
-import scala.beans.BeanProperty
-
-class KatzCentralityConfig extends Serializable {
- @BeanProperty var katz: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class KatzCentralityParams extends Serializable {
- @BeanProperty var splitGraph: String = _
- @BeanProperty var partitions: Int = _
- @BeanProperty var isWeight: Boolean = _
- @BeanProperty var tol: Double = _
- @BeanProperty var maxIter: Int = _
- @BeanProperty var normalized: Boolean = _
-
- @BeanProperty var outputPath: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
-}
-
-
-object KatzCentralityRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val outputPath = args(2)
- val representer = new Representer
- representer.addClassTag(classOf[KatzCentralityParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/katz/katz.yml")
- val yaml = new Yaml(new Constructor(classOf[KatzCentralityConfig]), representer, options)
- val description = new TypeDescription(classOf[KatzCentralityParams])
- yaml.addTypeDescription(description)
- val config: KatzCentralityConfig = yaml.load(stream).asInstanceOf[KatzCentralityConfig]
-
- val params = new KatzCentralityParams()
- val paramsMap =
- config.katz.get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).get(datasetName).asInstanceOf[util.HashMap[String, Object]]
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setPartitions(paramsMap.get("partitions").toString.toInt)
- params.setIsWeight(paramsMap.get("isWeight").toString.toBoolean)
- params.setTol(paramsMap.get("tol").toString.toDouble)
- params.setMaxIter(paramsMap.get("maxIter").toString.toInt)
- params.setNormalized(paramsMap.get("normalized").toString.toBoolean)
- params.setDatasetName(datasetName)
- params.setDataPath(dataPath)
- params.setOutputPath(outputPath)
- params.setIsRaw(isRaw)
- params.setAlgorithmName("KatzCentrality")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val sc = spark.sparkContext
-
- val startTime = System.currentTimeMillis()
- val edgeRDD = Util.readGraphFromHDFS(sc, params.dataPath, params.splitGraph, params.isWeight, params.partitions)
- .map(x => (x._1, x._2, x._3))
- val tmp = edgeRDD.map(f => Edge(f._1, f._2, f._3))
- val g: Graph[Double, Double] = Graph.fromEdges(tmp, 1.0)
- val result = KatzCentrality.run(g, params.maxIter, params.tol, params.normalized)
- result.map(f => (f._1, f._2)).saveAsTextFile(params.outputPath)
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala
deleted file mode 100644
index 49f7a0e4db98fd42488f3913883e3aa09bdb4a89..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-import com.bigdata.utils.Utils
-
-import org.apache.spark.graphx.lib.{SpearkListenerLabelPropagation, Parameters}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.io.{File, FileWriter, InputStreamReader}
-import java.util
-import scala.beans.BeanProperty
-
-class SLPAConfig extends Serializable {
- @BeanProperty var slpa: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class SLPAParams extends Serializable {
- @BeanProperty var splitGraph: String = _
- @BeanProperty var partitions: Int = _
- @BeanProperty var isWeight: Boolean = _
- @BeanProperty var isDirected: Boolean = _
- @BeanProperty var iterNum: Int = _
- @BeanProperty var threshold: Double = _
-
- @BeanProperty var outputPath: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
-}
-
-
-object SLPARunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val outputPath = args(2)
- val representer = new Representer
- representer.addClassTag(classOf[SLPAParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/slpa/slpa.yml")
- val yaml = new Yaml(new Constructor(classOf[SLPAConfig]), representer, options)
- val description = new TypeDescription(classOf[SLPAParams])
- yaml.addTypeDescription(description)
- val config: SLPAConfig = yaml.load(stream).asInstanceOf[SLPAConfig]
-
- val params = new SLPAParams()
- val paramsMap =
- config.slpa.get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).get(datasetName).asInstanceOf[util.HashMap[String, Object]]
- params.setSplitGraph(paramsMap.get("splitGraph").asInstanceOf[String])
- params.setPartitions(paramsMap.get("partitions").asInstanceOf[Int])
- params.setIsWeight(paramsMap.get("isWeight").asInstanceOf[Boolean])
- params.setIsDirected(paramsMap.get("isDirected").asInstanceOf[Boolean])
- params.setIterNum(paramsMap.get("iterNum").asInstanceOf[Int])
- params.setThreshold(paramsMap.get("threshold").asInstanceOf[Double])
- params.setDatasetName(datasetName)
- params.setDataPath(dataPath)
- params.setOutputPath(outputPath)
- params.setIsRaw(isRaw)
- params.setAlgorithmName("SLPA")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val sc = spark.sparkContext
-
- val startTime = System.currentTimeMillis()
- val edges = Util.readGraphFromHDFS(sc, params.dataPath, params.splitGraph, params.isWeight, params.partitions)
- edges.foreachPartition{f => {}}
-
- val slpaGraph = SpearkListenerLabelPropagation.buildGraph(edges, params.isDirected)
- val slpaComm = SpearkListenerLabelPropagation.run(slpaGraph, params.iterNum, params.threshold)
- slpaComm.edges.foreachPartition{f => {}}
-
- val outPutComm = s"${outputPath}/res"
- val outPutComp = s"${outputPath}/resForComparsion"
-
- val vertex2Comm = slpaComm.vertices.map(x => (x._1,"[" + x._2.mkString(",") + "]"))
- vertex2Comm.foreachPartition{f => {}}
- vertex2Comm.saveAsTextFile(outPutComm)
-
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
-
- obtainCommunities(sc, outPutComm, outPutComp)
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}s")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-
- def obtainCommunities(sc: SparkContext, commPath: String, outPath: String): Unit = {
- val labelIds = sc.textFile(commPath)
- .flatMap(line => {
- val x = line.split(",")
- var tmp = x(0).replace("(","")
- val id = tmp.toLong
- tmp = x(1).replace(")","")
- tmp = tmp.replace("[","")
- tmp = tmp.replace("]","")
- val labels = tmp.split(",")
- val attr = new Array[(Long, Long)](labels.length)
- var i = 0
- while (i < labels.length) {
- attr(i) = (labels(i).toLong, id)
- i = i + 1
- }
- attr.toIterator
- })
- labelIds.groupByKey()
- .map(x => x._2.toArray.mkString(" "))
- .repartition(1)
- .saveAsTextFile(outPath)
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala
deleted file mode 100644
index b3d58589d6a733f5a93d27b4f74d4165d091cc3d..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.bigdata.graph
-
-import java.io.FileWriter
-import java.util
-
-import scala.beans.BeanProperty
-
-import com.bigdata.utils.Utils
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-
-import org.apache.spark.graphx.lib.SubgraphMatching
-import org.apache.spark.{SparkConf, SparkContext}
-
-class SubgraphMatchingParams extends Serializable{
- @BeanProperty var inputPath: String = _
- @BeanProperty var outputPath: String = _
- @BeanProperty var splitDataGraph: util.HashMap[String, String] = new util.HashMap[String, String]()
- @BeanProperty var splitQueryGraph: String = _
- @BeanProperty var taskNum: Int = _
- @BeanProperty var resultNum: Int = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var numPartitions: Int = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var isIdentical: String = _
- @BeanProperty var matchResult: Long = _
-}
-
-
-object SubgraphMatchingRunner {
-
- def main(args: Array[String]): Unit = {
-
- try {
- val datasetName = args(0)
- val queryGraphName = args(1)
- val isRaw = args(2)
- val isIdentical = args(3)
- val outputPath = args(4)
- val inputPath = args(5)
- val partitionNum = args(6).toInt
- val taskNum = args(7).toInt
- val queryGraphPath = args(8)
- val testcaseType = s"SGM_${datasetName}_${queryGraphName}_${isIdentical}"
-
- val representer = new Representer
- representer.addClassTag(classOf[SubgraphMatchingParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[SubgraphMatchingParams]), representer, options)
- val description = new TypeDescription(classOf[SubgraphMatchingParams])
- yaml.addTypeDescription(description)
- val params = yaml.load(Utils.getStream("conf/graph/sgm/sgm.yml")).asInstanceOf[SubgraphMatchingParams]
-
- val splitDataGraph = params.getSplitDataGraph.get(datasetName)
- val resultNum = params.getResultNum
- val splitQueryGraph = params.getSplitQueryGraph
-
- params.setAlgorithmName("SGM")
- params.setDatasetName(datasetName)
- params.setInputPath(inputPath)
- params.setOutputPath(outputPath)
- params.setIsRaw(isRaw)
- params.setNumPartitions(partitionNum)
- params.setTestcaseType(testcaseType)
- params.setIsIdentical(isIdentical)
- params.setTaskNum(taskNum)
-
- val isIdenticalBool = isIdentical match {
- case "Identical" => true
- case "unIdentical" => false
- }
-
- val conf = new SparkConf()
- .setAppName(testcaseType)
- val sc = new SparkContext(conf)
-
- val startTime = System.currentTimeMillis()
-
- val inputRDD = Util.readUndirectDataFromHDFS(sc, inputPath, splitDataGraph, partitionNum)
- .map(f => (f._1.toLong, f._2.toLong))
- val queryGraphRDD = sc.textFile(queryGraphPath)
- val edgelist: Array[(Long, Long)] = queryGraphRDD.map(line => {
- val strings = line.split(splitQueryGraph)
- (strings(0).toLong, strings(1).toLong)
- }).collect()
-
- val (numSubgraphs, subgraphs) =
- SubgraphMatching.run(inputRDD, edgelist, taskNum, resultNum, isIdenticalBool)
-
- params.setMatchResult(numSubgraphs)
- println("total matched results:\t%d".format(numSubgraphs))
- subgraphs.map(x => x.mkString("\t")).saveAsTextFile(outputPath)
-
- val costTime = (System.currentTimeMillis() - startTime) / 1000.0
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- val writer = new FileWriter(
- s"report/SGM_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
-
- println(s"Exec Successful: costTime: ${costTime}s")
-
- }
- }
-}
\ No newline at end of file
diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala
deleted file mode 100644
index ccae82c36379ff6f9447d0a42952e556b64588c5..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-// scalastyle:off
-
-package com.bigdata.graph
-import com.bigdata.utils.Utils
-
-import org.apache.spark.graphx.lib._
-import org.apache.spark.graphx.{Edge, Graph, GraphLoader, PartitionStrategy}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.io.{File, FileWriter, InputStreamReader}
-import java.util
-import scala.beans.BeanProperty
-
-class WLPAConfig extends Serializable {
- @BeanProperty var wlpa: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class WLPAParams extends Serializable {
- @BeanProperty var splitGraph: String = _
- @BeanProperty var commputePartition: Int = _
- @BeanProperty var maxIter: Int = _
- @BeanProperty var partitions: Int = _
-
- @BeanProperty var outputPath: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
-}
-
-
-object WeightedLablePropagationRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val outputPath = args(2)
- val representer = new Representer
- representer.addClassTag(classOf[WLPAParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val stream: InputStreamReader = Utils.getStream("conf/graph/wlpa/wlpa.yml")
- val yaml = new Yaml(new Constructor(classOf[WLPAConfig]), representer, options)
- val description = new TypeDescription(classOf[WLPAParams])
- yaml.addTypeDescription(description)
- val config: WLPAConfig = yaml.load(stream).asInstanceOf[WLPAConfig]
-
- val params = new WLPAParams()
- val paramsMap =
- config.wlpa.get(isRaw match {
- case "no" => "opt"
- case _ => "raw"
- }).get(datasetName).asInstanceOf[util.HashMap[String, Object]]
- params.setSplitGraph(paramsMap.get("splitGraph").toString)
- params.setPartitions(paramsMap.get("commputePartition").toString.toInt)
- params.setMaxIter(paramsMap.get("maxIter").toString.toInt)
- params.setDatasetName(datasetName)
- params.setDataPath(dataPath)
- params.setOutputPath(outputPath)
- params.setIsRaw(isRaw)
- params.setAlgorithmName("WLPA")
- params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}")
-
- val conf = new SparkConf().setAppName(params.testcaseType)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val sc = spark.sparkContext
-
- val startTime = System.currentTimeMillis()
- val inputRdd = Util.readDirectWeightDataFromHDFS(sc, params.dataPath, params.splitGraph, params.commputePartition)
- .map(f => Edge(f._1.toLong, f._2.toLong, f._3.toDouble))
- val graph = Graph.fromEdges(inputRdd, 1.0)
- val result = WLabelPropagation.run(graph, params.maxIter).vertices
- Util.saveDataToHDFS(result, ",", outputPath)
- val finishTime = System.currentTimeMillis()
- val costTime = (finishTime - startTime) / 1000
-
- params.setCostTime(costTime)
- println(s"Exec Successful: costTime: ${costTime}s")
-
- val folder = new File("report")
- if (!folder.exists()) {
- val mkdir = folder.mkdirs()
- println(s"Create dir report ${mkdir}")
- }
- val writer = new FileWriter(
- s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml")
- yaml.dump(params, writer)
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala
deleted file mode 100644
index dadd8f9dc3317b6635930d69b82f87fdd9a1bce9..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.UpEvaluationVerify
-
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.ml.clustering.Hdbscan
-import org.apache.spark.ml.evaluation.ClusteringEvaluator
-import org.apache.spark.ml.linalg.{Vector, Vectors}
-
-import java.io.{File, FileWriter, PrintWriter}
-import java.util
-import java.util.Date
-import scala.beans.BeanProperty
-import scala.collection.mutable.ArrayBuffer
-
-class HDBConfig extends Serializable {
- @BeanProperty var hdb: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class HDBParams extends Serializable {
- @BeanProperty var pt: Int = _
- @BeanProperty var mstPartitionNum: Int = _
- @BeanProperty var seed: Int = _
- @BeanProperty var saurfangThreshold: Double = _
-
- @BeanProperty var dataPath: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var standSilhouette: Double = _
- @BeanProperty var cpuName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var startTime: Long = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-
-object HDBRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val cpuName = args(2)
- val saveResultPath = args(3)
-
- val stream = Utils.getStream("conf/ml/hdb/hdb.yml")
- val representer = new Representer
- representer.addClassTag(classOf[HDBParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[HDBConfig]), representer, options)
- val description = new TypeDescription(classOf[HDBParams])
- yaml.addTypeDescription(description)
- val configs: HDBConfig = yaml.load(stream).asInstanceOf[HDBConfig]
- val paramsMap: util.HashMap[String, Object] = configs.hdb.get(isRaw match {
- case "no" => "opt"
- case "yes" => "raw"
- }).get(datasetName)
- val params = new HDBParams()
- params.setPt(paramsMap.get("pt").asInstanceOf[Int])
- params.setMstPartitionNum(paramsMap.get("mstPartitionNum").asInstanceOf[Int])
- params.setSeed(paramsMap.get("seed").asInstanceOf[Int])
- params.setSaurfangThreshold(paramsMap.get("saurfangThreshold").asInstanceOf[Double])
- params.setDataPath(dataPath)
- params.setDatasetName(datasetName)
- params.setCpuName(cpuName)
- params.setIsRaw(isRaw)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("HDB")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${datasetName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${datasetName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
-
- val conf = new SparkConf().setAppName(appName)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- spark.sparkContext.setLogLevel("ERROR")
- val (res, costTime) = new HDBKernel().runJob(spark, params)
- params.setStandSilhouette(res)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- params.setIsCorrect(UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark))
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
-
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${costTime}min; standSilhouette: ${res};isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class HDBKernel {
- def runJob(spark: SparkSession, params: HDBParams): (Double, Double) = {
- val sc = spark.sparkContext
- sc.setLogLevel("ERROR")
-
- val startTime = System.currentTimeMillis()
-
- println("\n--------start--------\n")
-
- // val dataRDD = sc.textFile(dataPath).map
- // {
- // t=>t.split(",").map{t=>t.toDouble}
- // }.repartition(pt).persist()
-
- import scala.io.{BufferedSource, Source}
- val data = new ArrayBuffer[Vector]()
- val source2: BufferedSource = Source.fromFile(params.dataPath)
- for (line <- source2.getLines()){ // getLines() fetches all the lines of the file
- data.append(Vectors.dense(line.split(",").map{_.toDouble}))
- }
-
- val d1 = data.toArray
- val dataRDD = sc.parallelize(d1).repartition(params.pt).cache()
-
- println("count: "+ dataRDD.count())
- println("dim: " + dataRDD.first().size)
- val t1 = System.currentTimeMillis()
- println("map Cost[min]: " + (t1 - startTime).toDouble/60/1000)
-
- val hdb = new Hdbscan()
- .setMstPartitionNum(params.mstPartitionNum)
- .setSaurfangThreshold(params.saurfangThreshold)
- .setRandomSeed(params.seed)
- val labels = hdb.fit(dataRDD)
- val t2 = System.currentTimeMillis()
- println("train Cost[min]: " + (t2 - t1).toDouble/60/1000)
- println("total Cost[min]: " + (t2 - startTime).toDouble/60/1000)
-
- import spark.implicits._
- val valid = labels.map{t => (t._2, t._3)}.toDF("features", "prediction")
- val evaluator = new ClusteringEvaluator()
- val silhouette = evaluator.evaluate(valid)
- val standSilhouette = (silhouette + 1) / 2.0
- println(s"Silhouette with squared euclidean distance = $standSilhouette")
- // labels.map{t=>(t._3,1)}.reduceByKey{(x,y)=>x+y}.collect().foreach(println)
- println("\n--------success--------\n")
- Utils.saveEvaluation(standSilhouette, params.saveDataPath, sc)
- val costTime = (t2 - startTime).toDouble/60/1000
- (standSilhouette, costTime)
- }
-}
\ No newline at end of file
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala
deleted file mode 100644
index 85aa91a4d7e285872d831c6278e97260a644d1e4..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.DownEvaluationVerify
-
-import org.apache.hadoop.io.LongWritable
-import org.apache.mahout.math.VectorWritable
-import org.apache.spark.ml.clustering.{KMeans => MlKMeans}
-import org.apache.spark.ml.linalg.{Vectors => MlVectors}
-import org.apache.spark.ml.param.{ParamMap, ParamPair}
-import org.apache.spark.mllib.clustering.{KMeans => MlibKMeans}
-import org.apache.spark.mllib.linalg.{Vectors => MlibVectors}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.udf
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.util
-import scala.beans.BeanProperty
-import java.io.{File, FileWriter}
-import java.util.HashMap
-
-class KMeansConfig extends Serializable {
-
- @BeanProperty var kmeans: util.HashMap[String, util.HashMap[String, Object]] = _
-}
-
-class KMeansParams extends Serializable {
-
- @BeanProperty var numPartitions: Int = _
- @BeanProperty var maxIterations: Int = _
- @BeanProperty var k: Int = _
-
- @BeanProperty var dataPath: String = _
- @BeanProperty var apiName: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var datasetCpuName: String = _
- @BeanProperty var isRaw: String = "no"
- @BeanProperty var evaluation: Double = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var loadDataTime: Double = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-
-object KMeansRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (dataStructure, datasetName, apiName, cpuName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3), modelConfSplit(4), modelConfSplit(5))
- val dataPath = args(1)
- val datasetCpuName = s"${datasetName}_${cpuName}"
- val saveResultPath = args(2)
-
- val stream = Utils.getStream("conf/ml/kmeans/kmeans.yml")
- val representer = new Representer
- representer.addClassTag(classOf[KMeansParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[KMeansConfig]), representer, options)
- val description = new TypeDescription(classOf[KMeansParams])
- yaml.addTypeDescription(description)
- val config: KMeansConfig = yaml.load(stream).asInstanceOf[KMeansConfig]
- val paramsMap: util.HashMap[String, Object] = config.kmeans.get(datasetCpuName)
- val params = new KMeansParams()
- params.setNumPartitions(paramsMap.get("numPartitions").asInstanceOf[Int])
- params.setMaxIterations(paramsMap.get("maxIterations").asInstanceOf[Int])
- params.setK(paramsMap.get("k").asInstanceOf[Int])
- params.setApiName(apiName)
- params.setDataPath(dataPath)
- params.setDatasetName(datasetName)
- params.setDatasetCpuName(datasetCpuName)
- params.setIsRaw(isRaw)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("KMeans")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}_${dataStructure}_${apiName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${datasetName}_${dataStructure}_${apiName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${datasetName}_${dataStructure}_${apiName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
-
- val conf = new SparkConf().setAppName(appName)
- val spark = SparkSession.builder.config(conf).getOrCreate()
-
- val (res, costTime) = dataStructure match {
- case "dataframe" => new KMeansKernel().runDataFrameJob(spark, params)
- case "rdd" => new KMeansKernel().runRDDJob(spark, params)
- }
- params.setEvaluation(res)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- params.setIsCorrect(DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark))
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
-
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class KMeansKernel {
-
- def runDataFrameJob(spark: SparkSession, params: KMeansParams): (Double, Double) = {
-
- val sc = spark.sparkContext
- val startTime = System.currentTimeMillis()
- val data = sc.sequenceFile[LongWritable, VectorWritable](params.dataPath)
- val dataRDD = data.map{ case (k, v) =>
-      val vector: Array[Double] = new Array[Double](v.get().size)
- for (i <- 0 until v.get().size) vector(i) = v.get().get(i)
- vector
- }.repartition(params.numPartitions).persist()
-
- import spark.implicits._
- val dataDF = dataRDD.toDF("features")
- val convertToVector = udf((array: Seq[Double]) => {
- MlVectors.dense(array.toArray)
- })
- val trainingData = dataDF.withColumn("features", convertToVector($"features"))
- println("count: " + trainingData.count())
- val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0
- val kmeans = new MlKMeans().setK(params.k).setMaxIter(params.maxIterations)
-
- val paramMap = ParamMap(kmeans.k -> params.k)
- .put(kmeans.maxIter, params.maxIterations)
- val paramMaps: Array[ParamMap] = new Array[ParamMap](2)
- for (i <- 0 to paramMaps.size -1) {
- paramMaps(i) = ParamMap(kmeans.k -> params.k)
- .put(kmeans.maxIter, params.maxIterations)
- }
-
-
- val maxIterParamPair = ParamPair(kmeans.maxIter, params.maxIterations)
- val kPair = ParamPair(kmeans.k, params.k)
- val model = params.apiName match {
- case "fit" => kmeans.fit(trainingData)
- case "fit1" => kmeans.fit(trainingData, paramMap)
- case "fit2" =>
- val models = kmeans.fit(trainingData, paramMaps)
- models(0)
- case "fit3" => kmeans.fit(trainingData, kPair, maxIterParamPair)
- }
- val costTime = (System.currentTimeMillis() - startTime) / 1000.0
- params.setLoadDataTime(loadDataTime)
- val res = model.computeCost(trainingData)
- Utils.saveEvaluation(res, params.saveDataPath, sc)
- (res, costTime)
- }
-
- def runRDDJob(spark: SparkSession, params: KMeansParams): (Double, Double) = {
-
- val sc = spark.sparkContext
- val startTime = System.currentTimeMillis()
- val data = sc.sequenceFile[LongWritable, VectorWritable](params.dataPath)
- val dataRDD = data.map{ case (k, v) =>
-      val vector: Array[Double] = new Array[Double](v.get().size)
- for (i <- 0 until v.get().size) vector(i) = v.get().get(i)
- MlibVectors.dense(vector)
- }.repartition(params.numPartitions).cache()
- println("count: " + dataRDD.count())
- val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0
-
- val model = new MlibKMeans()
- .setK(params.k)
- .setMaxIterations(params.maxIterations)
- .run(dataRDD)
- val costTime = (System.currentTimeMillis() - startTime) / 1000.0
-
- params.setLoadDataTime(loadDataTime)
- val res = model.computeCost(dataRDD)
- Utils.saveEvaluation(res, params.saveDataPath, sc)
- (res, costTime)
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala
deleted file mode 100644
index 96ca6b15e90bea23226728b62be9e9d52ace2a29..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.UpEvaluationVerify
-import com.bigdata.compare.ml.DownEvaluationVerify
-
-import com.microsoft.ml.spark.core.metrics.MetricConstants
-import com.microsoft.ml.spark.train.ComputeModelStatistics
-import com.microsoft.ml.spark.lightgbm.{LightGBMClassifier, LightGBMRegressor}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.lang.System.nanoTime
-import java.io.{File, FileWriter, PrintWriter}
-import java.nio.file.{Paths, Files}
-import java.util
-import scala.beans.BeanProperty
-import scala.util.Random
-
-class LightGBMRawConfig extends Serializable {
- @BeanProperty var lgbm: util.HashMap[String, util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]]] = _
-}
-
-class LightGBMRawParams extends Serializable {
- @BeanProperty var objective: String = _
- @BeanProperty var labelCol: String = _
- @BeanProperty var featuresCol: String = _
- @BeanProperty var verbosity: Int = _
- @BeanProperty var learningRate: Double = _
- @BeanProperty var maxDepth: Int = _
- @BeanProperty var maxBin: Int = _
- @BeanProperty var numIterations: Int = _
- @BeanProperty var numTasks: Int = _
- @BeanProperty var minGainToSplit: Double = _
- @BeanProperty var lambdaL2: Double = _
- @BeanProperty var numLeaves: Int = _
- @BeanProperty var minSumHessianInLeaf: Double = _
- @BeanProperty var minDataInLeaf: Int = _
- @BeanProperty var baggingFraction: Double = _
- @BeanProperty var baggingFreq: Int = _
- @BeanProperty var numThreads: Int = _
- @BeanProperty var networkCompression: Int = _
- @BeanProperty var histSynchAlgo: Int = _
- @BeanProperty var loglossApx: Int = _
- @BeanProperty var loglossApxEps: Double = _
- @BeanProperty var loadingBalance: String = _
-
- @BeanProperty var trainingDataPath: String = _
- @BeanProperty var testDataPath: String = _
- @BeanProperty var algorithmType: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var evaluation: Double = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var cpuName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-
-object LightGBMRawRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (algorithmType, datasetName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3))
- val dataPath = args(1)
- val dataPathSplit = dataPath.split(",")
- val (trainingDataPath, testDataPath) = (dataPathSplit(0), dataPathSplit(1))
- val cpuName = args(2)
- val saveResultPath = args(3)
-
- val stream = Utils.getStream("conf/ml/lgbm/lgbm.yml")
- val representer = new Representer
- representer.addClassTag(classOf[LightGBMRawParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[LightGBMRawConfig]), representer, options)
- val description = new TypeDescription(classOf[LightGBMRawParams])
- yaml.addTypeDescription(description)
- val configs: LightGBMRawConfig = yaml.load(stream).asInstanceOf[LightGBMRawConfig]
- val params = new LightGBMRawParams()
- val paramsMap: util.HashMap[String, Object] = configs.lgbm.get(isRaw match {
- case "no" => "opt"
- case "yes" => "raw"
- }).get(algorithmType).get(datasetName)
- params.setObjective(paramsMap.get("objective").asInstanceOf[String])
- params.setLabelCol(paramsMap.get("labelCol").asInstanceOf[String])
- params.setFeaturesCol(paramsMap.get("featuresCol").asInstanceOf[String])
- params.setVerbosity(paramsMap.get("verbosity").asInstanceOf[Int])
- params.setLearningRate(paramsMap.get("eta").asInstanceOf[Double])
- params.setMaxDepth(paramsMap.get("max_depth").asInstanceOf[Int])
- params.setMaxBin(paramsMap.get("max_bin").asInstanceOf[Int])
- params.setNumIterations(paramsMap.get("num_round").asInstanceOf[Int])
- params.setNumTasks(paramsMap.get("num_tasks").asInstanceOf[Int])
- params.setMinGainToSplit(paramsMap.get("min_gain_to_split").asInstanceOf[Double])
- params.setLambdaL2(paramsMap.get("lambda_l2").asInstanceOf[Double])
- params.setNumLeaves(paramsMap.get("num_leaves").asInstanceOf[Int])
- params.setMinSumHessianInLeaf(paramsMap.get("min_child_weight").asInstanceOf[Double])
- params.setMinDataInLeaf(paramsMap.get("min_data_in_leaf").asInstanceOf[Int])
- params.setBaggingFraction(paramsMap.get("bagging").asInstanceOf[Double])
- params.setBaggingFreq(paramsMap.get("bagging_freq").asInstanceOf[Int])
- params.setNumThreads(paramsMap.get("num_threads").asInstanceOf[Int])
- params.setNetworkCompression(paramsMap.get("network_compression").asInstanceOf[Int])
- params.setHistSynchAlgo(paramsMap.get("hist_synch_algo").asInstanceOf[Int])
- params.setLoglossApx(paramsMap.get("logloss_apx").asInstanceOf[Int])
- params.setLoglossApxEps(paramsMap.get("logloss_apx_eps").asInstanceOf[Double])
- params.setLoadingBalance(paramsMap.get("loading_balance").asInstanceOf[String])
- params.setTrainingDataPath(trainingDataPath)
- params.setTestDataPath(testDataPath)
- params.setAlgorithmType(algorithmType)
- params.setDatasetName(datasetName)
- params.setCpuName(cpuName)
- params.setIsRaw(isRaw)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("LightGBM")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${algorithmType}_${datasetName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${algorithmType}_${datasetName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${algorithmType}_${datasetName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
-
- val conf = new SparkConf().setAppName(appName)
- val spark = SparkSession.builder.config(conf).getOrCreate()
-
- val (res, costTime) = new LightGBMRawKernel().runJob(spark, params)
- params.setEvaluation(res)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- val isCorrect = params.algorithmType match {
- case "classification" => UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)
- case "regression" => DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)
- }
- params.setIsCorrect(isCorrect)
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
-
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class LightGBMRawKernel{
- def runJob(spark: SparkSession, params: LightGBMRawParams): (Double, Double) = {
- val sc = spark.sparkContext
- sc.setLogLevel("INFO")
- println(s"Initialized spark session.")
- val t1 = System.currentTimeMillis()
-
- import spark.implicits._
- val trainData = spark.read.format("libsvm").option("vectorType", "sparse")
- .load(params.trainingDataPath)
- .repartition(params.numTasks)
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- val t2 = System.currentTimeMillis()
- println("* after preprocess: " + t2)
-
- val lgbm = params.algorithmType match {
- case "classification" =>{
- val classifier = new LightGBMClassifier()
- .setObjective(params.objective)
- .setLabelCol(params.labelCol)
- .setFeaturesCol(params.featuresCol)
- .setVerbosity(params.verbosity)
- .setNumIterations(params.numIterations)
- .setMaxDepth(params.maxDepth)
- .setLearningRate(params.learningRate)
- .setNumTasks(params.numTasks)
- .setMaxBin(params.maxBin)
- .setMinGainToSplit(params.minGainToSplit)
- .setLambdaL2(params.lambdaL2)
- .setNumLeaves(params.numLeaves)
- .setMinDataInLeaf(params.minDataInLeaf)
- .setMinSumHessianInLeaf(params.minSumHessianInLeaf)
- .setBaggingFraction(params.baggingFraction)
- .setBaggingFreq(params.baggingFreq)
- classifier
- }
- case "regression" =>{
- val regressor = new LightGBMRegressor()
- .setObjective(params.objective)
- .setLabelCol(params.labelCol)
- .setFeaturesCol(params.featuresCol)
- .setVerbosity(params.verbosity)
- .setNumIterations(params.numIterations)
- .setMaxDepth(params.maxDepth)
- .setLearningRate(params.learningRate)
- .setNumTasks(params.numTasks)
- .setMaxBin(params.maxBin)
- .setMinGainToSplit(params.minGainToSplit)
- .setLambdaL2(params.lambdaL2)
- .setNumLeaves(params.numLeaves)
- .setMinDataInLeaf(params.minDataInLeaf)
- .setMinSumHessianInLeaf(params.minSumHessianInLeaf)
- .setBaggingFraction(params.baggingFraction)
- .setBaggingFreq(params.baggingFreq)
- regressor
- }
- }
- val model = lgbm.fit(trainData)
- val t3 = System.currentTimeMillis()
- println("* after train: " + t3)
-
- val testData = spark.read.format("libsvm").option("vectorType", "sparse")
- .load(params.testDataPath)
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- println(s"Test data read successful. Number of partitions - ${testData.rdd.getNumPartitions}")
- val predictions = model.transform(testData)
- val (res, t4) = params.algorithmType match {
- case "classification" =>{
- val metrics = new ComputeModelStatistics()
- .setLabelCol("label")
- .setScoresCol("probability")
- .setScoredLabelsCol("prediction")
- .setEvaluationMetric(MetricConstants.AccuracySparkMetric)
- .transform(predictions)
- val ecc = metrics.collect().apply(0).apply(1).asInstanceOf[Double]
- val t4 = System.currentTimeMillis()
- (ecc, t4)
- }
- case "regression" =>{
- // compute model metrics
- val metrics = new ComputeModelStatistics()
- .setEvaluationMetric("regression")
- .setLabelCol("label")
- .setScoresCol("prediction")
- .transform(predictions)
- // print metrics
- val mse = metrics.collect().apply(0).apply(0).asInstanceOf[Double]
- val t4 = System.currentTimeMillis()
- (mse, t4)
- }
- }
- println("Model predictions:")
- predictions.select("prediction", "label", "features").show(5)
- val trainingProcess = (t3 - t1).toDouble / 1000
- val trainingStep = (t3 - t2).toDouble / 1000
- val dataProcess = (t2 - t1).toDouble / 1000
- val predict = (t4 - t3).toDouble / 1000
- println("[s]train total: " + trainingProcess)
- println("[s]data preprocess: " + dataProcess)
- println("[s]train: " + trainingStep)
- println("[s]predict: " + predict)
-
- Utils.saveEvaluation(res, params.saveDataPath, sc)
- (res, trainingProcess)
- }
-}
\ No newline at end of file
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala
deleted file mode 100644
index 34a0368c907eb3d1efc68fd81116d706471d1b89..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.UpEvaluationVerify
-import com.bigdata.compare.ml.DownEvaluationVerify
-
-import com.microsoft.ml.spark.core.metrics.MetricConstants
-import com.microsoft.ml.spark.train.ComputeModelStatistics
-import com.microsoft.ml.spark.lightgbm.{LightGBMClassifier, LightGBMRegressor}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.lang.System.nanoTime
-import java.io.{File, FileWriter, PrintWriter}
-import java.nio.file.{Paths, Files}
-import java.util
-import scala.beans.BeanProperty
-import scala.util.Random
-
-class LightGBMConfig extends Serializable {
- @BeanProperty var lgbm: util.HashMap[String, util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]]] = _
-}
-
-class LightGBMParams extends Serializable {
- @BeanProperty var objective: String = _
- @BeanProperty var labelCol: String = _
- @BeanProperty var featuresCol: String = _
- @BeanProperty var verbosity: Int = _
- @BeanProperty var learningRate: Double = _
- @BeanProperty var maxDepth: Int = _
- @BeanProperty var maxBin: Int = _
- @BeanProperty var numIterations: Int = _
- @BeanProperty var numTasks: Int = _
- @BeanProperty var minGainToSplit: Double = _
- @BeanProperty var lambdaL2: Double = _
- @BeanProperty var numLeaves: Int = _
- @BeanProperty var minSumHessianInLeaf: Double = _
- @BeanProperty var minDataInLeaf: Int = _
- @BeanProperty var baggingFraction: Double = _
- @BeanProperty var baggingFreq: Int = _
- @BeanProperty var numThreads: Int = _
- @BeanProperty var networkCompression: Int = _
- @BeanProperty var histSynchAlgo: Int = _
- @BeanProperty var loglossApx: Int = _
- @BeanProperty var loglossApxEps: Double = _
- @BeanProperty var loadingBalance: String = _
-
- @BeanProperty var trainingDataPath: String = _
- @BeanProperty var testDataPath: String = _
- @BeanProperty var algorithmType: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var evaluation: Double = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var cpuName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-
-object LightGBMRunner {
- def main(args: Array[String]): Unit = {
- try {
- val modelConfSplit = args(0).split("-")
- val (algorithmType, datasetName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3))
- val dataPath = args(1)
- val dataPathSplit = dataPath.split(",")
- val (trainingDataPath, testDataPath) = (dataPathSplit(0), dataPathSplit(1))
- val cpuName = args(2)
- val saveResultPath = args(3)
-
- val stream = Utils.getStream("conf/ml/lgbm/lgbm.yml")
- val representer = new Representer
- representer.addClassTag(classOf[LightGBMParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[LightGBMConfig]), representer, options)
- val description = new TypeDescription(classOf[LightGBMParams])
- yaml.addTypeDescription(description)
- val configs: LightGBMConfig = yaml.load(stream).asInstanceOf[LightGBMConfig]
- val params = new LightGBMParams()
- val paramsMap: util.HashMap[String, Object] = configs.lgbm.get(isRaw match {
- case "no" => "opt"
- case "yes" => "raw"
- }).get(algorithmType).get(datasetName)
- params.setObjective(paramsMap.get("objective").asInstanceOf[String])
- params.setLabelCol(paramsMap.get("labelCol").asInstanceOf[String])
- params.setFeaturesCol(paramsMap.get("featuresCol").asInstanceOf[String])
- params.setVerbosity(paramsMap.get("verbosity").asInstanceOf[Int])
- params.setLearningRate(paramsMap.get("eta").asInstanceOf[Double])
- params.setMaxDepth(paramsMap.get("max_depth").asInstanceOf[Int])
- params.setMaxBin(paramsMap.get("max_bin").asInstanceOf[Int])
- params.setNumIterations(paramsMap.get("num_round").asInstanceOf[Int])
- params.setNumTasks(paramsMap.get("num_tasks").asInstanceOf[Int])
- params.setMinGainToSplit(paramsMap.get("min_gain_to_split").asInstanceOf[Double])
- params.setLambdaL2(paramsMap.get("lambda_l2").asInstanceOf[Double])
- params.setNumLeaves(paramsMap.get("num_leaves").asInstanceOf[Int])
- params.setMinSumHessianInLeaf(paramsMap.get("min_child_weight").asInstanceOf[Double])
- params.setMinDataInLeaf(paramsMap.get("min_data_in_leaf").asInstanceOf[Int])
- params.setBaggingFraction(paramsMap.get("bagging").asInstanceOf[Double])
- params.setBaggingFreq(paramsMap.get("bagging_freq").asInstanceOf[Int])
- params.setNumThreads(paramsMap.get("num_threads").asInstanceOf[Int])
- params.setNetworkCompression(paramsMap.get("network_compression").asInstanceOf[Int])
- params.setHistSynchAlgo(paramsMap.get("hist_synch_algo").asInstanceOf[Int])
- params.setLoglossApx(paramsMap.get("logloss_apx").asInstanceOf[Int])
- params.setLoglossApxEps(paramsMap.get("logloss_apx_eps").asInstanceOf[Double])
- params.setLoadingBalance(paramsMap.get("loading_balance").asInstanceOf[String])
- params.setTrainingDataPath(trainingDataPath)
- params.setTestDataPath(testDataPath)
- params.setAlgorithmType(algorithmType)
- params.setDatasetName(datasetName)
- params.setCpuName(cpuName)
- params.setIsRaw(isRaw)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("LightGBM")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${algorithmType}_${datasetName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${algorithmType}_${datasetName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${algorithmType}_${datasetName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
-
- val conf = new SparkConf().setAppName(appName)
- val spark = SparkSession.builder.config(conf).getOrCreate()
-
- val (res, costTime) = new LightGBMKernel().runJob(spark, params)
- params.setEvaluation(res)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- val isCorrect = params.algorithmType match {
- case "classification" => UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)
- case "regression" => DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)
- }
- params.setIsCorrect(isCorrect)
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
-
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class LightGBMKernel{
- def runJob(spark: SparkSession, params: LightGBMParams): (Double, Double) = {
- val sc = spark.sparkContext
- sc.setLogLevel("INFO")
- println(s"Initialized spark session.")
- val t1 = System.currentTimeMillis()
-
- import spark.implicits._
- val trainData = spark.read.format("libsvm").option("vectorType", "sparse")
- .load(params.trainingDataPath)
- .repartition(params.numTasks)
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- val t2 = System.currentTimeMillis()
- println("* after preprocess: " + t2)
-
- val lgbm = params.algorithmType match {
- case "classification" =>{
- val classifier = new LightGBMClassifier()
- .setObjective(params.objective)
- .setLabelCol(params.labelCol)
- .setFeaturesCol(params.featuresCol)
- .setVerbosity(params.verbosity)
- .setNumIterations(params.numIterations)
- .setMaxDepth(params.maxDepth)
- .setLearningRate(params.learningRate)
- .setNumTasks(params.numTasks)
- .setMaxBin(params.maxBin)
- .setMinGainToSplit(params.minGainToSplit)
- .setLambdaL2(params.lambdaL2)
- .setNumLeaves(params.numLeaves)
- .setMinDataInLeaf(params.minDataInLeaf)
- .setMinSumHessianInLeaf(params.minSumHessianInLeaf)
- .setBaggingFraction(params.baggingFraction)
- .setBaggingFreq(params.baggingFreq)
- classifier
- }
- case "regression" =>{
- val regressor = new LightGBMRegressor()
- .setObjective(params.objective)
- .setLabelCol(params.labelCol)
- .setFeaturesCol(params.featuresCol)
- .setVerbosity(params.verbosity)
- .setNumIterations(params.numIterations)
- .setMaxDepth(params.maxDepth)
- .setLearningRate(params.learningRate)
- .setNumTasks(params.numTasks)
- .setMaxBin(params.maxBin)
- .setMinGainToSplit(params.minGainToSplit)
- .setLambdaL2(params.lambdaL2)
- .setNumLeaves(params.numLeaves)
- .setMinDataInLeaf(params.minDataInLeaf)
- .setMinSumHessianInLeaf(params.minSumHessianInLeaf)
- .setBaggingFraction(params.baggingFraction)
- .setBaggingFreq(params.baggingFreq)
- regressor
- }
- }
- if(params.isRaw.equals("no")) {
- lgbm.setAuxParams("num_threads", params.numThreads.toString)
- lgbm.setAuxParams("network_compression", params.networkCompression.toString)
- lgbm.setAuxParams("logloss_apx", params.loglossApx.toString)
- lgbm.setAuxParams("logloss_apx_eps", params.loglossApxEps.toString)
- lgbm.setAuxParams("loading_balance", params.loadingBalance)
- }
- val model = lgbm.fit(trainData)
- val t3 = System.currentTimeMillis()
- println("* after train: " + t3)
-
- val testData = spark.read.format("libsvm").option("vectorType", "sparse")
- .load(params.testDataPath)
- .persist(StorageLevel.MEMORY_AND_DISK_SER)
- println(s"Test data read successful. Number of partitions - ${testData.rdd.getNumPartitions}")
- val predictions = model.transform(testData)
- val (res, t4) = params.algorithmType match {
- case "classification" =>{
- val metrics = new ComputeModelStatistics()
- .setLabelCol("label")
- .setScoresCol("probability")
- .setScoredLabelsCol("prediction")
- .setEvaluationMetric(MetricConstants.AccuracySparkMetric)
- .transform(predictions)
- val ecc = metrics.collect().apply(0).apply(1).asInstanceOf[Double]
- val t4 = System.currentTimeMillis()
- (ecc, t4)
- }
- case "regression" =>{
- // compute model metrics
- val metrics = new ComputeModelStatistics()
- .setEvaluationMetric("regression")
- .setLabelCol("label")
- .setScoresCol("prediction")
- .transform(predictions)
- // print metrics
- val mse = metrics.collect().apply(0).apply(0).asInstanceOf[Double]
- val t4 = System.currentTimeMillis()
- (mse, t4)
- }
- }
- println("Model predictions:")
- predictions.select("prediction", "label", "features").show(5)
- val trainingProcess = (t3 - t1).toDouble / 1000
- val trainingStep = (t3 - t2).toDouble / 1000
- val dataProcess = (t2 - t1).toDouble / 1000
- val predict = (t4 - t3).toDouble / 1000
- println("[s]train total: " + trainingProcess)
- println("[s]data preprocess: " + dataProcess)
- println("[s]train: " + trainingStep)
- println("[s]predict: " + predict)
-
- Utils.saveEvaluation(res, params.saveDataPath, sc)
- (res, trainingProcess)
- }
-}
\ No newline at end of file
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala
deleted file mode 100644
index 7f0b66db97fe9cd3f6a78d4a53a471f942f24589..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.MatrixVerify
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.ml.feature.SPCA
-import org.apache.spark.ml.feature.PCA
-import org.apache.spark.ml.linalg.Vectors
-import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector}
-import org.apache.spark.storage.StorageLevel
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.ml.param.{ParamMap, ParamPair}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.io.{File, FileWriter}
-import java.util
-import scala.beans.BeanProperty
-
-class SPCAConfig extends Serializable {
-
- @BeanProperty var spca: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class SPCAParams extends Serializable {
-
- @BeanProperty var pt: Int = _
- @BeanProperty var k: Int = _
- @BeanProperty var sep: String = _
- @BeanProperty var numCols: Int = _
- @BeanProperty var pcPath: String = _
- @BeanProperty var sigmaPath: String = _
-
- @BeanProperty var dataPath: String = _
- @BeanProperty var apiName: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var isRaw: String = _
- @BeanProperty var cpuName: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var loadDataTime: Double = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-
-object SPCARunner {
- def main(args: Array[String]): Unit = {
-
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, apiName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3))
- val dataPath = args(1)
- val cpuName = args(2)
- val sparkConfSplit = args(3).split("_")
- val (master, deployMode, numExec, execCores, execMem) =
- (sparkConfSplit(0), sparkConfSplit(1), sparkConfSplit(2), sparkConfSplit(3), sparkConfSplit(4))
- val saveResultPath = args(4)
-
- val stream = Utils.getStream("conf/ml/spca/spca.yml")
- val representer = new Representer
- representer.addClassTag(classOf[SPCAParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[SPCAConfig]), representer, options)
- val description = new TypeDescription(classOf[SPCAParams])
- yaml.addTypeDescription(description)
- val configs: SPCAConfig = yaml.load(stream).asInstanceOf[SPCAConfig]
- val params = new SPCAParams()
- val paramsMap: util.HashMap[String, Object] = configs.spca.get(isRaw match {
- case "no" => "opt"
- case "yes" => "raw"
- }).get(datasetName)
-      params.setPt(paramsMap.getOrDefault("pt", Int.box(250)).asInstanceOf[Int])
-      params.setK(paramsMap.getOrDefault("k", Int.box(10)).asInstanceOf[Int])
-      params.setSep(paramsMap.getOrDefault("sep", " ").asInstanceOf[String])
-      params.setNumCols(paramsMap.getOrDefault("numCols", Int.box(0)).asInstanceOf[Int])
- params.setPcPath(paramsMap.getOrDefault("pcPath", null.asInstanceOf[String]).asInstanceOf[String])
- params.setSigmaPath(paramsMap.getOrDefault("sigmaPath", null.asInstanceOf[String]).asInstanceOf[String])
- params.setDataPath(dataPath)
- params.setDatasetName(datasetName)
- params.setApiName(apiName)
- params.setCpuName(cpuName)
- params.setIsRaw(isRaw)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("SPCA")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}_${apiName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${datasetName}_${apiName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${datasetName}_${apiName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
- val conf = new SparkConf().setAppName(appName).setMaster(master)
- val commonParas = Array (
- ("spark.submit.deployMode", deployMode),
- ("spark.executor.instances", numExec),
- ("spark.executor.cores", execCores),
- ("spark.executor.memory", execMem)
- )
- conf.setAll(commonParas)
- val spark = SparkSession.builder.config(conf).getOrCreate()
-
- val costTime = new SPCAKernel().runJob(spark, params)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- params.setIsCorrect(MatrixVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark))
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
-
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${params.getCostTime}s;isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-class SPCAKernel {
-
- def runJob(spark: SparkSession, params: SPCAParams): Double = {
-
- import spark.implicits._
- val sc = spark.sparkContext
- val fs = FileSystem.get(sc.hadoopConfiguration)
- val startTime = System.currentTimeMillis()
- val trainingData = if (params.isRaw == "yes"){
- val numColsBC = sc.broadcast(params.numCols)
- val sepBC = sc.broadcast(params.sep)
- val data = spark.createDataFrame(sc.textFile(params.dataPath, params.pt)
- .map(line => {
- val entry = line.split(sepBC.value)
- (entry(0).toInt, (entry(1).toInt, entry(2).toDouble))
- }).groupByKey()
- .map { case (_, vectorEntries) => Vectors.sparse(numColsBC.value, vectorEntries.toSeq) }
- .repartition(params.pt)
- .map(Tuple1.apply))
- .toDF("matrix").persist(StorageLevel.MEMORY_ONLY)
- data
- } else {
- val data = spark.createDataFrame(sc.textFile(params.dataPath, params.pt)
- .map(line => {
- val entry = line.split(params.sep)
- (entry(0).toInt, (entry(1).toInt, entry(2).toDouble))
- }).groupByKey()
- .map{case (_, vectorEntries) => Vectors.sparse(params.numCols, vectorEntries.toSeq)}
- .repartition(params.pt)
- .map(Tuple1.apply))
- .toDF("matrix")
- .persist(StorageLevel.MEMORY_ONLY)
- data
- }
-
- val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0
- params.setLoadDataTime(loadDataTime)
-
- val spca = params.isRaw match {
- case "yes" => new PCA().setK(params.k).setInputCol("matrix")
- case "no" => new SPCA().setK(params.k).setInputCol("matrix")
- }
-
- val paramMap = ParamMap(spca.k -> params.k)
- .put(spca.inputCol, "matrix")
- val paramMaps: Array[ParamMap] = new Array[ParamMap](2)
- for (i <- 0 to paramMaps.size - 1) {
- paramMaps(i) = ParamMap(spca.k -> params.k)
- .put(spca.inputCol, "matrix")
- }
- val kPair = ParamPair(spca.k, params.k)
- val inputColPair = ParamPair(spca.inputCol, "matrix")
- val model = params.apiName match {
- case "fit" => spca.fit(trainingData)
- case "fit1" => spca.fit(trainingData, paramMap)
- case "fit2" =>
- val models = spca.fit(trainingData, paramMaps)
- models(0)
- case "fit3" => spca.fit(trainingData, kPair, inputColPair)
- }
-    val costTime = (System.currentTimeMillis() - startTime) / 1000.0
-    params.setCostTime(costTime)
-
- val spcaMat = new DenseMatrix(model.pc.numRows, model.pc.numCols, model.pc.values, model.pc.isTransposed)
- MatrixVerify.saveMatrix(spcaMat, params.saveDataPath, sc)
- costTime
- }
-}
diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala
deleted file mode 100644
index cd7c70f41b513018097f223fb074040b497c93ba..0000000000000000000000000000000000000000
--- a/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-package com.bigdata.ml
-
-import com.bigdata.utils.Utils
-import com.bigdata.compare.ml.SimRankVerify
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.ml.recommendation.ALS.Rating
-import org.apache.spark.ml.recommendation.{SimRank, SimRankOpenSource}
-import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml}
-import org.yaml.snakeyaml.constructor.Constructor
-import org.yaml.snakeyaml.nodes.Tag
-import org.yaml.snakeyaml.representer.Representer
-
-import java.util
-import java.io.{File, FileWriter}
-import java.util.HashMap
-import scala.beans.BeanProperty
-
-class SimRankConfig extends Serializable {
-
- @BeanProperty var simrank: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _
-}
-
-class SimRankParams extends Serializable {
-
- @BeanProperty var numPartitions: Int = _
- @BeanProperty var damp: Double = _
- @BeanProperty var maxIter: Int = _
-
- @BeanProperty var isRaw: String = _
- @BeanProperty var cpuName: String = _
- @BeanProperty var dataPath: String = _
- @BeanProperty var datasetName: String = _
- @BeanProperty var costTime: Double = _
- @BeanProperty var loadDataTime: Double = _
- @BeanProperty var algorithmName: String = _
- @BeanProperty var testcaseType: String = _
- @BeanProperty var saveDataPath: String = _
- @BeanProperty var verifiedDataPath: String = _
- @BeanProperty var ifCheck: String = _
- @BeanProperty var isCorrect: String = _
-}
-object SimRankRunner {
- def main(args: Array[String]): Unit = {
-
- try {
- val modelConfSplit = args(0).split("-")
- val (datasetName, isRaw, ifCheck) =
- (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2))
- val dataPath = args(1)
- val cpuName = args(2)
- val saveResultPath = args(3)
- val datasetCpuName = s"${datasetName}-${cpuName}"
-
- val stream = Utils.getStream("conf/ml/simrank/simrank.yml")
- val representer = new Representer
- representer.addClassTag(classOf[SimRankParams], Tag.MAP)
- val options = new DumperOptions
- options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
- val yaml = new Yaml(new Constructor(classOf[SimRankConfig]), representer, options)
- val description = new TypeDescription(classOf[SimRankParams])
- yaml.addTypeDescription(description)
- val config: SimRankConfig = yaml.load(stream).asInstanceOf[SimRankConfig]
- val params = new SimRankParams()
- val paramsMap = config.simrank.get(isRaw match {
- case "no" => "opt"
- case "yes" => "raw"
- }).get(datasetCpuName)
- params.setNumPartitions(paramsMap.get("numPartitions").asInstanceOf[Int])
- params.setMaxIter(paramsMap.get("maxIter").asInstanceOf[Int])
- params.setDamp(paramsMap.get("damp").asInstanceOf[Double])
- params.setCpuName(cpuName)
- params.setIsRaw(isRaw)
- params.setDataPath(dataPath)
- params.setDatasetName(datasetName)
- params.setIfCheck(ifCheck)
- params.setAlgorithmName("SimRank")
- params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}")
- params.setVerifiedDataPath(s"${params.saveDataPath}_raw")
- var appName = s"${params.algorithmName}_${datasetName}"
- if (isRaw.equals("yes")){
- appName = s"${params.algorithmName}_${datasetName}_raw"
- params.setVerifiedDataPath(params.saveDataPath)
- params.setSaveDataPath(s"${params.saveDataPath}_raw")
- }
- params.setTestcaseType(appName)
-
- val conf = new SparkConf().setAppName(appName)
- val spark = SparkSession.builder.config(conf).getOrCreate()
- val costTime = new SimRankKernel().runJob(spark, params)
- params.setCostTime(costTime)
-
- Utils.checkDirs("report")
- if(ifCheck.equals("yes")){
- params.setIsCorrect(SimRankVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark))
- val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true)
- writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n")
- writerIsCorrect.close()
- }
- val writer = new FileWriter(s"report/${params.testcaseType}_${
- Utils.getDateStrFromUTC("yyyyMMdd_HHmmss",
- System.currentTimeMillis())
- }.yml")
- yaml.dump(params, writer)
- println(s"Exec Successful: costTime: ${costTime}s;isCorrect: ${params.isCorrect}")
- } catch {
- case e: Throwable =>
- println(s"Exec Failure: ${e.getMessage}")
- throw e
- }
- }
-}
-
-
-class SimRankKernel {
-
- def runJob(spark: SparkSession, params: SimRankParams): Double = {
- val sc = spark.sparkContext
- val startTime = System.currentTimeMillis()
- var costTime: Double = 0
-
- import spark.implicits._
- val userCol = "user"
- val itemCol = "item"
- val df = spark.sparkContext.objectFile[Rating[Int]](params.getDataPath).repartition(params.getNumPartitions)
- .map(row => {
- ("user-" + row.user.toString, "item-" + row.item.toString)
- }).toDF(userCol, itemCol)
-
- val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0
- params.setLoadDataTime(loadDataTime)
- if (params.getIsRaw.equals("no")) {
- val simrank = new SimRank()
- .setDamp(params.getDamp)
- .setNumIter(params.getMaxIter)
- .setUserCol(userCol)
- .setItemCol(itemCol)
- val simrankRes = simrank.computeSimilarity(df)
- simrankRes.itemSimilarity.foreach(_ => {})
- simrankRes.userSimilarity.foreach(_ => {})
- costTime = (System.currentTimeMillis() - startTime) / 1000.0
- SimRankVerify.saveRes(simrankRes.userSimilarity, simrankRes.itemSimilarity, params.saveDataPath, sc)
-
- } else {
- val simrankRes = new SimRankOpenSource().execute(df, (userCol, itemCol), params.getDamp, params.getMaxIter)
- simrankRes._1.foreach(_ => {})
- simrankRes._2.foreach(_ => {})
- costTime = (System.currentTimeMillis() - startTime) / 1000.0
- SimRankVerify.saveRes(simrankRes._1, simrankRes._2, params.saveDataPath, sc)
- }
- costTime
- }
-}