AQS原理
AQS简介
AbstractQueuedSynchronizer 是一个实现同步功能的基础框架,java juc内部的包的同步工具类都基于它实现。比如ReentrantLock、Semaphore、CountDownLatch等。
AQS伪代码实现原理
// queue 代表阻塞线程
queue = {}
int state = 3
int localState
// 无锁竞争进入临界区(cas操作,信号量state为0时,挂起当前线程)
// LockSupport.park() 挂起线程
// LockSupport.unpark() 执行线程
do {
localState = state
if localState == 0 {
queue.add(Thread.currentThread())
LockSupport.park()
}
} while( !UnSafe.cas(&state, localState, localState - 1) )
// 临界区程序
// 释放锁
while( !UnSafe.cas(&state, state, state+1) ){
}
LockSupport.unpark( queue.remove())
AbstractQueuedSynchronizer 底层实现与上述伪代码类似。
其中阻塞线程队列使用CLH队列。
自定义实现锁
使用aqs自定义实现锁,继承此抽象类,实现tryAcquire、tryRelease方法,利用内部提供的cas操作即可实现自定义锁。(其中ReentrantLock、Semaphore实现思路如此)
Lock的自定义实现
主要实现tryAcquire和tryRelease方法。
public class Mutex {
private final Sync sync = new Sync();
static class Sync extends AbstractQueuedSynchronizer{
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
protected boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}
}
public void lock(){
sync.acquire(0);
}
public void unlock(){
sync.release(0);
}
static int i;
public static void main(String[] argv) throws InterruptedException {
var mutex = new Mutex();
var t1 = new Thread(() -> {
for(int j = 0; j < 10000; j++) {
mutex.lock();
i++;
mutex.unlock();
}
});
var t2 = new Thread(() -> {
for(int j = 0; j < 10000; j++) {
mutex.lock();
i++;
mutex.unlock();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("i=" + i);
}
}
Semaphore的自定义实现
主要实现tryAcquireShared和tryReleaseShared方法。
class SemaphoreExample {
static class Semaphore extends AbstractQueuedSynchronizer{
public Semaphore(int permits) {
setState(permits);
}
@Override
protected int tryAcquireShared(int arg) {
var available = getState();
var left = available - 1;
if(available == 0) {
return -1;
}
if(compareAndSetState(available, left)){
return left;
}
return -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
var available = getState();
return compareAndSetState(available, available + 1);
}
}
public static int i = 0;
public static void main(String[] args) throws InterruptedException {
var semaphore = new Semaphore(3);
for(int i = 0; i < 1000; i++) {
new Thread(() -> {
semaphore.acquireShared(0);
try {
System.out.println("go");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.releaseShared(0);
}).start();
}
}
}
CLH队列
用单向链表实现队列, 尾部插入, 头部删除(基础实现)
cas操作插入和删除。均为O(1)的cas操作,效率较高。
AQS的公平性
就是进入CLH队列等待的线程waitSet。在某一瞬间,进入临界区的线程执行完成等操作或者休眠等操作让出资源,此时新进入的线程可能直接获取到资源,类似synchronized中偏向锁获取。即为非公平锁。非公平锁效率高。如果新进入的线程,发现有waitSet,就直接进入队列,那即为公平锁。
ReentrantLock有公平锁和非公平锁的两种实现。
常用线程框架总结
ReentrantLock
与synchronized对比,基于java实现(aqs),可重入(设置owner线程),可响应中断,tryLock(timeout)
CountDownLatch
作用:是一组线程等待其他的线程完成工作以后在执行,加强版join,await用来等待,countDown负责计数器的减一
CyclicBarrier
作用:让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)、CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行。
CountDownLatch
用于一个或多个线程等待其他线程的操作完成,而CyclicBarrier
则用于一组线程相互等待,直到所有线程都到达共同的执行点后再继续执行。CyclicBarrier
是可重用的,而CountDownLatch
一旦计数器值为零,就不能再次使用。
关键方法:
barrier.await(); // 等待其他线程到达后开始执行
barrier.reset(); // 重置等待的线程
Semaphore
控制同时访问某个特定资源的线程数量,用在流量控制。
Exchange
两个线程间的数据交换(适用于少个线程交互,如果线程数量多,就有点困难了)。
Pharser
是一个基于封装类似CyclicBarrier、CountDownLatch多线程批量操作的一个框架。
arrive 到达
waitAdvance 等待进步
register 注册任务
deregister 注销任务
提供以上几个事件点,与CyclicBarrier、CountDownLatch不同的是,可以动态添加多线程捆绑协作数据合并同步处理。
如下代码使用Pharser实现CyclicBarrier的功能。
public class PhaserTest {
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
class Worker implements Runnable{
@Override
public void run(){
phaser.register();
while(true) {
try {
Thread.sleep(1000);
System.out.println("I'm working! @" + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run() throws InterruptedException {
phaser.register();
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
while(true) {
phaser.arriveAndAwaitAdvance();
System.out.println("Sync...." + phaser.getPhase());
}
}
public static void main(String[] argv) throws InterruptedException {
var test = new PhaserTest();
test.run();
}
}