diff --git a/second/week_05/75/Executors.md b/second/week_05/75/Executors.md new file mode 100644 index 0000000000000000000000000000000000000000..9eb86b7ed1cf2cc7042fcbea2318a6443a4f65a7 --- /dev/null +++ b/second/week_05/75/Executors.md @@ -0,0 +1,226 @@ +## Executors +>用于创建线程池、包装成Callable线程、创建默认线程工厂的工具类 +### 创建线程池 + +- 固定大小的线程池,核心线程数量和最大线程数量是一样,所以只有核心线程nTHreads个 + + ```java + public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + ``` + +- 创建一个携程线程池(java的携程:用户线程和核心线程的多对多模型) + + ```java + public static ExecutorService newWorkStealingPool(int parallelism) { + return new ForkJoinPool + (parallelism, + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true);d + } + ``` +- 创建一个根据自身核心数来创建携程池 + + ```java + public static ExecutorService newWorkStealingPool() { + return new ForkJoinPool + (Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } + ``` +- 创建一个固定大小线程池,可自己选择线程工厂 + + ```java + public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + ``` +- 创建一个只有一个线程的线程池 + + ```java + public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } + ``` + + +- 创建一个只有一个线程的线程池,可自己选择线程工厂 + + ```java + public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); + } + ``` + +- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 + + ```java + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } + ``` + +- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 且可以自定义创建线程工厂 + + ```java + public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + ``` + +- 创建一个线程的线程池,执行定时及周期性任务执行 + + ```java + public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); + } + ``` + +- 创建一个线程的线程池,执行定时及周期性任务执行,可自定义线程工厂 + + ```java + public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1, threadFactory)); + } + ``` + +- 创建一个定长线程池,支持定时及周期性任务执行。 + + ```java + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); + } + ``` + +- 创建一个定长线程池,支持定时及周期性任务执行。 可自定义线程工厂 + + ```java + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + + ``` + +- 把ExecutorService包装成DelegatedExecutorService + + ```java + public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedExecutorService(executor); + } + ``` + +- 把ScheduledExecutorService包装成DelegatedScheduledExecutorService + + ```java + public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedScheduledExecutorService(executor); + } + ``` + +###创建线程工厂 + +- defaultThreadFactory创建一个默认的线程工厂 + + ```java + public static ThreadFactory defaultThreadFactory() { + return new DefaultThreadFactory(); + } + ``` + +- privilegedThreadFactory创建一个具有权限限制的线程工厂 + + ```java + public static ThreadFactory privilegedThreadFactory() { + return new PrivilegedThreadFactory(); + } + ``` + +###转化任意线程为Callable线程 + +- 把Runnable包装成Callable类型,并把返回结果放入T中 + + ```java + public static Callable callable(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); + // RunnableAdapter 是Callable的实现类 + return new RunnableAdapter(task, result); + } + ``` + +- 根据传入的线程,生成一个Callable的装饰子类 + + ```java + public static Callable callable(Runnable task) { + if (task == null) + throw new NullPointerException(); + return new RunnableAdapter(task, null); + } + ``` + +- 把PrivilegedAction包装成Callable类型 + + ```java + public static Callable callable(final PrivilegedAction action) { + if (action == null) + throw new NullPointerException(); + return new Callable() { + public Object call() { return action.run(); }}; + } + ``` + +- 把PrivilegedExceptionAction 包装成Callable类型 + + ```java + public static Callable callable(final PrivilegedExceptionAction action) { + if (action == null) + throw new NullPointerException(); + return new Callable() { + public Object call() throws Exception { return action.run(); }}; + } + ``` + +- 把Callable包装成privilegedCallable(Callable实现类) + + ```java + public static Callable privilegedCallable(Callable callable) { + if (callable == null) + throw new NullPointerException(); + return new PrivilegedCallable(callable); + } + ``` +- 把Callable包装成PrivilegedCallableUsingCurrentClassLoader(Callable实现类) + + ```java + public static Callable privilegedCallableUsingCurrentClassLoader(Callable callable) { + if (callable == null) + throw new NullPointerException(); + return new PrivilegedCallableUsingCurrentClassLoader(callable); + } + ``` \ No newline at end of file diff --git a/second/week_05/75/Thread.md b/second/week_05/75/Thread.md new file mode 100644 index 0000000000000000000000000000000000000000..ef9c243669ab0ac3e2e2c2ea0f350ea555520c86 --- /dev/null +++ b/second/week_05/75/Thread.md @@ -0,0 +1,144 @@ +## Thread +### introduce +>Thread是执行线程,JVM运行多个线程同步执行。 +>每个线程都有优先级,优先级较高的线程通常先于优先级较低的线程执行。 +>每个线程被创建时,拥有和父线程相同的初始优先级。并且当父线程是守护线程时,被创建的线程也会是守护线程。 +>当JVM启动时,通常会有一个非守护线程(main线程)。 +### constructor +>所有的构造方法都指向内部方法 +>private void init(ThreadGroup g, Runnable target, String name, long stackSize) +```java +private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + + /* If there is a security manager, ask the security manager + what to do. */ + if (security != null) { + g = security.getThreadGroup(); + } + + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + if (g == null) { + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + /* + * Do we have the required permissions? + */ + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + + /* Set thread ID */ + tid = nextThreadID(); + } +``` +### 重要方法 +```java +public class Thread implements Runnable { + //获取当前线程 + public static native Thread currentThread(); + //调用yield方法会先让别的线程执行,但是不确保真正让出 + public static native void yield(); + //线程休眠,结束后进入就绪状态 + public static native void sleep(long millis) throws InterruptedException; + + public synchronized void start() { + /** + * This method is not invoked for the main method thread or "system" + * group threads created/set up by the VM. Any new functionality added + * to this method in the future may have to also be added to the VM. + * + * A zero status value corresponds to state "NEW". + */ + if (threadStatus != 0) + throw new IllegalThreadStateException(); + + /* Notify the group that this thread is about to be started + * so that it can be added to the group's list of threads + * and the group's unstarted count can be decremented. */ + group.add(this); + + boolean started = false; + try { + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } + } + + public void run() { + if (target != null) { + target.run(); + } + } + + //interrupt不会真正停止一个线程,它仅仅是给这个线程发了一个信号告诉它,它应该要结束了 + public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); + } + + //等待当前线程死亡,实际是调用Object.wait() + public final void join() throws InterruptedException { + join(0); + } +} +``` +[Thread的状态](https://user-gold-cdn.xitu.io/2018/4/18/162d8e16a948847f?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) \ No newline at end of file diff --git a/second/week_05/75/ThreadLocal.md b/second/week_05/75/ThreadLocal.md new file mode 100644 index 0000000000000000000000000000000000000000..5683be4a7b7ed4c3bc275c1412e43a31f916eb39 --- /dev/null +++ b/second/week_05/75/ThreadLocal.md @@ -0,0 +1,88 @@ +## ThreadLocal +>ThreadLocal的作用是提供线程内的局部变量,在各线程内部创建一个变量的副本 +>相比于使用各种锁机制访问变量,ThreadLocal的思想就是用空间换时间,使各线程都能访问属于自己这一份的变量副本, +>变量值不互相干扰,减少同一个线程内的多个函数或者组件之间一些公共变量传递的复杂度。 +### 储存结构 +![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91cGxvYWQtaW1hZ2VzLmppYW5zaHUuaW8vdXBsb2FkX2ltYWdlcy83NDMyNjA0LWFkMmZmNTgxMTI3YmE4Y2MuanBn?x-oss-process=image/format,png) + +- 每个Thread线程内部都有一个Map +- Map里面存储线程本地对象(key)和线程的变量副本(value) +- 但是Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向Map获取和设置线程的副本变量值 +- 所以对于不通的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本之,形成了副本的隔离,互不干扰 +### 关键内部类 +```java +static class ThreadLocalMap { + + private Entry[] table; + + //初始化map + ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { + table = new Entry[INITIAL_CAPACITY]; + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + table[i] = new Entry(firstKey, firstValue); + size = 1; + setThreshold(INITIAL_CAPACITY); + } + + //复用map + private ThreadLocalMap(ThreadLocalMap parentMap) { + Entry[] parentTable = parentMap.table; + int len = parentTable.length; + setThreshold(len); + table = new Entry[len]; + + for (int j = 0; j < len; j++) { + Entry e = parentTable[j]; + if (e != null) { + @SuppressWarnings("unchecked") + ThreadLocal key = (ThreadLocal) e.get(); + if (key != null) { + Object value = key.childValue(e.value); + Entry c = new Entry(key, value); + int h = key.threadLocalHashCode & (len - 1); + while (table[h] != null) + h = nextIndex(h, len); + table[h] = c; + size++; + } + } + } + } + +} +``` +### 重要方法 +```java +public class ThreadLocal { + + public T get() { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) { + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + //新建ThreadLocalMap,key为当前线程,value为null + return setInitialValue(); + } + + public void set(T value) { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + } + + public void remove() { + ThreadLocalMap m = getMap(Thread.currentThread()); + if (m != null) + m.remove(this); + } +} +``` diff --git a/second/week_05/75/ThreadPoolExecutor.md b/second/week_05/75/ThreadPoolExecutor.md new file mode 100644 index 0000000000000000000000000000000000000000..2c48cd3fae6adc3cf1e3a6d82f10090230ea8b59 --- /dev/null +++ b/second/week_05/75/ThreadPoolExecutor.md @@ -0,0 +1,90 @@ +## ThreadPoolExecutor +### 线程池工作流程 +[工作流程](https://mmbiz.qpic.cn/mmbiz_png/C91PV9BDK3yWnVA1dVtauPhPeo92By2bT6scia91W3Ohic4KbictW16WoNgfL9sQ0x2C2Vmjw7g9sAfPthialD5wLA/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) +### 重要属性 + - private final BlockingQueue workQueue; 工作队列,新加入的任务或加入队列 + - private int largestPoolSize; 最大线程数 + - private volatile int corePoolSize; 核心线程数 + - private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); 拒绝策略 +### 构造方法 +```java +public class ThreadPoolExecutor{ + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + //非核心线程空闲时的存活时间 + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.acc = System.getSecurityManager() == null ? + null : + AccessController.getContext(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } +} +``` +### 内部类Worker +>实现了Runnable接口,继承了AQS +```java +private final class Worker + extends AbstractQueuedSynchronizer + implements Runnable{ + + final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + while (task != null || (task = getTask()) != null) { + w.lock(); + // If pool is stopping, ensure thread is interrupted; + // if not, ensure thread is not interrupted. This + // requires a recheck in second case to deal with + // shutdownNow race while clearing interrupt + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + beforeExecute(wt, task); + Throwable thrown = null; + try { + task.run(); + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + afterExecute(task, thrown); + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } + } +} +``` \ No newline at end of file diff --git "a/second/week_05/75/\344\272\244\346\233\277\346\211\223\345\215\260FooBar.md" "b/second/week_05/75/\344\272\244\346\233\277\346\211\223\345\215\260FooBar.md" new file mode 100644 index 0000000000000000000000000000000000000000..ee9fdf0aee510da0e8cbdd981b77afcea812c0be --- /dev/null +++ "b/second/week_05/75/\344\272\244\346\233\277\346\211\223\345\215\260FooBar.md" @@ -0,0 +1,36 @@ +```java +class FooBar { + private int n; + private AtomicInteger printed = new AtomicInteger(0); + + public FooBar(int n) { + this.n = n; + } + + public void foo(Runnable printFoo) throws InterruptedException { + + for (int i = 0; i < n; i++) { + while (printed.get() == 1) { + Thread.yield(); + } + // printFoo.run() outputs "foo". Do not change or remove this line. + printFoo.run(); + printed.set(1); + } + } + + public void bar(Runnable printBar) throws InterruptedException { + + for (int i = 0; i < n; i++) { + while (printed.get() == 0) { + Thread.yield(); + } + // printBar.run() outputs "bar". Do not change or remove this line. + printBar.run(); + printed.set(0); + } + } +} + + +``` \ No newline at end of file diff --git "a/second/week_05/75/\346\214\211\345\272\217\346\211\223\345\215\260.md" "b/second/week_05/75/\346\214\211\345\272\217\346\211\223\345\215\260.md" new file mode 100644 index 0000000000000000000000000000000000000000..90706f71801fb7243e2fc8d03640cce1cae1af31 --- /dev/null +++ "b/second/week_05/75/\346\214\211\345\272\217\346\211\223\345\215\260.md" @@ -0,0 +1,28 @@ +```java +import java.util.concurrent.Semaphore; +class Foo { + private Semaphore seam1 = new Semaphore(0); + + private Semaphore seam2 = new Semaphore(0); + + public Foo() { + + } + + public void first(Runnable printFirst) throws InterruptedException { + printFirst.run(); + seam1.release(); + } + + public void second(Runnable printSecond) throws InterruptedException { + seam1.acquire(); + printSecond.run(); + seam2.release(); + } + + public void third(Runnable printThird) throws InterruptedException { + seam2.acquire(); + printThird.run(); + } +} +``` \ No newline at end of file