From 68e36e10549c8bf021aea3468ea3b32d3151a07d Mon Sep 17 00:00:00 2001 From: guoyonggang <13598605979@163.com> Date: Thu, 6 Jul 2023 13:24:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=96=B0=E5=A2=9Ekafka=20demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middle-ware/canal/canal-client/pom.xml | 16 +++ .../demo/canal/client/CanalApplication.java | 12 ++ .../memo/demo/canal/client/CannalClient.java | 120 ++++++++++++++++++ .../canal/client/DirectConnectionCanal.java | 2 +- .../memo/demo/canal/client/RedisClient.java | 52 ++++++++ .../demo/canal/client/entity/CanalBean.java | 64 ++++++++++ .../demo/canal/client/mq/CanalConsumer.java | 63 +++++++++ .../src/main/resources/application.yml | 22 ++++ middle-ware/canal/canal-server/start.txt | 22 ++++ 9 files changed, 372 insertions(+), 1 deletion(-) create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java create mode 100644 middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java create mode 100644 middle-ware/canal/canal-client/src/main/resources/application.yml diff --git a/middle-ware/canal/canal-client/pom.xml b/middle-ware/canal/canal-client/pom.xml index f9a7558e..075a147e 100644 --- a/middle-ware/canal/canal-client/pom.xml +++ b/middle-ware/canal/canal-client/pom.xml @@ -28,6 +28,22 @@ canal.protocol 1.1.5 + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-data-redis + + + + \ No newline at end of file diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java new file mode 100644 index 00000000..b600ad3c --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CanalApplication.java @@ -0,0 +1,12 @@ +package vip.yeee.memo.demo.canal.client; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class CanalApplication { + + public static void main(String[] args) { + SpringApplication.run(CanalApplication.class, args); + } +} diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java new file mode 100644 index 00000000..2d6bce27 --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/CannalClient.java @@ -0,0 +1,120 @@ +package vip.yeee.memo.demo.canal.client; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.List; + +/** + * canal客户端 + */ +@Slf4j +@Component +public class CannalClient implements InitializingBean { + + private final static int BATCH_SIZE = 1000; + + @Override + public void afterPropertiesSet() throws Exception { + + // 创建链接 + CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); + try { + //打开连接 + connector.connect(); + log.info("数据库检测连接成功!"); + //订阅数据库表,全部表 + connector.subscribe(".*\\..*"); + //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 + connector.rollback(); + while (true) { + // 获取指定数量的数据 + Message message = connector.getWithoutAck(BATCH_SIZE); + //获取批量ID + long batchId = message.getId(); + //获取批量的数量 + int size = message.getEntries().size(); + //如果没有数据 + if (batchId == -1 || size == 0) { + try { + //线程休眠2秒 + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + //如果有数据,处理数据 + printEntry(message.getEntries()); + } + //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。 + connector.ack(batchId); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + connector.disconnect(); + } + } + + /** + * 打印canal server解析binlog获得的实体类信息 + */ + private static void printEntry(List entrys) { + for (CanalEntry.Entry entry : entrys) { + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + //开启/关闭事务的实体类型,跳过 + continue; + } + //RowChange对象,包含了一行数据变化的所有特征 + //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等 + CanalEntry.RowChange rowChage; + try { + rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); + } + //获取操作类型:insert/update/delete类型 + EventType eventType = rowChage.getEventType(); + //打印Header信息 + log.info("================》; binlog[{}:{}] , name[{},{}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), + entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), + eventType); + + //判断是否是DDL语句 + if (rowChage.getIsDdl()) { + log.info("================》;isDdl: true,sql:" + rowChage.getSql()); + } + //获取RowChange对象里的每一行数据,打印出来 + for (RowData rowData : rowChage.getRowDatasList()) { + //如果是删除语句 + if (eventType == EventType.DELETE) { + printColumn(rowData.getBeforeColumnsList()); + //如果是新增语句 + } else if (eventType == EventType.INSERT) { + printColumn(rowData.getAfterColumnsList()); + //如果是更新的语句 + } else { + //变更前的数据 + System.out.println("------->; before"); + printColumn(rowData.getBeforeColumnsList()); + //变更后的数据 + System.out.println("------->; after"); + printColumn(rowData.getAfterColumnsList()); + } + } + } + } + + private static void printColumn(List columns) { + for (Column column : columns) { + log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); + } + } +} diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java index 54f2c28c..bdac6947 100644 --- a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/DirectConnectionCanal.java @@ -18,7 +18,7 @@ import java.util.List; @Slf4j public class DirectConnectionCanal { - private static final String CANAL_MONITOR_HOST = "yeee.vip.host"; + private static final String CANAL_MONITOR_HOST = "127.0.0.1"; private static final Integer CANAL_MONITOR_PORT = 11111; diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java new file mode 100644 index 00000000..3983c250 --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/RedisClient.java @@ -0,0 +1,52 @@ +package vip.yeee.memo.demo.canal.client; + +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +/** + * 操作redis工具类 + */ +@Component + +public class RedisClient { + + /** + * 获取redis模版 + */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 设置redis的key-value + */ + public void setString(String key, String value) { + setString(key, value, null); + } + + /** + * 设置redis的key-value,带过期时间 + */ + public void setString(String key, String value, Long timeOut) { + stringRedisTemplate.opsForValue().set(key, value); + if (timeOut != null) { + stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS); + } + } + + /** + * 获取redis中key对应的值 + */ + public String getString(String key) { + return stringRedisTemplate.opsForValue().get(key); + } + + /** + * 删除redis中key对应的值 + */ + public Boolean deleteKey(String key) { + return stringRedisTemplate.delete(key); + } +} diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java new file mode 100644 index 00000000..a8aa375a --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/entity/CanalBean.java @@ -0,0 +1,64 @@ +package vip.yeee.memo.demo.canal.client.entity; + +import lombok.Data; + +import java.util.List; + +@Data +public class CanalBean { + + //数据 + private List data; + //数据库名称 + private String database; + private long es; + //递增,从1开始 + private int id; + //是否是DDL语句 + private boolean isDdl; + //表结构的字段类型 + private MysqlType mysqlType; + //UPDATE语句,旧数据 + private String old; + //主键名称 + private List pkNames; + //sql语句 + private String sql; + private SqlType sqlType; + //表名 + private String table; + private long ts; + //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 + private String type; + + @Data + public class TbCommodityInfo{ + + private String id; + + private String commodityName; + + private String commodityPrice; + + private String number; + private String description; + + } + + @Data + class MysqlType{ + private String id; + private String commodityName; + private String commodityPrice; + private String number; + private String description; + } + @Data + public class SqlType { + private int id; + private int commodity_name; + private int commodity_price; + private int number; + private int description; + } +} diff --git a/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java new file mode 100644 index 00000000..343f6870 --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/java/vip/yeee/memo/demo/canal/client/mq/CanalConsumer.java @@ -0,0 +1,63 @@ +package vip.yeee.memo.demo.canal.client.mq; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import vip.yeee.memo.demo.canal.client.RedisClient; +import vip.yeee.memo.demo.canal.client.entity.CanalBean; + +import javax.annotation.Resource; +import java.util.List; + +/** + * kafka 消费者 + */ +@Component +@Slf4j +public class CanalConsumer { + + @Resource + private RedisClient redisClient; + //监听的队列名称为:canaltopic + @KafkaListener(topics = "canaltopic") + public void receive(ConsumerRecord consumer) { + String value = (String) consumer.value(); + log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value); + //转换为javaBean + CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class); + //获取是否是DDL语句 + boolean isDdl = canalBean.isDdl(); + //获取类型 + String type = canalBean.getType(); + //不是DDL语句 + if (!isDdl) { + List tbCommodityInfos = canalBean.getData(); + //过期时间 + long TIME_OUT = 600L; + if ("INSERT".equals(type)) { + //新增语句 + for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { + String id = tbCommodityInfo.getId(); + //新增到redis中,过期时间是10分钟 + redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT); + } + } else if ("UPDATE".equals(type)) { + //更新语句 + for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { + String id = tbCommodityInfo.getId(); + //更新到redis中,过期时间是10分钟 + redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT); + } + } else { + //删除语句 + for (CanalBean.TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { + String id = tbCommodityInfo.getId(); + //从redis中删除 + redisClient.deleteKey(id); + } + } + } + } +} diff --git a/middle-ware/canal/canal-client/src/main/resources/application.yml b/middle-ware/canal/canal-client/src/main/resources/application.yml new file mode 100644 index 00000000..18028071 --- /dev/null +++ b/middle-ware/canal/canal-client/src/main/resources/application.yml @@ -0,0 +1,22 @@ +spring: + redis: + host: 127.0.0.1 + port: 6379 + database: 0 + password: + kafka: + # Kafka服务地址 + bootstrap-servers: 127.0.0.1:9002 + consumer: + # 指定一个默认的组名 + group-id: consumer-group1 + #序列化反序列化 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringDeserializer + value-serializer: org.apache.kafka.common.serialization.StringDeserializer + # 批量抓取 + batch-size: 65536 + # 缓存容量 + buffer-memory: 524288 \ No newline at end of file diff --git a/middle-ware/canal/canal-server/start.txt b/middle-ware/canal/canal-server/start.txt index e366682a..a104e83e 100644 --- a/middle-ware/canal/canal-server/start.txt +++ b/middle-ware/canal/canal-server/start.txt @@ -50,3 +50,25 @@ docker cp /home/canal/bin/bin/startup.sh canal:/home/admin/canal-server/bin/star 重启容器 +#安装zookeeper 和 kafka【均为单机启动】 +docker pull zookeeper:3.6 +docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:3.6 + +docker pull wurstmeister/kafka:2.12-2.5.0 + +docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 ^ +--name kafka -p 9002:9002 ^ +-e KAFKA_BROKER_ID=0 ^ +-e KAFKA_ZOOKEEPER_CONNECT=192.168.100.84:2181/kafka ^ +-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.84:9002 ^ +-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9002 ^ +-v /etc/localtime:/etc/localtime wurstmeister/kafka:2.12-2.5.0 + +#进入容器 +docker exec -it ${CONTAINER ID} /bin/bash +cd opt/bin +#单机方式:创建一个主题 +kafka-topics.sh --create --zookeeper 192.168.100.84:2181/kafka --replication-factor 1 --partitions 1 --topic canaltopic + +#使用命令进行监听消费 +kafka-console-consumer.sh --bootstrap-server 192.168.100.84:9002 --from-beginning --topic canaltopic -- Gitee