# spark-maven-scala-idea **Repository Path**: tomdev/spark-maven-scala-idea ## Basic Information - **Project Name**: spark-maven-scala-idea - **Description**: maven spark 打包,部署应用到集群环境中 scala 版 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 5 - **Created**: 2016-02-15 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # spark用maven来管理依赖部署应用到集群中-scala版 ## 更多笔记分享 * 笔记分享 https://opensourceteam.gitbooks.io/bigdata/content/notes/share/mei_tian_xue_xi_bi_ji_fen_xiang.html ##准备工作 * 了解 maven 的安装,打包命令 * 安装好hadoop集群 * 安装好spark集群 * 安装好 scala环境 * 了解 scala 开发 ##功能概述 * 分布式计算,HDFS文件系统上的一个目录(目录下的所有文件)或一个文件的内容,各单词的统计个数,(单词之间以空格来分隔) * scala 语言开发 spark程序 * 通过maven 打包 spark 项目 * 通过maven 打包后,都可以直接在 idea 工具中直接运行项目,把应用部署到集群中去 * maven scala 打包命令 java执行:mvn clean compile package -DskipTests
scala执行:mvn clean scala:compile compile package -DskipTests ##运行方式 * 直接在 idea 开发工具中运行(方便开发测试时用,不推荐在生产环境中使用,在生产中可以直接用spark-submit.sh 工具部署) ##源码下载 * https://git.oschina.net/opensourceteams/spark-maven-scala-idea.git ##官网说明依赖包 * Linking with Spark http://spark.apache.org/docs/latest/programming-guide.html * scala 进行 spark 开发(scala 高于2.10) Spark 1.6.0 uses Scala 2.10. To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.10.X). To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at: groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.6.0 In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS. groupId = org.apache.hadoop artifactId = hadoop-client version = Finally, you need to import some Spark classes into your program. Add the following lines: import org.apache.spark.SparkContext import org.apache.spark.SparkConf * java 进行spark 开发(JDK要高于1.7) Spark 1.6.0 works with Java 7 and higher. If you are using Java 8, Spark supports lambda expressions for concisely writing functions, otherwise you can use the classes in the org.apache.spark.api.java.function package. To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at: groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.6.0 In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS. groupId = org.apache.hadoop artifactId = hadoop-client version = Finally, you need to import some Spark classes into your program. Add the following lines: import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf * maven pom.xml 必须引入 UTF-8 1.6.0 2.6.0 org.apache.spark spark-core_2.10 ${apache.spark-core.version} org.apache.hadoop hadoop-client ${apache.hadoop-client.version} ## 单词统计 package com.opensourceteams.module.common.bigdata.spark.quickstart import org.apache.spark.{SparkContext, SparkConf} /** * 开发者:刘文 Email:372065525@qq.com * 16/1/20 下午4:30 * 功能描述: 单词个数统计 */ object WordCount extends App{ /* * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序 * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差 */ val conf = new SparkConf val jar = "/Users/hadoop/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar" conf.setJars(Array(jar)) /** * appname: 应用的名称 * master:主机地址 * 本地,不运行在集群值:local * 运行在集群:spark://s0:7077 */ conf.setAppName("WordCountCluster").setMaster("spark://s0:7077") /** * 创建SparkContext对象 * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等 * 同时还会负则Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ val sc =new SparkContext(conf) /** * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD) * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作 * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴 */ val path = "hdfs://s0:9000/library/wordcount/input/Data" val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行) val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果 val mapMap = mapArray.map { x => (x,1) } val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加 result.collect().foreach(x => println("key:"+ x._1 + " value" + x._2)) //result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_02") sc.stop() } ## 单词统计加排序 package com.opensourceteams.module.common.bigdata.spark.quickstart import org.apache.spark.{SparkConf, SparkContext} /** * 开发者:刘文 Email:372065525@qq.com * 16/1/20 下午4:30 * 功能描述: 单词个数统计 排序 */ object WordCountSort extends App{ /* * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序 * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差 */ val conf = new SparkConf val jar = "/Users/hadoop/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar" conf.setJars(Array(jar)) /** * appname: 应用的名称 * master:主机地址 * 本地,不运行在集群值:local * 运行在集群:spark://s0:7077 */ conf.setAppName("WordCountCluster").setMaster("spark://s0:7077") /** * 创建SparkContext对象 * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等 * 同时还会负则Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ val sc =new SparkContext(conf) /** * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD) * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作 * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴 */ val path = "hdfs://s0:9000/library/wordcount/input/Data" val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行) val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果 val mapMap = mapArray.map { x => (x,1) } val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加 result.collect().foreach(x => println("key:"+ x._1 + " value" + x._2)) //result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_02") //排序 val resultValut = result.map(x => (x._2,x._1)) val resultSort = resultValut.sortByKey(false).map(x => (x._2,x._1)) println("排序后的:==================================================") resultSort.collect().foreach(x => println("key:"+ x._1 + " value:" + x._2)) sc.stop() }