diff --git a/week_04/35/ArrayBlockingQueue-035.md b/week_04/35/ArrayBlockingQueue-035.md new file mode 100644 index 0000000000000000000000000000000000000000..8399a3b6c8e78df9090e26e0a805756c605b8759 --- /dev/null +++ b/week_04/35/ArrayBlockingQueue-035.md @@ -0,0 +1,19 @@ +####ArrayBlockingQueue + ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue(实现BlockingQueue继承Queue继承Collection)接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。常用的操作包括 add,offer,put,remove,poll,take,peek + 基于数组的阻塞队列 + 有界队列,有界也就意味着,它不能够存储无限多数量的对象。所以在创建 ArrayBlockingQueue 时,必须要给它指定一个队列的大小 + +####ArrayBlockingQueue几个重要方法 + add(E e):把 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则报异常 + offer(E e):表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false + put(E e):把 e 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续 + poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null + take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止 + remainingCapacity():剩余可用的大小。等于初始容量减去当前的 size + +####使用场景 + * 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素) + * 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作) + * 队列不支持空元素 + + \ No newline at end of file diff --git a/week_04/35/ConcurrentHashMap-035.md b/week_04/35/ConcurrentHashMap-035.md new file mode 100644 index 0000000000000000000000000000000000000000..0098bbbd6a40920a44482bdf10cf12c6b7ce5238 --- /dev/null +++ b/week_04/35/ConcurrentHashMap-035.md @@ -0,0 +1,270 @@ +####ConcurrentHashMap + 那么它到底是如何实现线程安全的? + 其中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性 + +#####基本参数变量 + // node数组最大容量:2^30=1073741824 + private static final int MAXIMUM_CAPACITY = 1 << 30; + // 默认初始值,必须是2的幕数 + private static final int DEFAULT_CAPACITY = 16 + //数组可能最大值,需要与toArray()相关方法关联 + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + //并发级别,遗留下来的,为兼容以前的版本 + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + // 负载因子 + private static final float LOAD_FACTOR = 0.75f; + // 链表转红黑树阀值,> 8 链表转换为红黑树 + static final int TREEIFY_THRESHOLD = 8; + //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) + static final int UNTREEIFY_THRESHOLD = 6; + static final int MIN_TREEIFY_CAPACITY = 64; + private static final int MIN_TRANSFER_STRIDE = 16; + private static int RESIZE_STAMP_BITS = 16; + // 2^15-1,help resize的最大线程数 + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + // 32-16=16,sizeCtl中记录size大小的偏移量 + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + // forwarding nodes的hash值 + static final int MOVED = -1; + // 树根节点的hash值 + static final int TREEBIN = -2; + // ReservationNode的hash值 + static final int RESERVED = -3; + // 可用处理器数量 + static final int NCPU = Runtime.getRuntime().availableProcessors(); + //存放node的数组 + transient volatile Node[] table; + /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 + *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容 + *当为0时:代表当时的table还没有被初始化 + *当为正数时:表示初始化或者下一次进行扩容的大小 + private transient volatile int sizeCtl; + +#####put + /** Implementation for put and putIfAbsent */ + final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + //1. 计算key的hash值 + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化 + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //4. 当前正在扩容 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + synchronized (f) { + if (tabAt(tab, i) == f) { + //5. 当前为链表,在链表中插入新的键值对 + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + // 6.当前为红黑树,将新的键值对插入到红黑树中 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树 + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 + addCount(1L, binCount); + return null; + } + + 1.判断Node[]数组是否初始化,没有则进行初始化操作 + 2.通过hash定位数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头节点),添加失败则进入下次循环 + 3.检查到内部正在扩容,就帮助它一块扩容 + 4.如果f!=null,则使用synchronized锁住f元素(链表/红黑树的头元素)。如果是Node(链表结构)则执行链表的添加操作;如果是TreeNode(树型结构)则执行树添加操作 + 5.判断链表长度已经达到临界值8(默认值),当节点超过这个值就需要把链表转换为树结构 + 6.如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容 + + static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; + } + 将key的hashCode的低16位于高16位进行异或运算,不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾 +#####初始化 + private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + if ((sc = sizeCtl) < 0) + // 1. 保证只有一个线程正在进行初始化操作 + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if ((tab = table) == null || tab.length == 0) { + // 2. 得出数组的大小 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + // 3. 这里才真正的初始化数组 + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + // 4. 计算数组中可用的大小:实际大小n*0.75(加载因子) + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + break; + } + } + return tab; + } + +#####扩容 + final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + // 如果 table 不是空 且 node 节点是转移类型,数据检验 + // 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验 + // 尝试帮助扩容 + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { + // 根据 length 得到一个标识符号 + int rs = resizeStamp(tab.length); + // 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改 + // 且 sizeCtl < 0 (说明还在扩容) + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + // 如果 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了) + // 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) + // 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535) + // 或者转移下标正在调整 (扩容结束) + // 结束循环,返回 table + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容) + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { + // 进行转移 + transfer(tab, nextTab); + // 结束循环 + break; + } + } + return nextTab; + } + return table; + } + 重点再transfer方法 + +#####链表转红黑树的过程 + private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + //如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了 + //因为这个阈值扩容可以减少hash冲突,不必要去转红黑树 + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + synchronized (b) { + if (tabAt(tab, index) == b) { + TreeNode hd = null, tl = null; + for (Node e = b; e != null; e = e.next) { + //封装成TreeNode + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) + hd = p; + else + tl.next = p; + tl = p; + } + //通过TreeBin对象对TreeNode转换成红黑树 + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } + } + +#####get + ConcurrentHashMap的get操作的流程: + 1.计算hash值,定位到该table索引位置,如果是首节点符合就返回 + 2.如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回 + 3.以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null + + public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); //计算两次hash + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素 + if ((eh = e.hash) == h) { //如果该节点就是首节点就返回 + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来 + //查找,查找到就返回 + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历 + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } +#####szie方法的逻辑 + 对于size的计算,在扩容和addCount()方法就已经有处理了,可以注意一下Put函数,里面就有addCount()函数,早就计算好的,然后你size的时候直接给你 + public int size() { + long n = sumCount(); + return ((n < 0L) ? 0 : + (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : + (int)n); + } + final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; //变化的数量 + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; + } + + \ No newline at end of file diff --git a/week_04/35/ConcurrentLinkedQueue-035.md b/week_04/35/ConcurrentLinkedQueue-035.md new file mode 100644 index 0000000000000000000000000000000000000000..61670068e0e08cf2eb6f0d6c6aedb737084f98db --- /dev/null +++ b/week_04/35/ConcurrentLinkedQueue-035.md @@ -0,0 +1,42 @@ +####ConcurrentLinkedQueue + ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了'wait-free'算法来实现,该算法在Michael & Scott算法上进行了一些修改 + ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列 + + ConcurrentLinkedQueue只实现了Queue接口,并没有实现BlockingQueue接口,所以它不是阻塞队列,也不能用于线程池中,但是它是线程安全的,可用于多线程环境中 + + + private static class Node { + // 节点中的元素 + volatile E item; + // 下一个节点,没有上一个节点,表示它是一个单向链表的形式 + volatile Node next; + // 构造一个节点 + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + // 使用 CAS 的方式设置节点的元素 + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + // 设置下一个节点 + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + // 采用 CAS 的方式设置下一个节点 + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + // Unsafe 类的一些初始化 + } + + E item 元素和 Node next 节点都使用了 volatile 来修饰,这说明了元素或某个节点被一个线程修改了之后,其他的线程是立马看到修改后的值的 + + // 头节点, + private transient volatile Node head; + // 尾节点,尾节点不一定是链表的最后一个节点 + private transient volatile Node tail; + // 构造 + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } + 头节点 head 和尾节点 tail 都被 volatile 修饰,节点被一个线程修改了之后,是会把修改的最新的值刷新到主内存中去,当其他线程去读取该值的时候,会中主内存中获取最新的值,也就是一个线程修改了之后,对其他线程是立即可见的 \ No newline at end of file diff --git a/week_04/35/CopyOnWriteArrayList-035.md b/week_04/35/CopyOnWriteArrayList-035.md new file mode 100644 index 0000000000000000000000000000000000000000..b9ee3c1cd43ded24e144ed87c8c53c289371bcb8 --- /dev/null +++ b/week_04/35/CopyOnWriteArrayList-035.md @@ -0,0 +1,31 @@ +####CopyOnWriteArrayList + 实现List, RandomAccess, Cloneable, Serializable + 内部持有一个ReentrantLock lock = new ReentrantLock() + 底层是用volatile transient声明的数组array + 读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给array + + volatile(挥发物、易变的):变量修饰符,只能用来修饰变量。volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。 + 而且,当成员变量发生变 化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值 + transient(暂短的、临时的):修饰符,只能用来修饰字段。在对象序列化的过程中,标记为transient的变量不会被序列化 + + CopyOnWrite并发容器用于读多写少的并发场景 + + CopyOnWrite:对一块内存进行修改时,不直接在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来指向的内存指针指到新的内存,原来的内存就可以被回收 + + + 读取操作没有任何同步控制和锁操作,理由就是内部数组array不会发生修改,只会被另外一个array替换,因此可以保证数据安全 + 写入操作add()方法在添加集合的时候加了锁,保证同步,避免多线程写的时候会copy出多个副本 + + ####CopyOnWrite的缺点  + CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。 + + 内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象 + (注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存) + + 针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap + + 数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器 + +####CopyOnWriteArrayList为什么并发安全且性能比Vector好 +  我知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况 + \ No newline at end of file diff --git a/week_04/35/DelayQueue-035.md b/week_04/35/DelayQueue-035.md new file mode 100644 index 0000000000000000000000000000000000000000..1848743b11a48db5bccfcf19069c9caa8265a1cf --- /dev/null +++ b/week_04/35/DelayQueue-035.md @@ -0,0 +1,25 @@ +####DelayQueue + DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素 + public class DelayQueue extends AbstractQueue + implements BlockingQueue { + /** 可重入锁 */ + private final transient ReentrantLock lock = new ReentrantLock(); + /** 支持优先级的BlockingQueue */ + private final PriorityQueue q = new PriorityQueue(); + /** 用于优化阻塞 */ + private Thread leader = null; + /** Condition */ + private final Condition available = lock.newCondition(); + + DelayQueue的元素都必须继承Delayed接口。同时也可以从这里初步理清楚DelayQueue内部实现的机制了:以支持优先级无界队列的PriorityQueue作为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高 + + PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序 + PriorityBlockingQueue底层采用二叉堆来实现 + + 二叉堆是一种特殊的堆,就结构性而言就是完全二叉树或者是近似完全二叉树,满足树结构性和堆序性。树机构特性就是完全二叉树应该有的结构,堆序性则是:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。它有两种表现形式:最大堆、最小堆。 + 最大堆:父节点的键值总是大于或等于任何一个子节点的键值 + 最小堆:父节点的键值总是小于或等于任何一个子节点的键值 + +####使用场景 + * 缓存系统 : 当能够从 DelayQueue 中获取元素时,说该缓存已过期 + * 定时任务调度 \ No newline at end of file diff --git a/week_04/35/com.dans.demo/ArrayBlockingQueueDemo.java b/week_04/35/com.dans.demo/ArrayBlockingQueueDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..6e31e944de89a0f0185988a693b4f5142fdf5b99 --- /dev/null +++ b/week_04/35/com.dans.demo/ArrayBlockingQueueDemo.java @@ -0,0 +1,32 @@ +import java.util.AbstractQueue; +import java.util.concurrent.ArrayBlockingQueue; + +/** + * @author dans + * @ClassName: ArrayBlockingQueueDemo + * @Function: TODO + * @Date: 2020/1/2 10:57 + */ +public class ArrayBlockingQueueDemo { + + public static void main(String[] args) { + ArrayBlockingQueue queue = new ArrayBlockingQueue(1); + try { + queue.put(1); + System.out.println(1); + //移出 + queue.remove(1); + queue.put(1); + System.out.println(2); + //是否添加进队列,返回false + queue.offer(1); + System.out.println(3); + //阻塞 + queue.put(1); + System.out.println(4); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/week_04/35/com.dans.demo/CopyOnWriteArrayListDemo.java b/week_04/35/com.dans.demo/CopyOnWriteArrayListDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..1c7a08b8dd71dc7a08e2918d635b069dc62888eb --- /dev/null +++ b/week_04/35/com.dans.demo/CopyOnWriteArrayListDemo.java @@ -0,0 +1,27 @@ +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * @author dans + * @ClassName: CopyOnWriteArrayListDemo + * @Function: TODO + * @Date: 2019/12/30 10:39 + */ +public class CopyOnWriteArrayListDemo { + + public static void main(String[] args) { + List copy = new CopyOnWriteArrayList(); + copy.add(1); + copy.add(1); + copy.add(2); + copy.add(3); + copy.forEach(e ->{ + if (e == 1) {//直接移出 + copy.remove(e); + } + }); + copy.forEach(e ->{ + System.out.println(e); + }); + } +} diff --git a/week_04/35/com.dans.demo/DelayQueueDemo.java b/week_04/35/com.dans.demo/DelayQueueDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..f5393d1aae6aa8b619fec4d42d9f0e8935843133 --- /dev/null +++ b/week_04/35/com.dans.demo/DelayQueueDemo.java @@ -0,0 +1,134 @@ +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * @author dans + * @ClassName: DelayQueueDemo + * @Function: TODO + * @Date: 2020/1/4 14:19 + */ +public class DelayQueueDemo { + static class Cache implements Runnable { + + private boolean stop = false; + + private Map itemMap = new HashMap<>(); + + private DelayQueue delayQueue = new DelayQueue<>(); + + public Cache () { + // 开启内部线程检测是否过期 + new Thread(this).start(); + } + + /** + * 添加缓存 + * + * @param key + * @param value + * @param exprieTime 过期时间,单位秒 + */ + public void put (String key, String value, long exprieTime) { + CacheItem cacheItem = new CacheItem(key, exprieTime); + + // 此处忽略添加重复 key 的处理 + delayQueue.add(cacheItem); + itemMap.put(key, value); + } + + public String get (String key) { + return itemMap.get(key); + } + + public void shutdown () { + stop = true; + } + + @Override + public void run() { + while (!stop) { + CacheItem cacheItem = delayQueue.poll(); + if (cacheItem != null) { + // 元素过期, 从缓存中移除 + itemMap.remove(cacheItem.getKey()); + System.out.println("key : " + cacheItem.getKey() + " 过期并移除"); + } + } + + System.out.println("cache stop"); + } + } + + static class CacheItem implements Delayed { + + private String key; + + /** + * 过期时间(单位秒) + */ + private long exprieTime; + + private long currentTime; + + public CacheItem(String key, long exprieTime) { + this.key = key; + this.exprieTime = exprieTime; + this.currentTime = System.currentTimeMillis(); + } + + @Override + public long getDelay(TimeUnit unit) { + // 计算剩余的过期时间 + // 大于 0 说明未过期 + return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime); + } + + @Override + public int compareTo(Delayed o) { + // 过期时间长的放置在队列尾部 + if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) { + return 1; + } + // 过期时间短的放置在队列头 + if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { + return -1; + } + + return 0; + } + + public String getKey() { + return key; + } + } + + public static void main(String[] args) throws InterruptedException { + + Cache cache = new Cache(); + + // 添加缓存元素 + cache.put("a", "1", 5); + cache.put("b", "2", 4); + cache.put("c", "3", 3); + + while (true) { + String a = cache.get("a"); + String b = cache.get("b"); + String c = cache.get("c"); + + System.out.println("a : " + a + ", b : " + b + ", c : " + c); + + // 元素均过期后退出循环 + if (a.isEmpty() && b.isEmpty()&& c.isEmpty()) { + break; + } + + TimeUnit.MILLISECONDS.sleep(1000); + } + + cache.shutdown(); + } +} diff --git a/week_04/35/com.dans.demo/ZeroEvenOdd.java b/week_04/35/com.dans.demo/ZeroEvenOdd.java new file mode 100644 index 0000000000000000000000000000000000000000..28b040717826672e15ca5e50e59a6514faba35a8 --- /dev/null +++ b/week_04/35/com.dans.demo/ZeroEvenOdd.java @@ -0,0 +1,83 @@ +import java.util.concurrent.Semaphore; +import java.util.function.IntConsumer; + +/** + * @author dans + * @ClassName: ZeroEvenOdd + * @Function: TODO + * @Date: 2020/1/3 12:01 + */ +public class ZeroEvenOdd { + + private int n; + Semaphore z = new Semaphore(1); + Semaphore e = new Semaphore(0); + Semaphore o = new Semaphore(0); + + public ZeroEvenOdd(int n) { + this.n = n; + } + + public void zero(IntConsumer printNumber) throws InterruptedException { + for (int i = 0; i < n; i++) { + z.acquire(); + printNumber.accept(0); + if ((i & 1) == 0) { + o.release(); + } else { + e.release(); + } + } + } + + public void even(IntConsumer printNumber) throws InterruptedException { + for (int i = 2; i <= n; i += 2) { + e.acquire(); + printNumber.accept(i); + z.release(); + } + } + + public void odd(IntConsumer printNumber) throws InterruptedException { + for (int i = 1; i <= n; i += 2) { + o.acquire(); + printNumber.accept(i); + z.release(); + } + } +} + +class MainClass { + + public static void main(String[] args) { + IntConsumer ic = (x) -> { + System.out.println(x); + }; +// ic.accept(2); + ZeroEvenOdd odd = new ZeroEvenOdd(2); + new Thread(() -> { + try { + odd.zero(ic); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + new Thread(() -> { + try { + odd.even(ic); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + new Thread(() -> { + try { + odd.odd(ic); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + } +} + + +