# Kafka-Based High-Availability Nginx Web Log Processing System

**Repository Path**: zjdobhap/NginxWebLogBasedKafka

## Basic Information

- **Project Name**: Kafka-Based High-Availability Nginx Web Log Processing System
- **Description**: This project simulates an enterprise production environment. It builds a highly available, high-concurrency web cluster behind nginx layer-4/layer-7 load balancing, and uses high-throughput Kafka as a message queue to process the web cluster's logs centrally, with ZooKeeper managing Kafka. A Python program consumes the logs, cleans them, and writes them to the database. Stress tests then validate cluster performance so bottlenecks can be found and optimized away. Zabbix handles monitoring, giving a real-time view of the service for timely maintenance.
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 2
- **Forks**: 0
- **Created**: 2022-07-12
- **Last Updated**: 2023-06-27

## Categories & Tags

**Categories**: Uncategorized

**Tags**: message queue, load balancing, high availability, monitoring

## README

# Kafka-Based High-Availability Nginx Web Log Processing System

#### Introduction

This project simulates an enterprise production environment. It builds a highly available, high-concurrency web cluster behind nginx layer-4/layer-7 load balancing, and uses high-throughput Kafka as a message queue to process the web cluster's logs centrally, with ZooKeeper managing Kafka. A Python program consumes the logs, cleans them, and writes them to the database. Stress tests then validate cluster performance so bottlenecks can be found and optimized away. Zabbix handles monitoring, giving a real-time view of the service for timely maintenance.

#### Software Architecture

CentOS 7, nginx, keepalived, ab, NFS, Zabbix, Kafka, ZooKeeper, Python 3, MySQL, Filebeat

#### Installation Guide

1. Draw the architecture diagram for the whole project, install the required virtual machines, plan the IP addresses, and make sure the hosts can reach one another over the network.

![Architecture diagram](%E5%9F%BA%E4%BA%8Ekafka%E7%9A%84%E9%AB%98%E5%8F%AF%E7%94%A8nginx-web%E6%97%A5%E5%BF%97%E6%94%B6%E9%9B%86%E7%B3%BB%E7%BB%9F.png)

2. Deploy nginx and zabbix-agent on the three web virtual machines; set nginx's connection limit and worker process count, and configure zabbix-agent.

- `scp /root/install_nginx.sh 192.168.43.3:/root` : copy the install script to the remote host
- `ssh root@192.168.43.3 bash /root/install_nginx.sh` : log in remotely and run the nginx install script

[root@zhangjian nginx]# cat install_nginx.sh

```
#!/bin/bash
# Install build dependencies
yum install epel-release -y
yum -y install zlib zlib-devel openssl openssl-devel pcre pcre-devel gcc gcc-c++ autoconf automake make psmisc net-tools lsof vim wget

# Create the service user and group
id lilin || useradd lilin -s /sbin/nologin

# Download the nginx source package
mkdir /lilin99 -p
cd /lilin99
wget https://nginx.org/download/nginx-1.21.4.tar.gz

# Unpack the source
tar -xf nginx-1.21.4.tar.gz
cd nginx-1.21.4

# Configure before compiling
./configure --prefix=/usr/local/sclilin99 --user=lilin --group=lilin --with-http_ssl_module --with-threads --with-http_v2_module --with-http_stub_status_module --with-stream --with-http_gunzip_module

# Abort if configuration failed
if (( $? != 0 )); then
    echo "configure failed, exiting"
    exit 1
fi

# Compile with two parallel jobs to speed things up
make -j 2
# Install
make install

# Extend PATH so nginx commands can be run directly
echo "PATH=$PATH:/usr/local/sclilin99/sbin" >> /root/.bashrc
source /root/.bashrc
# Pause a second to let the previous command finish
sleep 1

# Stop the firewall, disable it at boot, and turn off SELinux
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i '/^SELINUX=/ s/enforcing/disabled/' /etc/selinux/config

# Start nginx automatically at boot
chmod +x /etc/rc.d/rc.local
echo "/usr/local/sclilin99/sbin/nginx" >> /etc/rc.local

# Adjust nginx.conf: worker process count, connections per worker, listen port, server name
sed -i '/worker_processes/ s/1/4/' /usr/local/sclilin99/conf/nginx.conf
sed -i '/worker_connections/ s/1024/2048/' /usr/local/sclilin99/conf/nginx.conf
sed -i -r '36c \\tlisten 80;' /usr/local/sclilin99/conf/nginx.conf
sed -i -r '37c \\tserver_name www.lilin.com;' /usr/local/sclilin99/conf/nginx.conf

# Kill any running nginx, then start the freshly installed one
killall -9 nginx
/usr/local/sclilin99/sbin/nginx
```

On the three web servers, deploy and configure zabbix-agent so that zabbix-server can pull data, and enable the nginx status page:

```
location = /status {
    stub_status;
}
```

3. Set up an NFS server and upload the site content; every web server mounts the NFS shared directory for its data, which keeps the site content consistent.

Set up the NFS server:

```
yum install nfs-utils -y
mkdir /data
# add this line to /etc/exports:
#   /data 192.168.43.0/24(rw,no_root_squash,no_all_squash,sync)
service nfs restart
```

Mount the NFS share on the three web servers:

- Install the NFS client on each backend server as well: `yum install nfs-utils -y`
- On each backend server, mount the exported directory onto nginx's html directory: `mount 192.168.43.5:/data /usr/local/sclilin99/html/`
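The mount command above only lasts until the next reboot. A minimal sketch of making it persistent via /etc/fstab, assuming the same export and mount point as in this step:

```
# Append an NFS entry to /etc/fstab so the share is remounted at boot
# (_netdev delays the mount until the network is up).
echo "192.168.43.5:/data /usr/local/sclilin99/html nfs defaults,_netdev 0 0" >> /etc/fstab
# Remount everything listed in fstab and confirm the share is attached.
mount -a
df -h | grep sclilin99
```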
4. Two machines act as load balancers, implemented with nginx and using the round-robin scheduling algorithm. keepalived makes the balancers highly available with dual VIPs, the two hosts backing each other up so there is no single point of failure.

Use nginx on both load balancers to do the balancing:

```
http {
    # define the upstream group myapp1
    upstream myapp1 {
        server 192.168.43.3 weight=3;   # weighted round-robin
        server 192.168.43.4;
        server 192.168.43.7;
    }
    server {
        listen 80;
        location / {
            proxy_pass http://myapp1;
        }
    }
}
```

Install and configure keepalived on both balancers for dual-VIP high availability. On the first balancer give VI_1 the higher priority (e.g. 120) and on the second give VI_2 the higher priority, so each host is MASTER for one VIP and BACKUP for the other:

```
vrrp_instance VI_1 {
    state BACKUP
    interface ens33
    virtual_router_id 88
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        192.168.43.88
    }
}

vrrp_instance VI_2 {
    state BACKUP
    interface ens33
    virtual_router_id 15
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        192.168.43.15
    }
}
```

5. Install Kafka and ZooKeeper on three virtual machines, configure them, and start the services for testing. ZooKeeper manages leader election, brokers, and metadata, controlling Kafka and keeping the whole log pipeline highly available and highly concurrent.

```
################### Kafka
1. Install
Install Java: yum install java wget -y
Install Kafka:
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
Unpack:
tar xf kafka_2.12-2.8.1.tgz
Install ZooKeeper (a standalone cluster rather than the one bundled with Kafka):
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

2. Configure Kafka
Edit config/server.properties:
broker.id=0        # must be unique on every broker
listeners=PLAINTEXT://nginx-kafka01:9092
zookeeper.connect=192.168.0.94:2181,192.168.0.95:2181,192.168.0.96:2181

3. Configure ZooKeeper
cd /opt/apache-zookeeper-3.6.3-bin/conf
cp zoo_sample.cfg zoo.cfg
Edit zoo.cfg and add these three lines:
server.1=192.168.0.94:3888:4888
server.2=192.168.0.95:3888:4888
server.3=192.168.0.96:3888:4888
3888 and 4888 are both ports: one carries data between nodes, the other handles liveness checks and leader election.

Create the /tmp/zookeeper directory and add a myid file to it whose content is this machine's assigned ZooKeeper id.
For example, on 192.168.0.94:
echo 1 > /tmp/zookeeper/myid

Start ZooKeeper:
bin/zkServer.sh start

When starting, always bring up zk first, then kafka.
When shutting down, stop kafka first, then zk.

# Check status
[root@nginx-kafka03 apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

Start Kafka:
bin/kafka-server-start.sh -daemon config/server.properties

Using ZooKeeper:
run bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, sc, zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1, 2, 3]
```
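Before wiring Filebeat in, it is worth smoke-testing the cluster with the stock console tools. A minimal sketch, using the broker and ZooKeeper addresses from the config above (the topic name `test` is just a placeholder, not from the original project):

```
# Create a throwaway topic (Kafka 2.8 still accepts --zookeeper).
bin/kafka-topics.sh --create --zookeeper 192.168.0.94:2181 --replication-factor 3 --partitions 3 --topic test
# Type a few messages into the producer...
bin/kafka-console-producer.sh --broker-list 192.168.0.94:9092 --topic test
# ...and read them back in another terminal; --from-beginning replays the whole topic.
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.95:9092 --topic test --from-beginning
```

If the consumer echoes what the producer sent, and stopping a single broker does not interrupt the stream, replication is doing its job.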
6. Deploy Filebeat on the web servers to collect the web logs and forward them to Kafka for centralized processing.

```
Filebeat deployment
# Install
1. rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
2. Edit vim /etc/yum.repos.d/fb.repo:
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
3. Install with yum:
yum install filebeat -y

rpm -qa | grep filebeat    # check whether filebeat is installed; rpm -qa lists every package on the machine
rpm -ql filebeat           # show where filebeat was installed and which files belong to it

4. Enable at boot:
systemctl enable filebeat

# YAML structure (the input config below, expressed as JSON):
{
  "filebeat.inputs": [
    {
      "type": "log",
      "enabled": true,
      "paths": ["/var/log/nginx/sc_access.log"]
    }
  ]
}

# Configure
Edit /etc/filebeat/filebeat.yml:
filebeat.inputs:
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/sc_access.log
#---------------------------------- kafka ----------------------------------
output.kafka:
  hosts: ["192.168.229.139:9092","192.168.229.140:9092"]
  topic: nginxlog
  keep_alive: 10s

# Create the topic nginxlog
bin/kafka-topics.sh --create --zookeeper 192.168.50.140:2181 --replication-factor 3 --partitions 1 --topic nginxlog

# Start the service:
systemctl start filebeat
```

7. Write a Python script that uses the pykafka module to fetch data from Kafka, clean it into the required format, and write it into MySQL.

```
import json
import time

import pymysql
import requests
from pykafka import KafkaClient

taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="

# Look up an IP address's province and carrier (isp) through taobao's API
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None

# Convert the timestamp read from the log into our target format
def trans_time(dt):
    # parse the string into a time struct
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    # timeStamp = int(time.mktime(timeArray))
    # format the time struct back into a string
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time

# Pull data from kafka and clean it down to ip, time and bandwidth
client = KafkaClient(hosts="192.168.50.140:9092,192.168.50.141:9092,192.168.50.142:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)
# consumer = topic.get_simple_consumer()

db = pymysql.connect(host="192.168.50.88", user="root", password="123456",
                     port=3306, database="consumer", charset="utf8")
cursor = db.cursor()

for message in balanced_consumer:
    if message is not None:
        line = json.loads(message.value.decode("utf-8"))
        log = line["message"]
        tmp_lst = log.split()
        ip = tmp_lst[0]
        dt = tmp_lst[5].replace("[", "")
        bt = tmp_lst[9]
        dt = trans_time(dt)
        prov, isp = resolv_ip(ip)
        if prov and isp:
            print(prov, isp, dt, bt)
            try:
                # parameterized query avoids quoting problems and SQL injection
                cursor.execute('insert into nginxlog(dt, prov, isp, bd) values(%s, %s, %s, %s)',
                               (dt, prov, isp, bt))
                db.commit()
                print("insert succeeded")
            except Exception as err:
                print("insert failed", err)
                db.rollback()

db.close()
```

8. On a client machine, run stress tests with ab to check how the cluster holds up in practice, e.g. `ab -n 10000 -c 500 http://192.168.43.88/` against one of the VIPs, raising the concurrency until a bottleneck shows up.

9. On the web servers, write scripts that expose the relevant metrics to zabbix, then add the hosts and monitoring items in the zabbix web UI, set up triggers, and hook in WeChat alert notifications so the state of the whole web cluster is monitored in real time; a sketch of one such custom item follows below.

![WeChat alert script](%E5%BE%AE%E4%BF%A1%E6%8A%A5%E8%AD%A6%E8%84%9A%E6%9C%AC.png)
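As a sketch of the custom monitoring items mentioned in step 9 (the key name, file path, and metric below are illustrative assumptions, not taken from this repository): a zabbix-agent UserParameter that reports active connections from the nginx status page enabled in step 2.

```
# /etc/zabbix/zabbix_agentd.d/nginx.conf  (hypothetical drop-in file)
# UserParameter=<key>,<command> : zabbix-server polls <key> and receives the command's output.
UserParameter=nginx.active,curl -s http://127.0.0.1/status | awk '/^Active/ {print $NF}'
```

Restart the agent with `systemctl restart zabbix-agent`, then verify from the zabbix server with `zabbix_get -s 192.168.43.3 -k nginx.active` before creating the item and trigger in the web UI.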