跳至主要內容

手撕简易线程池

AruNi_LuJava并发编程约 5227 字大约 17 分钟

本文内容

前言

上一篇文章open in new window 中,理解了线程池的使用方法和核心原理后,接下来我们就手撸一个简易版的线程。

我会从一个最简单的版本开始,一步步找出问题,然后提出解决思路,最后编码实现,最后完成一个基本功能完备的线程池。

项目地址:https://github.com/AruNi-01/JavaConcurrency/tree/master/src/main/java/com/run/threadpoolopen in new window

1. 定义线程池接口

首先定义一个 线程池接口,向外表明该线程池提供了哪些功能,这也符合面向接口编程的思想。

一个 简单的线程池只需要具备如下几点功能

  • 向线程池中 添加任务并执行
  • 设置线程池饱和时的 拒绝策略
  • 关闭线程池
/**
 * @desc: 线程池接口
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public interface ThreadPool {

    // 添加任务并执行
    void execute(Runnable task);

    // 优雅关闭,等待已添加的任务执行完毕后再关闭
    void shutdown();

}

2. 线程池的简单实现

接下来定义一个线程池接口的简单实现类,实现基础功能。不过在这之前,肯定需要定义一个 工作线程,用于 执行任务,不然线程池中放什么呢?

2.1 定义工作线程

最简单的工作线程无非就是 从任务队列中取出任务来执行,所以我们定义一个任务队列,让线程从中不断取任务执行即可。

/**
 * @desc: 工作线程
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
@Deprecated
public class WorkerThread extends Thread{

    // 任务队列(阻塞)
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 构造函数,将 taskQueue 注入进来,方便从中取任务执行
     * @param taskQueue 任务队列
     */
    public WorkerThread(BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    /**
     * 重写 run 方法,让线程执行时从任务队列中取任务执行
     */
    @Override
    public void run() {
        // 循环从 taskQueue 中取任务执行
        while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
            try {
                // take():阻塞直到从队列中取到任务
                Runnable task = taskQueue.take();
                task.run();
            } catch (InterruptedException e) {
                // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                workers.remove(this);
                break;
            }
        }
    }
}

为什么要用阻塞队列?

  • 如果使用非阻塞队列,那么任务队列为空时,线程只能通过 轮询/间断轮询来获取新任务,是有 资源消耗和延迟 的;
  • 可以在非阻塞队列的基础上,实现一个 阻塞/唤醒机制,但会带来更多的编码。

2.2 简易线程池

为了实现线程池,我们需要定义如下几个变量:

  • initialSize:线程池初始化的线程数量;
  • taskQueue:阻塞的任务队列,用于存放任务;
  • threads:存放所有工作线程的集合;
  • isShutdown:标志线程池是否已关闭。

由于 WorkerThread 类属于线程池,所以可以使用内部类,这样就不用传递 taskQueue 了,内部类中可以直接使用外部类的成员变量。

下面就是实现了 ThreadPool 接口的简易线程池了,:

/**
 * @desc: 简易线程池实现类
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class SimpleThreadPool implements ThreadPool {

    // 初始化线程池时的线程数量
    private int initialSize;

    // 任务队列(阻塞)
    private BlockingQueue<Runnable> taskQueue;

    // 存放工作线程的集合
    private List<WorkerThread> workers;

    // 标志线程池是否已关闭
    private volatile boolean isShutdown = false;


    /**
     * 内部类,工作线程
     */
    private final class WorkerThread extends Thread {

        /**
         * 重写 run 方法,让线程执行时从任务队列中取任务执行
         */
        @Override
        public void run() {
            // 循环从 taskQueue 中取任务执行
            while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
                try {
                    // take():阻塞直到从队列中取到任务
                    Runnable task = taskQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                    workers.remove(this);
                    break;
                }
            }
        }

    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 线程数量
     */
    public SimpleThreadPool(int initialSize) {
        this.initialSize = initialSize;
        taskQueue = new LinkedBlockingQueue<>();
        workers = new ArrayList<>(initialSize);

        // 初始化线程池时,创建并调用 start 方法启动工作线程
        for (int i = 0; i < initialSize; i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            workers.add(workerThread);
        }
    }

    /**
     * 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
     * @param task 任务
     */
    @Override
    public void execute(Runnable task) {
        // 线程池已关闭后,不允许再添加任务
        if (isShutdown) {
            throw new IllegalStateException("ThreadPool is shutdown.");
        }
        taskQueue.offer(task);
    }

    /**
     * 关闭线程池(优雅):
     * 1. 修改 isShutdown 标志;
     * 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
     */
    @Override
    public void shutdown() {
        isShutdown = true;

        for (WorkerThread thread : workers) {
            thread.interrupt();
        }
    }

}

