# Go-Pocket-ETL **Repository Path**: onyx/go-pocket-etl ## Basic Information - **Project Name**: Go-Pocket-ETL - **Description**: 一个为开发者和运维人员设计的、轻量级、高性能的命令行ETL工具。 欢迎Star,Fork, 感谢感谢 - **Primary Language**: Go - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: https://gitee.com/onyx/go-pocket-etl - **GVP Project**: No ## Statistics - **Stars**: 11 - **Forks**: 5 - **Created**: 2025-10-19 - **Last Updated**: 2025-12-01 ## Categories & Tags **Categories**: big-data **Tags**: None ## README # Go-Pocket-ETL ![Go Version](https://img.shields.io/badge/go-1.18+-blue.svg) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) **一个为开发者和运维人员设计的、轻量级、高性能的命令行ETL工具。** 无需复杂的配置和部署,通过一个简单的二进制文件和一个YAML配置文件,即可快速实现常见的数据提取、转换和加载任务。 ### Github 地址: https://github.com/changhe626/go-pocket-etl --- ## 为什么选择 Go-Pocket-ETL? 在日常的开发和运维工作中,我们经常会遇到一些中小型的数据处理需求: - 需要将一个 CSV 文件的数据快速导入到数据库中进行分析。 - 需要将数据库中的某些数据脱敏后,同步到另一个系统或消息队列中。 - 需要对一个 JSON 数据文件进行清洗、筛选和格式转换。 对于这些任务,使用重量级的大数据平台(如 Spark, Flink)显得“杀鸡用牛刀”,而手写一次性的脚本又缺乏可复用性和可维护性。 **Go-Pocket-ETL 正是为了解决这个痛点而生。** 它为您提供了一个: - **简单直接的解决方案**: 你只需要关心定义数据“从哪里来”、“做什么转换”以及“到哪里去”。 - **高性能的执行引擎**: 基于 Go 语言的并发模型,以流式方式处理数据,内存占用低,执行效率高。 - **易于扩展的插件化架构**: 可以非常方便地添加新的数据源和数据汇,满足您特定的业务需求。 ### 核心理念 +----------+ +----------------+ +--------+ | | | | | | | Source +----->+ Processors +----->+ Sink | | (数据源) | | (处理器链) | | (数据汇)| | | | | | | +----------+ +----------------+ +--------+ Source (数据源): 数据从哪里来?(例如:一个 CSV 文件, 一个数据库查询) • Processors (处理器): 对数据做什么?(例如:筛选行, 脱敏字段, 重命名列) • Sink (数据汇): 数据到哪里去?(例如:另一个数据库, Kafka, 控制台) 你只需要在 YAML 文件中定义好这三部分,剩下的交给 Go-Pocket-ETL 的高性能并发引擎来完成。 ### 为什么使用 Go 开发? 选择 Go 语言作为开发语言,为本项目带来了几项核心优势: 1. **极致的性能**: Go 语言原生支持的 Goroutines 和 Channels,让我们能够以非常低的成本构建出高效的并发数据处理管道,最大限度地利用多核 CPU 资源。 2. **静态编译与部署**: Go 可以将整个项目(包括所有依赖)编译成一个**无依赖的、单一的可执行文件**。这意味着你可以将这个文件复制到任何同架构的服务器上直接运行,无需安装任何运行时环境(如 JVM, Python 解释器等),极大地简化了部署和分发。 3. **跨平台能力**: Go 语言的工具链可以轻松地将代码交叉编译为不同操作系统(Linux, Windows, macOS)的可执行文件,让我们的工具可以服务于所有平台的开发者。 ### 功能特征 - **极其简单**: 零依赖,单个二进制文件 + 一个 YAML 配置即可运行。 - **高性能**: 基于 Go 的并发模型,以流式方式处理数据,内存占用低。 - **插件化架构**: 内置丰富的组件,并且可以轻松地添加自定义的 Source, Processor, 或 Sink。 - **跨平台**: 可在 Windows, macOS, Linux 上运行。 - **为开发者设计**: 清晰的日志、配置校验功能和详细的文档,让你完全掌控你的数据管道。 ## 支持的组件 | 类型 | 名称 | 描述 | | :--- | :--- | :--- | | **数据源 (Source)** | `csv` | 从本地 CSV 文件读取数据。 | | | `json` | 从本地 JSON 文件(对象数组)流式读取数据。 | | | `mysql` | 执行 SQL 查询并从 MySQL 数据库读取结果。 | | | `pgsql` | 执行 SQL 查询并从 PostgreSQL 数据库读取结果。 | | **处理器 (Processor)** | `select_columns` | 选择要保留的列。 | | | `filter_rows` | 根据列值和条件过滤数据行。 | | | `rename_column` | 对列进行重命名。 | | | `convert_type` | 转换列的数据类型(例如,string 到 integer)。 | | | `mask_data` | 对敏感数据进行哈希脱敏(MD5, SHA256)。 | | **数据汇 (Sink)** | `mysql` | 批量写入数据到 MySQL。 | | | `pgsql` | 使用 `COPY` 命令高效地批量写入数据到 PostgreSQL。 | | | `kafka` | 将数据作为 JSON 消息批量发送到 Kafka topic。 | | | `stdout` | 将处理后的数据以格式化的 JSON 打印到控制台,便于调试。 | ## 快速开始 (Quick Start) 让我们通过一个最简单的例子,在 3 分钟内体验 Go-Pocket-ETL 的核心功能:将一个 CSV 文件的数据,经过处理后,打印到你的控制台。 ### 第一步: 创建一个 CSV 数据文件 在你的项目目录中,创建一个名为 `my_data.csv` 的文件,并复制以下内容: ```csv id,name,email,age 1,zhang san,changhe626@163.com,25 2,li si,lisi@example.com,35 3,wang wu,wangwu@example.com,28 ``` ### 第二步: 创建一个 YAML 配置文件 在同一个目录中,创建一个名为 `my_config.yml` 的文件,并复制以下内容: ```yaml # my_config.yml source: type: csv params: file_path: "./my_data.csv" # 指向你刚刚创建的 CSV 文件 has_header: true processors: # 我们只选择年龄大于 26 岁的用户 - type: filter_rows params: column: "age" operator: ">" value: 26 sink: # 将最终结果打印到控制台,便于查看 type: stdout ``` ### 第三步: 运行! 打开你的终端,确保你在项目的根目录中,然后运行以下命令: ```bash # 如果你已经编译了二进制文件 ./go-pocket-etl run -c my_config.yml # 或者,在开发时直接使用 go run (更方便) go run ./cmd/go-pocket-etl run -c my_config.yml ``` ### 预期输出 如果一切顺利,你将在控制台中看到两条经过筛选和格式化的 JSON 数据: ```json { "age": "35", "email": "lisi@example.com", "id": "2", "name": "li si" } { "age": "28", "email": "wangwu@example.com", "id": "3", "name": "wang wu" } ``` 恭喜你!你已经成功地运行了你的第一个 ETL 管道。 --- ## 安装与使用 ### 1. 从源码编译 首先,确保你已经安装了 Go (版本 1.18 或更高)。 ```bash # 克隆项目 git clone https://github.com/your-username/go-pocket-etl.git cd go-pocket-etl # 下载依赖并编译 go mod tidy go build -o go-pocket-etl ./cmd/go-pocket-etl ``` 现在,你的项目根目录下应该有一个名为 `go-pocket-etl` (在 Windows 上是 `go-pocket-etl.exe`) 的可执行文件。 ### 2. 运行任务 - **使用编译好的二进制文件:** - 在 Linux / macOS 上: `./go-pocket-etl run -c ./configs/your_config.yml` - 在 Windows 上: `.\go-pocket-etl.exe run -c .\configs\your_config.yml` - **在本地开发时直接运行 (推荐):** ```bash # 确保 -c 后面的路径是正确的! go run ./cmd/go-pocket-etl run -c ./configs/your_config.yml ``` ### 命令行用法 run - 运行ETL任务 go-pocket-etl run --config /path/to/your_config.yml # 简写 go-pocket-etl run -c your_config.yml ### 3. 校验配置文件 在运行复杂任务前,先用 `validate` 命令检查配置。它会尝试初始化所有组件并报告错误,但不会真正处理数据。 ```bash ./go-pocket-etl validate -c ./configs/your_config.yml ``` ## 配置文件详解 配置文件是驱动 Go-Pocket-ETL 的核心。它主要由 `source`, `processors`, `sink`, 和 `pipeline` 四个部分组成。 ### `source` (数据源) 定义数据从哪里来。 - `type`: 指定使用哪个数据源组件 (例如 `csv`, `mysql`)。 - `params`: 一个包含该 `type` 所需特定参数的 map。 ### `processors` (处理器) 定义数据需要经过的、按顺序执行的转换步骤 (可选)。数据记录会像在流水线上一样,依次通过这个数组中的每一个处理器。 ### `sink` (数据汇) 定义数据最终到哪里去。 - `type`: 指定使用哪个数据汇组件 (例如 `mysql`, `kafka`, `stdout`)。 - `params`: 包含该 `type` 所需的特定参数。 ### `pipeline` (管道配置) 一个可选部分,用于对管道的性能进行微调。 - `batch_size`: Sink 端批量写入的大小。默认值为 `100`。 - `channel_size`: 管道中每个通道的缓冲大小。默认值为 `1000`。 --- ### 关键参数详解 #### ⚠️ `dsn` (数据库连接字符串) 这是最常见的出错点!如果你的数据库密码包含特殊字符 (如 `\`, `#`, `@`, `?` 等),**强烈建议你用单引号 (`'`) 将整个 DSN 字符串包裹起来**,以防止 YAML 解析器将其作为特殊语法进行转义。 **错误示例 (密码为 `my#pass\word`):** ```yaml dsn: "root:my#pass\word@tcp(127.0.0.1:3306)/mydb" # 会报错! ``` **正确示例:** ```yaml dsn: 'root:my#pass\word@tcp(127.0.0.1:3306)/mydb' # 正确! ``` #### `column_mapping` (列映射) 这个参数告诉程序,**“把这个原始列,存到那个目标列里去”**。 - **`前面` (键)**: 是**流入 Sink 的数据**的列名。 - **`后面` (值)**: 是**目标数据库**的列名。 **重要**: 如果你使用了 `processors`,那么 `column_mapping` 的“键”必须是**经过所有处理器处理后,最终剩下的列名**。 **图解:** `CSV (name)` -> `处理器 (rename to full_name)` -> **`流入 Sink 的数据 (full_name)`** -> `column_mapping` -> `数据库 (user_name)` **示例:** ```yaml processors: - type: rename_column params: mapping: name: "full_name" # 在这里,name 被重命名为 full_name sink: type: mysql params: table: "users" column_mapping: # 所以,这里的键必须是 full_name,而不是原始的 name full_name: "user_name" id: "user_id" ``` ### 组件参考手册 这是所有内置组件的参数详解。 --- #### **数据源 (Sources)**
csv - 从 CSV 文件读取 ```yaml source: type: csv params: file_path: "/path/to/your/data.csv" # (必需) 文件路径 has_header: true # (可选, 默认 true) 第一行是否为表头 delimiter: "," # (可选, 默认 ',') 分隔符 ```
json - 从 JSON 文件读取 **注意**: 只支持 JSON 数组格式 `[{"a":1}, {"b":2}]`,不支持 JSON Lines。 ```yaml source: type: json params: file_path: "/path/to/your/data.json" # (必需) 文件路径 ```
mysql - 从 MySQL 读取 ```yaml source: type: mysql params: dsn: 'user:password@tcp(127.0.0.1:3306)/database' # (必需) 数据库连接字符串 query: "SELECT id, name, email FROM users WHERE status='active';" # (必需) 查询语句 ```
pgsql - 从 PostgreSQL 读取 ```yaml source: type: pgsql params: dsn: 'postgres://user:password@localhost/dbname?sslmode=disable' # (必需) 数据库连接字符串 query: "SELECT id, name, email FROM users WHERE status='active';" # (必需) 查询语句 ```
--- #### **处理器 (Processors)**
select_columns - 选择要保留的列 (白名单) ```yaml processors: - type: select_columns params: columns: ["id", "email", "status"] # (必需) 要保留的列名列表 ```
rename_column - 重命名列 ```yaml processors: - type: rename_column params: mapping: # (必需) 一个或多个重命名映射 "old_name_1": "new_name_1" "old_name_2": "new_name_2" ```
filter_rows - 根据条件过滤行 **比较逻辑**: 优先尝试按数字比较,如果失败则按字符串比较。 - **数字操作符**: `=`, `==`, `!=`, `<>`, `>`, `>=`, `<`, `<=` - **字符串操作符**: `=`, `==`, `!=`, `<>` ```yaml processors: - type: filter_rows params: column: "age" # (必需) 要比较的列 operator: ">" # (必需) 比较操作符 value: 25 # (必需) 要比较的值 ```
convert_type - 转换列的数据类型 **支持的类型**: `integer` (`int`), `float` (`double`), `string`, `boolean` (`bool`) ```yaml processors: - type: convert_type params: column: "age_str" # (必需) 要转换的列 type: "integer" # (必需) 目标类型 ```
mask_data - 对敏感数据进行哈希脱敏 **支持的方法**: `md5`, `sha256` **安全警告**: 这是单向哈希,不是加密,请勿用于需要可逆的场景。 ```yaml processors: - type: mask_data params: column: "email" # (必需) 要脱敏的列 method: "sha256" # (必需) 哈希算法 ```
--- #### **数据汇 (Sinks)**
stdout - 打印到控制台 (用于调试) ```yaml sink: type: stdout params: format: "json" # (可选, 默认 "json") 目前只支持 json ```
mysql - 写入到 MySQL ```yaml sink: type: mysql params: dsn: 'user:password@tcp(127.0.0.1:3306)/database' # (必需) table: "target_table" # (必需) column_mapping: # (必需) 列映射 "record_field_1": "db_column_1" "record_field_2": "db_column_2" ```
pgsql - 写入到 PostgreSQL ```yaml sink: type: pgsql params: dsn: 'postgres://user:password@localhost/dbname?sslmode=disable' # (必需) table: "target_table" # (必需) column_mapping: # (必需) 列映射 "record_field_1": "db_column_1" "record_field_2": "db_column_2" ```
kafka - 写入到 Kafka ```yaml sink: type: kafka params: brokers: ["kafka-1:9092", "kafka-2:9092"] # (必需) Broker 地址列表 topic: "my-topic" # (必需) 目标 Topic ```
--- ## 常见错误与调试 - **`did not find expected hexdecimal number`**: 99% 的可能是你的 `dsn` 字符串中包含特殊字符,并且没有用**单引号**包裹。请检查你的 `dsn` 配置。 - **`config is missing or has invalid 'column_mapping'`**: 检查你的 `sink` 配置中是否正确地包含了 `column_mapping` 部分,并注意 YAML 的缩进。 - **`no such file or directory`**: `source` 中配置的 `file_path` 不正确,请使用相对或绝对路径确保程序能找到文件。 - **`access denied for user ...`**: 数据库的用户名或密码错误,请检查 `dsn` 配置。 - **`Unknown column '...' in 'field list'`**: `column_mapping` 的“值”部分,即你指定的数据库目标列名,在你的数据库表中不存在。 ## 如何测试 项目包含单元测试和集成测试。 ### 1. 运行所有测试 在项目根目录下运行以下命令: ```bash # 这会自动寻找并执行所有 *_test.go 文件 go test ./... ``` **注意**: 运行集成测试需要你的本地环境中已经安装并运行了 **Docker**。测试会在运行时动态启动临时的数据库容器。 ### 2. 仅运行单元测试 如果你没有安装 Docker,或者只想快速运行不依赖外部服务的单元测试(例如,测试所有的 `processors`),可以使用 `-short` 标志。 ```bash # -short 标志会跳过被标记为集成测试的用例 go test ./... -short ``` ## 如何扩展 (Adding New Components) Go-Pocket-ETL 的核心优势之一就是其出色的扩展性。你可以通过实现 `core` 包中定义的接口,轻松地添加自己的组件。 ### 1. 定义核心接口 所有的组件都必须实现以下三个核心接口之一: - `core.Source`: 定义了 `Open()`, `Read()`, `Close()` 方法。 - `core.Processor`: 定义了 `Open()`, `Process()`, `Close()` 方法。 - `core.Sink`: 定义了 `Open()`, `Write()`, `Close()` 方法。 ### 2. 实现你的组件 假设你想添加一个 `mongodb` Sink。在 `pkg/sinks/` 目录下创建一个新的 `mongodb/sink.go` 文件,并实现 `core.Sink` 接口。 ### 3. 向工厂进行自我注册 在你的组件文件中,创建一个返回 `core.Sink` 接口的构造函数,并在 `init()` 函数中向组件工厂注册你的新组件。 ```go // in pkg/sinks/mongodb/sink.go import "github.com/example-user/go-pocket-etl/pkg/factory" func init() { // "mongodb" 是用户在 a.yml 文件中使用的 type 名称 factory.RegisterSink("mongodb", NewSink) } func NewSink() core.Sink { return &MongoSink{} } ``` ### 4. 让主程序加载你的包 最后,在 `cmd/go-pocket-etl/main.go` 文件的 `import` 部分,添加一个匿名导入,以确保你的新组件包在程序启动时被加载。 ```go // in cmd/go-pocket-etl/main.go import ( // ... other imports _ "github.com/example-user/go-pocket-etl/pkg/sinks/mongodb" // Add this line ) ``` **完成了!** 现在,用户就可以在他们的 YAML 配置文件中使用 `type: mongodb` 了。 ## 贡献 欢迎任何形式的贡献!请随时提交 Pull Request 或创建 Issue。 ## 许可证 本项目采用 [Apache 2.0 许可证](LICENSE)。