# Go-Pocket-ETL
**Repository Path**: onyx/go-pocket-etl
## Basic Information
- **Project Name**: Go-Pocket-ETL
- **Description**: 一个为开发者和运维人员设计的、轻量级、高性能的命令行ETL工具。
欢迎Star,Fork, 感谢感谢
- **Primary Language**: Go
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: https://gitee.com/onyx/go-pocket-etl
- **GVP Project**: No
## Statistics
- **Stars**: 11
- **Forks**: 5
- **Created**: 2025-10-19
- **Last Updated**: 2025-12-01
## Categories & Tags
**Categories**: big-data
**Tags**: None
## README
# Go-Pocket-ETL

[](https://opensource.org/licenses/Apache-2.0)
**一个为开发者和运维人员设计的、轻量级、高性能的命令行ETL工具。**
无需复杂的配置和部署,通过一个简单的二进制文件和一个YAML配置文件,即可快速实现常见的数据提取、转换和加载任务。
### Github 地址:
https://github.com/changhe626/go-pocket-etl
---
## 为什么选择 Go-Pocket-ETL?
在日常的开发和运维工作中,我们经常会遇到一些中小型的数据处理需求:
- 需要将一个 CSV 文件的数据快速导入到数据库中进行分析。
- 需要将数据库中的某些数据脱敏后,同步到另一个系统或消息队列中。
- 需要对一个 JSON 数据文件进行清洗、筛选和格式转换。
对于这些任务,使用重量级的大数据平台(如 Spark, Flink)显得“杀鸡用牛刀”,而手写一次性的脚本又缺乏可复用性和可维护性。
**Go-Pocket-ETL 正是为了解决这个痛点而生。** 它为您提供了一个:
- **简单直接的解决方案**: 你只需要关心定义数据“从哪里来”、“做什么转换”以及“到哪里去”。
- **高性能的执行引擎**: 基于 Go 语言的并发模型,以流式方式处理数据,内存占用低,执行效率高。
- **易于扩展的插件化架构**: 可以非常方便地添加新的数据源和数据汇,满足您特定的业务需求。
### 核心理念
+----------+ +----------------+ +--------+
| | | | | |
| Source +----->+ Processors +----->+ Sink |
| (数据源) | | (处理器链) | | (数据汇)|
| | | | | |
+----------+ +----------------+ +--------+
Source (数据源): 数据从哪里来?(例如:一个 CSV 文件, 一个数据库查询)
•
Processors (处理器): 对数据做什么?(例如:筛选行, 脱敏字段, 重命名列)
•
Sink (数据汇): 数据到哪里去?(例如:另一个数据库, Kafka, 控制台)
你只需要在 YAML 文件中定义好这三部分,剩下的交给 Go-Pocket-ETL 的高性能并发引擎来完成。
### 为什么使用 Go 开发?
选择 Go 语言作为开发语言,为本项目带来了几项核心优势:
1. **极致的性能**: Go 语言原生支持的 Goroutines 和 Channels,让我们能够以非常低的成本构建出高效的并发数据处理管道,最大限度地利用多核 CPU 资源。
2. **静态编译与部署**: Go 可以将整个项目(包括所有依赖)编译成一个**无依赖的、单一的可执行文件**。这意味着你可以将这个文件复制到任何同架构的服务器上直接运行,无需安装任何运行时环境(如 JVM, Python 解释器等),极大地简化了部署和分发。
3. **跨平台能力**: Go 语言的工具链可以轻松地将代码交叉编译为不同操作系统(Linux, Windows, macOS)的可执行文件,让我们的工具可以服务于所有平台的开发者。
### 功能特征
- **极其简单**: 零依赖,单个二进制文件 + 一个 YAML 配置即可运行。
- **高性能**: 基于 Go 的并发模型,以流式方式处理数据,内存占用低。
- **插件化架构**: 内置丰富的组件,并且可以轻松地添加自定义的 Source, Processor, 或 Sink。
- **跨平台**: 可在 Windows, macOS, Linux 上运行。
- **为开发者设计**: 清晰的日志、配置校验功能和详细的文档,让你完全掌控你的数据管道。
## 支持的组件
| 类型 | 名称 | 描述 |
| :--- | :--- | :--- |
| **数据源 (Source)** | `csv` | 从本地 CSV 文件读取数据。 |
| | `json` | 从本地 JSON 文件(对象数组)流式读取数据。 |
| | `mysql` | 执行 SQL 查询并从 MySQL 数据库读取结果。 |
| | `pgsql` | 执行 SQL 查询并从 PostgreSQL 数据库读取结果。 |
| **处理器 (Processor)** | `select_columns` | 选择要保留的列。 |
| | `filter_rows` | 根据列值和条件过滤数据行。 |
| | `rename_column` | 对列进行重命名。 |
| | `convert_type` | 转换列的数据类型(例如,string 到 integer)。 |
| | `mask_data` | 对敏感数据进行哈希脱敏(MD5, SHA256)。 |
| **数据汇 (Sink)** | `mysql` | 批量写入数据到 MySQL。 |
| | `pgsql` | 使用 `COPY` 命令高效地批量写入数据到 PostgreSQL。 |
| | `kafka` | 将数据作为 JSON 消息批量发送到 Kafka topic。 |
| | `stdout` | 将处理后的数据以格式化的 JSON 打印到控制台,便于调试。 |
## 快速开始 (Quick Start)
让我们通过一个最简单的例子,在 3 分钟内体验 Go-Pocket-ETL 的核心功能:将一个 CSV 文件的数据,经过处理后,打印到你的控制台。
### 第一步: 创建一个 CSV 数据文件
在你的项目目录中,创建一个名为 `my_data.csv` 的文件,并复制以下内容:
```csv
id,name,email,age
1,zhang san,changhe626@163.com,25
2,li si,lisi@example.com,35
3,wang wu,wangwu@example.com,28
```
### 第二步: 创建一个 YAML 配置文件
在同一个目录中,创建一个名为 `my_config.yml` 的文件,并复制以下内容:
```yaml
# my_config.yml
source:
type: csv
params:
file_path: "./my_data.csv" # 指向你刚刚创建的 CSV 文件
has_header: true
processors:
# 我们只选择年龄大于 26 岁的用户
- type: filter_rows
params:
column: "age"
operator: ">"
value: 26
sink:
# 将最终结果打印到控制台,便于查看
type: stdout
```
### 第三步: 运行!
打开你的终端,确保你在项目的根目录中,然后运行以下命令:
```bash
# 如果你已经编译了二进制文件
./go-pocket-etl run -c my_config.yml
# 或者,在开发时直接使用 go run (更方便)
go run ./cmd/go-pocket-etl run -c my_config.yml
```
### 预期输出
如果一切顺利,你将在控制台中看到两条经过筛选和格式化的 JSON 数据:
```json
{
"age": "35",
"email": "lisi@example.com",
"id": "2",
"name": "li si"
}
{
"age": "28",
"email": "wangwu@example.com",
"id": "3",
"name": "wang wu"
}
```
恭喜你!你已经成功地运行了你的第一个 ETL 管道。
---
## 安装与使用
### 1. 从源码编译
首先,确保你已经安装了 Go (版本 1.18 或更高)。
```bash
# 克隆项目
git clone https://github.com/your-username/go-pocket-etl.git
cd go-pocket-etl
# 下载依赖并编译
go mod tidy
go build -o go-pocket-etl ./cmd/go-pocket-etl
```
现在,你的项目根目录下应该有一个名为 `go-pocket-etl` (在 Windows 上是 `go-pocket-etl.exe`) 的可执行文件。
### 2. 运行任务
- **使用编译好的二进制文件:**
- 在 Linux / macOS 上: `./go-pocket-etl run -c ./configs/your_config.yml`
- 在 Windows 上: `.\go-pocket-etl.exe run -c .\configs\your_config.yml`
- **在本地开发时直接运行 (推荐):**
```bash
# 确保 -c 后面的路径是正确的!
go run ./cmd/go-pocket-etl run -c ./configs/your_config.yml
```
### 命令行用法
run - 运行ETL任务
go-pocket-etl run --config /path/to/your_config.yml
# 简写
go-pocket-etl run -c your_config.yml
### 3. 校验配置文件
在运行复杂任务前,先用 `validate` 命令检查配置。它会尝试初始化所有组件并报告错误,但不会真正处理数据。
```bash
./go-pocket-etl validate -c ./configs/your_config.yml
```
## 配置文件详解
配置文件是驱动 Go-Pocket-ETL 的核心。它主要由 `source`, `processors`, `sink`, 和 `pipeline` 四个部分组成。
### `source` (数据源)
定义数据从哪里来。
- `type`: 指定使用哪个数据源组件 (例如 `csv`, `mysql`)。
- `params`: 一个包含该 `type` 所需特定参数的 map。
### `processors` (处理器)
定义数据需要经过的、按顺序执行的转换步骤 (可选)。数据记录会像在流水线上一样,依次通过这个数组中的每一个处理器。
### `sink` (数据汇)
定义数据最终到哪里去。
- `type`: 指定使用哪个数据汇组件 (例如 `mysql`, `kafka`, `stdout`)。
- `params`: 包含该 `type` 所需的特定参数。
### `pipeline` (管道配置)
一个可选部分,用于对管道的性能进行微调。
- `batch_size`: Sink 端批量写入的大小。默认值为 `100`。
- `channel_size`: 管道中每个通道的缓冲大小。默认值为 `1000`。
---
### 关键参数详解
#### ⚠️ `dsn` (数据库连接字符串)
这是最常见的出错点!如果你的数据库密码包含特殊字符 (如 `\`, `#`, `@`, `?` 等),**强烈建议你用单引号 (`'`) 将整个 DSN 字符串包裹起来**,以防止 YAML 解析器将其作为特殊语法进行转义。
**错误示例 (密码为 `my#pass\word`):**
```yaml
dsn: "root:my#pass\word@tcp(127.0.0.1:3306)/mydb" # 会报错!
```
**正确示例:**
```yaml
dsn: 'root:my#pass\word@tcp(127.0.0.1:3306)/mydb' # 正确!
```
#### `column_mapping` (列映射)
这个参数告诉程序,**“把这个原始列,存到那个目标列里去”**。
- **`前面` (键)**: 是**流入 Sink 的数据**的列名。
- **`后面` (值)**: 是**目标数据库**的列名。
**重要**: 如果你使用了 `processors`,那么 `column_mapping` 的“键”必须是**经过所有处理器处理后,最终剩下的列名**。
**图解:**
`CSV (name)` -> `处理器 (rename to full_name)` -> **`流入 Sink 的数据 (full_name)`** -> `column_mapping` -> `数据库 (user_name)`
**示例:**
```yaml
processors:
- type: rename_column
params:
mapping:
name: "full_name" # 在这里,name 被重命名为 full_name
sink:
type: mysql
params:
table: "users"
column_mapping:
# 所以,这里的键必须是 full_name,而不是原始的 name
full_name: "user_name"
id: "user_id"
```
### 组件参考手册
这是所有内置组件的参数详解。
---
#### **数据源 (Sources)**
csv - 从 CSV 文件读取
```yaml
source:
type: csv
params:
file_path: "/path/to/your/data.csv" # (必需) 文件路径
has_header: true # (可选, 默认 true) 第一行是否为表头
delimiter: "," # (可选, 默认 ',') 分隔符
```
json - 从 JSON 文件读取
**注意**: 只支持 JSON 数组格式 `[{"a":1}, {"b":2}]`,不支持 JSON Lines。
```yaml
source:
type: json
params:
file_path: "/path/to/your/data.json" # (必需) 文件路径
```
mysql - 从 MySQL 读取
```yaml
source:
type: mysql
params:
dsn: 'user:password@tcp(127.0.0.1:3306)/database' # (必需) 数据库连接字符串
query: "SELECT id, name, email FROM users WHERE status='active';" # (必需) 查询语句
```
pgsql - 从 PostgreSQL 读取
```yaml
source:
type: pgsql
params:
dsn: 'postgres://user:password@localhost/dbname?sslmode=disable' # (必需) 数据库连接字符串
query: "SELECT id, name, email FROM users WHERE status='active';" # (必需) 查询语句
```
---
#### **处理器 (Processors)**
select_columns - 选择要保留的列 (白名单)
```yaml
processors:
- type: select_columns
params:
columns: ["id", "email", "status"] # (必需) 要保留的列名列表
```
rename_column - 重命名列
```yaml
processors:
- type: rename_column
params:
mapping: # (必需) 一个或多个重命名映射
"old_name_1": "new_name_1"
"old_name_2": "new_name_2"
```
filter_rows - 根据条件过滤行
**比较逻辑**: 优先尝试按数字比较,如果失败则按字符串比较。
- **数字操作符**: `=`, `==`, `!=`, `<>`, `>`, `>=`, `<`, `<=`
- **字符串操作符**: `=`, `==`, `!=`, `<>`
```yaml
processors:
- type: filter_rows
params:
column: "age" # (必需) 要比较的列
operator: ">" # (必需) 比较操作符
value: 25 # (必需) 要比较的值
```
convert_type - 转换列的数据类型
**支持的类型**: `integer` (`int`), `float` (`double`), `string`, `boolean` (`bool`)
```yaml
processors:
- type: convert_type
params:
column: "age_str" # (必需) 要转换的列
type: "integer" # (必需) 目标类型
```
mask_data - 对敏感数据进行哈希脱敏
**支持的方法**: `md5`, `sha256`
**安全警告**: 这是单向哈希,不是加密,请勿用于需要可逆的场景。
```yaml
processors:
- type: mask_data
params:
column: "email" # (必需) 要脱敏的列
method: "sha256" # (必需) 哈希算法
```
---
#### **数据汇 (Sinks)**
stdout - 打印到控制台 (用于调试)
```yaml
sink:
type: stdout
params:
format: "json" # (可选, 默认 "json") 目前只支持 json
```
mysql - 写入到 MySQL
```yaml
sink:
type: mysql
params:
dsn: 'user:password@tcp(127.0.0.1:3306)/database' # (必需)
table: "target_table" # (必需)
column_mapping: # (必需) 列映射
"record_field_1": "db_column_1"
"record_field_2": "db_column_2"
```
pgsql - 写入到 PostgreSQL
```yaml
sink:
type: pgsql
params:
dsn: 'postgres://user:password@localhost/dbname?sslmode=disable' # (必需)
table: "target_table" # (必需)
column_mapping: # (必需) 列映射
"record_field_1": "db_column_1"
"record_field_2": "db_column_2"
```
kafka - 写入到 Kafka
```yaml
sink:
type: kafka
params:
brokers: ["kafka-1:9092", "kafka-2:9092"] # (必需) Broker 地址列表
topic: "my-topic" # (必需) 目标 Topic
```
---
## 常见错误与调试
- **`did not find expected hexdecimal number`**: 99% 的可能是你的 `dsn` 字符串中包含特殊字符,并且没有用**单引号**包裹。请检查你的 `dsn` 配置。
- **`config is missing or has invalid 'column_mapping'`**: 检查你的 `sink` 配置中是否正确地包含了 `column_mapping` 部分,并注意 YAML 的缩进。
- **`no such file or directory`**: `source` 中配置的 `file_path` 不正确,请使用相对或绝对路径确保程序能找到文件。
- **`access denied for user ...`**: 数据库的用户名或密码错误,请检查 `dsn` 配置。
- **`Unknown column '...' in 'field list'`**: `column_mapping` 的“值”部分,即你指定的数据库目标列名,在你的数据库表中不存在。
## 如何测试
项目包含单元测试和集成测试。
### 1. 运行所有测试
在项目根目录下运行以下命令:
```bash
# 这会自动寻找并执行所有 *_test.go 文件
go test ./...
```
**注意**: 运行集成测试需要你的本地环境中已经安装并运行了 **Docker**。测试会在运行时动态启动临时的数据库容器。
### 2. 仅运行单元测试
如果你没有安装 Docker,或者只想快速运行不依赖外部服务的单元测试(例如,测试所有的 `processors`),可以使用 `-short` 标志。
```bash
# -short 标志会跳过被标记为集成测试的用例
go test ./... -short
```
## 如何扩展 (Adding New Components)
Go-Pocket-ETL 的核心优势之一就是其出色的扩展性。你可以通过实现 `core` 包中定义的接口,轻松地添加自己的组件。
### 1. 定义核心接口
所有的组件都必须实现以下三个核心接口之一:
- `core.Source`: 定义了 `Open()`, `Read()`, `Close()` 方法。
- `core.Processor`: 定义了 `Open()`, `Process()`, `Close()` 方法。
- `core.Sink`: 定义了 `Open()`, `Write()`, `Close()` 方法。
### 2. 实现你的组件
假设你想添加一个 `mongodb` Sink。在 `pkg/sinks/` 目录下创建一个新的 `mongodb/sink.go` 文件,并实现 `core.Sink` 接口。
### 3. 向工厂进行自我注册
在你的组件文件中,创建一个返回 `core.Sink` 接口的构造函数,并在 `init()` 函数中向组件工厂注册你的新组件。
```go
// in pkg/sinks/mongodb/sink.go
import "github.com/example-user/go-pocket-etl/pkg/factory"
func init() {
// "mongodb" 是用户在 a.yml 文件中使用的 type 名称
factory.RegisterSink("mongodb", NewSink)
}
func NewSink() core.Sink {
return &MongoSink{}
}
```
### 4. 让主程序加载你的包
最后,在 `cmd/go-pocket-etl/main.go` 文件的 `import` 部分,添加一个匿名导入,以确保你的新组件包在程序启动时被加载。
```go
// in cmd/go-pocket-etl/main.go
import (
// ... other imports
_ "github.com/example-user/go-pocket-etl/pkg/sinks/mongodb" // Add this line
)
```
**完成了!** 现在,用户就可以在他们的 YAML 配置文件中使用 `type: mongodb` 了。
## 贡献
欢迎任何形式的贡献!请随时提交 Pull Request 或创建 Issue。
## 许可证
本项目采用 [Apache 2.0 许可证](LICENSE)。