yehao
发布于 2023-12-02 / 46 阅读
0
0

线程安全工具

线程安全工具

线程安全队列

队列种类介绍

队列类型

概述

应用体现

ArrayListBlockingQueue

默认有界队列

Web 服务器实现(下标索引)

LinkedBlockingQueue

默认无界队列

Web 服务器实现(无下标)

LinkedBlockingDueue

无界双向队列

撤销、重做实现

PriorityQueue

带优先级的队列

操作系统调度(中断、关系优先执行)​​

DelayQueue

延时队列

流量控制等

LinkedTransferQueue

无容量队列(tryTransfer)

线程池实现

SynchronousQueue

无容量队列(offer)

线程池实现

补充说明

LinkedTransferQueue --> tryTransfer 反向压力(添加任务,动态添加新的消费者)===> 那用offer判断可以吗? 结论 : tryTransfer 需要匹配到消费者,否则返回false, offer只要队列可存储,返回true

SynchronousQueue--> 基本同LinkedTransferQueue , 可以配置fair(公平非公平,但公平仅仅保证消费者足够的情况下) , api上直接使用offer,即等同于tryTrasfe)

总结

SynchronousQueue和LinkedTransferQueue在实现匹配策略时,队列容量都默认为0,当匹配当消费者(有消费者take阻塞住)时,才能提交任务成功。