2.3 测试

进行一个简单的测试:

@Test
public void testV1() throws InterruptedException {
    SimpleThreadPool pool = new SimpleThreadPool(3);

    for (int i = 0; i < 10; i++) {
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + ": executing task..."));
    }

    Thread.sleep(1000); 	// 主线程等待任务执行完毕
    pool.shutdown();
}

/* 输出:
Thread-0: executing task...
Thread-1: executing task...
Thread-2: executing task...
Thread-1: executing task...
Thread-0: executing task...
Thread-1: executing task...
Thread-2: executing task...
Thread-1: executing task...
Thread-0: executing task...
Thread-2: executing task...
*/

3. 优化:自定义线程池的基本参数

上面简易实现的线程池中,还存在如下问题:

  • 没有指定任务队列的大小,如果有大量任务添加时,内存很快就会被用完,从而导致异常;
  • 初始化线程池时 线程数量固定死,这样如果后续任务增多时,可能之前设置的线程数远远不够,造成性能问题;

下面我们就来对线程池进行优化,可以做出如下调整:

  • 在构造器中增加任务队列的大小;
  • 添加一些可动态调整线程数量的成员变量,例如最大线程数、核心线程数。我们参考 Java ThreadPool 的设计:
    • 当活跃线程数小于核心线程数时,启动一个新的工作线程来执行任务;
    • 当活跃线程数大于核心线程数时,将任务添加到任务队列,等空闲的工作线程来执行;
    • 当任务队列已满,且工作线程未达到最大线程数时,启动临时工作线程来执行任务;
    • 当任务队列已满,且工作线程已达到最大线程数时,该任务不可执行。

优化后的 SimpleThreadPool 如下:

/**
 * @desc: 简易线程池实现类
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class SimpleThreadPool implements ThreadPool {

    // 初始化线程池时的线程数量
    private int initialSize;

    // 核心线程数
    private int coreSize;

    // 最大线程数
    private int maxSize;

    // 任务队列大小
    private int queueSize;

    // 任务队列(阻塞)
    private BlockingQueue<Runnable> taskQueue;

    // 存放工作线程的集合
    private List<WorkerThread> workers;

    // 标志线程池是否已关闭
    private volatile boolean isShutdown = false;


    /**
     * 内部类,工作线程
     */
    private final class WorkerThread extends Thread {

        /**
         * 重写 run 方法,让线程执行时从任务队列中取任务执行
         */
        @Override
        public void run() {
            // 循环从 taskQueue 中取任务执行
            while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
                try {
                    // take():阻塞直到从队列中取到任务
                    Runnable task = taskQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                    workers.remove(this);
                    break;
                }
            }
        }

    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize) {
        this.initialSize = initialSize;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        taskQueue = new LinkedBlockingQueue<>(queueSize);
        workers = new ArrayList<>(initialSize);

        // 初始化线程池时,创建并调用 start 方法启动工作线程
        for (int i = 0; i < initialSize; i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            workers.add(workerThread);
        }
    }

    /**
     * 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
     * @param task 任务
     */
    @Override
    public void execute(Runnable task) {
        // 线程池已关闭后,不允许再添加任务
        if (isShutdown) {
            throw new IllegalStateException("ThreadPool is shutdown.");
        }

        // 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
        if (workers.size() < coreSize) {
            addWorkerThread(task);
        } else if (!taskQueue.offer(task)) {    // check 任务队列是否已满,未满则添加进入,已满则进入分支
            // 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
            if (workers.size() < maxSize) {
                addWorkerThread(task);
            } else {
                throw new IllegalStateException("Failed to execute. Too many tasks.");
            }
        }
    }

    /**
     * 启动新的工作线程,将任务放入队列中以执行
     * @param task 任务
     */
    private void addWorkerThread(Runnable task) {
        WorkerThread workerThread = new WorkerThread();
        workerThread.start();
        workers.add(workerThread);
        taskQueue.offer(task);
    }

    /**
     * 关闭线程池(优雅):
     * 1. 修改 isShutdown 标志;
     * 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
     */
    @Override
    public void shutdown() {
        isShutdown = true;

        for (WorkerThread thread : workers) {
            thread.interrupt();
        }
    }

}

4. 优化:饱和拒绝策略

限制了任务队列的大小,那么 当队列满时,且线程数量已达最大,还有任务到来,应该怎么办?

直接像上面那样,抛出一个异常,并不是一个最佳的方式,设计得并不够优雅。

