# redis-replicator
**Repository Path**: feijinzhang/redis-replicator
## Basic Information
- **Project Name**: redis-replicator
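- **Description**: Redis-replicator is a Redis RDB and command parser written in Java.
It can parse, filter, and broadcast RDB and command events in real time.
It supports Redis 2.8+ and synchronizes data internally with the psync command.
It supports RDB version 6 and RDB version 7.
It supports registering custom command parsers.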
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 48
- **Created**: 2019-03-01
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
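[Table of Contents](./README.md)
=================
* [1. Redis-replicator](#1-redis-replicator)
    * [1.1. Introduction](#11-introduction)
    * [1.2. QQ discussion group](#12-qq-discussion-group)
    * [1.3. Contact the author](#13-contact-the-author)
* [2. Installation](#2-installation)
    * [2.1. Prerequisites](#21-prerequisites)
    * [2.2. Maven dependency](#22-maven-dependency)
    * [2.3. Install from source into the local Maven repository](#23-install-from-source-into-the-local-maven-repository)
* [3. Basic usage](#3-basic-usage)
    * [3.1. Replication via socket](#31-replication-via-socket)
    * [3.2. Read and parse an RDB file](#32-read-and-parse-an-rdb-file)
    * [3.3. Read and parse an AOF file](#33-read-and-parse-an-aof-file)
    * [3.4. Read a mixed-format file](#34-read-a-mixed-format-file)
        * [3.4.1. Redis mixed file format](#341-redis-mixed-file-format)
        * [3.4.2. Redis mixed file format configuration](#342-redis-mixed-file-format-configuration)
        * [3.4.3. Read a mixed-format file with Replicator](#343-read-a-mixed-format-file-with-replicator)
    * [3.5. Back up the RDB file of a remote Redis](#35-back-up-the-rdb-file-of-a-remote-redis)
    * [3.6. Back up real-time commands of a remote Redis](#36-back-up-real-time-commands-of-a-remote-redis)
* [4. Advanced topics](#4-advanced-topics)
    * [4.1. Command extension](#41-command-extension)
        * [4.1.1. Write a command class](#411-write-a-command-class)
        * [4.1.2. Write a command parser](#412-write-a-command-parser)
        * [4.1.3. Register the command parser with the replicator](#413-register-the-command-parser-with-the-replicator)
        * [4.1.4. Handle the registered command event](#414-handle-the-registered-command-event)
    * [4.2. Module extension (redis-4.0 and above)](#42-module-extension-redis-40-and-above)
        * [4.2.1. Compile the test modules in the Redis source](#421-compile-the-test-modules-in-the-redis-source)
        * [4.2.2. Uncomment the relevant line in redis.conf](#422-uncomment-the-relevant-line-in-redisconf)
        * [4.2.3. Write a module parser](#423-write-a-module-parser)
        * [4.2.4. Write a command parser](#424-write-a-command-parser)
        * [4.2.5. Register the module parser and command parser and handle the events](#425-register-the-module-parser-and-command-parser-and-handle-the-events)
    * [4.3. Write your own RDB parser](#43-write-your-own-rdb-parser)
* [5. Other topics](#5-other-topics)
    * [5.1. Built-in command parsers](#51-built-in-command-parsers)
    * [5.2. When an EOFException occurs](#52-when-an-eofexception-occurs)
    * [5.3. Trace event logs](#53-trace-event-logs)
    * [5.4. SSL connection](#54-ssl-connection)
    * [5.5. Redis authentication](#55-redis-authentication)
    * [5.6. Avoid full synchronization](#56-avoid-full-synchronization)
    * [5.7. FullSyncEvent](#57-fullsyncevent)
    * [5.8. Handle raw byte arrays](#58-handle-raw-byte-arrays)
* [6. Contributors](#6-contributors)
* [7. References](#7-references)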
# 1. Redis-replicator
## 1.1. Introduction
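Redis Replicator is a tool for parsing RDB files and commands. It fully implements the Redis replication protocol.
It supports the three synchronization commands sync, psync, and psync2, as well as remote RDB file backup and data synchronization.
In this document, `command` refers specifically to Redis write commands and excludes read commands (such as `get` and `hmget`).
## 1.2. QQ discussion group
**479688557**
## 1.3. Contact the author
**chen.bao.yi@qq.com**
# 2. Installation
## 2.1. Prerequisites
* jdk 1.7+
* maven-3.2.3+
* redis 2.4 - 4.0-rc2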
## 2.2. Maven dependency
```xml
<dependency>
    <groupId>com.moilioncircle</groupId>
    <artifactId>redis-replicator</artifactId>
    <version>2.0.0-rc3</version>
</dependency>
```
## 2.3. Install from source into the local Maven repository
```
$ mvn clean install package -Dmaven.test.skip=true
```
# 3. Basic usage
## 3.1. Replication via socket
```java
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        System.out.println(kv);
    }
});
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        System.out.println(command);
    }
});
replicator.open();
```
## 3.2. Read and parse an RDB file
```java
Replicator replicator = new RedisReplicator(new File("dump.rdb"), FileType.RDB, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        System.out.println(kv);
    }
});
replicator.open();
```
## 3.3. Read and parse an AOF file
```java
Replicator replicator = new RedisReplicator(new File("appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        System.out.println(command);
    }
});
replicator.open();
```
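For context on what such a file contains: an AOF file stores each write command in the Redis protocol (RESP) as an array of bulk strings. The sketch below shows that encoding for a single command. `RespCommandEncoder` is a hypothetical helper written for illustration and is not part of redis-replicator.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of redis-replicator. */
final class RespCommandEncoder {

    /** Encodes one command as a RESP array of bulk strings, the form used in AOF files. */
    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Array header: '*' followed by the number of arguments.
        writeAscii(out, "*" + args.length + "\r\n");
        for (String arg : args) {
            byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
            // Bulk string header: '$' followed by the byte length of the argument.
            writeAscii(out, "$" + bytes.length + "\r\n");
            out.write(bytes, 0, bytes.length);
            writeAscii(out, "\r\n");
        }
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.US_ASCII);
        out.write(b, 0, b.length);
    }

    public static void main(String[] args) {
        // Prints the RESP encoding of "SET key value".
        System.out.print(new String(encode("SET", "key", "value"), StandardCharsets.UTF_8));
    }
}
```

Each element that a command parser later receives corresponds to one bulk string in this array, with the command name first.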
## 3.4. Read a mixed-format file
### 3.4.1. Redis mixed file format
```
[RDB file][AOF tail]
```
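To make the `[RDB file][AOF tail]` layout concrete, the sketch below inspects the first bytes of a file: an RDB section starts with the ASCII magic `REDIS`, while a plain AOF starts with a RESP array marker `*`. `PersistenceFormatSniffer` is a hypothetical helper for illustration; it does not describe how `FileType.MIXED` is implemented inside redis-replicator.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of redis-replicator. */
final class PersistenceFormatSniffer {

    enum Format { RDB_PREAMBLE, PLAIN_AOF, UNKNOWN }

    private static final byte[] RDB_MAGIC = "REDIS".getBytes(StandardCharsets.US_ASCII);

    /** Classifies a file by its leading bytes. */
    static Format sniff(byte[] head) {
        if (startsWith(head, RDB_MAGIC)) {
            // An RDB file, or a mixed file whose RDB part is followed by an AOF tail.
            return Format.RDB_PREAMBLE;
        }
        if (head.length > 0 && head[0] == '*') {
            // RESP array marker: the file begins directly with a command.
            return Format.PLAIN_AOF;
        }
        return Format.UNKNOWN;
    }

    private static boolean startsWith(byte[] data, byte[] prefix) {
        if (data.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (data[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] head = "REDIS0007".getBytes(StandardCharsets.US_ASCII);
        System.out.println(sniff(head));
    }
}
```

A file written with the RDB preamble enabled would be classified as `RDB_PREAMBLE` here, even though its name is still `appendonly.aof`.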
### 3.4.2. Redis mixed file format configuration
```
aof-use-rdb-preamble yes
```
### 3.4.3. Read a mixed-format file with Replicator
```java
final Replicator replicator = new RedisReplicator(new File("appendonly.aof"), FileType.MIXED,
        Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        System.out.println(kv);
    }
});
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        System.out.println(command);
    }
});
replicator.open();
```
## 3.5. Back up the RDB file of a remote Redis
```java
final FileOutputStream out = new FileOutputStream(new File("./dump.rdb"));
final RawByteListener rawByteListener = new RawByteListener() {
    @Override
    public void handle(byte... rawBytes) {
        try {
            out.write(rawBytes);
        } catch (IOException ignore) {
        }
    }
};
// Save the RDB file sent by the server
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener() {
    @Override
    public void preFullSync(Replicator replicator) {
        replicator.addRawByteListener(rawByteListener);
    }
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
    }
    @Override
    public void postFullSync(Replicator replicator, long checksum) {
        replicator.removeRawByteListener(rawByteListener);
        try {
            out.close();
            replicator.close();
        } catch (IOException ignore) {
        }
    }
});
replicator.open();
// Verify the written RDB file
replicator = new RedisReplicator(new File("./dump.rdb"), FileType.RDB, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        System.out.println(kv);
    }
});
replicator.open();
```
## 3.6. Back up real-time commands of a remote Redis
```java
final FileOutputStream out = new FileOutputStream(new File("./appendonly.aof"));
final RawByteListener rawByteListener = new RawByteListener() {
    @Override
    public void handle(byte... rawBytes) {
        try {
            out.write(rawBytes);
        } catch (IOException ignore) {
        }
    }
};
// Save 1000 commands
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener() {
    @Override
    public void preFullSync(Replicator replicator) {
    }
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
    }
    @Override
    public void postFullSync(Replicator replicator, long checksum) {
        // Start capturing raw bytes only after the full sync has finished
        replicator.addRawByteListener(rawByteListener);
    }
});
final AtomicInteger acc = new AtomicInteger(0);
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        if (acc.incrementAndGet() == 1000) {
            try {
                out.close();
                replicator.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
});
replicator.open();
// Verify the written AOF file
replicator = new RedisReplicator(new File("./appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        System.out.println(command);
    }
});
replicator.open();
```
# 4. Advanced topics
## 4.1. Command extension
### 4.1.1. Write a command class
```java
public class YourAppendCommand implements Command {
    private final String key;
    private final String value;
    public YourAppendCommand(String key, String value) {
        this.key = key;
        this.value = value;
    }
    public String getKey() {
        return key;
    }
    public String getValue() {
        return value;
    }
    @Override
    public String toString() {
        return "YourAppendCommand{" +
                "key='" + key + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}
```
### 4.1.2. Write a command parser
```java
public class YourAppendParser implements CommandParser<YourAppendCommand> {
    @Override
    public YourAppendCommand parse(Object[] command) {
        // command[0] is the command name; the arguments follow it
        return new YourAppendCommand((String) command[1], (String) command[2]);
    }
}
```
### 4.1.3. Register the command parser with the replicator
```java
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addCommandParser(CommandName.name("APPEND"), new YourAppendParser());
```
### 4.1.4. Handle the registered command event
```java
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        if (command instanceof YourAppendCommand) {
            YourAppendCommand appendCommand = (YourAppendCommand) command;
            // Your business logic goes here
        }
    }
});
```
## 4.2. Module extension (redis-4.0 and above)
### 4.2.1. Compile the test modules in the Redis source
```
$ cd /path/to/redis-4.0-rc2/src/modules
$ make
```
### 4.2.2. Uncomment the relevant line in redis.conf
```
loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so
```
### 4.2.3. Write a module parser
```java
public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {
    @Override
    public HelloTypeModule parse(RedisInputStream in) throws IOException {
        DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
        // The element count comes first, followed by that many signed values
        int elements = (int) parser.loadUnSigned();
        long[] ary = new long[elements];
        int i = 0;
        while (elements-- > 0) {
            ary[i++] = parser.loadSigned();
        }
        return new HelloTypeModule(ary);
    }
}
public class HelloTypeModule implements Module {
    private final long[] value;
    public HelloTypeModule(long[] value) {
        this.value = value;
    }
    public long[] getValue() {
        return value;
    }
    @Override
    public String toString() {
        return "HelloTypeModule{" +
                "value=" + Arrays.toString(value) +
                '}';
    }
}
```
### 4.2.4. Write a command parser
```java
public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
    @Override
    public HelloTypeCommand parse(Object[] command) {
        String key = (String) command[1];
        long value = Long.parseLong((String) command[2]);
        return new HelloTypeCommand(key, value);
    }
}
public class HelloTypeCommand implements Command {
    private final String key;
    private final long value;
    public long getValue() {
        return value;
    }
    public String getKey() {
        return key;
    }
    public HelloTypeCommand(String key, long value) {
        this.key = key;
        this.value = value;
    }
    @Override
    public String toString() {
        return "HelloTypeCommand{" +
                "key='" + key + '\'' +
                ", value=" + value +
                '}';
    }
}
```
### 4.2.5. Register the module parser and command parser and handle the events
```java
public static void main(String[] args) throws IOException {
    RedisReplicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
    replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
    replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
    replicator.addRdbListener(new RdbListener.Adaptor() {
        @Override
        public void handle(Replicator replicator, KeyValuePair<?> kv) {
            if (kv instanceof KeyStringValueModule) {
                System.out.println(kv);
            }
        }
    });
    replicator.addCommandListener(new CommandListener() {
        @Override
        public void handle(Replicator replicator, Command command) {
            if (command instanceof HelloTypeCommand) {
                System.out.println(command);
            }
        }
    });
    replicator.open();
}
```
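The registration above identifies the module by the name `hellotype` and the version `0`. As background, Redis itself packs a module type into one 64-bit id: nine name characters at 6 bits each, followed by a 10-bit encoding version (this follows my reading of `module.c` in the Redis source). The sketch below reproduces that packing. `ModuleTypeId` is a hypothetical helper for illustration and does not claim to mirror how redis-replicator resolves module parsers internally.

```java
/** Hypothetical helper for illustration only; not part of redis-replicator. */
final class ModuleTypeId {

    // The 64-character alphabet Redis uses for module type names.
    private static final String CHARSET =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";

    /** Packs a 9-character name and a version (0..1023) into a 64-bit id. */
    static long encode(String name, int version) {
        if (name.length() != 9) {
            throw new IllegalArgumentException("module type name must be exactly 9 characters");
        }
        if (version < 0 || version > 1023) {
            throw new IllegalArgumentException("version must be in 0..1023");
        }
        long id = 0;
        for (int i = 0; i < 9; i++) {
            int pos = CHARSET.indexOf(name.charAt(i));
            if (pos < 0) {
                throw new IllegalArgumentException("invalid character: " + name.charAt(i));
            }
            id = (id << 6) | pos;
        }
        return (id << 10) | version;
    }

    /** Extracts the 9-character name from an id. */
    static String nameOf(long id) {
        char[] name = new char[9];
        long v = id >>> 10; // drop the version bits; unsigned shift because the top bit may be set
        for (int i = 8; i >= 0; i--) {
            name[i] = CHARSET.charAt((int) (v & 63));
            v >>>= 6;
        }
        return new String(name);
    }

    /** Extracts the version from the low 10 bits of an id. */
    static int versionOf(long id) {
        return (int) (id & 1023);
    }

    public static void main(String[] args) {
        long id = encode("hellotype", 0);
        System.out.println(nameOf(id) + " v" + versionOf(id));
    }
}
```

This is why a module type name is always nine characters long, as `hellotype` is.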
## 4.3. Write your own RDB parser
* Write a class that extends the abstract class `RdbVisitor`.
* Register your `RdbVisitor` through the `setRdbVisitor` method of `Replicator`.
# 5. Other topics
## 5.1. Built-in command parsers
| **Command** | **Command** | **Command** | **Command** | **Command** | **Command** |
| ---------- | ------------ | ---------------| ---------- | ------------ | ---------------|
| **PING** | **APPEND** | **SET** | **SETEX** | **MSET** | **DEL** |
| **SADD** | **HMSET** | **HSET** | **LSET** | **EXPIRE** | **EXPIREAT** |
| **GETSET** | **HSETNX** | **MSETNX** | **PSETEX** | **SETNX** | **SETRANGE** |
| **HDEL** | **UNLINK** | **SREM** | **LPOP** | **LPUSH** | **LPUSHX** |
| **LREM** | **RPOP** | **RPUSH** | **RPUSHX** | **ZREM** | **RENAME** |
| **INCR** | **DECR** | **INCRBY** | **PERSIST** | **SELECT** | **FLUSHALL** |
| **FLUSHDB** | **HINCRBY** | **ZINCRBY** | **MOVE** | **SMOVE** | **PFADD** |
| **PFCOUNT** | **PFMERGE** | **SDIFFSTORE** | **RENAMENX** | **PEXPIREAT** | **SINTERSTORE** |
| **ZADD** | **BITFIELD** | **SUNIONSTORE** | **RESTORE** | **LINSERT** | **ZINTERSTORE** |
| **GEOADD** | **PEXPIRE** | **ZUNIONSTORE** | **EVAL** | **SCRIPT** | **BRPOPLPUSH** |
| **PUBLISH** | **BITOP** | **SETBIT** | | | |
## 5.2. When an EOFException occurs
* Adjust the following setting on the Redis server. For details, see [redis.conf](https://raw.githubusercontent.com/antirez/redis/3.0/redis.conf)
```
client-output-buffer-limit slave 0 0 0
```
**Warning: this setting may cause redis-server to run out of memory**
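## 5.3. Trace event logs
* Set the log level to **debug**
* If your project uses log4j2, add the following Logger to its configuration file:
```xml
<Logger name="com.moilioncircle" level="debug">
    <AppenderRef ref="YourAppender"/>
</Logger>
```
```java
Configuration.defaultSetting().setVerbose(true);
```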
## 5.4. SSL connection
```java
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "your_type");
Configuration.defaultSetting().setSsl(true);
// Optional settings
Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
Configuration.defaultSetting().setSslParameters(sslParameters);
Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
```
## 5.5. Redis authentication
```java
Configuration.defaultSetting().setAuthPassword("foobared");
```
## 5.6. Avoid full synchronization
* Adjust the following settings on redis-server
```
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-period
```
`repl-ping-slave-period` **must** be less than `Configuration.getReadTimeout()`.
The default `Configuration.getReadTimeout()` is 30 seconds.
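
The constraint exists because the master's periodic ping is what keeps an idle replication connection from hitting the client's read timeout. A minimal sketch of the check follows, assuming that `repl-ping-slave-period` is configured in seconds (as in redis.conf) and that the read timeout is expressed in milliseconds; that unit is an assumption here and should be confirmed against the `Configuration` class. `ReplTimeoutCheck` is a hypothetical helper, not part of redis-replicator.

```java
/** Hypothetical helper for illustration only; not part of redis-replicator. */
final class ReplTimeoutCheck {

    /**
     * Returns true when the server's ping period is strictly shorter than the
     * client's read timeout, so a ping arrives before an idle read times out.
     *
     * @param replPingSlavePeriodSeconds value of repl-ping-slave-period, in seconds
     * @param readTimeoutMillis          client read timeout, assumed to be in milliseconds
     */
    static boolean isPingPeriodSafe(int replPingSlavePeriodSeconds, int readTimeoutMillis) {
        // Use long arithmetic to avoid int overflow for large periods.
        return replPingSlavePeriodSeconds * 1000L < readTimeoutMillis;
    }

    public static void main(String[] args) {
        // A 10 second ping period against the 30 second default read timeout.
        System.out.println(isPingPeriodSafe(10, 30_000));
    }
}
```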
## 5.7. FullSyncEvent
```java
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
final long start = System.currentTimeMillis();
final AtomicInteger acc = new AtomicInteger(0);
replicator.addRdbListener(new RdbListener() {
    @Override
    public void preFullSync(Replicator replicator) {
        System.out.println("pre full sync");
    }
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        acc.incrementAndGet();
    }
    @Override
    public void postFullSync(Replicator replicator, long checksum) {
        long end = System.currentTimeMillis();
        System.out.println("time elapsed:" + (end - start));
        System.out.println("rdb event count:" + acc.get());
    }
});
replicator.open();
```
## 5.8. Handle raw byte arrays
* When `kv.getValueRdbType() == 0`, the raw byte array is available. This is useful in some cases (for example, HyperLogLog).
```java
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        if (kv instanceof KeyStringValueString) {
            KeyStringValueString ksvs = (KeyStringValueString) kv;
            System.out.println(Arrays.toString(ksvs.getRawBytes()));
        }
    }
});
replicator.open();
```
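As an example of what the raw bytes allow: Redis stores a HyperLogLog as an ordinary string whose 16-byte header starts with the magic bytes `HYLL`, followed by one encoding byte (0 for dense, 1 for sparse), per my reading of `hyperloglog.c` in the Redis source. The sketch below checks raw bytes for that header. `HyperLogLogSniffer` is a hypothetical helper for illustration and is not part of redis-replicator.

```java
/** Hypothetical helper for illustration only; not part of redis-replicator. */
final class HyperLogLogSniffer {

    private static final int HEADER_LENGTH = 16;

    /** Returns true when the bytes carry the HyperLogLog magic and a full header. */
    static boolean looksLikeHyperLogLog(byte[] raw) {
        return raw != null
                && raw.length >= HEADER_LENGTH
                && raw[0] == 'H' && raw[1] == 'Y' && raw[2] == 'L' && raw[3] == 'L';
    }

    /** Describes the encoding byte that follows the magic. */
    static String encodingOf(byte[] raw) {
        if (!looksLikeHyperLogLog(raw)) {
            return "not a HyperLogLog";
        }
        switch (raw[4]) {
            case 0:
                return "dense";
            case 1:
                return "sparse";
            default:
                return "unknown";
        }
    }

    public static void main(String[] args) {
        byte[] raw = new byte[HEADER_LENGTH];
        raw[0] = 'H'; raw[1] = 'Y'; raw[2] = 'L'; raw[3] = 'L';
        raw[4] = 1; // sparse encoding
        System.out.println(encodingOf(raw));
    }
}
```

In a listener, the array passed to such a check would be the value returned by `ksvs.getRawBytes()`.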
# 6. Contributors
* Leon Chen
* Adrian Yao
# 7. References
* [rdb.c](https://github.com/antirez/redis/blob/unstable/src/rdb.c)
* [Redis RDB File Format](https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format)
* [Redis Protocol specification](http://redis.io/topics/protocol)
* [Redis Replication](http://redis.io/topics/replication)