diff --git a/daydayup-high-concurrency/readme.md b/daydayup-high-concurrency/readme.md index b791d8ed59fb2246fabb6077f44da57ad888189e..6101afbe5a1d421de35fc32a053a53d7a3960f7a 100644 --- a/daydayup-high-concurrency/readme.md +++ b/daydayup-high-concurrency/readme.md @@ -7,8 +7,13 @@ 也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能 够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。 ``` +#### CAS自旋锁 +demo:`CasAutomicCounter.java` + #### 线程的状态 -| 状态名称 | 说明 | +demo:`ThreadState.java` + +| 状态名称 | 说明 | | ------ | ------ | | NEW | 初始状态,线程被构建,但是还没有调用start()方法 | | RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地乘坐"运行中" | @@ -16,3 +21,75 @@ | WAITING | 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定的动作(通知或中断) | | TIME_WAITING | 超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的 | | TERMINATED | 终止状态,表示当前线程已经执行完毕 | + +#### 线程操作 +demo:`WaitNotify.java`、`TheadState.java` + +| 操作 | 作用 | +| ------ | ------ | +| start() | 启动线程 | +| join() | 线程阻塞 | +| interrupt() | 中断线程 | +| notify() | 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁 | +| notifyAll() | 通知所有等待在该对象上的线程 | +| wait() | 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁 | +| wait(long) | 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回 | +| wait(long,int) | 对于超时时间更细粒度的控制,可以达到纳秒 | + +#### TheadLocal +可使用在AOP记录耗时,demo参考`TheadLocalDemo.java` + +#### 一个简单的数据连接池示例 +demo:`connectionDemo`文件夹 + +包含以下知识点: +> - 代理模式 +> - 超时模式 +> - CountDownLanch并发工具 +> - 双向链表结构 +> - 原子操作计数器 + + +### Lock接口 +`锁是用来控制多线程访问共享资源的方式,一个锁能够防止多个线程同时访问共享资源 +` +##### Lock接口提供的synchronized关键字不具备的主要特性 + +|特性|描述 | +|----|----| +|尝试非阻塞低获取锁 | 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁 | +|能被中断低获取锁| 与synchronized不同,获取到锁的线程能够相应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放| +|超时获取锁|在制定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回| + +##### Lock的API + +|方法名称|描 述| +|---|---| +| void lock()|获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回| +| void lockInterruptibly() throws InterrupedException| 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程| +| boolead tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false| +| boolean tryLock(Long time,TimeUnit unit) throws InterrupedException | 超市的获取锁,当前线程在一下3中情况下回返回 1、当前线程在超市时间内获得了锁 2、当前线程在超市时间内被中断 3、超市时间结束,返回false| +| void unlock() | 释放锁| +| Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁| + +##### 队列同步器 +AbstractQueuedSynchronizer和常用的ReentrantLock +Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的 +同步器的设计是基于模板方法模式 + +##### 重入锁 +ReentrantLock + +##### 读写锁 +Mutex和ReentrantLock基本都是排它锁,这些锁在同一时刻只允许一个线程进行访问 +而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写 +线程均被阻塞。读写锁维护了一堆锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一半的排他锁有了很大提升。 + + + + + + + + + diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/HighConcurrencyApplication.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/HighConcurrencyApplication.java index b2dd16545c798b7fce8285874822a3405c9d025c..0656db227ec1c86734acab5f1d20b562e8c5e21b 100644 --- a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/HighConcurrencyApplication.java +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/HighConcurrencyApplication.java @@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class HighConcurrencyApplication { + //test public static void main(String[] args) { SpringApplication.run(HighConcurrencyApplication.class, args); } diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionDriver.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionDriver.java new file mode 100644 index 0000000000000000000000000000000000000000..59f541e6e331e690e0bdee0fa007ae6f00526742 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionDriver.java @@ -0,0 +1,38 @@ +package com.lecoboy.highconcurrency.connectionDemo; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.sql.Connection; +import java.util.concurrent.TimeUnit; + +/** + * 动态代理构造一个Connection 该Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒 + */ +public class ConnectionDriver { + public final String testName="测试"; + + static class ConnectionHandler implements InvocationHandler{ + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if(method.getName().equals("commit")){ + // System.out.println("进入commit休眠100ms"); + TimeUnit.MICROSECONDS.sleep(100); + } + return null; + } + } + + /** + * 创建一个connection的代理,在commit时休眠100毫秒 + * @return + */ + public static final Connection createConnection(){ + return (Connection)Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), + new Class[]{ + Connection.class + }, + new ConnectionHandler()); + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPool.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPool.java new file mode 100644 index 0000000000000000000000000000000000000000..71a758362afa0786ab9585ca956ed6c3f4187c51 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPool.java @@ -0,0 +1,79 @@ +package com.lecoboy.highconcurrency.connectionDemo; + +import java.sql.Connection; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; + +public class ConnectionPool { + //双向链表结构定义连接集合 + //双向链表结构链接集合 + private LinkedList pool = new LinkedList(); + + /** + * 初始化连接数 + * + * @param initialSize 初始化数量 + */ + public ConnectionPool(int initialSize) { + if (initialSize > 0) { + for (int i = 0; i < initialSize; i++) { + pool.addLast(ConnectionDriver.createConnection()); + } + } + } + + /** + * 释放连接 + * @param connection + */ + public void releaseConnection(Connection connection) { + if (connection != null) { + synchronized (pool) { + //连接释放后需要进行通知,这样其他消费则能够感知到连接池中已经归还了一个连接 + //System.out.println("增加一个"); + pool.addLast(connection); + pool.notifyAll(); + } + } + } + + /** + * 批量连接 + * 在mills内无法获取到连接,将会返回null + * + * @param mills 超时时间 毫秒 + * @return + * @throws InterruptedException + */ + public Connection fetchConnection(Long mills) throws InterruptedException { + synchronized (pool) { + //完全超时 + if (mills <= 0) { + while (pool.isEmpty()) { + pool.wait(); + } + return pool.removeFirst(); + } else { + //超时计算 + + //当前时间+超时时间 + long future = System.currentTimeMillis() + mills; + long remaining = mills; + //如果集合为空 并且 超时时间大于0 + //保护机制 如果集合为空则等待超时 + while (pool.isEmpty() && remaining > 0) { + //模拟超时 等待时间和超时时间一致 + //System.out.println("进入超时"); + + pool.wait(remaining); + remaining = future - System.currentTimeMillis(); + } + Connection result = null; + if (!pool.isEmpty()) { + result = pool.removeFirst(); + } + return result; + } + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPoolTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPoolTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8ec293e12ed155c748cf66221855c3743a0e0244 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/ConnectionPoolTest.java @@ -0,0 +1,91 @@ +package com.lecoboy.highconcurrency.connectionDemo; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConnectionPoolTest { + static ConnectionPool pool = new ConnectionPool(10); + //保证所有ConnectionRunner能够同时开始 + static CountDownLatch startCountDown = new CountDownLatch(1); + //main线程将会等待所有ConnectionRunner结束后才能继续执行 + static CountDownLatch endCountDown; + + public static void main(String[] args) throws InterruptedException { + //线程数量,可以修改线程数量进行观察 + int threadCount = 1000; + endCountDown = new CountDownLatch(threadCount); + + //循环次数 + int count = 20; + //获取连接数 + AtomicInteger got = new AtomicInteger(); + //未获取到连接数 + AtomicInteger notGot = new AtomicInteger(); + for (int i = 0; i < threadCount; i++) { + Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread"); + thread.start(); + } + + startCountDown.countDown(); + //阻塞等待执行完成 + endCountDown.await(); + System.out.println("total invoke:" + (threadCount * count)); + System.out.println("got connection: " + got); + System.out.println("notGot connection: " + notGot); + + + } + + static class ConnectionRunner implements Runnable { + int count; + //计数器 + AtomicInteger got; + AtomicInteger notGot; + + public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) { + this.count = count; + this.got = got; + this.notGot = notGot; + } + + @Override + public void run() { + try { + startCountDown.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //每个线程循环次数 + while (count > 0) { + try { + //从线程池中获取连接,如果100ms内无法获取到,将会返回null + //分别统计连接获取的数量got 和未获取到的数量notgot + Connection connection = pool.fetchConnection(100L); + + if (connection != null) { + try { + connection.createStatement(); + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + pool.releaseConnection(connection); + got.incrementAndGet(); + } + } else { + notGot.incrementAndGet(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + count--; + } + } + //减少个数 位置一定要在循环外层 + endCountDown.countDown(); + } + } + +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/SimpleHttpServer.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/SimpleHttpServer.java new file mode 100644 index 0000000000000000000000000000000000000000..5c5ef516348bd0210985875fd8b60cf9ebcb67da --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/SimpleHttpServer.java @@ -0,0 +1,108 @@ +package com.lecoboy.highconcurrency.connectionDemo; + +import com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo.DefaultThreadPool; +import com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo.ThreadPool; + +import java.io.*; +import java.net.ServerSocket; +import java.net.Socket; + +public class SimpleHttpServer { + //处理HttpRequest的线程池 + static ThreadPool threadPool = new DefaultThreadPool<>(1); + //SimpleHttpServer的根路径 + static String basePath; + static ServerSocket serverSocket; + //服务监听端口 + static int port = 8080; + + public static void setPort(int port) { + if (port > 0) { + SimpleHttpServer.port = port; + } + } + + public static void setBasePath(String basePath) { + if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) { + SimpleHttpServer.basePath = basePath; + } + } + + //启动SimpleHttpServer + public static void start() throws IOException { + serverSocket = new ServerSocket(port); + Socket socket = null; + while ((socket = serverSocket.accept()) != null) { + //接受一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行 + threadPool.execute(new HttpRequestHandler(socket)); + } + serverSocket.close(); + } + + + static class HttpRequestHandler implements Runnable { + private Socket socket; + + public HttpRequestHandler(Socket socket) { + this.socket = socket; + } + + @Override + public void run() { + String line = null; + BufferedReader br = null; + BufferedReader reader = null; + PrintWriter out = null; + InputStream in = null; + try { + reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String header = reader.readLine(); + //由相对路径计算出绝对路径 + String filePath = basePath + header.split(" ")[1]; + out = new PrintWriter(socket.getOutputStream()); + //如果请求资源的后缀为jpg或者ico 则读取资源并输出 + if (filePath.endsWith("jpg") || filePath.endsWith("ico")) { + in = new FileInputStream(filePath); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int i = 0; + while ((i = in.read()) != -1) { + baos.write(i); + } + byte[] array = baos.toByteArray(); + out.println("HTTP/1.1 200 OK"); + out.println("Server: Molly"); + out.println("Content-Type: image/jpeg"); + out.println("Content-Length: " + array.length); + out.println(""); + socket.getOutputStream().write(array, 0, array.length); + } else { + br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); + out = new PrintWriter(socket.getOutputStream()); + out.println("HTTP/1.1 200 OK"); + out.println("Server: Molly"); + out.println("Content-Type: text/html; charset=UTF-8"); + out.println(""); + while ((line = br.readLine()) != null) { + out.println(line); + } + } + out.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + //关闭流或者Socket + private static void close(Closeable... closeables) { + if (closeables != null) { + for (Closeable closeable : closeables) { + try { + closeable.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/AbstractThreadPoolTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/AbstractThreadPoolTest.java new file mode 100644 index 0000000000000000000000000000000000000000..aee209176001418bbeff7854d9849e31a1106df1 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/AbstractThreadPoolTest.java @@ -0,0 +1,56 @@ +package com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo; + + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 日志打点 测试类 可有可无 + *

+ * 为了统计线程执行成功和失败次数 + */ +public abstract class AbstractThreadPoolTest { + //成功计数器 + private AtomicInteger successCountI = new AtomicInteger(); + //失败计数器 + private AtomicInteger errorCountI = new AtomicInteger(); + //共计工作者 + private AtomicInteger workerI = new AtomicInteger(); + + //进入run线程个数 + private AtomicInteger runI = new AtomicInteger(); + + public void icrSuccessCount() { + //successCountI.incrementAndGet(); + safeCount(successCountI); + } + + public void icrErrorCount() { + errorCountI.incrementAndGet(); + } + + public void icrWorker() { + workerI.incrementAndGet(); + } + + public void icrRun() { + runI.incrementAndGet(); + } + + public void getExecteResult() { + System.out.println("工作者个数:" + workerI.get()); + System.out.println("进入run个数:" + runI.getAndIncrement()); + System.out.println("成功个数:" + successCountI.getAndIncrement()); + + System.out.println("失败个数:" + errorCountI.getAndIncrement()); + } + + public void safeCount(AtomicInteger count){ + for (;;){ + int i=count.get(); + boolean suc = count.compareAndSet(i,++i); + if(suc){ + break; + } + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/DefaultThreadPool.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/DefaultThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..29e65ccd7aa9b235a9f778431263f421268f02b0 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/DefaultThreadPool.java @@ -0,0 +1,220 @@ +package com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 简单的线程池 + *

+ * 成千上万的任务递交给服务器,频繁的创建销毁线程,CPU频繁进行线程上下文切换,无故增加系统的负载,浪费系统资源 + *

+ * 线程池很好的解决了这个问题,它预先创建若干数量的线程,并且不能由用户直接对线程的创建进行控制, + * 在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销, + * 另一方面,面对过量任务的提交能够平缓的劣化。 + * + *

+ * 客户端可以通过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。 + *

+ * 除了execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。 + *

+ * AbstractThreadPoolTest 是为了不破坏原始结构作者做的测试抽象类 + * + * @param + */ +public class DefaultThreadPool extends AbstractThreadPoolTest implements ThreadPool { + private AtomicInteger successCount=new AtomicInteger(); + private AtomicInteger errorCount=new AtomicInteger(); + + + + //线程池最大限制数 + private static final int MAX_WORKER_NUMBERS = 10; + //线程池默认的数量 + private static final int DEFAULT_WORKER_NUMBERS = 5; + + //线程池最小的数量 + private static final int MIN_WORKER_NUMBERS = 1; + //这是一个工作列表,将会向里面插入工作 + private final LinkedList jobList = new LinkedList(); + //工作者列表 + private final List workers = Collections.synchronizedList(new ArrayList()); + + //工作者线程的数量 + private int workerNum = DEFAULT_WORKER_NUMBERS; + //线程编号生成 + private AtomicLong threadNum = new AtomicLong(); + + public DefaultThreadPool() { + initalizeWokders(DEFAULT_WORKER_NUMBERS); + } + + public DefaultThreadPool(int num) { + //确保创建线程数在限制以内 + workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; + initalizeWokders(workerNum); + } + + /** + * 执行任务 + * + * @param job + */ + @Override + public void execute(Job job) { + if (job != null) { + synchronized (jobList) { + jobList.addLast(job); + + //这里是notify(),而不是notifyAll(),因为能够确定有公祖者线程唤醒, + // 这时使用notify()方法将会比notifyAll()方法获得更小的开销 + // (避免将等待队列中的线程全部移动到阻塞对列中) + jobList.notify(); + + //test + icrWorker(); + } + } + } + + /** + * 取消任务 + */ + @Override + public void shutdown() { + for (Worker worker : workers) { + worker.shutdown(); + } + + } + + @Override + public void addWorkers(int num) { + synchronized (jobList) { + //限制新增的Worker数量不能超过最大值 + if (num + this.workerNum > MAX_WORKER_NUMBERS) { + num = MAX_WORKER_NUMBERS - this.workerNum; + + } + initalizeWokders(num); + this.workerNum += num; + } + } + + @Override + public void removeWorker(int num) { + synchronized (jobList) { + if (num >= this.workerNum) { + throw new IllegalArgumentException("beyond workNum"); + } + //按照给定的数量停止Worker + int count = 0; + while (count < num) { + Worker worker = workers.get(count); + if (workers.remove(worker)) { + worker.shutdown(); + count++; + } + } + this.workerNum -= count; + } + } + + @Override + public int getJobSize() { + return jobList.size(); + } + + //********* test 可有可无********* + + @Override + public void getExecteResult() { + super.getExecteResult(); + System.out.println(successCount.getAndIncrement()); + } + //********** end ************ + + /** + * 初始化工作者 + * + * @param num + */ + private void initalizeWokders(int num) { + for (int i = 0; i < num; i++) { + Worker worker = new Worker(successCount,errorCount); + workers.add(worker); + Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); + thread.start(); + } + } + + /** + * 工作者线程 负责消费任务 + */ + class Worker implements Runnable { + //是否工作 + private volatile boolean running = true; + + AtomicInteger successCount; + AtomicInteger errorCount; + + Worker(){ + + } + Worker(AtomicInteger successCount,AtomicInteger errorCount){ + this.successCount = successCount; + this.errorCount = errorCount; + } + + @Override + public void run() { + while (running) { + + Job job; + //通过jobList的notify()来通知这里,因为jobList解锁后这里才能进行获取锁 + synchronized (jobList) { + //如果工作者列表是空的,那么久wait + while (jobList.isEmpty()) { + try { + jobList.wait(); + } catch (InterruptedException e) { + //感知到外部对WorkerThread的中断操作,返回 + Thread.currentThread().interrupt(); + return; + } + } + + //取出一个Job + job = jobList.removeFirst(); + + + } + //日志打点 + icrRun(); + + if (job != null) { + try { + //测试 稍微睡10Ms 能测出来不同数量任务执行成功的个数 + //TimeUnit.MICROSECONDS.sleep(1); + job.run(); + //测试 + icrSuccessCount(); + safeCount(successCount); + + } catch (Exception ex) { + //忽略Job执行中的Exception + + } + } + } + } + + public void shutdown() { + running = false; + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPool.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..c8db7811cb2d9fa2a77ed21da350d09160aabf41 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPool.java @@ -0,0 +1,27 @@ +package com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo; + +/** + * 简单的线程池接口 + * + * @param + */ +public interface ThreadPool { + //执行一个job,这个job需要实现Runnable + void execute(Job job); + + //关闭线程池 + void shutdown(); + + //增加工作者线程 + void addWorkers(int num); + + //减少工作者线程 + void removeWorker(int num); + + //得到正在等待执行的任务数量 + int getJobSize(); + + + //***********test 可有可无*********** + void getExecteResult(); +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPoolTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPoolTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d1d7b81ea41fa21405d0f0fac4344d74e5274951 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/connectionDemo/thread_pool_demo/ThreadPoolTest.java @@ -0,0 +1,59 @@ +package com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 手写线程池测试类 + */ +public class ThreadPoolTest { + + + public static void main(String[] args) throws InterruptedException { + + for (int j = 0; j < 50; j++) { + + AtomicInteger count = new AtomicInteger(); + //定义线程池数 可根据改变线程池数来看执行效率 + ThreadPool threadPool = new DefaultThreadPool(4); + //计数器 + + for (int i = 0; i < 20000; i++) { + //执行job + threadPool.execute(new MyJob(count)); + } + threadPool.getExecteResult(); + + System.out.println(j + "实际执行:" + count.get()); + System.out.println(); + TimeUnit.SECONDS.sleep(4); + } + } + + /** + * 实际执行线程 + */ + static class MyJob implements Runnable { + + AtomicInteger count; + + MyJob(AtomicInteger count) { + this.count = count; + } + + @Override + public void run() { + //count.incrementAndGet(); + safeCount(count); + } + } + private static void safeCount(AtomicInteger count){ + for (;;){ + int i=count.get(); + boolean suc = count.compareAndSet(i,++i); + if(suc){ + break; + } + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/Cache.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/Cache.java new file mode 100644 index 0000000000000000000000000000000000000000..e3956fe1af2fd8c7a89bebcb1dfe123190543255 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/Cache.java @@ -0,0 +1,51 @@ +package com.lecoboy.highconcurrency.lockdemo; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * 通过读写锁使HashMap线程安全 控制写锁,不影响读 + */ +public class Cache { + static Map map = new HashMap<>(); + static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + static Lock r = rwl.readLock(); + static Lock w = rwl.writeLock(); + + + //获取Key对应的value + public static final Object get(String key) { + r.lock(); + try { + return map.get(key); + } finally { + r.unlock(); + } + } + + //设置key对应的value,并返回旧的value + public static final Object put(String key, Object value) { + w.lock(); + try { + return map.put(key, value); + } finally { + w.unlock(); + } + } + + //清空所有的内容 + public static final void clear() { + w.lock(); + try { + map.clear(); + } finally { + w.unlock(); + } + } + + public static int size() { + return map.size(); + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/CacheAndHashMapTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/CacheAndHashMapTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1b7d41097682e492161ddf27d865a2923213fe31 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/CacheAndHashMapTest.java @@ -0,0 +1,46 @@ +package com.lecoboy.highconcurrency.lockdemo; + +import java.util.HashMap; +import java.util.Map; + +/** + * 读写锁测试和 HashMap非线程安全测试 + */ +public class CacheAndHashMapTest { + + public static void main(String[] args) { + //非线程安全的map + Map map = new HashMap(1000 * 10); + + + for (int i = 0; i < 10; i++) { + Thread thread = new Runner(map); + thread.start(); + + //这个方法可以测试,线程安全时,Hashmap的大小最后能够达到多少 + // new Runner(map).run(); + } + } + + static class Runner extends Thread { + Map map; + + Runner(Map map) { + this.map = map; + } + + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + map.put(this.getName() + i, i); + + //线程安全的缓存HashMap + Cache.put(this.getName() + i, i); + } + //如果是线程安全,那么HashMap的大小,最后能够达到1W. + System.out.println("===非线程安全" + this.getName() + ": " + map.size()); + System.out.println("---线程安全" + this.getName() + ": " + Cache.size()); + + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/FairAndUnfairTest.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/FairAndUnfairTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0614107e168f12c566714a16ace7edac993449e4 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/FairAndUnfairTest.java @@ -0,0 +1,93 @@ +package com.lecoboy.highconcurrency.lockdemo; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 公平锁和非公平锁测试 + */ +public class FairAndUnfairTest { + private static Lock fairLock = new ReentrantLock2(true); + private static Lock unfairLock = new ReentrantLock2(false); + private static CountDownLatch start; + + @Test + public void fair() { + testLock(fairLock); + } + + @Test + public void unfair() { + testLock(unfairLock); + } + + private void testLock(Lock lock) { + start = new CountDownLatch(1); + for (int i = 0; i < 5; i++) { + //启动5个Job + /*Thread t1 = new Thread(new Job(lockdemo), "" + i); + t1.start();*/ + Thread thread = new Job(lock); + thread.setName(""+i); + thread.start(); + } + start.countDown(); + + } + + private static class Job extends Thread { + private Lock lock; + + public Job(Lock lock) { + this.lock = lock; + } + + @Override + public void run() { + try { + start.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //连续2次打印当前的Thread和等待队列中的Thread + + for (int i = 0; i < 2; i++) { + lock.lock(); + try { + System.out.println("Lock by [" + getName() + "] Waiting by " + ((ReentrantLock2) lock).getQueuedThreads()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + + + } + public String toString(){ + return getName(); + } + + } + + private static class ReentrantLock2 extends ReentrantLock { + public ReentrantLock2(boolean fair) { + //使用父级构造方法 + super(fair); + } + + public Collection getQueuedThreads() { + //获取当前排队的线程 + List arrayList = new ArrayList<>(super.getQueuedThreads()); + Collections.reverse(arrayList); + return arrayList; + } + } +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/LockUseCase.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/LockUseCase.java new file mode 100644 index 0000000000000000000000000000000000000000..f52dc3b272cb066823f7f13f7aed9205ad4afdbe --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/lockdemo/LockUseCase.java @@ -0,0 +1,17 @@ +package com.lecoboy.highconcurrency.lockdemo; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LockUseCase { + public static void main(String[] args) { + Lock lock = new ReentrantLock(); + lock.lock(); + try { + + }finally { + lock.unlock(); + } + } + +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/CasAutomicCounter.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/CasAutomicCounter.java similarity index 97% rename from daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/CasAutomicCounter.java rename to daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/CasAutomicCounter.java index 570d8361c3198df318c4e4eb1851d0ab494abd88..a0cfefb1c7be90218083e9dd6b5b1b3e995c0b33 100644 --- a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/CasAutomicCounter.java +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/CasAutomicCounter.java @@ -1,4 +1,4 @@ -package com.lecoboy.highconcurrency; +package com.lecoboy.highconcurrency.threaddemo; import java.util.ArrayList; import java.util.List; diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/MultiThread.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/MultiThread.java similarity index 94% rename from daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/MultiThread.java rename to daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/MultiThread.java index 66ab28137457377b434893a3b0d08c6516f8c1a7..da205e36f8966203b4f8bf341534923294a82e5d 100644 --- a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/MultiThread.java +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/MultiThread.java @@ -1,4 +1,4 @@ -package com.lecoboy.highconcurrency; +package com.lecoboy.highconcurrency.threaddemo; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/TheadLocalDemo.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/TheadLocalDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..6f63c243cfd9f4da6abc7174111bf11f6a19eba7 --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/TheadLocalDemo.java @@ -0,0 +1,32 @@ +package com.lecoboy.highconcurrency.threaddemo; + +import java.util.concurrent.TimeUnit; + +/** + * TheadLocal可以被使用在调用好使统计的功能上,在方法的入口前执行begin()方法 + * 在方法调用后执行en()方法,好处是两个方法的调用不用再一个方法或类中,比如在AOP中 + * 可以在方法调用前的切入点执行begin()方法,在方法调用后执行end()这样依旧可以获得方法的执行耗时 + */ +public class TheadLocalDemo { + //第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次 + private static final ThreadLocal TIME_THERADLOCAL = new ThreadLocal(){ + protected Long initialValue(){ + return System.currentTimeMillis(); + } + }; + + public static final void begin(){ + TIME_THERADLOCAL.set(System.currentTimeMillis()); + } + + public static final long end(){ + return System.currentTimeMillis() - TIME_THERADLOCAL.get(); + } + + public static void main(String[] args) throws InterruptedException { + TheadLocalDemo.begin(); + TimeUnit.SECONDS.sleep(1); + System.out.println("Cost:"+TheadLocalDemo.end()+" mills"); + } + +} diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/ThreadState.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/ThreadState.java similarity index 97% rename from daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/ThreadState.java rename to daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/ThreadState.java index 2452416a581e15206ea3dfc1ee211ca6468b4453..f67d826e8bc461058b3e6d4cd9b35103cad2bdcd 100644 --- a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/ThreadState.java +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/ThreadState.java @@ -1,4 +1,4 @@ -package com.lecoboy.highconcurrency; +package com.lecoboy.highconcurrency.threaddemo; import java.util.concurrent.TimeUnit; diff --git a/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/WaitNotify.java b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/WaitNotify.java new file mode 100644 index 0000000000000000000000000000000000000000..e20d0a774a4acfc832994df78615561a0330781c --- /dev/null +++ b/daydayup-high-concurrency/src/main/java/com/lecoboy/highconcurrency/threaddemo/WaitNotify.java @@ -0,0 +1,77 @@ +package com.lecoboy.highconcurrency.threaddemo; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * 等待/通知机制模拟 + */ +public class WaitNotify { + static boolean flag = true; + static Object lock = new Object(); + + public static void main(String[] args) throws InterruptedException { + Thread waitThread = new Thread(new Wait(), "WaitThread"); + waitThread.start(); + TimeUnit.SECONDS.sleep(1); + Thread notifyThread = new Thread(new Notify(), "NotifyThread"); + notifyThread.start(); + } + + /** + * 使用wait、notify、notifyAll 时需要先对调用对象加锁 + */ + static class Wait implements Runnable { + + @Override + public void run() { + // 加锁,拥有lock的Monitor + synchronized (lock) { + //当条件不满足时,继续wait,同时释放了lock的锁 + while (flag) { + try { + System.out.println(Thread.currentThread() + " flag is true. wait@ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); + // 调用wait() 线程状态由RUNNING 变为 WAITING,并将当前线程放置到对象的等待队列 + lock.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + //条件满足时,完成工作 + System.out.println(Thread.currentThread() + " 当lock释放后输出 flag is false. wait@ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); + + } + } + } + + static class Notify implements Runnable { + + @Override + public void run() { + // 加锁,拥有lock的Monitor + synchronized (lock) { + //获取lock的锁,然后进行通知,通知时不会释放Lock的锁 + //直到当前线程释放了lock后,waitThread才能从wait方法中返回 + System.out.println(Thread.currentThread() + " 锁住 lockdemo . notify@ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); + + //notifyAll()将等待队列中所有线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED + //直到释放锁 才由BLOCKED变为RUNNING + lock.notifyAll(); + flag = false; + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + //再次加锁 + synchronized (lock) { + System.out.println(Thread.currentThread() + " 再次锁住lock,释放后wait的线程才继续. sleep@ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); + + } + } + } + +} diff --git a/daydayup-high-concurrency/src/main/resources/application.properties b/daydayup-high-concurrency/src/main/resources/application.properties index 8b137891791fe96927ad78e64b0aad7bded08bdc..2ec1144e08dfb0efeab5cdf42d26874f6574793f 100644 --- a/daydayup-high-concurrency/src/main/resources/application.properties +++ b/daydayup-high-concurrency/src/main/resources/application.properties @@ -1 +1,2 @@ - +server.port=8080 +spring.profiles.active=dev \ No newline at end of file diff --git a/daydayup-high-concurrency/src/main/resources/index.html b/daydayup-high-concurrency/src/main/resources/index.html new file mode 100644 index 0000000000000000000000000000000000000000..6dcf738f154835c729a41ef1733b26596fb1b2e8 --- /dev/null +++ b/daydayup-high-concurrency/src/main/resources/index.html @@ -0,0 +1,16 @@ + + + + + 测试页面 + + +

第一张图片

+ +

第二张图片

+ +

第三张图片

+ + + + \ No newline at end of file