# ChatIM **Repository Path**: cculin/chat-im ## Basic Information - **Project Name**: ChatIM - **Description**: ChatIM 致力于打造一个高性能、高可用、高扩展性的企业级即时通讯(IM)解决方案。系统支持亿级用户和千万级日活,提供稳定可靠的私聊、群聊、文件传输及音视频通话功能,满足社交、办公协作、在线客服等多种场景需求。 说的很牛逼,作为小白,一步步来把 - **Primary Language**: Go - **License**: MIT - **Default Branch**: dev - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-24 - **Last Updated**: 2025-09-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README #### **1.1. 项目目标** ChatIM 致力于打造一个高性能、高可用、高扩展性的企业级即时通讯(IM)解决方案。系统支持亿级用户和千万级日活,提供稳定可靠的私聊、群聊、文件传输及音视频通话功能,满足社交、办公协作、在线客服等多种场景需求。 #### **1.2. 核心特性** * **实时通讯**:基于 WebSocket 的全双工通信,消息延迟毫秒级。 * **多端同步**:支持 Web、Mobile (iOS/Android)、Desktop (Windows/macOS) 平台,消息实时同步。 * **消息必达**:通过 ACK 机制和离线存储,确保消息不丢失、不重复。 * **可扩展架构**:微服务架构,业务逻辑解耦,支持独立扩展和部署。 * **安全通信**:全程 TLS 加密,支持端到端加密(E2EE)选项。 * **丰富功能**:支持文本、表情、图片、语音、视频、文件、地理位置等多种消息类型。 #### **1.3. 技术选型** * **后端语言**: Go (Gin, gRPC) * **前端框架**: Vue 3 (Vite, Pinia, TypeScript) * **数据库**: MySQL 8.0, MongoDB 6.0 * **缓存/状态**: Redis 7.0 * **消息队列**: Kafka 3.x * **实时通讯**: WebSocket * **音视频**: WebRTC (集成第三方 SFU/MCU 服务,如 LiveKit, Janus) * **对象存储**: MinIO * **搜索引擎**: Elasticsearch 8.x * **服务治理**: etcd / Consul * **部署**: Docker, Kubernetes (K8s) * **监控**: Prometheus + Grafana + Alertmanager * **日志**: ELK Stack (Elasticsearch, Logstash, Kibana) 或 EFK (Fluentd) --- ### **2. 系统架构设计** #### **2.1. 整体分层架构** 系统采用清晰的分层架构,各层职责明确。 ```text +-------------------------------------------------------------------+ | Client Layer | | (Vue WebApp, iOS/Android App, Electron Desktop App) | +-------------------------------------------------------------------+ | (HTTPS/WSS) +-------------------------------------------------------------------+ | Gateway Layer | | +-----------------------------------------------------+ | | | API Gateway (Nginx/Kong) | | | | (LB, SSL Termination, Auth, Routing, Rate Limit) | | | +-----------------------------------------------------+ | | +-----------------------------------------------------+ | | | IM Gateway (Go, WebSocket Server) | | | | (Connection Mgmt, Protocol Parsing, Heartbeat) | | | +-----------------------------------------------------+ | +-------------------------------------------------------------------+ | (gRPC / Kafka) +-------------------------------------------------------------------+ | Service Layer (Go) | | +----------+ +-----------+ +----------+ +--------+ +----------+ | | | User Svc | | Message Svc | | Group Svc | | File Svc | | Push Svc | | | +----------+ +-----------+ +----------+ +--------+ +----------+ | | +----------+ +-----------+ +----------+ +--------+ | | | Video Svc| | Search Svc| | Notify Svc| | ...etc | | | +----------+ +-----------+ +----------+ +--------+ | +-------------------------------------------------------------------+ | (TCP/IP) +-------------------------------------------------------------------+ | Data Layer | | +---------+ +----------+ +---------+ +---------+ +---------+ +---+| | | MySQL | | MongoDB | | Redis | | Kafka | | MinIO | |ES | | +---------+ +----------+ +---------+ +---------+ +---------+ +---+| +-------------------------------------------------------------------+ ``` #### **2.2. 核心概念定义** * **Connection (连接)**: 客户端与 IM Gateway 之间的一个 TCP 连接,通常是 WebSocket。 * **Session (会话)**: 用户登录后在系统中的逻辑状态,包含用户ID、设备ID、连接所在的 Gateway 节点信息等。存储在 Redis 中。 * **Message ID (消息ID)**: 全局唯一的消息标识符,可使用 Snowflake 算法生成,保证趋势递增。 * **Conversation ID (会话ID)**: 唯一标识一个对话。私聊的会话ID可由双方用户ID拼接而成(小的在前,大的在后);群聊的会话ID即群组ID。 * **Sequence ID (序列号, SEQ)**: 在每个会话(Conversation)内单调递增的序列号。用于保证消息顺序和实现消息同步。 #### **2.3. 服务拆分与职责** * **IM Gateway (im-gateway)** * **职责**: 维护客户端 WebSocket 长连接,心跳检测,协议解析/封装,将客户端消息推送到 Kafka,并从 Kafka 消费需要推送给客户端的消息。 * **技术**: Go, Gorilla WebSocket。 * **特性**: 无状态,可水平扩展。每个 Gateway 在启动时向 Redis 注册自身地址。 * **User Service (user-srv)** * **职责**: 用户注册、登录、JWT 认证、用户信息管理、好友关系管理。 * **接口**: 提供 gRPC 接口供其他服务调用。 * **数据库**: MySQL。 * **Message Service (message-srv)** * **职责**: 核心消息处理。接收来自 Kafka 的消息,进行存储(MongoDB),生成 SEQ,并将消息分发到推送队列。提供历史消息拉取接口。 * **数据库**: MongoDB, Redis (用于存储 SEQ)。 * **Group Service (group-srv)** * **职责**: 群组的创建/解散,成员增删改查,群信息管理,权限控制。 * **数据库**: MySQL。 * **Push Service (push-srv)** * **职责**: 消息推送逻辑。从 Kafka 消费待推送消息,查询用户 Session 信息(Redis),确定目标 Gateway,再将消息投递到目标 Gateway 的特定 Kafka Topic 中。对于离线用户,调用第三方推送服务(APNs, FCM)。 * **File Service (file-srv)** * **职责**: 处理文件、图片、音视频的上传,生成访问 URL。 * **存储**: MinIO。 * **数据库**: MongoDB (存储文件元数据)。 #### **2.4. 前端架构设计 (Vue.js)** * **技术栈**: Vue 3 + Vite + TypeScript + Pinia + Vue Router + Axios + Sass。 * **项目结构**: ``` src/ ├── api/ # API 请求模块 (Axios 封装) ├── assets/ # 静态资源 ├── components/ # 公共组件 │ ├── common/ # 基础组件 (Button, Input, Modal) │ └── business/ # 业务组件 (MessageItem, ContactList) ├── layouts/ # 布局组件 ├── router/ # 路由配置 ├── store/ # 状态管理 (Pinia) │ ├── user.ts # 用户信息、token │ ├── contact.ts# 好友、群组列表 │ └── chat.ts # 聊天会话、消息记录 ├── services/ # 服务层 │ └── im.ts # WebSocket 封装、消息处理逻辑 ├── utils/ # 工具函数 ├── views/ # 页面组件 └── main.ts # 入口文件 ``` * **状态管理 (Pinia)**: * `userStore`: 管理用户登录状态、Token 和个人信息。 * `contactStore`: 管理好友列表、群组列表,并处理其更新。 * `chatStore`: 管理当前所有会话列表(Conversation List)和每个会话的消息。消息按 `conversation_id` 存储,实现高效查找。 * **WebSocket 封装 (`services/im.ts`)**: * 封装 WebSocket 的连接、断线重连、心跳机制。 * 提供 `send(message)` 方法,将业务数据封装成 Protobuf 格式发送。 * 监听 `onmessage` 事件,解析收到的 Protobuf 消息,根据消息类型分发给 Pinia store 进行状态更新,触发视图响应。 #### **2.5. 通信协议设计** * **客户端 <-> IM Gateway**: WebSocket + Protocol Buffers (Protobuf)。 * **原因**: Protobuf 序列化/反序列化速度快,体积小,节省带宽,且强类型定义利于协作。 * **包结构**: `Packet { int32 type; bytes body; }`,`type` 表示消息类型(如登录、心跳、单聊消息等),`body` 是具体消息内容的 Protobuf 序列化字节流。 * **服务间通信**: gRPC。 * **原因**: 基于 HTTP/2 和 Protobuf,性能高,提供服务发现和负载均衡,适合微服务架构。 --- ### **3. 核心流程与详细设计** #### **3.1. 用户认证与连接建立** ```mermaid sequenceDiagram participant Client participant API Gateway participant User Svc participant IM Gateway participant Redis Client->>API Gateway: 1. POST /auth/login (username, password) API Gateway->>User Svc: 2. RPC: Login(req) User Svc->>User Svc: 3. 验证凭证, 生成JWT Token User Svc-->>API Gateway: 4. 返回 Token API Gateway-->>Client: 5. 返回 Token Client->>IM Gateway: 6. WebSocket Connect(WSS://im.chat.com?token=...) IM Gateway->>User Svc: 7. RPC: VerifyToken(token) User Svc-->>IM Gateway: 8. 返回 UserID, DeviceID等信息 IM Gateway->>Redis: 9. HSET session:UserID:DeviceID {gateway_addr, status} IM Gateway-->>Client: 10. Login Success Ack ``` **流程说明**: 1. 客户端通过 HTTP POST 请求登录,获取 JWT Token。 2. 客户端携带 Token 发起 WebSocket 连接请求。 3. IM Gateway 截取 Token,通过 gRPC 调用 User Service 验证其有效性。 4. 验证通过后,IM Gateway 在 Redis 中创建/更新该用户的 Session 信息,记录当前连接的 Gateway 地址。 5. 向客户端发送登录成功确认,连接正式建立。 #### **3.2. 私聊消息流转 (A -> B)** ```mermaid sequenceDiagram participant Client A participant IM Gateway A participant Kafka participant Message Svc participant Push Svc participant Redis participant IM Gateway B participant Client B Client A->>IM Gateway A: 1. SendMsg(to:B, content:"Hi") IM Gateway A->>Kafka: 2. Produce to `msg_transfer_topic` Message Svc->>Kafka: 3. Consume message Message Svc->>Redis: 4. INCR conversation_seq:conv_id (获取SEQ) Message Svc->>MongoDB: 5. Save message (with SEQ) Message Svc->>Kafka: 6. Produce to `msg_push_topic` (带SEQ) Push Svc->>Kafka: 7. Consume message Push Svc->>Redis: 8. Get session:UserID_B (查询B的在线状态) alt B is Online Push Svc->>Kafka: 9. Produce to `gateway_topic_{gateway_id_b}` IM Gateway B->>Kafka: 10. Consume from its own topic IM Gateway B->>Client B: 11. PushMsg over WebSocket else B is Offline Push Svc->>Push Svc: 9b. Call APNs/FCM for push notification end ``` **流程说明**: 1. **发送**: Client A 将消息通过 WebSocket 发送给其连接的 IM Gateway A。 2. **投递**: IM Gateway A 将消息原样投递到 Kafka 的 `msg_transfer_topic`。 3. **处理与存储**: Message Service 消费消息,为该会话生成一个递增的 SEQ,将消息(包含SEQ)存入 MongoDB。 4. **分发**: Message Service 将处理后的消息投递到 Kafka 的 `msg_push_topic`。 5. **推送**: Push Service 消费消息,查询接收者 B 的 Session 状态。 * **在线**: 找到 B 所在的 IM Gateway B 的地址,并将消息投递到该 Gateway 专属的 Kafka Topic (`gateway_topic_b`)。IM Gateway B 消费后通过 WebSocket 推送给 Client B。 * **离线**: 调用苹果/谷歌的推送服务,发送离线通知。 #### **3.3. 消息可靠性与顺序性保证** * **ACK 机制**: * **发送方**: Client 发送消息后,会收到一个 `MsgSentAck`,其中包含 `MsgID` 和 `SEQ`。这表示服务端已成功接收并处理了消息。如果超时未收到 ACK,客户端会重试。 * **接收方**: Client 收到消息后,向服务端发送一个 `MsgReadAck`,通知服务端消息已读。 * **顺序性**: * 通过在 Message Service 中为每个会话维护一个单调递增的 SEQ,保证了消息在服务端的绝对顺序。 * 客户端维护一个本地的 `last_read_seq`。当收到新消息时,如果其 `SEQ` 大于 `last_read_seq + 1`,说明中间有消息丢失,此时客户端会主动向服务端请求 `(last_read_seq, new_msg_seq)` 之间的消息,实现消息同步。 #### **3.4. 多端消息同步机制** 当用户在设备 C 登录时,需要同步所有会话的最新状态。 1. 客户端 C 登录成功后,向服务端请求其所有会话列表以及每个会话的 `last_read_seq` 和 `latest_seq`。 2. 客户端 C 遍历会话列表,对比本地 `last_read_seq` 和服务端的 `latest_seq`。 3. 如果服务端的 `latest_seq` 更大,则向 Message Service 发起拉取历史消息的请求,参数为 `conversation_id` 和 `start_seq` (本地 `last_read_seq` + 1)。 4. 拉取到的消息在本地存储并渲染,完成同步。 5. 此后,该用户的任意一端发送或接收消息,Push Service 会将消息推送到其所有在线设备(所有 Gateway)。 --- ### **4. 数据库设计** #### **4.1. MySQL (关系型数据)** * **ER 图 (简化)** ``` [Users] 1--N [Contacts] N--1 [Users] | | 1..N [GroupMembers] | N..1 [Groups] ``` * **`users` 表** ```sql CREATE TABLE `users` ( `id` bigint unsigned NOT-NULL AUTO_INCREMENT, `user_id` varchar(64) NOT NULL COMMENT '业务用户ID', `nickname` varchar(255) NOT NULL DEFAULT '', `avatar` varchar(255) NOT NULL DEFAULT '', `password_hash` varchar(255) NOT NULL, `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uk_user_id` (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` * **`contacts` (好友关系) 表** ```sql CREATE TABLE `contacts` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `owner_user_id` varchar(64) NOT NULL, `friend_user_id` varchar(64) NOT NULL, `remark` varchar(255) DEFAULT '', `status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '0:好友, 1:拉黑', `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uk_owner_friend` (`owner_user_id`, `friend_user_id`), KEY `idx_owner` (`owner_user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` * **`groups` 表 和 `group_members` 表** (设计略,包含群ID,群主,群成员,角色等信息) #### **4.2. MongoDB (消息数据)** * **集合: `messages`** * **选择原因**: 消息数据量巨大,写入频繁,结构可灵活扩展,MongoDB 的水平扩展能力(分片)非常适合此场景。 * **文档结构 (示例)** ```json { "_id": ObjectId("..."), "msg_id": "1589784321000123456", // Snowflake ID "conversation_id": "u_userA_userB", // or "g_group123" "seq": 1024, "sender_id": "userA", "receiver_id": "userB", // or "group123" "msg_type": 1, // 1:text, 2:image, 3:audio... "content": "{\"text\": \"Hello, World!\"}", // JSON string for extensibility "send_time": ISODate("2023-10-27T10:00:00Z"), "status": 0 // 0: normal } ``` * **索引策略**: * 在 `(conversation_id, seq)` 上创建复合索引,这是消息拉取的核心索引。 * 在 `msg_id` 上创建唯一索引。 * **分片键**: 使用 `conversation_id` 作为分片键,可以将同一个会话的消息尽可能地落在同一个分片上,提高查询效率。 #### **4.3. Redis (缓存与状态)** * **`STRING`**: 存储会话内的 SEQ。`KEY: conv_seq:`, `VALUE: latest_seq`。使用 `INCR` 命令保证原子性。 * **`HASH`**: 存储用户 Session 信息。`KEY: session:`, `FIELD: `, `VALUE: { "gateway_addr": "...", "conn_id": "..." }`。 * **`ZSET`**: 用于网关负载统计(可选)。`KEY: gateway_load`, `MEMBER: `, `SCORE: `。 --- ### **5. 基础设施与运维部署** #### **5.1. 容器化方案 (Docker)** * 为每个微服务编写 `Dockerfile`,基于轻量级镜像(如 `alpine` 或 `scratch` for Go)构建。 * `Dockerfile` 示例 (Go Service): ```dockerfile # Build stage FROM golang:1.20-alpine AS builder WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . RUN CGO_ENABLED=0 GOOS=linux go build -o /app/main ./cmd/ # Final stage FROM alpine:latest WORKDIR /app COPY --from=builder /app/main . COPY ./configs/ /app/configs/ EXPOSE 8080 9090 # HTTP and gRPC ports CMD ["./main"] ``` #### **5.2. 编排与部署 (Kubernetes)** * **`Deployment`**: 为每个无状态服务(如 `user-srv`, `im-gateway`)创建 Deployment,配置副本数(`replicas`)和滚动更新策略。 * **`StatefulSet`**: 为有状态服务(如 MySQL, Kafka, Redis)创建 StatefulSet,确保稳定的网络标识和持久化存储。 * **`Service`**: 为服务创建 ClusterIP 类型的 Service,实现服务间的发现和通信。 * **`Ingress`**: 配置 Ingress Controller (如 Nginx Ingress),将外部的 HTTP/WebSocket 流量路由到 API Gateway 和 IM Gateway。 * **`ConfigMap`/`Secret`**: 统一管理应用配置和敏感信息。 * **`PersistentVolume` (PV) / `PersistentVolumeClaim` (PVC)**: 为数据库和消息队列提供持久化存储。 #### **5.3. CI/CD 流程** 使用 Jenkins 或 GitLab CI/CD 实现自动化流程: 1. **Commit**: 开发者提交代码到 Git 仓库。 2. **Build**: CI 服务器触发构建,运行单元测试,执行静态代码检查,构建 Docker 镜像并推送到镜像仓库(如 Harbor)。 3. **Deploy**: 镜像构建成功后,CD 阶段自动更新 K8s 集群中对应服务的 Deployment yaml 文件中的镜像版本,并执行 `kubectl apply`,触发滚动更新。 #### **5.4. 监控与告警** * **Prometheus**: * 通过 Go 客户端库 (`prometheus/client_golang`) 在各服务中暴露 metrics 端点 (`/metrics`)。 * 监控指标:QPS, 延迟, 错误率, CPU/内存使用率, Go GC 统计,长连接数等。 * **Grafana**: * 创建 Dashboard,将 Prometheus 采集的数据进行可视化展示。 * **Alertmanager**: * 配置告警规则,当指标超过阈值时(如 API 延迟 > 500ms),通过邮件、Slack、钉钉等方式发送告警。 * **Logging (EFK)**: * 应用日志输出到 `stdout`/`stderr`。 * K8s 节点上部署 Fluentd Agent,采集容器日志并发送到 Elasticsearch。 * 通过 Kibana 进行日志的检索、分析和可视化。 --- ### **6. 安全设计** * **传输安全**: 全站启用 HTTPS 和 WSS,对所有数据传输进行 TLS 加密。 * **认证授权**: 使用 JWT 进行无状态认证,JWT 中包含 `user_id`, `device_id` 和过期时间。 * **数据安全**: 数据库密码等敏感信息使用 K8s Secrets 管理。对用户密码进行加盐哈希(如 bcrypt)。 * **防攻击**: API Gateway 层配置 Rate Limiting 防止暴力破解和DDoS攻击。对用户输入进行严格的校验和清理,防止 XSS 和 SQL 注入。 --- ### **7. 未来规划** * **端到端加密 (E2EE)**: 集成 Signal Protocol,实现消息内容仅对收发双方可见。 * **机器人与开放平台**: 提供 API 供第三方开发者创建聊天机器人和集成应用。 * **智能化**: 引入 AI 能力,实现消息智能摘要、违规内容审核等。 * **全球化部署**: 考虑多机房部署,实现就近接入和异地容灾。