博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java高并发系列 — AQS
阅读量:4621 次
发布时间:2019-06-09

本文共 7815 字,大约阅读时间需要 26 分钟。

  只懂volatile和CAS是不是可以无视concurrent包了呢,发现一个好链接,继续死磕,第一日;

  首先,我承认很多时候要去看源码才能更好搞懂一些事,但如果站在巨人肩膀上呢?有了大概思想源码看还是挺容易的,关键是要调试一次,特别是类似CountDownLatch这样的。

另外,没有完全理解我是不会记录一个字的,写这个目的就是为了快速复习而已;不开玩笑,这个AQS我多年前也看过一次,不用也全忘了。

  最后,文本是在自己理解下做的另一番解析,和原文不同。

介绍一副肩膀

https://www.cnblogs.com/waterystone/p/4920797.html

首先是AQS的整个构造,我喜欢图,上图:

  就是上图的构造,结合volatile(上图的volatile int state),CAS(各种如compareAndSetTail()方法),for(;;)自旋,整个concurrent包的灵魂就出来了。

  初认识AQS只需要知道用它分获取和释放两种操作,每个操作分为独占和共享,也就是acqurie、release、acquireShared、releaseshare这四个方法,可以多个子类同步器,如CyclicBarrire

记录文:

独占锁acquire(int)

  

  还有得到个好东西,一图明确acquire()方法,如下图:

public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

 

附带acquire()的总结:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

  这里有个有趣的问题就是在加入等待队列之后,线程是否应该进入wait状态【图中的park(),其实是用Unsafe.unpark实现的。unpark使得线程不需要获取锁进入wait状态】,加入队尾后,利用acquireQueued休息:

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;//标记是否成功拿到资源    try {        boolean interrupted = false;//标记等待过程中是否被中断过                //又是一个“自旋”!这里自旋什么时候会跳出呢?看return可知,只有在获取到了锁会退出,如果获取不到,那么就会被unpark把线程带到wait状态。       //当unpark唤醒后会继续自旋看自己是否是老二,一般情况下就是了,然后退出循环,返回中断状态,用作补偿处理,比较wai是不响应中断的        for (;;) {            final Node p = node.predecessor();//拿到前驱            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。            if (p == head && tryAcquire(arg)) {                setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!                failed = false;                return interrupted;//返回等待过程中是否被中断过            }                        //如果自己可以休息了,就进入waiting状态,直到被unpark()            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true        }    } finally {        if (failed)            cancelAcquire(node);    }}

   acquireQueued方法是如何找到安全点的呢,找安全点其实就是找到一个前驱节点,该节点的waitStatus == Node.SIGNAL,而这个整型值为-1,这个标志的Node遵守一个协议就是该节点在释放锁的时候会唤醒下一个需要唤醒的节点(不一定是后继节点);如果找不到,就会在自旋中变为直接争取锁,结合上面和下面代码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;//拿到前驱的状态    if (ws == Node.SIGNAL)        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了        return true;    if (ws > 0) {        /*         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!         */        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

 

   介绍下waitStatus的状态取:

  1、如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。
  2、如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除。
  3、如果pred的waitStatus为Node.SIGNAL,则通过LockSupport.park()方法把线程A挂起,并等待被唤醒。
    /** waitStatus value to indicate thread has cancelled */        static final int CANCELLED =  1;        /** waitStatus value to indicate successor's thread needs unparking */        static final int SIGNAL    = -1;        /** waitStatus value to indicate thread is waiting on condition */        static final int CONDITION = -2;        /**         * waitStatus value to indicate the next acquireShared should         * unconditionally propagate         */        static final int PROPAGATE = -3;

 

  我们得知,如果waitStatus为-1那么就可以让线程upark进入等待状态(同thread.wait()),那么CLH是如何办到唤醒的呢?答案在realease()方法上。

独占锁释放release(int):

  该方法的目的是用unpark()唤醒等待队列中最前边的那个未放弃线程,问题是所有的线程要么被中断放弃,要么在等待被唤醒,那么该如何唤醒?就是用到waitStatus,首先尝试释放首节点,如果释放失败,那么就从tail开始,一直往前找,如果找到就Node s变量用记下来,找到多个就把最后的覆盖前面的,那么遍历到头节点后,自然s记录的就是最前的需要唤醒的节点了。结合代码就很好看懂:

  

