关于我们

质量为本、客户为根、勇于拼搏、务实创新

< 返回新闻公共列表

云南大王-带你看看Java的锁(二)

发布时间:2020-04-13 00:00:00
前言 简介 Semaphore 中文称信号量,它和ReentrantLock 有所区别,ReentrantLock是排他的,也就是只能允许一个线程拥有资源,Semaphore是共享的,它允许多个线程同时拥有资源,是AQS中共享模式的实现,在前面的AQS分析文章中,我也是用Semaphore去解释共享锁的 现实中,我们火爆一点儿的饭店吃饭,比如海底捞,为什么我们需要排队,是因为里面只能容纳这么多人吃饭,位置不够了,我们就要去排队,有人吃完出来了才能有新的人进去,同样的道理 使用 下面还是常规流程 可能有的人没用过,就写个小demo,介绍下他的使用 先看下代码: /** * @ClassName SemaphoreDemo * @Auther burgxun * @Description: 使用信号量的Demo * @Date 2020/4/3 13:53 **/ public class SemaphoreDemo { public static void PrintLog(String logContent) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm.ss.SSS"); System.out.println(String.format("%s : %s", simpleDateFormat.format(new Date()), logContent)); } public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(10); Thread thread1 = new Thread(new Runnable() { @Override public void run() { try { PrintLog("Thread1 starting ......"); semaphore.acquire(5); PrintLog("Thread1 get permits success"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { PrintLog("thread1 release permits success"); semaphore.release(5); } } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { PrintLog("Thread2 starting ......"); semaphore.acquire(5); PrintLog("Thread2 get permits success"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { PrintLog("thread2 release permits success"); semaphore.release(5); } } }); thread1.start(); thread2.start(); try { semaphore.acquire(5); PrintLog("Main thread get permits success"); Thread.sleep(3000); } finally { PrintLog("Main thread release permits"); semaphore.release(5); } } } 上面的代码逻辑也很简答,就是设置一个信号量为10的permits,然后代码中新开2个线程,也获取semaphore的5个permits,同时主线程也获取5个permits 让我们看下结果: 2020-04-08 11:53.40.539 : Thread1 starting ...... 2020-04-08 11:53.40.539 : Main thread get permits success 2020-04-08 11:53.40.539 : Thread2 starting ...... 2020-04-08 11:53.40.544 : Thread1 get permits success 2020-04-08 11:53.43.543 : Main thread release permits 2020-04-08 11:53.43.543 : Thread2 get permits success 2020-04-08 11:53.43.544 : thread1 release permits success 2020-04-08 11:53.46.543 : thread2 release permits success 从结果上可以看到 程序启动的时候,Thread1 和Thread2 还有主线程同时要获取permits,但是由于Semaphore的permits一共就是10个,所以当主线程和Thread1获取到了以后,Thread2 虽然启动了 但是会阻塞在这边,进入AQS的SyncQueue中,当MainThread或者Thread1 执行完成,释放permits后,Thread2 才会从阻塞队列中 唤醒回来从新获取的信号量,后面继续执行! 源码分析 看完了 上面的小Demo 相信你对信号量有了一定的了解,那我们就进入源码中看下,是怎么实现的,首先我们先看下Semaphore的结构图: 类结构图 从图上我们可以看到 这个类机构和我们之前看的ReentrantLock差不多,有一个Sync静态的抽象类,然后还有2个继承Sync的类,一个是公平类FairSync ,另外一个是非公平的NonfairSync 关于公平和非公平的选择 我在ReentrantLock的结尾部分已经做了部分阐述,这边就不说了 那么就看下代码实现吧! Sync abstract static class Sync extends AbstractQueuedSynchronizer { /** * m默认的构造函数 设置AQS同步器的State值 */ Sync(int permits) { setState(permits); } /** * 获取同步器的状态值 State 这个就是获取设置的许可数量 */ final int getPermits() { return getState(); } /** * 实现共享模式下非公平方法的获取资源 */ final int nonfairTryAcquireShared(int acquires) { for (; ; ) {//自旋 int available = getState();//当前的同步器的状态值 int remaining = available - acquires;//本次用完 还剩下几个permits值 // 如果剩余小于0或者CAS 成功 就返回 后面利用这个方法的时候 会判断返回值的 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * 共享模式下的尝试释放资源 */ protected final boolean tryReleaseShared(int releases) { for (; ; ) { int current = getState(); int next = current + releases;//用完的permits 就要加回去 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//CAS 修改AQS的State 由于可能存在多个线程同时操作的State // 所以要使用CAS操作 失败了就继续循环,CAS成功就返回 return true; } } /** * 根据参数reductions 去减少信号量 */ final void reducePermits(int reductions) { for (; ; ) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } /** * 清空所有信号量 并返回清空的数量 * 这边我看很多文章里面的人只是翻译了英文 获取信号量 但是这个里面有个清空的操作 */ final int drainPermits() { for (; ; ) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } Sync 里面的方法不多,都是常见的讨论,里面默认提供一个非公平版本的获取Permits,还有统一的释放Permits的方法,其余的就是一个获取信号量方法getPermits,和减少当前Permits数量的方法reducePermits,最后还有一个清空信号量的方法drainPermits,drainPermits这个方法好多文章里面 都翻译了因为注解,都翻译为获取并返回许可,但是这个方法其实主要做的还是清空Semaphore里面的信号量的操作,有人会想 为什么提供这个一个鸡肋方法么,因为先用getPermits获取 然后使用reducePermits减少不就好了么,哈哈,这边要考虑到多线程并发的情况,这边只能使用CAS的操作去更新!每一行代码都有它存在的意义,就像人一样,存在即合理! NonfairSync /** * 非公平版本 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } 这个类 真没啥好说的,都是调的Sync里面的实现 FairSync /** * 公平版本 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (; ; ) {//这边为什么要用自旋 主要是因为当前版本是共享模式 可能会多个线程同事操作State 导致当前的CAS 操作失败 所以要做重试 if (hasQueuedPredecessors())//如果当前的SyncQueue中还有等待线程 那就直接返回-1 不让当前线程获取资源 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } 这个公平方法的初始化方法和非公平的一样,区别就是这个tryAcquireShared方法的实现,一个是自己实现的,一个是调的Sync的实现,这个2个方法 虽然区别也不大,但是执行的逻辑却是不一样的,主要是这个hasQueuedPredecessors方法,这个方法就是获取AQS的SyncQueue中 是否还有等待获取资源的线程,这就是公平和非公平的区别,上一篇ReentrantLock中我也做个说明,就相当于插队一样,如果公平的话 大家获取不到的时候 就到后面排队等待,大家挨个来,但是非公共的获取 就是一上来,我不管后面有没有等待的人,如果有满足我获取的条件,我就直接占用! Semaphore 构造函数 Semaphore有个构造函数 Semaphore(int permits) 这个是默认的构造函数,是非公平permits就是信号量许可的总数 Semaphore(int permits, boolean fair) 这个和上面的区别就是 可以设置实现的版本 true就是FairSync false 就是NonfairSync Semaphore 成员方法 获取 方法 是否响应中断 是否阻塞 acquire() 是 是 acquire(int permits) 是 是 acquireUninterruptibly() 否 是 acquireUninterruptibly(int permits) 否 是 tryAcquire() 否 否 tryAcquire(int permits) 否 否 tryAcquire(long timeout, TimeUnit unit) 是 是(时间可控) tryAcquire(int permits, long timeout, TimeUnit unit) 是 是(时间可控) 上面的方法 是我对Semaphore获取permits的使用总结,记不住也没事儿,看看名字或者到时候看下注解,应该也能看明白的~ 调一个核心方法acquire吧 /** * Semaphore中 * 获取资源 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * AbstractQueuedSynchronizer 中 * 共享模式下的 获取资源 可以响应中断 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//检测下 中断标识符 如果发生了中断 就抛出中断异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//尝试获取资源 如果返回值小于0 说明当前的同步器里面的值 不够当前获取 就进入排队 doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //以共享模式加入到阻塞队列中 这里addWaiter和独占锁加锁使用的是同一个方法 不清楚的 可以看之前的文章 final Node node = addWaiter(Node.SHARED);// 返回成功加入队尾的节点 boolean failed = true;//标识是否获取资源失败 try { for (; ; ) {//自旋 final Node p = node.predecessor();// 获取当前节点的前置节点 if (p == head) {// 如果前置节点是head 那就去尝试获取资源,因为可能head已经释放了资源 int r = tryAcquireShared(arg); if (r >= 0) {// 如果获取成功且大于等于0,意味这资源还有剩余,可唤醒其余线程获取 setHeadAndPropagate(node, r);// 这边方法就是和独占锁处理不一样地放 我们可以重点去看下 其余的流程是一样的 p.next = null; // help GC failed = false; return; } } /*下面的方法和独占锁的是一样的 在第一篇文章中已经解读过,小伙伴们如果不清楚 可以去看下 有区别的地方就是对中断的处理这边是直接抛出中断异常,独占锁处理是返回标记是否中断 让上一层处理中断 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 更新prev节点状态 ,并根据prev节点状态判断是否自己当前线程需要阻塞 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// node的prev节点的状态 if (ws == Node.SIGNAL) // 如果SIGNAL 就返回true 就会执行到parkAndCheckInterrupt方法里面 return true; /* * 如果ws 大于0 这里是只能为1,如果是1说明线程已经取消,相当于无效节点 * 者说明 当前node 节点加入到了一个无效节点后面,那这个必须处理一下 node.prev = pred = pred.prev * 这个操作 我们拆解下来看,下看 pred = pred.prev这个的意思是把prev节点的prev*节点 赋值给prev节点 *后面再看 node.prev = pred 联合 刚才的赋值 这个的意思就是把prev节点的prev节点和node关联起来, *原因我上面也说了因为pre节点线程取消了,所以node节点不能指向pre节点 只能一个一个的往前找, *找到waitStatus 小于或者等于0的结束循环最后再把找到的pre节点执行node节点 ,这样就跳过了所有无效的节点 */ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* *这边的操作就是把pred 节点的状态设置为SIGNAL,这样返回false 这样可以返回到上面的自旋中 *再次执行一次,如果还是获取不到锁,那么又回到当前的shouldParkAfterFailedAcquire方法 执行到方法最上面的判断 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 阻塞当前线程,并在恢复后坚持是否发送了中断 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { /* * 这边线程阻塞,只有2中方式唤醒当前线程, * 一种方式就是 当前线程发生中断 * 另外一个情况就是 资源释放的时候会调unpark 方法 唤醒当前的线程 这个会在下一篇会讲到 */ LockSupport.park(this); return Thread.interrupted();//检查线程在阻塞过程中 是否发生了中断 } 这其中 tryAcquireShared 方法 都是子类去重写实现的,非公平版本和公平版本的实现 上文已经描述过! 这其中的整个实现都是在AQS中的,在前面的AQS文章中也详细的描述过~不清楚的 去前面的文章中看下 下面是整个方法的调用关系图如下: 释放 /** * 释放资源 */ public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } 释放方法调用了Sync的releaseShared 实际上就是调用了AQS内部的方法releaseShared /** * AbstractQueuedSynchronizer 中 * 共享版本的 释放资源 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//tryReleaseShared 还是和之前的套路一样 子类去重写的 doReleaseShared();//是不是很熟悉 return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式下的释放资源 */ private void doReleaseShared() { for (; ; ) { Node h = head; if (h != null && h != tail) {// 这个head!=tail 说明阻塞队列中至少2个节点 不然也没必要去传播唤醒 如果就自己一个节点 就算资源条件满足 还换个谁呢? int ws = h.waitStatus;// head 节点状态SIGNAL if (ws == Node.SIGNAL) {// 如果head状态是 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);//是和独占锁释放用的同样的方法 唤醒的是下一个节点 前面的文章有分析到 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; //这边设置为-3 是为了唤醒的传播 也就是满足上一个方法有判断waitStatus 小于0 } if (h == head) break; } } /** * 唤醒等待线程去获取资源 */ private void unparkSuccessor(Node node) { /* * 这边判断了下如果当前节点状态小于0,更新这边的节点状态的0,是为了防止多次释放的时候 会多次唤醒, * 因为上面的方法有个判断waitStatus不等0才会执行到这个方法里面 */ int ws = node.waitStatus;//这边的弄得节点就是 要释放的节点 也就是当前队列的头节点 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 如果当前节点的next 节点 不存在或者waitStatus 大于0 说明next节点的线程已取消 if (s == null || s.waitStatus > 0) { s = null; //这个循环就是 从尾部节点开始往前找,找到离node节点也就是当前释放节点最近的一个非取消的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } /* *一开始觉得 这行判断null有点多余 因为上面去for 循环去找s 的时候 已经判断了不等于null *才可以进下面的循环赋值的 后来一想 不对,你们猜为什么? *因为 可能在循环的过程中t在赋值给s后,继续循环 虽然条件不满足, *但是这个时候已经比修改成null 了 我是这么想的哈~不知道对不对~ */ if (s != null) LockSupport.unpark(s.thread);// 这边就是唤醒当前线程去获取资源, } 具体里面的doReleaseShared方法 我之前在AQS的共享锁的文章里面 都详细做了概述,这边我就不再次赘述了! tryReleaseShared 方法 还是和之前的套路一样,AQS里面没有实现 只是写了个抛出异常的方法,tryReleaseShared方法需要子类去重写实现,具体为什么不写成抽象方法,哈哈 这个问题 自己去AQS中的文章去找下吧~相信你能找到答案 总结 看完了整个代码的实现,Semphore实际上就是一个共享锁,多个线程可以共享一个AQS中的State,Semphore常见使用场景是限制资源的并发访问的线程数量

/template/Home/Zkeys/PC/Static