# spark-maven-scala-idea
**Repository Path**: tomdev/spark-maven-scala-idea
## Basic Information
- **Project Name**: spark-maven-scala-idea
- **Description**: maven spark 打包,部署应用到集群环境中 scala 版
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 5
- **Created**: 2016-02-15
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# spark用maven来管理依赖部署应用到集群中-scala版
## 更多笔记分享
* 笔记分享 https://opensourceteam.gitbooks.io/bigdata/content/notes/share/mei_tian_xue_xi_bi_ji_fen_xiang.html
##准备工作
* 了解 maven 的安装,打包命令
* 安装好hadoop集群
* 安装好spark集群
* 安装好 scala环境
* 了解 scala 开发
##功能概述
* 分布式计算,HDFS文件系统上的一个目录(目录下的所有文件)或一个文件的内容,各单词的统计个数,(单词之间以空格来分隔)
* scala 语言开发 spark程序
* 通过maven 打包 spark 项目
* 通过maven 打包后,都可以直接在 idea 工具中直接运行项目,把应用部署到集群中去
* maven scala 打包命令
java执行:mvn clean compile package -DskipTests
scala执行:mvn clean scala:compile compile package -DskipTests
##运行方式
* 直接在 idea 开发工具中运行(方便开发测试时用,不推荐在生产环境中使用,在生产中可以直接用spark-submit.sh 工具部署)
##源码下载
* https://git.oschina.net/opensourceteams/spark-maven-scala-idea.git
##官网说明依赖包
* Linking with Spark
http://spark.apache.org/docs/latest/programming-guide.html
* scala 进行 spark 开发(scala 高于2.10)
Spark 1.6.0 uses Scala 2.10. To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.10.X).
To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0
In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
Finally, you need to import some Spark classes into your program. Add the following lines:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
* java 进行spark 开发(JDK要高于1.7)
Spark 1.6.0 works with Java 7 and higher. If you are using Java 8, Spark supports lambda expressions for concisely writing functions, otherwise you can use the classes in the org.apache.spark.api.java.function package.
To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0
In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
Finally, you need to import some Spark classes into your program. Add the following lines:
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf
* maven pom.xml 必须引入
UTF-8
1.6.0
2.6.0
org.apache.spark
spark-core_2.10
${apache.spark-core.version}
org.apache.hadoop
hadoop-client
${apache.hadoop-client.version}
## 单词统计
package com.opensourceteams.module.common.bigdata.spark.quickstart
import org.apache.spark.{SparkContext, SparkConf}
/**
* 开发者:刘文 Email:372065525@qq.com
* 16/1/20 下午4:30
* 功能描述: 单词个数统计
*/
object WordCount extends App{
/*
* 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序
* 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差
*/
val conf = new SparkConf
val jar = "/Users/hadoop/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar"
conf.setJars(Array(jar))
/**
* appname: 应用的名称
* master:主机地址
* 本地,不运行在集群值:local
* 运行在集群:spark://s0:7077
*/
conf.setAppName("WordCountCluster").setMaster("spark://s0:7077")
/**
* 创建SparkContext对象
* SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等
* 同时还会负则Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
val sc =new SparkContext(conf)
/**
* 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD)
* RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴
*/
val path = "hdfs://s0:9000/library/wordcount/input/Data"
val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行)
val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果
val mapMap = mapArray.map { x => (x,1) }
val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加
result.collect().foreach(x => println("key:"+ x._1 + " value" + x._2))
//result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_02")
sc.stop()
}
## 单词统计加排序
package com.opensourceteams.module.common.bigdata.spark.quickstart
import org.apache.spark.{SparkConf, SparkContext}
/**
* 开发者:刘文 Email:372065525@qq.com
* 16/1/20 下午4:30
* 功能描述: 单词个数统计 排序
*/
object WordCountSort extends App{
/*
* 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序
* 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差
*/
val conf = new SparkConf
val jar = "/Users/hadoop/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar"
conf.setJars(Array(jar))
/**
* appname: 应用的名称
* master:主机地址
* 本地,不运行在集群值:local
* 运行在集群:spark://s0:7077
*/
conf.setAppName("WordCountCluster").setMaster("spark://s0:7077")
/**
* 创建SparkContext对象
* SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等
* 同时还会负则Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
val sc =new SparkContext(conf)
/**
* 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD)
* RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴
*/
val path = "hdfs://s0:9000/library/wordcount/input/Data"
val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行)
val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果
val mapMap = mapArray.map { x => (x,1) }
val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加
result.collect().foreach(x => println("key:"+ x._1 + " value" + x._2))
//result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_02")
//排序
val resultValut = result.map(x => (x._2,x._1))
val resultSort = resultValut.sortByKey(false).map(x => (x._2,x._1))
println("排序后的:==================================================")
resultSort.collect().foreach(x => println("key:"+ x._1 + " value:" + x._2))
sc.stop()
}