# hadoop_learning
**Repository Path**: sun81911/hadoop_learning
## Basic Information
- **Project Name**: hadoop_learning
- **Description**: hadoop知识点学习
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2020-06-10
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## **大数据概论**
### 概念
大数据(Big Data)指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产
主要解决,海量数据的**存储**和海量数据的**分析计算**问题
按顺序给出数据存储单位:bit、Byte、KB、MB、GB、TB、**PB、EB、ZB**、YB、BB、NB、DB
### 特点
1. **Volume(大量)**
截至目前,人类生产的所有印刷材料的数据量是200PB,而历史上全人类总共说过的话的数据量大约是5EB。当前,典型个人计算机硬盘的容量为TB量级,而一些大企业的数据量已经接近EB量级
2. **Velocity(高速)**
这是大数据区分于传统数据挖掘的最显著特征。根据IDC的“数字宇宙”的报告,预计到2020年,全球数据使用量将达到35.2ZB。在如此海量的数据面前,处理数据的效率就是企业的生命
3. **Variety(多样)**
这种类型的多样性也让数据被分为结构化数据和非结构化数据。相对于以往便于存储的以**数据库/文本**为主的结构化数据,非结构化数据越来越多,包括网络日志、音频、视频、图片、地理位置信息等,这些多类型的数据对数据的处理能力提出了更高要求
4. **Value(低价值密度)**
价值密度的高低与数据总量的大小成反比,如何快速对有价值数据“提纯”成为目前大数据背景下待解决的难题
## 大数据生态
### Hadoop
#### 介绍
Hadoop是一个由Apache基金会所开发的分布式系统基础架构
主要解决,海量数据的存储和海量数据的分析计算问题
广义上来说,Hadoop通常是指一个更广泛的概念——Hadoop生态圈
#### 优势
1. 高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失
2. 高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点
3. 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度
4. 高容错性:能够自动将失败的任务重新分配
#### 组成
##### Hadoop 1.x组成
1. Common(辅助工具)
2. HDFS(数据存储)
3. MapReduce(计算 + 资源调度)
##### Hadoop 2.x组成
1. Common(辅助工具)
2. HDFS(数据存储)
3. Yarn(资源调度)
4. MapReduce(计算)
在Hadoop1.x时代,Hadoop中的MapReduce同时处理**业务逻辑运算和资源的调度**,耦合性较大,在Hadoop2.x时代,增加了Yarn。**Yarn只负责资源的调度**,**MapReduce只负责运算**
#### HDFS 架构概述
1. NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等
2. DataNode(dn):在本地文件系统存储文件块数据,以及块数据的校验和
3. Secondary NameNode(2nn):用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照
#### YARN 架构概述
1. ResourceManager(RM)
1. 处理客户端请求
2. 监控NodeManager
3. 启动或监控ApplicationMaster
4. 资源的分配与调度
2. NodeManager(NM)
1. 管理单个节点上的资源
2. 处理来自ResourceManager的命令
3. 处理来自ApplicationMaster的命令
3. ApplicationMaster(AM)
1. 负责数据的切分
2. 为应用程序申请资源并分配给内部的任务
3. 任务的监控与容错
4. Container
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等

#### MapReduce 架构概述
MapReduce将计算过程分为两个阶段:Map 和 Reduce
1. Map 阶段并行处理输入数据
2. Reduce 阶段对 Map 结果进行汇总

### 技术生态体系

