菠菜网

济宁市人事信息网:Java读源码之ReentrantLock

时间:6个月前   阅读:57

前言

ReentrantLock 可重入锁,应该是除了 synchronized 关键字外用『的』最多『的』线程同步手段了,《虽然》JVM维护者疯狂优化 synchronized 使其已经拥有了很好『的』性能。但 ReentrantLock 仍有其存在价值,例如可以感知线程中止,{< 公正[>锁模式},可以指定超时时间『的』抢锁等更细粒度『的』控制都是现在『的』 synchronized 做不到『的』。

(若是不是很领会) Java 中线程『的』一些基本概念,【可以看之前这篇】:

Java读源码之Thread

案例

用一个最简朴『的』案例引出我们『的』主角

public class ReentrantLockDemo {

    // 默认是非< 公正[>锁和 synchronized 一样
    private static ReentrantLock reentrantLock = new ReentrantLock();

    public void printThreadInfo(int num) {
        reentrantLock.lock();
        try {
            System.out.println(num + " : " + Thread.currentThread().getName());
            System.out.println(num + " : " + Thread.currentThread().toString());
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        IntStream.rangeClosed(0, 3)
                .forEach(num -> executorService
                        .execute(() -> new ReentrantLockDemo().printThreadInfo(num))
                );
    }

    /**
     * (输出):
     * 0 : pool-1-thread-1
     * 0 : Thread[pool-1-thread-1,5,main]
     * 3 : pool-1-thread-4
     * 3 : Thread[pool-1-thread-4,5,main]
     * 1 : pool-1-thread-2
     * 1 : Thread[pool-1-thread-2,5,main]
     * 2 : pool-1-thread-3
     * 2 : Thread[pool-1-thread-3,5,main]
     */

“可以看到使用起来也很简”朴,而且达到了同步『的』效果。空话不多说一起来瞅一瞅 lock() 和 unlock() 两个同步方式是怎么实现『的』。

源码剖析

< 公正[>锁与非< 公正[>锁

< 公正[>锁顾名思义。就是每个线程排队抢占锁资源。而非< 公正[>锁线程什么时刻能执行更多『的』看缘分,例如一个线程需要执行临界区代码,不管之前有若干线程在等,{直接去抢锁},说白了就是插队。对于 ReentrantLock ‘【『的』实现】’, 从组织器[看出,当我们传入 true 代表选择了{< 公正[>锁模式}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

为什么先看< 公正[>锁实现,而不是默认『的』非< 公正[>锁,由于 synchronized 就是非< 公正[>锁,1.7最先 synchronized ‘【『的』实现】’改变了,{而且基本借鉴了} ReentrantLock ‘【『的』实现】’,{加入了自旋},偏向锁削减系统挪用,以是若是需要非< 公正[>锁且不需要稀奇精致『的』控制,<完全没有必要由于性能选择> ReentrantLock 了。

AQS 结构

从案例中『的』 lock 方式进入

  • ReentrantLock.FairSync#lock
final void lock() {
    // 『要一把锁』,【向谁要锁】?
    acquire(1);
}

在继续深入之前让我们先熟悉一下 AbstractQueuedSynchronizer(AKA :AQS) 『的』结构

  • 首先继续了 AbstractOwnableSynchronizer ,主要属性:
// 保留当前持有锁『的』线程
private transient Thread exclusiveOwnerThread;
  • AQS 自身主要属性:
// 壅闭行列『的』头
private transient volatile Node head;

// 壅闭行列『的』尾
private transient volatile Node tail;

// 同步器『的』状态
private volatile int state;

从 head 和 tail 可以猜想到,AQS 应该是用一个链表作为守候行列,给守候『的』线程排队, status {字段默认是}0,一旦锁被某个线程占有就 +1,那为啥要用int呢? 若是当前持有锁『的』这个线程(exclusiveOwnerThread)还要再来把锁,那状态还可以继续 +1,《也就》实现了可重入。

  • 上面『的』 Node 节点长啥样呢,不要被注释中 CLH 锁高峻上『的』名称吓到,实在就是双向链表,主要属性:
// 标识次节点是共享模式
static final Node SHARED = new Node();
// 标识次节点是独占模式
static final Node EXCLUSIVE = null;
// 节点里装着排队『的』线程
volatile Thread thread;
// 节点里装『的』线程放弃了,不抢锁了,可能超时了,可能中止了
static final int CANCELLED =  1;
// 【下一个节点里『的』线程守候被】通知出队
static final int SIGNAL    = -1;
// 【节点里装『的』】线程在守候执行条件,连系 Condition 使用
static final int CONDITION = -2;
// 节点状态需要被传播到下一个节点,主要用在共享模式
static final int PROPAGATE = -3;
//  标[识节点『的』守候状态,初始0,取值是上面『的』 -3 ~ 1
volatile int waitStatus;
// 前一个节点
volatile Node prev;
// 后一个节点
volatile Node next;
// <指向下一个守候条件> Condition
Node nextWaiter;

去掉一些通俗情形不会涉及『的』属性,若是有四个线程竞争,「结构如下图所示」:

【可以看到就是一个尺度『的』头节点为空『的』双链】表,‘为什么头节点是空’?

< 公正[>锁加锁

  • AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    // 若是实验拿锁没乐成,那就进守候行列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // (检测到线程被中止)了,由于重置了中止信号但没做处置,再设置下中止位,让用户去处置,中止尺度操作
        selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 取AQS『的』 state 值
    int c = getState();
    // 当前没有线程持有锁
    if (c == 0) {
        // 若是没有其他线程在排队(< 公正[>)
        if (!hasQueuedPredecessors() &&
            // {这里可能存在竞争} CAS ‘试着去抢一次锁’
            compareAndSetState(0, acquires)) {
            // 【抢到锁了】,把锁持有者改成自己,其他线程往后稍稍
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 锁已经被持有了,但若是锁主人就是自己,『那欢迎光临』(可重入)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 由于其他线程进不来,这里不存在竞争,直接改锁状态
        setState(nextc);
        return true;
    }
    return false;
}
  • AbstractQueuedSynchronizer#hasQueuedPredecessors
// (返)回 false 代表不需要排队,true 代表要排队
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // h == t 头即是尾只可能是刚初始『的』状态或者已经没有节点守候了
    // h.next == null ? 下面先容进队『的』过程中,<若是其他线程与此同时> tryAcquire 乐成了,会把之前『的』head.next‘置为空’,说明被捷足先登了,差一点惋惜
    // 若是到最后一个判断了,也就是行列中至少有一个守候节点,直接看第一个守候节点是不是自己,若是不是自己就乖乖排队去
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • AbstractQueuedSynchronizer#addWaiter

tryAcquire「若是没有拿到锁」,就需要进守候行列了,酿成一个 Node 实例

// 这里 mode 为独占模式
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //  若是尾节[点不为空,说明守候行列已经初始化过
    if (pred != null) {
        node.prev = pred;
        // 「实验把自己放」到队尾
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 〖进来这里说明〗,守候行列没有被初始化过,『或者实验失败了』
    enq(node);
    return node;
}
  • AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 若是尾节点是空,说明行列没有初始化
        if (t == null) {
            // 初始化一个空节点(延迟加载),head ,tail{都指向它}
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        // {一直实验把自己塞到队尾}(自旋)
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • AbstractQueuedSynchronizer#acquireQueued

addWaiter方式已经把守候线程包装成节点放到守候行列了,为啥要(返)回中止标识呢?主要是为了给一些需要处置中止『的』方式复用,例如 ReentrantLock#lockInterruptibly,以及带超时『的』锁

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 『这边逻辑最先绕起来了』
        for (;;) {
            // 拿前一个节点
            final Node p = node.predecessor();
            // 前一个节点是head,<说明自己排在第一个>
            if (p == head && 
                // 在『让出』cpu“前再试一次”,此时可能锁持有者已经让位了
                tryAcquire(arg)) {
                // 【抢到锁了】
                setHead(node);
                // 把之前没用『的』头节点释放
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 两次实验都失败了,只能准备被【挂起】,『让出』cpu了(调了内核,重量级)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 通俗『的』锁不处置中止异常,不会进这个方式
        if (failed)
            cancelAcquire(node);
    }
}

private void setHead(Node node) {
    // “把头节点设为自己”
    head = node;
    // 由于已经【抢到锁了】,《不需要纪录这个》线程在守候了,保持了头节点中线程永远为 null
    node.thread = null;
    node.prev = null;
}
  • AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 已经告诉前一个节点自己需要被通知了
        return true;
    if (ws > 0) {
        // 只有 CANCELLED 〖这个状态大于〗0,若是前面『的』节点不排队了,就一直找到一个没 CANCELLED 『的』
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } 
    // 进到这里,只剩下PROPAGATE(共享锁时刻才有) CONDITION(‘本文不涉’及) 和 未赋值状态也就是0, 
    else {
        // 这里把 默认状态0 置为 -1,也就代表着后面有线程在等着被叫醒了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // (返)回false,就暂时不会让线程【挂起】,继续自旋,直到(返)回true
    return false;
}
  • AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    // 【挂起】,<尺度用法>this充当blocker 
    LockSupport.park(this);
    // 一旦恢复,(返)回线程在【挂起】阶段是否被中止,{此方式会重置中止位}
    return Thread.interrupted();
}

到这里加锁流程就先容差不多了,用一个最简朴流程『的』图来〖总结〗一下:

< 公正[>锁解锁

  • AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    // 实验释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 若是守候行列已经被初始化过,『而且后面有节点守候操作』
        if (h != null && h.waitStatus != 0)
            // 恢复【挂起】『的』线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • ReentrantLock.FairSync#tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 能执行释放锁『的』肯定是锁『的』持有者,除非虚拟机魔怔了
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 思量可重入
    if (c == 0) {
        free = true;
        // (锁现在没有持有者了)
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
  • ReentrantLock.FairSync#unparkSuccessor
// node 是头节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 若是状态不是 CANCELED,就把状态置为初始状态 
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // s == null 这个条件建立主要是在共享模式下自旋释放。
    if (s == null || s.waitStatus > 0) {
        // 把 CANCELED 状态『的』节点‘置为空’
        s = null;
        // 由于 head 这条路已经断了,从尾巴最先找到第一个排队『的』节点,然后把行列接上
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 把第一个排队『的』节点中『的』线程叫醒,
        LockSupport.unpark(s.thread);
}

线程从加锁代码里先容『的』 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方式中醒来,继续自旋拿锁。若是此时后面还有人排队就一定能拿到锁了。如图所示:

非< 公正[>锁加锁

  • ReentrantLock.NonfairSync#lock
final void lock() {
    // 不管三七二十一,<直接抢锁>,“若是运气好”,锁正好被释放了,就不排队了
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 和上面先容『的』< 公正[>锁一样,只是 tryAcquire 实现不一样
        acquire(1);
}
  • ReentrantLock.Sync#nonfairTryAcquire

上面< 公正[>锁我们已经知道,线程真正【挂起】前会实验两次,由于不思量别人有没有入队,实现异常简朴

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 若是没有线程持有锁,<直接抢锁>
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是是重入,‘状态累加’
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非< 公正[>锁解锁

由于都是独占锁模式,解锁和< 公正[>锁逻辑一样。

〖总结〗

至此,总算看完了 ReentrantLock 通例『的』加锁解锁源码,好好体会下 AQS 『的』结构,照样能看懂『的』,〖且很有收获〗,总之 Doug Lea 大神牛B。

本文照样挖了许多坑『的』:

  • 带超时『的』锁是若何实现『的』?
  • 检测中止『的』锁是若何实现『的』?
  • 种种 Condition 是若何实现『的』?
  • 锁『的』共享模式是若何实现『的』?

以后有时间再一探事实吧。

,

阳光在线官网

阳光在线官网(原诚信在线官网)现已开放阳光在线手机版、阳光在线电脑客户端下载。阳光在线娱乐戏公平、〖公开〗、< 公正[>,用实力赢取信誉。

上一篇:镇江租房网:林志玲现身树模防疫7步骤 细节都不放过!"/>

下一篇:哈尔滨新闻夜航专题:瓦妮莎又更新INS,替科比谢谢球迷!丈夫遗作脱销榜第1,太感动

网友评论