# Spark-demo
**Repository Path**: CandyPop/spark-demo
## Basic Information
- **Project Name**: Spark-demo
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-04-04
- **Last Updated**: 2022-07-29
## Categories & Tags
**Categories**: Uncategorized
**Tags**: Spark
## README
#### Spark
Spark相对于Hadoop,拥有更快的数据挖掘,数据计算,一般来说,Spark基于内存进行计算,Spark可以取代Hadoop,也可以与Hadoop集成,取代他的MapReduce的引擎。
Spark和Hadoop的根本差异是多个作业之间的数据通信问题,Spark多个作业之间数据基于**内存**,而Hadoop是基于**磁盘。**

➢ **Spark Core**
Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL,
Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的
➢ **Spark SQL**
Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL
或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
➢ **Spark Streaming**
Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理
数据流的 API。
➢ **Spark MLlib**
MLlib 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等
额外的功能,还提供了一些更底层的机器学习原语。
➢ **Spark GraphX**
GraphX 是 Spark 面向图计算提供的框架与算法库。
##### 配置环境
由于Spark是Scala编写,所以idea也需要安装Sacla插件。
首先解压缩
```
bin/spark-shell
```

然后测试一下本地环境有没有什么问题
```
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
```


如果我们希望将自己的任务提交给sprak就需要将scala程序打包成jar包给spark执行。切换到安装路径
```
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```

正常输出。
##### 集群模式
进入到conf目录下,将slaves.template 修改为 slaves,加入已经准备好的集群节点


然后修改spark-env.sh.template为spark-env.sh
加入如下代码
```
export JAVA_HOME=java安装地址
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
```
接着其它的103-104都放入相同的配置。然后在主节点上输入
```
sbin/start-all.sh
```



查看集群环境
```
http://hadoop102:8080/
```

看看集群环境是否ok
```
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```


##### 配置历史记录服务
conf目录下,修改文件
```
mv spark-defaults.conf.template spark-defaults.conf
```
加入如下配置
```
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:8020/directory
```
在此之前请保证hadoop集群已启动,并且目录/directory存在
```
sbin/start-dfs.sh
hadoop fs -mkdir /directory
```
在spark-env.sh加入如下配置
```
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
```
参数 1 含义:WEB UI 访问的端口号为 18080
参数 2 含义:指定历史服务器日志存储路径
参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序
信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

