yehao
发布于 2023-12-01 / 21 阅读
0
0

AQS原理

AQS原理

AQS简介

AbstractQueuedSynchronizer 是一个实现同步功能的基础框架,java juc内部的包的同步工具类都基于它实现。比如ReentrantLock、Semaphore、CountDownLatch等。

AQS伪代码实现原理

// queue 代表阻塞线程
queue = {}
int state = 3
int localState

// 无锁竞争进入临界区(cas操作,信号量state为0时,挂起当前线程)
// LockSupport.park() 挂起线程
// LockSupport.unpark() 执行线程
do {
 localState = state
 if localState == 0 {
	 queue.add(Thread.currentThread())
	 LockSupport.park()
 }
} while( !UnSafe.cas(&state, localState, localState - 1) )


// 临界区程序


// 释放锁
while( !UnSafe.cas(&state, state, state+1) ){ 
}
LockSupport.unpark( queue.remove())

AbstractQueuedSynchronizer 底层实现与上述伪代码类似。

其中阻塞线程队列使用CLH队列。

自定义实现锁

使用aqs自定义实现锁,继承此抽象类,实现tryAcquire、tryRelease方法,利用内部提供的cas操作即可实现自定义锁。(其中ReentrantLock、Semaphore实现思路如此)

Lock的自定义实现

主要实现tryAcquire和tryRelease方法。

	public class Mutex {
        private final Sync sync = new Sync();
        static class Sync extends AbstractQueuedSynchronizer{
            protected boolean tryAcquire(int arg) {
                return compareAndSetState(0, 1);
            }
            protected boolean tryRelease(int arg) {
                return compareAndSetState(1, 0);
            }
        }
        public void lock(){
            sync.acquire(0);
        }
        public void unlock(){
            sync.release(0);
        }
        static int i;
        public static void main(String[] argv) throws InterruptedException {
            var mutex = new Mutex();
            var t1 = new Thread(() -> {
                for(int j = 0; j < 10000; j++) {
                    mutex.lock();
                    i++;
                    mutex.unlock();
                }
            });
            var t2 = new Thread(() -> {
                for(int j = 0; j < 10000; j++) {
                    mutex.lock();
                    i++;
                    mutex.unlock();
                }
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println("i=" + i);
        }
    }

Semaphore的自定义实现

主要实现tryAcquireShared和tryReleaseShared方法。

class SemaphoreExample {
    static class Semaphore extends AbstractQueuedSynchronizer{
        public Semaphore(int permits) {
            setState(permits);
        }
        @Override
        protected int tryAcquireShared(int arg) {
            var available = getState();
            var left = available - 1;
            if(available == 0) {
                return -1;
            }
            if(compareAndSetState(available, left)){
                return left;
            }
            return -1;
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            var available = getState();
            return compareAndSetState(available, available + 1);
        }
    }
    public static int i = 0;
    public static void main(String[] args) throws InterruptedException {
        var semaphore = new Semaphore(3);
        for(int i = 0; i < 1000; i++) {
            new Thread(() -> {
                semaphore.acquireShared(0);
                try {
                    System.out.println("go");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.releaseShared(0);
            }).start();
        }
    }
}

CLH队列

用单向链表实现队列, 尾部插入, 头部删除(基础实现)

cas操作插入和删除。均为O(1)的cas操作,效率较高。

AQS的公平性

就是进入CLH队列等待的线程waitSet。在某一瞬间,进入临界区的线程执行完成等操作或者休眠等操作让出资源,此时新进入的线程可能直接获取到资源,类似synchronized中偏向锁获取。即为非公平锁。非公平锁效率高。如果新进入的线程,发现有waitSet,就直接进入队列,那即为公平锁

ReentrantLock有公平锁和非公平锁的两种实现。

常用线程框架总结

ReentrantLock

与synchronized对比,基于java实现(aqs),可重入(设置owner线程),可响应中断,tryLock(timeout)

CountDownLatch

作用:是一组线程等待其他的线程完成工作以后在执行,加强版join,await用来等待,countDown负责计数器的减一

CyclicBarrier

作用:让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)、CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行。

CountDownLatch 用于一个多个线程等待其他线程的操作完成,而 CyclicBarrier 则用于一组线程相互等待,直到所有线程都到达共同的执行点后再继续执行。CyclicBarrier 是可重用的,而 CountDownLatch 一旦计数器值为零,就不能再次使用。

关键方法:

barrier.await(); // 等待其他线程到达后开始执行

barrier.reset(); // 重置等待的线程

Semaphore

控制同时访问某个特定资源的线程数量,用在流量控制。

Exchange

两个线程间的数据交换(适用于少个线程交互,如果线程数量多,就有点困难了)。

Pharser

是一个基于封装类似CyclicBarrier、CountDownLatch多线程批量操作的一个框架。

arrive 到达

waitAdvance 等待进步

register 注册任务

deregister 注销任务

提供以上几个事件点,与CyclicBarrier、CountDownLatch不同的是,可以动态添加多线程捆绑协作数据合并同步处理。

  • 如下代码使用Pharser实现CyclicBarrier的功能。

public class PhaserTest {
    Phaser phaser = new Phaser();
    ExecutorService executorService = Executors.newCachedThreadPool();
    class Worker implements Runnable{
        @Override
        public void run(){
            phaser.register();
            while(true) {
                try {
                    Thread.sleep(1000);
                    System.out.println("I'm working! @" + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public void run() throws InterruptedException {
        phaser.register();
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        while(true) {
            phaser.arriveAndAwaitAdvance();

            System.out.println("Sync...." + phaser.getPhase());
        }
    }
    public static void main(String[] argv) throws InterruptedException {
        var test = new PhaserTest();
        test.run();
    }
}


评论