此时我们可以设计一个 饱和拒绝策略,当无法执行该任务时,根据不同的拒绝策略来处理该任务。主要的策略有:

  • 直接抛出异常,也就是我们上面的做法(不推荐);
  • 忽略该任务;
  • 阻塞当前线程;

可以发现,拒绝策略可以有多种,因此使用接口来定义,让不同的策略去具体实现自己的逻辑。我们只需要在线程池实现类中提供一个拒绝策略的成员变量,让使用者传入具体的拒绝策略即可。

4.1 定义拒绝策略接口

定义一个拒绝策略接口 RejectedExecutionHandler,提供一个拒绝执行的方法:

/**
 * @desc: 饱和拒绝策略接口
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public interface RejectedExecutionHandler {

    /**
     * 拒绝执行(任务)
     * @param task 被拒绝的任务
     * @param pool 哪个线程池拒绝
     */
    void rejectedExecution(Runnable task, ThreadPool pool);

}

4.2 AbortPolicy

直接抛出异常的策略实现类:

/**
 * @desc: 抛出异常策略
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class AbortPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable task, ThreadPool pool) {
        throw new RuntimeException("Task queue is full and maximum number of threads has been reached");
    }

}

4.3 DiscardPolicy

在实现一个丢弃策略:

/**
 * @desc: 拒绝任务策略
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class DiscardPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable task, ThreadPool pool) {
        // 拒绝该任务,什么也不做
        System.out.println("Discard task: " + task);
    }

}

4.4 SimpleThreadPool 中添加拒绝策略

最后,在 SimpleThreadPool 中添加拒绝策略,完整代码如下:

/**
 * @desc: 简易线程池实现类
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class SimpleThreadPool implements ThreadPool {

    // 初始化线程池时的线程数量
    private int initialSize;

    // 核心线程数
    private int coreSize;

    // 最大线程数
    private int maxSize;

    // 任务队列大小
    private int queueSize;

    // 任务队列(阻塞)
    private BlockingQueue<Runnable> taskQueue;

    // 存放工作线程的集合
    private List<WorkerThread> workers;

    // 标志线程池是否已关闭
    private volatile boolean isShutdown = false;

    // 默认拒绝策略
    private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();

    // 拒绝策略
    private final RejectedExecutionHandler rejectedExecutionHandler;

    /**
     * 内部类,工作线程
     */
    private final class WorkerThread extends Thread {
        /**
         * 重写 run 方法,让线程执行时从任务队列中取任务执行
         */
        @Override
        public void run() {
            // 循环从 taskQueue 中取任务执行
            while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
                try {
                    // take():阻塞直到从队列中取到任务
                    Runnable task = taskQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                    workers.remove(this);
                    break;
                }
            }
        }
    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize) {
        this(initialSize, coreSize, maxSize, queueSize, DEFAULT_REJECT_HANDLER);
    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     * @param rejectedHandler 饱和拒绝策略
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, RejectedExecutionHandler rejectedHandler) {
        this.initialSize = initialSize;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        taskQueue = new LinkedBlockingQueue<>(queueSize);
        workers = new ArrayList<>(initialSize);
        this.rejectedExecutionHandler = rejectedHandler;

        // 初始化线程池时,创建并调用 start 方法启动工作线程
        for (int i = 0; i < initialSize; i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            workers.add(workerThread);
        }
    }

    /**
     * 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
     * @param task 任务
     */
    @Override
    public void execute(Runnable task) {
        // 线程池已关闭后,不允许再添加任务
        if (isShutdown) {
            throw new IllegalStateException("ThreadPool is shutdown.");
        }

        // 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
        if (workers.size() < coreSize) {
            addWorkerThread(task);
        } else if (!taskQueue.offer(task)) {    // check 任务队列是否已满,未满则添加进入,已满则进入分支
            // 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
            if (workers.size() < maxSize) {
                addWorkerThread(task);
            } else {
                // 使用饱和拒绝策略
                rejectedExecutionHandler.rejectedExecution(task, this);
            }
        }
    }

    /**
     * 启动新的工作线程,将任务放入队列中以执行
     * @param task 任务
     */
    private void addWorkerThread(Runnable task) {
        WorkerThread workerThread = new WorkerThread();
        workerThread.start();
        workers.add(workerThread);
        taskQueue.offer(task);
    }

    /**
     * 关闭线程池(优雅):
     * 1. 修改 isShutdown 标志;
     * 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
     */
    @Override
    public void shutdown() {
        isShutdown = true;

        for (WorkerThread thread : workers) {
            thread.interrupt();
        }
    }

}

5. 优化:自调节工作线程数

