# KafkaExpress **Repository Path**: sciencestar2020/kafka-express ## Basic Information - **Project Name**: KafkaExpress - **Description**: kafka实战 - **Primary Language**: Java - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-07-10 - **Last Updated**: 2025-07-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Kafka Express 一个基于Spring Boot和Apache Kafka的消息处理应用程序,提供了高性能、可靠的消息生产和消费功能。 ## 项目特性 - 🚀 **高性能**: 支持异步消息发送和批量处理 - 🔒 **可靠性**: 完善的错误处理和重试机制 - 📝 **详细日志**: 使用SLF4J进行结构化日志记录 - ⚙️ **灵活配置**: 支持多种配置选项和自定义参数 - 🛠️ **工具类**: 提供消息处理、验证等实用工具 - 🏗️ **模块化设计**: 清晰的包结构和职责分离 ## 项目结构 ``` src/main/java/org/pr/kafkaexpress/ ├── KafkaExpressApplication.java # 主应用类 ├── config/ │ └── KafkaConfig.java # Kafka配置类 ├── constants/ │ └── KafkaConstants.java # 常量定义 ├── consumer/ │ └── MessageConsumer.java # 消息消费者 ├── exception/ │ └── KafkaException.java # 自定义异常 ├── procedure/ │ └── MessageProducer.java # 消息生产者 └── util/ └── MessageUtils.java # 消息处理工具类 ``` ## 主要优化内容 ### 1. 依赖注入优化 - 使用构造器注入替代字段注入,提高代码可测试性 - 遵循Spring最佳实践 ### 2. 异常处理 - 创建自定义`KafkaException`类 - 在消息生产和消费过程中添加完善的异常处理 - 提供详细的错误信息和上下文 ### 3. 日志记录 - 使用Lombok的`@Slf4j`注解 - 结构化日志记录,包含关键信息 - 消息内容截断,避免日志过大 ### 4. 消息验证 - 消息大小限制 - 空值检查 - 消息格式验证 ### 5. 配置管理 - 集中化配置管理 - 支持多种配置选项 - 环境变量支持 ### 6. 工具类 - 消息哈希计算 - 时间戳添加 - 消息格式化 - 心跳消息处理 ## 快速开始 ### 前置条件 - Java 17+ - Maven 3.6+ - Apache Kafka 2.8+ ### 1. 启动Kafka ```bash # 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Kafka bin/kafka-server-start.sh config/server.properties ``` ### 2. 构建项目 ```bash mvn clean compile ``` ### 3. 运行应用 ```bash mvn spring-boot:run ``` ## 配置说明 ### 主要配置项 ```yaml spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: test-group auto-offset-reset: earliest producer: acks: all retries: 3 batch-size: 16384 compression-type: snappy logging: level: org.pr.kafkaexpress: INFO file: name: logs/kafka-express.log ``` ### 环境变量支持 - `SPRING_KAFKA_BOOTSTRAP_SERVERS`: Kafka服务器地址 - `SPRING_KAFKA_CONSUMER_GROUP_ID`: 消费者组ID - `LOGGING_LEVEL_ORG_PR_KAFKAEXPRESS`: 日志级别 ## API说明 ### MessageProducer ```java // 同步发送消息 producer.send("topic", "message"); // 异步发送消息 CompletableFuture> future = producer.sendAsync("topic", "message"); // 发送到指定分区 producer.sendToPartition("topic", 0, "key", "message"); ``` ### MessageConsumer ```java // 自动监听指定主题 @KafkaListener(topics = "test-topic", groupId = "test-group") public void listen(String message) { // 处理消息 } // 批量处理 @KafkaListener(topics = "batch-topic", batch = "true") public void listenBatch(List messages) { // 批量处理消息 } ``` ## 监控和日志 ### 日志文件 - 位置: `logs/kafka-express.log` - 轮转: 最大10MB,保留30天 ### 关键指标 - 消息发送成功率 - 消息处理延迟 - 错误率统计 ## 故障排除 ### 常见问题 1. **连接失败** - 检查Kafka服务器是否运行 - 验证bootstrap-servers配置 2. **消息发送失败** - 检查主题是否存在 - 验证权限配置 3. **消费者不工作** - 检查消费者组配置 - 验证主题名称 ## 开发指南 ### 添加新功能 1. 在相应的包中创建新类 2. 添加单元测试 3. 更新文档 ### 代码规范 - 使用Lombok减少样板代码 - 遵循Java命名规范 - 添加适当的注释 ## 许可证 本项目采用MIT许可证。详见[LICENSE](LICENSE)文件。 ## 贡献 欢迎提交Issue和Pull Request来改进这个项目。 ## 联系方式 如有问题,请通过以下方式联系: - 邮箱: [your-email@example.com] - GitHub: [your-github-profile]