手撕简易线程池
本文内容
前言
在 上一篇文章 中,理解了线程池的使用方法和核心原理后,接下来我们就手撸一个简易版的线程。
我会从一个最简单的版本开始,一步步找出问题,然后提出解决思路,最后编码实现,最后完成一个基本功能完备的线程池。
项目地址:https://github.com/AruNi-01/JavaConcurrency/tree/master/src/main/java/com/run/threadpool
1. 定义线程池接口
首先定义一个 线程池接口,向外表明该线程池提供了哪些功能,这也符合面向接口编程的思想。
一个 简单的线程池只需要具备如下几点功能:
- 向线程池中 添加任务并执行;
- 设置线程池饱和时的 拒绝策略;
- 关闭线程池。
/**
* @desc: 线程池接口
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public interface ThreadPool {
// 添加任务并执行
void execute(Runnable task);
// 优雅关闭,等待已添加的任务执行完毕后再关闭
void shutdown();
}
2. 线程池的简单实现
接下来定义一个线程池接口的简单实现类,实现基础功能。不过在这之前,肯定需要定义一个 工作线程,用于 执行任务,不然线程池中放什么呢?
2.1 定义工作线程
最简单的工作线程无非就是 从任务队列中取出任务来执行,所以我们定义一个任务队列,让线程从中不断取任务执行即可。
/**
* @desc: 工作线程
* @author: AruNi_Lu
* @date: 2023-07-01
*/
@Deprecated
public class WorkerThread extends Thread{
// 任务队列(阻塞)
private BlockingQueue<Runnable> taskQueue;
/**
* 构造函数,将 taskQueue 注入进来,方便从中取任务执行
* @param taskQueue 任务队列
*/
public WorkerThread(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// take():阻塞直到从队列中取到任务
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
为什么要用阻塞队列?
- 如果使用非阻塞队列,那么任务队列为空时,线程只能通过 轮询/间断轮询来获取新任务,是有 资源消耗和延迟 的;
- 可以在非阻塞队列的基础上,实现一个 阻塞/唤醒机制,但会带来更多的编码。
2.2 简易线程池
为了实现线程池,我们需要定义如下几个变量:
initialSize
:线程池初始化的线程数量;taskQueue
:阻塞的任务队列,用于存放任务;threads
:存放所有工作线程的集合;isShutdown
:标志线程池是否已关闭。
由于 WorkerThread 类属于线程池,所以可以使用内部类,这样就不用传递 taskQueue 了,内部类中可以直接使用外部类的成员变量。
下面就是实现了 ThreadPool 接口的简易线程池了,:
/**
* @desc: 简易线程池实现类
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class SimpleThreadPool implements ThreadPool {
// 初始化线程池时的线程数量
private int initialSize;
// 任务队列(阻塞)
private BlockingQueue<Runnable> taskQueue;
// 存放工作线程的集合
private List<WorkerThread> workers;
// 标志线程池是否已关闭
private volatile boolean isShutdown = false;
/**
* 内部类,工作线程
*/
private final class WorkerThread extends Thread {
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// take():阻塞直到从队列中取到任务
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
/**
* 构造函数,初始化线程池
* @param initialSize 线程数量
*/
public SimpleThreadPool(int initialSize) {
this.initialSize = initialSize;
taskQueue = new LinkedBlockingQueue<>();
workers = new ArrayList<>(initialSize);
// 初始化线程池时,创建并调用 start 方法启动工作线程
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
}
}
/**
* 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
* @param task 任务
*/
@Override
public void execute(Runnable task) {
// 线程池已关闭后,不允许再添加任务
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown.");
}
taskQueue.offer(task);
}
/**
* 关闭线程池(优雅):
* 1. 修改 isShutdown 标志;
* 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
*/
@Override
public void shutdown() {
isShutdown = true;
for (WorkerThread thread : workers) {
thread.interrupt();
}
}
}
2.3 测试
进行一个简单的测试:
@Test
public void testV1() throws InterruptedException {
SimpleThreadPool pool = new SimpleThreadPool(3);
for (int i = 0; i < 10; i++) {
pool.execute(() -> System.out.println(Thread.currentThread().getName() + ": executing task..."));
}
Thread.sleep(1000); // 主线程等待任务执行完毕
pool.shutdown();
}
/* 输出:
Thread-0: executing task...
Thread-1: executing task...
Thread-2: executing task...
Thread-1: executing task...
Thread-0: executing task...
Thread-1: executing task...
Thread-2: executing task...
Thread-1: executing task...
Thread-0: executing task...
Thread-2: executing task...
*/
3. 优化:自定义线程池的基本参数
上面简易实现的线程池中,还存在如下问题:
- 没有指定任务队列的大小,如果有大量任务添加时,内存很快就会被用完,从而导致异常;
- 初始化线程池时 线程数量固定死,这样如果后续任务增多时,可能之前设置的线程数远远不够,造成性能问题;
下面我们就来对线程池进行优化,可以做出如下调整:
- 在构造器中增加任务队列的大小;
- 添加一些可动态调整线程数量的成员变量,例如最大线程数、核心线程数。我们参考 Java ThreadPool 的设计:
- 当活跃线程数小于核心线程数时,启动一个新的工作线程来执行任务;
- 当活跃线程数大于核心线程数时,将任务添加到任务队列,等空闲的工作线程来执行;
- 当任务队列已满,且工作线程未达到最大线程数时,启动临时工作线程来执行任务;
- 当任务队列已满,且工作线程已达到最大线程数时,该任务不可执行。
优化后的 SimpleThreadPool 如下:
/**
* @desc: 简易线程池实现类
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class SimpleThreadPool implements ThreadPool {
// 初始化线程池时的线程数量
private int initialSize;
// 核心线程数
private int coreSize;
// 最大线程数
private int maxSize;
// 任务队列大小
private int queueSize;
// 任务队列(阻塞)
private BlockingQueue<Runnable> taskQueue;
// 存放工作线程的集合
private List<WorkerThread> workers;
// 标志线程池是否已关闭
private volatile boolean isShutdown = false;
/**
* 内部类,工作线程
*/
private final class WorkerThread extends Thread {
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// take():阻塞直到从队列中取到任务
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize) {
this.initialSize = initialSize;
this.coreSize = coreSize;
this.maxSize = maxSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
workers = new ArrayList<>(initialSize);
// 初始化线程池时,创建并调用 start 方法启动工作线程
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
}
}
/**
* 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
* @param task 任务
*/
@Override
public void execute(Runnable task) {
// 线程池已关闭后,不允许再添加任务
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown.");
}
// 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
if (workers.size() < coreSize) {
addWorkerThread(task);
} else if (!taskQueue.offer(task)) { // check 任务队列是否已满,未满则添加进入,已满则进入分支
// 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
if (workers.size() < maxSize) {
addWorkerThread(task);
} else {
throw new IllegalStateException("Failed to execute. Too many tasks.");
}
}
}
/**
* 启动新的工作线程,将任务放入队列中以执行
* @param task 任务
*/
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
taskQueue.offer(task);
}
/**
* 关闭线程池(优雅):
* 1. 修改 isShutdown 标志;
* 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
*/
@Override
public void shutdown() {
isShutdown = true;
for (WorkerThread thread : workers) {
thread.interrupt();
}
}
}
4. 优化:饱和拒绝策略
限制了任务队列的大小,那么 当队列满时,且线程数量已达最大,还有任务到来,应该怎么办?
直接像上面那样,抛出一个异常,并不是一个最佳的方式,设计得并不够优雅。
此时我们可以设计一个 饱和拒绝策略,当无法执行该任务时,根据不同的拒绝策略来处理该任务。主要的策略有:
- 直接抛出异常,也就是我们上面的做法(不推荐);
- 忽略该任务;
- 阻塞当前线程;
可以发现,拒绝策略可以有多种,因此使用接口来定义,让不同的策略去具体实现自己的逻辑。我们只需要在线程池实现类中提供一个拒绝策略的成员变量,让使用者传入具体的拒绝策略即可。
4.1 定义拒绝策略接口
定义一个拒绝策略接口 RejectedExecutionHandler,提供一个拒绝执行的方法:
/**
* @desc: 饱和拒绝策略接口
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public interface RejectedExecutionHandler {
/**
* 拒绝执行(任务)
* @param task 被拒绝的任务
* @param pool 哪个线程池拒绝
*/
void rejectedExecution(Runnable task, ThreadPool pool);
}
4.2 AbortPolicy
直接抛出异常的策略实现类:
/**
* @desc: 抛出异常策略
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPool pool) {
throw new RuntimeException("Task queue is full and maximum number of threads has been reached");
}
}
4.3 DiscardPolicy
在实现一个丢弃策略:
/**
* @desc: 拒绝任务策略
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class DiscardPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPool pool) {
// 拒绝该任务,什么也不做
System.out.println("Discard task: " + task);
}
}
4.4 SimpleThreadPool 中添加拒绝策略
最后,在 SimpleThreadPool 中添加拒绝策略,完整代码如下:
/**
* @desc: 简易线程池实现类
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class SimpleThreadPool implements ThreadPool {
// 初始化线程池时的线程数量
private int initialSize;
// 核心线程数
private int coreSize;
// 最大线程数
private int maxSize;
// 任务队列大小
private int queueSize;
// 任务队列(阻塞)
private BlockingQueue<Runnable> taskQueue;
// 存放工作线程的集合
private List<WorkerThread> workers;
// 标志线程池是否已关闭
private volatile boolean isShutdown = false;
// 默认拒绝策略
private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();
// 拒绝策略
private final RejectedExecutionHandler rejectedExecutionHandler;
/**
* 内部类,工作线程
*/
private final class WorkerThread extends Thread {
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// take():阻塞直到从队列中取到任务
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize) {
this(initialSize, coreSize, maxSize, queueSize, DEFAULT_REJECT_HANDLER);
}
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
* @param rejectedHandler 饱和拒绝策略
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, RejectedExecutionHandler rejectedHandler) {
this.initialSize = initialSize;
this.coreSize = coreSize;
this.maxSize = maxSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
workers = new ArrayList<>(initialSize);
this.rejectedExecutionHandler = rejectedHandler;
// 初始化线程池时,创建并调用 start 方法启动工作线程
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
}
}
/**
* 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
* @param task 任务
*/
@Override
public void execute(Runnable task) {
// 线程池已关闭后,不允许再添加任务
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown.");
}
// 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
if (workers.size() < coreSize) {
addWorkerThread(task);
} else if (!taskQueue.offer(task)) { // check 任务队列是否已满,未满则添加进入,已满则进入分支
// 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
if (workers.size() < maxSize) {
addWorkerThread(task);
} else {
// 使用饱和拒绝策略
rejectedExecutionHandler.rejectedExecution(task, this);
}
}
}
/**
* 启动新的工作线程,将任务放入队列中以执行
* @param task 任务
*/
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
taskQueue.offer(task);
}
/**
* 关闭线程池(优雅):
* 1. 修改 isShutdown 标志;
* 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
*/
@Override
public void shutdown() {
isShutdown = true;
for (WorkerThread thread : workers) {
thread.interrupt();
}
}
}
5. 优化:自调节工作线程数
我们在上面添加了一个最大线程数,当任务量增多时,可以启动一些临时线程来处理任务。
那么 当任务量消减回去时,这些临时的线程其实是可以销毁的,只保留核心线程即可,毕竟一直开启也比较消耗资源。
所以我们就需要动态的调节工作线程数,那怎么实现呢?
其实很简单,只需要给这些临时线程设置一个 存活时间,到时间后,把他们从工作线程集合中移除销毁即可。
为了判断某空闲是否已到达线程存活时间,需要在工作线程的 run 方法中 不断更新该线程最后一次执行任务的时间。
同时,我们需要利用阻塞队列的一个 poll()
方法,它可以接受一个最大阻塞时间的参数。
5.1 SimpleThreadPool 中添加空闲线程存活时间
在 SimpleThreadPool 中添加空闲线程存活时间:
/**
* @desc: 简易线程池实现类
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class SimpleThreadPool implements ThreadPool {
// 其他代码略
// 临时线程存活时间
private long keepAliveTime;
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
* @param keepAliveTime 临时线程存活时间
* @param unit 临时线程存活时间单位
* @param rejectedHandler 饱和拒绝策略
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler rejectedHandler) {
if (initialSize < 0 || coreSize < 0 || maxSize <= 0 || maxSize < coreSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
this.initialSize = initialSize;
this.coreSize = coreSize;
this.maxSize = maxSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
workers = new ArrayList<>(initialSize);
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.rejectedExecutionHandler = rejectedHandler;
// 初始化线程池时,创建并调用 start 方法启动工作线程
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
}
}
// 其他代码略
}
5.2 改造 WorkerThread
重点在于 WorkerThread 的 run 方法,改造后如下:
/**
* 内部类,工作线程
*/
private final class WorkerThread extends Thread {
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 记录该工作线程最后执行任务的时间
long lastActiveTime = System.currentTimeMillis();
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// poll():阻塞直到从队列中取到任务,或者到达超时时间
Runnable task = taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
if (task != null) {
System.out.printf("WorkerThread %s, executing task: %s\n", Thread.currentThread().getName(), "My Task...");
task.run();
// 执行完任务后更新 lastActiveTime
lastActiveTime = System.currentTimeMillis();
} else if (workers.size() > coreSize &&
System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
// 临时线程已到达存活时间,则从工作线程集合中移除,然后跳出循环
System.out.printf("Temp worker thread %s, exit workers queue\n", Thread.currentThread().getName());
workers.remove(this);
break;
}
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
6. SimpleThreadPool 最终版
SimpleThreadPool 最后的代码如下:
/**
* @desc: 简易线程池实现类
* @author: AruNi_Lu
* @date: 2023-07-01
*/
public class SimpleThreadPool implements ThreadPool {
// 初始化线程池时的线程数量
private int initialSize;
// 核心线程数
private int coreSize;
// 最大线程数
private int maxSize;
// 任务队列大小
private int queueSize;
// 任务队列(阻塞)
private BlockingQueue<Runnable> taskQueue;
// 存放工作线程的集合
private List<WorkerThread> workers;
// 标志线程池是否已关闭
private volatile boolean isShutdown = false;
// 默认拒绝策略
private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();
// 拒绝策略
private final RejectedExecutionHandler rejectedExecutionHandler;
// 临时线程存活时间
private long keepAliveTime;
/**
* 内部类,工作线程
*/
private final class WorkerThread extends Thread {
/**
* 重写 run 方法,让线程执行时从任务队列中取任务执行
*/
@Override
public void run() {
// 记录该工作线程最后执行任务的时间
long lastActiveTime = System.currentTimeMillis();
// 循环从 taskQueue 中取任务执行
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// poll():阻塞直到从队列中取到任务,或者到达超时时间
Runnable task = taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
if (task != null) {
System.out.printf("WorkerThread %s, executing task: %s\n", Thread.currentThread().getName(), "My Task...");
task.run();
// 执行完任务后更新 lastActiveTime
lastActiveTime = System.currentTimeMillis();
} else if (workers.size() > coreSize &&
System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
// 临时线程已到达存活时间,则从工作线程集合中移除,然后跳出循环
System.out.printf("Temp worker thread %s, exit workers queue\n", Thread.currentThread().getName());
workers.remove(this);
break;
}
} catch (InterruptedException e) {
// 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
workers.remove(this);
break;
}
}
}
}
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
* @param keepAliveTime 临时线程存活时间
* @param unit 临时线程存活时间单位
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit) {
this(initialSize, coreSize, maxSize, queueSize, keepAliveTime, unit, DEFAULT_REJECT_HANDLER);
}
/**
* 构造函数,初始化线程池
* @param initialSize 初始化线程数
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param queueSize 任务队列大小
* @param keepAliveTime 临时线程存活时间
* @param unit 临时线程存活时间单位
* @param rejectedHandler 饱和拒绝策略
*/
public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler rejectedHandler) {
if (initialSize < 0 || coreSize < 0 || maxSize <= 0 || maxSize < coreSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
this.initialSize = initialSize;
this.coreSize = coreSize;
this.maxSize = maxSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
workers = new ArrayList<>(initialSize);
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.rejectedExecutionHandler = rejectedHandler;
// 初始化线程池时,创建并调用 start 方法启动工作线程
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
}
}
/**
* 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
* @param task 任务
*/
@Override
public void execute(Runnable task) {
// 线程池已关闭后,不允许再添加任务
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown.");
}
// 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
if (workers.size() < coreSize) {
addWorkerThread(task);
} else if (!taskQueue.offer(task)) { // check 任务队列是否已满,未满则添加进入,已满则进入分支
// 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
if (workers.size() < maxSize) {
addWorkerThread(task);
} else {
// 使用饱和拒绝策略
rejectedExecutionHandler.rejectedExecution(task, this);
}
}
}
/**
* 启动新的工作线程,将任务放入队列中以执行
* @param task 任务
*/
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workers.add(workerThread);
taskQueue.offer(task);
}
/**
* 关闭线程池(优雅):
* 1. 修改 isShutdown 标志;
* 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
*/
@Override
public void shutdown() {
isShutdown = true;
for (WorkerThread thread : workers) {
thread.interrupt();
}
}
}
测试:
@Test
public void testV2() throws InterruptedException {
SimpleThreadPool pool = new SimpleThreadPool(3, 3, 5, 8, 10, TimeUnit.MILLISECONDS);
// 控制任务数量,不要让饱和拒绝策略触发
for (int i = 0; i < 12; i++) {
pool.execute(() -> {
for (int j = 0; j < 100; j++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
Thread.sleep(10_000_000); // 让主线程一直睡眠,方便我们观察结果
pool.shutdown();
}
我们定义了核心线程数为 3,最大线程数为 5,任务队列大小为 8。所以任务数量 >= 13 (5 + 8)时,就会触发拒绝策略;当所有任务都执行完毕时,会有 2(5 - 3)个临时线程被移除:
WorkerThread Thread-0, executing task: My Task...
WorkerThread Thread-3, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-4, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-0, executing task: My Task...
WorkerThread Thread-3, executing task: My Task...
WorkerThread Thread-4, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
Temp worker thread Thread-0, exit workers queue
Temp worker thread Thread-3, exit workers queue
...... 阻塞 ......