# logback-kafka-appender **Repository Path**: chaojunma/logback-kafka-appender ## Basic Information - **Project Name**: logback-kafka-appender - **Description**: logback自定义appender把日志写入到kafka中 通过logstash最终落地到elasticsearch中,简单易用 只需引入pom依赖,无代码侵入 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 17 - **Created**: 2021-09-17 - **Last Updated**: 2021-09-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### logback-kafka-appender #### 介绍 logback-kafka-appender 一个把日志以json的格式写入到kafka消息队列的logback appender 特别是对于分布式的微服务来说更是一个神器,不用运维人员来配置繁琐的日志采集,利用kafka的高吞吐率提高效率。 根据traceID的请求链路跟踪更加方便日志的追踪和调试 json格式的输入,方便logstash的采集 #### 编译安装 ``` git clone https://gitee.com/dengmin/logback-kafka-appender.git cd logback-kafka-appender mvn clean install ``` #### 引入pom依赖 ``` com.gitee.dengmin logback-kafka-appender 1.0 ``` #### 配置logback-spring.xml - 从springboot项目的配置文件中导入spring.application.name变量,方便elasticsearch通过项目名检索日志 ``` ``` - 配置 appender 替换hosts中kafka的地址, 和发送的kafka topic ``` #替换appender中的topic 和 hosts ${name} cloud_logs 10.10.47.6:9092,10.10.47.9:9092 4096 true ``` ### MDC请求链路跟踪 (可选) 如果想要跟踪请求链路产生traceId, 只需要在配置一个spring 拦截器 ``` public class LogMDCInterceptor implements HandlerInterceptor { private static final long serialVersionUID = 1L; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String REQUEST_ID = "REQUEST-ID"; MDC.put(REQUEST_ID, UUID.randomUUID().toString().replace("-","")); return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { MDC.clear(); } } ``` 配置拦截器 ``` @Configuration public class AutoConfiguration implements WebMvcConfigurer{ @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LogMDCInterceptor()).addPathPatterns("/**"); } } ``` ### 配置logstash日志采集 编辑 /etc/logstash/conf.d/kafka.conf 文件 ``` input { kafka { bootstrap_servers => "10.10.47.6:9092,10.10.47.9:9092" topics => ["cloud_logs"] # appender中配置的topic consumer_threads => 5 decorate_events => true codec => json {charset => "UTF-8"} auto_offset_reset => "latest" client_id => "kafka_log_collect" } } output { elasticsearch { hosts => "10.10.47.10:9200" #旧版本的logstash和新版的格式不一样,新版要增加[kafka] index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}" index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}" } #stdout {codec=>rubydebug} } ``` ### 发送到kafka中的数据格式 ``` { "appName": "demo", "method": "restartedMain", "level": "INFO", "className": "org.springframework.cloud.context.scope.GenericScope", "dateTime": "2020-06-09 17:36:07", "line": "295", "message": "BeanFactory id\u003d507e95a4-a9a1-3a1c-80dd-fcb5d8753cf2", "traceId": "5413d7d744ea497e84e2bca954587cd5" } ``` ### elasticsearch中创建mapping ``` curl -XPUT "http://192.168.0.111:9200/_template/cloud_logs" -H 'Content-Type: application/json' -d'{ "index_patterns": [ "cloud_logs*" ], "mappings": { "properties": { "line": { "type": "long" }, "@timestamp": { "type": "date" }, "dateTime": { "type": "text" }, "appName": { "type": "text" }, "message": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "className": { "type": "text" }, "level": { "type": "text" }, "method": { "type": "text" }, "traceId": { "type": "text" }, "timestamp": { "type": "long" } } }, "aliases": {}}' ```