From a2983563a21208db28c02eb7cf30f964858b2ec5 Mon Sep 17 00:00:00 2001 From: Leon <> Date: Fri, 29 Mar 2019 21:40:05 +0800 Subject: [PATCH] update --- daydayup-algorithm/readme.md | 15 ++- daydayup-design-patterns/readme.md | 10 -- daydayup-high-concurrency/readme.md | 85 ++++++++++++- .../concurrent_tools/BankWaterService.java | 72 +++++++++++ .../concurrent_tools/CountDownLatchTest.java | 28 +++++ .../concurrent_tools/CycliBarrierTest.java | 36 ++++++ .../concurrent_tools/CycliBarrierTest2.java | 53 ++++++++ .../concurrent_tools/CyclicBarrierTest3.java | 37 ++++++ .../concurrent_tools/ExchangerTest.java | 46 +++++++ .../concurrent_tools/SemaphoreTest.java | 39 ++++++ daydayup-rocket-mq-study/readme.md | 119 +++++++++++++++++- .../com/lecoboy/rocketmqstudy/Consumer.java | 49 ++++++++ .../com/lecoboy/rocketmqstudy/Producer.java | 46 +++++++ .../rocketmqstudy/sync/SyncProducer.java | 9 -- readme.md | 1 + 15 files changed, 620 insertions(+), 25 deletions(-) create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/BankWaterService.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CountDownLatchTest.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest2.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CyclicBarrierTest3.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/ExchangerTest.java create mode 100644 daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/SemaphoreTest.java create mode 100644 daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Consumer.java create mode 100644 daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Producer.java delete mode 100644 daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/sync/SyncProducer.java diff --git a/daydayup-algorithm/readme.md b/daydayup-algorithm/readme.md index 086d554..4d1d358 100644 --- a/daydayup-algorithm/readme.md +++ b/daydayup-algorithm/readme.md @@ -1,5 +1,18 @@ https://blog.csdn.net/wfq784967698/article/details/79551476 -###算法 + +###算法 `algorithm` + +![时间复杂度](https://img-my.csdn.net/uploads/201304/23/1366700675_1111.JPG) + +- [ ] 插入排序 +- [ ] 选择排序 +- [ ] 冒泡排序 +- [ ] 快速排序(重点) +- [ ] 堆排序 +- [ ] 归并排序 +- [ ] 二分查找 + + #### 术语说明 > - 稳定:如果a原本在b前面,而a=b,排序之后a仍然在b的前面; > - 不稳定:如果a原本在b的前面,而a=b,排序之后a可能会出现在b的后面; diff --git a/daydayup-design-patterns/readme.md b/daydayup-design-patterns/readme.md index f139a4a..250086c 100644 --- a/daydayup-design-patterns/readme.md +++ b/daydayup-design-patterns/readme.md @@ -29,16 +29,6 @@ - [ ] 解释器模式 -##### 算法 `algorithm` -![时间复杂度](https://img-my.csdn.net/uploads/201304/23/1366700675_1111.JPG) -- [ ] 插入排序 -- [ ] 选择排序 -- [ ] 冒泡排序 -- [ ] 快速排序(重点) -- [ ] 堆排序 -- [ ] 归并排序 -- [ ] 二分查找 - #### 软件架构 Spring Boot 2.1、Gradle \ No newline at end of file diff --git a/daydayup-high-concurrency/readme.md b/daydayup-high-concurrency/readme.md index aa47156..2312428 100644 --- a/daydayup-high-concurrency/readme.md +++ b/daydayup-high-concurrency/readme.md @@ -7,6 +7,11 @@ 也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能 够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。 ``` + +jps +jstack 线程id >-/work/dump1 +grep 'java.lang.Thread.State' dump1 | awk '{print $2$3$4%5}' |sort |uniq -c + #### CAS自旋锁 demo:`CasAutomicCounter.java` @@ -87,7 +92,8 @@ Mutex和ReentrantLock基本都是排它锁,这些锁在同一时刻只允许 -##### 13个原子操作类 + +##### 13个原子操作类 `atomicdemo` 从JDK 1.5开始提供了java.util.concurrent.atomic包,这个包中的原子操作类提供了一种用法简单、 性能高效、线程安全地更新一个变量的方式。 //TODO @@ -135,3 +141,80 @@ AtomicIntegerArray类主要是提供原子的方式更新数组里的整型, >- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器 >- AtomicLongFieldUpdater:原子更新长整型字段的更新器 >- AtomicStampedReference:原子更新带有版本号的引用类型。该类型将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题 + +示例:`AtomicIntegerFieldUpdaterTest` + +##### Java中的并发工具类 `concurrent_tools` +###### 等待多线程完成的CountDownLatch +构造一个int类型的计数器,使用await()方法等待线程,每完成一个线程countDown一次,计数器-1,直到0则往下继续运行,否则一直等待。 +我们不可能让主线程一直等待,所以可以使用await(long time,TimeUnit unit)设置超时时间 + +`正常使用join()方法等待线程完成,join()实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中wait(0)表示永远等待下去 +直到join线程终止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是JVM里实现的,需要查看JVM源码` + +示例:`CountDownLatchTest.java` + +###### 同步屏障CyclicBarrier +可循环使用的屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行 +`每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,让后当前线程被阻塞` + +示例:`CycliBarrierTest.java` + +CyclicBarrier还提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程达到屏障时,优先执行barrierAction,方便处理更复杂的业务场景。 +- await() 返回值为当前线程的索引,0表示当前线程是最后一个到达的线程 +- await(long timeout, TimeUnit unit) 在await()的基础上增加超时机制,如果超出指定的等待时间,则抛出 TimeoutException 异常。如果该时间小于等于零,则此方法根本不会等待。 +- isBroken() 在异常区域获取当前线程是否被终止,返回boolen类型 +- getNumberWaiting() 可以获取waiting状态的线程数量 +- reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。 + +示例:`CycliBarrierTest2.java` + +应用场景: +可以用于多线程计数数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。 +示例:`BankWaterService.java` + +####### CountDownLatch和CycliBarrier的区别 +CountDownLatch的计数器智能使用一次,而CycliBarrier的计数器可以使用reset()方法重置。 +所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重新计数器,并让线程重新执行一次。 + +##### 控制并发线程数的Semaphore +```Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。``` +1.应用场景 +Semaphore可以用作流量控制,特别是公用资源有限的应用场景,比如数据库连接。 + +|方法|说明| +|---|---| +| acquire() | 获得许可证,线程可以进行执行| +| tryAcquire() | 尝试获取许可证 | +| relase() | 释放许可证 | +| intavailablePermits() | 返回此信号量中当前可用的许可证数 | +| intgetQueueLength() | 返回正在等待获取许可证的线程数 | +| booleanhasQueuedThreads() | 是否有线程正在等待获取许可证 | +| void reducePermits(int reduction) | 减少reduction个许可证,是个protected方法 | +| Collection getQueuedThreads() | 返回所有等待获取许可证的线程集合,是个protected方法 | +示例:`SemaphoreTest.java` + +##### 线程间交换数据的Exchanger +``` +Exchanger(交换者)是一个用于线程间协作的工具类。用于进行线程间数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。 +这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法。当两个线程到达同步点时, +这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。 +``` +1.应用场景 +可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据, +并使用交叉规则得出2个交配结果。 +也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水, +为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel, +并对两个Excel数据进行校对,看看是否录入一致。 + +示例:`ExchangerTest.java` + +#### Java中的线程池 +```Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的 程序都可以使用线程池。``` +在开发过程中,合理地使用线程池能够带来3个好处 +- 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗 +- 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行 +- 第三:提高线程的课管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, +还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌 + +###### 线程池的实现原理 \ No newline at end of file diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/BankWaterService.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/BankWaterService.java new file mode 100644 index 0000000..685f0ef --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/BankWaterService.java @@ -0,0 +1,72 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 银行流水处理服务类 + */ +public class BankWaterService { + /** + * 创建4个屏障,处理完之后执行当前类的run方法 + */ + private CyclicBarrier c = new CyclicBarrier(4, new JoinAllResult()); + + /** + * 假设只有4个sheet,所以启动4个线程 + */ + private Executor executor = Executors.newFixedThreadPool(4); + /** + * 保存4个sheet计算出的银行流水结果 + */ + private ConcurrentHashMap sheetBankWaterCount = new ConcurrentHashMap<>(); + + public static void main(String[] args) { + BankWaterService bankWaterService = new BankWaterService(); + bankWaterService.count(); + } + private void reset(){ + c.reset(); + } + + private void count() { + for (int i = 0; i < 4; i++) { + int count=0; + executor.execute(new Runnable() { + @Override + public void run() { + //计算当前sheet的银行流水数据 + int water = 10; + sheetBankWaterCount.put(Thread.currentThread().getName(), water); + try { + + //银行计算完成,插入一个屏障 + c.await(); + System.out.println(Thread.currentThread().getName() + "到达屏障"); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + }); + } + } + + class JoinAllResult implements Runnable { + int result = 0; + + @Override + public void run() { + System.out.println("----开始执行计算sum----"); + //汇总每个sheet计算出的结果 + for (Map.Entry sheet : sheetBankWaterCount.entrySet()) { + result += sheet.getValue(); + } + //输出结果 + sheetBankWaterCount.put("result", result); + System.out.println("sheet总和:" + result); + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CountDownLatchTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CountDownLatchTest.java new file mode 100644 index 0000000..44292b7 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CountDownLatchTest.java @@ -0,0 +1,28 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * 并发工具CountDownLatch测试类 + */ +public class CountDownLatchTest { + static CountDownLatch countDownLatch = new CountDownLatch(2); + + public static void main(String[] args) throws InterruptedException { + new Thread(new Runnable() { + @Override + public void run() { + System.out.println(1); + countDownLatch.countDown(); + System.out.println(2); + countDownLatch.countDown(); + } + }).start(); + System.out.println(3); + //await会阻塞主线程 直到countDown 计数器为0 + countDownLatch.await(); + System.out.println(4); + + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest.java new file mode 100644 index 0000000..8092650 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest.java @@ -0,0 +1,36 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +/** + * 同步屏障CycliBarrier测试类 + */ +public class CycliBarrierTest { + static CyclicBarrier c = new CyclicBarrier(2); + + public static void main(String[] args) { + new Thread(new Runnable() { + @Override + public void run() { + try { + c.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + System.out.println(1); + } + }).start(); + + try { + c.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + System.out.println(2); + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest2.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest2.java new file mode 100644 index 0000000..1c84d9f --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CycliBarrierTest2.java @@ -0,0 +1,53 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * CycliBarrier高级功测试类 + */ +public class CycliBarrierTest2 { + //当线程达到屏障后 优先执行 A线程 + static CyclicBarrier c = new CyclicBarrier(2, new A()); + //通过计数器测试reset方法 + static AtomicInteger ai = new AtomicInteger(0); + + + public static void main(String[] args) { + new Thread(new Runnable() { + @Override + public void run() { + try { + if (ai.get() == 0) { + ai.getAndIncrement(); + Thread.currentThread().wait(); + //throw new NullPointerException("空指针哦"); + } + System.out.println("1就位"); + c.await(); + } catch (Exception e) { + } + } + }).start(); + + try { + System.out.println("2就位"); + c.await(); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("waiting线程数:"+c.getNumberWaiting()); + c.reset(); + + System.out.println("完成"); + } + + static class A implements Runnable { + @Override + public void run() { + System.out.println(3); + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CyclicBarrierTest3.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CyclicBarrierTest3.java new file mode 100644 index 0000000..438527f --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/CyclicBarrierTest3.java @@ -0,0 +1,37 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 测试 reset方法示例 TODO 不够完整 需要重新编辑测试 + */ +public class CyclicBarrierTest3 { + + public static void main(String[] args) throws InterruptedException { + ThreadPoolExecutor service = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); + + CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { + System.out.println("全都到了 【开吃!】"); + }); + for (int i = 0; i < 3; i++) { + final int number = i; + service.execute(() -> { + try { + + System.out.println("编号:" + number + "开始出发 【去聚餐】"); + Thread.sleep((number + 1) * 1000); + System.out.println("编号:" + number + " 【到达聚餐地点】"); + cyclicBarrier.await(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + Thread.sleep(1 * 1000); + System.out.println(cyclicBarrier.getNumberWaiting()); + //cyclicBarrier.reset(); + System.out.println(cyclicBarrier.getNumberWaiting()); + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/ExchangerTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/ExchangerTest.java new file mode 100644 index 0000000..f0958e7 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/ExchangerTest.java @@ -0,0 +1,46 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Exchanger测试类 + * 如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待 + * 可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时间 + */ +public class ExchangerTest { + private static final Exchanger exgr = new Exchanger<>(); + private static ExecutorService threadPool = Executors.newFixedThreadPool(2); + + public static void main(String[] args) { + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + //A录入银行流水时间 + String A = "银行流水A"; + //TimeUnit.SECONDS.sleep(5); + exgr.exchange(A); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + //A录入银行流水时间 + String B = "银行流水B"; + String A = exgr.exchange("B"); + System.out.println("A和B数据是否一致:" + A.equals(B) + " A录入的是:" + A + ",B录入的是:" + B); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/SemaphoreTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/SemaphoreTest.java new file mode 100644 index 0000000..20e6606 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/concurrent_tools/SemaphoreTest.java @@ -0,0 +1,39 @@ +package com.lecoboy.highconcurrency.concurrent_tools; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * 信号量测试类 + * 线程使用acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证 + * 还可以用tryAcquire()方法尝试获取许可证 + */ +public class SemaphoreTest { + private static final int THREAD_COUNT = 30; + private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); + //10表示允许10个线程获取许可证,也就是最大并发数10 + private static Semaphore s = new Semaphore(10); + + public static void main(String[] args) { + for (int i = 0; i < THREAD_COUNT; i++) { + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + s.acquire(); + //延迟为了视觉显示每次10个并发 + TimeUnit.SECONDS.sleep(2); + System.out.println("save data"); + s.release(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + } + threadPool.shutdown(); + } + +} diff --git a/daydayup-rocket-mq-study/readme.md b/daydayup-rocket-mq-study/readme.md index 00cbee0..2be2135 100644 --- a/daydayup-rocket-mq-study/readme.md +++ b/daydayup-rocket-mq-study/readme.md @@ -1,6 +1,117 @@ ### RocketMQ学习 -目录结构 -``` -sync 异步消息 +[](https://www.jianshu.com/p/2838890f3284) -``` \ No newline at end of file +## 一、MQ背景&选型 +消息队列做为高并发系统核心组件之一,能够帮助业务系统结构提升开发效率和系统稳定性。 +主要具有以下优势: +- 学风填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统崩溃等问题) +- 系统解耦(结局不同重要程度、不同能力级别系统之间以来导致一死全死) +- 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统) +- 续流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测) + +目前主流的MQ是RocketMQ、kafka、RabbitMQ,RocketMQ相比于RabbitMQ、Kafka具有主要优势特性有: +- 支持事务型消息(消息发送和DB操作保持两房的最终一致性,rabbitmq和kafka不支持) +- 支持结合rocketMQ的多个系统之间的数据最终一致性(多方事务,二方事务是前提) +- 支持18个级别的延迟消息(RabbitMQ和Kafka不支持) +- 支持制定次数和时间间隔的失败消息重发(kafka不支持,RabbitMQ需要手动确认) +- 支持consumer端tag过滤,减少不必要的网络传输(RabbitMQ和Kafka不支持) +- 支持重复消费(RabbitMQ不支持,kafka支持) +详细对比: +![sd](https://upload-images.jianshu.io/upload_images/12619159-ebd12b24d5ae33d9.png) + +## 二、RocketMQ集群概述 +### 1.RocketMQ集群部署结构 +![集群部署架构图](https://upload-images.jianshu.io/upload_images/12619159-a858d38e0b38c406.png) +#### 1)Name Server(协调者) +Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 +#### 2)Broker(MQ消息服务器,中转角色,用于消息存储与生产消费转发) +Broker部署相对复杂,Broker分为Master和Slave,一个Master可以对应多个Slave,但 +是一个Slave只能对应一个Master,Master和Slave的对应关系通过制定相同的Broker Name, +不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。 + +每个Broker和Name Server集群中的所有节点建立长链接,定时(每隔30s)注册Topic信息到所有Name Server。 +Name Server定时(每隔10s)扫描所有存活的Broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。 + +#### 3) Producer(生产者) +Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取 +Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。 + +Producer每隔30s(由ClientConfig和pollNameServerInterval)从Name Server获取所有Topic队列的最新情况, +这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。 + +Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的Broker发送心跳, +Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。 + +#### 4)Consumer(消费者) +Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接, +且定时向Master、Slave发送心跳。Consumer即可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。 + +Consumer每隔30s从Name Server获取Topic的最新队列情况,这意味着Broker不可用时,Consumer最后最多需要30s才能感知。 + +Consumer每隔30s(由ClinetConfig中heartbeatBrokerInterval决定)向所有关联的Broker发送心跳, +Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知, +Group内的Consumer重新分配队列,然后继续消费。 + +当Consumer得到master当即通知后,专项Slave消费,Slave不能保证master的消息100%都同步过来, +因此会有少量的消息丢失。但是一旦mster恢复,未同步过去的消息会被最终消费掉。 + +消费者队列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识{IP}@{消费者Group}拓展为{IP}@{消费者Group}{topic}{tag}, +例如:(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。 +任何一个元素不同,都认为是不同的消费端,每隔消费端会拥有一份自己消费队列(默认是Broker队列数量*Broker数量)。新挂在的消费者队列中拥有commitlog中的所有数据。 + + +### 三、RokderMQ如何支持分布式事务消息 +####场景 +A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check), +B和MQ保证事务一致(通过重试),从而达到最终事务一致性。 + +原理:大事务 = 小事务 + 异步 + +#### 1.MQ与DB一致性原理(两方事务) +流程图 +![流程图](https://upload-images.jianshu.io/upload_images/12619159-6f4f6754d6f02058.png) +上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。 +MQ消息、DB操作一致性方案: +1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。 + +2)执行DB操作;Db操作成功commit DB操作,DB执行失败Rolllback DB操作。 + +3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败, +回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。 + +4)MQ内部提供一个名为"事务状态服务"的服务,此服务会检查事务消息的状态,如果发现 +消息未COMMIT,则通过Producer启动时注册的TransationCheckListenter来回调业务系统, +业务系统在checkLocalTransactionState方法中间差DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE. + +说明: + +上边以DB为例,其实此处可以是任务业务或者数据源。 + +以上SEND_OK,COMMIT_MESSAGE、ROLLBACK_MESSAGE均为client jar提供的状态,在MQ服务器内部是一个数字。 + +TransactionCheckListener是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。 +这种消息丢失支存在于断网或者rocketMQ集群挂了的情况下。当RocketMQ集群挂了,如果采用异步刷盘,存在1s内数据丢失风险, +异步刷盘场景下保证事务没有意义。所以如果要核心业务用RocketMQ解决分布式事务问题,建议选择同步刷盘模式。 + +#### 2.多系统之间数据一致性(多方事务) +![多系统之间数据一致性](https://upload-images.jianshu.io/upload_images/12619159-cb4ce1a4c8b79fb1.png) + +当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过RocketMq的事务性消息解决) +已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel) + +以上图交易系统为例: +1)交易系统创建订单(往Db插入一条记录),同时发送订单创建消息。通过RocketMQ事务性消息保证一致性。 + +2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。 + +3)交易系统接收自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMQ的重试功能,设置第二次发送消息 +为定时任务的延迟创建消息。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。 +延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。PS:多个RPC可以创建一个回滚任务,通过一个消费组接收一次消息就可以;也可以通过创建 +多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。回滚任务失败,通过MQ的重发来重试。 + +以上是交易系统和其他系统之间保持最终一致性的解决方案。 + +#### 3.案例分析 +##### 1)单机环境下的 + +未完待续... \ No newline at end of file diff --git a/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Consumer.java b/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Consumer.java new file mode 100644 index 0000000..d56470d --- /dev/null +++ b/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Consumer.java @@ -0,0 +1,49 @@ +package com.lecoboy.rocketmqstudy; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class Consumer { + public static void main(String[] args) { + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("PushConsumer"); + consumer.setNamesrvAddr("192.168.199.244:9876"); + try { + //订阅PushTopic下Tag为push的消息 + consumer.subscribe("PushTopic", "push"); + + //程序第一次启动从消息队列头取数据 + consumer.setConsumeFromWhere( + ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.registerMessageListener(new MessageListenerConcurrently() { + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext Context) { + Message msg = list.get(0); +// System.out.println(msg.toString()); + + String topic = msg.getTopic(); + System.out.println("topic = " + topic); + byte[] body = msg.getBody(); + System.out.println("body: " + new String(body)); + String keys = msg.getKeys(); + System.out.println("keys = " + keys); + String tags = msg.getTags(); + System.out.println("tags = " + tags); + System.out.println("-----------------------------------------------"); + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + ); + consumer.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Producer.java b/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Producer.java new file mode 100644 index 0000000..a08f63c --- /dev/null +++ b/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/Producer.java @@ -0,0 +1,46 @@ +package com.lecoboy.rocketmqstudy; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + +public class Producer { + public static void main(String[] args) { + DefaultMQProducer producer = new DefaultMQProducer("Producer"); + producer.setNamesrvAddr("192.168.199.244:9876"); + try { + producer.start(); + for (int i = 0; i < 100; i++) { + TimeUnit.SECONDS.sleep(1); + String keys = i+"keys"; + String push = "push"; + Message msg = new Message("PushTopic", + push, + keys, + "Just for test.".getBytes()); + try { + SendResult result = producer.send(msg); + System.out.println(result.toString()); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + producer.shutdown(); + } + } +} diff --git a/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/sync/SyncProducer.java b/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/sync/SyncProducer.java deleted file mode 100644 index 564d0d2..0000000 --- a/daydayup-rocket-mq-study/src/main/java/com/lecoboy/rocketmqstudy/sync/SyncProducer.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.lecoboy.rocketmqstudy.sync; - -import org.apache.rocketmq.client.producer.DefaultMQProducer; - -public class SyncProducer { - public static void main(String[] args) { - //DefaultMQProducer - } -} diff --git a/readme.md b/readme.md index 945343c..42181ea 100644 --- a/readme.md +++ b/readme.md @@ -3,6 +3,7 @@ #### 介绍 用来针对模块化学习的项目 + ### 结构 > ##### 常用23种设计模式 `daydayup-design-patterns` > ##### 算法 `daydayup-algorithm` -- Gitee