From e41ceb7bd017e6c3625527df6edcdfd799f68046 Mon Sep 17 00:00:00 2001 From: thatwhy <843279946@qq.com> Date: Sun, 12 Jan 2020 12:41:32 +0800 Subject: [PATCH] week_05-057 --- week_05/57/Executors-057.txt | 89 ++++++++++ week_05/57/Thread-057.txt | 224 ++++++++++++++++++++++++++ week_05/57/ThreadLocal-057.txt | 59 +++++++ week_05/57/ThreadPoolExecutor-057.txt | 185 +++++++++++++++++++++ 4 files changed, 557 insertions(+) create mode 100644 week_05/57/Executors-057.txt create mode 100644 week_05/57/Thread-057.txt create mode 100644 week_05/57/ThreadLocal-057.txt create mode 100644 week_05/57/ThreadPoolExecutor-057.txt diff --git a/week_05/57/Executors-057.txt b/week_05/57/Executors-057.txt new file mode 100644 index 0000000..fecae1e --- /dev/null +++ b/week_05/57/Executors-057.txt @@ -0,0 +1,89 @@ +线程池工具类 +一、主要方法 +1.创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。 +public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } +2.jdk1.7以后新线程池方法,可以窃取其他线程任务执行 +public static ExecutorService newWorkStealingPool(int parallelism) { + return new ForkJoinPool + (parallelism, + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } +3. +public static ExecutorService newWorkStealingPool() { + return new ForkJoinPool + (Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } +4. +public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } +5.创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 +(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要, +一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务, +并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同, +可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。 +public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } +6. +public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); + } +7.创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。 +对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。 +调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的, +则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 +因此,长时间保持空闲的线程池不会使用任何资源。注意, +可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。 +public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } +8. +public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } +9. +public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); + } +10. +public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1, threadFactory)); + } +11.创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 +public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); + } +12. +public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } +13.返回用于创建新线程的默认线程工厂。 +public static ThreadFactory defaultThreadFactory() { + return new DefaultThreadFactory(); + } \ No newline at end of file diff --git a/week_05/57/Thread-057.txt b/week_05/57/Thread-057.txt new file mode 100644 index 0000000..68d3d4e --- /dev/null +++ b/week_05/57/Thread-057.txt @@ -0,0 +1,224 @@ +public class Thread implements Runnable +java中线程类,实现了Runnable接口 + +一、构造方法(9个) + 1.public Thread() { + init(null, null, "Thread-" + nextThreadNum(), 0); + } + 2.public Thread(Runnable target) { + init(null, target, "Thread-" + nextThreadNum(), 0); + } + 3.Thread(Runnable target, AccessControlContext acc) { + init(null, target, "Thread-" + nextThreadNum(), 0, acc, false); + } + 4.public Thread(ThreadGroup group, Runnable target) { + init(group, target, "Thread-" + nextThreadNum(), 0); + } + 5.public Thread(String name) { + init(null, null, name, 0); + } + 6.public Thread(ThreadGroup group, String name) { + init(group, null, name, 0); + } + 7.public Thread(Runnable target, String name) { + init(null, target, name, 0); + } + 8.public Thread(ThreadGroup group, Runnable target, String name) { + init(group, target, name, 0); + } + 9.public Thread(ThreadGroup group, Runnable target, String name, + long stackSize) { + init(group, target, name, stackSize); + } + 所传的参数不同,实际上都是调用的一个方法 + private void init(ThreadGroup g, Runnable target, String name, + long stackSize) { + init(g, target, name, stackSize, null, true); + } + 这个init方法实际上又是调用的下面这个init方法 + private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + + /* If there is a security manager, ask the security manager + what to do. */ + if (security != null) { + g = security.getThreadGroup(); + } + + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + if (g == null) { + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + /* + * Do we have the required permissions? + */ + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + + /* Set thread ID */ + tid = nextThreadID(); + } + +二、成员变量 + name、priority(优先级)、daemon(是否守护线程)等等 + +三、成员方法 + 1.包括一些本地方法 + public static native void yield(); + public static native void sleep(long millis) throws InterruptedException; + private native void start0(); + 2.其他方法 + public static void sleep(long millis, int nanos) + throws InterruptedException { + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (nanos < 0 || nanos > 999999) { + throw new IllegalArgumentException( + "nanosecond timeout value out of range"); + } + + if (nanos >= 500000 || (nanos != 0 && millis == 0)) { + millis++; + } + + sleep(millis); + } + 4.启动线程方法 + public synchronized void start() { + /** + * This method is not invoked for the main method thread or "system" + * group threads created/set up by the VM. Any new functionality added + * to this method in the future may have to also be added to the VM. + * + * A zero status value corresponds to state "NEW". + */ + if (threadStatus != 0) + throw new IllegalThreadStateException(); + + /* Notify the group that this thread is about to be started + * so that it can be added to the group's list of threads + * and the group's unstarted count can be decremented. */ + group.add(this); + + boolean started = false; + try { + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } + } + 5.打断线程 + public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); + } + 6.判断是否被打断 + public static boolean interrupted() { + return currentThread().isInterrupted(true); + } + public boolean isInterrupted() { + return isInterrupted(false); + } + 7.设置线程优先级 0-10 默认5 + public final void setPriority(int newPriority) { + ThreadGroup g; + checkAccess(); + if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { + throw new IllegalArgumentException(); + } + if((g = getThreadGroup()) != null) { + if (newPriority > g.getMaxPriority()) { + newPriority = g.getMaxPriority(); + } + setPriority0(priority = newPriority); + } + } + 8.等待该线程终止的时间最长为 millis 毫秒 + public final synchronized void join(long millis) + throws InterruptedException { + long base = System.currentTimeMillis(); + long now = 0; + + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (millis == 0) { + while (isAlive()) { + wait(0); + } + } else { + while (isAlive()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + wait(delay); + now = System.currentTimeMillis() - base; + } + } + } + public final void join() throws InterruptedException { + //无参数,默认传入0 + join(0); + } \ No newline at end of file diff --git a/week_05/57/ThreadLocal-057.txt b/week_05/57/ThreadLocal-057.txt new file mode 100644 index 0000000..48b255d --- /dev/null +++ b/week_05/57/ThreadLocal-057.txt @@ -0,0 +1,59 @@ +该类提供了线程局部变量。这些变量不同于它们的普通对应物, +因为访问一个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量, +它独立于变量的初始化副本。ThreadLocal 实例通常是类中的私有静态字段, +它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。 + +一、方法 +1. private static int nextHashCode() { + return nextHashCode.getAndAdd(HASH_INCREMENT); + } +2.protected T initialValue() { + return null; + } +3. public T get() { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) { + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + return setInitialValue(); + } +4.private T setInitialValue() { + T value = initialValue(); + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + return value; + } +5. public void set(T value) { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + } +6.public void remove() { + ThreadLocalMap m = getMap(Thread.currentThread()); + if (m != null) + m.remove(this); + } +7.ThreadLocalMap getMap(Thread t) { + return t.threadLocals; + } +8.void createMap(Thread t, T firstValue) { + t.threadLocals = new ThreadLocalMap(this, firstValue); + } +9.static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { + return new ThreadLocalMap(parentMap); + } +10.内部类 +static class ThreadLocalMap { \ No newline at end of file diff --git a/week_05/57/ThreadPoolExecutor-057.txt b/week_05/57/ThreadPoolExecutor-057.txt new file mode 100644 index 0000000..b8da3cd --- /dev/null +++ b/week_05/57/ThreadPoolExecutor-057.txt @@ -0,0 +1,185 @@ +创建线程池方法 +一、构造方法 +1.public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } +2.public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); + } +3.public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); + } +4.public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.acc = System.getSecurityManager() == null ? + null : + AccessController.getContext(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } + 最终都是调用第四个带七个参数的构造方法 + +其他方法 +5.执行任务方法 在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 +如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量, +则该任务由当前 RejectedExecutionHandler 处理。 +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + /* + * Proceed in 3 steps: + * + * 1. If fewer than corePoolSize threads are running, try to + * start a new thread with the given command as its first + * task. The call to addWorker atomically checks runState and + * workerCount, and so prevents false alarms that would add + * threads when it shouldn't, by returning false. + * + * 2. If a task can be successfully queued, then we still need + * to double-check whether we should have added a thread + * (because existing ones died since last checking) or that + * the pool shut down since entry into this method. So we + * recheck state and if necessary roll back the enqueuing if + * stopped, or start a new thread if there are none. + * + * 3. If we cannot queue task, then we try to add a new + * thread. If it fails, we know we are shut down or saturated + * and so reject the task. + */ + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); + } +6.关闭线程池,不会关闭正在执行的任务 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。 +public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + mainLock.unlock(); + } + tryTerminate(); + } +7.关闭线程池,并且关闭正在执行的任务 +尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。 +此实现通过 Thread.interrupt() 取消任务,所以如果任何任务屏蔽或无法响应中断,则可能永远无法终止该任务。 + +public List shutdownNow() { + List tasks; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(STOP); + interruptWorkers(); + tasks = drainQueue(); + } finally { + mainLock.unlock(); + } + tryTerminate(); + return tasks; + } +8.请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。 +public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (;;) { + if (runStateAtLeast(ctl.get(), TERMINATED)) + return true; + if (nanos <= 0) + return false; + nanos = termination.awaitNanos(nanos); + } + } finally { + mainLock.unlock(); + } + } +9.当不再引用此执行程序时,调用 shutdown。 +protected void finalize() { + SecurityManager sm = System.getSecurityManager(); + if (sm == null || acc == null) { + shutdown(); + } else { + PrivilegedAction pa = () -> { shutdown(); return null; }; + AccessController.doPrivileged(pa, acc); + } + } +10.设置核心线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前值,则多余的现有线程将在下一次空闲时终止。 +如果较大,则在需要时启动新线程来执行这些排队的任务。 +public void setCorePoolSize(int corePoolSize) { + if (corePoolSize < 0) + throw new IllegalArgumentException(); + int delta = corePoolSize - this.corePoolSize; + this.corePoolSize = corePoolSize; + if (workerCountOf(ctl.get()) > corePoolSize) + interruptIdleWorkers(); + else if (delta > 0) { + // We don't really know how many new threads are "needed". + // As a heuristic, prestart enough new workers (up to new + // core size) to handle the current number of tasks in + // queue, but stop if queue becomes empty while doing so. + int k = Math.min(delta, workQueue.size()); + while (k-- > 0 && addWorker(null, true)) { + if (workQueue.isEmpty()) + break; + } + } + } +11.启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。 +如果已启动所有核心线程,此方法将返回 false。 + public boolean prestartCoreThread() { + return workerCountOf(ctl.get()) < corePoolSize && + addWorker(null, true); + } \ No newline at end of file -- Gitee