专业的编程技术博客社区

网站首页 > 博客文章 正文

线程并发——AQS抽象队列同步器(语音朗读的线程并发)

baijin 2024-10-16 07:36:11 博客文章 12 ℃ 0 评论

作者:叩丁狼教育,海燕老师

AQS队列同步器英文全称AbstractQueuedSynchronizer,这是一个抽象类,为什么我们今天需要学习这个抽象类呢?这个抽象类它的神奇之处到底是什么呢?我们一起来掀开它的神奇面纱吧!

什么是AQS(AbstractQueuedSynchronizer)?


AQS中文翻译为同步器,Lock接口的实现类基本都是通过包含AQS子类对象来完成线程访问控制的。比如说Lock中的获取锁和释放锁操作。说白了就是一把抽象的锁。


Lock通过调用AQS子类的lock方法实现获取锁,unlock方法实现释放锁。


1.1 AQS源码分析

1.1.1类字段分析:

 private transient volatile Node head;
 private transient volatile Node tail;
 private volatile int state;
 protected final int getState() {
 return state;
 }
 protected final void setState(int newState) {
 state = newState;
 }
 protected final boolean compareAndSetState(int expect, int update) {
 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }

1)我们可以看到字段中其实最主要的就是一个volatile 修饰的state(状态),state用于标记锁状态。

2)compareAndSetState方法是使用CAS算法修改state保证state修改时的数据安全。

3)head和tail表示队列的头和尾,也就是锁该类将线程和线程状态封装成一个节点Node存入到同步队列中,结构如下:


AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

1.1.2方法分析:


AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

1.1.3锁类型分析:


1)公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2)非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。

注意:ReentrantLock初始化默认为非公平锁,非公平锁减少了线程的挂起和恢复运行效率高于公平锁。公平锁可以解决饥饿发生。

1.1.4获取锁操作分析:

final void lock() {//获取锁

if (compareAndSetState(0, 1))//获取锁成功

setExclusiveOwnerThread(Thread.currentThread());//独占锁

else

acquire(1);//其他线程尝试获取锁

}

acquire****方法:



1)tryAcquire()尝试直接去获取资源,如果成功则直接返回;

2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

3)acquireQueued()****方法;重新竞争同步锁

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {//自旋
 final Node p = node.predecessor();//获取当前节点的上一个节点
 if (p == head && tryAcquire(arg)) {//上一节点为头节点和获取尝试获取锁成功
 setHead(node);//上一个节点已经获取到锁,将当前阶段设置为头节点,下一次就由自己获取锁
 p.next = null; // 头节点获取到锁离开队列
 failed = false;
 return interrupted;//返回中断状态
 }
 //不是头节点或者获取不到锁
 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
}

shouldParkAfterFailedAcquire****方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)//如果上一节点被唤醒,当前节点就进入等待
 return true;
 if (ws > 0) {//如果上一节点线程被取消了
 do {
 node.prev = pred = pred.prev;//当前阶段尝试跳过上一节点,插个队
 } while (pred.waitStatus > 0);
 pred.next = node;
 } else {
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//尝试将上一个节点设置为唤醒状态
 }
 return false;
}

parkAndCheckInterrupt****方法:

private final boolean parkAndCheckInterrupt() {
 LockSupport.park(this);//线程进入等待状态
 return Thread.interrupted();//线程唤醒后,检查自己是否真的处于中断状态,注意:interrupted会清除中断状态
}

1)selfInterrupt(),自我中断,当前线程调用了interrupt方法

流程图如下:


16.1.5 释放锁操作分析

release****方法:

public final boolean release(int arg) {
 if (tryRelease(arg)) {//尝试释放锁
 Node h = head;
 if (h != null && h.waitStatus != 0)//头节点不为null并且等待状态不是为没有状态
 unparkSuccessor(h);//唤醒头节点的后面一个节点
 return true;
 }
 return false;
}

tryRelease****方法:

protected final boolean tryRelease(int releases) {
 int c = getState() - releases;//重入锁state大于1,需要调用两次unlock方法,释放两次
 if (Thread.currentThread() != getExclusiveOwnerThread())//如果独占锁不是当前需要释放的线程,抛出异常
 throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
 free = true;
 setExclusiveOwnerThread(null);//独占锁设置为null,释放锁
 }
 setState(c);
 return free;
}

unparkSuccessor****方法:

private void unparkSuccessor(Node node) {
 int ws = node.waitStatus;
 if (ws < 0)//等待状态,不是被取消的状态
 compareAndSetWaitStatus(node, ws, 0);//CAS修改回到最初的状态,没有等待的状态下
 Node s = node.next;
 if (s == null || s.waitStatus > 0) {//各种等待情况下
 s = null;
 //从队尾开始找不为空和不是当前节点的节点,直到找到当前节点的后面一个节点位置
 for (Node t = tail; t != null && t != node; t = t.prev)
 if (t.waitStatus <= 0)//如果t为等待状态下
 s = t;//s设置为t,拿到t这个节点
 }
 if (s != null)//唤醒当前节点的下一个节点
 LockSupport.unpark(s.thread);
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表