diff --git a/tools/kal-test/README.md b/tools/kal-test/README.md
index 83ff55b07c64a6dedaa9a36bd7c392a6db7462cd..3e0579c7085dd4323411891016f1b40f03a839f0 100644
--- a/tools/kal-test/README.md
+++ b/tools/kal-test/README.md
@@ -13,9 +13,9 @@ The Kunpeng algorithm library test tool can be used to test machine learning and
 #### Procedure
 1. Go to the Spark-ml-algo-lib/tools/kal-test directory in the compilation environment.
 2. Install the dependencies.
- Take spark 2.3.2 as an example, the install command is as follows:
- mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-graph-kernel-client_2.11 -Dversion=2.2.0 -Dclassifier=spark2.3.2 -Dfile=boostkit-graph-kernel-client_2.11-2.2.0-spark2.3.2.jar -Dpackaging=jar -DgeneratePom=true
- mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-ml-kernel-client_2.11 -Dversion=2.2.0 -Dclassifier=spark2.3.2 -Dfile=boostkit-ml-kernel-client_2.11-2.2.0-spark2.3.2.jar -Dpackaging=jar -DgeneratePom=true
+ Take Spark 3.3.1 as an example; the install commands are as follows:
+ mvn install:install-file -DgroupId=org.apache.spark.graphx.lib -DartifactId=boostkit-graph-kernel-client_2.12 -Dversion=3.0.0 -Dclassifier=spark3.3.1 -Dfile=boostkit-graph-kernel-client_2.12-3.0.0-spark3.3.1.jar -Dpackaging=jar -DgeneratePom=true
+ mvn install:install-file -DgroupId=org.apache.spark -DartifactId=boostkit-ml-kernel-client_2.12 -Dversion=3.0.0 -Dclassifier=spark3.3.1 -Dfile=boostkit-ml-kernel-client_2.12-3.0.0-spark3.3.1.jar -Dpackaging=jar -DgeneratePom=true
 3. Run the compile command:
mvn clean install 4. View the kal-test_2.11-0.1.jar file generated in Spark-ml-algo-lib/tools/kal-test/target. diff --git a/tools/kal-test/pom.xml b/tools/kal-test/pom.xml index f0d483c1edb82b5af0eeac113a541017482cb27f..f35a20ee9fd56cc11e6c1c439a95031a7b6018eb 100644 --- a/tools/kal-test/pom.xml +++ b/tools/kal-test/pom.xml @@ -14,7 +14,7 @@ UTF-8 2.12 3.0.0 - 3.1.1 + 3.3.1 @@ -30,9 +30,19 @@ - com.microsoft.ml.spark - mmlspark_2.12_spark3.1.2 - 0.0.0+79-09152193 + com.fasterxml.jackson.core + jackson-core + 2.12.3 + + + com.fasterxml.jackson.core + jackson-annotations + 2.12.3 + + + com.fasterxml.jackson.core + jackson-databind + 2.12.3 ai.h2o diff --git a/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala b/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala deleted file mode 100644 index e90896c0cf8c79e7e1d75278ef4ffd881192bce3..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/compare/graph/DeepWalkVerify.scala +++ /dev/null @@ -1,153 +0,0 @@ -// scalastyle:off - -package com.bigdata.compare.graph - -import java.io.InputStreamReader - -import scala.collection.Map -import scala.util.Try -import com.bigdata.utils.Utils -import com.bigdata.graph.{DeepWalkConfig, DeepWalkParams} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.apache.spark.graphx.VertexId -import org.apache.spark.ml.linalg.{DenseVector, Vector} -import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkConf, SparkContext} -import org.yaml.snakeyaml.representer.Representer -import smile.math.MathEx.cos -import smile.validation.AUC - -object DeepWalkVerify { - - def readEdgeListFromHDFS( - sc: SparkContext, - filePath: String, - split: String, - partition: Int): RDD[(VertexId, VertexId, Double)] = { - sc.textFile(filePath, partition) - .flatMap(line => { - if (line.startsWith("#")) { - Iterator.empty - } - else { - val x = line.trim.split(split) - if (x.length < 2) { - Iterator.empty - } - else { - var w = x(2).toDouble - Iterator.single((x(0).toLong, x(1).toLong, w)) - } - } - }) - } - - def readUndirectEdgeFromHDFS( - sc: SparkContext, - filePath: String, - split: String, - partition: Int): RDD[(Long, Long)] = { - sc.textFile(filePath, partition) - .flatMap(line => { - if (line.startsWith("#")) { - Iterator.empty - } else { - val x = line.split(split) - if (x.length < 2) { - Iterator.empty - } else { - val node1 = x(0).toLong - val node2 = x(1).toLong - Iterator.single((node1, node2)) - } - } - }) - } - - def readNode2VecModel(sc: SparkContext, input: String): RDD[(Long, Vector)] = { - val rdd: RDD[(Long, Vector)] = sc - .textFile(input) - .mapPartitions(it => { - val regexp = "([0-9]+) \\((.*)\\)".r - it.map { case regexp(u, emb) => (u.toLong, new DenseVector(emb.split(",") - .map(_.toDouble)): Vector) - } - }).cache() - rdd - } - - def get(modelRDD: RDD[(Long, Vector)]): Map[Long, Vector] = { - modelRDD.collectAsMap() - } - - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName) = (modelConfSplit(0), modelConfSplit(1)) - val graphPath = args(1) - val negEdgePath = args(2) - val embeddingPath = args(3) - val isRaw = args(4) - - val representer = new Representer - representer.addClassTag(classOf[DeepWalkParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader 
= Utils.getStream("conf/graph/deepwalk/deepwalk.yml") - val yaml = new Yaml(new Constructor(classOf[DeepWalkConfig]), representer, options) - val description = new TypeDescription(classOf[DeepWalkParams]) - yaml.addTypeDescription(description) - val config: DeepWalkConfig = yaml.load(stream).asInstanceOf[DeepWalkConfig] - val paramsMap = - config.deepwalk.get(datasetName).get(isRaw match { - case "no" => "opt" - case _ => "raw" - }) - - val params = new DeepWalkParams() - - params.setDatasetName(datasetName) - params.setPartitions(paramsMap.get("partitions").toString.toInt) - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setAlgorithmName("DeepWalk") - params.setTestcaseType(s"DeepWalk_${datasetName}") - - val sparkConf = new SparkConf().setAppName("DeepwalkVerify") - val sc = SparkContext.getOrCreate(sparkConf) - - val edgesRDD = readEdgeListFromHDFS(sc, graphPath, params.getSplitGraph, params.getPartitions) - val negativeEdgesRDD = readUndirectEdgeFromHDFS(sc, negEdgePath, ",", params.getPartitions) - - val nvModel: collection.Map[Long, Vector] = get(readNode2VecModel(sc, embeddingPath)) - - val nvModelBC = sc.broadcast(nvModel) - edgesRDD.foreachPartition(_ => nvModelBC.value) - - val positiveEdgesScores: Array[Double] = edgesRDD - .flatMap({ case (src, dst, weight) => - Try(Iterator.single(cos(nvModelBC.value(src).toArray, nvModelBC.value(dst).toArray))) - .getOrElse(Iterator.empty) - }) - .filter(score => !score.isInfinity && !score.isNaN) - .collect() - - val negativeEdgesScores: Array[Double] = negativeEdgesRDD - .flatMap({ case (src, dst) => - Try(Iterator.single(cos(nvModelBC.value(src).toArray, nvModelBC.value(dst).toArray))) - .getOrElse(Iterator.empty) - }) - .filter(score => !score.isInfinity && !score.isNaN) - .collect() - - val truths = Array.fill(positiveEdgesScores.length)(1) ++ Array.fill(negativeEdgesScores.length)(0) - val auc = AUC.of(truths, (positiveEdgesScores ++ negativeEdgesScores)) - println(s"Link Prediction AUC Score = $auc") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} \ No newline at end of file diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala deleted file mode 100644 index ad82c115b3d0e24f2ef9dfbbb963a6934c8bc575..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/DeepWalkRunner.scala +++ /dev/null @@ -1,111 +0,0 @@ -// scalastyle:off - -package com.bigdata.graph - -import com.bigdata.utils.Utils -import org.apache.spark.graphx.lib.{DeepWalk, Parameters} -import org.apache.spark.{SparkConf, SparkContext} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import scala.beans.BeanProperty -import java.io.{File, FileWriter, InputStreamReader} -import java.util - -class DeepWalkConfig extends Serializable { - @BeanProperty var deepwalk: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class DeepWalkParams extends Serializable { - @BeanProperty var inputPath: String = _ - @BeanProperty var outputPath: String = _ - @BeanProperty var splitGraph: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var partitions: Int = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = 
_ - @BeanProperty var testcaseType: String = _ - @BeanProperty var walkLength: Int = _ - @BeanProperty var numWalks: Int = _ - @BeanProperty var iteration: Int = _ - @BeanProperty var dimension: Int = _ - @BeanProperty var windowSize: Int = _ - @BeanProperty var negativeSample: Int = _ -} - -object DeepWalkRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName) = (modelConfSplit(0), modelConfSplit(1)) - val inputPath = args(1) - val outputPath = args(2) - val isRaw = args(3) - - val representer = new Representer - representer.addClassTag(classOf[DeepWalkParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader = Utils.getStream("conf/graph/deepwalk/deepwalk.yml") - val yaml = new Yaml(new Constructor(classOf[DeepWalkConfig]), representer, options) - val description = new TypeDescription(classOf[DeepWalkParams]) - yaml.addTypeDescription(description) - val config: DeepWalkConfig = yaml.load(stream).asInstanceOf[DeepWalkConfig] - - val paramsMap = - config.deepwalk.get(datasetName).get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).asInstanceOf[util.HashMap[String, Object]] - - val params = new DeepWalkParams() - - params.setDatasetName(datasetName) - params.setInputPath(inputPath) - params.setOutputPath(outputPath) - params.setPartitions(paramsMap.get("partitions").toString.toInt) - params.setWalkLength(paramsMap.get("walkLength").toString.toInt) - params.setNumWalks(paramsMap.get("numWalks").toString.toInt) - params.setNegativeSample(paramsMap.get("negativeSample").toString.toInt) - params.setIteration(paramsMap.get("iteration").toString.toInt) - params.setDimension(paramsMap.get("dimension").toString.toInt) - params.setWindowSize(paramsMap.get("windowSize").toString.toInt) - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setAlgorithmName("DeepWalk") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val sc = new SparkContext(conf) - - val startTime = System.currentTimeMillis() - - val edgeRDD = Util.readCommFromHDFS(sc, inputPath, params.getSplitGraph, params.getPartitions) - - val deepwalkParams = Parameters(params.getWalkLength, params.getNumWalks, params.getIteration, params.getDimension, params.getWindowSize, params.getNegativeSample) - - val deepwalkModel = DeepWalk.run(edgeRDD,deepwalkParams) - Util.saveNode2VecModel(deepwalkModel, params.getOutputPath) - - val costTime = (System.currentTimeMillis() - startTime) / 1000.0 - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}") - - val folder = new File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/deepWalk_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala deleted file mode 100644 index d430981a3048a6f6924b10e4955205dcacae3435..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/FraudarRunner.scala +++ /dev/null @@ -1,326 +0,0 @@ -// scalastyle:off - -package 
com.bigdata.graph -import com.bigdata.utils.Utils - -import org.apache.spark.graphx.lib.{Fraudar, Parameters} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.storage.StorageLevel -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.io.{BufferedWriter, File, FileWriter, InputStreamReader} -import java.util -import scala.beans.BeanProperty -import scala.collection.mutable -import scala.collection.mutable.Map - -class FraudarConfig extends Serializable { - @BeanProperty var fraudar: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class FraudarParams extends Serializable { - @BeanProperty var splitGraph: String = _ - @BeanProperty var partitions: Int = _ - - @BeanProperty var iSetOutPath: String = _ - @BeanProperty var jSetOutPath: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ -} - - -object FraudarRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val iSetOutPath = args(2) - val jSetOutPath = args(3) - val representer = new Representer - representer.addClassTag(classOf[FraudarParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader = Utils.getStream("conf/graph/fraudar/fraudar.yml") - val yaml = new Yaml(new Constructor(classOf[FraudarConfig]), representer, options) - val description = new TypeDescription(classOf[FraudarParams]) - yaml.addTypeDescription(description) - val config: FraudarConfig = yaml.load(stream).asInstanceOf[FraudarConfig] - - val params = new FraudarParams() - val paramsMap = - config.fraudar.get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).get(datasetName).asInstanceOf[util.HashMap[String, Object]] - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setPartitions(paramsMap.get("partitions").toString.toInt) - params.setDatasetName(datasetName) - params.setDataPath(dataPath) - params.setJSetOutPath(jSetOutPath) - params.setISetOutPath(iSetOutPath) - params.setIsRaw(isRaw) - params.setAlgorithmName("Fraudar") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val spark = SparkSession.builder.config(conf).getOrCreate() - val costTime = isRaw match { - case "no" => new FraudarKernel().runOptJob(spark, params) - case "yes" => new FraudarKernel().runRawJob(spark, params) - } - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}s") - - val folder = new File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - -class 
FraudarKernel { - def runOptJob(spark: SparkSession, params: FraudarParams): Double = { - val sc = spark.sparkContext - sc.setLogLevel("WARN") - val startTime = System.currentTimeMillis() - val bipartGraph = Util.readUndirectDataFromHDFS(sc, params.dataPath, params.splitGraph, params.partitions) - .map(f => (f._1.toLong, f._2.toLong)) - .persist(StorageLevel.MEMORY_ONLY_SER) - bipartGraph.foreachPartition(f => {}) - - val res = Fraudar.runFraudar(bipartGraph) - res.map(f => f._1).distinct().saveAsTextFile(params.iSetOutPath) - res.map(f => f._2).distinct().saveAsTextFile(params.jSetOutPath) - - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - costTime - } - - def runRawJob(spark: SparkSession, params: FraudarParams): Double = { - val sc = spark.sparkContext - sc.setLogLevel("WARN") - val startTime = System.currentTimeMillis() - val bipartGraph = Util.readUndirectDataFromHDFS(sc, params.dataPath, params.splitGraph, params.partitions) - .map(f => (f._1.toLong, f._2.toLong)) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - bipartGraph.foreachPartition(f => {}) - - val res = runFraudar(sc, bipartGraph) - outputResult(params.iSetOutPath, params.jSetOutPath, res.toSet) - - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - costTime - } - - /** - * 开源 - * https://github.com/XinyaZhao/Social-Network-Fraud-Detection/blob/master/Analytics%20Code/Code_to_Analyze/frauder_v4.scala - */ - /* get degree from a key-values pair */ - def getDegree(s: Tuple2[Long, Iterable[(Long, Long)]]): List[String] = { - var myList = List[String]() - for (e <- s._2) { - val crt = "i" + e._2 + " j" + s._1 + " " + s._2.size //i dst点 j src点 " " scr的出度大小 - myList = crt :: myList - } - myList.reverse - } - - /* cost function for column-weighting */ - def getCost(s: String): Double = { - val degree = s.toDouble - 1 / (math.log(degree + 5)) - } - - def flat_degree(s: String): List[(String, Double)] = { - var l = List[(String, Double)]() - val i = s.split(" ")(0) - val j = s.split(" ")(1) - val cij = s.split(" ")(2) - l = (i, getCost(cij)) :: l - l = (j, getCost(cij)) :: l - l.reverse - } - - /* get a scala mutable map from a RDD which stores the "i, j, degree of ij" */ - def getCostMap(rdd: RDD[String]): RDD[(String, Double)] = { - rdd.flatMap(flat_degree).reduceByKey((sum, n) => (sum + n)) - } - - /* Calcuate the f value of the whole set */ - def getSetValue(c: RDD[(String, Double)]): Double = { - if (c.count != 0) { - val v = c.reduce((a, b) => (" ", (a._2 + b._2)))._2 - v / 2 - } - else { - 0.00 - } - } - - /* get the vertex with minimum cost */ - def getDeleted(c: RDD[(String, Double)]): String = { - if (c.count != 0) { - val deleted = c.min()(new Ordering[Tuple2[String, Double]]() { - override def compare(x: (String, Double), y: (String, Double)): Int = - Ordering[Double].compare(x._2, y._2) - })._1 - //println("deleted:------------------------- " + deleted) - deleted - } - else { - " " - } - } - - /* update each line with a deleted vertex */ - def update(sc:SparkContext, degree: RDD[String], d: String): RDD[String] = { - var new_array = degree.collect; - var tmp = new_array.to[mutable.ArrayBuffer] - if (d.contains("j")) { - new_array = new_array.filterNot(s => s.split(" ")(1) == d) - tmp = new_array.to[mutable.ArrayBuffer] - } - if (d.contains("i")) { - var update_j = List[String]() - for (s <- new_array) { - if (s.split(" ")(0) == d) { - update_j = s.split(" ")(1) :: update_j - tmp -= s - } - } - - val tmp_buffer = tmp.toArray - // 
need a tmp buffert to deletee tmp - for (j <- update_j) { - for (s <- tmp_buffer) { - if (s.split(" ")(1) == j) { - tmp -= s - val iszero = s.split(" ")(2).toInt - 1 - if (iszero != 0) { - val new_line = s.split(" ")(0) + " " + s.split(" ")(1) + " " + (s.split(" ")(2).toInt - 1).toString - tmp -= s - tmp += new_line - } - } - } - } - } - sc.parallelize(tmp) - } - - /* get graph from cost array*/ - def getGraph(degree: RDD[String]): List[String] = { - var g = List[String]() - for (c <- degree.collect) { - if (!g.contains(c.split(" ")(0))) { - g = c.split(" ")(0) :: g - } - if (!g.contains(c.split(" ")(1))) { - g = c.split(" ")(1) :: g - } - } - g - } - - /* iterative delete a vertex */ - def greedyDecreasing(sc:SparkContext, degree: RDD[String]): List[String] = { - val cost = getCostMap(degree) - val value = getSetValue(cost) / cost.count - var valueMap = Map[Int, Double]() - valueMap += (0 -> value) - var graph = List[List[String]]() - graph = getGraph(degree) :: graph - var new_degree = degree - var c = cost - var a = 0 - while (c.count != 0) { // not cost size, need to be the number of vertex - val iter1 = System.currentTimeMillis() - println("c.count : " + c.count) - a = a + 1 - val d = getDeleted(c) - new_degree = update(sc, new_degree, d) - //newDegree = update(deleted) //update the degree of remaining i and j based on the deteled vertex - c = getCostMap(new_degree) - graph = getGraph(new_degree) :: graph - val value = getSetValue(c) / c.count // the set vaule should be divided by the |C| - //println("value : " + value) - //new_degree.foreach(println) - //println(getGraph(c)) - valueMap += (a -> value) - - val iter2 = System.currentTimeMillis() - println(s"iterNum:${a}, updatetime: "+ ((iter2 - iter1) / 1000.0) + " sec") - } - var max_index = -1 - var max_Value = -1.000 - for (s <- valueMap) { - if (s._2 > max_Value) { - max_index = s._1 - max_Value = s._2 - } - } - //println("maxvalue" + " " + max_Value + " index:" + max_index) - //graph.reverse.foreach(println) - val objectGraph = graph.reverse(max_index) - - //objectGraph.foreach(f=>println(f)) - objectGraph - } - - /* get the most density graph*/ - def getFinalSet(cst: Map[String, Double], dgr: List[String]): Set[String] = { - var set = Set[String]() - for (e <- cst) { - set += (e._1) - } - set -- dgr.toSet - } - - def outputResult(iset_out:String, jset_out:String, set: Set[String]): Unit = { - val ibf = new BufferedWriter(new FileWriter(iset_out)); - val jbf = new BufferedWriter(new FileWriter(jset_out)); - val sorted_list = set.toList.sortWith(_.substring(1).toLong < _.substring(1).toLong) - for (s <- sorted_list) { - if (s.contains("i")) { - ibf.write(s + "\n"); - } - else { - jbf.write(s + "\n"); - } - } - ibf.flush() - jbf.flush() - ibf.close() - jbf.close() - } - - def runFraudar(sc:SparkContext, bipartGraph: RDD[(Long, Long)]):List[String] = { - val pairs = bipartGraph.map(x => (x._2, x._1)) - val group = pairs.groupBy(x => x._1) - val degree = group.flatMap(getDegree) - greedyDecreasing(sc, degree) - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala deleted file mode 100644 index b04798e961487a5794ae89e9aabb9cdda1c5fb08..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/IncConnectedComponentsRunner.scala +++ /dev/null @@ -1,108 +0,0 @@ -// scalastyle:off - -package com.bigdata.graph - -import java.io.{File, FileWriter, InputStreamReader} -import 
java.util -import com.bigdata.utils.Utils -import org.apache.spark.graphx.lib.{IncConnectedComponents, Parameters} -import org.apache.spark.sql.SparkSession -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.storage.StorageLevel -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import scala.beans.BeanProperty - -class IncCCConfig extends Serializable { - @BeanProperty var inccc: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class IncCCParams extends Serializable { - @BeanProperty var splitGraph: String = _ - @BeanProperty var partitions: Int = _ - - @BeanProperty var orgCCPath: String = _ - @BeanProperty var outputPath: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ -} - - -object IncConnectedComponentsRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val outputPath = args(2) - val orgCCPath = args(3) - val incGraphPath = args(4) - val representer = new Representer - representer.addClassTag(classOf[IncCCParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader = Utils.getStream("conf/graph/inccc/inccc.yml") - val yaml = new Yaml(new Constructor(classOf[IncCCConfig]), representer, options) - val description = new TypeDescription(classOf[IncCCParams]) - yaml.addTypeDescription(description) - val config: IncCCConfig = yaml.load(stream).asInstanceOf[IncCCConfig] - - val params = new IncCCParams() - val paramsMap = - config.inccc.get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).get(datasetName).asInstanceOf[util.HashMap[String, Object]] - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setPartitions(paramsMap.get("partitions").toString.toInt) - params.setOrgCCPath(orgCCPath) - params.setDatasetName(datasetName) - params.setDataPath(dataPath) - params.setOutputPath(outputPath) - params.setIsRaw(isRaw) - params.setAlgorithmName("IncCC") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val spark = SparkSession.builder.config(conf).getOrCreate() - val sc = spark.sparkContext - - val startTime = System.currentTimeMillis() - val historyCC = Util.readUndirectDataFromHDFS(sc, orgCCPath, params.splitGraph, params.partitions) - .map(f => (f._1.toLong, f._2.toLong)) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - historyCC.foreachPartition(f => {}) - val incGraph = Util.readUndirectDataFromHDFS(sc, incGraphPath, params.splitGraph, params.partitions) - .map(f => (f._1.toLong, f._2.toLong)) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - incGraph.foreachPartition(f => {}) - val res = IncConnectedComponents.run(incGraph, historyCC) - res.map(f => f._1 + "," + f._2).saveAsTextFile(outputPath) - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}") - - val folder = new 
File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala deleted file mode 100644 index f444cdd1f5a225cf46034d5590bb59bc86c0e7e6..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/KatzCentrality.scala +++ /dev/null @@ -1,107 +0,0 @@ -// scalastyle:off - -package com.bigdata.graph -import com.bigdata.utils.Utils - -import org.apache.spark.graphx.lib.{KatzCentrality, Parameters} -import org.apache.spark.sql.SparkSession -import org.apache.spark.{SparkConf, SparkContext} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.io.{File, FileWriter, InputStreamReader} -import java.util -import scala.beans.BeanProperty - -class KatzCentralityConfig extends Serializable { - @BeanProperty var katz: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class KatzCentralityParams extends Serializable { - @BeanProperty var splitGraph: String = _ - @BeanProperty var partitions: Int = _ - @BeanProperty var isWeight: Boolean = _ - @BeanProperty var tol: Double = _ - @BeanProperty var maxIter: Int = _ - @BeanProperty var normalized: Boolean = _ - - @BeanProperty var outputPath: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ -} - - -object KatzCentralityRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val outputPath = args(2) - val representer = new Representer - representer.addClassTag(classOf[KatzCentralityParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader = Utils.getStream("conf/graph/katz/katz.yml") - val yaml = new Yaml(new Constructor(classOf[KatzCentralityConfig]), representer, options) - val description = new TypeDescription(classOf[KatzCentralityParams]) - yaml.addTypeDescription(description) - val config: KatzCentralityConfig = yaml.load(stream).asInstanceOf[KatzCentralityConfig] - - val params = new KatzCentralityParams() - val paramsMap = - config.katz.get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).get(datasetName).asInstanceOf[util.HashMap[String, Object]] - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setPartitions(paramsMap.get("partitions").toString.toInt) - params.setIsWeight(paramsMap.get("isWeight").toString.toBoolean) - params.setTol(paramsMap.get("tol").toString.toDouble) - params.setMaxIter(paramsMap.get("maxIter").toString.toInt) - params.setNormalized(paramsMap.get("normalized").toString.toBoolean) - params.setDatasetName(datasetName) - 
params.setDataPath(dataPath) - params.setOutputPath(outputPath) - params.setIsRaw(isRaw) - params.setAlgorithmName("KatzCentrality") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val spark = SparkSession.builder.config(conf).getOrCreate() - val sc = spark.sparkContext - - val startTime = System.currentTimeMillis() - val edgeRDD = Util.readGraphFromHDFS(sc, params.dataPath, params.splitGraph, params.isWeight, params.partitions) - .map(x => (x._1, x._2, x._3)) - val tmp = edgeRDD.map(f => Edge(f._1, f._2, f._3)) - val g: Graph[Double, Double] = Graph.fromEdges(tmp, 1.0) - val result = KatzCentrality.run(g, params.maxIter, params.tol, params.normalized) - result.map(f => (f._1, f._2)).saveAsTextFile(params.outputPath) - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}") - - val folder = new File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala deleted file mode 100644 index 49f7a0e4db98fd42488f3913883e3aa09bdb4a89..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/SLPARunner.scala +++ /dev/null @@ -1,141 +0,0 @@ -// scalastyle:off - -package com.bigdata.graph -import com.bigdata.utils.Utils - -import org.apache.spark.graphx.lib.{SpearkListenerLabelPropagation, Parameters} -import org.apache.spark.sql.SparkSession -import org.apache.spark.{SparkConf, SparkContext} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.io.{File, FileWriter, InputStreamReader} -import java.util -import scala.beans.BeanProperty - -class SLPAConfig extends Serializable { - @BeanProperty var slpa: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class SLPAParams extends Serializable { - @BeanProperty var splitGraph: String = _ - @BeanProperty var partitions: Int = _ - @BeanProperty var isWeight: Boolean = _ - @BeanProperty var isDirected: Boolean = _ - @BeanProperty var iterNum: Int = _ - @BeanProperty var threshold: Double = _ - - @BeanProperty var outputPath: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ -} - - -object SLPARunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val outputPath = args(2) - val representer = new Representer - representer.addClassTag(classOf[SLPAParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val 
stream: InputStreamReader = Utils.getStream("conf/graph/slpa/slpa.yml") - val yaml = new Yaml(new Constructor(classOf[SLPAConfig]), representer, options) - val description = new TypeDescription(classOf[SLPAParams]) - yaml.addTypeDescription(description) - val config: SLPAConfig = yaml.load(stream).asInstanceOf[SLPAConfig] - - val params = new SLPAParams() - val paramsMap = - config.slpa.get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).get(datasetName).asInstanceOf[util.HashMap[String, Object]] - params.setSplitGraph(paramsMap.get("splitGraph").asInstanceOf[String]) - params.setPartitions(paramsMap.get("partitions").asInstanceOf[Int]) - params.setIsWeight(paramsMap.get("isWeight").asInstanceOf[Boolean]) - params.setIsDirected(paramsMap.get("isDirected").asInstanceOf[Boolean]) - params.setIterNum(paramsMap.get("iterNum").asInstanceOf[Int]) - params.setThreshold(paramsMap.get("threshold").asInstanceOf[Double]) - params.setDatasetName(datasetName) - params.setDataPath(dataPath) - params.setOutputPath(outputPath) - params.setIsRaw(isRaw) - params.setAlgorithmName("SLPA") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val spark = SparkSession.builder.config(conf).getOrCreate() - val sc = spark.sparkContext - - val startTime = System.currentTimeMillis() - val edges = Util.readGraphFromHDFS(sc, params.dataPath, params.splitGraph, params.isWeight, params.partitions) - edges.foreachPartition{f => {}} - - val slpaGraph = SpearkListenerLabelPropagation.buildGraph(edges, params.isDirected) - val slpaComm = SpearkListenerLabelPropagation.run(slpaGraph, params.iterNum, params.threshold) - slpaComm.edges.foreachPartition{f => {}} - - val outPutComm = s"${outputPath}/res" - val outPutComp = s"${outputPath}/resForComparsion" - - val vertex2Comm = slpaComm.vertices.map(x => (x._1,"[" + x._2.mkString(",") + "]")) - vertex2Comm.foreachPartition{f => {}} - vertex2Comm.saveAsTextFile(outPutComm) - - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - - obtainCommunities(sc, outPutComm, outPutComp) - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}s") - - val folder = new File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } - - def obtainCommunities(sc: SparkContext, commPath: String, outPath: String): Unit = { - val labelIds = sc.textFile(commPath) - .flatMap(line => { - val x = line.split(",") - var tmp = x(0).replace("(","") - val id = tmp.toLong - tmp = x(1).replace(")","") - tmp = tmp.replace("[","") - tmp = tmp.replace("]","") - val labels = tmp.split(",") - val attr = new Array[(Long, Long)](labels.length) - var i = 0 - while (i < labels.length) { - attr(i) = (labels(i).toLong, id) - i = i + 1 - } - attr.toIterator - }) - labelIds.groupByKey() - .map(x => x._2.toArray.mkString(" ")) - .repartition(1) - .saveAsTextFile(outPath) - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala deleted file mode 100644 index 
b3d58589d6a733f5a93d27b4f74d4165d091cc3d..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/SubgraphMatchingRunner.scala +++ /dev/null @@ -1,112 +0,0 @@ -package com.bigdata.graph - -import java.io.FileWriter -import java.util - -import scala.beans.BeanProperty - -import com.bigdata.utils.Utils -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} - -import org.apache.spark.graphx.lib.SubgraphMatching -import org.apache.spark.{SparkConf, SparkContext} - -class SubgraphMatchingParams extends Serializable{ - @BeanProperty var inputPath: String = _ - @BeanProperty var outputPath: String = _ - @BeanProperty var splitDataGraph: util.HashMap[String, String] = new util.HashMap[String, String]() - @BeanProperty var splitQueryGraph: String = _ - @BeanProperty var taskNum: Int = _ - @BeanProperty var resultNum: Int = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var numPartitions: Int = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var isIdentical: String = _ - @BeanProperty var matchResult: Long = _ -} - - -object SubgraphMatchingRunner { - - def main(args: Array[String]): Unit = { - - try { - val datasetName = args(0) - val queryGraphName = args(1) - val isRaw = args(2) - val isIdentical = args(3) - val outputPath = args(4) - val inputPath = args(5) - val partitionNum = args(6).toInt - val taskNum = args(7).toInt - val queryGraphPath = args(8) - val testcaseType = s"SGM_${datasetName}_${queryGraphName}_${isIdentical}" - - val representer = new Representer - representer.addClassTag(classOf[SubgraphMatchingParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[SubgraphMatchingParams]), representer, options) - val description = new TypeDescription(classOf[SubgraphMatchingParams]) - yaml.addTypeDescription(description) - val params = yaml.load(Utils.getStream("conf/graph/sgm/sgm.yml")).asInstanceOf[SubgraphMatchingParams] - - val splitDataGraph = params.getSplitDataGraph.get(datasetName) - val resultNum = params.getResultNum - val splitQueryGraph = params.getSplitQueryGraph - - params.setAlgorithmName("SGM") - params.setDatasetName(datasetName) - params.setInputPath(inputPath) - params.setOutputPath(outputPath) - params.setIsRaw(isRaw) - params.setNumPartitions(partitionNum) - params.setTestcaseType(testcaseType) - params.setIsIdentical(isIdentical) - params.setTaskNum(taskNum) - - val isIdenticalBool = isIdentical match { - case "Identical" => true - case "unIdentical" => false - } - - val conf = new SparkConf() - .setAppName(testcaseType) - val sc = new SparkContext(conf) - - val startTime = System.currentTimeMillis() - - val inputRDD = Util.readUndirectDataFromHDFS(sc, inputPath, splitDataGraph, partitionNum) - .map(f => (f._1.toLong, f._2.toLong)) - val queryGraphRDD = sc.textFile(queryGraphPath) - val edgelist: Array[(Long, Long)] = queryGraphRDD.map(line => { - val strings = line.split(splitQueryGraph) - (strings(0).toLong, strings(1).toLong) - }).collect() - - val (numSubgraphs, subgraphs) = - SubgraphMatching.run(inputRDD, edgelist, taskNum, resultNum, isIdenticalBool) - - params.setMatchResult(numSubgraphs) - println("total matched 
results:\t%d".format(numSubgraphs)) - subgraphs.map(x => x.mkString("\t")).saveAsTextFile(outputPath) - - val costTime = (System.currentTimeMillis() - startTime) / 1000.0 - params.setCostTime(costTime) - - Utils.checkDirs("report") - val writer = new FileWriter( - s"report/SGM_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - - println(s"Exec Successful: costTime: ${costTime}s") - - } - } -} \ No newline at end of file diff --git a/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala deleted file mode 100644 index ccae82c36379ff6f9447d0a42952e556b64588c5..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/graph/WeightedLablePropagationRunner.scala +++ /dev/null @@ -1,103 +0,0 @@ -// scalastyle:off - -package com.bigdata.graph -import com.bigdata.utils.Utils - -import org.apache.spark.graphx.lib._ -import org.apache.spark.graphx.{Edge, Graph, GraphLoader, PartitionStrategy} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.{SparkConf, SparkContext} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.io.{File, FileWriter, InputStreamReader} -import java.util -import scala.beans.BeanProperty - -class WLPAConfig extends Serializable { - @BeanProperty var wlpa: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class WLPAParams extends Serializable { - @BeanProperty var splitGraph: String = _ - @BeanProperty var commputePartition: Int = _ - @BeanProperty var maxIter: Int = _ - @BeanProperty var partitions: Int = _ - - @BeanProperty var outputPath: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ -} - - -object WeightedLablePropagationRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, platformName, isRaw) = (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val outputPath = args(2) - val representer = new Representer - representer.addClassTag(classOf[WLPAParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val stream: InputStreamReader = Utils.getStream("conf/graph/wlpa/wlpa.yml") - val yaml = new Yaml(new Constructor(classOf[WLPAConfig]), representer, options) - val description = new TypeDescription(classOf[WLPAParams]) - yaml.addTypeDescription(description) - val config: WLPAConfig = yaml.load(stream).asInstanceOf[WLPAConfig] - - val params = new WLPAParams() - val paramsMap = - config.wlpa.get(isRaw match { - case "no" => "opt" - case _ => "raw" - }).get(datasetName).asInstanceOf[util.HashMap[String, Object]] - params.setSplitGraph(paramsMap.get("splitGraph").toString) - params.setPartitions(paramsMap.get("commputePartition").toString.toInt) - params.setMaxIter(paramsMap.get("maxIter").toString.toInt) - params.setDatasetName(datasetName) - params.setDataPath(dataPath) - params.setOutputPath(outputPath) - params.setIsRaw(isRaw) - 
params.setAlgorithmName("WLPA") - params.setTestcaseType(s"${params.algorithmName}_${datasetName}_${isRaw}") - - val conf = new SparkConf().setAppName(params.testcaseType) - val spark = SparkSession.builder.config(conf).getOrCreate() - val sc = spark.sparkContext - - val startTime = System.currentTimeMillis() - val inputRdd = Util.readDirectWeightDataFromHDFS(sc, params.dataPath, params.splitGraph, params.commputePartition) - .map(f => Edge(f._1.toLong, f._2.toLong, f._3.toDouble)) - val graph = Graph.fromEdges(inputRdd, 1.0) - val result = WLabelPropagation.run(graph, params.maxIter).vertices - Util.saveDataToHDFS(result, ",", outputPath) - val finishTime = System.currentTimeMillis() - val costTime = (finishTime - startTime) / 1000 - - params.setCostTime(costTime) - println(s"Exec Successful: costTime: ${costTime}s") - - val folder = new File("report") - if (!folder.exists()) { - val mkdir = folder.mkdirs() - println(s"Create dir report ${mkdir}") - } - val writer = new FileWriter( - s"report/${params.testcaseType}_${Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", System.currentTimeMillis())}.yml") - yaml.dump(params, writer) - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala deleted file mode 100644 index dadd8f9dc3317b6635930d69b82f87fdd9a1bce9..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/HDBRunner.scala +++ /dev/null @@ -1,170 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.UpEvaluationVerify - -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.ml.clustering.Hdbscan -import org.apache.spark.ml.evaluation.ClusteringEvaluator -import org.apache.spark.ml.linalg.{Vector, Vectors} - -import java.io.{File, FileWriter, PrintWriter} -import java.util -import java.util.Date -import scala.beans.BeanProperty -import scala.collection.mutable.ArrayBuffer - -class HDBConfig extends Serializable { - @BeanProperty var hdb: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class HDBParams extends Serializable { - @BeanProperty var pt: Int = _ - @BeanProperty var mstPartitionNum: Int = _ - @BeanProperty var seed: Int = _ - @BeanProperty var saurfangThreshold: Double = _ - - @BeanProperty var dataPath: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var standSilhouette: Double = _ - @BeanProperty var cpuName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var startTime: Long = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - @BeanProperty var isCorrect: String = _ -} - -object HDBRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (datasetName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val cpuName = args(2) - val saveResultPath = args(3) - - val stream = 
Utils.getStream("conf/ml/hdb/hdb.yml") - val representer = new Representer - representer.addClassTag(classOf[HDBParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[HDBConfig]), representer, options) - val description = new TypeDescription(classOf[HDBParams]) - yaml.addTypeDescription(description) - val configs: HDBConfig = yaml.load(stream).asInstanceOf[HDBConfig] - val paramsMap: util.HashMap[String, Object] = configs.hdb.get(isRaw match { - case "no" => "opt" - case "yes" => "raw" - }).get(datasetName) - val params = new HDBParams() - params.setPt(paramsMap.get("pt").asInstanceOf[Int]) - params.setMstPartitionNum(paramsMap.get("mstPartitionNum").asInstanceOf[Int]) - params.setSeed(paramsMap.get("seed").asInstanceOf[Int]) - params.setSaurfangThreshold(paramsMap.get("saurfangThreshold").asInstanceOf[Double]) - params.setDataPath(dataPath) - params.setDatasetName(datasetName) - params.setCpuName(cpuName) - params.setIsRaw(isRaw) - params.setIfCheck(ifCheck) - params.setAlgorithmName("HDB") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${datasetName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${datasetName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - - val conf = new SparkConf().setAppName(appName) - val spark = SparkSession.builder.config(conf).getOrCreate() - spark.sparkContext.setLogLevel("ERROR") - val (res, costTime) = new HDBKernel().runJob(spark, params) - params.setStandSilhouette(res) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - params.setIsCorrect(UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${costTime}min; standSilhouette: ${res};isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - -class HDBKernel { - def runJob(spark: SparkSession, params: HDBParams): (Double, Double) = { - val sc = spark.sparkContext - sc.setLogLevel("ERROR") - - val startTime = System.currentTimeMillis() - - println("\n--------start--------\n") - - // val dataRDD = sc.textFile(dataPath).map - // { - // t=>t.split(",").map{t=>t.toDouble} - // }.repartition(pt).persist() - - import scala.io.{BufferedSource, Source} - val data = new ArrayBuffer[Vector]() - val source2: BufferedSource = Source.fromFile(params.dataPath) - for (line <- source2.getLines()){//source.getLines()获取所有的行 - data.append(Vectors.dense(line.split(",").map{_.toDouble})) - } - - val d1 = data.toArray - val dataRDD = sc.parallelize(d1).repartition(params.pt).cache() - - println("count: "+ dataRDD.count()) - println("dim: " + dataRDD.first().size) - val t1 = System.currentTimeMillis() - println("map Cost[min]: " + (t1 - startTime).toDouble/60/1000) - - val hdb = new Hdbscan() - .setMstPartitionNum(params.mstPartitionNum) 
- .setSaurfangThreshold(params.saurfangThreshold) - .setRandomSeed(params.seed) - val labels = hdb.fit(dataRDD) - val t2 = System.currentTimeMillis() - println("train Cost[min]: " + (t2 - t1).toDouble/60/1000) - println("total Cost[min]: " + (t2 - startTime).toDouble/60/1000) - - import spark.implicits._ - val valid = labels.map{t => (t._2, t._3)}.toDF("features", "prediction") - val evaluator = new ClusteringEvaluator() - val silhouette = evaluator.evaluate(valid) - val standSilhouette = (silhouette + 1) / 2.0 - println(s"Silhouette with squared euclidean distance = $standSilhouette") - // labels.map{t=>(t._3,1)}.reduceByKey{(x,y)=>x+y}.collect().foreach(println) - println("\n--------success--------\n") - Utils.saveEvaluation(standSilhouette, params.saveDataPath, sc) - val costTime = (t2 - startTime).toDouble/60/1000 - (standSilhouette, costTime) - } -} \ No newline at end of file diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala deleted file mode 100644 index 85aa91a4d7e285872d831c6278e97260a644d1e4..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/KMeansRunner.scala +++ /dev/null @@ -1,200 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.DownEvaluationVerify - -import org.apache.hadoop.io.LongWritable -import org.apache.mahout.math.VectorWritable -import org.apache.spark.ml.clustering.{KMeans => MlKMeans} -import org.apache.spark.ml.linalg.{Vectors => MlVectors} -import org.apache.spark.ml.param.{ParamMap, ParamPair} -import org.apache.spark.mllib.clustering.{KMeans => MlibKMeans} -import org.apache.spark.mllib.linalg.{Vectors => MlibVectors} -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.udf -import org.apache.hadoop.fs.{FileSystem, Path} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.util -import scala.beans.BeanProperty -import java.io.{File, FileWriter} -import java.util.HashMap - -class KMeansConfig extends Serializable { - - @BeanProperty var kmeans: util.HashMap[String, util.HashMap[String, Object]] = _ -} - -class KMeansParams extends Serializable { - - @BeanProperty var numPartitions: Int = _ - @BeanProperty var maxIterations: Int = _ - @BeanProperty var k: Int = _ - - @BeanProperty var dataPath: String = _ - @BeanProperty var apiName: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var datasetCpuName: String = _ - @BeanProperty var isRaw: String = "no" - @BeanProperty var evaluation: Double = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var loadDataTime: Double = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - @BeanProperty var isCorrect: String = _ -} - -object KMeansRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (dataStructure, datasetName, apiName, cpuName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3), modelConfSplit(4), modelConfSplit(5)) - val dataPath = args(1) - val datasetCpuName = s"${datasetName}_${cpuName}" - val 
saveResultPath = args(2) - - val stream = Utils.getStream("conf/ml/kmeans/kmeans.yml") - val representer = new Representer - representer.addClassTag(classOf[KMeansParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[KMeansConfig]), representer, options) - val description = new TypeDescription(classOf[KMeansParams]) - yaml.addTypeDescription(description) - val config: KMeansConfig = yaml.load(stream).asInstanceOf[KMeansConfig] - val paramsMap: util.HashMap[String, Object] = config.kmeans.get(datasetCpuName) - val params = new KMeansParams() - params.setNumPartitions(paramsMap.get("numPartitions").asInstanceOf[Int]) - params.setMaxIterations(paramsMap.get("maxIterations").asInstanceOf[Int]) - params.setK(paramsMap.get("k").asInstanceOf[Int]) - params.setApiName(apiName) - params.setDataPath(dataPath) - params.setDatasetName(datasetName) - params.setDatasetCpuName(datasetCpuName) - params.setIsRaw(isRaw) - params.setIfCheck(ifCheck) - params.setAlgorithmName("KMeans") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}_${dataStructure}_${apiName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${datasetName}_${dataStructure}_${apiName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${datasetName}_${dataStructure}_${apiName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - - val conf = new SparkConf().setAppName(appName) - val spark = SparkSession.builder.config(conf).getOrCreate() - - val (res, costTime) = dataStructure match { - case "dataframe" => new KMeansKernel().runDataFrameJob(spark, params) - case "rdd" => new KMeansKernel().runRDDJob(spark, params) - } - params.setEvaluation(res) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - params.setIsCorrect(DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - -class KMeansKernel { - - def runDataFrameJob(spark: SparkSession, params: KMeansParams): (Double, Double) = { - - val sc = spark.sparkContext - val startTime = System.currentTimeMillis() - val data = sc.sequenceFile[LongWritable, VectorWritable](params.dataPath) - val dataRDD = data.map{ case (k, v) => - var vector: Array[Double] = new Array[Double](v.get().size) - for (i <- 0 until v.get().size) vector(i) = v.get().get(i) - vector - }.repartition(params.numPartitions).persist() - - import spark.implicits._ - val dataDF = dataRDD.toDF("features") - val convertToVector = udf((array: Seq[Double]) => { - MlVectors.dense(array.toArray) - }) - val trainingData = dataDF.withColumn("features", convertToVector($"features")) - println("count: " + trainingData.count()) - val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0 - val kmeans = new 
MlKMeans().setK(params.k).setMaxIter(params.maxIterations) - - val paramMap = ParamMap(kmeans.k -> params.k) - .put(kmeans.maxIter, params.maxIterations) - val paramMaps: Array[ParamMap] = new Array[ParamMap](2) - for (i <- 0 to paramMaps.size -1) { - paramMaps(i) = ParamMap(kmeans.k -> params.k) - .put(kmeans.maxIter, params.maxIterations) - } - - - val maxIterParamPair = ParamPair(kmeans.maxIter, params.maxIterations) - val kPair = ParamPair(kmeans.k, params.k) - val model = params.apiName match { - case "fit" => kmeans.fit(trainingData) - case "fit1" => kmeans.fit(trainingData, paramMap) - case "fit2" => - val models = kmeans.fit(trainingData, paramMaps) - models(0) - case "fit3" => kmeans.fit(trainingData, kPair, maxIterParamPair) - } - val costTime = (System.currentTimeMillis() - startTime) / 1000.0 - params.setLoadDataTime(loadDataTime) - val res = model.computeCost(trainingData) - Utils.saveEvaluation(res, params.saveDataPath, sc) - (res, costTime) - } - - def runRDDJob(spark: SparkSession, params: KMeansParams): (Double, Double) = { - - val sc = spark.sparkContext - val startTime = System.currentTimeMillis() - val data = sc.sequenceFile[LongWritable, VectorWritable](params.dataPath) - val dataRDD = data.map{ case (k, v) => - var vector: Array[Double] = new Array[Double](v.get().size) - for (i <- 0 until v.get().size) vector(i) = v.get().get(i) - MlibVectors.dense(vector) - }.repartition(params.numPartitions).cache() - println("count: " + dataRDD.count()) - val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0 - - val model = new MlibKMeans() - .setK(params.k) - .setMaxIterations(params.maxIterations) - .run(dataRDD) - val costTime = (System.currentTimeMillis() - startTime) / 1000.0 - - params.setLoadDataTime(loadDataTime) - val res = model.computeCost(dataRDD) - Utils.saveEvaluation(res, params.saveDataPath, sc) - (res, costTime) - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala deleted file mode 100644 index 96ca6b15e90bea23226728b62be9e9d52ace2a29..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRawRunner.scala +++ /dev/null @@ -1,276 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.UpEvaluationVerify -import com.bigdata.compare.ml.DownEvaluationVerify - -import com.microsoft.ml.spark.core.metrics.MetricConstants -import com.microsoft.ml.spark.train.ComputeModelStatistics -import com.microsoft.ml.spark.lightgbm.{LightGBMClassifier, LightGBMRegressor} -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{Dataset, Row, SparkSession} -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.lang.System.nanoTime -import java.io.{File, FileWriter, PrintWriter} -import java.nio.file.{Paths, Files} -import java.util -import scala.beans.BeanProperty -import scala.util.Random - -class LightGBMRawConfig extends Serializable { - @BeanProperty var lgbm: util.HashMap[String, util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]]] = _ -} - -class LightGBMRawParams 
extends Serializable { - @BeanProperty var objective: String = _ - @BeanProperty var labelCol: String = _ - @BeanProperty var featuresCol: String = _ - @BeanProperty var verbosity: Int = _ - @BeanProperty var learningRate: Double = _ - @BeanProperty var maxDepth: Int = _ - @BeanProperty var maxBin: Int = _ - @BeanProperty var numIterations: Int = _ - @BeanProperty var numTasks: Int = _ - @BeanProperty var minGainToSplit: Double = _ - @BeanProperty var lambdaL2: Double = _ - @BeanProperty var numLeaves: Int = _ - @BeanProperty var minSumHessianInLeaf: Double = _ - @BeanProperty var minDataInLeaf: Int = _ - @BeanProperty var baggingFraction: Double = _ - @BeanProperty var baggingFreq: Int = _ - @BeanProperty var numThreads: Int = _ - @BeanProperty var networkCompression: Int = _ - @BeanProperty var histSynchAlgo: Int = _ - @BeanProperty var loglossApx: Int = _ - @BeanProperty var loglossApxEps: Double = _ - @BeanProperty var loadingBalance: String = _ - - @BeanProperty var trainingDataPath: String = _ - @BeanProperty var testDataPath: String = _ - @BeanProperty var algorithmType: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var evaluation: Double = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var cpuName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - @BeanProperty var isCorrect: String = _ -} - -object LightGBMRawRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (algorithmType, datasetName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3)) - val dataPath = args(1) - val dataPathSplit = dataPath.split(",") - val (trainingDataPath, testDataPath) = (dataPathSplit(0), dataPathSplit(1)) - val cpuName = args(2) - val saveResultPath = args(3) - - val stream = Utils.getStream("conf/ml/lgbm/lgbm.yml") - val representer = new Representer - representer.addClassTag(classOf[LightGBMRawParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[LightGBMRawConfig]), representer, options) - val description = new TypeDescription(classOf[LightGBMRawParams]) - yaml.addTypeDescription(description) - val configs: LightGBMRawConfig = yaml.load(stream).asInstanceOf[LightGBMRawConfig] - val params = new LightGBMRawParams() - val paramsMap: util.HashMap[String, Object] = configs.lgbm.get(isRaw match { - case "no" => "opt" - case "yes" => "raw" - }).get(algorithmType).get(datasetName) - params.setObjective(paramsMap.get("objective").asInstanceOf[String]) - params.setLabelCol(paramsMap.get("labelCol").asInstanceOf[String]) - params.setFeaturesCol(paramsMap.get("featuresCol").asInstanceOf[String]) - params.setVerbosity(paramsMap.get("verbosity").asInstanceOf[Int]) - params.setLearningRate(paramsMap.get("eta").asInstanceOf[Double]) - params.setMaxDepth(paramsMap.get("max_depth").asInstanceOf[Int]) - params.setMaxBin(paramsMap.get("max_bin").asInstanceOf[Int]) - params.setNumIterations(paramsMap.get("num_round").asInstanceOf[Int]) - params.setNumTasks(paramsMap.get("num_tasks").asInstanceOf[Int]) - params.setMinGainToSplit(paramsMap.get("min_gain_to_split").asInstanceOf[Double]) - params.setLambdaL2(paramsMap.get("lambda_l2").asInstanceOf[Double]) - 
params.setNumLeaves(paramsMap.get("num_leaves").asInstanceOf[Int]) - params.setMinSumHessianInLeaf(paramsMap.get("min_child_weight").asInstanceOf[Double]) - params.setMinDataInLeaf(paramsMap.get("min_data_in_leaf").asInstanceOf[Int]) - params.setBaggingFraction(paramsMap.get("bagging").asInstanceOf[Double]) - params.setBaggingFreq(paramsMap.get("bagging_freq").asInstanceOf[Int]) - params.setNumThreads(paramsMap.get("num_threads").asInstanceOf[Int]) - params.setNetworkCompression(paramsMap.get("network_compression").asInstanceOf[Int]) - params.setHistSynchAlgo(paramsMap.get("hist_synch_algo").asInstanceOf[Int]) - params.setLoglossApx(paramsMap.get("logloss_apx").asInstanceOf[Int]) - params.setLoglossApxEps(paramsMap.get("logloss_apx_eps").asInstanceOf[Double]) - params.setLoadingBalance(paramsMap.get("loading_balance").asInstanceOf[String]) - params.setTrainingDataPath(trainingDataPath) - params.setTestDataPath(testDataPath) - params.setAlgorithmType(algorithmType) - params.setDatasetName(datasetName) - params.setCpuName(cpuName) - params.setIsRaw(isRaw) - params.setIfCheck(ifCheck) - params.setAlgorithmName("LightGBM") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${algorithmType}_${datasetName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${algorithmType}_${datasetName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${algorithmType}_${datasetName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - - val conf = new SparkConf().setAppName(appName) - val spark = SparkSession.builder.config(conf).getOrCreate() - - val (res, costTime) = new LightGBMRawKernel().runJob(spark, params) - params.setEvaluation(res) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - val isCorrect = params.algorithmType match { - case "classification" => UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark) - case "regression" => DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark) - } - params.setIsCorrect(isCorrect) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - -class LightGBMRawKernel{ - def runJob(spark: SparkSession, params: LightGBMRawParams): (Double, Double) = { - val sc = spark.sparkContext - sc.setLogLevel("INFO") - println(s"Initialized spark session.") - val t1 = System.currentTimeMillis() - - import spark.implicits._ - val trainData = spark.read.format("libsvm").option("vectorType", "sparse") - .load(params.trainingDataPath) - .repartition(params.numTasks) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - val t2 = System.currentTimeMillis() - println("* after preprocess: " + t2) - - val lgbm = params.algorithmType match { - case "classification" =>{ - val classifier = new LightGBMClassifier() - .setObjective(params.objective) - .setLabelCol(params.labelCol) - .setFeaturesCol(params.featuresCol) - 
.setVerbosity(params.verbosity) - .setNumIterations(params.numIterations) - .setMaxDepth(params.maxDepth) - .setLearningRate(params.learningRate) - .setNumTasks(params.numTasks) - .setMaxBin(params.maxBin) - .setMinGainToSplit(params.minGainToSplit) - .setLambdaL2(params.lambdaL2) - .setNumLeaves(params.numLeaves) - .setMinDataInLeaf(params.minDataInLeaf) - .setMinSumHessianInLeaf(params.minSumHessianInLeaf) - .setBaggingFraction(params.baggingFraction) - .setBaggingFreq(params.baggingFreq) - classifier - } - case "regression" =>{ - val regressor = new LightGBMRegressor() - .setObjective(params.objective) - .setLabelCol(params.labelCol) - .setFeaturesCol(params.featuresCol) - .setVerbosity(params.verbosity) - .setNumIterations(params.numIterations) - .setMaxDepth(params.maxDepth) - .setLearningRate(params.learningRate) - .setNumTasks(params.numTasks) - .setMaxBin(params.maxBin) - .setMinGainToSplit(params.minGainToSplit) - .setLambdaL2(params.lambdaL2) - .setNumLeaves(params.numLeaves) - .setMinDataInLeaf(params.minDataInLeaf) - .setMinSumHessianInLeaf(params.minSumHessianInLeaf) - .setBaggingFraction(params.baggingFraction) - .setBaggingFreq(params.baggingFreq) - regressor - } - } - val model = lgbm.fit(trainData) - val t3 = System.currentTimeMillis() - println("* after train: " + t3) - - val testData = spark.read.format("libsvm").option("vectorType", "sparse") - .load(params.testDataPath) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - println(s"Test data read successful. Number of partitions - ${testData.rdd.getNumPartitions}") - val predictions = model.transform(testData) - val (res, t4) = params.algorithmType match { - case "classification" =>{ - val metrics = new ComputeModelStatistics() - .setLabelCol("label") - .setScoresCol("probability") - .setScoredLabelsCol("prediction") - .setEvaluationMetric(MetricConstants.AccuracySparkMetric) - .transform(predictions) - val ecc = metrics.collect().apply(0).apply(1).asInstanceOf[Double] - val t4 = System.currentTimeMillis() - (ecc, t4) - } - case "regression" =>{ - // compute model metrics - val metrics = new ComputeModelStatistics() - .setEvaluationMetric("regression") - .setLabelCol("label") - .setScoresCol("prediction") - .transform(predictions) - // print metrics - val mse = metrics.collect().apply(0).apply(0).asInstanceOf[Double] - val t4 = System.currentTimeMillis() - (mse, t4) - } - } - println("Model predictions:") - predictions.select("prediction", "label", "features").show(5) - val trainingProcess = (t3 - t1).toDouble / 1000 - val trainingStep = (t3 - t2).toDouble / 1000 - val dataProcess = (t2 - t1).toDouble / 1000 - val predict = (t4 - t3).toDouble / 1000 - println("[s]train total: " + trainingProcess) - println("[s]data preprocess: " + dataProcess) - println("[s]train: " + trainingStep) - println("[s]predict: " + predict) - - Utils.saveEvaluation(res, params.saveDataPath, sc) - (res, trainingProcess) - } -} \ No newline at end of file diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala deleted file mode 100644 index 34a0368c907eb3d1efc68fd81116d706471d1b89..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/LightGBMRunner.scala +++ /dev/null @@ -1,283 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.UpEvaluationVerify -import com.bigdata.compare.ml.DownEvaluationVerify - -import com.microsoft.ml.spark.core.metrics.MetricConstants -import 
com.microsoft.ml.spark.train.ComputeModelStatistics -import com.microsoft.ml.spark.lightgbm.{LightGBMClassifier, LightGBMRegressor} -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{Dataset, Row, SparkSession} -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.lang.System.nanoTime -import java.io.{File, FileWriter, PrintWriter} -import java.nio.file.{Paths, Files} -import java.util -import scala.beans.BeanProperty -import scala.util.Random - -class LightGBMConfig extends Serializable { - @BeanProperty var lgbm: util.HashMap[String, util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]]] = _ -} - -class LightGBMParams extends Serializable { - @BeanProperty var objective: String = _ - @BeanProperty var labelCol: String = _ - @BeanProperty var featuresCol: String = _ - @BeanProperty var verbosity: Int = _ - @BeanProperty var learningRate: Double = _ - @BeanProperty var maxDepth: Int = _ - @BeanProperty var maxBin: Int = _ - @BeanProperty var numIterations: Int = _ - @BeanProperty var numTasks: Int = _ - @BeanProperty var minGainToSplit: Double = _ - @BeanProperty var lambdaL2: Double = _ - @BeanProperty var numLeaves: Int = _ - @BeanProperty var minSumHessianInLeaf: Double = _ - @BeanProperty var minDataInLeaf: Int = _ - @BeanProperty var baggingFraction: Double = _ - @BeanProperty var baggingFreq: Int = _ - @BeanProperty var numThreads: Int = _ - @BeanProperty var networkCompression: Int = _ - @BeanProperty var histSynchAlgo: Int = _ - @BeanProperty var loglossApx: Int = _ - @BeanProperty var loglossApxEps: Double = _ - @BeanProperty var loadingBalance: String = _ - - @BeanProperty var trainingDataPath: String = _ - @BeanProperty var testDataPath: String = _ - @BeanProperty var algorithmType: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var evaluation: Double = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var cpuName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - @BeanProperty var isCorrect: String = _ -} - -object LightGBMRunner { - def main(args: Array[String]): Unit = { - try { - val modelConfSplit = args(0).split("-") - val (algorithmType, datasetName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3)) - val dataPath = args(1) - val dataPathSplit = dataPath.split(",") - val (trainingDataPath, testDataPath) = (dataPathSplit(0), dataPathSplit(1)) - val cpuName = args(2) - val saveResultPath = args(3) - - val stream = Utils.getStream("conf/ml/lgbm/lgbm.yml") - val representer = new Representer - representer.addClassTag(classOf[LightGBMParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[LightGBMConfig]), representer, options) - val description = new TypeDescription(classOf[LightGBMParams]) - yaml.addTypeDescription(description) - val configs: LightGBMConfig = 
yaml.load(stream).asInstanceOf[LightGBMConfig] - val params = new LightGBMParams() - val paramsMap: util.HashMap[String, Object] = configs.lgbm.get(isRaw match { - case "no" => "opt" - case "yes" => "raw" - }).get(algorithmType).get(datasetName) - params.setObjective(paramsMap.get("objective").asInstanceOf[String]) - params.setLabelCol(paramsMap.get("labelCol").asInstanceOf[String]) - params.setFeaturesCol(paramsMap.get("featuresCol").asInstanceOf[String]) - params.setVerbosity(paramsMap.get("verbosity").asInstanceOf[Int]) - params.setLearningRate(paramsMap.get("eta").asInstanceOf[Double]) - params.setMaxDepth(paramsMap.get("max_depth").asInstanceOf[Int]) - params.setMaxBin(paramsMap.get("max_bin").asInstanceOf[Int]) - params.setNumIterations(paramsMap.get("num_round").asInstanceOf[Int]) - params.setNumTasks(paramsMap.get("num_tasks").asInstanceOf[Int]) - params.setMinGainToSplit(paramsMap.get("min_gain_to_split").asInstanceOf[Double]) - params.setLambdaL2(paramsMap.get("lambda_l2").asInstanceOf[Double]) - params.setNumLeaves(paramsMap.get("num_leaves").asInstanceOf[Int]) - params.setMinSumHessianInLeaf(paramsMap.get("min_child_weight").asInstanceOf[Double]) - params.setMinDataInLeaf(paramsMap.get("min_data_in_leaf").asInstanceOf[Int]) - params.setBaggingFraction(paramsMap.get("bagging").asInstanceOf[Double]) - params.setBaggingFreq(paramsMap.get("bagging_freq").asInstanceOf[Int]) - params.setNumThreads(paramsMap.get("num_threads").asInstanceOf[Int]) - params.setNetworkCompression(paramsMap.get("network_compression").asInstanceOf[Int]) - params.setHistSynchAlgo(paramsMap.get("hist_synch_algo").asInstanceOf[Int]) - params.setLoglossApx(paramsMap.get("logloss_apx").asInstanceOf[Int]) - params.setLoglossApxEps(paramsMap.get("logloss_apx_eps").asInstanceOf[Double]) - params.setLoadingBalance(paramsMap.get("loading_balance").asInstanceOf[String]) - params.setTrainingDataPath(trainingDataPath) - params.setTestDataPath(testDataPath) - params.setAlgorithmType(algorithmType) - params.setDatasetName(datasetName) - params.setCpuName(cpuName) - params.setIsRaw(isRaw) - params.setIfCheck(ifCheck) - params.setAlgorithmName("LightGBM") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${algorithmType}_${datasetName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${algorithmType}_${datasetName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${algorithmType}_${datasetName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - - val conf = new SparkConf().setAppName(appName) - val spark = SparkSession.builder.config(conf).getOrCreate() - - val (res, costTime) = new LightGBMKernel().runJob(spark, params) - params.setEvaluation(res) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - val isCorrect = params.algorithmType match { - case "classification" => UpEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark) - case "regression" => DownEvaluationVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark) - } - params.setIsCorrect(isCorrect) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - 
System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${res};isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - -class LightGBMKernel{ - def runJob(spark: SparkSession, params: LightGBMParams): (Double, Double) = { - val sc = spark.sparkContext - sc.setLogLevel("INFO") - println(s"Initialized spark session.") - val t1 = System.currentTimeMillis() - - import spark.implicits._ - val trainData = spark.read.format("libsvm").option("vectorType", "sparse") - .load(params.trainingDataPath) - .repartition(params.numTasks) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - val t2 = System.currentTimeMillis() - println("* after preprocess: " + t2) - - val lgbm = params.algorithmType match { - case "classification" =>{ - val classifier = new LightGBMClassifier() - .setObjective(params.objective) - .setLabelCol(params.labelCol) - .setFeaturesCol(params.featuresCol) - .setVerbosity(params.verbosity) - .setNumIterations(params.numIterations) - .setMaxDepth(params.maxDepth) - .setLearningRate(params.learningRate) - .setNumTasks(params.numTasks) - .setMaxBin(params.maxBin) - .setMinGainToSplit(params.minGainToSplit) - .setLambdaL2(params.lambdaL2) - .setNumLeaves(params.numLeaves) - .setMinDataInLeaf(params.minDataInLeaf) - .setMinSumHessianInLeaf(params.minSumHessianInLeaf) - .setBaggingFraction(params.baggingFraction) - .setBaggingFreq(params.baggingFreq) - classifier - } - case "regression" =>{ - val regressor = new LightGBMRegressor() - .setObjective(params.objective) - .setLabelCol(params.labelCol) - .setFeaturesCol(params.featuresCol) - .setVerbosity(params.verbosity) - .setNumIterations(params.numIterations) - .setMaxDepth(params.maxDepth) - .setLearningRate(params.learningRate) - .setNumTasks(params.numTasks) - .setMaxBin(params.maxBin) - .setMinGainToSplit(params.minGainToSplit) - .setLambdaL2(params.lambdaL2) - .setNumLeaves(params.numLeaves) - .setMinDataInLeaf(params.minDataInLeaf) - .setMinSumHessianInLeaf(params.minSumHessianInLeaf) - .setBaggingFraction(params.baggingFraction) - .setBaggingFreq(params.baggingFreq) - regressor - } - } - if(params.isRaw.equals("no")) { - lgbm.setAuxParams("num_threads", params.numThreads.toString) - lgbm.setAuxParams("network_compression", params.networkCompression.toString) - lgbm.setAuxParams("logloss_apx", params.loglossApx.toString) - lgbm.setAuxParams("logloss_apx_eps", params.loglossApxEps.toString) - lgbm.setAuxParams("loading_balance", params.loadingBalance) - } - val model = lgbm.fit(trainData) - val t3 = System.currentTimeMillis() - println("* after train: " + t3) - - val testData = spark.read.format("libsvm").option("vectorType", "sparse") - .load(params.testDataPath) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - println(s"Test data read successful. 
Number of partitions - ${testData.rdd.getNumPartitions}") - val predictions = model.transform(testData) - val (res, t4) = params.algorithmType match { - case "classification" =>{ - val metrics = new ComputeModelStatistics() - .setLabelCol("label") - .setScoresCol("probability") - .setScoredLabelsCol("prediction") - .setEvaluationMetric(MetricConstants.AccuracySparkMetric) - .transform(predictions) - val ecc = metrics.collect().apply(0).apply(1).asInstanceOf[Double] - val t4 = System.currentTimeMillis() - (ecc, t4) - } - case "regression" =>{ - // compute model metrics - val metrics = new ComputeModelStatistics() - .setEvaluationMetric("regression") - .setLabelCol("label") - .setScoresCol("prediction") - .transform(predictions) - // print metrics - val mse = metrics.collect().apply(0).apply(0).asInstanceOf[Double] - val t4 = System.currentTimeMillis() - (mse, t4) - } - } - println("Model predictions:") - predictions.select("prediction", "label", "features").show(5) - val trainingProcess = (t3 - t1).toDouble / 1000 - val trainingStep = (t3 - t2).toDouble / 1000 - val dataProcess = (t2 - t1).toDouble / 1000 - val predict = (t4 - t3).toDouble / 1000 - println("[s]train total: " + trainingProcess) - println("[s]data preprocess: " + dataProcess) - println("[s]train: " + trainingStep) - println("[s]predict: " + predict) - - Utils.saveEvaluation(res, params.saveDataPath, sc) - (res, trainingProcess) - } -} \ No newline at end of file diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala deleted file mode 100644 index 7f0b66db97fe9cd3f6a78d4a53a471f942f24589..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/SPCARunner.scala +++ /dev/null @@ -1,205 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.MatrixVerify - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.ml.feature.SPCA -import org.apache.spark.ml.feature.PCA -import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector} -import org.apache.spark.storage.StorageLevel -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.ml.param.{ParamMap, ParamPair} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.io.{File, FileWriter} -import java.util -import scala.beans.BeanProperty - -class SPCAConfig extends Serializable { - - @BeanProperty var spca: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class SPCAParams extends Serializable { - - @BeanProperty var pt: Int = _ - @BeanProperty var k: Int = _ - @BeanProperty var sep: String = _ - @BeanProperty var numCols: Int = _ - @BeanProperty var pcPath: String = _ - @BeanProperty var sigmaPath: String = _ - - @BeanProperty var dataPath: String = _ - @BeanProperty var apiName: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var isRaw: String = _ - @BeanProperty var cpuName: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var loadDataTime: Double = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - 
@BeanProperty var isCorrect: String = _ -} - -object SPCARunner { - def main(args: Array[String]): Unit = { - - try { - val modelConfSplit = args(0).split("-") - val (datasetName, apiName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2), modelConfSplit(3)) - val dataPath = args(1) - val cpuName = args(2) - val sparkConfSplit = args(3).split("_") - val (master, deployMode, numExec, execCores, execMem) = - (sparkConfSplit(0), sparkConfSplit(1), sparkConfSplit(2), sparkConfSplit(3), sparkConfSplit(4)) - val saveResultPath = args(4) - - val stream = Utils.getStream("conf/ml/spca/spca.yml") - val representer = new Representer - representer.addClassTag(classOf[SPCAParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[SPCAConfig]), representer, options) - val description = new TypeDescription(classOf[SPCAParams]) - yaml.addTypeDescription(description) - val configs: SPCAConfig = yaml.load(stream).asInstanceOf[SPCAConfig] - val params = new SPCAParams() - val paramsMap: util.HashMap[String, Object] = configs.spca.get(isRaw match { - case "no" => "opt" - case "yes" => "raw" - }).get(datasetName) - params.setPt(paramsMap.getOrDefault("pt", "250").asInstanceOf[Int]) - params.setK(paramsMap.getOrDefault("k", "10").asInstanceOf[Int]) - params.setSep(paramsMap.getOrDefault("sep", " ").asInstanceOf[String]) - params.setNumCols(paramsMap.getOrDefault("numCols", "0").asInstanceOf[Int]) - params.setPcPath(paramsMap.getOrDefault("pcPath", null.asInstanceOf[String]).asInstanceOf[String]) - params.setSigmaPath(paramsMap.getOrDefault("sigmaPath", null.asInstanceOf[String]).asInstanceOf[String]) - params.setDataPath(dataPath) - params.setDatasetName(datasetName) - params.setApiName(apiName) - params.setCpuName(cpuName) - params.setIsRaw(isRaw) - params.setIfCheck(ifCheck) - params.setAlgorithmName("SPCA") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}_${apiName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${datasetName}_${apiName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${datasetName}_${apiName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - val conf = new SparkConf().setAppName(appName).setMaster(master) - val commonParas = Array ( - ("spark.submit.deployMode", deployMode), - ("spark.executor.instances", numExec), - ("spark.executor.cores", execCores), - ("spark.executor.memory", execMem) - ) - conf.setAll(commonParas) - val spark = SparkSession.builder.config(conf).getOrCreate() - - val costTime = new SPCAKernel().runJob(spark, params) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - params.setIsCorrect(MatrixVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${params.getCostTime}s;isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - 
} - } -} - -class SPCAKernel { - - def runJob(spark: SparkSession, params: SPCAParams): Double = { - - import spark.implicits._ - val sc = spark.sparkContext - val fs = FileSystem.get(sc.hadoopConfiguration) - val startTime = System.currentTimeMillis() - val trainingData = if (params.isRaw == "yes"){ - val numColsBC = sc.broadcast(params.numCols) - val sepBC = sc.broadcast(params.sep) - val data = spark.createDataFrame(sc.textFile(params.dataPath, params.pt) - .map(line => { - val entry = line.split(sepBC.value) - (entry(0).toInt, (entry(1).toInt, entry(2).toDouble)) - }).groupByKey() - .map { case (_, vectorEntries) => Vectors.sparse(numColsBC.value, vectorEntries.toSeq) } - .repartition(params.pt) - .map(Tuple1.apply)) - .toDF("matrix").persist(StorageLevel.MEMORY_ONLY) - data - } else { - val data = spark.createDataFrame(sc.textFile(params.dataPath, params.pt) - .map(line => { - val entry = line.split(params.sep) - (entry(0).toInt, (entry(1).toInt, entry(2).toDouble)) - }).groupByKey() - .map{case (_, vectorEntries) => Vectors.sparse(params.numCols, vectorEntries.toSeq)} - .repartition(params.pt) - .map(Tuple1.apply)) - .toDF("matrix") - .persist(StorageLevel.MEMORY_ONLY) - data - } - - val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0 - params.setLoadDataTime(loadDataTime) - - val spca = params.isRaw match { - case "yes" => new PCA().setK(params.k).setInputCol("matrix") - case "no" => new SPCA().setK(params.k).setInputCol("matrix") - } - - val paramMap = ParamMap(spca.k -> params.k) - .put(spca.inputCol, "matrix") - val paramMaps: Array[ParamMap] = new Array[ParamMap](2) - for (i <- 0 to paramMaps.size - 1) { - paramMaps(i) = ParamMap(spca.k -> params.k) - .put(spca.inputCol, "matrix") - } - val kPair = ParamPair(spca.k, params.k) - val inputColPair = ParamPair(spca.inputCol, "matrix") - val model = params.apiName match { - case "fit" => spca.fit(trainingData) - case "fit1" => spca.fit(trainingData, paramMap) - case "fit2" => - val models = spca.fit(trainingData, paramMaps) - models(0) - case "fit3" => spca.fit(trainingData, kPair, inputColPair) - } - val costTime = (System.currentTimeMillis() - startTime) / 1000.0 - params.setLoadDataTime(costTime) - - val spcaMat = new DenseMatrix(model.pc.numRows, model.pc.numCols, model.pc.values, model.pc.isTransposed) - MatrixVerify.saveMatrix(spcaMat, params.saveDataPath, sc) - costTime - } -} diff --git a/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala b/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala deleted file mode 100644 index cd7c70f41b513018097f223fb074040b497c93ba..0000000000000000000000000000000000000000 --- a/tools/kal-test/src/main/scala/com/bigdata/ml/SimRankRunner.scala +++ /dev/null @@ -1,154 +0,0 @@ -package com.bigdata.ml - -import com.bigdata.utils.Utils -import com.bigdata.compare.ml.SimRankVerify - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.ml.recommendation.ALS.Rating -import org.apache.spark.ml.recommendation.{SimRank, SimRankOpenSource} -import org.yaml.snakeyaml.{DumperOptions, TypeDescription, Yaml} -import org.yaml.snakeyaml.constructor.Constructor -import org.yaml.snakeyaml.nodes.Tag -import org.yaml.snakeyaml.representer.Representer - -import java.util -import java.io.{File, FileWriter} -import java.util.HashMap -import scala.beans.BeanProperty - -class SimRankConfig extends Serializable { - - @BeanProperty var simrank: util.HashMap[String, util.HashMap[String, util.HashMap[String, Object]]] = _ -} - -class 
SimRankParams extends Serializable { - - @BeanProperty var numPartitions: Int = _ - @BeanProperty var damp: Double = _ - @BeanProperty var maxIter: Int = _ - - @BeanProperty var isRaw: String = _ - @BeanProperty var cpuName: String = _ - @BeanProperty var dataPath: String = _ - @BeanProperty var datasetName: String = _ - @BeanProperty var costTime: Double = _ - @BeanProperty var loadDataTime: Double = _ - @BeanProperty var algorithmName: String = _ - @BeanProperty var testcaseType: String = _ - @BeanProperty var saveDataPath: String = _ - @BeanProperty var verifiedDataPath: String = _ - @BeanProperty var ifCheck: String = _ - @BeanProperty var isCorrect: String = _ -} -object SimRankRunner { - def main(args: Array[String]): Unit = { - - try { - val modelConfSplit = args(0).split("-") - val (datasetName, isRaw, ifCheck) = - (modelConfSplit(0), modelConfSplit(1), modelConfSplit(2)) - val dataPath = args(1) - val cpuName = args(2) - val saveResultPath = args(3) - val datasetCpuName = s"${datasetName}-${cpuName}" - - val stream = Utils.getStream("conf/ml/simrank/simrank.yml") - val representer = new Representer - representer.addClassTag(classOf[SimRankParams], Tag.MAP) - val options = new DumperOptions - options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val yaml = new Yaml(new Constructor(classOf[SimRankConfig]), representer, options) - val description = new TypeDescription(classOf[SimRankParams]) - yaml.addTypeDescription(description) - val config: SimRankConfig = yaml.load(stream).asInstanceOf[SimRankConfig] - val params = new SimRankParams() - val paramsMap = config.simrank.get(isRaw match { - case "no" => "opt" - case "yes" => "raw" - }).get(datasetCpuName) - params.setNumPartitions(paramsMap.get("numPartitions").asInstanceOf[Int]) - params.setMaxIter(paramsMap.get("maxIter").asInstanceOf[Int]) - params.setDamp(paramsMap.get("damp").asInstanceOf[Double]) - params.setCpuName(cpuName) - params.setIsRaw(isRaw) - params.setDataPath(dataPath) - params.setDatasetName(datasetName) - params.setIfCheck(ifCheck) - params.setAlgorithmName("SimRank") - params.setSaveDataPath(s"${saveResultPath}/${params.algorithmName}/${datasetName}") - params.setVerifiedDataPath(s"${params.saveDataPath}_raw") - var appName = s"${params.algorithmName}_${datasetName}" - if (isRaw.equals("yes")){ - appName = s"${params.algorithmName}_${datasetName}_raw" - params.setVerifiedDataPath(params.saveDataPath) - params.setSaveDataPath(s"${params.saveDataPath}_raw") - } - params.setTestcaseType(appName) - - val conf = new SparkConf().setAppName(appName) - val spark = SparkSession.builder.config(conf).getOrCreate() - val costTime = new SimRankKernel().runJob(spark, params) - params.setCostTime(costTime) - - Utils.checkDirs("report") - if(ifCheck.equals("yes")){ - params.setIsCorrect(SimRankVerify.compareRes(params.saveDataPath, params.verifiedDataPath, spark)) - val writerIsCorrect = new FileWriter(s"report/ml_isCorrect.txt", true) - writerIsCorrect.write(s"${params.testcaseType} ${params.isCorrect} \n") - writerIsCorrect.close() - } - val writer = new FileWriter(s"report/${params.testcaseType}_${ - Utils.getDateStrFromUTC("yyyyMMdd_HHmmss", - System.currentTimeMillis()) - }.yml") - yaml.dump(params, writer) - println(s"Exec Successful: costTime: ${costTime}s;isCorrect: ${params.isCorrect}") - } catch { - case e: Throwable => - println(s"Exec Failure: ${e.getMessage}") - throw e - } - } -} - - -class SimRankKernel { - - def runJob(spark: SparkSession, params: SimRankParams): Double = { - val sc = spark.sparkContext - 
val startTime = System.currentTimeMillis() - var costTime: Double = 0 - - import spark.implicits._ - val userCol = "user" - val itemCol = "item" - val df = spark.sparkContext.objectFile[Rating[Int]](params.getDataPath).repartition(params.getNumPartitions) - .map(row => { - ("user-" + row.user.toString, "item-" + row.item.toString) - }).toDF(userCol, itemCol) - - val loadDataTime = (System.currentTimeMillis() - startTime) / 1000.0 - params.setLoadDataTime(loadDataTime) - if (params.getIsRaw.equals("no")) { - val simrank = new SimRank() - .setDamp(params.getDamp) - .setNumIter(params.getMaxIter) - .setUserCol(userCol) - .setItemCol(itemCol) - val simrankRes = simrank.computeSimilarity(df) - simrankRes.itemSimilarity.foreach(_ => {}) - simrankRes.userSimilarity.foreach(_ => {}) - costTime = (System.currentTimeMillis() - startTime) / 1000.0 - SimRankVerify.saveRes(simrankRes.userSimilarity, simrankRes.itemSimilarity, params.saveDataPath, sc) - - } else { - val simrankRes = new SimRankOpenSource().execute(df, (userCol, itemCol), params.getDamp, params.getMaxIter) - simrankRes._1.foreach(_ => {}) - simrankRes._2.foreach(_ => {}) - costTime = (System.currentTimeMillis() - startTime) / 1000.0 - SimRankVerify.saveRes(simrankRes._1, simrankRes._2, params.saveDataPath, sc) - } - costTime - } -}
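Note: every runner removed above follows the same skeleton — time the job, write a single scalar metric to the save path, and (when ifCheck is "yes") mark the test case correct by comparing that metric against the matching "_raw" run before dumping the params to a report YAML. The sketch below illustrates that pattern in plain Scala; the flat-file layout, the 0.5% relative tolerance, and the helper names are assumptions for illustration only, not the behaviour of the removed Utils / compare classes.

// Illustrative sketch only: mirrors the save-metric / compare-against-raw pattern of the
// deleted runners. Paths, tolerance, and helper names are assumptions, not project APIs.
import java.io.{File, PrintWriter}
import scala.io.Source

object EvaluationCheckSketch {

  // Write a single evaluation value to a local file (stand-in for Utils.saveEvaluation,
  // which in the deleted code writes to HDFS via the SparkContext).
  def saveEvaluation(value: Double, path: String): Unit = {
    val w = new PrintWriter(new File(path))
    try w.println(value) finally w.close()
  }

  // Read both results back and decide correctness with a relative tolerance
  // (assumed 0.5%); the real compare utilities are not shown in this patch.
  def compareRes(savePath: String, verifiedPath: String, relTol: Double = 0.005): String = {
    def read(p: String): Double = {
      val src = Source.fromFile(p)
      try src.getLines().next().trim.toDouble finally src.close()
    }
    val (opt, raw) = (read(savePath), read(verifiedPath))
    if (math.abs(opt - raw) <= relTol * math.max(math.abs(raw), 1e-12)) "correct" else "incorrect"
  }

  def main(args: Array[String]): Unit = {
    val startTime = System.currentTimeMillis()
    val metric = 0.87                              // stand-in for silhouette / accuracy / MSE
    saveEvaluation(metric, "result_opt.txt")       // optimized ("opt") run
    saveEvaluation(metric * 1.001, "result_raw.txt") // baseline ("raw") run
    val costTime = (System.currentTimeMillis() - startTime) / 1000.0
    val isCorrect = compareRes("result_opt.txt", "result_raw.txt")
    println(s"Exec Successful: costTime: ${costTime}s; evaluation: ${metric}; isCorrect: ${isCorrect}")
  }
}

Running the object writes the two small result files and prints the same costTime / evaluation / isCorrect summary line the deleted runners logged after each test case.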