# Redis
**Repository Path**: wengyoshino/Redis
## Basic Information
- **Project Name**: Redis
- **Description**: 本项目旨在学习Redis,有
Redis基础篇:常用数据结构,常用指令,快速入门案例。
Redis实战篇:各种业务场景下的应用
Redis高级篇:Redis部署和配置相关内容,lua语法入门
Redis原理篇:从源码看Redis底层结构,了解网络模型与数据结构
仅用于学习,非商用!
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 4
- **Forks**: 1
- **Created**: 2023-01-16
- **Last Updated**: 2024-04-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: Redis, Java, Lua
## README
# Redis
** :重点知识点
*** : 重点应用
## 基础篇
### 认识NoSQL
**非关系型数据库**
| 区别 | SQL | NoSQL |
| ------------ | ------------------------------------------------------- | ------------------------------------------------------------ |
| **数据结构** | 结构化 | 非结构化 |
| **数据关联** | 关联的,多表中的外键约束 | 无关联的 |
| **SQL查询** | 使用固定的(sql)语句查询,通用 | 相对简单,不统一 |
| **事务特性** | ACID:原子性,一致性,隔离性,持久性 | BASE(基本可用,允许中间态,最终一致性 |
| 存储方式 | 磁盘 | 内存 |
| | 垂直 | 水平 |
| **使用场景** | 1. 数据结构固定 2.相关业务对数据安全性、一致性要求较高 | 1. 数据结构不固定 2. 对一致性、安全性要求不高 3. 对性能要求 |
### 认识Redis
**特征**
- 键值(key-value)型,value支持多种不同数据结构,功能丰富
- 单线程,每个命令具备原子性。6.0 网络请求处理为多线程
- **低延迟,速度快(基于*内存*、IO多路复用、良好的编码)。**
- 支持数据持久化
- 支持主从集群、分片集群
- 支持多语言客户端
### Redis命令
#### Redis 数据结构介绍
Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:
#### Redis通用命令
通用指令是部分数据类型的,都可以使用的指令,常见的有:help commands——帮助文档
1. KEYS
- **查看符合模板的所有key**,不建议在生产环境设备上使用
- 
2. DEL
- 删除一个指定的key
3. EXISTS
- 判断 key 是否存在
4. EXPIRE
- 给一个 key 设置有效期,有效期到期时该 key 会被自动删除
5. TTL
- 查看一个 key 的剩余有效期
- -1:永久有效; -2:过期
#### string类型
String类型,也就是字符串类型,是Redis中最简单的存储类型。
其value是字符串,不过根据字符串的格式不同,又可以分为3类:
- string:普通字符串
- int:整数类型,可以做自增、自减操作
- float:浮点类型,可以做自增、自减操作
不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m
**常见命令**
- SET:添加或者修改已经存在的一个String类型的键值对
- GET:根据key获取String类型的value
- **MSET**:批量添加多个String类型的键值对
- **MGET**:根据多个key获取多个String类型的value
- *INCR*:让一个整型的 key 自增1
- INCRBY:让一个整型的key自增并指定步长,例如:incrby num 2让 num 值自增2
- *INCRBYFLOAT*:让一个浮点类型的数字自增并指定步长
- **SETNX**:添加一个String类型的键值对,前提是这个key不存在,否则不执行
- **SETEX**:添加一个String类型的键值对,并且指定有效期
#### key的结构
Redis的key允许有多个单词形成层级结构,多个单词之间用':'隔开,格式如下:
**项目名:业务名:类型:id**
这个格式并非固定,也可以根据自己的需求来删除或添加词条。
#### Hash类型
Hash类型,也叫散列,其value是一个无序字典,类似于引ava中的HashMap结构。
**常见命令**
- **HSET** key field value:添加或者修改hash类型key的field的值
- HGET key field:获取一个hash类型key的field的值
- HMSET:批量添加多个hash类型key的field的值
- HMGET:批量获取多个hash类型key的field的值
- **HGETALL**:获取一个hash类型的key中的所有的field和value(entrySet
- HKEYS:获取一个hash类型的key中的所有的field(keySet
- HVALS:获取一个hash类型的key中的所有的value(values
- HINCRBY:让一个hash类型key的字段值自增并指定步长
- **HSETNX**:添加一个hash类型的key的field值,*前提是这个**field**不存在,否则不执行*
#### List类型
Redis中的List类型与ava中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。
**特征**
- 有序
- 元素可以重复
- 插入和删除快
- 查询速度一般
**常见命令**
- **LPUSH** key element..:向列表左侧插入一个或多个元素
- LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil
- RPUSH key element..:向列表右侧插入一个或多个元素
- RPOP key:移除并返回列表右侧的第一个元素
- **LRANGE** key star end:返回一段角标范围内的所有元素
- *BLPOP和BRPOP*:与LPOP和RPOP类似,只不过在没有元素时**等待指定时间**,而不是直接返回nil
**问题**
1. 模拟一个栈
1. 入口和出口在同一边
2. LPUSH 和 LPOP / RPUSH 和 RPOP
2. 模拟一个队列
1. 入口和出口在不同边
2. LPUSH 和 RPOP
3. 模拟一个阻塞队列
1. 入口和出口在不同边
2. 出队时采用 BLPOP 或 BRPOP
#### set类型
Redis的Set结构与java中的HashSet:类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:
- 无序
- 元素不可重复
- 查找快
- 支持交集、并集、差集等功能
**常见命令**
- 单集合操作
- SADD key member..:向set中添加一个或多个元素
- SREM key member.:移除set中的指定元素
- **SCARD** key:返回set中元素的个数
- SISMEMBER key member:判断一个元素是否存在于set中
- SMEMBERS:获取set中的所有元素
- 多集合操作
- SINTER key1key2..:求key1与key2的 交集
- SDIFF key1key2..:求key1与key2的 差集
- SUNION key1key2.:求key1和key2的 并集
#### SortedSet类型
Redist的SortedSet是一个可排序的set集合,与java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以***基于score属性对元素排序***,底层的实现是一个跳表(SkipList)加hash表。
**特性**
- 可排序
- 元素不重复
- 查询速度快
因为SortedSet的可排序特性,经常被用来实现 **排行榜** 这样的功能。
**常见命令**
- ZADD key **score** member:添加一个或多个元素到sorted set,如果已经存在则更新其score值
- ZREM key member:删除sorted set中的一个指定元素
- **ZSCORE** key member:获取sorted set中的**指定元素的score值**
- **ZRANK** key member:获取sorted set中的指定元素的**排名**
- ZCARD key:获取sorted set中的元素个数
- **ZCOUNT** key min max:统计**score值在给定范围内**的所有元素的**个数**
- ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
- **ZRANGE** key min max :按照score排序后,获取**指定排名范围内的元素**(按角标 0 ~ n)
- **ZRANGEBYSCORE** key min max [WITHSCORES] [LIMIT offset count]:按照score:排序后,获取指定score范围内的元素(按分数查,offset:第一次参数的偏移量,count:查几条)
- 滚动查询:
- ZRANGEBYSCORE z1 0 10000 WITHSCORES LIMIT **0** 3
- ZRANGEBYSCORE z1 **2** 10000 WITHSCORES LIMIT 1 3
- ZDIFF、ZINTER、ZUNION:求差集、交集、并集
**注意**:所有的排名**默认都是升序**,如果要降序则在命令的 ***Z 后面添加 REV 即可***
### Redis的java客户端
### Jedis
#### Jedisi连接池
Jedis.本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用 jedis连接池 代替 jedis 的直连方式
### SpringData
SpringData:是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https:/spring.io/projects/spring-data-redis
- 提供了对不同Redis客户端的整合(Lettuce 和 jedis)
- 提供了RedisTemplate:统一API来操作 Redis
- 支持 Redis 的发布订阅模型
- 支持 Redis 哨兵和 Redis 集群
- 支持基于 Lettuce 的响应式编程
- 支持基于刊 JDK、JS0N、字符串、Spring对象的数据序列化及反序列化
- 支持基于 Redis 的 JDKCollection:实现
#### **快速入门**
2. 配置文件
```yaml
spring:
redis:
host: 192.168.153.128
port: 6379
password:
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 100
```
#### 序列化方式
RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object/序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:

- 可读性差
- 内存占用较大
##### **自定义序列化方式**
```java
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
// 创建 RedisTemplate 对象
RedisTemplate template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(connectionFactory);
// 创建 JSON 序列化工具
GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 设置 key 序列化
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// 设置 value 序列化
template.setValueSerializer(RedisSerializer.json());
template.setValueSerializer(RedisSerializer.json());
//返回
return template;
}
}
```
##### **存在的问题**
为了在 反序列化 时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。
##### 解决措施
为了节省内存空间,我们并不会使用 JSON 序列化器来处理 value ,而是**统一使用String序列化器**,要求只能存储 String 类型的key和value。当需要存储 Java 对象时,**手动完成**对象的序列化和反序列化。
##### StringRedisTemplate
Spring默认提供了一个StringRedisTemplate:类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程:
```java
@Resource
private StringRedisTemplate stringRedisTemplate;
@Test
void testSaveUser() {
User user = new User("yoshino", 22);
// 手动序列化
Gson gson = new Gson();
String json = gson.toJson(user);
stringRedisTemplate.opsForValue().set("user:100", json);
String jsonUser = stringRedisTemplate.opsForValue().get("user:100");
// 手动反序列化
User user1 = gson.fromJson(jsonUser, User.class);
System.out.println(json);
System.out.println(user1);
}
```
## 实战篇
### 短信登录
#### 导入
**数据库**
**后端直接导入,修改 yml 文件**
输入:[localhost:8081/shop-type/list](http://localhost:8081/shop-type/list) 测试
**前端导入**
#### 基于 Session 实现登录
##### 发送短信验证码
##### 短信验证码登录
##### 登录验证功能
##### 隐藏用户敏感信息
```java
//需要引入
cn.hutool
hutool-all
5.7.17
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
```
#### 集群的 session 共享问题
**session共享问题**:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
**方案应该满足:**
- 数据共享
- 内存存储
- key、value:结构
#### 基于 Redis 实现共享 Session 登录
```java
// 7.2将 User 对象转为 Hash 存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
```
**Redis代替session需要考虑的问题:**
- 选择合适的数据结构
- 选择合适的key
- 选择合适的存储粒度
- ***注意过期时间***
#### 登录拦截器的优化
**问题**
### 商品查询缓存
#### 什么是缓存
**缓存**就是数据交换的缓冲区(称作 Cache),是存贮数据的临时地方,一般读写性能较高
#### 添加 redis 缓存
**给店铺类型查询业务添加缓存**
#### 缓存更新策略
**业务场景**
- 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
**主动更新策略**
3:停电数据丢失,数据库的数据非准确的
**选择 1 策略**
#### 给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的业务逻辑,满足下面的需求:
- 根据d查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
- 根据id修改店铺时,先修改数据库,再删除缓存
### **缓存穿透
**缓存穿透** 是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

**解决方案**
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点
- 额外的内存消耗(设置 TTL)
- 可能造成短期的不一致
- 布隆过滤
- 优点:内存占用较少,没有多余 key
- 缺点
- 实现复杂
- 存在误判可能
**缓存穿透产生的原因是什么?**
- 用户请求的数据在缓存中和数据库中**都不存在**,不断发起这样的请求,给数据库带来巨大压力
**缓存穿透的解决方案有哪些?**
- 缓存 null 值
- 布隆过滤
- 增强 id 的复杂度,避免被猜测 id 规律
- **做好数据的基础格式校验**(在访问前就将不符合格式的 id 查询拦截
- 加强用户权限校验
- 做好热点参数的限流
### **缓存雪崩
**缓存雪崩 **是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
**解决方案**
- 给不同的 Key 的 TTL 添加随机值
- 利用 Redis 集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
### **缓存击穿(热点 key)
缓存击穿问题也叫**热点 Key 问题**,就是一个被**高并发访问**并且**缓存重建业务较复杂**的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
**解决方案**
- 互斥锁
- 逻辑过期
### 商品查询缓存
#### 基于互斥锁方式解决缓存击穿问题
需求:修改根据d查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
#### 基于逻辑过期方式解决缓存击穿问题
需求:修改根据d查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
**活动开始前,就已经存好(已经预热过的,在活动中的热点店铺)**
### ***缓存工具封装(CacheClient)
基于StringRedisTemplate:封装一个缓存工具类,满足下列需求:
- 方法1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
- 方法2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
### ***优惠券秒杀
#### 全局唯一 ID
##### 全局ID生成器
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
全局引D生成器,是一种在分布式系统下用来**生成全局唯一 ID 的工具**,一般要满足下列特性:
为了增加 ID 的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
**ID的组成部分:**
- 符号位:1bit, 永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同 ID
##### 总结
**全局唯一 ID 生成策略**
- UUID
- Redis 自增
- snowflake 算法
- 数据库自增
**Redis 自增 ID 策略**
- 每天一个 key,方便统计订单量
- ID 构造是 时间戳 + 计数器
#### 实现添加优惠券秒杀下单
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
- tb_voucher:优惠券的基本信息,优惠金额、使用规则等
- tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
#### 实现秒杀下单
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
#### 超卖问题
**测试**
使用 JMeter 测试
**错误流程图**
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是 **加锁**:
##### **乐观锁**
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
- 版本号
- CAS 法
> stock = 原来的,改为 stock > 1
**超卖这样的线程安全问题,解决方案有哪些**?
1. 悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
2. 乐观锁:不加锁,在更新时判断是否有其它线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
### 一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
### 集群模式下线程并发问题
#### 一人一单的并发安全问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
> Ctrl + D 实现

**出现一人多单的问题**
### 分布式锁
#### 工作原理
**什么是分布式锁**
分布式锁:满足分布式系统或集群模式下 **多进程可见** 并且 **互斥** 的锁。
#### 基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法:
- 获取锁
- 互斥:确保只能有一个线程获取锁
-
-
- 释放锁
- 手动释放
- 超时释放:获取锁时添加一个超时时间
- DEL lock
#### 基于Redis:实现分布式锁初级版本
需求:定义一个类,实现下面接口,利用Redis.实现分布式锁功能。
```java
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
```
单台 jvm 适用
**误删问题**
#### 改进Redis的分布式锁:对value值进行验证
需求:修改之前的分布式锁实现,满足:
1. 在获取锁时存入线程标示(可以用UUD表示)
2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
**误删问题**
#### ***Redis的Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redist命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https:/www.runoob.com/lua/lua-tutorial.html
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
**释放锁的业务流程**是这样的:
1. 获取锁中的线程标示
2. 判断是否与指定的标示(当前线程标示)一致
3. 如果一致则释放锁(删除)
4. 如果不一致则什么都不做
**Lua 脚本表示:**
```lua
-- 获取锁中的线程标示 get key
local id = redis.call('get', KEYS[1])
-- 比较线程标示与锁中的标示是否一致
if (id == ARGV[1]) then
-- 释放锁
return redis.call('del', KEYS[1])
end
return 0
```
#### 再次改进 Redis 的分布式:使用 lua 脚本
**实现**
**基于setnx实现的分布式锁存在下面的问题:**
### **Redisson
Redisson:是一个在**Redis的基础上**实现的 **java 驻内存数据网格**(In-Memory Data Grid)。它不仅提供了一系列的分布式的 java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson
#### Redisson 入门
1. 引入依赖:
```xml
org.redisson
redisson
3.17.5
```
2. 配置Redisson客户端:
```java
/**
* @author yoshino
**/
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
config.useSingleServer().setAddress(redisAddress);
// 创建 RedissonClient 对象
return Redisson.create(config);
}
}
```
3. 使用 Redisson 的分布式锁:
#### Redisson 可重入锁原理:使用 hash 结构
##### **获取锁的 lua 脚本(Redisson 获取锁的 lua 源码)**
```lua
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1];-- 线程唯一标识
local releaseTime = ARGV[2];-- 锁的自动释放时间
-- 判断惭是否存往
if (redis.call('exists', key) == 0) then
-- 不存在,获取锁
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return nil; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if (redis.call('hexists', key, threadId) == 1)
then
-- 存在,获取锁,重入次数 + 1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return nil;--返回结果
end ;
return redis.call('pttl', key);--代码走到这里,说明获取锁的不是自己,获取锁失败,返回锁的剩余有效期
```
##### **释放锁的 lua 脚本(Redisson 释放锁的 lua 源码)**
```lua
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1];-- 线程唯一标识
local releaseTime = ARGV[2];-- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0)
then
return nil;-- 如果已经不是自己,则直接返回
end
--是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,
-- 重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return 0;
else
-- 等于0说明可以释放锁直接删除
redis.call('DEL', key);
rediscall('publish',KEYS[2],threadId); -- 发布消息通知
return 1;
end
return nil;
```
#### 基于 Redisson 的分布式锁的优化:可重试和超时续约
优化 setnx 实现的分布式锁带来的问题
**trylock 携带 等待时间时 (锁重试): **利用 消息队列 和 信号量 方式
**trylock 不携带 释放时间时(-1) (锁重试):**看门狗机制 每 10s 更新一次 ttl
**如何取消(释放锁时)**
##### 获取锁
##### 释放锁
#### Redisson分布式锁主从一致性问题:RedissonMultiLock
`lock = redissonClient.getMultiLock(lock1, lock2, lock3);`
**必须获取所有 Node 的锁才算成功 获取,否则算失败**(联锁)
#### **Redisson分布式锁原理
- **可重入:**利用 **hash** 结构记录线程 id 和重入次数
- **可重试:**利用 信号量 和 Pub-Sub 功能实现等待、唤醒、获取锁失败的重试机制(有等待时间)
- **超时续约:**利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间(释放时间参数必须为 -1,默认提供的释放时间 30s)
#### 锁总结
1. 不可重入Redis分布式锁:
- 原理:利用setnx的互斥性;利用eX避免死锁;释放锁时判断线程标示
- 缺陷:不可重入、无法重试、锁超时失效
2. 可重入的Redis:分布式锁:
- 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
3. Redisson的multiLock:
- 原理:多个独立的Redis节点,**必须在所有节点都获取重入锁**,才算获取锁成功
- 缺陷:运维成本高、实现复杂
### Redis 优化秒杀
上图,为串联的执行顺序,效率低
**解决方案**
**redis 的数据结构**
**redis 中的操作(lua 脚本)**
**tomcat 中的操作**
#### 改进秒杀业务,提高并发性能
**需求:**
1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
#### 秒杀业务的优化思路是什么?
1. 先利用Redis完成库存余量、一人一单判断,完成抢单业务
2. 再将下单业务放入阻塞队列,利用独立线程异步下单
#### 基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题(BlockQuery 使用 JDK 的内存,需要加以限制)
- 数据安全问题(数据库操作失败后,Redis 的数据 和 数据库不一致 问题)
### Redis消息队列实现异步秒杀
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
**Redis提供了三种不同的方式来实现消息队列:**
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
#### 基于 List 结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是**入口和出口不在一边**,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。
因此这里应该使用**BRPOP或者BLPOP**来实现阻塞效果。
**基于Lst的消息队列有哪些优缺点?**
优点:
- 利用Redis存储,不受限于 JVM 内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失(拿到了就 remove 了,不能确保一定完成业务)
- 只支持单消费者
#### 基于 PubSub 的消息队列
**PubSub(发布订阅)**是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel[channel]:订阅一个或多个频道
- PUBLISH channel msg:向一个频道发送消息
- PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道
**基于PubSub的消息队列有哪些优缺点?**
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化(没有消费者时,消息丢失)
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失(仅在消费者有缓存窗口)
#### ***基于Stream的消息队列
Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
**发送消息的命令**
##### **读取消息的方式之一: XREAD**
> 阻塞时间 :0 表示等待时间无限长
**!注意**
当我们指定起始 ID 为$时,代表读取最新的消息,如果我们处理一条消息的过程中又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现 **漏读消息** 的问题。
**STREAM:类型消息队列的XREAD命令特点:**
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
##### 基于Stream的消息队列-消费者组
消费者组(**Consumer Group**):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
**创建消费者组**
**从消费者组读取消息**
**确认消息**

**读取pending-list中的消息(消费但未确认)**

`XPENDING s1 g1 - + 10`:读10条,- + 全部
**STREAM类型消息队列的XREADGROUP命令特点:**
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
#### 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
**需求:**
1. 创建一个Stream类型的消息队列,名为stream.orders `127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM`
2. 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherld、userld、orderld
3. 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
### 达人探店
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
- tb_bog:探店笔记表,包含笔记中的标题、文字、图片等
- tb_blog_comments:其他用户对探店笔记的评价
#### 发布探店笔记
点击首页最下方菜单栏中的+按钮,即可发布探店图文:
#### 实现查看发布探店笔记的接口
需求:点击首页的探店笔记,会进入详情页面,实现该页面的查询接口:
#### 点赞
在首页的探店笔记排行榜和探店图文详情页面都有点赞的功能:
**需求**
- 同一个用户只能点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段 Blog 类的 isLike 属性)
#### 点赞排行榜
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
**实现查询点赞排行榜的接口**
需求:按照点赞时间先后排序,返回Top5的用户
**问题**
数据库查询时,是使用 in 来获取,导致点赞顺序的排序问题
解决:`WHERE id IN ( 1010 , 1 ) ORDER BY FIELD(id, 1010, 1)`
拼接:`String idStr = StrUtil.join(",", ids);`
查询语句:`query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()`
### 好友关注
#### 关注和取关
需求:基于该表数据结构,实现两个接口:
1. 关注和取关接口
2. 判断是否关注的接口
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb follow表来标示:
#### 共同关注
需求:利用Rdis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友。
#### 关注推送
关注推送也叫做Feed流,直译为 **投喂** 。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
Feed流产品有两种常见模式:
- **Timeline**:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
- **智能排序**:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做Feed流,因此采用 Timeline 的模式。该模式的实现方案有三种:
- 拉模式:也叫做读扩散
- 推模式:也叫做写扩散
- 推拉结合:也叫做读写混合,兼具推和拉两种模式的优点
#### **基于推模式实现关注推送功能**
**需求**:
1. 修改新增探店笔记的业务,在保存bog到数据库的同时,推送到粉丝的收件箱
2. 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
3. 查询收件箱数据时,可以实现分页查询
**Feed流的分页问题**
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
**Feed 流的滚动分页**
#### 实现关注推送页面的分页查询
**需求**:在个人主页的“关注”卡片中,查询并展示推送的blog信息:

### 附近商户
#### GEO数据结构
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
#### 附近商户搜索
在首页中点击某个频道,即可看到频道下的商户:
按照**商户类型**做分组,类型相同的商户作为同一组,以 typeld 为 key 存入同一个GEO集合中即可
SpringDataRedisl的2.3.9版本并不支持Redis6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM文件,内容如下:
### 用户签到
#### BitMap用法
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为**位图(BitMap)**
Redis中是利用**string**类型数据结构实现BitMap,因此最大上限是**512M**,转换为bit则是2^32个bit位
#### 签到功能
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
#### 签到统计
#### 实现签到统计功能
### UV 统计
#### HyperLogLog 用法
首先我们搞懂两个概念:
**UV**:全称 Unique Visitor,也叫*独立访客量*,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
**PV**:全称Page View,也叫*页面访问量或点击量*,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
#### 实现 UV 统计
## 高级篇
### 分布式缓存
#### 单点 redis 的问题
数据丢失问题:Reds是内存存储,服务重启可能会丢失数据
并发能力问题:单节点Redis并发能力虽然不错,但也无法满足如618这样的高并发场景
故障恢复问题:如果Redis:宕机,则服务不可用,需要一种自动的故障恢复手段
存储能力问题:Redis基于内存,单节点能存储的数据量难以满足海量数据需求
**方法:**
实现 Redis 数据持久化
搭建主从集群,实现读写分离
利用 Redis 哨兵,实现健康检测和自动恢复
搭建分片集群,利用插槽机制实现动态扩容
#### Redis 持久化
##### RDB持久化
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做***Redis数据快照***。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
快照文件称为RDB文件,默认是保存在当前运行目录。
Redis**停机时**会执行一次RDB。保存到当前运行的目录下
**配置**
**原理**
bgsave开始时会 fork 主进程得到子进程,子进程**共享**主进程的内存数据。完成 fork 后读取内存数据并写入RDB文件。
fork采用的是copy-on-write技术:
- 当主进程执行读操作时,访问共享内存;
- 当主进程执行写操作时,则会拷贝一份数据,执行写操作。
**总结**
RDB方式bgsave的基本流程?
- fork 主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的 RDB 文件
- 用新RDB文件 **替换** 旧的 RDB 文件。
RDB会在什么时候执行?Save 60 1000代表什么含义?
- 默认是服务停止时。
- 代表60秒内至少执行1000次修改则触发RDB
RDB的缺点?
- RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
- fork子进程、压缩、写出RDB文件都比较耗时
##### AOF特久化
AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。
**配置**
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
AOF的命令记录的频率也可以通过redis.conf文件来配:
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行 ***bgrewriteaof*** 命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。
Redis也会在触发阈值时**自动**去重写AOF文件。阈值也可以在redis.conf中配置:
##### 总结
### Redis 主从集群
#### 搭建主从架构
单节点Redis的并发能力是有上限的,要进一步提高Redist的并发能力,就需要搭建主从集群,实现读写分离。
#### 搭建主从集群
视频 P102
假设有A、B两个Redis:实例,如何让B作为A的slave节点?
- 在B节点执行命令:slaveof A的IP A的port
#### 数据同步原理
master如何判断slave是不是第一次来同步数据?这里会用到两个很重要的概念:
- **Replication Id**:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave 则会继承master节点的replid
- **offset**:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave 完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。
因此 slave 做数据同步,必须**向 master** 声明自己的**replication id和offset**,master才可以判断到底需要同步哪些数据
slave 第一次同步前,自身就是 master,replid 记录为本身
**简述全量同步的流程?**
- slave 节点请求增量同步
- master 节点判断 replid ,发现不一致,拒绝增量同步
- master 将完整内存数据生成 RDB,发送RDB到slave
- slave 清空本地数据,加载 master 的RDB
- master 将RDB期间的命令记录在 repl_baklog ,并持续将log中的命令发送给 slave
- slave 执行接收到的命令,保持与 master 之间的同步
#### 总结
简述全量同步和增量同步区别?
- 全量同步:master将完整内存数据生成RDB,发送RDB到slave。后续命令则记录在repl_baklog,逐个发送给slave。
- 增量同步:slave提交自己的 offset 到master,master获取repl_baklog中从offset之后的命令给slave
什么时候执行全量同步?
- slave节点**第一次**连接masteri节点时
- slave节点**断开时间太久**,repl_baklog中的 offset 已经被覆盖时
什么时候执行增量同步?
- slave节点**断开又恢复**,并且在 repl_baklog 中**能找到offse**t时
### Redis 哨兵
slave节点宕机恢复后可以找master节点同步数据,那master节点宕机怎么办?
#### 哨兵的作用和原理
Redis:提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下:
**服务状态监控**
#### 总结
Sentinel的三个作用是什么?
- 监控
- 故障转移
- 通知
Sentinel如何判断一个redis实例是否健康?
- 每隔 1s 发送一次 ping 命令,如果超过一定时间没有相向,则认为是 **主观下线**
- 如果 大多数 sentinel 都认为实例主观下线,则判断 服务下线
故障转移步骤有哪些?
- 首先选定一个slave作为新的master,执行slaveof no
- 然后让所有节点都执行slaveof新master
- 修改故障节点配置,添加slaveof新master
#### 搭建哨兵集群
视频 p106
#### RedisTemplate 的哨兵模式
在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换。
#### Sentinel之间互相监控
Sentinel是特殊的Redis节点,也能发布订阅;
Sentinel没有主从之分;
Sentinel**订阅所有Redis节点的_sentinel_:hello频道**,并在**上线时**给所有Redis节点的_sentinel_:hello**频道发送消息**,包括自己的host、进程ID(runid)、以及Master配置,让**其他Sentinel感知**自己,**更新存储的Sentinel列表**(如果是新的host、新的进程号,则进行添加;如果已经有host相同,但进程ID不同的Sentinel,代表原有的Sentinel重启了,会进行替换),如果Master配置不同,代表其他的Sentinel的Master配置的版本低,也会进行替换。
Sentinel上线后会每隔1秒给其他Sentinel发送消息。
### Redis 分片集群
#### 搭建分片集群
##### 分片集群结构
主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:
- 海量数据存储问题
- 高并发写的问题
使用分片集群可以解决上述问题,分片集群特征:
- 集群中有多个master,每个master保存不同数据
- 每个master都可以有多个slave节点
- master之间通过ping监测彼此健康状态
- 客户端请求可以访问集群任意节点,最终都会被转发到正确节点

##### **搭建**
p108
#### 散列插槽
Redis会把每一个 master 节点映射到 0~16383 共16384个插槽(hash slot)上,查看集群信息时就能看到:
数据**key**不是与节点绑定,而是**与插槽绑定**。redis会根据key的有效部分计算插槽值,分两种情况:
- key 中包含"{}",且“{}”中至少包含1个字符,“{}” 中的部分是有效部分
- key 中不包含“{}”,整个 key 都是有效部分
**例如**:key是num,那么就根据num计算,如果是{itcast}num,则根据itcasti计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
##### 总结
Redis如何判断某个key应该在哪个实例?
- 将16384个插槽分配到不同的实例
- 根据key的有效部分计算哈希值,对16384取余
- 余数作为插槽,寻找插槽所在实例即可
如何将**同一类数据固定**的**保存在同一个Redis实例**?
- 这一类数据使用相同的有效部分,例如key都以 {typeld} 为前缀
#### 集群伸缩
##### 添加一个节点到集群
**向集群中添加一个新的masteri节点,并向其中存储num=10**
需求:
- 启动一个新的redis.实例,端口为7004
-
- 添加7004到之前的集群,并作为一个master节点
-
- 给7004节点分配插槽,使得num这个key可以存储到7004实例
-
-
-
#### 故障转移
在7002这个slave节点执行**手动故障转移**,重新夺回master:地位
步骤如下:
1. 利用redis-cli连接7002这个节点
2. 执行cluster failovert命令
#### RedisTemplate访问分片集群
### 多级缓存
#### 传统缓存的问题
传统的缓存策略一般是请求到达 Tomcat 后,先查询 Redis ,如果未命中则查询数据库,存在下面的问题:
- 请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈
- Redis缓存失效时,会对数据库产生冲击
#### 多级缓存方案
多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能:
用作缓存的Nginx是业务Nginx,需要部署为集群,再有专门的Nginx用来做反向代理:
#### ***JVM进程缓存
##### 导入商品案例 P114
安装 mysql ,导入工程
##### **初识Caffeine
**本地进程缓存**
缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:
- 分布式缓存,例如Redis:
- 优点:存储容量更大、可靠性更好、可以在 **集群间共享**
- 缺点:访问缓存有 **网络开销**
- 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
- 进程本地缓存,例如HashMap、GuavaCache:
- 优点:读取本地内存,没有网络开销,**速度更快**
- 缺点:存储容量有限、可靠性较低、**无法共享**
- 场景:性能要求较高,缓存数据量较小
**Caffeine**是一个基于刊 java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine
```xml
com.github.ben-manes.caffeine
caffeine
2.9.3
```
> For Java 11 or above, use `3.x` otherwise use `2.x`.
**Caffeine 的驱逐策略**
##### 实现进程缓存
**实现商品的查询的本地进程缓存**
利用**Caffeine**实现下列需求:
- 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
- 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
- 缓存初始大小为100
- 缓存上限为10000
**配置 Caffeine**
通过注入,使用 get 方法即可
#### Lua语法入门
##### 初识Lua
Lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵话的扩展和定制功能。官网:https://www.lua.org/
##### 变量和循环
##### 条件控制、函数
```lua
function printArr(arr)
for index,value in ipairs(arr)do
print(value)
end
end
```
#### 多级缓存
##### 安装OpenResty P121
OpenResty是一个基于Nginx的高性能Web平台,用于方便地搭建能够处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。具备下列特点:
- 具备Nginx的完整功能
- 基于Lua语言进行扩展,集成了大量精良的Lua库、第三方模块
- 允许使用Lua自定义业务逻辑、自定义库
官方网站:https://openresty.org/cn/
##### **OpenRest快速入门
**实现商品详情页数据查询**
**步骤一**:修改nginx.conf文件
**步骤二**:编写item.lua文件
##### 请求参数处理
**获取请求路径中的商品id信息,拼接到json结果中返回**
```lua
--获取路径参数
local id = ngx.var[1]
--返回结果
ngx.say ('{"id":'.. id ..'}')
```
##### ***查询Tomcat
**获取请求路径中的商品id信息,根据id向Tomcat查询商品信息**
这里要修改item.lua,满足下面的需求:
1. 获取请求参数中的id
2. 根据id向Tomcat服务发送请求,查询商品信息
3. 根据id向Tomcat服务发送请求,查询库存信息
4. 组装商品信息、库存信息,序列化为SON格式并返回
**nginx内部发送Http请求**
nginx提供了内部API用以发送http请求:
**返回的响应内容包括**:
- resp.status:响应状态码
- resp.header:响应头,是一个table
- resp.body:响应体,就是响应数据
**注意**:这里的path是路径,并**不包含lP和端口**。这个请求会被nginx内部的server监听并处理。
**封装http查询的函数**
我们可以把http查询的请求封装为一个函数,放到OpenResty函数库中,方便后期使用。
**JSON结果处理**
OpenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化。
官方地址:https://github.com/openresty/lua-cjson/
**使用Http函数查询数据**
> item.lua
```lua
--导入common函数库
local common = require('common')
local read_http = common.read_http
--导入 cjson 库
local cjson = require('cjson')
--获取路径参数
local id = ngx.var[1]
--查询商品信息
local itemJSON = read_http("/item/" .. id, nil)
--查询库存信息
local stockJSON = read_http("/item/stock/" .. id, nil)
-- JSON 转为 lua 的 table
local item = cjosn.decode(itemJSON)
local stock = cjson.decoode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把 item 序列化为 JSON 返回结果
ngx.say(cjson.encode(item))
```
**Tomcat:集群的负载均衡**
> hash :表示会根据请求路径url 进行hash运算,使得同一个访问地址指向同一个 tomcat 服务器
##### **Redis缓存预热
**冷启动与缓存预热**
**冷启动**:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。
**缓存预热**:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。
> 我们数据量较少,可以在启动时将所有数据都放入缓存中。
```java
private static final ObjectMapper MAPPER = new objectMapper();
@Override
public void afterPropertiesSet()throws Exception {
//初始化缓存
//1.查询商品信息
List- itemList = itemService.list();
//2.放入缓存
for (Item item : itemList) {
//2.1.item 序列化为切 JSON
String json = MAPPER.writeValueAsString(item);
//2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(),json);
}
}
```
> **注意** :需要实现 InitializingBean 接口,重写 afterPropertiesSet() 方法,会在 RedisHandler 创建后,成员变量完成初始化后 执行
##### ***查询Redis缓存
**OpenResty的 Redis 模块**
> common.lua
```lua
--导入redis
local redis = require('resty.redis')
--初始化redis
Local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
--关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idLe_time = l0000 --连接的空闲时间,单位是毫秒
local poo1_size = 100 --连接池大小
Local ok,err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR,"放入redis连接池失败:",err)
end
end
--查询redis的方法ip和port是redis,地址,key是查询的key
local function read_redis(ip,port,key)
--获取一个连接
local ok,err = red:connect(ip,port)
if not ok then
ngx.log(ngx.ERR,"连接redis:失败:",err)
return nil
end
--查询redis
Local resp,err = red:get(key)
--查询失败处理
if not resp then
ngx.log(ngx.ERR,"查询Redis失败:",err,",key=",key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR,"查询Redis数据为空,key=",key)
end
close_redis(red)
return resp
end
--封装函数,发送http请求,并解析响应
Local function = read_http(path,params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
--记录错误信息,返回404
ngx.log(ngx.ERR,"http查询失败,path:",path,",args:",args)
ngx.exit(404)
end
return resp.body
end
--一将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
```
**查询商品时,优先Redis缓存查询**
**需求**:
修改item.lua,封装一个函数read data,实现先查询Redis,如果未命中,再查询tomcat
修改item.lua,查询商品和库存时都调用read data这个函数
##### **Nginx本地缓存
OpenResty为 Nginx 提供了 **shard dict** 的功能,可以在 nginx 的多个worker之间共享数据,实现缓存功能。
**在查询商品时,优先查询OpenResty的本地缓存**
需求:
- 修改item.lua中的read_data函数,**优先查询本地缓存**,未命中时再查询Redis、Tomcat
- 查询 Redis 或 Tomcat 成功后,将数据写入本地缓存,并**设置有效期**
- 商品基本信息,有效期30分钟
- 库存信息,有效期1分钟
#### **缓存同步策略
##### 数据同步策略
缓存数据同步的常见方式有三种:
- **设置有效期**:给缓存设置有效期,到期后自动删除。再次查询时更新
- 优势:简单、方便
- 缺点:**时效性差**,缓存过期之前可能不一致
- 场景:**更新频率较低**,时效性要求低的业务
- **同步双写**:在修改数据库的同时,直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高,
- 场景:对一致性、时效性要求较高的缓存数据
- **异步通知**:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
##### 安装Canal
**初识Canal**
Canal,译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于刊 java 开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal
Canal就是把自己 **伪装成MySQL的一个slave节点** ,从而**监听masterl的binary log变化**。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
**安装和配置Canal** P131
1. 开启 MySQL 主从
1. 开启bin_log
2. 设置用户权限
2. 安装 Canal
1. 创建网络
2. 安装 Canal
##### 监听Canal
**Canal客户端**
Canal 提供了各种语言的客户端,当Canall监听到 bin_log 变化时,会通知 Canal 的客户端。
不过这里我们会使用GitHub上的第三方开源的canal-starter。地址:https://github.com/NormanGyllenhaal/canal-client
**编写监听器,监听 Canal 消息:**
Canal推送给canal-client的是被修改的这一行数据(row),而我们引入的canal-client则会帮我们把行数据封装到 Item 实体类中。这个过程中需要知道数据库与实体的映射关系,要用到到PA的几个注解:
### **总结

## 实践篇
### Redis键值设计
#### 优雅的key结构
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
#### 拒绝BigKey
BigKey通常以**Key的大小和Key中成员的数量**来综合判定,例如:
- Key本身的数据量过大:一个String类型的Key,它的值为5MB。
- Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个。
- Key中成员的数据量过大:一个Hash类型的Key,它的成员数量虽然只有1,000个,但这些成员的Value(值)总大小为100MB。
**查询一个 key 的准确大小**
**看元素长度**
**Big Key的危害**
- 网络阻塞
对BigKey执行读请求时,少量的QPS就可能导致带宽使用率被占满,导致Redis实例,乃至所在物理机变慢
- 数据倾斜
BigKey所在的Redis实例内存使用率远超其他实例,无法使数据分片的内存资源达到均衡
- Redis阻塞
对元素较多的hash、list、Zset等做运算会耗时较旧,使主线程被阻塞
- CPU压力
对BigKey的数据序列化和反序列化会导致CPU的使用率飙升,影响Redis实例和本机其它应用
**如何发现BigKey**
- redis-cli --bigkeys
利用redis-cli提供的-bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与**每个数据的Top1的big key**
- scan扫描
自己编程,利用scan扫描Redist中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)
- 第三方工具
利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况
- 网络监控
自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
**如何删除 BigKey**
#### 恰当的数据类型
#### 总结
Key的最佳实践:
- 固定格式:[业务名]:[数据名]:[id]
- 足够简短:不超过44字节
- 不包含特殊字符
Value的最佳实践:
- 合理的拆分数据,拒绝BigKey
- 选择合适数据结构
- Hash 结构的 entry 数量不要超过1000(**默认配置为 500**)
- 设置合理的超时时间
### 批处理优化
#### Pipeline
> 结论:批量执行的效率大大提高
**MSET**
Redis提供了很多MXX这样的命令,可以实现批量插入数据,例如:
- mset
- hmset
> 不要在一次批处理中传输太多命令,否则单次命令**占用宽带过多,会导致网络阻塞**
> 不同
>
> Pipeline没有限制,可以**任何数据结构做组合**
>
> mset 是 redis 原生操作,具有原子性,会一次执行完毕。
**总结**
批量处理的方案:
1. 原生的M操作
2. Pipeline批处理
**注意事项:**
1. 批处理时不建议一次携带太多命令
2. **Pipeline的多个命令之间不具备原子性**
#### 集群下的批处理
如 MSET 或 Pipeline 这样的批处理需要在一次请求中携带多条命令,而此时如果 **Redis 是一个集群**,那批处理命令的**多个key必须落在一个插槽**中,否则就会导致执行失败。
> **Spring 已经提供 并行slot 的方式实现批处理,在集群模式下**
>
> StringRedisTemplate中,multiSet 方法,已实现并行slot方法
>
> 首先会按照 传递数组中的 key值(entrySet),来计算 插槽 得到 map> partitioned,然后遍历 map,将插槽一样的数据放入一个 map 集合中并开启异步任务,依次遍历完 partitioned 中的所有插槽。
### 服务端优化
#### 持久化配置
Rdis的持久化虽然可以保证数据安全,但也会带来很多额外的开销,因此持久化请遵循下列建议:
#### 慢查询
慢查询:在Redis执行时耗时超过某个阈值的命令,称为慢查询。
#### 命令及安全配置
Redis会绑定在0.0.0.0:6379,这样将会将Redis服务暴露到公网上,而Redis如果没有做身份认证,会出现严重的安全漏洞.漏洞重现方式:https://cloud.tencent.com/developer/article/1039000
漏洞出现的核心的原因有以下几点:
- Redis服务暴露到公网
- Redis未设置密码
- 利用了Redis的config seti命令动态修改Redis配置
- 使用了Root账号权限启动Redis
为了避免这样的漏洞,这里给出一些建议:
1. Redis一定要设置密码
2. 禁止线上使用下面命令:keys、flushall、flushdb、config set等命令。可以**利用rename-command禁用**。
3. bind:限制网卡,禁止外网网卡访问
4. 开启防火墙
5. 不要使用Root账户启动Redis
6. 尽量不是有默认的端口
#### 内存配置
当Redis内存不足时,可能导致Key频繁被删除、响应时间变长、QPS不稳定等问题。当内存使用率达到90%以上时就需要我们警惕,并快速定位到内存占用的原因。
**数据内存的问题**
Redis:提供了一些命令,可以查看到Redis目前的内存分配状态:
- info memory
- memory xxx
**内存缓存区配置**
内存缓存区常见的有三种:
- **复制缓冲区: **主从复制的 repl_backlog_buf,如果太小可能导致频繁的全量复制,影响性能。通过 repl_backlog_size 来设置,默认 1mb
- **AOF 缓冲区:**AOF 刷盘之前的缓存区域,AOF 执行 rewrite 的缓冲区。无法设置容量上限
- **客户端缓冲区:**分为输入缓冲区和输出缓冲区,输入缓冲区最大1G且不能设置。输出缓冲区可以设置
### **集群最佳实践
集群虽然具备高可用特性,能实现自动故障恢复,但是如果使用不当,也会存在一些问题:
1. 集群完整性问题
在Rdis的**默认配置**中,如果发现**任意一个插槽不可用,则整个集群都会停止对外服务**:
为了保证高可用特性,这里建议将cluster-require-full-coverage配置为 no
2. 集群带宽问题
**集群节点之间**会不断的**互相Ping**来确定集群中其它节点的状态。每次Ping携带的信息至少包括:
- 插槽信息
- 集群状态信息
集群中节点越多,集群状态信息数据量也越大,10个节点的相关信息可能达到1kb,此时每次集群互通需要的带宽会非常高。
解决途径:
1. 避免大集群,集群节点数不要太多,最好少于1000,如果业务庞大,则建立多个集群。
2. 避免在单个物理机中运行太多Redis实例
3. 配置合适的cluster--node-timeout值
3. 数据倾斜问题
4. 客户端性能问题
5. 命令的集群兼容性问题
mset 的 key 问题,插槽是否在一个结点上
6. lua和事务问题
mset 的 key 问题,插槽是否在一个结点上
## **原理篇
### 数据结构
#### **动态字符串SDS
我们都知道Redis中保存的Key是字符串,value往往是字符串或者字符串的集合。可见字符串是Redis中最常用的一种数据结构。
Redis构建了一种新的字符串结构,称为**简单动态字符串**(Simple Dynamic String),简称 **SDS**。
#### IntSet:底层 整数数组
IntSet是Redist中set集合的一种实现方式,基于**整数数组**来实现,并且具备 **长度可变、有序** 等特征。
---
**IntSet 升级**
**总结**
Intset可以看做是特殊的整数数组,具备一些特点:
1. Redis会确保Intset中的元素**唯一、有序**
2. 具备类型**升级机制**,可以节省内存空间
3. 底层采用**二分查找**方式来查询
#### **Dict:底层 hash 表
我们知道Redis是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过Dict来实现的。
Dict由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)
> size 必须为 **2 的 n 次幂**,用于得到掩码 做 & 运算(和 % 的效果一致)。
当我们向Dict添加键值对时,Redis首先根据 key 计算出hash值(h),然后利用 **h & sizemask** 来计算元素应该存储到数组中的哪个索引位置。
---
**Dict 的扩容**

---
**Dict 的收缩**

---
**Dict 的 rehash**

> 由于为需要主线程操作,若是一次性复制完,十分影响性能,故采用**渐进式rehsh**

> **每次增伤改查,只迁移旧数组中的一列**

---
**总结**
Dict的结构:
- 类似 java 的 HashTable,底层是**数组加链表**来解决哈希冲突
- Dict包含两个哈希表,ht[0]平常用,**ht[1]用来rehash**
Dict的伸缩;
- 当LoadFactor 大于5或者 LoadFactor 大于1并且没有子进程任务时,Dict 扩容
- 当LoadFactor小于0.1时,Dict收缩
- 扩容大小为第一个大于等于used + 1的 2^n
- 收缩大小为第一个大于等于used 的 2^n
- Dict采用**渐进式 rehash**,每次访问 Dict 时执行一次rehash
- rehash 时 ht[0] 只减不增,**新增操作只在 ht[1] 执行**,**其它操作在两个哈希表**
#### ZipList:连续内存块,双端,省内存
zipList 是一种特殊的 “双端链表”,由一系列特殊编码的 **连续内存块组成 **。**可以在任意一端进行压入/弹出操作**,并且该操作的时间复杂度为O(1)。
---

---
**ZipList 的连锁更新问题**
ZipList的每个Entry都包含previous_entry_length来记录上一个节点的大小,长度是1个或5个字节:
- 如果前一节点的长度小于254字节,则采用1个字节来保存这个长度值
- 如果前一节点的长度大于等于254字节,则采用5个字节来保存这个长度值,第一个字节为0xfe,后四个字节才是真实长度数据
> **注意:**并未解决,因为发生的概率十分低,必须连续的字节长度为 250~253 的 entry
**总结**
ZipList特性:
1. 压缩列表的可以看做一种连续内存空间的"双向链表"
2. 列表的节点之间不是通过指针连接,而是记录上一节点和本节点长度来寻址,内存占用较低
3. 如果列表数据过多,导致链表过长,可能影响查询性能
4. 增或删较大数据时**有可能发生连续更新问题**
#### QuickList:是结点为 ZipList 的双端链表

**总结**
QuickList的特点:
- 是**一个节点为 ZipList 的双端链表**
- 节点采用 ZipList,解决了传统链表的内存占用问题
- 控制了 ZipList 大小,解决连续内存空间申请效率问题
- **中间节点可以压缩**,进一步节省了内存
#### SkipList:双向链表,查询效率高
SkipList**(跳表)**首先是链表,但与传统链表相比有几点差异:
- 元素按照**升序**排列存储
- 节点可能 包含多个指针 ,指针跨度不同。

**总结**
skipList的特点:
- 跳跃表是一个**双向链表**,每个节点都包含 score 和 ele 值
- 节点按照 **score 值排序**,score 值一样则按照 ele 字典排序
- 每个节点都可以包含**多层指针**,层数是1到32之间的随机数
- 不同层指针到下一个节点的跨度不同,**层级越高,跨度越大**
- 增删改查效率与红黑树基本一致,实现却更简单
#### RedisObject
Redis中的任意数据类型的**键和值**都会**被封装**为一个RedisObject,也叫做Redis对象,源码如下:
**encoding**
#### 五种数据结构
##### String:RedisObj / SDS
String是Redis中最常见的数据存储类型:
- 其基本编码方式是 **RAW**,基于 *简单动态字符串(SDS)*实现,存储上限为512mb。
- 如果存储的 *SDS 长度小于44字节*,则会采用 **EMBSTR** 编码,此时object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。
-
- > 原因:Redis 内部内存分配,每次分配的空间为 2 的 n 次幂
- 如果存储的字符串是**整数值**,并且大小在 LONG_MAX 范围内,则会采用 **INT** 编码:直接将数据保存在 RedisObject 的 *ptr 指针位置*(刚好8字节),不再需要SDS了。
-
##### List:QuickList
Redis的List类型可以从首、尾操作列表中的元素
- LinkedList:普通链表,可以从双端访问,内存占用较高,内存碎片较多.
- ZipList :压缩列表,可以从双端访问,内存占用低,存储上限低
- QuickList : LinkedList + ZipList,可以从双端访问,内存占用较低,包含多个ZipList,存储上限高
- 在3.2版本之前,Redis采用ZipList和LinkedList来实现List,当元素数量小于512并且元素大小小于64字节时采用zipList编码,超过则采用LinkedList编码。
- 在3.2版本之后,Redis统一采用QuickList来实现List
##### Set:Dict
Set是Redis中的单列集合,满足下列特点:
- 不保证有序性
- 保证元素唯一(可以判断元素是否存在)
- 求交集、并集、差集
可以看出,Set对查询元素的效率要求非常高,思考一下,什么样的数据结构可以满足?
- HashTable,也就是Redis中的Dict,不过Dict是双列集合(可以存键、值对)
Set是Redis中的集合,不一定确保元素有序,*可以满足元素唯一、查询效率要求极高*
- 为了查询效率和唯一性,set采用HT编码(Dict)。Dict中的 key 用来存储元素 value 统一为 null。
- 当存储的所有数据都是 *整数*,并且元素数量不超过set-max-intset-entries时,Set会采用IntSet编码,以节省内存。
##### ZSet:ZipList / SkipList+ Dict
ZSet也就是SortedSet,其中每一个元素都需要指定一个score值和member值:
- 可以根据score值排序后
- member必须唯—
- 可以根据member查询分数
因此,zset底层数据结构必须满足 **键值存储、键必须唯一、可排序** 这几个需求。之前学习的哪种编码结构可以满足?
- **SkipList**: 可以排序,并且可以同时存储 score 和 ele 值(member)
- **HT (Dict)** : 可以键值存储,并且可以根据 key 找 value
---
当**元素数量不多**时,HT 和 SkipList 的优势不明显,而且更耗内存。因此zset还会采用 **ZipList 结构**来节省内存,不过需要同时满足两个条件:
1. 元素数量小于 zset_max_ziplist_entries,默认值128
2. 每个元素都小于 zset_max_ziplist_value 字节,默认值64
**ziplist本身没有排序功能**,而且没有键值对的概念,因此需要有zset通过编码实现:
- ZipList 是连续内存,因此 score 和 element 是紧挨在一起的两个 entry,element 在前 score 在后
- score越小越接近队首,score越大越接近队尾,**按照score值升序排列**
##### Hash:ZipList / Dict
Hash结构与Redis中的Zset非常类似:
- 都是键值存储
- 都需求根据键获取值
- 键必须唯一
**区别**如下:
- zset的键是member,值是score; hash的键和值都是任意值
- zset要根据score排序; hash则无需排序
因此,Hash底层采用的编码与Zset也基本一致,只需要把排序有关的SkipList去掉即可:
- Hash结构**默认采用ZipList编码**,用以节省内存。ZipList中相邻的两个entry分别保存field和value
- 当数据量较大时,Hash结构会转为HT编码,也就是Dict,触发条件有两个:
1. ZipList中的元素数量超过了hash-max-ziplist-entries(默认512)
在 插入元素后 进行判断,若超过,则执行转换成 HT
2. ZipList中的任意entry大小超过了hash-max-ziplist-value (默认64字节)
在 插入元素前 就先进行一次集体插入元素的判断,若中间出现超过的情况,则执行转换;全部判断完,在执行插入指令
### **网络模型
#### 用户空间和内核空间
**用户态 和 内核态 切换**
#### 阻塞lO
#### 非阻塞IO
#### IO多路复用
无论是阻塞IO还是非阻塞lO,用户应用在一阶段都需要调用 recvfrom 来获取数据,差别在于无数据时的处理方案:
- 如果调用recvfrom时,恰好**没有**数据,阻塞IO会使进程阻塞,非阻塞IO使CPU空转,都不能充分发挥CPU的作用。
- 如果调 recvfrom时,恰好**有**数据,则用户进程可以直接进入第二阶段,读取并处理数据
比如服务端处理客户端Socket请求时,在单线程情况下,只能依次处理每一个socket,如果正在处理的socket恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有其它客户端socket都必须等待,性能自然会很差。
那么问题来了:用户进程 **如何知道内核中数据是否就绪** 呢?
**文件描述符**(File Descriptor):简称FD,是一个从0开始递增的无符号整数,用来关联Linux中的一个文件。在Linux中,一切皆文件,例如常规文件、视频、硬件设备等,当然也包括网络套接字(Socket)。
**IO多路复用**︰是利用单个线程来同时监听多个FD,并在某个FD可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源
不过监听FD的方式、通知的方式又有多种实现,常见的有:
- select
- poll
- epoll
**差异**:
- *select 和 poll* 只会通知用户进程有FD就绪,但不确定具体是哪个FD,需要用户进程**逐个遍历FD**来确认
- *epoll* 则会在通知用户进程FD就绪的同时,把已就绪的 FD 写入用户空间
##### Select

> 用户态 和 核心态 内存空间不共享
##### poll
##### epoll
**区别**
select:每次调用,都得把 监听的fd数组,拷贝到内核;结果也需要全部拷贝到用户空间。
epoll:
- 将select功能拆分,直接操作需要监听的fd(epoll_ctl),减少了fd数据的拷贝(如:直接删除已经监听到的fd,而select需要删除后对应的数据后,将新的整个fd数组拷贝到内核)
- 只拷贝就绪的 fd
- 使用了红黑树,查询性能变化幅度不大
##### 总结
**select**模式存在的三个问题:
- 能监听的FD最大不超过1024
- 每次 select 都需要 **把所有要监听的FD都拷贝到内核空间**
- 每次都要 **遍历所有FD** 来判断就绪状态
**poll**模式的问题:
- poll利用 **链表** 解决了 select 中监听 FD 上限的问题,但 **依然要遍历** 所有FD,如果监听较多,性能会下降(**理论上无限**,需要遍历)
**epoll**模式中如何解决这些问题的?
- 基于 epoll 实例中的 **红黑树** 保存要监听的FD,理论上无上限,而且增删改查效率都非常高,*性能**不会**随监听的FD数量增多而下降*
- 每个FD**只需要执行一次epoll_ctl添加到红黑树**,以后每次epoll_wait无需传递任何参数,**无需重复拷贝FD到内核空间**
- 内核会将**就绪的FD直接拷贝到用户空间的指定位置**,用户进程**无需遍历**所有FD就能知道就绪的FD是谁
##### 事件通知机制
当FD有数据可读时,我们调用epoll_wait就可以得到通知。但是事件通知的模式有两种:
- LevelTriggered:简称LT。当FD有数据可读时,会重复通知多次,直至数据处理完成。是Epoll的默认模式。
- EdgeTriggered:简称ET。当FD有数据可读时,只会被通知一次,不管数据是否处理完成
**结论**:
- ET模式避免了LT模式可能出现的**惊群现象**(多个进程同时监听一个 fd,其实只需要一个进程进行读取就行,LT每次重复通知都会唤醒所有的进程)
- **ET模式**最好**结合非阻塞IO读取FD数据**,相比LT会复杂一些 (性能较好)
##### Web 服务流程
#### 信号驱动lO
#### 异步lO
#### **Redis网络模型
**Redis到底是单线程还是多线程?**
- 如果仅仅聊Redis的 **核心业务部分**(命令处理),答案是**单线程**
- 如果是聊**整个Redis**,那么答案就是**多线程**
在Redis版本迭代过程中,在两个重要的时间节点上引入了多线程的支持:
- Redis v4.0:引入多线程异步处理一些耗时较长的任务,例如异步删除命令**unlink**
- Redis v6.0∶在**核心网络模型中引入 多线程**,进一步提高对于多核CPU的利用率
1. **为什么Redis要选择单线程?**
- 抛开持久化不谈,Redis是 **纯内存操作,执行速度非常快,**它的**性能瓶颈是网络延迟**而不是执行速度,因此多线程并不会带来巨大的性能提升。
- 多线程会导致过多的**上下文切换**,带来不必要的**开销**
- 引入多线程会面临 **线程安全问题**,必然要引入 **线程锁** 这样的安全手段,实现复杂度增高,而且性能也会大打折扣
##### 单线程网络模型
来看下Redis **单线程网络模型** 的整个流程:
服务器初始化:
---
---
##### 多线程网络模型

### 通信协议
#### RESP协议
Redis是一个**CS架构**的软件,通信一般分两步(不包括pipeline和PubSub) :
1. 客户端(client)向服务端( server)发送一条命令
2. 服务端解析并执行命令,返回响应结果给客户端
因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。
而在Redis中采用的是RESP ( Redis Serialization Protocol)协议:
- Redis 1.2版本引入了RESP协议
- Redis 2.0版本中成为与Redis服务端通信的标准,称为RESP2
- Redis 6.0版本中,从RESP2升级到了RESP3协议,增加了更多数据类型并且支持6.0的新特性--客户端缓存
但目前,**默认使用的依然是RESP2协议**,也是我们要学习的协议版本(以下简称RESP)。
##### 数据类型
#### 模拟Redis客户端
```java
public class Main {
static Socket s;
static PrintWriter writer;
static BufferedReader reader;
public static void main ( String[] args) {
try {
//1.建立连接
String host = "192.168.150.101";
int port = 6379;
s = new Socket(host,port);
//2.获取输出流、输入流
writer = new Printwriter(new OutputStreamwriter( s.getOutputStream(),StandardCharsets.UTF_8));
reader = new BufferedReader(new InputStreamReader(s.getInputStream(),StandardCharsets.UTF_8));
//3.发出请求
//3.1.获取授权
sendRequest("auth", "123456");
Object obj = handleResponse();
System.out.println( "obj = " + obj);
//3.2.set name 虎哥
sendRequest("set", "name", "yoshino");
//4.解析响应
obj = handleResponse();
System.out.println( "obj = " + obj);
sendRequest("get", "name");
Object obj = handleResponse();
System.out.println( "obj = " + obj);
} catch (IOException e) {
e.printStackTrace();
} finally {
//5.释放连接
try {
if ( reader != null) reader.close();
if (writer != null) writer.close();
if (s != null) s.close();
}catch (IOException e) {
e.printStackTrace();
}
}
}
private static Object handleResponse() throws IOException {
//读取首字节
int prefix = reader.read();
//判断数据类型标示
switch (prefix) {
case '+'://单行字符串,直接读一行
return reader.readLine();
case '-'://异常,也读一行
throw new RuntimeException(reader.readLine());
case ':'://数字
return Long.parseLong( reader.readLine());
case '$'://多行字符串
//先读长度
int len = Integer.parseInt(reader.readLine());
if ( len == -1){
return null;
}
if (len == 0){
return "";
}
//再读数据,读len个字节。我们假设没有特殊字符,所以读一行(简化)
return reader.readLine();
case '*':
return readBulkString();
default:
throw new RuntimeException("错误的数据格式! ");
}
return null;
}
private static Object readBulkString() throws IOException {
//获取数组大小
int len = Integer.parseInt(reader.readLine());
if (len <= 0) {
return null;
}
//遍历,依次读取每个元素
List<0bject> list = new ArrayList<> (len);//遍历,依次读取每个元素
for (int i = 0; i < len; i++){
list.add (handleResponse());
}
return list;
}
private static void sendRequest(String ... args) {
writer.println("*" + args.length);
for(String arg : args) {
writer.println("$" + arg.getBytes (StandardCharsets.UTF_8).length);
writer.println(arg);
}
writer.flush();
}
}
```
### **内存策略
**Redis内存回收**
Redis之所以性能强,最主要的原因就是基于内存存储。然而单节点的Redis其内存大小不宜过大,会影响持久化或主从同步性能。
我们可以通过修改配置文件来设置Redis的最大内存:
```
# 格式:
# maxmemory
#例如∶
maxmemory 1gb
```
当内存使用达到上限时,就无法存储更多数据了。
#### 过期策略
**这里有两个问题需要我们思考:**
1. Redis是如何知道一个key是否过期呢?
利用两个Dict分别记录key-value对及key-ttl对
2. 是不是TTL到期就立即删除了呢?
- 惰性删除
- 顾名思义并不是在TTL到期后就立刻删除,而是在 **访问一个 key ** 的时候,**检查该 key 的存活时间** ,如果已经过期才执行删除
-
- 问题:已过期但一直未被访问的数据堆积
- 周期删除
- 通过一个 **定时任务** ,周期性的 **抽样部分过期的 key** ,然后执行删除。执行周期有两种
- Redis会设置一个定时任务serverCron(),按照server.hz的频率来执行过期key清理,模式为SLOW
- Redis的每个事件循环前会调用 beforeSleep() 函数,执行过期key清理,模式为FAST
-
-
**DB结构**
**总结**
RedisKey的TTL记录方式:
- 在RedisDB中通过一个 Dict 记录每个 Key 的 TTL 时间
过期 key 的删除策略:
- **惰性**清理: 每次**查找 key 时**判断是否过期,如果过期则删除
- **定期**清理: **定期抽样**部分key,判断是否过期,如果过期则删除。
定期清理的两种模式:
- SLOW 模式执行频率默认为10,每次不超过25ms
- FAST 模式执行频率不固定,但两次间隔不低于2ms,每次耗时不超过1ms
#### 淘汰策略
**内存淘汰**︰就是当Redis内存使用**达到设置的阈值**时,Redis **主动挑选部分key** 删除以释放更多内存的流程。
---
**performEvictions() 方法 流程图**

## TODO
1. 逻辑过期实现的缓存击穿 和 缓存穿透 没有兼容
- 访问店铺时,调用了一个 url,可以将 访问逻辑过期实现的 url 加上 hot (原因:击穿需要预先手动添加,为如热点店铺)
## 知识点
### spring 框架事务失效问题(aop)
> 事务实现需要依赖其代理对象(由spring管理创建)中的方法
>
> 该图调用是使用 this. 获取的,事务失效
还需要额外添加:
```xml
org.aspectj
aspectjweaver
```

### 获取上传文件代码
```、java
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
@GetMapping("/blog/delete")
public Result deleteBlogImg(@RequestParam("name") String filename) {
File file = new File(SystemConstants.IMAGE_UPLOAD_DIR, filename);
if (file.isDirectory()) {
return Result.fail("错误的文件名称");
}
FileUtil.del(file);
return Result.ok();
}
private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);
// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;
// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}
// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}
public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = " \\nginx-1.18.0\\html\\hmdp\\imgs\\"; // nginx 前端服务器存图片的位置
public static final String USER_NICK_NAME_PREFIX = "user_";
public static final int DEFAULT_PAGE_SIZE = 5;
public static final int MAX_PAGE_SIZE = 10;
}
```
### fork
- fork 是linux指令,创建子进程,只复制页表数据给子进程
- 主进程 - 页表虚拟内存 - 真实内存
- fork采用的是copy-on-write技术:
- 当主进程执行读操作时,访问共享内存;
- 当主进程执行写操作时,则会拷贝一份数据,执行写操作。