##### Yarn
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这
种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主
要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是
和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的 Yarn 环境
下 Spark 是如何工作的(其实是因为在国内工作中,Yarn 使用的非常多)。
修改hadoop的yarn-site.xml
```xml
yarn.nodemanager.pmem-check-enabled
false
yarn.nodemanager.vmem-check-enabled
false
```
```
mv spark-env.sh.template spark-env.sh
```
```
添加如下配置
export JAVA_HOME=/usr/local/java
YARN_CONF_DIR=/usr/local/hadoop-3.1.3/etc/hadoop
```
首先启动hadoop再启动spark。完成后。提交个任务试试
```
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
```
--deploy-mode cluster集群模式不会展示结果,client会显示结果
```
可以注意到,我们的master已经不是spark的某个节点,而是yarn
| 模式 | Spark安装机器数 | 需启动的进程 | 所属者 | 应用场景 |
| ---------- | --------------- | ------------ | ------ | -------- |
| Local | 1 | 无 | Spark | 测试 |
| Standalone | 3 | Master和Work | Spark | 单独部署 |
| Yarn | 1 | Yarn 及 HDFS | Hadoop | 混合部署 |
➢ Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算) ➢ Spark Master 内部通信服务端口号:7077
➢ Standalone 模式下,Spark Master Web 端口号:8080(资源)
➢ Spark 历史服务器端口号:18080
➢ Hadoop YARN 任务运行情况查看端口号:8088
##### 聚合算子的区别
* groupByKey
* 相同的key分到同一个分区中
* reduceByKey
* 相同的key分到同一个分区中,并具有聚合功能
* foldByKey、
* 更加细节的分区规则,可以自定义分区间和分区内规则,但分区间的分区内的计算规则会统一
* AggreateByKey
* 需要给定初始值,可以分别定义分区间和分区内的计算规则
* CombineByKey
* 可以不给定初始值,但是要定义第一个值的转换规则,同样都可以自定义分区间规则和分区内规则
除了CombineByKey之前,所以的其它分区的最后逻辑使用的都是CombineByKey,只是CombineByKey的三个参数各自使用不一样而已。
##### 案例实操
通过解析一个日志
统计出每一个**省份**每个广告被点击数量排行的Top3



##### Spark-Sql
sparksql可以通过特定的语句,来连接到指定的数据源,数据源不限于hbase,hive,mysql等。
spark-sql中引入了sparksession的概念,与sprakcontext不同,他们的生命周期是不一样的。
session可以创建多个会话来操作数据源。
对于spark来说,数据源的种类可以是文件,也可以是数据库,也可以是内存。
sparksql中提供了DataFrame和DataSet

DataFrame可以将数据转换为二维视图,保存行列的关系,DataSet是DataFrame的扩展,他可以让你像操作对象一样的,操作其中的熟悉。
```scala
// SparkSession
// 首先你可以读取一个数据源的数据,变成一个DataFrame,然后对他进行操作,DataFrame是一个Spark抽象出来的二维表格
val df:DataFrame = spark.read.json("input/user.json")
//你首先可以查看这个dataFrame
df.show()
// 接着如果你想要操作这张表,可以用sql来操作,但是表名需要你创建
df.createTempView("user")// 这就创建一个叫user的表
// 如果你重复创建了,这里可以换个方法
df.createOrReplaceTempView("user")
//接着你可以查询
spark.sql("select * from user").show()
// 创建全局会话,让其他新会话也可以访问
df.createOrReplaceGlobalTempView("emp")
//但是你访问的时候 要加这个前缀
spark.newSession.sql("session * from global_temp.emp").show
```
**注意:如果从内存中获取数据,Spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数据,不能确定是什么类型,只能用bigint接收,可以和Long类型转换,但是和Int不能转换**
##### DSL 语法
DataFrame 提供了一个特定的领域语法(domain-specific language,DSL)去管理结构化的数据,可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图
```scala
// 创建视图,假设你已经有了一个装好数据的DataFrame
// 只查看username的属性
df.select("username").show
// select方法就是dsl语法
// 将年龄+1
df.select($"age"+1).show
//也可以这样写
df.select('age+1).show
//如果你涉及了多个列名,涉及到计算,也必须都加上$,即便你的其它字段没有参与计算
df.select($"username",$"age"+1).show
df.select('username,'age+1).show
//查看age大于30的数据
df.filter($"age">30).show
// 按,某个年龄进行分组,统计个数
df.groupBy("age").count.show
```

##### RDD ->DataFrame
RDD是Spark对数据的最基本的抽象,他关注的是数据本身,而DataFrame更关注结构和类型,DataFrame的底层使用RDD来实现,本质上,是对RDD的封装。如果你没有一个数据源,但是你有一个装满数据的RDD,你也可以将她转换为DataFrame

```scala
val rdd = sc.makeRDD(List(1,2,3,4))
```

```scala
// 转换为DataFrame的方法
// 由于我们创建了一个List 1 2 3 的列表,所以我们可以为他命名,告诉他列名
val df:DataFrame = rdd.toDF("id")
// 将DataFrame变回去Rdd
df.rdd
```

转换后,是一个`org.apache.spark.sql.Row`类型的RDD
##### DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息
```scala
//由于DataSet需要明确的数据类型,在scala中,定义一个类拥有属性是很好的参数,所以我们定义一个样例类
case class Person(name:String,age:Long)
// 封装一个集合
val list =List(Person("zhangsan",30),Person("lisi",40))
```

```scala
list.toDS
```

##### DataFrame -> DataSet

首先我们已经有了一个DataFrame,还是之前的概念,想要转换成DataSet,我们需要明确的类型
```scala
case class Emp(age:Long,username:String)// 类型也要对应上
val ds:DataSet = df.as[Emp]
```
##### RDD->DataSet
有类型,有数据,就可以转换成DataSet
```scala
val rdd:RDD = sc.markRDD(List(Emp(30,"zhangsan"),Emp(40,"lisi")))
rdd.toDS
//前提条件,一定要是个样例类才可以直接转换
```
RDD,DataFrame,DataSet可以互相转换。

rdd->DataSet,需要
```
sc.markRDD(List(Emp(),Emp))
// emp是样例类,
rdd.toDS()
```
DataSet->rdd
```
ds.rdd
```
##### UDF
##### UDAF 函数实现原理
有些时候,api自己提供的函数可能不够用,所以需要自己去定义符合逻辑的函数。
例如一些聚合函数
想要计算一张表的聚合原理,UDAF函数将数据存储在一个缓冲区中,进行计算。

##### 数据的保存和读取

spark中,默认保存和读取的格式为.parquet,但是这种格式我们不好创建,我们还是比较喜欢.json的格式,这个时候你可以使用
```scala
spark.read.format("json).load("data/user.json")
// 当然你也可以用
spark.read.json("data/user.json")
//保存文件
spark.write.format("json").save("output")
```
而且,你如果觉得读取数据比较麻烦,可以不创建临时表,直接去读文件
```scala
// 表示用json的方式读取这个文件
spark.sql("select * from json.`/opt/modlue/data/user.json`").show
```

对于保存文件而言,如果输出的文件夹已经存在,是会报错的,原因是因为保存模式的原因,你可以通过切换模式,来避免这个错误。对应已经读取的数据源的DataFrame你可以这样保存
```scala
df.write.mode("append").json("data.output")
// 追加模式,如果已经存在这个文件夹,那么就追加到文件夹的后面
// 读取csv格式
df.read.format("csv").
option("sep",";"). //表示分隔符
option("inferSchema","true").
option("header","true").// 将第一行作为表头
load("data/person.csv")
```

##### Spark与Hive
由于Spark与Hive有着很大的渊源,所以Spark和Hive存在两种关系,一种是Spark可以使用自己内置的hive,也可以连接外置的Hive
###### 内置的Hive
默认的spark的目录



当你执行一些操作的时候,他会去读本地的hiveconfig的文件,日志也展示出来了,如果不存在,就会去创建,也就是本身的hive初始化的操作,我们看到了`metastore_db`被创建出来了

我们可以通过临时视图创建表,也可以通过自己创建属于自己的表

```scala
spark.sql("create table atguigu(id int)")
//然后准备一个数据,因为我们这里创建了一个id的的表,所以你可以这样插入一些数据
```


```scala
spark.sql("load data local inpath 'data/id.txt' into table atguigu")
```




相当于本地创建了一个数据仓库,这个和hive操作没什么区别
###### 外置Hive
要使用外置的hive首先,你要把内置的hive创建出来的文件给删除

删除掉。


相关的配置文件,放到对应的spark文件的conf中,还有对应的mysql的jar包,保证可以使用hive正确连接到mysql

###### 代码访问Hive
引入依赖
```xml
mysql
mysql-connector-java
8.0.28
org.apache.spark
spark-hive_2.12
3.0.0
org.apache.hive
hive-exec
1.2.1
```
将你需要连接的hive的配置文件`hive-site.xml`,放置到resource目录下

请看源码
由于我们需要试用Spark连接外部的hive,所以我们在做练习之前,先创建一个数据库用于测试,连接到hive,创建数据库


准备好测试的数据,然后看源码
##### 实操例子
这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每
个商品在主要城市中的分布比例,超过两个城市用其他显示。

```sql
select * from
(
-- 第三步,按照点击次数排序
select *,
-- 以区域进行排序,按照点击次数,降序
rank() over(partition by area order by clickCnt desc) as rank
from (
-- 第二部,按照地区进行分组
select
area,product_name,
count(*) as clickCnt --点击的次数
from
( -- 第一步将三张表关联起来
select
a.*,
p.product_name, --商品名称
c.city_name, -- 城市名称
c.area -- 地区
from
user_visit_action a
join product_info p on p.product_id = a.click_product_id
join city_info c on c.city_id = a.city_id
where a.click_product_id > -1 --是点击操作
) t1 group by area,product_name -- 按照地区和产品名分组
) t2
) t3 where rank <=3
```