我们在上面添加了一个最大线程数,当任务量增多时,可以启动一些临时线程来处理任务。

那么 当任务量消减回去时,这些临时的线程其实是可以销毁的,只保留核心线程即可,毕竟一直开启也比较消耗资源。

所以我们就需要动态的调节工作线程数,那怎么实现呢?

其实很简单,只需要给这些临时线程设置一个 存活时间,到时间后,把他们从工作线程集合中移除销毁即可。

为了判断某空闲是否已到达线程存活时间,需要在工作线程的 run 方法中 不断更新该线程最后一次执行任务的时间

同时,我们需要利用阻塞队列的一个 poll() 方法,它可以接受一个最大阻塞时间的参数。

5.1 SimpleThreadPool 中添加空闲线程存活时间

在 SimpleThreadPool 中添加空闲线程存活时间:

/**
 * @desc: 简易线程池实现类
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class SimpleThreadPool implements ThreadPool {

    // 其他代码略

    // 临时线程存活时间
    private long keepAliveTime;

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     * @param keepAliveTime 临时线程存活时间
     * @param unit 临时线程存活时间单位
     * @param rejectedHandler 饱和拒绝策略
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler rejectedHandler) {
        if (initialSize < 0 || coreSize < 0 || maxSize <= 0 || maxSize < coreSize || keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }

        this.initialSize = initialSize;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        taskQueue = new LinkedBlockingQueue<>(queueSize);
        workers = new ArrayList<>(initialSize);
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.rejectedExecutionHandler = rejectedHandler;

        // 初始化线程池时,创建并调用 start 方法启动工作线程
        for (int i = 0; i < initialSize; i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            workers.add(workerThread);
        }
    }
	
    // 其他代码略

}

5.2 改造 WorkerThread

重点在于 WorkerThread 的 run 方法,改造后如下:

    /**
     * 内部类,工作线程
     */
    private final class WorkerThread extends Thread {
        /**
         * 重写 run 方法,让线程执行时从任务队列中取任务执行
         */
        @Override
        public void run() {
            // 记录该工作线程最后执行任务的时间
            long lastActiveTime = System.currentTimeMillis();

            // 循环从 taskQueue 中取任务执行
            while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
                try {
                    // poll():阻塞直到从队列中取到任务,或者到达超时时间
                    Runnable task = taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

                    if (task != null) {
                        System.out.printf("WorkerThread %s, executing task: %s\n", Thread.currentThread().getName(), "My Task...");
                        task.run();

                        // 执行完任务后更新 lastActiveTime
                        lastActiveTime = System.currentTimeMillis();
                    } else if (workers.size() > coreSize &&
                            System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
                        // 临时线程已到达存活时间,则从工作线程集合中移除,然后跳出循环
                        System.out.printf("Temp worker thread %s, exit workers queue\n", Thread.currentThread().getName());
                        workers.remove(this);
                        break;
                    }
                } catch (InterruptedException e) {
                    // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                    workers.remove(this);
                    break;
                }
            }
        }
    }

6. SimpleThreadPool 最终版

SimpleThreadPool 最后的代码如下:

/**
 * @desc: 简易线程池实现类
 * @author: AruNi_Lu
 * @date: 2023-07-01
 */
public class SimpleThreadPool implements ThreadPool {

    // 初始化线程池时的线程数量
    private int initialSize;

    // 核心线程数
    private int coreSize;

    // 最大线程数
    private int maxSize;

    // 任务队列大小
    private int queueSize;

    // 任务队列(阻塞)
    private BlockingQueue<Runnable> taskQueue;

    // 存放工作线程的集合
    private List<WorkerThread> workers;

    // 标志线程池是否已关闭
    private volatile boolean isShutdown = false;

    // 默认拒绝策略
    private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();

    // 拒绝策略
    private final RejectedExecutionHandler rejectedExecutionHandler;

    // 临时线程存活时间
    private long keepAliveTime;

