From a557f692629ffaf0a846cba8471b6ae923f173da Mon Sep 17 00:00:00 2001 From: sljie1988 Date: Tue, 31 Dec 2019 23:09:55 +0800 Subject: [PATCH] week_03 --- ...45\255\230\346\250\241\345\236\213-012.md" | 64 +++ week_03/12/1synchronized-012.md | 41 ++ ...45\270\203\345\274\217\351\224\201-012.md" | 44 ++ week_03/12/2AbstractQueuedSynchronizer-012.md | 486 ++++++++++++++++++ week_03/12/2ReentrantLock-012.md | 129 +++++ week_03/12/2Semaphore-012.md | 160 ++++++ 6 files changed, 924 insertions(+) create mode 100644 "week_03/12/1java\345\206\205\345\255\230\346\250\241\345\236\213-012.md" create mode 100644 week_03/12/1synchronized-012.md create mode 100644 "week_03/12/1\345\210\206\345\270\203\345\274\217\351\224\201-012.md" create mode 100644 week_03/12/2AbstractQueuedSynchronizer-012.md create mode 100644 week_03/12/2ReentrantLock-012.md create mode 100644 week_03/12/2Semaphore-012.md diff --git "a/week_03/12/1java\345\206\205\345\255\230\346\250\241\345\236\213-012.md" "b/week_03/12/1java\345\206\205\345\255\230\346\250\241\345\236\213-012.md" new file mode 100644 index 0000000..bacf93a --- /dev/null +++ "b/week_03/12/1java\345\206\205\345\255\230\346\250\241\345\236\213-012.md" @@ -0,0 +1,64 @@ +#### java内存模型 +``` +1.1 java内存模型概念与作用? +java内存模型(Java Memory Model,JMM)是java虚拟机规范定义的,用来屏蔽掉java程序在各种不同的硬件和操作系统对内存的访问的差异,这样就可以实现java程序在各种不同的平台上都能达到内存访问的一致性。 + +1.2 如何实现各个线程能达到内存访问一致性? +cpu的高速缓存与主内存,通过缓存一致性原则,保证各个处理器的高速缓存和主内存的数据的一致性。 +java线程的工作内存与主内存,通过java虚拟机定义的8种操作遵守的原则,保证各个线程的工作内存与主内存的数据的一致性。 + +1.3 保证数据一致性的8种操作和原则是什么? +lock、unlock +read、write +load、store +use、asign + +1.4 8种操作的原则: +8个动作必须是原子的,不可分割的 +不允许read和load、store和write操作之一单独出现 +不允许一个线程丢弃最近的assign操作 +不允许一个线程回写没有修改的变量到主内存 +变量只能在主内存中产生 + +lock原则: +一个变量在同一时刻只能被一个线程对其进行lock操作 +对变量执行lock操作,就会清空工作空间该变量的值 +不允许对没有lock的变量执行unlock操作 +对一个变量执行unlock之前,必须先把变量同步回主内存中 +``` + +#### volatile +``` +概念:volatile修饰的数据可以保持内存可见性、防止指令重排 + +2.1 保持内存可见性原理: +load、use必须连续一起出现:使用变量前必须从主内存去重新获取最新的值,用于保证读线程能看得见其他线程对变量的最新的修改后的值。 +asign、store必须连续一起出现:修改变量后必须立刻同步回主内存,用于保证写线程对变量的修改能立刻被其他线程看到。 + +2.2 防止指令重排原理: +通过在读写操作增加内存屏障,来防止指令重排; +写操作是在前面和后面分别插入内存屏障,而 volatile 读操作是在后面插入两个内存屏障 +StoreStore 屏障:禁止上面的普通写和下面的 volatile 写重排序 +StoreLoad 屏障:防止上面的 volatile 写与下面可能有的 volatile 读/写重排序 +LoadLoad 屏障:禁止下面所有的普通读操作和上面的 volatile 读重排序 +LoadStore 屏障:禁止下面所有的普通写操作和上面的 volatile 读重排序 + +``` + +#### 并发内存模型的3个特征 +``` +原子性、可见性、有序性 +``` + +#### hapen-before +``` +概念:Java内存模型中定义的两个操作之间的偏序关系。比如说操作A先行发生于操作B,那么在B操作发生之前,A操作产生的“影响”都会被操作B感知到。这里的影响是指修改了内存中的共享变量、发送了消息、调用了方法等 + +原则: +程序次序原则 +线程锁定规则 +volatile变量规则 +线程启动、终止、中断、规则 +对象终结规则 +传递性 +``` \ No newline at end of file diff --git a/week_03/12/1synchronized-012.md b/week_03/12/1synchronized-012.md new file mode 100644 index 0000000..fdac4b4 --- /dev/null +++ b/week_03/12/1synchronized-012.md @@ -0,0 +1,41 @@ +特征 +``` +原子性:被synchronized修饰的数据具有原子性,即一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。 +可见性:多个线程访问一个资源时,该资源的状态、值信息等对于其他线程都是可见的 +有序性:有序性值程序执行的顺序按照代码先后执行 +``` + +实现原理 +``` +java对象由三部分构成:对象头、实例数据、对其填充 +2.1 对象头 主要结构是由Mark Word 和 Class Metadata Address组成, +Mark Word存储对象的hashCode、锁信息或分代年龄或GC标志等信息 +Class Metadata Address是类型指针指向对象的类元数据,JVM通过该指针确定该对象是哪个类的实例 + +锁在JDK1.6之后有四个状态:无锁状态、偏向锁、轻量级锁、重量级锁,锁的类型和状态在对象头Mark Word中都有记录,在申请锁、锁升级等过程中JVM都需要读取对象的Mark Word数据 + +2.2 实例数据存放类的属性数据信息,包括父类的属性信息,如果是数组的实例部分还包括数组的长度,这部分内存按4字节对齐。 + +2.3 对其填充不是必须部分,由于虚拟机要求对象起始地址必须是8字节的整数倍,对齐填充仅仅是为了使字节对齐。 +``` + +JVM对synchronized的优化 +``` +锁膨胀升级: +无锁-->偏向锁-->轻量级锁-->重量级锁 +偏向锁:一个线程获得了锁,那么锁就进入偏向模式。 +轻量级锁:第二个线程申请同一个锁对象时,偏向锁就会立即升级为轻量级锁。注意这里的第二个线程只是申请锁,不存在两个线程同时竞争锁,可以是一前一后地交替执行同步块。 +重量级锁:同一时间有多个线程竞争锁时,锁就会被升级成重量级锁,此时其申请锁带来的开销也就变大。 + +锁消除: +在JIT编译时,对运行上下文进行扫描,去除不可能存在竞争的锁 + +锁粗化: +通过扩大锁的范围,避免反复加锁和释放锁 + +自旋锁: +许多情况下,共享数据的锁定状态持续时间较短,切换线程不值得,通过让线程执行循环等待锁的释放,不让出CPU,会带来许多的性能开销。 + +自适应自旋锁: +它的自旋的次数不再固定,其自旋的次数由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定,这就解决了自旋锁带来的缺点。 +``` \ No newline at end of file diff --git "a/week_03/12/1\345\210\206\345\270\203\345\274\217\351\224\201-012.md" "b/week_03/12/1\345\210\206\345\270\203\345\274\217\351\224\201-012.md" new file mode 100644 index 0000000..cd8b0da --- /dev/null +++ "b/week_03/12/1\345\210\206\345\270\203\345\274\217\351\224\201-012.md" @@ -0,0 +1,44 @@ +锁: +``` +单进程模式下,多个线程可以同时改变某个数据,就需要对数据做同步; +同步的本质通过锁实现,多个线程在一个时刻同一个代码块只能有一个线程可执行,需定义一个标记,这个标记必须每个线程都能看到,此标记可以理解为锁; +需要保证对标记修改具有原子性和内存可见性。 +``` + +分布式: +``` +分布式CAP理论: +任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。 + +分布式指的是集群模式下,多个相同服务同时开启 + +分布式与单机情况下最大的不同在于其不是多线程而是多进程 +多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。 +``` + +分布式锁: +``` +当在分布式模型下,需要利用锁的技术控制某一时刻修改同一数据的进程数 +不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题 +分布式锁的标记应该放到所有进程都可以看到的公共内存 +``` + +基于数据库 +``` +基于表主键唯一 +基于表字段版本号 +基于数据库排他锁 +``` + +基于redis +``` +基于SETNX()、EXPIRE() 方法 +基于 REDIS 的 SETNX()、GET()、GETSET()方法 +基于 REDLOCK +基于 REDISSON +``` + +基于zookeeper +``` +利用临时节点与 watch 机制 +``` \ No newline at end of file diff --git a/week_03/12/2AbstractQueuedSynchronizer-012.md b/week_03/12/2AbstractQueuedSynchronizer-012.md new file mode 100644 index 0000000..bcf8768 --- /dev/null +++ b/week_03/12/2AbstractQueuedSynchronizer-012.md @@ -0,0 +1,486 @@ +#### 问题 +独有模式与共享模式:任务是否允许多个线程同时执行
+可重入与不可重入:执行线程是否有再次直接获取锁的权利
+中断有什么作用,对中断如何处理
+加锁与解锁流程
+条件锁,等待与唤醒流程
+获取锁超时后怎么处理
+ +#### 分析 +ConditionObject +``` +await() +设定状态为0 +阻塞自己 +条件队列+1 +唤醒等待队列线程 + +signal() +条件队列-1 +等待队列+1 + 清除阻塞 + 设定状态为1 +修改条件状态 +``` + +#### 简介 +是个抽象类,提供锁相关操作的公共基本方法,如何获取锁与释放锁,共享模式与私有模式线程的处理,有条件的阻塞与唤醒线程操作,线程中断的处理,获取锁超时的处理 + +#### 继承体系 +public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
+AbstractOwnableSynchronizer:设定当前线程 + +#### 类结构说明 +Node +``` +/** indicate a node is waiting in shared mode */ +static final Node SHARED = new Node(); +/** indicate a node is waiting in exclusive mode */ +static final Node EXCLUSIVE = null; + +/** indicate thread has cancelled */ +static final int CANCELLED = 1; +/** indicate successor's thread needs unparking */ +static final int SIGNAL = -1; +/** indicate thread is waiting on condition */ +static final int CONDITION = -2; +/** indicate the next acquireShared should unconditionally propagate */ +static final int PROPAGATE = -3; + +volatile int waitStatus; + +volatile Node prev; + +volatile Node next; + +volatile Thread thread; + +Node nextWaiter; + +final boolean isShared() { + return nextWaiter == SHARED; +} + +Node(Thread thread, Node mode) { // Used by addWaiter + this.nextWaiter = mode; + this.thread = thread; +} + +Node(Thread thread, int waitStatus) { // Used by Condition + this.waitStatus = waitStatus; + this.thread = thread; +} +``` + +ConditionObject +``` +public class ConditionObject implements Condition, java.io.Serializable + +/** First node of condition queue. */ +private transient Node firstWaiter; +/** Last node of condition queue. */ +private transient Node lastWaiter; + +public final void await() +public final boolean await(long time, TimeUnit unit) +public final void awaitUninterruptibly() + +public final void signal() +public final void signalAll() +``` + +#### 源码解析 +##### 属性 +private transient volatile Node head; +private transient volatile Node tail; +private volatile int state; + +##### 主要方法 +status +``` +protected final int getState() +protected final void setState(int newState) +protected final boolean compareAndSetState(int expect, int update) +``` + +acquire +``` +获取锁流程:acquire(int arg) +// 尝试获取锁 +tryAcquire(arg) +// 获取锁成功,直接返回不进入下面方法,线程继续执行 +// 获取锁失败,添加等待队列 +addWaiter(Node.EXCLUSIVE) +// 会一直获取锁,直到成功 +// 获取成功,在等待队列中清除该节点;获取失败,前任节点状态改为signal +acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) +// 线程中途若中断,根据意愿处理 +selfInterrupt() + +public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} + +// 1.1 ReentrantLock.FairSync.tryAcquire +// 尝试获取锁 +protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + // 队列中没有前任即head时才可获取锁,公平锁的象征 + if (!hasQueuedPredecessors() && + // 修改状态为1,即获取到锁 + compareAndSetState(0, acquires)) { + // 设定当前线程 + setExclusiveOwnerThread(current); + return true; + } + } + // 可重入象征,状态+1 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} + +// 1.2 +// 获取锁失败,添加等待队列 +private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode); + Node pred = tail; + // 尾部插入 + if (pred != null) { + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + enq(node); + return node; +} + +// 1.21 +private Node enq(final Node node) { + for (;;) { + Node t = tail; + // 初始化头尾部,头尾是一个节点 + if (t == null) { // Must initialize + if (compareAndSetHead(new Node())) + tail = head; + // 头部后插入该节点,并将该节点设为尾部 + } else { + node.prev = t; + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} + +// 1.3 +// 会一直获取锁,直到成功 +// 获取成功,在等待队列中清除该节点;获取失败,前任节点状态改为signal +// @return {@code true} if interrupted while waiting +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + // 获取锁成功 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + // 获取锁失败,上一个节点状态修改为signal // todo why + if (shouldParkAfterFailedAcquire(p, node) && + // 阻塞当前线程,返回线程中断状态 + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + // 获取锁成功,等待队列中清除 + if (failed) + cancelAcquire(node); + } +} + +// 1.31 阻塞当前线程,返回线程中断状态 +private final boolean parkAndCheckInterrupt() { + LockSupport.park(this); + return Thread.interrupted(); +} + +// 1.32 +// 获取锁失败,上一个节点状态修改为signal // todo why +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus; + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + if (ws > 0) { + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry. + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} + +// 1.33 +// 获取锁成功,等待队列中清除 +private void cancelAcquire(Node node) { + if (node == null) + return; + + node.thread = null; + + // Skip cancelled predecessors + Node pred = node.prev; + while (pred.waitStatus > 0) + node.prev = pred = pred.prev; + + Node predNext = pred.next; + + node.waitStatus = Node.CANCELLED; + + // If we are the tail, remove ourselves. + if (node == tail && compareAndSetTail(node, pred)) { + compareAndSetNext(pred, predNext, null); + } else { + // If successor needs signal, try to set pred's next-link + // so it will get one. Otherwise wake it up to propagate. + int ws; + if (pred != head && + ((ws = pred.waitStatus) == Node.SIGNAL || + (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && + pred.thread != null) { + Node next = node.next; + if (next != null && next.waitStatus <= 0) + compareAndSetNext(pred, predNext, next); + } else { + unparkSuccessor(node); + } + + node.next = node; // help GC + } +} + +// 1.4 中断结果返回为true,中断当前线程 +static void selfInterrupt() { + Thread.currentThread().interrupt(); +} +``` + +release +``` +// 释放锁 +public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; +} + +// 1 尝试释放锁 +protected final boolean tryRelease(int releases) { + // 因为是可重入锁,需完全释放该线程持有锁的状态 + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + // 当前线程设定为null + setExclusiveOwnerThread(null); + } + setState(c); + return free; +} + +// 2 解除阻塞,head的下一个可用节点 +private void unparkSuccessor(Node node) { + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + + // 如果下一节点被取消或为null,从尾部遍历获取第一个节点 + Node s = node.next; + if (s == null || s.waitStatus > 0) { + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + LockSupport.unpark(s.thread); +} +``` + +ConditionObject +await +``` +public final void await() throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + // 增加到条件队列 + Node node = addConditionWaiter(); + // 完全释放该线程,若未释放成功将该线程状态设为cancel + int savedState = fullyRelease(node); + int interruptMode = 0; + // 不在等待队列,即还处于等待状态 + while (!isOnSyncQueue(node)) { + LockSupport.park(this); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + } + // 从条件队列已转移到等待队列,等待状态已消除,可以尝试获取执行器 + // todo 线程中断处理 + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) // clean up if cancelled + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); +} + +// 1.1 增加到条件队列 +// firstWaiter和lastWaiter都有真实的条件线程节点 +private Node addConditionWaiter() { + Node t = lastWaiter; + // If lastWaiter is cancelled, clean out. + if (t != null && t.waitStatus != Node.CONDITION) { + unlinkCancelledWaiters(); + t = lastWaiter; + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); + if (t == null) + firstWaiter = node; + else + t.nextWaiter = node; + lastWaiter = node; + return node; +} + +// 1.2 +// 因为是可重入锁,需完全释放该线程 +// 若未释放成功将该线程状态设为cancel +final int fullyRelease(Node node) { + boolean failed = true; + try { + int savedState = getState(); + if (release(savedState)) { + failed = false; + return savedState; + } else { + throw new IllegalMonitorStateException(); + } + } finally { + if (failed) + node.waitStatus = Node.CANCELLED; + } +} + +// 1.3 等待队列中是否有 +// 若从条件队列已转移到等待队列,则该条件线程已唤醒 +final boolean isOnSyncQueue(Node node) { + if (node.waitStatus == Node.CONDITION || node.prev == null) + return false; + if (node.next != null) // If has successor, it must be on queue + return true; + return findNodeFromTail(node); +} + +``` + +signal +``` +public final void signal() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignal(first); +} + +private void doSignal(Node first) { + do { + if ( (firstWaiter = first.nextWaiter) == null) + lastWaiter = null; + // 相当于把头节点从队列中出队 // todo why + first.nextWaiter = null; + } while (!transferForSignal(first) && + (first = firstWaiter) != null); +} + +// 从条件队列转移等待队列 +// @return true if successfully transferred (else the node was cancelled before signal) +final boolean transferForSignal(Node node) { + /* + * If cannot change waitStatus, the node has been cancelled. + */ + if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) + return false; + + // 加入等待队列,返回上一个节点 + Node p = enq(node); + int ws = p.waitStatus; + // 如果上一个节点已取消了,或者更新状态为SIGNAL失败(即上一个节点已经取消) + // 则直接唤醒当前节点对应的线程 + if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) + LockSupport.unpark(node.thread); + return true; +} +``` + +signalAll +``` +public final void signalAll() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignalAll(first); +} + +private void doSignalAll(Node first) { + lastWaiter = firstWaiter = null; + do { + Node next = first.nextWaiter; + first.nextWaiter = null; + transferForSignal(first); + first = next; + } while (first != null); +} +``` + +#### 总结 +``` +AQS提供获取和释放锁的操作 +对于已获取锁的线程,需要等待其他线程执行达到条件时才可执行的情况,AQS提供了条件等待和唤醒的功能 +若获取锁超时,从等待队列中清除 +对线程中断提供了处理方案 // todo +提供共享线程和独有线程操作的方法 +``` \ No newline at end of file diff --git a/week_03/12/2ReentrantLock-012.md b/week_03/12/2ReentrantLock-012.md new file mode 100644 index 0000000..8675d8c --- /dev/null +++ b/week_03/12/2ReentrantLock-012.md @@ -0,0 +1,129 @@ +#### 问题 +公平锁与非公平锁:被阻塞线程获取执行权是否按等待队列顺序获取
+ +#### 简介 +可重入锁,分为公平锁和非公平锁,根据线程获取锁是否按顺序获取划分,提供条件锁功能。通过静态内部类集成AQS实现 + +#### 继承体系 +public class ReentrantLock implements Lock, java.io.Serializable
+Lock:提供锁的相关操作
+lock()、lockInterruptibly()、unlock()、tryLock()、tryLock(long time, TimeUnit unit)、newCondition()
+ +#### 类结构说明 +abstract static class Sync extends AbstractQueuedSynchronizer +``` +abstract void lock(); +final boolean nonfairTryAcquire(int acquires) +// 释放锁 +protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + setState(c); + return free; +} +final ConditionObject newCondition() +``` + +static final class FairSync extends Sync +``` +final void lock() { + acquire(1); +} + +protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + // 没有其他线程在等待队列中,则尝试获取锁 + if (!hasQueuedPredecessors() && + // 修改状态,获取锁 + compareAndSetState(0, acquires)) { + // 设定当前线程 + setExclusiveOwnerThread(current); + return true; + } + } + // 可重入锁特征 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} + +// 没有其他线程在等待队列中,则尝试获取锁 +public final boolean hasQueuedPredecessors() { + // The correctness of this depends on head being initialized + // before tail and on head.next being accurate if the current + // thread is first in queue. + Node t = tail; // Read fields in reverse initialization order + Node h = head; + Node s; + // 第一个线程刚放入等待队列,刚初始化的状态 + return h != t && + // 等待队列中第一个线程 + ((s = h.next) == null || s.thread != Thread.currentThread()); +} +``` + +static final class NonfairSync extends Sync +``` +final void lock() { + // 直接尝试获取锁 + if (compareAndSetState(0, 1)) + setExclusiveOwnerThread(Thread.currentThread()); + else + acquire(1); +} + +protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); +} + +// 与公平锁相比,少了个检查等待队列中是否有别的线程,即当前线程是否是第一个等待线程 +final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} +``` + + +#### 源码解析 +##### 属性 +private final Sync sync; +##### 构造方法 +##### 主要方法 +``` +public void lock() { + sync.lock(); +} +public void unlock() { + sync.release(1); +} +``` + +#### 总结 +可重入锁,分为公平锁和非公平锁,根据线程获取锁是否按顺序获取划分,提供条件锁功能。通过静态内部类集成AQS实现 diff --git a/week_03/12/2Semaphore-012.md b/week_03/12/2Semaphore-012.md new file mode 100644 index 0000000..232df3d --- /dev/null +++ b/week_03/12/2Semaphore-012.md @@ -0,0 +1,160 @@ +#### 问题 +如何限制指定数量线程同时访问 +指定数量线程访问后剩余线程还可以继续访问吗 + +#### 简介 +信号量,控制同时访问资源的最大线程数 + +#### 继承体系 +public class Semaphore implements java.io.Serializable + +#### 类结构说明 +abstract static class Sync extends AbstractQueuedSynchronizer +``` +Sync(int permits) { + setState(permits); +} + +final int getPermits() { + return getState(); +} + +// 尝试获取锁 +final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } +} + +// 尝试释放锁 +protected final boolean tryReleaseShared(int releases) { + for (;;) { + int current = getState(); + int next = current + releases; + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + if (compareAndSetState(current, next)) + return true; + } +} + +// 减少许可数 +final void reducePermits(int reductions) { + for (;;) { + int current = getState(); + int next = current - reductions; + if (next > current) // underflow + throw new Error("Permit count underflow"); + if (compareAndSetState(current, next)) + return; + } +} + +// 许可数减为0 +final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) + return current; + } +} +``` + +static final class NonfairSync extends Sync +``` +NonfairSync(int permits) { + super(permits); +} + +// 尝试获取公平锁 +protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); +} +``` + +static final class FairSync extends Sync +``` +FairSync(int permits) { + super(permits); +} + +// 尝试获取非公平锁 +protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) + return -1; + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } +} +``` + +#### 源码解析 +##### 属性 +private final Sync sync + +##### 构造方法 +``` +public Semaphore(int permits) { + sync = new NonfairSync(permits); +} +public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); +} +``` + +##### 主要方法 +acquire +``` +public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} +``` + +release +``` +public void release() { + sync.releaseShared(1); +} +``` + +其他方法 +``` +// 获取多个执行权 +public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); +} + +// 线程中断直接抛异常 +public void acquireUninterruptibly() { + sync.acquireShared(1); +} + +public boolean tryAcquire() { + return sync.nonfairTryAcquireShared(1) >= 0; +} + +// 减少许可 +protected void reducePermits(int reduction) { + if (reduction < 0) throw new IllegalArgumentException(); + sync.reducePermits(reduction); +} + +// 清空许可为0 +public int drainPermits() { + return sync.drainPermits(); +} +``` + +#### 总结 +semaphore,控制资源同时访问的线程数 + +#### 示例 -- Gitee