JUC:AQS原理及常用同步器介绍

JUC:AQS原理及常用同步器介绍

Tans 1,734 2022-04-13

AQS

AQS 的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。

AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出大量应用广泛的同步器,比如我们提到的 ReentrantLockSemaphore,其他的诸如 ReentrantReadWriteLockSynchronousQueueFutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

AQS的思想主要来说,如果一个线程想获得某个共享资源,如果该资源空闲,那么就会通过CAS来对其进行上锁,就是通过CAS将这个共享资源state+1;这时候其他线程也想获得锁,那么就会阻塞进入CLH(Craig, Landin, and Hagersten)队列中进行等待,直到资源被释放,也就是state=0;

在这里插入图片描述

主要源码分析

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues(依赖于一个先进先出的队列). This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state(依赖于一个原子int来表示锁状态). Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these(实现子类必须要实现保护方法比如更改状态,同时也应该定义获得或者释放锁过程的具体操作), the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:(如果需要适用请重写以下几个方法)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

1. 主要方法和变量定义

从类的注释来看,AQS已经为我们两种资源共享方式(try… 、 try…Shared):

  • 独占 (Exclusive):只有一个线程才能获得锁,这种锁的最主要的实现就是Reentrantlock,同时独占锁又可分为
    • 公平锁:按照线程在CLH队列中的顺序,先到者先拿到锁,因此一定是先申请先得到锁,N个线程等待锁,那么必须唤醒N次。不会发生饥饿。适用于线程之间需要保证同步性。
    • 非公平锁:当线程要获取锁的时候,不需要考虑排队等待的情况,存在后申请却先的到锁的情况,N个线程获取锁,那么可能少于N唤醒。效率比较高。适用于高性能,并且线程的前后运行顺序不是那么严格。
  • 共享(Shared):多个线程可以同时获得同一个把锁。主要实现就是CountDownLatchCylicBarrierSemphoreReentrantReadWriteLock
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
     //.......
     //AQS适用status来标识锁的状态,适用CAS对同步状态进行原子性操作实现状态修改
     private volatile int state;  // The synchronization state. volatile保证线程可见
     // 获取同步的状态
	protected final int getState() {
          return state;
    }
	// 设置同步的状态
	protected final void setState(int newState) {
    		state = newState;
	}
	// 通过CAS设置同步的状态
	protected final boolean compareAndSetState(int expect, int update) {
          return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
	}
     //模板方法:独占方式获取资源
     public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
     //模板方法:独占方式获取资源
     public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
     //模板方法:共享方式获取资源
     public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
     //模板方法:共享方式获取资源
     public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

2. 几个可以重写的方法

AQS采用了模板方法的设计模式,如果想要构建自己的同步器,只需要重写下面几个指定方法(无非就是按照自己的规则来实现根据资源的获得与释放,来指定改变state的策略),在调用模板方法的时候,会调用我们重写的方法。下面我们会简单实现一个自己的同步器。需要注意的是,我们要保证同一个线程获取锁和释放锁之后要保证status最终是零态的。

AQS的其他方法大多是private或者final修饰的,因此避免用户修改出现错误。

boolean tryAcquire() //独占方式获得锁资源,true:获取成功 false:获取失败
boolean tryRelease()//独占资源释放锁
boolean tryAcquireShared()//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
boolean tryReleaseShared())//共享方式。尝试释放资源,成功则返回true,失败则返回false。
boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

3. 实现自己的同步组件

这里,为了简易,我们自己实现一个不可重入的同步器。

package lock;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * @Describe: 一个不可重入独占方式同步组件
 * @Author: tyf
 * @CreateTime: 2022/4/13
 **/
public class MySynchronizedTest extends AbstractQueuedSynchronizer {

     //重写独占方式的获取锁资源
    @Override
    protected boolean tryAcquire(int arg) {
        final Thread current = Thread.currentThread();
        int c = getState();
         //如果锁资源为0,那么CAS进行加锁
        if(c == 0){
            if(compareAndSetState(0, 1)){
                setExclusiveOwnerThread(current);
                return true;
            }
         //如果当前独占线程是自己,那么不可获得锁
        }else if(current == getExclusiveOwnerThread()){
            return false;
        }
        return false;
    }