    /**
     * 内部类,工作线程
     */
    private final class WorkerThread extends Thread {
        /**
         * 重写 run 方法,让线程执行时从任务队列中取任务执行
         */
        @Override
        public void run() {
            // 记录该工作线程最后执行任务的时间
            long lastActiveTime = System.currentTimeMillis();

            // 循环从 taskQueue 中取任务执行
            while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
                try {
                    // poll():阻塞直到从队列中取到任务,或者到达超时时间
                    Runnable task = taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

                    if (task != null) {
                        System.out.printf("WorkerThread %s, executing task: %s\n", Thread.currentThread().getName(), "My Task...");
                        task.run();

                        // 执行完任务后更新 lastActiveTime
                        lastActiveTime = System.currentTimeMillis();
                    } else if (workers.size() > coreSize &&
                            System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
                        // 临时线程已到达存活时间,则从工作线程集合中移除,然后跳出循环
                        System.out.printf("Temp worker thread %s, exit workers queue\n", Thread.currentThread().getName());
                        workers.remove(this);
                        break;
                    }
                } catch (InterruptedException e) {
                    // 当线程阻塞时,收到 interrupt 信号会抛出 InterruptedException,故在此捕获处理
                    workers.remove(this);
                    break;
                }
            }
        }
    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     * @param keepAliveTime 临时线程存活时间
     * @param unit 临时线程存活时间单位
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit) {
        this(initialSize, coreSize, maxSize, queueSize, keepAliveTime, unit, DEFAULT_REJECT_HANDLER);
    }

    /**
     * 构造函数,初始化线程池
     * @param initialSize 初始化线程数
     * @param coreSize 核心线程数
     * @param maxSize 最大线程数
     * @param queueSize 任务队列大小
     * @param keepAliveTime 临时线程存活时间
     * @param unit 临时线程存活时间单位
     * @param rejectedHandler 饱和拒绝策略
     */
    public SimpleThreadPool(int initialSize, int coreSize, int maxSize, int queueSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler rejectedHandler) {
        if (initialSize < 0 || coreSize < 0 || maxSize <= 0 || maxSize < coreSize || keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }

        this.initialSize = initialSize;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        taskQueue = new LinkedBlockingQueue<>(queueSize);
        workers = new ArrayList<>(initialSize);
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.rejectedExecutionHandler = rejectedHandler;

        // 初始化线程池时,创建并调用 start 方法启动工作线程
        for (int i = 0; i < initialSize; i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            workers.add(workerThread);
        }
    }

    /**
     * 添加任务并执行,由于使用了阻塞队列,因此无需通知工作线程
     * @param task 任务
     */
    @Override
    public void execute(Runnable task) {
        // 线程池已关闭后,不允许再添加任务
        if (isShutdown) {
            throw new IllegalStateException("ThreadPool is shutdown.");
        }

        // 当前工作线程数 < 核心线程数时,启动新的线程来执行任务
        if (workers.size() < coreSize) {
            addWorkerThread(task);
        } else if (!taskQueue.offer(task)) {    // check 任务队列是否已满,未满则添加进入,已满则进入分支
            // 当前工作线程数 < 最大线程数时,启动新的(临时)线程来执行任务
            if (workers.size() < maxSize) {
                addWorkerThread(task);
            } else {
                // 使用饱和拒绝策略
                rejectedExecutionHandler.rejectedExecution(task, this);
            }
        }
    }

    /**
     * 启动新的工作线程,将任务放入队列中以执行
     * @param task 任务
     */
    private void addWorkerThread(Runnable task) {
        WorkerThread workerThread = new WorkerThread();
        workerThread.start();
        workers.add(workerThread);
        taskQueue.offer(task);
    }

    /**
     * 关闭线程池(优雅):
     * 1. 修改 isShutdown 标志;
     * 2. 遍历所有工作线程,中断它们(interrupt() 方法并不会立即执行中断,取决于其线程本身)
     */
    @Override
    public void shutdown() {
        isShutdown = true;

        for (WorkerThread thread : workers) {
            thread.interrupt();
        }
    }

}

测试:

@Test
    public void testV2() throws InterruptedException {
        SimpleThreadPool pool = new SimpleThreadPool(3, 3, 5, 8, 10, TimeUnit.MILLISECONDS);

        // 控制任务数量,不要让饱和拒绝策略触发
        for (int i = 0; i < 12; i++) {
            pool.execute(() -> {
                for (int j = 0; j < 100; j++) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }

        Thread.sleep(10_000_000);   // 让主线程一直睡眠,方便我们观察结果
        pool.shutdown();
    }

我们定义了核心线程数为 3,最大线程数为 5,任务队列大小为 8。所以任务数量 >= 13 (5 + 8)时,就会触发拒绝策略;当所有任务都执行完毕时,会有 2(5 - 3)个临时线程被移除:

WorkerThread Thread-0, executing task: My Task...
WorkerThread Thread-3, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-4, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-0, executing task: My Task...
WorkerThread Thread-3, executing task: My Task...
WorkerThread Thread-4, executing task: My Task...
WorkerThread Thread-2, executing task: My Task...
WorkerThread Thread-1, executing task: My Task...
Temp worker thread Thread-0, exit workers queue
Temp worker thread Thread-3, exit workers queue
...... 阻塞 ......
上次编辑于: