# slf4j-kafka **Repository Path**: 123zhangzhenbin/slf4j-kafka ## Basic Information - **Project Name**: slf4j-kafka - **Description**: 把slf4j日志输出到kafka,目前支持logback,log4j - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 25 - **Forks**: 15 - **Created**: 2017-05-22 - **Last Updated**: 2024-08-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # slf4j日志输出到Kafka扩展 > BY ZZB ## 背景 如今分布式系统已经成为开发web的标配,且当前大数据系统通常都在集群中运行,如此多的系统集成在一起,日志管理变得非常繁杂艰难。因此一套同一的日志管理 方案迫在眉睫。网上有很多日志统一方案,大部分是通过filebeat或者logstash采集本地日志,再输出到ES中,通过索引的方式获取不同系统,不同级别的日志。 这种方案的确可靠有效,但是多了一个采集层的工作,而如果需要采集的日志给其他监控系统消费时则需要配多个采集器,并且在yarn中无法定位日志在哪台机器的 什么位置的情况下,这种方案也显得比较无力。 本项目是通过扩展slf4j这个java世界通用的日志框架,将日志按照一定的格式直接输出到kafka,然后不同的系统可以定义不同的消费者从kafka中获取日志, 做具体业务。 ## 简介 本项目通过扩展Slf4j的Appender,将日志像输出到文件或者控制台一样简单的配置,就可以输出到kafka中。 项目提供了KafkaAppender和KafkaConcurrentAppender两种方案,均是异步输出到kafka中。 | appender | 插入顺序 | 性能消耗 | 插入速度 | | -------- | ------- | ------- | ------ | | KafkaAppender | 保证顺序 | 较大 | 较慢 | | KafkaConcurrentAppender | 不保证顺序 | 小 | 快 | 当前已支持log4j1.2.X和logback ## log4j 在maven中定义log4j依赖 ```xml slf4j-kafka-over-log4j12 com.tuandai.log 1.7.5 ``` 在需要用到日志的地方定义Slf4jLogger ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; private static Logger logger = LoggerFactory.getLogger(Main.class); ``` 定义log4j.properties ```properties #控制台输出Appender log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender #增强日志格式转换器 log4j.appender.myConsoleAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout #控制台输出日志格式 log4j.appender.myConsoleAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %m%n #kafka输出Appender log4j.appender.kafkaAppender=com.tuandai.log.log4j12.appender.KafkaAppender #增强日志格式转换器 log4j.appender.kafkaAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout #kafka输出日志格式 log4j.appender.kafkaAppender.layout.ConversionPattern={"@timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}","beat": { "hostname": "%host_name","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" } #输出日志级别, 应用 控制台输出Appender 和 kafka输出Appender log4j.rootLogger=DEBUG, myConsoleAppender, kafkaAppender #关闭kafka输出日志[重要!因为kafka提供的javaapi使用了log4j做日志,如果不屏蔽掉会出现循环依赖,造成栈溢出] log4j.logger.org.apache.kafka=OFF ``` 定义sl4j-kafka.properties ```properties #kafka-support kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092 kafka.topic=log-log4j-test kafka.appId=app ``` ## logback 在maven中定义log4j依赖 ```xml slf4j-kafka-over-logback com.tuandai.log 1.7.5 ``` 在需要用到日志的地方定义Slf4jLogger ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; private static Logger logger = LoggerFactory.getLogger(Main.class); ``` 定义logback.xml ```xml ${LOG_PATTERN} true ${KAFKA_PATTERN} ``` 定义sl4j-kafka.properties ```properties #kafka-support kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092 kafka.topic=log-log4j-test kafka.appId=app ``` ## 附:在Spark-yarn 中使用 当spark运行的时候,会有driver和executor两种角色,其中driver负责提交spark任务和收集executor执行结果,而executor是由yarn分配到集群中的某些机器来运行被driver提交的任务。 在我的理解看来,driver做了驱动和对结果集的聚合,executor就负责执行具体任务产出结果集。 通过我们运行spark-submit的后日志可以发现,提交程序后不会有程序的日志输出,具体的日志在各个节点中的yarn工作目录下。如/yarns/nm/userXX/。 节点任务的日志中包含了spark节点的启动日志和spark具体业务日志。 具体的我自己没有透彻前就不误人子弟了,说到这里就够了,有兴趣可以详细研究下spark-yarn工作方式。 以上说的这些只是想表述一个问题,spark-yarn下的日志的配置不是在运行起任务的时候指定的,而是在提交任务是就必须指定了。 所以在spark-yarn使用和普通的业务有所不同。 **进入正题** 将log4j.properties 和 slf4j-kafka.properties 拷贝到spark提交目录下,内容参照上 _如_ ```bash [cdh]$ pwd /home/datapipe/spark-jobs/spark-job [cdh]$ ls spark-job.jar log4j.properties slf4j-kafka.properties ``` 将编译好的slf4j-kafka-over-log4j12和slf4j-kafka-producer的jar包放到该目录下 ```bash [cdh]$ ls spark-job.jar log4j.properties slf4j-kafka.properties slf4j-kafka-over-log4j12-1.7.5.jar slf4j-kafka-producer-1.7.5.jar ``` 编写启动脚本 ```bash [cdh]$ touch startup.sh [cdh]$ chmod +x startup.sh ``` 脚本内容 ```bash 1 nohup spark-submit --class com.tuandai.XXXMainFunctionClass \ 2 --master yarn --deploy-mode cluster --executor-memory 512m --driver-memory 512m --num-executors 1 --name XXX \ 3 --jars slf4j-kafka-producer-1.7.5.jar,slf4j-kafka-over-log4j12-1.7.5.jar \ 4 --files "log4j-kafka.properties,slf4j-kafka.properties" \ 5 --conf "spark.driver.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \ 6 --conf "spark.executor.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \ 7 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \ 8 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \ 9 spark-job.jar & 10 echo "spark job started" ``` **逐行讲解下** 1. 提交的任务需要运行的main方法 2. 使用yarn 集群模式 executor节点内存512m driver节点内存分配512m executor数量1个 任务名 3. 上传第三方jar包,这里就是slf4j-kafka的支持了 4. 上传配置文件 5. driver对第三方jar包生效 6. executor对第三方jar包生效 7. driver运行参数,这里是生效上传的配置文件 8. executor运行参数,这里是生效上传的配置文件 9. spark任务jar包