# slf4j-kafka
**Repository Path**: 123zhangzhenbin/slf4j-kafka
## Basic Information
- **Project Name**: slf4j-kafka
- **Description**: 把slf4j日志输出到kafka,目前支持logback,log4j
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 25
- **Forks**: 15
- **Created**: 2017-05-22
- **Last Updated**: 2024-08-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# slf4j日志输出到Kafka扩展
> BY ZZB
## 背景
如今分布式系统已经成为开发web的标配,且当前大数据系统通常都在集群中运行,如此多的系统集成在一起,日志管理变得非常繁杂艰难。因此一套同一的日志管理
方案迫在眉睫。网上有很多日志统一方案,大部分是通过filebeat或者logstash采集本地日志,再输出到ES中,通过索引的方式获取不同系统,不同级别的日志。
这种方案的确可靠有效,但是多了一个采集层的工作,而如果需要采集的日志给其他监控系统消费时则需要配多个采集器,并且在yarn中无法定位日志在哪台机器的
什么位置的情况下,这种方案也显得比较无力。
本项目是通过扩展slf4j这个java世界通用的日志框架,将日志按照一定的格式直接输出到kafka,然后不同的系统可以定义不同的消费者从kafka中获取日志,
做具体业务。
## 简介
本项目通过扩展Slf4j的Appender,将日志像输出到文件或者控制台一样简单的配置,就可以输出到kafka中。
项目提供了KafkaAppender和KafkaConcurrentAppender两种方案,均是异步输出到kafka中。
| appender | 插入顺序 | 性能消耗 | 插入速度 |
| -------- | ------- | ------- | ------ |
| KafkaAppender | 保证顺序 | 较大 | 较慢 |
| KafkaConcurrentAppender | 不保证顺序 | 小 | 快 |
当前已支持log4j1.2.X和logback
## log4j
在maven中定义log4j依赖
```xml
slf4j-kafka-over-log4j12
com.tuandai.log
1.7.5
```
在需要用到日志的地方定义Slf4jLogger
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);
```
定义log4j.properties
```properties
#控制台输出Appender
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
#增强日志格式转换器
log4j.appender.myConsoleAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#控制台输出日志格式
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %m%n
#kafka输出Appender
log4j.appender.kafkaAppender=com.tuandai.log.log4j12.appender.KafkaAppender
#增强日志格式转换器
log4j.appender.kafkaAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#kafka输出日志格式
log4j.appender.kafkaAppender.layout.ConversionPattern={"@timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}","beat": { "hostname": "%host_name","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" }
#输出日志级别, 应用 控制台输出Appender 和 kafka输出Appender
log4j.rootLogger=DEBUG, myConsoleAppender, kafkaAppender
#关闭kafka输出日志[重要!因为kafka提供的javaapi使用了log4j做日志,如果不屏蔽掉会出现循环依赖,造成栈溢出]
log4j.logger.org.apache.kafka=OFF
```
定义sl4j-kafka.properties
```properties
#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app
```
## logback
在maven中定义log4j依赖
```xml
slf4j-kafka-over-logback
com.tuandai.log
1.7.5
```
在需要用到日志的地方定义Slf4jLogger
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);
```
定义logback.xml
```xml
${LOG_PATTERN}
true
${KAFKA_PATTERN}
```
定义sl4j-kafka.properties
```properties
#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app
```
## 附:在Spark-yarn 中使用
当spark运行的时候,会有driver和executor两种角色,其中driver负责提交spark任务和收集executor执行结果,而executor是由yarn分配到集群中的某些机器来运行被driver提交的任务。
在我的理解看来,driver做了驱动和对结果集的聚合,executor就负责执行具体任务产出结果集。
通过我们运行spark-submit的后日志可以发现,提交程序后不会有程序的日志输出,具体的日志在各个节点中的yarn工作目录下。如/yarns/nm/userXX/。
节点任务的日志中包含了spark节点的启动日志和spark具体业务日志。
具体的我自己没有透彻前就不误人子弟了,说到这里就够了,有兴趣可以详细研究下spark-yarn工作方式。
以上说的这些只是想表述一个问题,spark-yarn下的日志的配置不是在运行起任务的时候指定的,而是在提交任务是就必须指定了。
所以在spark-yarn使用和普通的业务有所不同。
**进入正题**
将log4j.properties 和 slf4j-kafka.properties 拷贝到spark提交目录下,内容参照上
_如_
```bash
[cdh]$ pwd
/home/datapipe/spark-jobs/spark-job
[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties
```
将编译好的slf4j-kafka-over-log4j12和slf4j-kafka-producer的jar包放到该目录下
```bash
[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties
slf4j-kafka-over-log4j12-1.7.5.jar slf4j-kafka-producer-1.7.5.jar
```
编写启动脚本
```bash
[cdh]$ touch startup.sh
[cdh]$ chmod +x startup.sh
```
脚本内容
```bash
1 nohup spark-submit --class com.tuandai.XXXMainFunctionClass \
2 --master yarn --deploy-mode cluster --executor-memory 512m --driver-memory 512m --num-executors 1 --name XXX \
3 --jars slf4j-kafka-producer-1.7.5.jar,slf4j-kafka-over-log4j12-1.7.5.jar \
4 --files "log4j-kafka.properties,slf4j-kafka.properties" \
5 --conf "spark.driver.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
6 --conf "spark.executor.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
7 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
8 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
9 spark-job.jar &
10 echo "spark job started"
```
**逐行讲解下**
1. 提交的任务需要运行的main方法
2. 使用yarn 集群模式 executor节点内存512m driver节点内存分配512m executor数量1个 任务名
3. 上传第三方jar包,这里就是slf4j-kafka的支持了
4. 上传配置文件
5. driver对第三方jar包生效
6. executor对第三方jar包生效
7. driver运行参数,这里是生效上传的配置文件
8. executor运行参数,这里是生效上传的配置文件
9. spark任务jar包