# pyspark_project
**Repository Path**: cucy/pyspark_project
## Basic Information
- **Project Name**: pyspark_project
- **Description**: Python3 in Action: Spark Big Data Analysis and Scheduling
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 17
- **Forks**: 16
- **Created**: 2019-11-16
- **Last Updated**: 2023-07-16
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Python3 in Action: Spark Big Data Analysis and Scheduling
# Chapter 1: Course Introduction
## 1. PySpark Course Guide

# Chapter 2: Setting Up the Environment
## 1. Deploying Python 3
[Download page](https://www.python.org/downloads/release/python-365/)

[Reference: Building Spark](http://spark.apache.org/docs/latest/building-spark.html)
```
cd software/
```
```
wget https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
```
```
tar -zvxf Python-3.6.5.tgz -C ~/app/
```
-- Install the build dependencies before compiling Python
```
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
```
```
cd Python-3.6.5/
```
```
./configure --prefix=/home/jungle/app/python3
```
```
make && make install
```
```
cd /home/jungle/app/python3/bin
pwd
```
-- Configure the environment variables
```
vi ~/.bash_profile
```
```
export PATH=/home/jungle/app/python3/bin:$PATH
```
```
source ~/.bash_profile
```
## 2. Building Spark from Source and Deploying It
-- Build the Spark distribution
```
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0
```
# Chapter 3: Spark Core RDD
## 1. What Is an RDD?
```
Website: xxxx.apache.org
Source:  https://github.com/apache/xxxx

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging

1) RDD is an abstract class
2) It is generic, so the elements can be of many types: String, Person, User, ...

RDD: Resilient Distributed Dataset
Represents an
  immutable                           -> cannot be changed once created
  partitioned collection of elements  -> split into partitions, e.g.
      Array(1,2,3,4,5,6,7,8,9,10) in 3 partitions: (1,2,3) (4,5,6) (7,8,9,10)
  that can be operated on in parallel -> each partition can be computed in parallel
```
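A minimal sketch of the partition example above. It assumes the pyspark shell introduced later in this chapter, where `sc` (the SparkContext) is already defined; the exact partition boundaries may differ.
```python
# pyspark shell: `sc` is already defined
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)   # 1..10 split into 3 partitions
print(rdd.getNumPartitions())              # 3
print(rdd.glom().collect())                # e.g. [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
print(rdd.map(lambda x: x * 2).collect())  # each partition is processed in parallel
```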
```
Single-machine storage/compute ==> distributed storage/compute
1) Data storage: split the data into blocks (HDFS blocks)
2) Data computation: split the work (distributed parallel computing) with MapReduce/Spark
3) Storage + compute: HDFS/S3 + MapReduce/Spark  ==> covers both needs
```
## 2. The Five Main Properties of an RDD
```
RDD properties (from the source code comment):
Internally, each RDD is characterized by five main properties:
 - A list of partitions
     a set of partitions/splits
 - A function for computing each split/partition
     y = f(x), e.g. rdd.map(_ + 1) is applied to every partition
 - A list of dependencies on other RDDs
     rdd1 ==> rdd2 ==> rdd3 ==> rdd4
     e.g. rdda with 5 partitions ==> map ==> rddb with 5 partitions
 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 - Optionally, a list of preferred locations to compute each split on (e.g.
   block locations for an HDFS file)
     schedule the task onto the node where the data already lives:
     moving computation is cheaper than moving data
     why "locations" (plural)? a split can live on several nodes (e.g. replicated HDFS blocks)
```
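A rough sketch (pyspark shell, `sc` predefined) of how the five properties surface in PySpark. It assumes the hello.txt file created in the "Creating RDDs" section below.
```python
lines = sc.textFile("file:///home/jungle/data/hello.txt", 2)          # 1) a list of partitions
pairs = lines.flatMap(lambda l: l.split(" ")).map(lambda w: (w, 1))   # 2) a function computed on each partition
wc = pairs.reduceByKey(lambda a, b: a + b)                            # 3) wc depends on pairs, pairs on lines
print(wc.partitioner)        # 4) key-value RDDs may carry a Partitioner after the shuffle
print(wc.toDebugString())    # lineage, i.e. the dependency chain
# 5) for an HDFS input, the block locations serve as preferred locations when scheduling tasks
print(wc.collect())
```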
## 3. The Five Properties in the Source Code
```
How the five properties appear in the RDD source code:
def compute(split: Partition, context: TaskContext): Iterator[T]   // property 2
def getPartitions: Array[Partition]                                 // property 1
def getDependencies: Seq[Dependency[_]] = deps                      // property 3
def getPreferredLocations(split: Partition): Seq[String] = Nil      // property 5
val partitioner: Option[Partitioner] = None                         // property 4
```
## 4. RDD Illustrated

## 5. SparkContext & SparkConf in Detail
[Reference: RDD Programming Guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html)
```
First task: create a SparkContext
It connects to the Spark "cluster": local, standalone, YARN, Mesos
Through the SparkContext you create RDDs and broadcast variables to the cluster
A SparkConf object has to be created before the SparkContext
```
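A minimal sketch of that creation order; the app name and broadcast data below are made up for illustration, and the full spark0301.py program in section 8 follows the same pattern.
```python
from pyspark import SparkConf, SparkContext

# SparkConf first, then the SparkContext that connects to a "cluster" (here local[2])
conf = SparkConf().setMaster("local[2]").setAppName("sparkcontext_demo")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3])          # RDDs are created through the SparkContext
lookup = sc.broadcast({"offset": 10})    # so are broadcast variables
print(rdd.map(lambda x: x + lookup.value["offset"]).collect())   # [11, 12, 13]

sc.stop()
```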
## 6. The pyspark Shell
```
cd $SPARK_HOME/bin/
```
```
pyspark --master local[2] --jars /home/jungle/app/hive-1.1.0-cdh5.7.0/lib/mysql-connector-java-5.1.27-bin.jar
```

==Web UI==
```
http://192.168.1.18:4040
```
```
vi ~/.bash_profile
```
```
export PYSPARK_PYTHON=python3.5
```

```
source ~/.bash_profile
```
## 7. Creating RDDs
### 1. [Method 1 (Parallelized Collections)](http://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections)
```
pyspark --master local[2] --jars /home/jungle/app/hive-1.1.0-cdh5.7.0/lib/mysql-connector-java-5.1.27-bin.jar
```
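Inside the shell, a local Python collection becomes an RDD with `sc.parallelize` (sketch; `sc` is predefined by pyspark):
```python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())                    # [1, 2, 3, 4, 5]
print(distData.reduce(lambda a, b: a + b))   # 15

# an optional second argument sets the number of partitions explicitly
print(sc.parallelize(data, 10).getNumPartitions())   # 10
```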




### 2. [Method 2 (External Datasets)](http://spark.apache.org/docs/latest/rdd-programming-guide.html#external-datasets)
```
cd data
vi hello.txt
```

```
sc.textFile("file:///home/jungle/data/hello.txt").collect()
```

```
hadoop fs -put hello.txt /
hadoop fs -text /hello.txt
```

```
sc.textFile("hdfs://192.168.1.18:8020/hello.txt").collect()
```


==Note==
```
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes
1) In this course we run on a single node, so it is enough for hello.txt to exist on this one machine
2) On a standalone Spark cluster (e.g. 3 nodes), a local path is read from each worker's own disk, so this is not recommended
```
+ Writing to the file system
```
data = [1, 2, 3, 4, 5]   # any local collection
disData = sc.parallelize(data)
disData.saveAsTextFile("file:///home/jungle/data/output")
```
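saveAsTextFile writes one part file per partition into the output directory; as a quick check, the directory can be read back with textFile (sketch, same path as above):
```python
print(sc.textFile("file:///home/jungle/data/output").collect())
```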


## 8. Developing PySpark Applications in an IDE
```
Developing a PySpark application
1) IDE: IDEA / PyCharm
2) Set the basic parameters: Python interpreter, PYTHONPATH, SPARK_HOME, and the two zip packages under $SPARK_HOME/python/lib
3) Develop the application
4) Test it locally with the local master
```

### 1. Environment Settings
----


```
PYTHONPATH:E:\spark-2.1.0-bin-2.6.0-cdh5.7.0\python
SPARK_HOME:E:\spark-2.1.0-bin-2.6.0-cdh5.7.0
```

---




### 2. Program
-- spark0301.py
```python
from pyspark import SparkConf, SparkContext

# Create a SparkConf: it holds the Spark-related configuration
conf = SparkConf().setMaster("local[2]").setAppName("spark0301")
# Create the SparkContext
sc = SparkContext(conf=conf)

# Business logic
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())

# Good habit: stop the context when done
sc.stop()
```
==Run result==
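The original screenshot of the run is not included here; since `distData` holds `[1, 2, 3, 4, 5]`, the `print(distData.collect())` call should show `[1, 2, 3, 4, 5]` among Spark's log output.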

## 9. Submitting a PySpark Job to the Server
```
mkdir script
cd script/
```
```
vi spark0301.py
```

==Submit the job==
```
cd $SPARK_HOME/bin/
```
```
./spark-submit --master local[2] --name spark0301 /home/jungle/script/spark0301.py
```
==Error==
> Exception in thread "main" java.io.IOException: Cannot run program "python3.5": error=2, No such file or directory
Fix: PYSPARK_PYTHON was set to python3.5 in ~/.bash_profile, but the interpreter installed above is Python 3.6.5; point PYSPARK_PYTHON at the installed python3 binary and re-source the profile.
----
# Chapter 4: Spark Core RDD Programming
## 1. Common RDD Operations
[RDD Operations](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations)
```
RDD operations

transformations: create a new dataset from an existing one
    RDDA ---transformation--> RDDB
    # for example
    y = f(x)
    rddb = rdda.map(....)
    lazy (*****)
    # nothing is computed until an action such as collect is reached
    rdda.map().filter()......collect()
    # typical transformations
    map / filter / groupBy / distinct / ...

actions: return a value to the driver program after running a computation on the dataset
    # typical actions
    count / reduce / collect / ...

# Key points
1) transformations are lazy: nothing actually happens until an action is called
2) an action triggers the computation
3) an action returns values to the driver or writes data to external storage
```
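A small sketch of this lazy behaviour (pyspark shell, `sc` predefined):
```python
rdda = sc.parallelize([1, 2, 3, 4, 5])
rddb = rdda.map(lambda x: x + 1)          # transformation: returns a new RDD, nothing runs yet
filtered = rddb.filter(lambda x: x > 3)   # still nothing has been computed
print(filtered.collect())                 # action: triggers the computation -> [4, 5, 6]
```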
+ Program
```python
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)

    def my_map():
        data = [1, 2, 3, 4, 5]
        # turn the local collection into an RDD
        rdd1 = sc.parallelize(data)
        rdd2 = rdd1.map(lambda x: x * 2)
        # collect() brings the result back to the driver
        print(rdd2.collect())

    def my_map2():
        a = sc.parallelize(["dog", "tiger", "lion", "cat", "panther", " eagle"])
        b = a.map(lambda x: (x, 1))
        print(b.collect())

    def my_filter():
        data = [1, 2, 3, 4, 5]
        rdd1 = sc.parallelize(data)
        mapRdd = rdd1.map(lambda x: x * 2)
        filterRdd = mapRdd.filter(lambda x: x > 5)
        print(filterRdd.collect())
        print(sc.parallelize(data).map(lambda x: x * 2).filter(lambda x: x > 5).collect())

    def my_flatMap():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        print(rdd.flatMap(lambda line: line.split(" ")).collect())

    def my_groupBy():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        groupByRdd = mapRdd.groupByKey()
        print(groupByRdd.collect())
        print(groupByRdd.map(lambda x: {x[0]: list(x[1])}).collect())

    def my_reduceByKey():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        print(reduceByKeyRdd.collect())

    def my_sort():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        print(reduceByKeyRdd.sortByKey(False).collect())
        # sort by count: swap to (count, word), sort by key, then swap back
        print(reduceByKeyRdd.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect())

    def my_union():
        a = sc.parallelize([1, 2, 3])
        b = sc.parallelize([3, 4, 5])
        print(a.union(b).collect())

    def my_distinct():
        a = sc.parallelize([1, 2, 3])
        b = sc.parallelize([3, 4, 2])
        print(a.union(b).distinct().collect())

    def my_join():
        a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
        b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
        # print(a.join(b).collect())            # inner join
        # print(a.leftOuterJoin(b).collect())   # left outer join
        # print(a.rightOuterJoin(b).collect())  # right outer join
        print(a.fullOuterJoin(b).collect())     # full outer join

    def my_action():
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        rdd = sc.parallelize(data)
        print(rdd.collect())
        print(rdd.reduce(lambda x, y: x + y))
        rdd.foreach(lambda x: print(x))

    my_union()
    sc.stop()
```
## 2. Transformation Operators
### 1. The map Operator
```
map(func)
Applies func to every element of the dataset and returns a new distributed dataset
e.g. word => (word, 1)
```
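A short sketch in the pyspark shell (`sc` predefined), mirroring the word => (word, 1) example:
```python
words = sc.parallelize(["hello", "spark", "hello"])
print(words.map(lambda word: (word, 1)).collect())
# [('hello', 1), ('spark', 1), ('hello', 1)]
```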



### 2. The filter Operator
```
filter(func)
Returns a new distributed dataset containing only the elements for which func returns true
```
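Sketch (pyspark shell): double the values and keep only those greater than 5, as in my_filter above:
```python
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.map(lambda x: x * 2).filter(lambda x: x > 5).collect())
# [6, 8, 10]
```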

### 3. The flatMap Operator
```
flatMap(func)
Each input item can be mapped to zero or more output items; func returns a sequence and the results are flattened
```
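Sketch (pyspark shell): split lines into words and flatten the result:
```python
lines = sc.parallelize(["hello spark", "hello world"])
print(lines.flatMap(lambda line: line.split(" ")).collect())
# ['hello', 'spark', 'hello', 'world']
```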
### 4. The groupByKey Operator
```
groupByKey: groups the values that share the same key together
['hello', 'spark', 'hello', 'world', 'hello', 'world']
('hello', 1) ('spark', 1) ........
```
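Sketch (pyspark shell): the grouped values come back as a ResultIterable, so mapValues(list) makes them readable:
```python
pairs = sc.parallelize([("hello", 1), ("spark", 1), ("hello", 1), ("world", 1)])
grouped = pairs.groupByKey()
print(grouped.mapValues(list).collect())
# e.g. [('hello', [1, 1]), ('spark', [1]), ('world', [1])]   (order may vary)
```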
### 5. The reduceByKey Operator
```
reduceByKey: groups the values with the same key and combines them with the given function
mapRdd.reduceByKey(lambda a, b: a + b)
[1, 1]    -> 1 + 1 = 2
[1, 1, 1] -> 1 + 1 = 2, 2 + 1 = 3
[1]       -> 1
```
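Sketch (pyspark shell): the per-key values above are folded pairwise by the lambda:
```python
pairs = sc.parallelize([("hello", 1), ("spark", 1), ("hello", 1), ("world", 1), ("hello", 1)])
print(pairs.reduceByKey(lambda a, b: a + b).collect())
# e.g. [('hello', 3), ('spark', 1), ('world', 1)]   (order may vary)
```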
### 6. The sortByKey Operator
```
Requirement: sort the word count result by count in descending order, using sortByKey
('hello', 3), ('world', 2), ('spark', 1)
```
```python
def my_sort():
    data = ["hello spark", "hello world", "hello world"]
    rdd = sc.parallelize(data)
    mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
    # sort by key (the word) in descending order
    print(reduceByKeyRdd.sortByKey(False).collect())
    # sort by count: swap to (count, word), sort by key, then swap back
    print(reduceByKeyRdd.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect())
```
### 7. The union Operator
-- Merge two RDDs
```python
def my_union():
    a = sc.parallelize([1, 2, 3])
    b = sc.parallelize([3, 4, 5])
    print(a.union(b).collect())
```
### 8. The distinct Operator
-- Deduplicate
```python
def my_distinct():
    a = sc.parallelize([1, 2, 3])
    b = sc.parallelize([3, 4, 2])
    print(a.union(b).distinct().collect())
```
### 9. The join Operator
```
join:
inner join
outer join:left/right/full
```
```python
def my_join():
    a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
    b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
    # print(a.join(b).collect())            # inner join
    # print(a.leftOuterJoin(b).collect())   # left outer join
    # print(a.rightOuterJoin(b).collect())  # right outer join
    print(a.fullOuterJoin(b).collect())     # full outer join
```
## 3. Common Action Operators

```python
def my_action():
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    print(rdd.reduce(lambda x, y: x + y))
    rdd.foreach(lambda x: print(x))
```
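A few more common actions on the same data, as a hedged sketch (pyspark shell, `sc` predefined):
```python
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.count())            # 10
print(rdd.first())            # 1
print(rdd.take(3))            # [1, 2, 3]
print(rdd.max(), rdd.min())   # 10 1
```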
## 4. Operator Case Study
### 1. Word Count
```
Word count (wc) case
1) Input: one or more files, a directory, or files matched by suffix, e.g.
    hello spark
    hello hadoop
    hello welcome
2) Development steps
    split every line of text into individual words:           flatMap
    word ==> (word, 1):                                        map
    add up the counts of all identical words for the result:   reduceByKey
```
-- Program
```python
import sys
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: wordcount