#### 技术名词解释
1. Sqoop:是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中
2. Flume:是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据。同时Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力
3. Kafka:是一种高吞吐量的分布式发布订阅消息系统
1. 通过磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能
2. 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒数百万的消息
3. 支持通过Kafka服务器和消费机集群来分区消息
4. 支持Hadoop并行数据加载
4. Storm:用于“连续计算”,对数据流做连续查询,在计算时就将结果以流的形式输出给用户
5. Spark:是当前最流行的开源大数据内存计算框架,可以基于Hadoop上存储的大数据进行计算
6. Oozie:是一个管理Hdoop作业(job)的工作流程调度管理系统
7. Hbase:是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库
8. Hive:是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析
9. Mahout:Apache Mahout是个可扩展的机器学习和数据挖掘库
10. ZooKeeper:是Google的Chubby一个开源的实现,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、 分布式同步、组服务等。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户
## Hadoop 运行环境搭建
### 虚拟机环境准备
1. 克隆虚拟机
2. 修改克隆虚拟机的静态IP:192.168.31.199
3. 修改主机名:hadoop199
4. 配置hosts映射
5. 关闭防火墙
```shell
systemctl stop firewalld
```
6. 在/opt目录下创建修改文件夹 module/ 和 software/
1. software/:存放安装软件 jdk.tar.gz 和 hadoop.tar.gz
2. module/ :存放解压的软件 jdk/ 和 hadoop/
### 安装 JDK
1. 解压JDK到/opt/module目录下
```shell
tar -zxvf jdk-8u144-linux-x64.tar.gz -C /opt/module/
```
2. 配置JDK环境变量
1. 获取JDK路径
```shell
pwd
```
2. 编辑/etc/profile文件
```shell
vim /etc/profile
```
3. 配置环境变量及路径
```shell
# JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
```
4. 保存退出
```shell
:wq
```
5. 让修改后的文件生效
```shell
source /etc/profile
```
3. 测试是否安装成功
```shell
java -version
```
```
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
```
### 安装 Hadoop
1. 解压安装文件到/opt/module下面
```shell
tar -zxvf hadoop-2.7.2.tar.gz -C /opt/module/
```
2. 配置Hadoop环境变量
1. 获取Hadoop安装路径
```shell
pwd
```
2. 编辑/etc/profile文件
```shell
vim /etc/profile
```
3. 配置环境变量及路径
```shell
# HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export PATH=$HADOOP_HOME/bin:$PATH
export PATH=$HADOOP_HOME/sbin:$PATH
```
4. 保存退出
```shell
:wq
```
5. 让修改后的文件生效
```shell
source /etc/profile
```
3. 测试是否安装成功
```shell
hadoop version
```
```
Hadoop 2.7.2
Subversion Unknown -r Unknown
Compiled by root on 2017-05-22T10:49Z
Compiled with protoc 2.5.0
From source with checksum d0fda26633fa762bff87ec759ebe689c
This command was run using /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar
```
### Hadoop 目录结构
1. bin:存放对Hadoop相关服务(HDFS, YARN)进行操作的脚本
2. etc:Hadoop的配置文件目录,存放Hadoop的配置文件
3. lib:存放Hadoop的本地库,对数据进行压缩解压缩功能
4. sbin:存放启动或停止Hadoop相关服务的脚本
5. share:存放Hadoop的依赖jar包、文档和官方案例
## Hadoop 运行模式
Hadoop运行模式包括:**本地模式**、**伪分布式模式**以及**完全分布式模式**
### 本地运行模式
#### Grep 案例
1. 在 hadoop-2.7.2 文件下创建一个input文件夹
```shell
mkdir input
```
2. 将Hadoop的xml配置文件复制到input
```shell
cp etc/hadoop/*.xml input
```
3. 执行share目录下的MapReduce程序
```shell
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar grep input output 'dfs[a-z.]+'
```
4. 查看输出结果
```shell
cat output/*
```
```
1 dfsadmin
```
#### WordCount 案例
1. 创建在 hadoop-2.7.2 文件下面创建一个wcinput文件夹
```shell
mkdir wcinput
```
2. 在 wcinput 文件下创建一个wc.input文件并输入内容
```shell
vim wcinput/wc.input
```
```
hello world
hello roger
```
3. 保存退出
```shell
:wq
```
4. 回到 /opt/module/hadoop-2.7.2 目录执行程序
```shell
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount wcinput wcoutput
```
5. 查看输出结果
```shell
cat wcoutput/*
```
```
hello 2
roger 1
world 1
```
### 伪分布式运行模式
#### 启动HDFS运行MapReduce
##### 配置集群
1. 配置 hadoop-env.sh
```shell
# The java implementation to use.
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
2. 配置 core-site.xml
```xml
fs.defaultFS
hdfs://hadoop199:9000
hadoop.tmp.dir
/opt/module/hadoop-2.7.2/data/tmp
```
3. 配置 hdfs-site.xml
```xml
dfs.replication
1
```
##### 启动集群
1. 第一次启动集群时,需要格式化 NameNode
```shell
hdfs namenode -format
```
2. 启动 NameNode
```shell
hadoop-daemon.sh start namenode
```
3. 启动 DataNode
```shell
hadoop-daemon.sh start datanode
```
##### 查看集群
1. 命令行查看集群是否启动成功
```shell
jps
```
```
1778 NameNode
1939 Jps
1868 DataNode
```
**注意**:`jps`是JDK中的命令,不是Linux命令,不安装JDK不能使用
2. web端查看集群是否启动成功:http://hadoop199:50070
3. 查看产生的Log日志
```shell
cat logs/hadoop-root-datanode-hadoop199.log
```
**注意**:格式化 NameNode,会产生**新的集群 id**,导致NameNode和DataNode的**集群 id 不一致**,集群找不到已往数据,因此格式NameNode前一定要**先删除 `data` 数据和 `log` 日志**,然后再格式化 NameNode
##### 操作集群
1. 在HDFS文件系统上创建一个input文件夹
```shell
hdfs dfs -mkdir -p input
```
**补充**:使用HDFS的**相对路径**时,默认会在`/user/用户名/`下进行操作,故此时HDFS会自动递归创建 `/user/root/input`目录
2. 将测试文件内容上传到文件系统上
```shell
hdfs dfs -put wcinput/wc.input input
```
3. 查看上传的文件是否正确
```shell
hdfs dfs -ls input
```
```
Found 1 items
-rw-r--r-- 1 root supergroup 24 2020-05-07 19:17 input/wc.input
```
4. 运行MapReduce程序
```shell
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount input output
```
5. 查看输出结果
1. 命令行查看
```shell
hdfs dfs -cat output/*
```
2. web端查看:http://hadoop199:50070/explorer.html#/
6. 将测试文件内容下载到本地
```shell
hdfs dfs -get output/part-r-00000 wcoutput/
```
**注意**:此时需要保证本地 wcoutput/ 目录存在
7. 删除输出结果
```shell
hdfs dfs -rm -r output
```
#### 启动YARN运行MapReduce
##### 配置集群
1. 配置 yarn-env.sh
```shell
# some Java parameters
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
2. 配置 yarn-site.xml
```xml
yarn.nodemanager.aux-services
mapreduce_shuffle
yarn.resourcemanager.hostname
hadoop199
```
3. 配置 mapred-env.sh
```shell
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
4. 拷贝一份mapred-site.xml.template重新命名为mapred-site.xml并进行配置
```shell
cp mapred-site.xml.template mapred-site.xml
```
```xml
mapreduce.framework.name
yarn
```
##### 启动集群
**注意**:启动前必须保证NameNode和DataNode已经启动
1. 启动ResourceManager
```shell
yarn-daemon.sh start resourcemanager
```
2. 启动NodeManager
```shell
yarn-daemon.sh start nodemanager
```
##### 查看集群
在web端查看集群情况:http://hadoop199:8088/cluster
##### 操作集群
1. 删除文件系统上的output文件
```shell
hdfs dfs -rm -r output
```
2. 执行MapReduce程序
```shell
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount input output
```
3. 查看运行结果
```shell
hdfs dfs -cat output/*
```
```
hello 2
roger 1
world 1
```
#### 配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器
1. 配置mapred-site.xml
```xml
mapreduce.framework.name
yarn
mapreduce.jobhistory.address
hadoop199:10020
mapreduce.jobhistory.webapp.address
hadoop199:19888
```
2. 启动历史服务器
```shell
mr-jobhistory-daemon.sh start historyserver
```
3. 查看历史服务器是否启动
```shell
jps
```
```
3472 Jps
1778 NameNode
2596 ResourceManager
1868 DataNode
2844 NodeManager
3439 JobHistoryServer
```
4. 查看JobHistory:http://hadoop199:19888/jobhistory
#### 配置日志的聚集
日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上
日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试
**注意**:开启日志聚集功能,需要重新启动NodeManager、ResourceManager和HistoryManager
1. 配置yarn-site.xml
```xml
yarn.nodemanager.aux-services
mapreduce_shuffle
yarn.resourcemanager.hostname
hadoop199
yarn.log-aggregation-enable
true
yarn.log-aggregation.retain-seconds
604800
```
2. 关闭NodeManager、ResourceManager和HistoryManager
```shell
mr-jobhistory-daemon.sh stop historyserver
yarn-daemon.sh stop nodemanager
yarn-daemon.sh stop resourcemanager
```
3. 启动NodeManager、ResourceManager和HistoryManager
```shell
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
mr-jobhistory-daemon.sh start historyserver
```
4. 删除HDFS上已经存在的输出文件
```shell
hdfs dfs -rm -r output
```
5. 执行WordCount程序
```shell
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount input output
```
6. 查看日志
http://hadoop199:19888/jobhistory -> Job ID -> logs
#### 配置文件说明
Hadoop配置文件分两类:**默认配置文件**和**自定义配置文件**,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值
自定义配置文件
core-site.xml、hdfs-site.xml、yarn-site.xml和mapred-site.xml 四个配置文件存放在`$HADOOP_HOME/etc/hadoop`路径上,可以根据项目需求重新进行修改配置
### 完全分布式运行模式
需要配置3台虚拟机使用,分别为hadoop198、hadoop197和hadoop196
#### 编写集群分发脚本xsync
##### rsync 远程同步工具
rsync 主要用于备份和镜像,具有速度快、避免复制相同内容和支持符号链接的优点
rsync和scp区别:用rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新,scp是把所有文件都复制过去
**基本语法**
```shell
# 命令 选项参数 待拷贝的文件父路径/文件名称 目标用户@主机:目标路径/文件名称
rsync -rvl pdir/fname user@host:pdir/fname
```
1. -r:递归
2. -v:显示复制过程
3. -l:拷贝符号连接
**示例**
把hadoop199机器上的/opt/module目录同步到hadoop198服务器的root用户下的/opt目录
```shell
rsync -rvl /opt/module root@hadoop198:/opt
```
##### xsync集群分发脚本
循环复制文件到所有节点的相同目录下
在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行
**脚本编写**
```shell
#!/bin/bash
# 1. 获取输入参数的个数,若没有参数则直接退出
pcount=$#
if [ pcount -eq 0 ]; then
echo "no args"
exit
fi
# 2. 获取文件名称
p1=$1
fname=`basename $p1`
echo "fname=$fname"
# 3. 获取上级目录的绝对路径
pdir=`cd -P $(dirname $p1);pwd`
echo "pdir=$pdir"
# 4. 获取当前用户名称
user=`whoami`
echo "user=$user"
# 5. 循环
for((host=197; host>=196; host--)); do
echo "============hadoop$host============="
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
echo "rsync -rvl $pdir/$fname $user@hadoop$host:$pdir"
done
```
**修改执行权限**
```shell
chmod 755 xsync
```
**调用脚本**
```shell
xsync /root/bin
```
#### 集群配置
##### 集群部署规划
| | HDFS | YARN |
| --------- | --------------------------- | ---------------------------- |
| hadoop198 | NameNode, DataNode | NodeManager |
| hadoop197 | SecondaryNameNode, DataNode | NodeManager |
| hadoop196 | DataNode | ResourceManager, NodeManager |
##### 配置集群
1. 核心配置文件 core-site.xml
```xml
fs.defaultFS
hdfs://hadoop198:9000
hadoop.tmp.dir
/opt/module/hadoop-2.7.2/data/tmp
```
2. HDFS配置文件
1. 配置hadoop-env.sh
```shell
# The java implementation to use.
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
2. 配置hdfs-site.xml
```xml
dfs.replication
3
dfs.namenode.secondary.http-address
hadoop197:50090
```
3. YARN配置文件
1. 配置yarn-env.sh
```shell
# some Java parameters
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
2. 配置yarn-site.xml
```xml
yarn.nodemanager.aux-services
mapreduce_shuffle
yarn.resourcemanager.hostname
hadoop196
yarn.log-aggregation-enable
true
yarn.log-aggregation.retain-seconds
604800
```
4. MapReduce配置文件
1. 配置mapred-env.sh
```shell
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
2. 配置mapred-site.xml
```xml
mapreduce.framework.name
yarn
mapreduce.jobhistory.address
hadoop198:10020
mapreduce.jobhistory.webapp.address
hadoop198:19888
```
##### 分发配置文件
```shell
xsync /opt/module/hadoop-2.7.2
```
#### 单点启动集群
如果集群是第一次启动,需要格式化NameNode
```shell
hadoop namenode -format
```
1. 在hadoop198上启动NameNode
```shell
hadoop-daemon.sh start namenode
```
2. 在hadoop198、hadoop197以及hadoop196上分别启动DataNode
```shell
hadoop-daemon.sh start datanode
```
#### 群起集群
##### 配置ssh免密登陆
1. 生成公钥和私钥
```shell
ssh-keygen -t rsa
```
2. 将公钥拷贝到要免密登录的目标机器上,需要配置登陆到自身
```shell
ssh-copy-id root@hadoop198
ssh-copy-id root@hadoop197
ssh-copy-id root@hadoop196
```
**文件功能解释**
| 文件 | 功能 |
| --------------- | ---------------------------- |
| known_hosts | 记录**访问**过的服务器的公钥 |
| authorized_keys | 存放**授权**过的服务器的公钥 |
| id_rsa | 生成的私钥 |
| id_rsa.pub | 生成的公钥 |
##### 配置 slaves
```shell
vim /opt/module/hadoop-2.7.2/etc/hadoop/slaves
```
```
hadoop198
hadoop197
hadoop196
```
**注意**:该文件中添加的内容结尾不允许有空格,文件中不允许有空行
##### 同步配置文件
```shell
xsync slaves
```
##### 启动集群
如果集群是第一次启动,需要格式化 NameNode
**注意**:格式化之前,一定要先停止上次启动的所有namenode和datanode进程,然后再删除 data/ 和 logs/ 数据
```shell
stop-dfs.sh
```
```shell
hdfs namenode -format
```
**注意**:NameNode 和 ResourceManger 如果不是同一台机器,不能在 NameNode 上启动 YARN,应该在 ResouceManager 所在的机器上启动 YARN
1. 启动 HDFS
```
[root@hadoop198 hadoop-2.7.2]# start-dfs.sh
```
2. 启动 YARN
```
[root@hadoop196 hadoop-2.7.2]# start-yarn.sh
```
3. Web端查看 SecondaryNameNode:http://hadoop197:50090/
##### 集群测试
1. 上传文件到集群
1. 上传小文件
```shell
hdfs dfs -put wcinput/wc.input input
```
2. 上传大文件
```shell
hdfs dfs -put /opt/software/hadoop-2.7.2.tar.gz input
```
2. 查看文件存放位置
```shell
cd data/tmp/dfs/data/current/BP-1515266066-192.168.31.198-1588863516388/current/finalized/subdir0/subdir0/
```
```
总用量 194552
-rw-r--r--. 1 root root 24 5月 8 13:52 blk_1073741825
-rw-r--r--. 1 root root 11 5月 8 13:52 blk_1073741825_1001.meta
-rw-r--r--. 1 root root 134217728 5月 8 13:55 blk_1073741826
-rw-r--r--. 1 root root 1048583 5月 8 13:55 blk_1073741826_1002.meta
-rw-r--r--. 1 root root 63439959 5月 8 13:56 blk_1073741827
-rw-r--r--. 1 root root 495635 5月 8 13:56 blk_1073741827_1003.meta
```
3. 查看存储文件内容
1. 小文件查看
```shell
cat blk_1073741825
```
```
hello world
hello roger
```
2. 大文件操作
1. 拼接成一个大文件
```shell
cat blk_1073741826 >> tmp.file
cat blk_1073741827 >> tmp.file
```
2. 由于其本身是.tar.gz文件,故可以直接解压
```shell
tar -zxvf tmp.file
```
4. 下载
```shell
hdfs dfs -get input/hadoop-2.7.2.tar.gz /opt/test
```
#### 集群启动和停止
##### 逐一启动/停止
1. 启动/停止HDFS组件
```shell
hadoop-daemon.sh start/stop namenode/datanode/secondarynamenode
```
2. 启动/停止YARN组件
```shell
yarn-daemon.sh start/stop resourcemanager/nodemanager
```
##### 整体启动/停止
1. 启动/停止HDFS组件
```shell
start-dfs.sh/stop-dfs.sh
```
2. 启动/停止YARN组件
```shell
start-yarn.sh/stop-yarn.sh
```
#### 集群时间同步
##### 时间同步的方式
选定一台服务器作为时间服务器,所有的机器与这台集群时间进行定时的同步,比如每隔十分钟,同步一次时间
##### 时间服务器配置
1. 检查ntp是否安装
```shell
rpm -qa | grep ntp
```
2. 安装ntp
```shell
yum install ntp -y
```
3. 修改ntp配置文件
```shell
vim /etc/ntp.conf
```
1. (修改)授权访问网段IP
```shell
# Hosts on local network are less restricted.
restrict 192.168.31.0 mask 255.255.255.0 nomodify notrap
```
2. (修改)取消连接互联网时间
```shell
# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
```
3. (添加)若丢失网络连接,采用本地时间
```shell
server 127.127.1.0
fudge 127.127.1.0 stratum 10
```
4. 修改 /etc/sysconfig/ntpd 文件
新增内容,让硬件时间与系统时间一起同步
```shell
SYNC_HWCLOCK=yes
```
5. 重启ntpd服务
```shell
service ntpd restart
```
6. 设置ntpd服务开机启动
```shell
systemctl enable ntpd
```
##### 其他服务器配置
在其他机器配置10分钟与时间服务器同步一次
1. 编辑定时任务
```shell
crontab -e
```
```shell
*/10 * * * * /usr/sbin/ntpdate hadoop198
```
2. 修改任意机器时间
```shell
date -s "2017-9-11 11:11:11"
```
3. 10分钟后查看机器是否与时间服务器同步
```shell
date
```
## HDFS 概述
### 产生背景
随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种
### 定义
HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色
HDFS的使用场景:适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合用来做网盘应用
### 优缺点
#### 优点
1. **高容错性**
数据自动保存多个副本,它通过增加副本的形式,提高容错性
某一个副本丢失以后,它可以自动恢复
2. **适合处理大数据**
数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据
文件规模:能够处理百万规模以上的文件数量,数量相当之大
3. **低成本**
可构建在廉价机器上,通过多副本机制,提高可靠性
#### 缺点
1. 不适合低延时数据访问,比如毫秒级的存储数据是做不到的
2. 无法高效的对大量小文件进行存储
存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的
小文件存储的**寻址时间会超过读取时间**,它违反了HDFS的设计目标
3. 不支持并发写入、文件随机修改
一个文件只能有一个写,不允许多个线程同时写
仅支持数据**append(追加)**,不支持文件的随机修改
### 组成架构
#### 结构图示

#### 组件分析
1. `NameNode`:Master,它是一个主管、管理者
1. 管理HDFS的名称空间
2. 配置副本策略
3. 管理数据块(Block)映射信息
4. 处理客户端读写请求
2. `DataNode`:Slave,NameNode下达命令,DataNode执行实际的操作
1. 存储实际的数据块
2. 执行数据块的读/写操作
3. `Client`:客户端
1. 负责文件切分,文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传
2. 与NameNode交互,获取文件的位置信息
3. 与DataNode交互,读取或者写入数据
4. Client提供一些命令来管理HDFS,比如NameNode格式化
5. Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作
4. `Secondary NameNode`:并非NameNode的热备,当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务
1. 辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode
2. 在紧急情况下,可辅助恢复NameNode
### 文件块
HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在Hadoop 2.x版本中是128M,1.x版本中是64M

**注意**:文件块的大小既不能设置太小,也不能设置太大
1. 若HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置
2. 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢
3. 因此,HDFS块的大小设置主要取决于**磁盘传输速率**
## Shell 命令操作
### 基本语法
```shell
# 方式一
hadoop fs -shell命令
# 方式二
hdfs dfs -shell命令
```
其中 dfs 是 fs 的实现类
### 常用命令
1. 启动集群
```shell
# 启动hdfs文件系统
start-dfs.sh
# 启动yarn资源调度器
start-yarn.sh
```
2. 输出命令参数
```shell
hadoop fs -help rm
```
3. 显示目录信息
```shell
hadoop fs -ls /
```
4. 在HDFS上创建目录
```shell
hadoop fs -mkdir -p input
```
5. 从本地移动(剪切粘贴)到HDFS
```shell
hadoop fs -moveFromLocal input/wc.input input
```
6. 追加一个文件到已经存在的文件末尾
```shell
hadoop fs -appendToFile test.input input/wc.input
```
7. 显示文件内容
```shell
hadoop fs -cat input/wc.input
```
8. 修改文件所属权限,`-chgrp` 、`-chmod`、`-chown`
```shell
hadoop fs -chmod 777 input/wc.input
hadoop fs -chown roger:police input/wc.input
```
9. 从本地拷贝文件到HDFS
```shell
# 方式一
hadoop fs -copyFromLocal wc.input input
# 方式二
hadoop fs -put wc.input input
```
10. 从HDFS拷贝到本地
```shell
# 方式一
hadoop fs -copyToLocal input/wc.input .
# 方式二
hadoop fs -get input/wc.input .
```
11. 在HDFS中进行文件复制
```shell
hadoop fs -cp input/wc.input input/wctest.input
```
12. 在HDFS中进行文件移动
```shell
hadoop fs -mv input/wc.input output/wc.output
```
13. 合并下载多个文件
```shell
hadoop fs -getmerge input/* merge.txt
```
14. 显示文件的末尾
```shell
hadoop fs -tail input/wc.input
```
15. 删除文件或文件夹
```shell
hadoop fs -rm -r input
```
16. 统计文件夹的大小信息
```shell
hadoop fs -du -s -h /
```
17. 设置HDFS中文件的副本数量
```shell
hadoop fs -setrep 10 input/wc.input
```
**注意**:这里设置的副本数只是记录在NameNode的元数据中,实际副本数量还是取决于DataNode的数量。由于目前只有3台设备,最多也就3个副本,只有节点数的增加到10台时,副本数才能达到10
## 客户端操作
### 客户端环境
1. 创建Maven工程
2. 导入相关依赖
```xml
junit
junit
4.12
org.apache.hadoop
hadoop-client
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2
```
3. 在项目的src/main/resources目录下,新建`log4j.properties`配置文件
```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
```
### API 操作
1. 创建目录
```java
// 1 获得配置文件
Configuration configuration = new Configuration();
// 2 获取文件系统,配置在集群上运行
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 3 创建目录
fs.mkdirs(new Path("input"));
// 4 关闭资源
fs.close();
```
2. 文件上传,测试参数优先级
1. 将`hdfs-site.xml`拷贝到项目的资源目录下
```xml
dfs.replication
2
```
2. 编写文件上传代码
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "1");
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 上传文件
fs.copyFromLocalFile(new Path("test.txt"), new Path("input/test1.txt"));
// 3 关闭资源
fs.close();
```
3. 参数优先级
参数优先级排序:客户端代码中设置的值 > 类路径下的用户自定义配置文件 > 服务器的默认配置
3. 文件下载
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 执行下载操作
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
fs.copyToLocalFile(false, new Path("input/test.txt"), new Path("test_download.txt"), true);
// 3 关闭资源
fs.close();
```
4. 文件删除
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 执行删除
// f 指定要删除的路径
// recursive 是否递归删除
fs.delete(new Path("input"), true);
// 3 关闭资源
fs.close();
```
5. 文件名更改
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 修改文件名称
fs.rename(new Path("input/test3.txt"), new Path("input/test.txt"));
// 3 关闭资源
fs.close();
```
6. 文件详情查看
```java
// 1获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 获取文件详情
// f 目标路径
// recursive 递归
RemoteIterator listFiles = fs.listFiles(new Path("input"), true);
// 迭代
while (listFiles.hasNext()) {
LocatedFileStatus status = listFiles.next();
// 输出详情
// 文件名称
System.out.println("名称: " + status.getPath().getName());
// 长度
System.out.println("长度: " + status.getLen());
// 权限
System.out.println("权限: " + status.getPermission());
// 分组
System.out.println("分组: " + status.getGroup());
// 获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
System.out.println("块存储信息: " + Arrays.toString(blockLocations));
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println("块存储的主机节点: " + host);
}
}
System.out.println("----------- 分割线 ----------");
}
// 3 关闭资源
fs.close();
```
7. 文件和文件夹判断
```java
// 1 获取文件配置信息
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 判断是文件还是文件夹
FileStatus[] listStatus = fs.listStatus(new Path("."));
for (FileStatus fileStatus : listStatus) {
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("文件: " + fileStatus.getPath().getName());
} else {
System.out.println("文件夹: " + fileStatus.getPath().getName());
}
}
// 3 关闭资源
fs.close();
```
### I/O 流操作
1. 文件上传
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 创建输入流
FileInputStream fis = new FileInputStream(new File("test.txt"));
// 3 获取输出流
FSDataOutputStream fos = fs.create(new Path("test_io_upload.txt"));
// 4 流对拷
IOUtils.copyBytes(fis, fos, configuration);
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
```
2. 文件下载
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("test_io_upload.txt"));
// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("test_io_download.txt"));
// 4 流的对拷
IOUtils.copyBytes(fis, fos, configuration);
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
```
3. 定位文件读取/分块读取文件
1. 读取第一块
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("hadoop-2.7.2.tar.gz"));
// 3 创建输出流
FileOutputStream fos = new FileOutputStream(new File("hadoop-2.7.2.tar.gz.part1"));
// 4 流的拷贝,复制第一部分
// 创建1K的缓冲区
byte[] buffer = new byte[1024];
// 循环读取写出1024 * 128次,总共操作128MB数据
for (int i = 0; i < 1024 * 128; i++) {
fis.read(buffer);
fos.write(buffer);
}
// 5关闭资源
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
fs.close();
```
2. 读取第二块
```java
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop198:9000"), configuration, "root");
// 2 打开输入流
FSDataInputStream fis = fs.open(new Path("hadoop-2.7.2.tar.gz"));
// 3 定位输入数据位置
fis.seek(1024 * 1024 * 128);
// 4 创建输出流
FileOutputStream fos = new FileOutputStream(new File("hadoop-2.7.2.tar.gz.part2"));
// 5 流的对拷
IOUtils.copyBytes(fis, fos, configuration);
// 6 关闭资源
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
fs.close();
```
3. 合并文件并更名
```shell
cat hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
mv hadoop-2.7.2.tar.gz.part1 hadoop-2.7.2-donwload.tar.gz
```
## 数据流
### 写数据流程
#### 图示分析

#### 流程分析
1. 客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在
2. NameNode返回是否可以上传
3. 客户端请求第一个 Block上传到哪些DataNode服务器上
4. NameNode返回3个DataNode节点,分别为dn1、dn2、dn3
5. 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成
6. dn1、dn2、dn3逐级应答客户端
7. 客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答
8. 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器,直至完成
### 读数据流程
#### 图示分析

#### 流程分析
1. 客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址
2. 挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据
3. DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)
4. 客户端以Packet为单位接收,先在本地缓存,然后写入目标文件
## NameNode/SecondaryNameNode
### NN和2NN工作机制
#### 问题引入
**NameNode中的元数据的存储位置?**
首先进行假设,如果存储在NameNode节点的**「磁盘」**中,因为经常需要进行随机访问,还有响应客户请求,必然是**「效率过低」**。故元数据需要存放在**「内存」**中,但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了,因此产生在**「磁盘中备份」**元数据的 `FsImage`
这样又会带来新的问题,若当在**内存中的元数据更新时同时更新FsImage**,就会导致**「效率过低」**,但若不更新,就会发生**「一致性问题」**,一旦NameNode节点断电,就会产生数据丢失。因此,引入`Edits`文件 (只进行**「追加」**操作,效率很高) 。每当元数据有**变更**时,修改**内存中的元数据**并**追加到Edits**中。一旦NameNode节点断电,可以通过**「合并FsImage和Edits」**,合成还原元数据
但是,如果长时间添加数据到Edits中,会导致该**文件数据过大,效率降低**,而且一旦断电,恢复元数据需要的**时间过长**。因此需要**「定期」**进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点`SecondaryNamenode`,**专门用于FsImage和Edits的合并**
#### 图示分析

Fsimage:NameNode内存中元数据「序列化」后形成的文件
Edits:记录客户端更新元数据信息的每一步操作,可通过Edits运算出元数据
#### 流程分析
1. 第一阶段:NameNode启动
1. 第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存
2. 客户端对元数据进行增删改的请求
3. NameNode记录操作日志,更新滚动日志
4. NameNode在内存中对数据进行增删改
2. 第二阶段:Secondary NameNode工作
1. Secondary NameNode询问NameNode是否需要CheckPoint,直接带回NameNode是否需要检查结果
2. Secondary NameNode请求执行CheckPoint
3. NameNode滚动正在写的Edits日志
4. 将滚动前的「编辑日志」和「镜像文件」拷贝到Secondary NameNode
5. Secondary NameNode加载编辑日志和镜像文件到内存进行合并
6. 生成新的镜像文件`fsimage.chkpoint`
7. 拷贝fsimage.chkpoint回到NameNode
8. NameNode将fsimage.chkpoint重新命名成`fsimage`
#### 机制分析
NameNode启动时,先滚动Edits并生成一个「空的」`edits.inprogress`,然后加载**Edits和Fsimage**到内存中,此时NameNode内存就持有「最新」的元数据信息
`Client`开始对NameNode发送元数据的增删改的请求,这些请求的操作**首先会被记录到edits.inprogress中**(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息,然后NameNode会在内存中执行元数据的增删改的操作
由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要定期对Edits和Fsimage进行合并(即将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage),「SecondaryNameNode」的作用就是**帮助NameNode进行Edits和Fsimage的合并工作**
SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,「定时时间到」和「Edits中数据写满」),直接带回NameNode是否需要检查结果
SecondaryNameNode执行CheckPoint操作,首先会让NameNode**滚动Edits并生成一个空的edits.inprogress**,滚动Edits的目的是**给Edits打个标记**,以后所有**新的操作都写入edits.inprogress**,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后「替换」掉原来的Fsimage
NameNode在启动时就只需要加载之前「未合并的Edits」和「Fsimage」即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中
### Fsimage和Edits解析
#### 概念
NameNode被格式化之后第一次运行,将在${HADOOP_HOME}/data/tmp/dfs/name/current目录中产生如下文件
```
总用量 1040
-rw-r--r--. 1 root root 1048576 5月 9 15:44 edits_inprogress_0000000000000000001
-rw-r--r--. 1 root root 351 5月 9 15:42 fsimage_0000000000000000000
-rw-r--r--. 1 root root 62 5月 9 15:42 fsimage_0000000000000000000.md5
-rw-r--r--. 1 root root 2 5月 9 15:44 seen_txid
-rw-r--r--. 1 root root 207 5月 9 15:42 VERSION
```
1. Fsimage文件:HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件inode的序列化信息
2. Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中
3. seen_txid文件:保存的是一个数字,就是最新的edits_inprogress_的数字,NN根据这个数字进行edits的合并
4. 每次NameNode启动的时候都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动的时候将Fsimage和Edits文件进行了合并
#### oiv查看Fsimage文件
##### 基本语法
```shell
hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后文件输出路径
```
##### 示例
将fsimage内容转换为xml格式内容并保存
```shell
hdfs oiv -p XML -i fsimage_0000000000000000002 -o /opt/fsimage.xml
```
##### 关键内容显示
```xml
16389
16385
DIRECTORY
1589010923785
root:supergroup:rwxr-xr-x
9223372036854775807
-1
16386
DIRECTORY
user
1589010923840
root:supergroup:rwxr-xr-x
-1
-1
16387
DIRECTORY
root
1589010923841
root:supergroup:rwxr-xr-x
-1
-1
16388
DIRECTORY
input
1589011446544
root:supergroup:rwxr-xr-x
-1
-1
16389
FILE
test3.txt
2
1589011447639
1589011446544
134217728
root:supergroup:rw-r--r--
1073741825
1001
10
```
#### oev查看Edits文件
##### 基本语法
```shell
hdfs oev -p 文件类型 -i 编辑日志 -o 转换后文件输出路径
```
##### 示例
```shell
hdfs oev -p XML -i edits_0000000000000000015-0000000000000000016 -o /opt/edits.xml
```
##### 关键内容显示
```xml
OP_START_LOG_SEGMENT
9
OP_ADD
10
0
16389
/user/root/input/test3.txt
2
1589011446544
1589011446544
134217728
DFSClient_NONMAPREDUCE_-1793387438_1
192.168.31.230
true
root
supergroup
420
4cc57333-8519-4b14-bb18-1026f917d971
1
OP_ALLOCATE_BLOCK_ID
11
1073741825
OP_SET_GENSTAMP_V2
12
1001
OP_ADD_BLOCK
13
/user/root/input/test3.txt
1073741825
0
1001
-2
OP_CLOSE
14
0
0
/user/root/input/test3.txt
2
1589011447639
1589011446544
134217728
false
1073741825
10
1001
root
supergroup
420
```
### CheckPoint时间设置
通常情况下,SecondaryNameNode每隔一小时执行一次
在hdfs-default.xml中
```xml
dfs.namenode.checkpoint.period
3600
```
在hdfs-site.xml中设置为1分钟检查1次,或当操作次数达到1,000,000时,SecondaryNameNode执行一次
```xml
dfs.namenode.checkpoint.txns
1000000
操作动作次数
dfs.namenode.checkpoint.check.period
60
1分钟检查一次操作次数
```
### NameNode故障处理
NameNode故障后,可以采用如下两种方法恢复数据
**方法一**:将SecondaryNameNode中数据拷贝到NameNode存储数据的目录
1. 结束NameNode进程
```shell
ps -aux | grep "namenode"
kill -9 11067
```
2. 删除NameNode存储的数据
```shell
rm -rf /opt/module/hadoop-2.7.2/data/tmp/dfs/name/*
```
3. 拷贝SecondaryNameNode中数据到原NameNode存储数据目录
```shell
scp -r root@hadoop197:/opt/module/hadoop-2.7.2/data/tmp/dfs/namesecondary/* name/
```
4. 重新启动NameNode
```shell
hadoop-daemon.sh start namenode
```
**方法二**:使用`-importCheckpoint`选项启动NameNode守护进程,从而将SecondaryNameNode中数据拷贝到NameNode目录中
1. 修改hdfs-site.xml
```xml
dfs.namenode.checkpoint.period
120
dfs.namenode.name.dir
/opt/module/hadoop-2.7.2/data/tmp/dfs/name
```
2. 结束NameNode进程
3. 删除NameNode存储的数据
```shell
rm -rf /opt/module/hadoop-2.7.2/data/tmp/dfs/name/*
```
4. 如果SecondaryNameNode不和NameNode在一个主机节点上,需要将SecondaryNameNode存储数据的目录拷贝到NameNode存储数据的「平级目录」,并删除`in_use.lock`文件
```
[root@hadoop198 dfs]# scp -r root@hadoop197:/opt/module/hadoop-2.7.2/data/tmp/dfs/namesecondary ./
[root@hadoop198 namesecondary]# rm -rf in_use.lock
```
5. 导入检查点数据
```shell
hdfs namenode -importCheckpoint
```
6. 启动NameNode
```shell
hadoop-daemon.sh start namenode
```
### 集群安全模式
#### 概述
**一、NameNode启动**
NameNode启动时,首先将镜像文件(Fsimage)载入内存,并执行编辑日志(Edits)中的各项操作。一旦在内存中成功**建立文件系统元数据的映像**,则创建一个「新的Fsimage文件」和一个「空的编辑日志」。在这过程期间,NameNode一直运行在「安全模式」,即NameNode的文件系统对于客户端来说是只读的。之后,NameNode开始监听DataNode请求
**二、DataNode启动**
系统中的数据块的位置并不是由NameNode维护的,而是以块列表的形式存储在DataNode中。在系统的正常操作期间,NameNode会在内存中保留所有块位置的映射信息。在安全模式下,**各个DataNode会向NameNode发送最新的块列表信息**,NameNode了解到足够多的块位置信息之后,即可高效运行文件系统
**三、安全模式退出判断**
如果满足“最小副本条件”,NameNode会在30秒钟之后就退出安全模式。所谓的最小副本条件指的是在整个文件系统中99.9%的块满足「最小副本级别」(默认值:`dfs.replication.min=1`)。在启动一个**刚刚格式化**的HDFS集群时,因为系统中还没有任何块,所以NameNode**不会进入安全模式**
#### 基本语法
集群处于安全模式,不能执行重要操作写操作,集群启动完成后,自动退出安全模式
1. 查看安全模式状态
```shell
hdfs dfsadmin -safemode get
```
2. 进入安全模式状态
```shell
hdfs dfsadmin -safemode enter
```
3. 离开安全模式状态
```shell
hdfs dfsadmin -safemode leave
```
4. 等待安全模式状态
```shell
hdfs dfsadmin -safemode wait
```
#### 示例
需求:模拟等待安全模式
1. 查看当前模式
```shell
hdfs dfsadmin -safemode get
```
2. 进入安全模式
```shell
hdfs dfsadmin -safemode enter
```
3. 创建并执行脚本
```shell
vim safemode.sh
```
```shell
#!/bin/bash
hdfs dfsadmin -safemode wait
hdfs dfs -put /opt/module/hadoop-2.7.2/README.txt /
```
```shell
chmod 755 safemode.sh
```
```shell
safemode.sh
```
4. 再打开一个窗口退出安全模式
```shell
hdfs dfsadmin -safemode leave
```
### NameNode多目录配置
作用:NameNode的本地目录可以配置成多个,且每个目录存放「内容相同」,增加了可靠性
编辑 hdfs-site.xml 文件
```xml
dfs.namenode.name.dir
file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2
```
## DataNode
### 工作机制
#### 图示分析

#### 流程分析
1. 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是「数据」本身,一个是「元数据」包括数据块的**长度,块数据的校验和以及时间戳**
2. DataNode启动后向NameNode注册,通过后将「周期性」(1小时)地向NameNode上报所有的块信息
3. 心跳是每3秒一次,心跳返回结果带有**NameNode给该DataNode的命令**如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用
4. 集群运行中可以安全加入和退出一些机器
### 数据完整性
1. 当DataNode读取Block的时候,它会计算CheckSum
2. 如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏
3. Client读取其他DataNode上的Block
4. DataNode在其文件创建后周期验证CheckSum

### 掉线时限参数设置
1. DataNode进程死亡或者网络故障造成DataNode无法与NameNode通信
2. NameNode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作「超时时长」
3. HDFS默认的超时时长为「10分钟 + 30秒」
4. 如果定义超时时间为TimeOut,则超时时长的计算公式为
```
TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
```
`dfs.namenode.heartbeat.recheck-interval` 默认为5分钟
`dfs.heartbeat.interval` 默认为3秒
**注意**:hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为「毫秒」,dfs.heartbeat.interval的单位为「秒」
```xml
dfs.namenode.heartbeat.recheck-interval
300000
dfs.heartbeat.interval
3
```
### 服役新数据节点
#### 需求
当数据量越来越大,原有的数据节点的容量已经不能满足存储数据的需求,需要在原有集群基础上「动态添加」新的数据节点
#### 环境准备
1. 再克隆一台hadoop195主机
2. 修改IP地址和主机名称
3. 删除原来HDFS文件系统留存的文件/data和/logs
#### 服役新节点
启动DataNode和NodeManager关联到集群
```shell
hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager
```
**注意**:若无法关联到集群中,出现以下错误
```
2020-05-09 19:05:28,223 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Block pool BP-1093355258-192.168.31.198-1589010135816 (Datanode Uuid null) service to hadoop198/192.168.31.198:9000 beginning handshake with NN
2020-05-09 19:05:28,228 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: Initialization failed for Block pool BP-1093355258-192.168.31.198-1589010135816 (Datanode Uuid null) service to hadoop198/192.168.31.198:9000 Datanode denied communication with namenode because hostname cannot be resolved (ip=192.168.31.195, hostname=192.168.31.195): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=e562e511-d75b-471a-a3c6-734988654112, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-56;cid=CID-cccf32b5-4ec3-458a-8c45-51f2493e0d00;nsid=2116542745;c=0)
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.registerDatanode(DatanodeManager.java:863)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.registerDatanode(FSNamesystem.java:4528)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.registerDatanode(NameNodeRpcServer.java:1285)
at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB.registerDatanode(DatanodeProtocolServerSideTranslatorPB.java:96)
at org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos$DatanodeProtocolService$2.callBlockingMethod(DatanodeProtocolProtos.java:28752)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
```
可以尝试以下二种解决方法
1. 在hosts中添加新服务器映射
```
192.168.31.195 hadoop195
```
2. 在hdfs-site.xml文件中添加
```xml
dfs.namenode.datanode.registration.ip-hostname-check
false
```
如果数据不均衡,可以用命令实现集群的再平衡
```shell
start-balancer.sh
```
#### 退役旧数据节点
##### 添加白名单
添加到白名单的主机节点,都允许访问NameNode,不在白名单的主机节点,都会被退出
1. 在NameNode的`/opt/module/hadoop-2.7.2/etc/hadoop`目录下创建`dfs.hosts`文件
2. 添加白名单主机
```
hadoop198
hadoop197
hadoop196
```
3. 在NameNode的`hdfs-site.xml`配置文件中增加`dfs.hosts`属性
```xml
dfs.hosts
/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts
```
4. 配置文件分发
```shell
xsync hdfs-site.xml
```
5. 刷新NameNode
```shell
hdfs dfsadmin -refreshNodes
```
6. 更新ResourceManager节点
```shell
yarn rmadmin -refreshNodes
```
7. 如果数据不均衡,可以用命令实现集群的再平衡
```shell
start-balancer.sh
```
##### 黑名单退役
在黑名单上面的主机都会被强制退出
1. 在NameNode的`/opt/module/hadoop-2.7.2/etc/hadoop`目录下创建`dfs.hosts.exclude`文件
2. 添加黑名单主机
```
hadoop105
```
3. 在NameNode的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性
```xml
dfs.hosts.exclude
/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude
```
4. 刷新NameNode、刷新ResourceManager
```shell
hdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
```
5. 检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点
6. 等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器
7. 停止DataNode和NodeManager
```shell
hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager
```
8. 如果数据不均衡,可以用命令实现集群的再平衡
```shell
start-balancer.sh
```
**注意**:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役,并且**不允许**白名单和黑名单中**同时出现同一个主机名称**
### Datanode多目录配置
DataNode也可以配置成多个目录,每个目录存储的「数据不一样」,数据不是副本
修改`hdfs-site.xml`配置
```xml
dfs.datanode.data.dir
file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2
```
## HDFS 2.X新特性
### 集群间数据拷贝
采用`distcp`命令实现两个Hadoop集群之间的递归数据复制
```shell
hadoop distcp hdfs://haoop198:9000/user/root/hello.txt hdfs://hadoop197:9000/user/root/hello.txt
```
### 小文件存档
#### HDFS存储小文件弊端
每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效。因为大量的小文件会耗尽NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量和数据块的大小无关。例如,一个1MB的文件设置为128MB的块存储,实际使用的是1MB的磁盘空间,而不是128MB
#### 解决存储小文件办法之一
HDFS存档文件或「HAR文件」,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的访问。具体说来,HDFS存档文件**对内还是一个一个独立文件**,**对NameNode而言却是一个整体**,减少了NameNode的内存
#### 示例
1. 启动YARN进程
```shell
start-yarn.sh
```
2. 归档文件,把/user/root/input目录里面的所有文件归档成一个叫`input.har`的归档文件,并把归档后文件存储到/user/root/output路径下
```shell
hadoop archive -archiveName input.har -p /user/root/input /user/root/output
```
3. 查看归档
```shell
hadoop fs -ls -R har:///user/root/output/input.har
```
4. 解归档文件
```shell
hadoop fs -cp har:///user/root/output/input.har/* .
```
### 回收站
开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用
#### 回收站参数设置及工作机制
##### 开启回收站功能参数说明
1. 默认值`fs.trash.interval=0`,`0`表示禁用回收站,其他值表示设置文件的「存活时间」
2. 默认值`fs.trash.checkpoint.interval=0`,检查回收站的间隔时间。如果该值为0,则该值设置和`fs.trash.interval`的参数值相等
3. 要求`fs.trash.checkpoint.interval <= fs.trash.interval`
##### 回收站工作机制

#### 启用回收站
修改`core-site.xml`,配置垃圾回收时间为1分钟
```xml
fs.trash.interval
1
```
#### 查看回收站
回收站在集群中的路径:`/user/root/.Trash/`
#### 修改访问垃圾回收站用户名称
进入垃圾回收站用户名称,默认是`dr.who`,修改为root用户,`core-site.xml`
```xml
hadoop.http.staticuser.user
root
```
**注意**:通过「程序」删除的文件**不会经过回收站**,需要调用`moveToTrash()`才进入回收站
```java
// 实例化回收站对象
Trash trash = new Trash(conf);
// 将指定路径文件移入回收站中
trash.moveToTrash(path);
```
#### 恢复回收站数据
```shell
hadoop fs -mv .Trash/Current/user/root/input input
```
#### 清空回收站
```shell
hadoop fs -expunge
```
## MapReduce 概述
### 定义
MapReduce是一个**分布式运算程序的编程框架**,是用户开发“基于Hadoop的数据分析应用”的核心框架
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
### 优缺点
#### 优点
1. **易于编程**,它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行,就是因为这个特点使得MapReduce编程变得非常流行
2. **良好的扩展性**,当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力
3. **高容错性**,MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成
4. 适合PB级以上**海量数据的离线处理**,可以实现上千台服务器集群并发工作,提供数据处理能力
#### 缺点
1. **不擅长实时计算**,无法像MySQL一样,在毫秒或者秒级内返回结果
2. **不擅长流式计算**,流式计算的输入数据是动态的,而MapReduce的输入数据集是**静态**的,不能动态变化,这是因为MapReduce自身的设计特点决定了数据源必须是静态的
3. **不擅长DAG(有向图)计算**,多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下
### 核心思想
1. MapReduce运算程序一般需要分成2个阶段:**Map阶段和Reduce阶段**
2. Map阶段的并发MapTask,完全**并行运行,互不相干**
3. Reduce阶段的并发ReduceTask,**完全互不相干**,但是他们的数据依赖于上一个阶段的所有**MapTask并发实例的输出**
4. MapReduce编程模型只能包含**一个Map阶段和一个Reduce阶段**,如果业务逻辑非常复杂,那就只能多个MapReduce程序,**串连**运行

### 进程
一个完整的MapReduce程序在分布式运行时有**三类实例进程**
1. **MrAppMaster**,责整个程序的**过程调度及状态协调**
2. **MapTask**,负责**Map阶段**的整个数据处理流程
3. **ReduceTask**,负责**Reduce阶段**的整个数据处理流程
### 常用数据序列化类型
| Java类型 | Hadoop Writable类型 |
| ---------- | ------------------- |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| **String** | **Text** |
| map | MapWritable |
| array | ArrayWritable |
### 编程规范
用户编写的程序分成三个部分:**Mapper、Reducer和Driver**
#### Mapper阶段
1. 自定义的Mapper要继承自己的父类Mapper
2. Mapper的输入数据是KV对的形式,KV的类型可自定义
3. Mapper中的业务逻辑主要写在map()方法中,还有setup()和cleanup()方法
4. Mapper的输出数据是KV对的形式,KV的类型可自定义
5. map()方法(MapTask进程)对每一个调用一次
#### Reducer阶段
1. 自定义的Reducer要继承自己的父类Reducer
2. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
3. Reducer的业务逻辑主要写在reduce()方法中,还有setup()和cleanup()方法
4. ReduceTask进程对**每一相同k的组**的调用一次reduce()方法
#### Driver阶段
相当于YARN集群的**客户端**,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
### WordCount案例
#### pom.xml
```xml
junit
junit
4.12
org.apache.hadoop
hadoop-client
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2
```
#### log4j.properties
在项目的src/main/resources目录下,新建一个文件命名为`log4j.properties`
```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
```
#### WordCountMapper
```java
// map阶段
// 1 继承Mapper,填入4个参数
// KEYIN:输入数据的key,偏移量
// VALUEIN:输入数据的value,值
// KEYOUT:输出数据key的类型
// VALUEOUT:输出数据value的类型
public class WordCountMapper extends Mapper {
Text k = new Text();
IntWritable v = new IntWritable(1); // 1
// 2 重写map方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key: 0, value: roger roger
// 3 获取一行数据:roger roger
String line = value.toString();
// 4 切割单词
String[] words = line.split(" ");
// 5 循环写出
for (String word : words) {
// roger
k.set(word);
context.write(k, v);
}
}
}
```
#### WordCountReducer
```java
// reduce阶段
// 1 继承Reducer,填入4个参数
// KEYIN, VALUEIN:map阶段的k和v
// KEYOUT, VALUEOUT:输出的内容形式 roger 2
public class WordCountReducer extends Reducer {
IntWritable v = new IntWritable();
// 2 重写reduce方法
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// roger 1
// roger 1
// 3 累加求和
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 4 写出
v.set(sum);
context.write(key, v);
}
}
```
#### WordCountDriver
```java
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"input/hello.txt", "output"};
// 1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar存储位置
job.setJarByClass(WordCountDriver.class);
// 3 关联Map和Reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置Map阶段输出数据的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交job
// job.submit();
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
#### 集群测试
1. 将程序打成jar包,然后拷贝到Hadoop集群中
2. 启动Hadoop集群
3. 执行WordCount程序,其中`wc.WordCountDriver`为驱动类的全类名
```shell
hadoop jar hello_mapreduce.jar wc.WordCountDriver input/* output/test_mapred
```
## Hadoop 序列化
### 概述
「序列化」就是把**内存中的对象,转换成字节序列**(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输
「反序列化」就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,**转换成内存中的对象**
「Java的序列化」是一个「重量级」序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。故Hadoop开发了一套序列化机制「Writable」
### Hadoop序列化特点
1. 紧凑 :高效使用存储空间
2. 快速:读写数据的额外开销小
3. 可扩展:随着通信协议的升级而可升级
4. 互操作:支持多语言的交互
### 自定义bean对象实现Writable接口
#### 具体步骤
1. 必须实现Writable接口
```java
public class FlowBean implements Writable
```
2. 反序列化时,需要反射调用空参构造函数,所以必须有「空参构造」
```java
public FlowBean() {}
```
3. 重写「序列化方法」
```java
// 序列化方法
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
```
4. 重写「反序列化方法」
```java
// 反序列化方法
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
```
**注意**:反序列化的顺序和序列化的**顺序完全一致**
5. 重写`toString()`方法,用`\t`分开
```java
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
```
6. 如果需要将自定义的bean放在key中传输,则还需要实现`Comparable`接口,因为MapReduce中的`Shuffle`过程要求对**key必须能排序**
```java
public class FlowBean implements Writable, Comparable
```
```java
// 重写排序方式
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
```
#### 案例分析
##### 需求
统计每一个手机号耗费的总上行流量、下行流量、总流量
##### 思路分析
1. Map阶段
1. 读取一行数据,切分字段
2. 抽取手机号、上行流量、下行流量
3. 以手机号为key,bean对象为value输出
4. bean对象要想能够传输,必须实现序列化接口
2. Reduce阶段
累加上行流量和下行流量得到总流量
##### 具体实现
**FlowBean**
```java
public class FlowBean implements Writable, Comparable {
Long upFlow;
Long downFlow;
Long sumFlow;
public FlowBean() {
}
public FlowBean(Long upFlow, Long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
// 传入上下行流量算出总流量
public void set(Long downFlow, Long upFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = downFlow + upFlow;
}
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getSumFlow() {
return sumFlow;
}
public void setSumFlow(Long sumFlow) {
this.sumFlow = sumFlow;
}
// 序列化方法
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
// 反序列化方法
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
// 重写排序方式
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
}
```
**FlowCountMapper**
```java
public class FlowCountMapper extends Mapper {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
// 7 13560436666 120.196.100.99 1116 954 200
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");
// 3 封装对象
// 取出手机号码
String phoneNum = fields[1];
// 取出上行流量和下行流量
Long upFlow = Long.parseLong(fields[fields.length - 3]);
Long downFlow = Long.parseLong(fields[fields.length - 2]);
// 封装对象
k.set(phoneNum);
v.set(downFlow, upFlow);
// 4 写出
context.write(k, v);
}
}
```
**FlowCountReducer**
```java
public class FlowCountReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_downFlow = 0;
// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
sum_upFlow += flowBean.getUpFlow();
sum_downFlow += flowBean.getDownFlow();
}
// 2 封装对象
FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
// 3 写出
context.write(key, resultBean);
}
}
```
**FlowCountDriver**
```java
public class FlowCountDriver {
public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "input/phone_flow.txt", "output/output_flow" };
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountDriver.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
**输出结果**
```
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
```
## MapReduce 框架原理
### InputFormat数据输入
#### 切片与MapTask并行度决定机制
**数据块**:Block是HDFS「物理上」把数据分成一块一块
**数据切片**:数据切片只是在「逻辑上」对输入进行分片,并不会在磁盘上将其切分成片进行存储

#### FileInputFormat切片机制
##### 流程分析
1. 程序找到数据存储的目录
2. 开始遍历处理(规划切片)目录下的每一个文件
3. 遍历第一个文件时
1. `fs.sizeOf()`获取文件大小
2. 计算切片大小,默认情况下,`切片大小=blocksize`
```
computeSplitSize(Math.max(minSize, Math.min(maxSize,blocksize))) = blocksize = 128MB
```
3. 开始切片,形成第1个切片:split1 => 0 ~ 128MB,第2个切片split2 => 128 ~ 256MB,第3个切片split3 => 256 ~ 300MB(每次切片时,都要判断切完剩下的部分是否大于块的`1.1`倍,不大于`1.1`倍就划分一块切片)
4. 将切片信息写到一个「切片规划文件」中
5. 整个切片的核心过程在`getSplit()`方法中完成
6. InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
4. 提交切片规划文件到YARN上,YARN上的`MrAppMaster`就可以根据「切片规划文件」计算开启`MapTask`个数
##### 示例说明
**切片机制**
1. 简单地按照文件的内容长度进行切片
2. 切片大小,默认等于Block大小
3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
现输入数据有两个文件
1. file1.txt => 320MB
2. file2.txt => 10MB
经过`FileInputFormat`的切片机制运算后,形成的切片信息如下
1. file1.txt.split1 => 0 ~ 128MB
2. file1.txt.split2 => 128 ~ 256MB
3. file1.txt.split3 => 256 ~ 320MB
4. file2.txt.split1 => 0 ~ 10MB
##### 切片大小参数配置
**计算切片大小的公式**
```java
Math.max(minSize, Math.min(maxSize, blockSize));
```
其中参数
```
mapreduce.input.fileinputformat.split.minsize = 1 // 默认值为1
mapreduce.input.fileinputformat.split.maxsize = Long.MAXValue // 默认值Long.MAXValue
```
因此默认情况下,`切片大小 = blocksize`
**切片大小设置**
1. maxsize(切片最大值):参数调的比blockSize小,则让切片变小
2. minsize(切片最小值):参数调的比blockSize大,则让切片变大
**获取切片信息**
```java
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
```
#### CombineTextInputFormat切片机制
框架默认的`TextInputFormat`切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率低下
##### 应用场景
CombineTextInputFormat用于「小文件过多」的场景,它可以**将多个小文件从逻辑上规划到一个切片中**,这样,多个小文件就可以交给一个MapTask处理
##### 虚拟存储切片最大值设置
```java
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
```
**注意**:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
##### 流程分析
生成切片过程包括:「虚拟存储过程」和「切片过程」
1. 虚拟存储过程
将输入目录下所有文件大小,依次和设置的`setMaxInputSplitSize`值比较
1. 如果不大于设置的最大值,逻辑上划分一个块
2. 如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块
3. 当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件「均分」成2个虚拟存储块(防止出现太小切片)
例如`setMaxInputSplitSize`值为`4MB`,输入文件大小为`8.02MB`
1. 逻辑上分成一个`4MB`
2. 剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件
3. 因此将剩余的4.02M文件切分成`2.01MB`和`2.01MB`两个文件
2. 切片过程
1. 判断虚拟存储的文件大小是否大于`setMaxInputSplitSize`值,大于等于则单独形成一个切片
2. 如果「不大于」则跟下一个虚拟存储文件进行「合并」,**共同形成一个切片**
例如有4个小文件大小分别为`1.7M`、`5.1M`、`3.4M`和`6.8M`这4个小文件,则「虚拟存储过程」之后形成6个「文件块」,大小分别为:1.7M、2.55M、2.55M、3.4M、3.4M和3.4M
最终会形成3个「切片」,大小分别为(1.7 + 2.55) M,(2.55 + 3.4) M,(3.4 + 3.4) M
##### 案例演示
**需求**
将输入的大量小文件合并成一个切片统一处理
准备4个小文件,期望一个切片处理4个文件
##### 实验过程
不做任何处理,运行默认FileInputFormat的WordCount案例程序,观察切片个数为4
```
2020-05-11 14:48:09,084 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:4
```
将输入格式设置为`CombineTextInputFormat.class`
```java
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// 虚拟存储切片最大值设置4mb
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
```
此时切片个数为1
```
2020-05-11 14:53:10,335 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1
```
#### FileInputFormat实现类
FileInputFormat常见的接口实现类包括:`TextInputFormat`、`KeyValueTextInputFormat`、`NLineInputFormat`、`CombineTextInputFormat`和`自定义InputFormat`等
##### TextInputFormat
TextInputFormat是「默认」的FileInputFormat实现类。按行读取每条记录,「键」是存储该行在整个文件中的「起始字节偏移量」, LongWritable 类型;「值」是这「行的内容」,**不包括任何行终止符(换行符和回车符)**,Text 类型
**示例分析**
一个分片包含了如下4条文本记录
```
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
```
每条记录表示为以下键值对
```
(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
```
##### KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割为key和value
可以通过在驱动类中设置`conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");`来设定分隔符
默认分隔符是tab(`\t`)
**示例分析**
一个包含4条记录的分片。其中——>表示一个制表符
```
line1——>Rich learning form
line2——>Intelligent learning engine
line3——>Learning more convenient
line4——>From the real demand for more close to the enterprise
```
每条记录表示为以下键值对
```
(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)
```
此时的「键」是每行排在分隔符之前的「Text」
##### NLineInputFormat
如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分
即输入文件的 `总行数 / N = 切片数`,如果不整除则`切片数 = 商 + 1`
**示例分析**
如下4条文本记录为输入数据
```
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
```
若N是2,则每个输入分片包含2行,开启2个MapTask
```
(0,Rich learning form)
(19,Intelligent learning engine)
```
```
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
```
这里的键和值与`TextInputFormat`生成的一样
#### KeyValueTextInputFormat案例分析
##### 需求
统计输入文件中每一行的第一个单词相同的行数
##### 输入数据
```
roger
hello roger
hello mapreduce
roger
hello roger
hello mapreduce
```
##### 具体实现
**KVTextMapper**
```java
public class KVTextMapper extends Mapper {
// 1 设置value
IntWritable v = new IntWritable(1);
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// 2 写出
context.write(key, v);
}
}
```
**KVTextReducer**
```java
public class KVTextReducer extends Reducer {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 1 汇总统计
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
// 2 输出
context.write(key, v);
}
}
```
**KVTextDriver**
```java
public class KVTextDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/hello.txt", "output/kv_output"};
Configuration conf = new Configuration();
// 设置切割符
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
// 1 获取job对象
Job job = Job.getInstance(conf);
// 2 设置jar包位置,关联mapper和reducer
job.setJarByClass(KVTextDriver.class);
job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);
// 3 设置map输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 4 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
// 5 设置输入输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 6 设置输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
#### NLineInputFormat案例分析
##### 需求
对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少个切片,要求每3行放入一个切片中
##### 输入数据
```
roger
hello roger
hello mapreduce
roger
hello roger
hello mapreduce
```
##### 具体实现
**NLineMapper**
```java
public class NLineMapper extends Mapper {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] splited = line.split(" ");
// 3 循环写出
for (int i = 0; i < splited.length; i++) {
k.set(splited[i]);
context.write(k, v);
}
}
}
```
**NLineReducer**
```java
public class NLineReducer extends Reducer {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 1 汇总
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
// 2 输出
context.write(key, v);
}
}
```
**NLineDriver**
```java
public class NLineDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"input/hello.txt", "output/nl_output"};
// 1 获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置jar包位置,关联mapper和reducer
job.setJarByClass(NLineDriver.class);
job.setMapperClass(NLineMapper.class);
job.setReducerClass(NLineReducer.class);
// 3 设置map输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 4 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置每个切片InputSplit中划分3条记录
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 使用NLineInputFormat处理记录数
job.setInputFormatClass(NLineInputFormat.class);
// 5 设置输入输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
此时的切片数量为2
```
2020-05-11 16:02:04,571 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:2
```
#### 自定义InputFormat
Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题
##### 步骤
1. 自定义一个类继承`FileInputFormat`
2. 改写`RecordReader`,实现一次读取一个完整文件封装为KV
3. 在输出时使用`SequenceFileOutPutFormat`输出合并文件
##### 案例分析
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时就需要有相应解决方案,可以自定义InputFormat实现小文件的合并
**需求**
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为`文件路径 + 名称`为`key`,`文件内容`为`value`
**需求分析**
1. 自定义一个类继承`FileInputFormat`
1. 重写`isSplitable()`方法,返回`false`不可切割
2. 重写`createRecordReader()`,创建自定义的RecordReader对象并初始化返回
2. 改写`RecordReader`,实现一次读取一个完整文件封装为KV
1. 采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文件都封装到了value中
2. 获取文件路径信息和名称,并设置key
3. 设置Driver
```java
// 1 设置输入的inputFormat
job.setInputFormatClass(WholeFileInputFormat.class);
// 2 设置输出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
```
**具体实现**
**WholeFileInputFormat**
```java
// 1 定义类继承FileInputFormat
public class WholeFileInputFormat extends FileInputFormat {
// 2 重写isSplitable()方法,返回false不可切割
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
// 3 重写createRecordReader()
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(inputSplit,taskAttemptContext);
return reader;
}
}
```
**WholeFileRecordReader**
```java
// 继承RecordReader
public class WholeFileRecordReader extends RecordReader {
FileSplit split;
Configuration conf;
boolean isProgress = true;
Text k = new Text();
BytesWritable v = new BytesWritable();
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
split = (FileSplit) inputSplit;
conf = taskAttemptContext.getConfiguration();
}
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定义缓存区
byte[] buffer = new byte[(int) split.getLength()];
FileSystem fs = null;
FSDataInputStream fis = null;
try {
// 2 获取文件系统
Path path = split.getPath();
fs = path.getFileSystem(conf);
// 3 读取数据
fis = fs.open(path);
// 4 读取文件内容
IOUtils.readFully(fis, buffer, 0, buffer.length);
// 5 输出文件内容
v.set(buffer, 0, buffer.length);
// 6 获取文件路径及名称
String name = split.getPath().toString();
// 7 设置输出的key值
k.set(name);
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fis);
}
isProgress = false;
return true;
}
return false;
}
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return v;
}
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
}
}
```
**SequenceFileMapper**
```java
public class SequenceFileMapper extends Mapper {
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
```
**SequenceFileReducer**
```java
public class SequenceFileReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
```
**SequenceFileDriver**
```java
public class SequenceFileDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/combine/*", "output/sequence_output"};
// 1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包存储位置、关联自定义的mapper和reducer
job.setJarByClass(SequenceFileDriver.class);
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);
// 3 设置map输出端的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 4 设置最终输出端的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 设置输入的inputFormat
job.setInputFormatClass(WholeFileInputFormat.class);
// 设置输出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
### MapReduce工作流程
#### 图示分析


#### 流程分析
1. MapTask收集map()方法输出的kv对,放到内存缓冲区中
2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3. 多个溢出文件会被合并成大的溢出文件
4. 在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5. ReduceTask根据自己的分区号,去各个MapTask上取相应的结果分区数据
6. ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7. 合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
**注意**
`Shuffle`中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M
### Shuffle机制
#### 流程图分析
Map方法之后,Reduce方法之前的数据处理过程称之为`Shuffle`

#### Partition分区
要求将统计结果按照条件输出到不同文件中(分区)
##### 默认Partitioner分区
```java
public class HashPartitioner extends Partitioner {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
```
默认分区是根据key的hashCode对ReduceTasks个数「取模」得到的,无法控制哪个key存储到哪个分区
##### 自定义Partitioner步骤
1. 自定义类继承Partitioner,重写getPartition()方法
```java
public class CustomPartitioner extends Partitioner {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 控制分区代码逻辑
return partition;
}
}
```
2. 在Job驱动中,设置自定义Partitioner
```java
job.setPartitionerClass(CustomPartitioner.class);
```
3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
```java
job.setNumReduceTasks(5);
```
##### 分区总结
1. 如果`ReduceTask的数量 > getPartition的结果数`,则会多产生几个空的输出文件`part-r-000xx`
2. 如果`1 < ReduceTask的数量 < getPartition的结果数`,则有一部分分区数据无处安放,会报异常
3. 如果`ReduceTask的数量 = 1`,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生「1个」结果文件 `part-r-00000`
4. 分区号必须**从`0`开始,逐`1`累加**
**示例**
假设自定义分区数为5
1. `job.setNumReduceTasks(1);`:正常运行,只会产生一个输出文件
2. `job.setNumReduceTasks(2);`:报错
3. `job.setNumReduceTasks(6);`:大于5,程序正常运行,产生空文件
##### 案例分析
**需求**
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到1个文件中
在手机流量案例的基础上,增加「分区类」
**ProvincePartitioner**
```java
public class ProvincePartitioner extends Partitioner {
public int getPartition(Text key, FlowBean flowBean, int i) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
```
在驱动函数中增加「自定义数据分区设置」和「ReduceTask设置」
**FlowCountDriver**
```java
public class FlowCountDriver {
public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"input/phone_flow.txt", "output/output_partitioner"};
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountDriver.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
#### WritableComparable排序
##### 概述
排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为。**任何应用程序中的数据均会被排序**,而不管逻辑上是否需要
默认排序是按照字典顺序排序,且实现该排序的方法是「快速排序」
对于「MapTask」,它会将处理的结果暂时放到「环形缓冲区」中,当环形缓冲区使用率达到一定「阈值」后,再对缓冲区中的数据进行一次「快速排序」,并将这些有序数据「溢写」到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行「归并排序」
对于「ReduceTask」,它从每个MapTask上远程拷贝相应的数据文件
1. 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中
2. 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件
3. 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上
4. 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次「归并排序」
##### 分类
1. **部分排序**
MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序
2. **全排序**
最终输出结果只有一个文件,且文件内部有序,实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
3. **辅助排序(GroupingComparator分组)**
在Reduce端对key进行分组,应用于在接收的key为bean对象时,想让**一个或几个字段相同(全部字段比较不相同)**的key进入到同一个reduce方法时,可以采用分组排序
4. **二次排序**
在自定义排序过程中,如果`compareTo`中的判断条件为两个即为二次排序
##### 自定义排序
bean对象做为key传输,需要实现`WritableComparable`接口重写`compareTo`方法,就可以实现排序
##### 案例分析 - 全排序
**需求**
根据手机的总流量进行倒序排序
**具体实现**
**FlowBean**
实现WritableComparable接口,重写compareTo方法
```java
// 实现WritableComparable接口
public class FlowBean implements WritableComparable {
```
```java
// 重写排序方式
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return -Long.compare(this.sumFlow, o.getSumFlow());
}
```
**FlowCountSortMapper**
```java
public class FlowCountSortMapper extends Mapper {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
// 取出手机号码
String phoneNum = fields[1];
// 取出上行流量和下行流量
Long upFlow = Long.parseLong(fields[fields.length - 3]);
Long downFlow = Long.parseLong(fields[fields.length - 2]);
// 封装对象
k.set(upFlow, downFlow);
v.set(phoneNum);
context.write(k, v);
}
}
```
**FlowCountSortReducer**
```java
public class FlowCountSortReducer extends Reducer {
@Override
protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
// 循环写出,避免总流量相同
for (Text value : values) {
context.write(value, key);
}
}
}
```
**FlowCountSortDriver**
```java
public class FlowCountSortDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/phone_flow.txt", "output/output_fullsort"};
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountSortDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
##### 案例分析 - 区内排序
**需求**
要求每个省份手机号输出的文件中按照总流量内部排序
**具体实现**
增加自定义分区类
**ProvincePartitioner**
```java
public class ProvincePartitioner extends Partitioner {
@Override
public int getPartition(FlowBean key, Text value, int i) {
// 1 获取电话号码的前三位
String preNum = value.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
} else if ("137".equals(preNum)) {
partition = 1;
} else if ("138".equals(preNum)) {
partition = 2;
} else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
```
在「驱动类」中添加分区类
```java
// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);
// 设置Reducetask个数
job.setNumReduceTasks(5);
```
#### Combiner合并
##### 概述
Combiner是MR程序中Mapper和Reducer之外的一种组件
Combiner组件的父类就是`Reducer`
Combiner和Reducer的区别在于运行的位置
1. Combiner是在每一个**MapTask所在的节点**运行
2. Reducer是**接收全局所有Mapper的输出结果**
Combiner的意义就是对每一个MapTask的输出进行「局部汇总」,以减小网络传输量
Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该跟Reducer的输入kv类型要对应起来
##### 自定义Combiner
1. 自定义一个Combiner继承Reducer,重写reduce方法
2. 在Job驱动类中设置
```java
job.setCombinerClass(WordCountCombiner.class);
```
##### 案例分析
**需求**
统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能
**实现方案**
**方案一**:增加一个WordCountCombiner类继承Reducer
**方案二**:将WordCountReducer作为Combiner在WordCountDriver驱动类中指定
**实现方案一**
**WordCountCombiner**
```java
public class WordCountCombiner extends Reducer {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
```
**WordCountDriver**
```java
job.setCombinerClass(WordCountCombiner.class);
```
**实现方案二**
将WordCountReducer作为Combiner在WordCountDriver驱动类中指定
```java
job.setCombinerClass(WordCountReducer.class);
```
##### 效果对比
使用Combiner之前
```
Combine input records=0
Combine output records=0
```
使用Combiner之后
```
Combine input records=10
Combine output records=3
```
#### GroupingComparator分组
##### 概述
对Reduce阶段的数据根据某一个或几个字段进行「分组」
##### 案例分析
**需求**
有如下订单数据
| 订单id | 商品id | 成交金额 |
| ------- | ------ | -------- |
| 0000001 | Pdt_01 | 222.8 |
| | Pdt_02 | 33.8 |
| 0000002 | Pdt_03 | 522.8 |
| | Pdt_04 | 122.4 |
| | Pdt_05 | 722.4 |
| 0000003 | Pdt_06 | 232.8 |
| | Pdt_02 | 33.8 |
求出每一个订单中最贵的商品
**需求分析**
利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce
在Reduce端利用GroupingComparator将订单id相同的聚合成组,然后取第一个即是该订单中最贵商品
**具体实现**
**OrderBean**
```java
public class OrderBean implements WritableComparable {
Integer orderId;
Double price;
public OrderBean() {
}
public void set(Integer orderId, Double price) {
this.orderId = orderId;
this.price = price;
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderId + "\t" + price;
}
// 重写compareTo方法,先按照id升序排序,再按照price降序排序
@Override
public int compareTo(OrderBean o) {
int compare = Integer.compare(this.orderId, o.getOrderId());
if (compare == 0) {
compare = -Double.compare(this.price, o.getPrice());
}
return compare;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(orderId);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
orderId = in.readInt();
price = in.readDouble();
}
}
```
**OrderSortMapper**
```java
public class OrderSortMapper extends Mapper {
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
// 0000001 Pdt_01 222.8
String[] fields = line.split("\t");
int id = Integer.parseInt(fields[0]);
double price = Double.parseDouble(fields[2]);
k.set(id, price);
context.write(k, NullWritable.get());
}
}
```
**OrderSortGroupingComparator**
```java
public class OrderSortGroupingComparator extends WritableComparator {
public OrderSortGroupingComparator() {
super(OrderBean.class, true);
}
// 重写compare方法,当id相同时返回0,即当id相同时视为同一组
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean o1 = (OrderBean) a;
OrderBean o2 = (OrderBean) b;
return Integer.compare(o1.getOrderId(), o2.getOrderId());
}
}
```
**OrderSortReducer**
```java
public class OrderSortReducer extends Reducer {
@Override
protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException {
// 输出每个分组的第一个数据即为每个订单的最贵商品
context.write(key, NullWritable.get());
}
}
```
**OrderSortDriver**
```java
public class OrderSortDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/GroupingComparator.txt", "output/output_gc"};
// 1 获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包加载路径
job.setJarByClass(OrderSortDriver.class);
// 3 加载map/reduce类
job.setMapperClass(OrderSortMapper.class);
job.setReducerClass(OrderSortReducer.class);
// 4 设置map输出数据key和value类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出数据的key和value类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置reduce端的分组
job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
### MapTask工作机制
#### 图示分析

#### 流程分析
1. **Read阶段**:MapTask通过RecordReader,从输入InputSplit中解析出一个个key/value
2. **Map阶段**:该节点主要是将解析出的key/value交给map()函数处理,并产生一系列新的key/value
3. **Collect收集阶段**:在map()函数中,当数据处理完成后,一般会调用`OutputCollector.collect()`输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中
4. **Spill阶段**:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作
1. 利用「快速排序」算法对缓存区内的数据进行排序,排序方式是先按照分区编号Partition进行排序,然后按照key进行排序。这样经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序
2. 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件`output/spillN.out`(N表示当前溢写次数)中。如果设置了Combiner,则写入文件之前对每个分区中的数据进行一次「聚集」操作
3. 将分区数据的「元信息」写到内存索引数据结构`SpillRecord`中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件`output/spillN.out.index`中
5. **Combine阶段**:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
当所有数据处理完后,MapTask会将所有临时文件合并成一个「大文件」,并保存到文件`output/file.out`中,同时生成相应的「索引文件」`output/file.out.index`
在进行文件合并过程中,MapTask以「分区」为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并`io.sort.factor`(默认10)个文件,并**将产生的文件重新加入待合并列表中**,对文件排序后,重复以上过程,直到最终得到一个大文件
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
### ReduceTask工作机制
#### 图示分析

#### 流程分析
1. **Copy阶段**:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
2. **Merge阶段**:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多
3. **Sort阶段**:按照MapReduce语义,reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此ReduceTask只需对所有数据进行一次归并排序即可
4. **Reduce阶段**:reduce()函数将计算结果写到HDFS上
#### 设置ReduceTask并行度
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置
```java
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
```
若`ReduceTask = 0`,表示**没有Reduce阶段**,输出文件个数和Map个数一致
ReduceTask默认值就是`1`,所以输出文件个数为一个
如果数据分布不均匀,就有可能在Reduce阶段产生「数据倾斜」
ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
具体多少个ReduceTask,需要根据集群性能而定
### OutputFormat数据输出
#### FileOutputFormat实现类
##### TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用`toString()`方法把它们转换为字符串
##### SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩
#### 自定义OutputFormat
##### 使用场景
为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat
##### 步骤
1. 自定义一个类继承FileOutputFormat
2. 改写RecordWriter,具体改写输出数据的方法write()
##### 案例分析
**需求**
过滤输入的log日志,包含google的网站输出到output/google.log,不包含google的网站输出到output/other.log
**输入数据**
```
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
```
**具体实现**
**FilterMapper**
```java
public class FilterMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
```
**FilterReducer**
```java
public class FilterReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
```
**FilterRecordWriter**
```java
public class FilterRecordWriter extends RecordWriter {
FSDataOutputStream googleOut = null;
FSDataOutputStream otherOut = null;
public FilterRecordWriter(TaskAttemptContext job) {
// 1 获取文件系统
FileSystem fs;
try {
fs = FileSystem.get(job.getConfiguration());
// 2 创建输出文件路径
Path googlePath = new Path("output/filter_/google.log");
Path otherPath = new Path("output/filter_/other.log");
// 3 创建输出流
googleOut = fs.create(googlePath);
otherOut = fs.create(otherPath);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable nullWritable) throws IOException, InterruptedException {
// 输出时进行换行操作
key.set(key.toString() + "\r\n");
// 判断是否包含google输出到不同文件
if (key.toString().contains("google")) {
googleOut.write(key.toString().getBytes());
} else {
otherOut.write(key.toString().getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// 关闭资源
IOUtils.closeStream(googleOut);
IOUtils.closeStream(otherOut);
}
}
```
**FilterOutputFormat**
```java
public class FilterOutputFormat extends FileOutputFormat {
@Override
public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new FilterRecordWriter(taskAttemptContext);
}
}
```
### Join 应用
#### Reduce Join
「Map端」的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用「连接字段」作为key,其余部分和新加的标志作为value,最后进行输出
「Reduce端」的主要工作:在Reduce端以连接字段作为key的分组已经完成,在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并
##### 案例分析
将商品信息表中数据根据商品pid合并到订单数据表中
**输入数据**
**order.txt**
```
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
```
**pd.txt**
```
01 小米
02 华为
03 格力
```
**具体实现**
**TableBean**
```java
public class TableBean implements Writable {
String orderId;
String pId;
Integer amount;
String pName;
String flag;
public TableBean() {
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getpId() {
return pId;
}
public void setpId(String pId) {
this.pId = pId;
}
public Integer getAmount() {
return amount;
}
public void setAmount(Integer amount) {
this.amount = amount;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return orderId + "\t" + pName + "\t" + amount;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(pId);
out.writeInt(amount);
out.writeUTF(pName);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
orderId = in.readUTF();
pId = in.readUTF();
amount = in.readInt();
pName = in.readUTF();
flag = in.readUTF();
}
}
```
**TableMapper**
```java
public class TableMapper extends Mapper {
String name;
TableBean bean = new TableBean();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 1 获取输入文件切片
FileSplit split = (FileSplit) context.getInputSplit();
// 2 获取输入文件名称
name = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取输入数据
String line = value.toString();
// 2 不同文件分别处理
if (name.startsWith("order")) {// 订单表处理
// 2.1 切割
String[] fields = line.split("\t");
// 2.2 封装bean对象
bean.setOrderId(fields[0]);
bean.setpId(fields[1]);
bean.setAmount(Integer.parseInt(fields[2]));
bean.setpName("");
bean.setFlag("order");
k.set(fields[1]);
} else if (name.startsWith("pd")) {// 产品表处理
// 2.3 切割
String[] fields = line.split("\t");
// 2.4 封装bean对象
bean.setOrderId("");
bean.setpId(fields[0]);
bean.setAmount(0);
bean.setpName(fields[1]);
bean.setFlag("pd");
k.set(fields[0]);
}
// 3 写出
context.write(k, bean);
}
}
```
**TableReducer**
```java
public class TableReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 1准备存储订单的集合
List orderBeans = new ArrayList<>();
// 2 准备bean对象
TableBean pdBean = new TableBean();
for (TableBean bean : values) {
if ("order".equals(bean.getFlag())) {
// 订单表
// 拷贝传递过来的每条订单数据到集合中
TableBean orderBean = new TableBean();
try {
BeanUtils.copyProperties(orderBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
orderBeans.add(orderBean);
} else if ("pd".equals(bean.getFlag())) {
// 产品表
try {
// 拷贝传递过来的产品表到内存中
BeanUtils.copyProperties(pdBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 3 表的拼接
for (TableBean bean : orderBeans) {
bean.setpName(pdBean.getpName());
// 4 数据写出去
context.write(bean, NullWritable.get());
}
}
}
```
**TableDriver**
```java
public class TableDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/table", "output/output_table"};
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在本地路径
job.setJarByClass(TableDriver.class);
// 3 指定本业务job要使用的Mapper/Reducer业务类
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 4 指定Mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
##### 缺点
这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生「数据倾斜」
#### Map Join
##### 使用场景
Map Join适用于一张表十分小、一张表很大的场景
##### 优点
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜
##### 案例分析
**CacheMapper**
```java
public class CacheMapper extends Mapper {
Map pdMap = new HashMap<>();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 1 获取缓存的文件
URI[] cacheFiles = context.getCacheFiles();
String path = cacheFiles[0].getPath();
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
// 2 切割
String[] fields = line.split("\t");
// 3 缓存数据到集合
pdMap.put(fields[0], fields[1]);
}
// 4 关流
reader.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 截取
String[] fields = line.split("\t");
// 3 获取产品id
String pId = fields[1];
// 4 获取商品名称
String pdName = pdMap.get(pId);
// 5 拼接
k.set(line + "\t"+ pdName);
// 6 写出
context.write(k, NullWritable.get());
}
}
```
**CacheDriver**
```java
public class CacheDriver {
public static void main(String[] args) throws Exception {
args = new String[]{"input/table/order.txt", "output/output_cache"};
// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置加载jar包路径
job.setJarByClass(CacheDriver.class);
// 3 关联map
job.setMapperClass(CacheMapper.class);
// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///Users/roger/IdeaProjects/hadoop_learning/input/table/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
### ETL 数据清洗
#### 概述
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序
#### 案例分析
##### 需求
对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据都是合法的数据
##### 具体实现
**LogBean**
```java
public class LogBean {
private String remote_addr; // 记录客户端的ip地址
private String remote_user; // 记录客户端用户名称,忽略属性"-"
private String time_local; // 记录访问时间与时区
private String request; // 记录请求的url与http协议
private String status; // 记录请求状态;成功是200
private String body_bytes_sent; // 记录发送给客户端文件主体内容大小
private String http_referer; // 用来记录从那个页面链接访问过来的
private String http_user_agent; // 记录客户浏览器的相关信息
private boolean valid = true; // 判断数据是否合法
public LogBean() {
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\t").append(this.remote_addr);
sb.append("\t").append(this.remote_user);
sb.append("\t").append(this.time_local);
sb.append("\t").append(this.request);
sb.append("\t").append(this.status);
sb.append("\t").append(this.body_bytes_sent);
sb.append("\t").append(this.http_referer);
sb.append("\t").append(this.http_user_agent);
return sb.toString();
}
}
```
**LogMapper**
```java
public class LogMapper extends Mapper {
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取1行
String line = value.toString();
// 2 解析日志是否合法
LogBean bean = parseLog(line);
if (!bean.isValid()) {
// 系统计数器统计不合法记录的个数
context.getCounter("map", "false").increment(1);
return;
}
// 统计合法记录的个数
context.getCounter("map", "true").increment(1);
k.set(bean.toString());
// 3 输出
context.write(k, NullWritable.get());
}
// 解析日志
private LogBean parseLog(String line) {
LogBean logBean = new LogBean();
// 1 截取
String[] fields = line.split(" ");
if (fields.length > 11) {
// 60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
// 2 封装数据
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);
if (fields.length > 12) {
logBean.setHttp_user_agent(fields[11] + " " + fields[12]);
} else {
logBean.setHttp_user_agent(fields[11]);
}
// 大于400,HTTP错误
if (Integer.parseInt(logBean.getStatus()) >= 400) {
logBean.setValid(false);
}
} else {
logBean.setValid(false);
}
return logBean;
}
}
```
**LogDriver**
```java
public class LogDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/web.log", "output/output_log"};
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载jar包
job.setJarByClass(LogDriver.class);
// 3 关联map
job.setMapperClass(LogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducetask个数为0,取消reduce端
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
其中「系统计数器」输出的结果为
```
map
false=1084
true=13535
```
Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量
### 开发小结
#### 输入数据接口
`InputFormat`
1. 默认使用的实现类是`TextInputFormat `
2. TextInputFormat的功能逻辑是一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
3. `KeyValueTextInputFormat`每一行均为一条记录,被「分隔符」分割为key/value。默认分隔符是`\t`
4. `NLineInputFormat`按照指定的行数N来划分切片
5. `CombineTextInputFormat`可以把多个小文件合并成一个切片处理,提高处理效率
6. `自定义InputFormat`
#### 逻辑处理接口 Mapper
`Mapper`
根据业务需求实现其中三个方法:`map()`、`setup()`、`cleanup() `
#### 分区
`Partitioner`
1. 有默认实现`HashPartitioner`,逻辑是根据`key`的哈希值和`numReduces`来返回一个分区号;`key.hashCode() & Integer.MAXVALUE % numReduces`
2. 自定义分区
#### 排序
`Comparable`
1. 使用自定义的对象作为`key`来输出时,就必须要实现`WritableComparable`接口,重写其中的`compareTo()`方法
2. 部分排序:对最终输出的每一个文件进行内部排序
3. 全排序:对所有数据进行排序,通常只有一个Reduce
4. 二次排序:排序的条件有两个
#### 合并
`Combiner`
Combiner合并可以提高程序执行效率,减少IO传输,但使用时必须不能影响原有的业务处理结果
#### 分组
`GroupingComparator`
在Reduce端对key进行分组。应用于在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序
#### 逻辑处理接口 Reducer
`Reducer`
根据业务需求实现其中三个方法:`reduce()`、`setup()`、`cleanup()`
#### 输出数据接口
`OutputFormat`
1. 默认实现类是`TextOutputFormat`,功能逻辑是将每一个KV对,向目标文本文件输出一行
2. 将`SequenceFileOutputFormat`输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩
3. `自定义OutputFormat`
## Hadoop数据压缩
### 概述
压缩技术能够有效减少底层存储系统(HDFS)读写字节数,压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时,I/O操作、网络数据传输、 Shuffle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要
磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。可以在任意MapReduce阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价
压缩是提高Hadoop运行效率的一种「优化策略」
通过对Mapper、Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度
**注意**:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能
**压缩「基本原则」**
1. 运算密集型的job,少用压缩
2. IO密集型的job,多用压缩
### 压缩编码
| 压缩格式 | 是否自带 | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原来的程序是否需要修改 |
| -------- | -------- | ------- | ---------- | ---------- | -------------------------------------- |
| DEFLATE | 是 | DEFLATE | .deflate | 否 | 不需要修改 |
| Gzip | 是 | DEFLATE | .gz | 否 | 不需要修改 |
| bzip2 | 是 | bzip2 | .bz2 | 是 | 不需要修改 |
| LZO | 否 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
| Snappy | 否 | Snappy | .snappy | 否 | 不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
| 压缩格式 | 对应的编码/解码器 |
| -------- | ------------------------------------------ |
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | com.hadoop.compression.lzo.LzopCodec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
### 压缩方式
#### Gzip压缩
**优点**
1. 压缩率比较高,而且压缩/解压速度也比较快
2. Hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样
3. 大部分Linux系统都自带Gzip命令,使用方便
**缺点**
不支持切片
**应用场景**
当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件
#### Bzip2压缩
**优点**
1. 支持切片
2. 具有很高的压缩率,比Gzip压缩率都高
3. Hadoop本身自带,使用方便
**缺点**
压缩/解压速度慢
**应用场景**
1. 对速度要求不高,但需要较高的压缩率的时候
2. 输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况
3. 对单个很大的文本文件想压缩减少存储空间,同时又需要支持切片,而且兼容之前的应用程序的情况
#### Lzo压缩
**优点**
1. 压缩/解压速度也比较快,合理的压缩率
2. 支持切片,是Hadoop中最流行的压缩格式
3. 可以在Linux系统下安装lzop命令,使用方便
**缺点**
1. 压缩率比Gzip要低一些
2. Hadoop本身不支持,需要安装
3. 在应用中对Lzo格式的文件需要做一些特殊处理(为了支持切片需要建索引,还需要指定InputFormat为Lzo格式)
**应用场景**
一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,LZO优点越越明显
#### Snappy压缩
**优点**
高速压缩速度和合理的压缩率
**缺点**
1. 不支持切片
2. 压缩率比Gzip要低
3. Hadoop本身不支持,需要安装
**应用场景**
1. 当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式
2. 作为一个MapReduce作业的输出和另外一个MapReduce作业的输入
### 压缩位置
压缩可以在MapReduce作用的任意阶段启用
#### 输入端采用压缩
在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。**无须显示指定使用的编解码方式**,Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。否则,Hadoop就不会使用任何编解码器
#### Mapper输出采用压缩
当Map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据「Shuffle过程」,而Shuffle过程在Hadoop处理过程中是资源消耗最多的环节。如果发现**数据量大造成网络传输缓慢**,应该考虑使用压缩技术。可用于压缩Mapper输出的快速编解码器包括「LZO」或者「Snappy」
**注意**:LZO是供Hadoop压缩数据用的通用压缩编解码器。其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,而不是压缩率。与Gzip编解码器相比,它的压缩速度是Gzip的5倍,而解压速度是Gzip的2倍。同一个文件用LZO压缩后比用Gzip压缩后大50%,但比压缩前小25%~50%。这对改善性能非常有利,Map阶段完成时间快4倍
#### Reducer输出采用压缩
在此阶段启用压缩技术能够**减少要存储的数据量**,因此降低所需的磁盘空间。当MapReduce作业形成作业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效
### 压缩参数
| 参数 | 默认值 | 阶段 | 建议 | 位置 |
| ------------------------------------------------ | ------------------------------------------------------------ | ----------- | -------------------------------------------- | --------------- |
| io.compression.codecs | org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 | core-site.xml |
| mapreduce.map.output.compress | false | mapper输出 | 这个参数设为true启用压缩 | mapred-site.xml |
| mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 多使用LZO或Snappy编解码器在此阶段压缩数据 | mapred-site.xml |
| mapreduce.output.fileoutputformat.compress | false | reducer输出 | 这个参数设为true启用压缩 | mapred-site.xml |
| mapreduce.output.fileoutputformat.compress.codec | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 | mapred-site.xml |
| mapreduce.output.fileoutputformat.compress.type | RECORD | reducer输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK | mapred-site.xml |
### 实际操作
#### 数据流的压缩和解压缩
`CompressionCodec`有两个方法可以用于轻松地压缩或解压缩数据
要想对正在被写入一个输出流的数据进行压缩,我们可以使用`createOutputStream(OutputStreamout)`方法创建一个`CompressionOutputStream`,将其以压缩格式写入底层的流
相反,要想对从输入流读取而来的数据进行解压缩,则调用`createInputStream(InputStreamin)`函数,从而获得一个`CompressionInputStream`,从而从底层的流读取未压缩的数据
##### 压缩方法
```java
// 1 压缩
private static void compress(String filename, String method) throws Exception {
// 1 获取输入流
FileInputStream fis = new FileInputStream(new File(filename));
Class codecClass = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
// 2 获取输出流
FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
CompressionOutputStream cos = codec.createOutputStream(fos);
// 3 流的对拷
IOUtils.copyBytes(fis, cos, 1024 * 1024, false);
// 4 关闭资源
cos.close();
fis.close();
}
```
##### 解压方法
```java
// 2 解压缩
private static void decompress(String filename) throws Exception {
// 1 校验是否能解压缩
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = factory.getCodec(new Path(filename));
if (codec == null) {
System.out.println("非压缩格式: " + filename);
return;
}
// 2 获取输入流
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
// 4 流的对拷
IOUtils.copyBytes(cis, fos, 1024 * 1024, false);
// 5 关闭资源
fos.close();
cis.close();
}
```
##### 方法调用
```java
public static void main(String[] args) throws Exception {
compress("input/hello.txt", "org.apache.hadoop.io.compress.BZip2Codec");
// compress("input/hello.txt", "org.apache.hadoop.io.compress.GzipCodec");
// compress("input/hello.txt", "org.apache.hadoop.io.compress.DefaultCodec");
decompress("input/hello.txt.bz2");
}
```
#### Map输出端采用压缩
即使MapReduce的输入输出文件都是未压缩的文件,仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能
```java
// 1 获取job对象
Configuration conf = new Configuration();
// 开启map端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(conf);
```
#### Reduce输出端采用压缩
```java
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
```
## Yarn资源调度器
### 概述
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序
### 基本架构
YARN主要由`ResourceManager`、`NodeManager`、`ApplicationMaster`和`Container`等组件构成

#### ResourceManager
1. 处理客户端请求
2. 监控NodeManager
3. 启动或监控ApplicationMaster
4. 资源的分配与调度
#### NodeManager
1. 管理单个节点上的资源
2. 处理来自ResourceManager的命令
3. 处理来自ApplicationMaster的命令
#### ApplicationMaster
1. 负责数据的切分
2. 为应用程序申请资源并分配给内部的任务
3. 任务的监控与容错
#### Container
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等
### 工作机制
#### 图示分析

#### 流程分析
1. MR程序提交到客户端所在的节点
2. YarnRunner向ResourceManager申请一个Application
3. RM将该应用程序的资源路径返回给YarnRunner
4. 该程序将运行所需资源提交到HDFS上
5. 程序资源提交完毕后,申请运行mrAppMaster
6. RM将用户的请求初始化成一个Task
7. 其中一个NodeManager领取到Task任务
8. 该NodeManager创建容器Container,并产生MRAppmaster
9. Container从HDFS上拷贝资源到本地
10. MRAppmaster向RM 申请运行MapTask资源
11. RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器
12. MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序
13. MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask
14. ReduceTask向MapTask获取相应分区的数据
15. 程序运行完毕后,MR会向RM申请注销自己
### 作业提交过程
1. **阶段一:作业提交**
1. Client调用`job.waitForCompletion`方法,向整个集群提交MapReduce作业
2. Client向RM申请一个「作业id」
3. RM给Client返回该job资源的「提交路径」和「作业id」
4. Client提交jar包、切片信息和配置文件到指定的资源提交路径
5. Client提交完资源后,向RM申请运行`MrAppMaster`
2. **阶段二:作业初始化**
1. 当RM收到Client的请求后,将该job添加到容量调度器中
2. 某一个空闲的NM领取到该Job
3. 该NM创建Container,并产生`MRAppmaster`
4. 下载Client提交的资源到本地
3. **阶段三:任务分配**
1. `MrAppMaster`向RM申请运行多个MapTask任务资源
2. RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器
4. **阶段四:任务运行**
1. MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序
2. MrAppMaster等待所有MapTask运行完毕后,向RM申请容器运行ReduceTask
3. ReduceTask向MapTask获取相应分区的数据
4. 程序运行完毕后,MR会向RM申请注销自己
5. **阶段五:进度和状态更新**
YARN中的任务将其进度和状态(包括counter)返回给应用管理器,客户端每秒(通过`mapreduce.client.progressmonitor.pollinterval`设置)向应用管理器请求进度更新,展示给用户
6. **阶段六:作业完成**
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用`waitForCompletion()`来检查作业是否完成。时间间隔可以通过`mapreduce.client.completion.pollinterval`来设置。作业完成之后,应用管理器和Container会清理工作状态。作业的信息会被作业「历史服务器」存储以备之后用户核查
### 资源调度器
Hadoop作业调度器主要有三种:`FIFO`、`Capacity Scheduler`和`Fair Scheduler`
Hadoop2.7.2默认的资源调度器是Capacity Scheduler,`yarn-default.xml`
```xml
The class to use as the resource scheduler.
yarn.resourcemanager.scheduler.class
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
```
#### FIFO
先进先出调度器,按照到达时间排序,先到先服务,无并发度

#### Capacity Scheduler
容量调度器,按照到达时间排序,先到先服务
1. 支持多个队列,每个队列可配置一定的资源量,**每个队列采用FIFO调度策略**
2. 为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定
3. 计算每个队列中**正在运行的任务数**与其分得的**计算资源**之间的比值,选择一个该**比值最小**的队列,即**最闲**的队列
4. 按照作业优先级和提交时间顺序,同时考虑用户资源量限制和内存限制对队列内任务排序
5. 多个队列同时按照任务的先后顺序依次执行,排在各个队列最前面,先运行,也是并行运行

#### Fair Scheduler
公平调度器,按照缺额排序,缺额大者优先
1. 支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源
2. 每个队列中的job按照优先级分配资源,优先级越高分配的资源越多,但是每个 job 都会分配到资源以确保公平
3. 在资源有限的情况下,每个job理想情况下获得的计算资源与实际获得的计算资源存在一种差距,这个差距就叫做**缺额**
4. 在同一个队列中,job的资源缺额越大,越先获得资源优先执行,作业是按照缺额的高低来先后执行的,故同一个队列中存在任务并发执行

### 任务推测执行
#### 作业完成时间
取决于最慢的任务完成时间,一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢
#### 推测执行机制
若某个任务运行速度远慢于任务平均速度。为该任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果
#### 执行推测任务前提条件
1. 每个Task只能有一个备份任务
2. 当前Job已完成的Task必须不小于5%
3. 开启推测执行参数设置,mapred-site.xml文件中默认是**打开**的
```xml
mapreduce.map.speculative
true
If true, then multiple instances of some map tasks may be executed in parallel.
mapreduce.reduce.speculative
true
If true, then multiple instances of some reduce tasks may be executed in parallel.
```
#### 不能启用推测执行机制情况
1. 任务间存在严重的负载倾斜
2. 特殊任务,如任务向数据库中写数据
#### 推测执行算法原理
假设某一时刻
1. 任务T的执行「进度」为**progress**
2. 任务「开始时间」为**taskStartTime**
3. 「当前时间」为**currentTimestamp**
4. 其他任务「完成的平均时间」为**averageRunTime**
则可推测出该任务的「运行时间」为**estimatedRunTime**,「完成时刻」为**estimateEndTime**
另一方面,如果此时为该任务启动一个备份任务,则可推断出它可能的完成时刻为**estimateEndTime'**
**算法**
1. `estimatedRunTime = (currentTimestamp - taskStartTime) / progress`
2. `estimateEndTime = estimatedRunTime + taskStartTime`
3. `estimateEndTime' = currentTimestamp + averageRunTime`
**原则**
1. MR总是选择`(estimateEndTime - estimateEndTime')`差值最大的任务,并为之启动备份任务
2. 为了防止大量任务同时启动备份任务造成的资源浪费,MR为每个作业设置了同时启动的备份任务数目上限
3. 推测执行机制实际上采用了经典的优化算法:以空间换时间,它同时启动多个相同任务处理相同的数据,并让这些任务竞争以缩短数据处理时间
4. 这种方法需要占用更多的计算资源,在集群资源紧缺的情况下,应合理使用该机制,争取在使用少量资源的情况下,减少作业的计算时间
## Hadoop优化
### MapReduce 运行时间长的原因
#### 计算机性能
CPU、内存、磁盘、网络
#### I/O 操作
1. 数据倾斜
2. Map和Reduce数设置不合理
3. Map阶段或Shuffle运行时间太长,导致Reduce等待过久
4. 小文件过多
5. 大量的不可分块的超大文件
6. Spill次数过多
7. Merge次数过多
### MapReduce优化方法
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数
#### 数据输入
**合并小文件**,在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢,采用`CombineTextInputFormat`来作为输入,解决输入端大量小文件场景
#### Map阶段
1. 减少溢写次数,通过调整`io.sort.mb`(环形缓冲区大小)及`sort.spill.percent`(溢写阈值),增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO
2. 减少合并次数,通过调整`io.sort.factor`(合并文件数量),增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间
3. 在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少 I/O
#### Reduce阶段
1. 合理设置Map和Reduce数,两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误
2. 设置Map、Reduce共存,调整`slowstart.completedmaps`参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间
3. 规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗,同时减少Shuffle时间
4. 合理设置Reduce端的Buffer,默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得**Buffer中的一部分数据可以直接输送到Reduce**,从而减少IO开销。`mapreduce.reduce.input.buffer.percent`,默认为0.0,当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整
#### IO传输
1. 采用**数据压缩**的方式,减少网络IO的的时间,可安装Snappy和LZO压缩编码器
2. 使用SequenceFile二进制文件
#### 数据倾斜
**数据倾斜现象**
1. 数据频率倾斜——某一个区域的**数据量**要远远大于其他区域
2. 数据大小倾斜——部分记录的**大小**远远大于平均值
**减少数据倾斜的方法**
1. 抽样和范围分区,通过对原始数据进行抽样得到的结果集来预设分区边界值
2. 自定义分区,基于输出键的背景知识进行自定义分区。例如,Map输出键的单词来源于一本书,且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Reduce实例,而将其他的都发送给剩余的Reduce实例
3. Combine,使用Combine可以大量地减小数据倾斜,目的就是聚合并精简数据
4. 采用Map Join,尽量避免Reduce Join
#### 参数调优
### HDFS小文件优化方法
#### HDFS小文件弊端
HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢
#### 解决方案
1. 在**数据采集的时候**,就将小文件或小批数据合成大文件再上传HDFS
2. 在**业务处理之前**,在HDFS上使用MapReduce程序对小文件进行合并
3. 在MapReduce处理时,可采用**CombineTextInputFormat**提高效率
#### 具体实施
1. Hadoop Archive,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用
2. Sequence File,由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件
3. CombineFileInputFormat,是一种InputFormat,用于将多个文件合并成一个单独的Split,另外,它会考虑数据的存储位置
4. 开启JVM重用,对于大量小文件Job,可以开启JVM重用会减少45%运行时间,一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map
## MapReduce扩展案例
### 多Job串联案例
#### 第一次处理
**OneIndexMapper**
```java
public class OneIndexMapper extends Mapper {
String fileName;
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 获取文件切片
FileSplit split = (FileSplit) context.getInputSplit();
// 获取文件名称
fileName = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行
String line = value.toString();
// 分割
String[] fields = line.split(" ");
// 循环写出
for (String word : fields) {
k.set(word + "--" + fileName);
context.write(k, v);
}
}
}
```
**OneIndexReducer**
```java
public class OneIndexReducer extends Reducer {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 累加求和
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 封装对象
v.set(sum);
// 写出
context.write(key, v);
}
}
```
**OneIndexDriver**
```java
public class OneIndexDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"input/input_combine", "output/output_oneindex"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OneIndexDriver.class);
job.setMapperClass(OneIndexMapper.class);
job.setReducerClass(OneIndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
#### 第二次处理
**TwoIndexMapper**
```java
public class TwoIndexMapper extends Mapper {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 样本数据:hello--hello.txt 2
// 获取一行
String line = value.toString();
// 分割
String[] fields = line.split("--");
// 封装对象
k.set(fields[0]);
v.set(fields[1]);
// 写出
context.write(k, v);
}
}
```
**TwoIndexReducer**
```java
public class TwoIndexReducer extends Reducer {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 样本数据:hello,hello.txt 2
// 循环拼接
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value.toString().replace("\t", "-->") + "\t");
}
// 封装对象
v.set(sb.toString());
// 写出
context.write(key, v);
}
}
```
**TwoIndexDriver**
```java
public class TwoIndexDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"output/output_oneindex/part-r-00000", "output/output_twoindex"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TwoIndexDriver.class);
job.setMapperClass(TwoIndexMapper.class);
job.setReducerClass(TwoIndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
### TopN案例
#### 需求
输出流量使用量在前10的用户信息
#### 具体实现
**FlowTopNBean**
```java
public class FlowTopNBean implements WritableComparable {
Long upFlow;
Long downFlow;
Long sumFlow;
public FlowTopNBean() {
}
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getSumFlow() {
return sumFlow;
}
public void setSumFlow(Long sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowTopNBean o) {
// 由于TreeMap判断相同的标准即为key对象的对比返回值,若对比返回值为0即视为同一对象
// 此时若只根据总流量作为对比字段,很容易丢失数据,故进行二次排序
int compare = -Long.compare(this.sumFlow, o.getSumFlow());
if (compare == 0) {
compare = -Long.compare(this.downFlow, o.getDownFlow());
}
return compare;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
public void set(String upFlow, String downFlow) {
this.upFlow = Long.parseLong(upFlow);
this.downFlow = Long.parseLong(downFlow);
this.sumFlow = this.upFlow + this.downFlow;
}
}
```
**FlowTopNMapper**
```java
public class FlowTopNMapper extends Mapper {
// 由于TreeMap判断相同的标准即为key对象的对比返回值,若对比返回值为0即视为同一对象
// 此时若只根据总流量作为对比字段,很容易丢失数据,故进行二次排序
TreeMap map = new TreeMap<>();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 13736230513 192.196.100.1 www.google.com 2481 24681 200
String line = value.toString();
String[] fields = line.split("\t");
String phoneNum = fields[1];
String upFlow = fields[fields.length - 3];
String downFlow = fields[fields.length - 2];
Text v = new Text();
FlowTopNBean k = new FlowTopNBean();
v.set(phoneNum);
k.set(upFlow, downFlow);
map.put(k, v);
if (map.size() > 10) {
map.remove(map.lastKey());
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Set set = map.keySet();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
FlowTopNBean k = iterator.next();
Text v = map.get(k);
context.write(k, v);
}
}
}
```
**FlowTopNReducer**
```java
public class FlowTopNReducer extends Reducer {
@Override
protected void reduce(FlowTopNBean key, Iterable values, Context context) throws IOException, InterruptedException {
// 由于使用了TreeMap,故key-value一定唯一,只需直接写出
context.write(values.iterator().next(), key);
}
}
```
**FlowTopNDriver**
```java
public class FlowTopNDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"input/phone_flow.txt", "output/output_topn"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowTopNDriver.class);
job.setMapperClass(FlowTopNMapper.class);
job.setReducerClass(FlowTopNReducer.class);
job.setMapOutputKeyClass(FlowTopNBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowTopNBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
### 共同追随者案例
#### 需求
以下是追随者列表数据,冒号前是一个用户,冒号后是该用户的所有追随者
求出哪些人两两之间有共同追随者,及他俩的共同追随者都有谁
#### 输入数据
```txt
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
```
#### 实现
**第一次处理,找出所有k追随的人**
**FollowersMapper**
```java
public class FollowersMapper extends Mapper {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A:B,C,D,F,E,O
// 获取一行
String line = value.toString();
// 分割
String[] person_followers = line.split(":");
// 获取person和follower
String person = person_followers[0];
String followersStr = person_followers[1];
// 封装person
v.set(person);
// 分割followers
String[] followers = followersStr.split(",");
// 封装followers并循环写出person和followers
for (String follower : followers) {
k.set(follower);
// 由原来的a<-b,变成b->a,则此时k追随v,汇总k即可找出所有k追随的人
context.write(k, v);
}
}
}
```
**FollowersReducer**
```java
public class FollowersReducer extends Reducer {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 汇总拼接所有k追随的人
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value.toString() + ",");
}
// 封装k追随的人们
v.set(sb.toString());
// 写出k和k追随的人们
context.write(key, v);
}
}
```
**FollowersDriver**
```java
public class FollowersDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args=new String[]{"input_friends/friends.txt","output/output_followers"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FollowersDriver.class);
job.setMapperClass(FollowersMapper.class);
job.setReducerClass(FollowersReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
```
**第二次处理,找出两两之间的共同追随者**
**Followers2Mapper**
```java
public class Followers2Mapper extends Mapper {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A I,K,C,B,G,F,H,O,D,
// 获取一行
String line = value.toString();
// 分割
String[] follower_persons = line.split("\t");
String follower = follower_persons[0];
String[] persons = follower_persons[1].split(",");
// 封装value
v.set(follower);
// 排序persons,避免b-c和c-b无法进入一个key分组
Arrays.sort(persons);
// 如果该follower只追随了一个person,则该follower不会成为某些人的共同追随者
if (persons.length < 2) {
return;
}
// 循环输出 b-c a
String pp;
for (int i = 0; i < persons.length - 1; i++) {
for (int j = i + 1; j < persons.length; j++) {
// 封装对象
pp = persons[i] + "-" + persons[j];
k.set(pp);
// 写出
context.write(k, v);
}
}
}
}
```
**Followers2Reducer**
```java
public class Followers2Reducer extends Reducer {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 循环拼接b-c共同followers
StringBuffer sb = new StringBuffer();
for (Text follower : values) {
sb.append(follower + ",");
}
// 封装followers对象
v.set(sb.toString());
//写出 b-c a,d,e,
context.write(key, v);
}
}
```
**Followers2Driver**
```java
public class Followers2Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args=new String[]{"output/output_followers/part-r-00000","output/output_followers_2"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Followers2Driver.class);
job.setMapperClass(Followers2Mapper.class);
job.setReducerClass(Followers2Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
```
#### 最终输出
```
A-B E,C,
A-C D,F,
A-D E,F,
A-E D,B,C,
A-F O,B,C,D,E,
A-G F,E,C,D,
A-H E,C,D,O,
A-I O,
A-J O,B,
A-K D,C,
A-L F,E,D,
A-M E,F,
B-C A,
B-D A,E,
B-E C,
B-F E,A,C,
B-G C,E,A,
B-H A,E,C,
B-I A,
B-K C,A,
B-L E,
B-M E,
B-O A,
C-D A,F,
C-E D,
C-F D,A,
C-G D,F,A,
C-H D,A,
C-I A,
C-K A,D,
C-L D,F,
C-M F,
C-O I,A,
D-E L,
D-F A,E,
D-G E,A,F,
D-H A,E,
D-I A,
D-K A,
D-L E,F,
D-M F,E,
D-O A,
E-F D,M,C,B,
E-G C,D,
E-H C,D,
E-J B,
E-K C,D,
E-L D,
F-G D,C,A,E,
F-H A,D,O,E,C,
F-I O,A,
F-J B,O,
F-K D,C,A,
F-L E,D,
F-M E,
F-O A,
G-H D,C,E,A,
G-I A,
G-K D,A,C,
G-L D,F,E,
G-M E,F,
G-O A,
H-I O,A,
H-J O,
H-K A,C,D,
H-L D,E,
H-M E,
H-O A,
I-J O,
I-K A,
I-O A,
K-L D,
K-O A,
L-M E,F,
```