线程安全工具
线程安全队列
队列种类介绍
队列类型 | 概述 | 应用体现 |
---|---|---|
ArrayListBlockingQueue | 默认有界队列 | Web 服务器实现(下标索引) |
LinkedBlockingQueue | 默认无界队列 | Web 服务器实现(无下标) |
LinkedBlockingDueue | 无界双向队列 | 撤销、重做实现 |
PriorityQueue | 带优先级的队列 | 操作系统调度(中断、关系优先执行) |
DelayQueue | 延时队列 | 流量控制等 |
LinkedTransferQueue | 无容量队列(tryTransfer) | 线程池实现 |
SynchronousQueue | 无容量队列(offer) | 线程池实现 |
补充说明
LinkedTransferQueue --> tryTransfer 反向压力(添加任务,动态添加新的消费者)===> 那用offer判断可以吗? 结论 : tryTransfer 需要匹配到消费者,否则返回false, offer只要队列可存储,返回true
SynchronousQueue--> 基本同LinkedTransferQueue , 可以配置fair(公平非公平,但公平仅仅保证消费者足够的情况下) , api上直接使用offer,即等同于tryTrasfe)
总结
SynchronousQueue和LinkedTransferQueue在实现匹配策略时,队列容量都默认为0,当匹配当消费者(有消费者take阻塞住)时,才能提交任务成功。
public class Scheduler {
LinkedTransferQueue<Runnable> tasks = new LinkedTransferQueue<>();
static AtomicInteger idCount = new AtomicInteger(0);
int maxWorkers;
public Scheduler(int maxWorkers) {
this.maxWorkers = maxWorkers;
for(int i = 0; i < maxWorkers; i++) {
new Thread(new Worker()).start();
}
}
class Worker implements Runnable {
int id;
public Worker(){
this.id = idCount.getAndIncrement();
}
@Override
public void run() {
while(true) {
Runnable runnable = null;
try {
// 执行take,代表消费者就绪
runnable = tasks.take();
runnable.run();
System.out.format("work done by id=%d\n", id);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void submit(Runnable r) throws InterruptedException {
// 判断消费者是否就绪
// 若就绪tryTransfer返回true,将r加入到队列
// 若不就绪,进入循环体,创建新消费者来动态扩充消费核心
while(!tasks.tryTransfer(r)) {
Thread.onSpinWait();
new Thread(new Worker()).start();
}
}
public static void main(String[] argv) throws InterruptedException {
var scheduler = new Scheduler(10);
for(int i = 0; i < 1000; i++) {
var localI = i;
Thread.sleep(1);
scheduler.submit(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
线程池框架
线程的几种状态
线程池框架
线程池框架 | 描述 | 特点 |
---|---|---|
| 固定大小的线程池,执行任务的线程数量固定 | 适用于需要限制线程数量的场景,避免线程数量无限增长 |
| 可根据任务动态调整线程数量,适用于短时任务 | 适用于短时任务,线程数随着任务数量的增加而自动调整 |
| 只有一个线程的线程池 | 适用于需要顺序执行任务的场景 |
| 能执行定时任务和周期性任务 | 适用于需要定时执行或周期性执行任务的场景 |
| JDK 1.8 引入的线程池,适用于并行任务 | 适用于处理大量独立、可并行计算的任务 |
底层实现均有ThreadPoolExecutor创建,其构造方法参数
参数名 | 说明 |
---|---|
corePoolSize | 线程池中保持存活的核心线程数量 |
maximumPoolSize | 线程池中允许的最大线程数量(队列满才会扩) |
keepAliveTime | 空闲线程超过核心线程数时的存活时间 |
unit | 空闲线程存活时间的单位 |
workQueue | 用于保存等待执行的任务的阻塞队列 |
threadFactory | 用于创建新线程的工厂 |
rejectedExecutionHandler | 用于处理任务执行超出边界并被拒绝执行的情况 |
threadFactory主要用于定义相关线程参数,线程名、守护线程等传入
// 自定义 ThreadFactory
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "MyThread-";
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setPriority(Thread.NORM_PRIORITY);
t.setDaemon(true);
return t;
}
};
rejectedExecutionHandler内置了几种拒绝策略,默认策略时拒绝。
AbortPolicy(默认策略):默认的拒绝策略。当任务添加到线程池中被拒绝时,会抛出
RejectedExecutionException
异常给调用者。CallerRunsPolicy:在任务被拒绝时,会使用调用线程来执行该任务。这样可以降低新任务的提交速度。
DiscardPolicy:当任务被拒绝时,会默默地丢弃该任务,不做任何处理。
DiscardOldestPolicy:当任务被拒绝时,会丢弃队列中最旧的未处理任务,并尝试为当前任务腾出空间。
扩展
并发处理出现的死锁问题该如何处理?
死锁(互相抢占资源,陷入等待)
活锁(规律性执行、规律性结束,无法进步)
死锁案例
// forks为共享资源
// A的Left资源就是Z的right资源
while (true) {
try {
this.thinking();
while (!this.takeLeft(forks)) {
Thread.onSpinWait();
}
// 在此处产生上线文线程切换
// 会导致死锁(拿到Left,Right被其他线程拿到)
while (!this.takeRight(forks)) {
Thread.onSpinWait();
}
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
使用上述方法在多线程竞争资源时,会产生死锁。
疑问: 此处不用while循环,拿不到left资源或right资源直接跳过,岂不就行了?
while (true) {
try {
this.thinking();
if(!this.takeLeft(forks) || !this.takeRight(forks)) {
this.putLeft(forks);
this.putRight(forks);
continue;
}
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
解决: 测试发现无问题(此处模拟阻塞产生死锁,探究死锁解决方案)。
方案1
此种方案在拿不到right资源超过一定限制,就放弃争抢。但是会产生活锁(释放left资源的一瞬间又被自己抢到,周而复始)。
int c = 0;
while (!this.takeRight(forks)) {
c ++;
if(c > 100) {
this.putLeft(forks);
continue;
}
Thread.onSpinWait();
}
方案2
对于临界区加锁,来防止资源被一个拿去一个,造成线程间互相等待产生死锁。
在获取left资源时,获取不到,就await等待其他线程释放资源后唤醒线程;
在已获取left之后时,认为自己拥有较高执行权限,去请求其他线程释放right资源;
此种方案过于臃肿,不推荐使用。
while(true) {
try {
this.thinking();
lock.lockInterruptibly();
while(!this.takeLeft(forks)) {
waitForks[this.left()].await();
}
while(!this.takeRight(forks)) {
var rid = this.right();
var rightPhi = phis[forks[rid] -1];
if(rightPhi.getState() != "Eating" && dirty[rid] == true) {
forks[rid] = this.id;
dirty[rid] = false;
break;
}
waitForks[this.right()].await();
}
lock.unlock();
this.eating();
lock.lockInterruptibly();
this.putLeft(forks);
waitForks[this.left()].signalAll();
this.putRight(forks);
waitForks[this.right()].signalAll();
dirty[this.left()] = true;
dirty[this.right()] = true;
lock.unlock();
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
方案3
阻塞队列的方式实现。
感觉此种方式与原始方案不加锁,拿不到资源直接continue有点像。
开一个线程竞争资源,由于在一个线程执行竞争,也不用加锁。
竞争成功后,就把当前元素加入执行队列workingQueue等待执行。
竞争失败,直接重回竞争队列(这不就是初始方案放弃了阻塞么,提取了一个生产者消费者模式去处理)。
class ContentionManager implements Runnable {
@Override
public void run() {
while(true) {
try {
var phi = managerQueue.take();
if(phi.checkLeft(forks) && phi.checkRight(forks)) {
phi.takeLeft(forks);
phi.takeRight(forks);
workingQueue.offer(phi);
} else {
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
工作队列代码如下
class Worker implements Runnable {
@Override
public void run() {
while(true) {
Philosopher phi = null;
try {
phi = workingQueue.take();
if(phi.getState() == "Hungry") {
phi.eating();
phi.putLeft(forks);
phi.putRight(forks);
phi.finished();
workingQueue.offer(phi);
} else {
phi.thinking();
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
}
}
}
}
方案4(方案3补充)
构造延时队列,模拟异常情况。
延时队列代码如下
class DelayInterruptingThread implements Delayed{
long time;
Thread current;
public DelayInterruptingThread(Thread t, long delay) {
this.current = t;
this.time = System.currentTimeMillis() + delay;
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (time - ((DelayInterruptingThread)o).time);
}
public void rollback() {
if(this.current != null) {
this.current.interrupt();
}
}
public void commit() {
this.current = null;
}
}
工作队列添加超时打断操作
while(true) {
Philosopher phi = null;
try {
phi = workingQueue.take();
if(phi.getState() == "Hungry") {
// 将元素加入延时队列,1s中 eating未执行完成,将会被rollback
var delayItem = new DelayInterruptingThread(Thread.currentThread(),
1000);
delayQueue.offer(delayItem);
phi.eating();
delayItem.commit();
phi.putLeft(forks);
phi.putRight(forks);
phi.finished();
workingQueue.offer(phi);
} else {
phi.thinking();
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
if(phi != null) {
// rollback打断后,释放资源,回滚状态
phi.putLeft(forks);
phi.putRight(forks);
if(phi.getState() == "Eating") {
phi.setState("Hungry");
}
managerQueue.offer(phi);
}
}
}
打断线程代码如下
class InterruptingWorker implements Runnable {
@Override
public void run() {
while(true) {
try {
// 从延时队列里循环获取内容,获取到代表任务过期,直接打断
DelayInterruptingThread delayed = (DelayInterruptingThread)
delayQueue.take();
delayed.rollback();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
入口代码如下
public DiningPhilosophersBlockingQueue() {
phis = new Philosopher[5];
forks = new int[5];
workingQueue = new LinkedBlockingQueue<>();
managerQueue = new LinkedBlockingQueue<>();
for(int i = 0; i < 5; i++) {
phis[i] = new Philosopher(i+1);
workingQueue.offer(phis[i]);
}
}
public void run(){
var pool = Executors.newFixedThreadPool(7);
for(int i = 0; i < 5; i++) {
// 启动5个消费线程,消费可执行队列任务
pool.submit(new Worker());
}
// 一个线程循环抢夺资源,创造任务
pool.submit(new ContentionManager());
// 一个打断线程,防止任务执行超时
pool.submit(new InterruptingWorker());
}
}