private void unparkSuccessor(Node node) {    //这里,node一般为当前线程所在的结点。    int ws = node.waitStatus;    if (ws < 0)//置零当前线程所在的结点状态,允许失败,使得找节点只需找<0的,可以跳过头节点。        compareAndSetWaitStatus(node, ws, 0);    Node s = node.next;//找到下一个需要唤醒的结点s,先从头结点开始,不对就从尾部开始往前找到最前的    if (s == null || s.waitStatus > 0) {
//如果为空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒}

   

  从以上代码可以看出,唤醒这个操作是当前线程做的,由获取锁的线程去唤醒下一个线程;这里只需要LocakSupport.unpark,当被唤醒的线程成功后,就会设置自己为head,然后退出代码块,具体见acquireQueue方法注释。

共享获取acquireShared()

  这个其实和acquire差不多,只不过获取的status是多个,而且获取失败后直接继续唤醒后调用doAcquireShare

public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }

  而doAcquireShare方法和acquireQueued方法是差不多的,具体看如下代码注释:

/**     * Acquires in shared uninterruptible mode.     * @param arg the acquire argument     */    private void doAcquireShared(int arg) {        final Node node = addWaiter(Node.SHARED);//首先保证入队        boolean failed = true;        try {            boolean interrupted = false;  //追踪中断状态            for (;;) {                final Node p = node.predecessor();                if (p == head) {    //如果是头结点那么久尝试获取资源,然后根据资源情况决定是否唤醒后面线程                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);  //r>0,唤醒后面线程                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }          //这里也是找安全点                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

  再看看该方法是如何继续唤醒后面线程的,这里注意,唤醒线程有两种情况,第一种是头结点获取到锁后还有资源所以唤醒后面的一起共享,第二种情况是获得锁的线程释放资源的时候去唤醒后面的节点,如下代码所示,doReleaseShared方法就是用作唤醒线程的。

  另外,在众多代码中我们都看到interrupted的标记位,可以看出,等待过程是不响应中断的,只会在后期补上中断响应,而中断响应是线程自身决定的。

private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;     setHead(node);//head指向自己     //如果还有剩余量,继续唤醒下一个邻居线程    if (propagate > 0 || h == null || h.waitStatus < 0) {        Node s = node.next;        if (s == null || s.isShared())            doReleaseShared();    }}

释放共享releaseShare()

  该方法同样简单,尝试释放资源,否则唤醒后面线程,这里我们同样看到了doReleaseShared方法

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {
//尝试释放资源 doReleaseShared();//唤醒后继结点 return true; } return false;}

  代码:

  这里又用到了unparkSuccessor方法,首先判断头结点的waitStates是否为SIGNAL标志,是就设置为0,代表已经不再需要资源,然后自旋(for ;;)调用unparkSusseor,直到后继线程获取成功,后继线程获取成功会被唤醒,唤醒后后继线程第一件事就是在acquireQueue中自旋内部把自己设置为头结点,从而导致head引用(注意,head引用是各个线程共用的)发生变化,这样一来,这个释放的线程节点就能退出循环,代表资源释放完毕。

private void doReleaseShared() {    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                unparkSuccessor(h);//唤醒后继            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;        }        if (h == head)// head发生变化,当唤醒成功后,老二结点会自动替换head为自己,所以这里就不再相等,释放成功            break;    }}

  head发生变化,当唤醒成功后,老二结点会自动替换head为自己,所以这里就不再相等,释放成功,请参看appendQueue代码注释、

 

Thread状态转换  

  另外因为复习该知识点通常要知道一下线程的状态和之间的关系,所以再贴一张图,由此特别说明一下,unpark方法是不加锁进入waiting的唯一方法。

 

转载于:https://www.cnblogs.com/iCanhua/p/8965604.html

你可能感兴趣的文章
[LevelDB] LevelDB理论基础
查看>>
【codecombat】 试玩全攻略 第一关kithguard地牢
查看>>
【DP】 POJ 1191 棋盘分割 记忆化搜索
查看>>
自动化测试 Appium之Python运行环境搭建 Part2
查看>>
说说DBA职责和目标
查看>>
从头认识Spring-2.4 基于java的标准注解装配-@Inject-限定器@Named
查看>>
sql server 实现多表连接查询
查看>>
Python标准库:内置函数getattr(object, name[, default])
查看>>
转:android 自定义RadioButton样式
查看>>
HTTP请求过程
查看>>
织梦多域名解析到同一个空间导致打开链接不一致怎么办?
查看>>
Xcode10 library not found for -lstdc++ 找不到问题
查看>>
Mysql 8.0.13如何重置密码
查看>>
发布功能完成
查看>>
excel 合并单元格
查看>>
iOS设计模式简介
查看>>
c# 扩展方法 奇思妙用 高级篇 九:OrderBy(string propertyName, bool desc)
查看>>
C语言中的地址传递(传指针,传递给形参的指针仍然是实参指针的一份拷贝)
查看>>
redis缓存数据库及Python操作redis
查看>>
opencms忘记Admin用户登录密码解决方案
查看>>