From 68e36e10549c8bf021aea3468ea3b32d3151a07d Mon Sep 17 00:00:00 2001
From: guoyonggang <13598605979@163.com>
Date: Thu, 6 Jul 2023 13:24:14 +0800
Subject: [PATCH] feat: add kafka demo
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
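Note: CanalConsumer in this patch listens on the Kafka topic "canaltopic", so the canal server must be
switched from TCP delivery to Kafka delivery for the demo to work end to end. A minimal sketch of the
server-side settings, assuming the canal 1.1.5 default config layout (key names can differ between canal
versions, so verify against the canal.properties and instance.properties shipped with the server):

    # conf/canal.properties
    canal.serverMode = kafka
    kafka.bootstrap.servers = 127.0.0.1:9002

    # conf/example/instance.properties
    canal.mq.topic = canaltopic
    canal.mq.partition = 0

The bootstrap address above follows the 9002 port mapping used in start.txt and application.yml.
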
middle-ware/canal/canal-client/pom.xml | 16 +++
.../demo/canal/client/CanalApplication.java | 12 ++
.../memo/demo/canal/client/CannalClient.java | 120 ++++++++++++++++++
.../canal/client/DirectConnectionCanal.java | 2 +-
.../memo/demo/canal/client/RedisClient.java | 52 ++++++++
.../demo/canal/client/entity/CanalBean.java | 64 ++++++++++
.../demo/canal/client/mq/CanalConsumer.java | 63 +++++++++
.../src/main/resources/application.yml | 22 ++++
middle-ware/canal/canal-server/start.txt | 22 ++++
9 files changed, 372 insertions(+), 1 deletion(-)
create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java
create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java
create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java
create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java
create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java
create mode 100644 middle-ware/canal/canal-client/src/main/resources/application.yml
diff --git a/middle-ware/canal/canal-client/pom.xml b/middle-ware/canal/canal-client/pom.xml
index f9a7558e..075a147e 100644
--- a/middle-ware/canal/canal-client/pom.xml
+++ b/middle-ware/canal/canal-client/pom.xml
@@ -28,6 +28,22 @@
             <artifactId>canal.protocol</artifactId>
             <version>1.1.5</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java
new file mode 100644
index 00000000..b600ad3c
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java
@@ -0,0 +1,12 @@
+package vip.yeee.memo.demo.canal.client;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CanalApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(CanalApplication.class, args);
+    }
+}
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java
new file mode 100644
index 00000000..2d6bce27
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java
@@ -0,0 +1,120 @@
+package vip.yeee.memo.demo.canal.client;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.CanalEntry.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Component;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * Canal client
+ */
+@Slf4j
+@Component
+public class CannalClient implements InitializingBean {
+
+    private final static int BATCH_SIZE = 1000;
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        // Create the connector
+        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
+        try {
+            // Open the connection
+            connector.connect();
+            log.info("Connected to the canal server successfully!");
+            // Subscribe to all databases and all tables
+            connector.subscribe(".*\\..*");
+            // Roll back to the last un-acked position, so the next fetch resumes from there
+            connector.rollback();
+            while (true) {
+                // Fetch up to BATCH_SIZE entries without acknowledging them
+                Message message = connector.getWithoutAck(BATCH_SIZE);
+                // Batch id
+                long batchId = message.getId();
+                // Number of entries in this batch
+                int size = message.getEntries().size();
+                // No data available
+                if (batchId == -1 || size == 0) {
+                    try {
+                        // Sleep for 2 seconds
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                } else {
+                    // Data available, process it
+                    printEntry(message.getEntries());
+                }
+                // Acknowledge the batch id; every message with an id <= batchId is confirmed
+                connector.ack(batchId);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            connector.disconnect();
+        }
+    }
+
+    /**
+     * Print the entries the canal server parsed from the binlog
+     */
+    private static void printEntry(List<CanalEntry.Entry> entries) {
+        for (CanalEntry.Entry entry : entries) {
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                // Transaction begin/end entries, skip them
+                continue;
+            }
+            // The RowChange object holds all the details of a row change,
+            // e.g. isDdl (whether it is a DDL change), the DDL sql, and beforeColumns/afterColumns with the data before and after the change
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
+            }
+            // Event type: INSERT/UPDATE/DELETE
+            EventType eventType = rowChange.getEventType();
+            // Print the header info
+            log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
+                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
+                    eventType);
+
+            // Check whether it is a DDL statement
+            if (rowChange.getIsDdl()) {
+                log.info("================> isDdl: true, sql: " + rowChange.getSql());
+            }
+            // Print every row inside the RowChange object
+            for (RowData rowData : rowChange.getRowDatasList()) {
+                if (eventType == EventType.DELETE) {
+                    // DELETE statement: only the before image is available
+                    printColumn(rowData.getBeforeColumnsList());
+                } else if (eventType == EventType.INSERT) {
+                    // INSERT statement: only the after image is available
+                    printColumn(rowData.getAfterColumnsList());
+                } else {
+                    // UPDATE statement: print the data before and after the change
+                    System.out.println("-------> before");
+                    printColumn(rowData.getBeforeColumnsList());
+                    System.out.println("-------> after");
+                    printColumn(rowData.getAfterColumnsList());
+                }
+            }
+        }
+    }
+
+    private static void printColumn(List<Column> columns) {
+        for (Column column : columns) {
+            log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
+        }
+    }
+}
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java
index 54f2c28c..bdac6947 100644
--- a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java
@@ -18,7 +18,7 @@ import java.util.List;
@Slf4j
public class DirectConnectionCanal {
-    private static final String CANAL_MONITOR_HOST = "yeee.vip.host";
+    private static final String CANAL_MONITOR_HOST = "127.0.0.1";
     private static final Integer CANAL_MONITOR_PORT = 11111;
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java
new file mode 100644
index 00000000..3983c250
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java
@@ -0,0 +1,52 @@
+package vip.yeee.memo.demo.canal.client;
+
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper class for operating redis
+ */
+@Component
+public class RedisClient {
+
+    /**
+     * Redis string template
+     */
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
+
+    /**
+     * Set a redis key-value pair
+     */
+    public void setString(String key, String value) {
+        setString(key, value, null);
+    }
+
+    /**
+     * Set a redis key-value pair with an optional expiration time (in seconds)
+     */
+    public void setString(String key, String value, Long timeOut) {
+        if (timeOut != null) {
+            stringRedisTemplate.opsForValue().set(key, value, timeOut, TimeUnit.SECONDS);
+        } else {
+            stringRedisTemplate.opsForValue().set(key, value);
+        }
+    }
+
+    /**
+     * Get the value stored under a key
+     */
+    public String getString(String key) {
+        return stringRedisTemplate.opsForValue().get(key);
+    }
+
+    /**
+     * Delete the value stored under a key
+     */
+    public Boolean deleteKey(String key) {
+        return stringRedisTemplate.delete(key);
+    }
+}
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java
new file mode 100644
index 00000000..a8aa375a
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java
@@ -0,0 +1,64 @@
+package vip.yeee.memo.demo.canal.client.entity;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class CanalBean {
+
+    // Row data
+    private List<TbCommodityInfo> data;
+    // Database name
+    private String database;
+    private long es;
+    // Incrementing id, starting from 1
+    private int id;
+    // Whether this is a DDL statement
+    private boolean isDdl;
+    // Column types of the table
+    private MysqlType mysqlType;
+    // Old values of an UPDATE statement
+    private String old;
+    // Primary key names
+    private List<String> pkNames;
+    // SQL statement
+    private String sql;
+    private SqlType sqlType;
+    // Table name
+    private String table;
+    private long ts;
+    // INSERT, UPDATE, DELETE, ERASE (table dropped), etc.
+    private String type;
+
+    @Data
+    public static class TbCommodityInfo {
+
+        private String id;
+
+        private String commodityName;
+
+        private String commodityPrice;
+
+        private String number;
+
+        private String description;
+    }
+
+    @Data
+    public static class MysqlType {
+        private String id;
+        private String commodityName;
+        private String commodityPrice;
+        private String number;
+        private String description;
+    }
+
+    @Data
+    public static class SqlType {
+        private int id;
+        private int commodity_name;
+        private int commodity_price;
+        private int number;
+        private int description;
+    }
+}
diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java
new file mode 100644
index 00000000..343f6870
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java
@@ -0,0 +1,63 @@
+package vip.yeee.memo.demo.canal.client.mq;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+import vip.yeee.memo.demo.canal.client.RedisClient;
+import vip.yeee.memo.demo.canal.client.entity.CanalBean;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * Kafka consumer
+ */
+@Component
+@Slf4j
+public class CanalConsumer {
+
+    @Resource
+    private RedisClient redisClient;
+
+    // Listen on the topic named "canaltopic"
+    @KafkaListener(topics = "canaltopic")
+    public void receive(ConsumerRecord<?, ?> consumer) {
+        String value = (String) consumer.value();
+        log.info("topic: {}, key: {}, partition: {}, offset: {}, value: {}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
+        // Convert the message into a Java bean
+        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
+        // Whether this is a DDL statement
+        boolean isDdl = canalBean.isDdl();
+        // Event type
+        String type = canalBean.getType();
+        // Only handle non-DDL statements
+        if (!isDdl) {
+            List<CanalBean.TbCommodityInfo> tbCommodityInfos = canalBean.getData();
+            // Expiration time in seconds (10 minutes)
+            long timeOut = 600L;
+            if ("INSERT".equals(type)) {
+                // INSERT: write the new rows to redis with a 10-minute expiration
+                for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
+                    String id = tbCommodityInfo.getId();
+                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), timeOut);
+                }
+            } else if ("UPDATE".equals(type)) {
+                // UPDATE: overwrite the rows in redis with a 10-minute expiration
+                for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
+                    String id = tbCommodityInfo.getId();
+                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), timeOut);
+                }
+            } else {
+                // DELETE: remove the rows from redis
+                for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
+                    String id = tbCommodityInfo.getId();
+                    redisClient.deleteKey(id);
+                }
+            }
+        }
+    }
+}
diff --git a/middle-ware/canal/canal-client/src/main/resources/application.yml b/middle-ware/canal/canal-client/src/main/resources/application.yml
new file mode 100644
index 00000000..18028071
--- /dev/null
+++ b/middle-ware/canal/canal-client/src/main/resources/application.yml
@@ -0,0 +1,22 @@
+spring:
+  redis:
+    host: 127.0.0.1
+    port: 6379
+    database: 0
+    password:
+  kafka:
+    # Kafka bootstrap servers
+    bootstrap-servers: 127.0.0.1:9002
+    consumer:
+      # Default consumer group id
+      group-id: consumer-group1
+      # Key/value deserializers
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # Producer batch size (bytes)
+      batch-size: 65536
+      # Producer buffer memory (bytes)
+      buffer-memory: 524288
\ No newline at end of file
diff --git a/middle-ware/canal/canal-server/start.txt b/middle-ware/canal/canal-server/start.txt
index e366682a..a104e83e 100644
--- a/middle-ware/canal/canal-server/start.txt
+++ b/middle-ware/canal/canal-server/start.txt
@@ -50,3 +50,25 @@ docker cp /home/canal/bin/bin/startup.sh canal:/home/admin/canal-server/bin/star
Restart the container
+# Install zookeeper and kafka (both run as single-node instances)
+docker pull zookeeper:3.6
+docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:3.6
+
+docker pull wurstmeister/kafka:2.12-2.5.0
+
+docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 ^
+--name kafka -p 9002:9002 ^
+-e KAFKA_BROKER_ID=0 ^
+-e KAFKA_ZOOKEEPER_CONNECT=192.168.100.84:2181/kafka ^
+-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.84:9002 ^
+-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9002 ^
+-v /etc/localtime:/etc/localtime wurstmeister/kafka:2.12-2.5.0
+
+# Enter the container
+docker exec -it ${CONTAINER_ID} /bin/bash
+cd /opt/kafka/bin
+# Standalone mode: create a topic
+kafka-topics.sh --create --zookeeper 192.168.100.84:2181/kafka --replication-factor 1 --partitions 1 --topic canaltopic
+
+# Consume the topic from the command line to verify delivery
+kafka-console-consumer.sh --bootstrap-server 192.168.100.84:9002 --from-beginning --topic canaltopic
--
Gitee