# Spark-demo **Repository Path**: CandyPop/spark-demo ## Basic Information - **Project Name**: Spark-demo - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-04-04 - **Last Updated**: 2022-07-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: Spark ## README #### Spark Spark相对于Hadoop,拥有更快的数据挖掘,数据计算,一般来说,Spark基于内存进行计算,Spark可以取代Hadoop,也可以与Hadoop集成,取代他的MapReduce的引擎。 Spark和Hadoop的根本差异是多个作业之间的数据通信问题,Spark多个作业之间数据基于**内存**,而Hadoop是基于**磁盘。** ![1649077641388](./img/1649077641388.png) ➢ **Spark Core** Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL, Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的 ➢ **Spark SQL** Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。 ➢ **Spark Streaming** Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理 数据流的 API。 ➢ **Spark MLlib** MLlib 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等 额外的功能,还提供了一些更底层的机器学习原语。 ➢ **Spark GraphX** GraphX 是 Spark 面向图计算提供的框架与算法库。 ##### 配置环境 由于Spark是Scala编写,所以idea也需要安装Sacla插件。 首先解压缩 ``` bin/spark-shell ``` ![QQ图片20220405162054](./img/QQ图片20220405162054.png) 然后测试一下本地环境有没有什么问题 ``` sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect ``` ![QQ图片20220405163526](./img/QQ图片20220405163526.png) ![QQ图片20220405163532](./img/QQ图片20220405163532.png) 如果我们希望将自己的任务提交给sprak就需要将scala程序打包成jar包给spark执行。切换到安装路径 ``` bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local[2] \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10 ``` ![QQ图片20220405163932](./img/QQ图片20220405163932.png) 正常输出。 ##### 集群模式 进入到conf目录下,将slaves.template 修改为 slaves,加入已经准备好的集群节点 ![QQ图片20220405164559](./img/QQ图片20220405164559.png) ![QQ图片20220405164602](./img/QQ图片20220405164602.png) 然后修改spark-env.sh.template为spark-env.sh 加入如下代码 ``` export JAVA_HOME=java安装地址 SPARK_MASTER_HOST=hadoop102 SPARK_MASTER_PORT=7077 ``` 接着其它的103-104都放入相同的配置。然后在主节点上输入 ``` sbin/start-all.sh ``` ![QQ图片20220405175846](./img/QQ图片20220405175846.png) ![QQ图片20220405175848](./img/QQ图片20220405175848.png) ![QQ图片20220405175850](./img/QQ图片20220405175850.png) 查看集群环境 ``` http://hadoop102:8080/ ``` ![QQ图片20220405180024](./img/QQ图片20220405180024.png) 看看集群环境是否ok ``` bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://hadoop102:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10 ``` ![QQ图片20220405194620](./img/QQ图片20220405194620.jpg) ![QQ图片20220405194632](./img/QQ图片20220405194632.png) ##### 配置历史记录服务 conf目录下,修改文件 ``` mv spark-defaults.conf.template spark-defaults.conf ``` 加入如下配置 ``` spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory ``` 在此之前请保证hadoop集群已启动,并且目录/directory存在 ``` sbin/start-dfs.sh hadoop fs -mkdir /directory ``` 在spark-env.sh加入如下配置 ``` export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory -Dspark.history.retainedApplications=30" ``` 参数 1 含义:WEB UI 访问的端口号为 18080 参数 2 含义:指定历史服务器日志存储路径 参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。![QQ图片20220405194635](./img/QQ图片20220405194635.png) ![QQ图片20220405194638](./img/QQ图片20220405194638.png) ##### Yarn 独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这 种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主 要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是 和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的 Yarn 环境 下 Spark 是如何工作的(其实是因为在国内工作中,Yarn 使用的非常多)。 修改hadoop的yarn-site.xml ```xml yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false ``` ``` mv spark-env.sh.template spark-env.sh ``` ``` 添加如下配置 export JAVA_HOME=/usr/local/java YARN_CONF_DIR=/usr/local/hadoop-3.1.3/etc/hadoop ``` 首先启动hadoop再启动spark。完成后。提交个任务试试 ``` bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10 ``` ``` --deploy-mode cluster集群模式不会展示结果,client会显示结果 ``` 可以注意到,我们的master已经不是spark的某个节点,而是yarn | 模式 | Spark安装机器数 | 需启动的进程 | 所属者 | 应用场景 | | ---------- | --------------- | ------------ | ------ | -------- | | Local | 1 | 无 | Spark | 测试 | | Standalone | 3 | Master和Work | Spark | 单独部署 | | Yarn | 1 | Yarn 及 HDFS | Hadoop | 混合部署 | ➢ Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算) ➢ Spark Master 内部通信服务端口号:7077 ➢ Standalone 模式下,Spark Master Web 端口号:8080(资源) ➢ Spark 历史服务器端口号:18080 ➢ Hadoop YARN 任务运行情况查看端口号:8088 ##### 聚合算子的区别 * groupByKey * 相同的key分到同一个分区中 * reduceByKey * 相同的key分到同一个分区中,并具有聚合功能 * foldByKey、 * 更加细节的分区规则,可以自定义分区间和分区内规则,但分区间的分区内的计算规则会统一 * AggreateByKey * 需要给定初始值,可以分别定义分区间和分区内的计算规则 * CombineByKey * 可以不给定初始值,但是要定义第一个值的转换规则,同样都可以自定义分区间规则和分区内规则 除了CombineByKey之前,所以的其它分区的最后逻辑使用的都是CombineByKey,只是CombineByKey的三个参数各自使用不一样而已。 ##### 案例实操 通过解析一个日志 统计出每一个**省份**每个广告被点击数量排行的Top3 ![1652279724894](./img/1652279724894.png) ![1652280123356](./img/1652280123356.png) ![1652280159926](./img/1652280159926.png) ##### Spark-Sql sparksql可以通过特定的语句,来连接到指定的数据源,数据源不限于hbase,hive,mysql等。 spark-sql中引入了sparksession的概念,与sprakcontext不同,他们的生命周期是不一样的。 session可以创建多个会话来操作数据源。 对于spark来说,数据源的种类可以是文件,也可以是数据库,也可以是内存。 sparksql中提供了DataFrame和DataSet ![1652700622135](./img/1652700622135.png) DataFrame可以将数据转换为二维视图,保存行列的关系,DataSet是DataFrame的扩展,他可以让你像操作对象一样的,操作其中的熟悉。 ```scala // SparkSession // 首先你可以读取一个数据源的数据,变成一个DataFrame,然后对他进行操作,DataFrame是一个Spark抽象出来的二维表格 val df:DataFrame = spark.read.json("input/user.json") //你首先可以查看这个dataFrame df.show() // 接着如果你想要操作这张表,可以用sql来操作,但是表名需要你创建 df.createTempView("user")// 这就创建一个叫user的表 // 如果你重复创建了,这里可以换个方法 df.createOrReplaceTempView("user") //接着你可以查询 spark.sql("select * from user").show() // 创建全局会话,让其他新会话也可以访问 df.createOrReplaceGlobalTempView("emp") //但是你访问的时候 要加这个前缀 spark.newSession.sql("session * from global_temp.emp").show ``` **注意:如果从内存中获取数据,Spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数据,不能确定是什么类型,只能用bigint接收,可以和Long类型转换,但是和Int不能转换** ##### DSL 语法 DataFrame 提供了一个特定的领域语法(domain-specific language,DSL)去管理结构化的数据,可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图 ```scala // 创建视图,假设你已经有了一个装好数据的DataFrame // 只查看username的属性 df.select("username").show // select方法就是dsl语法 // 将年龄+1 df.select($"age"+1).show //也可以这样写 df.select('age+1).show //如果你涉及了多个列名,涉及到计算,也必须都加上$,即便你的其它字段没有参与计算 df.select($"username",$"age"+1).show df.select('username,'age+1).show //查看age大于30的数据 df.filter($"age">30).show // 按,某个年龄进行分组,统计个数 df.groupBy("age").count.show ``` ![1658325869032](./img/1658325869032.png) ##### RDD ->DataFrame RDD是Spark对数据的最基本的抽象,他关注的是数据本身,而DataFrame更关注结构和类型,DataFrame的底层使用RDD来实现,本质上,是对RDD的封装。如果你没有一个数据源,但是你有一个装满数据的RDD,你也可以将她转换为DataFrame ![1658326608614](./img/1658326608614.png) ```scala val rdd = sc.makeRDD(List(1,2,3,4)) ``` ![1658326690600](./img/1658326690600.png) ```scala // 转换为DataFrame的方法 // 由于我们创建了一个List 1 2 3 的列表,所以我们可以为他命名,告诉他列名 val df:DataFrame = rdd.toDF("id") // 将DataFrame变回去Rdd df.rdd ``` ![1658327029121](./img/1658327029121.png) 转换后,是一个`org.apache.spark.sql.Row`类型的RDD ##### DataSet DataSet是具有强类型的数据集合,需要提供对应的类型信息 ```scala //由于DataSet需要明确的数据类型,在scala中,定义一个类拥有属性是很好的参数,所以我们定义一个样例类 case class Person(name:String,age:Long) // 封装一个集合 val list =List(Person("zhangsan",30),Person("lisi",40)) ``` ![1658327551985](./img/1658327551985.png) ```scala list.toDS ``` ![1658327576259](./img/1658327576259.png) ##### DataFrame -> DataSet ![1658327697148](./img/1658327697148.png) 首先我们已经有了一个DataFrame,还是之前的概念,想要转换成DataSet,我们需要明确的类型 ```scala case class Emp(age:Long,username:String)// 类型也要对应上 val ds:DataSet = df.as[Emp] ``` ##### RDD->DataSet 有类型,有数据,就可以转换成DataSet ```scala val rdd:RDD = sc.markRDD(List(Emp(30,"zhangsan"),Emp(40,"lisi"))) rdd.toDS //前提条件,一定要是个样例类才可以直接转换 ``` RDD,DataFrame,DataSet可以互相转换。 ![1652707721178](./img/1652707721178.png) rdd->DataSet,需要 ``` sc.markRDD(List(Emp(),Emp)) // emp是样例类, rdd.toDS() ``` DataSet->rdd ``` ds.rdd ``` ##### UDF ##### UDAF 函数实现原理 有些时候,api自己提供的函数可能不够用,所以需要自己去定义符合逻辑的函数。 例如一些聚合函数 想要计算一张表的聚合原理,UDAF函数将数据存储在一个缓冲区中,进行计算。 ![1658672300413](./img/1658672300413.png) ##### 数据的保存和读取 ![1658675816973](./img/1658675816973.png) spark中,默认保存和读取的格式为.parquet,但是这种格式我们不好创建,我们还是比较喜欢.json的格式,这个时候你可以使用 ```scala spark.read.format("json).load("data/user.json") // 当然你也可以用 spark.read.json("data/user.json") //保存文件 spark.write.format("json").save("output") ``` 而且,你如果觉得读取数据比较麻烦,可以不创建临时表,直接去读文件 ```scala // 表示用json的方式读取这个文件 spark.sql("select * from json.`/opt/modlue/data/user.json`").show ``` ![1658676334087](./img/1658676334087.png) 对于保存文件而言,如果输出的文件夹已经存在,是会报错的,原因是因为保存模式的原因,你可以通过切换模式,来避免这个错误。对应已经读取的数据源的DataFrame你可以这样保存 ```scala df.write.mode("append").json("data.output") // 追加模式,如果已经存在这个文件夹,那么就追加到文件夹的后面 // 读取csv格式 df.read.format("csv"). option("sep",";"). //表示分隔符 option("inferSchema","true"). option("header","true").// 将第一行作为表头 load("data/person.csv") ``` ![1658676891104](./img/1658676891104.png) ##### Spark与Hive 由于Spark与Hive有着很大的渊源,所以Spark和Hive存在两种关系,一种是Spark可以使用自己内置的hive,也可以连接外置的Hive ###### 内置的Hive 默认的spark的目录 ![1658761520763](./img/1658761520763.png) ![1658761546088](./img/1658761546088.png) ![1658761564254](./img/1658761564254.png) 当你执行一些操作的时候,他会去读本地的hiveconfig的文件,日志也展示出来了,如果不存在,就会去创建,也就是本身的hive初始化的操作,我们看到了`metastore_db`被创建出来了 ![1658761167699](./img/1658761167699.png) 我们可以通过临时视图创建表,也可以通过自己创建属于自己的表 ![1658761237341](./img/1658761237341.png) ```scala spark.sql("create table atguigu(id int)") //然后准备一个数据,因为我们这里创建了一个id的的表,所以你可以这样插入一些数据 ``` ![1658761362714](./img/1658761362714.png) ![1658761370480](./img/1658761370480.png) ```scala spark.sql("load data local inpath 'data/id.txt' into table atguigu") ``` ![1658761430602](./img/1658761430602.png) ![1658761446132](./img/1658761446132.png) ![1658761455175](./img/1658761455175.png) ![1658761465254](./img/1658761465254.png) 相当于本地创建了一个数据仓库,这个和hive操作没什么区别 ###### 外置Hive 要使用外置的hive首先,你要把内置的hive创建出来的文件给删除 ![1658761736606](./img/1658761736606.png) 删除掉。 ![1658761773037](./img/1658761773037.png) ![1658843205713](./img/1658843205713.png) 相关的配置文件,放到对应的spark文件的conf中,还有对应的mysql的jar包,保证可以使用hive正确连接到mysql ![1658843170491](./img/1658843170491.png) ###### 代码访问Hive 引入依赖 ```xml mysql mysql-connector-java 8.0.28 org.apache.spark spark-hive_2.12 3.0.0 org.apache.hive hive-exec 1.2.1 ``` 将你需要连接的hive的配置文件`hive-site.xml`,放置到resource目录下 ![1658762270936](./img/1658762270936.png) 请看源码 由于我们需要试用Spark连接外部的hive,所以我们在做练习之前,先创建一个数据库用于测试,连接到hive,创建数据库 ![1659020427294](./img/1659020427294.png) ![1659020532773](./img/1659020532773.png) 准备好测试的数据,然后看源码 ##### 实操例子 这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每 个商品在主要城市中的分布比例,超过两个城市用其他显示。 ![1659022855782](./img/1659022855782.png) ```sql select * from ( -- 第三步,按照点击次数排序 select *, -- 以区域进行排序,按照点击次数,降序 rank() over(partition by area order by clickCnt desc) as rank from ( -- 第二部,按照地区进行分组 select area,product_name, count(*) as clickCnt --点击的次数 from ( -- 第一步将三张表关联起来 select a.*, p.product_name, --商品名称 c.city_name, -- 城市名称 c.area -- 地区 from user_visit_action a join product_info p on p.product_id = a.click_product_id join city_info c on c.city_id = a.city_id where a.click_product_id > -1 --是点击操作 ) t1 group by area,product_name -- 按照地区和产品名分组 ) t2 ) t3 where rank <=3 ```