public class Scheduler {
    LinkedTransferQueue<Runnable> tasks = new LinkedTransferQueue<>();
    static AtomicInteger idCount = new AtomicInteger(0);
    int maxWorkers;
    public Scheduler(int maxWorkers) {
        this.maxWorkers = maxWorkers;
        for(int i = 0; i < maxWorkers; i++) {
            new Thread(new Worker()).start();
        }
    }
    class Worker implements Runnable {
        int id;
        public Worker(){
            this.id = idCount.getAndIncrement();
        }
        @Override
        public void run() {
            while(true) {
                Runnable runnable = null;
                try {
					// 执行take,代表消费者就绪
                    runnable = tasks.take();
                    runnable.run();
                    System.out.format("work done by id=%d\n", id);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public void submit(Runnable r) throws InterruptedException {
		// 判断消费者是否就绪
		// 若就绪tryTransfer返回true,将r加入到队列
		// 若不就绪,进入循环体,创建新消费者来动态扩充消费核心
        while(!tasks.tryTransfer(r)) {
            Thread.onSpinWait();
            new Thread(new Worker()).start();
        }
    }
    public static void main(String[] argv) throws InterruptedException {
        var scheduler = new Scheduler(10);
        for(int i = 0; i < 1000; i++) {
            var localI = i;
            Thread.sleep(1);
            scheduler.submit(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

线程池框架

线程的几种状态

image-trlz.png

线程池框架

线程池框架

描述

特点

FixedThreadPool

固定大小的线程池,执行任务的线程数量固定

适用于需要限制线程数量的场景,避免线程数量无限增长

CachedThreadPool

可根据任务动态调整线程数量,适用于短时任务

适用于短时任务,线程数随着任务数量的增加而自动调整

SingleThreadPool

只有一个线程的线程池

适用于需要顺序执行任务的场景

ScheduledThreadPool

能执行定时任务和周期性任务

适用于需要定时执行或周期性执行任务的场景

WorkStealingPool

JDK 1.8 引入的线程池,适用于并行任务

适用于处理大量独立、可并行计算的任务

底层实现均有ThreadPoolExecutor创建,其构造方法参数

参数名

说明

corePoolSize

线程池中保持存活的核心线程数量

maximumPoolSize

线程池中允许的最大线程数量(队列满才会扩

keepAliveTime

空闲线程超过核心线程数时的存活时间

unit

空闲线程存活时间的单位

workQueue

用于保存等待执行的任务的阻塞队列

threadFactory

用于创建新线程的工厂

rejectedExecutionHandler

用于处理任务执行超出边界并被拒绝执行的情况

threadFactory主要用于定义相关线程参数,线程名、守护线程等传入

// 自定义 ThreadFactory
ThreadFactory threadFactory = new ThreadFactory() {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix = "MyThread-";

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setPriority(Thread.NORM_PRIORITY);
        t.setDaemon(true);
        return t;
    }
};

rejectedExecutionHandler内置了几种拒绝策略,默认策略时拒绝。

  1. AbortPolicy(默认策略):默认的拒绝策略。当任务添加到线程池中被拒绝时,会抛出 RejectedExecutionException 异常给调用者。

  2. CallerRunsPolicy:在任务被拒绝时,会使用调用线程来执行该任务。这样可以降低新任务的提交速度

  3. DiscardPolicy:当任务被拒绝时,会默默地丢弃该任务,不做任何处理。

  4. DiscardOldestPolicy:当任务被拒绝时,会丢弃队列中最旧的未处理任务,并尝试为当前任务腾出空间。

扩展

并发处理出现的死锁问题该如何处理?

  • 死锁(互相抢占资源,陷入等待)

  • 活锁(规律性执行、规律性结束,无法进步)

死锁案例

// forks为共享资源
// A的Left资源就是Z的right资源
while (true) {
    try {
        this.thinking();
        while (!this.takeLeft(forks)) {
            Thread.onSpinWait();
        }
		// 在此处产生上线文线程切换
		// 会导致死锁(拿到Left,Right被其他线程拿到)
        while (!this.takeRight(forks)) {
            Thread.onSpinWait();
        }
        this.eating();
        this.putLeft(forks);
        this.putRight(forks);
        this.finished();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

使用上述方法在多线程竞争资源时,会产生死锁。

疑问: 此处不用while循环,拿不到left资源或right资源直接跳过,岂不就行了?

while (true) {
    try {
        this.thinking();
        if(!this.takeLeft(forks) || !this.takeRight(forks)) {
            this.putLeft(forks);
            this.putRight(forks);
            continue;
        }
        this.eating();
        this.putLeft(forks);
        this.putRight(forks);
        this.finished();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

解决: 测试发现无问题(此处模拟阻塞产生死锁,探究死锁解决方案)。

方案1

此种方案在拿不到right资源超过一定限制,就放弃争抢。但是会产生活锁(释放left资源的一瞬间又被自己抢到,周而复始)。

int c = 0;
while (!this.takeRight(forks)) {
    c ++;
    if(c > 100) {
        this.putLeft(forks);
        continue;
    }
    Thread.onSpinWait();
}

方案2

对于临界区加锁,来防止资源被一个拿去一个,造成线程间互相等待产生死锁。

在获取left资源时,获取不到,就await等待其他线程释放资源后唤醒线程;

在已获取left之后时,认为自己拥有较高执行权限,去请求其他线程释放right资源;

此种方案过于臃肿,不推荐使用。

while(true) {
    try {
        this.thinking();
        lock.lockInterruptibly();
        while(!this.takeLeft(forks)) {
            waitForks[this.left()].await();
        }
        while(!this.takeRight(forks)) {
            var rid = this.right();
            var rightPhi = phis[forks[rid] -1];
            if(rightPhi.getState() != "Eating" && dirty[rid] == true) {
                forks[rid] = this.id;
                dirty[rid] = false;
                break;
            }
            waitForks[this.right()].await();
        }
        lock.unlock();
        this.eating();
        lock.lockInterruptibly();
        this.putLeft(forks);
        waitForks[this.left()].signalAll();
        this.putRight(forks);
        waitForks[this.right()].signalAll();
        dirty[this.left()] = true;
        dirty[this.right()] = true;
        lock.unlock();
        this.finished();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

方案3

阻塞队列的方式实现。

感觉此种方式与原始方案不加锁,拿不到资源直接continue有点像。

  • 开一个线程竞争资源,由于在一个线程执行竞争,也不用加锁。

  • 竞争成功后,就把当前元素加入执行队列workingQueue等待执行。

  • 竞争失败,直接重回竞争队列(这不就是初始方案放弃了阻塞么,提取了一个生产者消费者模式去处理)。

class ContentionManager implements Runnable {
    @Override
    public void run() {
        while(true) {
            try {
                var phi = managerQueue.take();
                if(phi.checkLeft(forks) && phi.checkRight(forks)) {
                    phi.takeLeft(forks);
                    phi.takeRight(forks);
                    workingQueue.offer(phi);
                } else {
                    managerQueue.offer(phi);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

工作队列代码如下

class Worker implements Runnable {
    @Override
    public void run() {
        while(true) {
            Philosopher phi = null;
            try {
                phi = workingQueue.take();
                if(phi.getState() == "Hungry") {
                    phi.eating();
                    phi.putLeft(forks);
                    phi.putRight(forks);
                    phi.finished();
                    workingQueue.offer(phi);
                } else {
                    phi.thinking();
                    managerQueue.offer(phi);
                }
            } catch (InterruptedException e) {

            }
        }
    }
}

方案4(方案3补充)

构造延时队列,模拟异常情况。

延时队列代码如下

class DelayInterruptingThread implements Delayed{
    long time;
    Thread current;
    public DelayInterruptingThread(Thread t, long delay) {
        this.current = t;
        this.time = System.currentTimeMillis() + delay;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        return (int) (time - ((DelayInterruptingThread)o).time);
    }
    public void rollback() {
        if(this.current != null) {
            this.current.interrupt();
        }
    }
    public void commit() {
        this.current = null;
    }
}

工作队列添加超时打断操作

while(true) {
    Philosopher phi = null;
    try {
        phi = workingQueue.take();
        if(phi.getState() == "Hungry") {
			// 将元素加入延时队列,1s中 eating未执行完成,将会被rollback
            var delayItem = new DelayInterruptingThread(Thread.currentThread(),
                    1000);
            delayQueue.offer(delayItem);
            phi.eating();
            delayItem.commit();
            phi.putLeft(forks);
            phi.putRight(forks);
            phi.finished();
            workingQueue.offer(phi);
        } else {
            phi.thinking();
            managerQueue.offer(phi);
        }
    } catch (InterruptedException e) {
        if(phi != null) {
			// rollback打断后,释放资源,回滚状态
            phi.putLeft(forks);
            phi.putRight(forks);
            if(phi.getState() == "Eating") {
                phi.setState("Hungry");
            }
            managerQueue.offer(phi);
        }
    }
}

打断线程代码如下

class InterruptingWorker implements Runnable {
    @Override
    public void run() {
        while(true) {
            try {
				// 从延时队列里循环获取内容,获取到代表任务过期,直接打断
                DelayInterruptingThread delayed = (DelayInterruptingThread)
                        delayQueue.take();
                delayed.rollback();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

入口代码如下

    public DiningPhilosophersBlockingQueue() {
        phis = new Philosopher[5];
        forks = new int[5];
        workingQueue = new LinkedBlockingQueue<>();
        managerQueue = new LinkedBlockingQueue<>();
        for(int i = 0; i < 5; i++) {
            phis[i] = new Philosopher(i+1);
            workingQueue.offer(phis[i]);
        }
    }
    public void run(){
        var pool = Executors.newFixedThreadPool(7);
        for(int i = 0; i < 5; i++) {
			// 启动5个消费线程,消费可执行队列任务
            pool.submit(new Worker());
        }
		// 一个线程循环抢夺资源,创造任务
        pool.submit(new ContentionManager());
		// 一个打断线程,防止任务执行超时
        pool.submit(new InterruptingWorker());
    }
}


评论