     //重写独占方式的得到锁资源
    @Override
    protected boolean tryRelease(int arg) {
        int c = getState() - arg;
        //检查此线程是否为当前独占线程。
        if(Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if(c == 0){
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    public void lock(){
        acquire(1); //调用AQS的acquire方法来进行获取锁
    }
    public void unlock(){
        release(1);//调用AQS的acquire方法来进行获取锁
    }

    //test
    public static void main(String[] args) {
         //模拟锁重入,会发现无法加锁。
        MySynchronizedTest lock = new MySynchronizedTest();
        lock.lock();
        System.out.println("第一次加锁");
        lock.lock();
        System.out.println("第二次加锁");
        lock.unlock();
        lock.unlock();
    }
}

结果:

image-20220413160319230

几个实现共享锁的组件

1. CountDownLatch

主要是实现了让一个线程等待其他n个线程执行完毕。主要可以用作

  • 某一个线程运行前等待其他n个线程执行完毕
  • 实现多个线程最大并行度,也就是同时让多个线程同时运作。
package aqs;

import java.util.concurrent.CountDownLatch;

/**
 * @Describe: CountDownLatch场景
 * @Author: tyf
 * @CreateTime: 2022/4/10
 **/
public class CountDownLatchTest {
    public static void main(String... args) throws InterruptedException {
//        test1();
        test2();
    }
    //场景一:模拟发号,一起跑!
    public static void test1() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        System.out.println("大家都准备好了嘛?");
        for(int i = 0; i < 5; i++){
            final int idx = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() +  "号队员准备好了!");
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() +  "开始奋力跑");
            }).start();
        }
        Thread.sleep(2000);
        System.out.println("开始......3 2 1 跑!");
        countDownLatch.countDown();
    }

    //场景二:等待一个线程跑完然后
    public static void test2() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++){
            final int idx = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() +  "整理好" + idx + "号文件了!");
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("他们运行完了!我要开始汇总了");
    }
}

大家都准备好了嘛?
Thread-0号队员准备好了!
Thread-2号队员准备好了!
Thread-3号队员准备好了!
Thread-1号队员准备好了!
Thread-4号队员准备好了!
开始......3 2 1 跑!
Thread-2开始奋力跑
Thread-0开始奋力跑
Thread-4开始奋力跑
Thread-1开始奋力跑
Thread-3开始奋力跑

进程已结束,退出代码0

2. CylicBarrier(循环栅栏)

基于ReentrantLock实现发,主要是让多个线程互相等待,然后当所有线程全部到达同一个安全点(屏障)后,大家再一起运行,类比GC过程中的安全屏障。听着其和

场景:当需要计算1月份每天的流水总额,由于每天的流水较多,那么我们同时开启多个线程取汇总每天的线程,最会汇总总流水.

package aqs;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Describe: 类描述
 * @Author: tyf
 * @CreateTime: 2022/4/10
 **/
public class CyclicBarrierTest {

    public static void main(String[] args) {
            CyclicBarrier barrier = new CyclicBarrier(7, () -> {
                System.out.println("开始汇总" + "流水.....");
            });
            for(int i = 0; i < 7; i++){
                final int idx = i;
                new Thread(()->{
                    System.out.println("第" + idx + "部分当日流水已经汇总完毕");
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    }
}

第1部分当日流水已经汇总完毕
第4部分当日流水已经汇总完毕
第2部分当日流水已经汇总完毕
第3部分当日流水已经汇总完毕
第0部分当日流水已经汇总完毕
第6部分当日流水已经汇总完毕
第5部分当日流水已经汇总完毕
开始汇总流水.....

进程已结束,退出代码0

3. Semaphore(信号量)

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

package aqs;

import java.util.concurrent.Semaphore;

/**
 * @Describe: 类描述
 * @Author: tyf
 * @CreateTime: 2022/4/10
 **/
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            final int idx= i ;
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "占了" + idx +  "号茅坑....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "开始升天....");
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "解放" + idx +  "号茅坑....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.release();
            }).start();
        }
    }
}

参考资料

  1. Java“锁”事-美团技术团队
  2. JUC:AQS介绍、AQS原理、AQS底层用模板方法模式、定义两种资源共享方式