线程池详解(3):tomcat对ThreadPoolExecutor扩展

# JDK线程池

线程池详解(1) (opens new window)中已经详细介绍了JDK线程池的基本原理,JDK线程池的核心类是ThreadPoolExecutor。在提交任务后,线程池会根据核心线程数、最大线程数、任务队列等参数来决定任务的执行方式:

  1. 如果当前线程数小于核心线程数,会创建新线程执行任务。
  2. 如果当前线程数大于核心线程数,任务会被放到任务队列中(执行队列的offer方法)。
  3. 如果任务队列满了(offer方法返回false),且当前线程数小于最大线程数,会创建新线程执行任务。
  4. 如果当前线程数大于最大线程数,会执行拒绝策略。

# Tomcat线程池

如果希望在达到核心线程数后,任务不会被放到任务队列中,而是直接创建新线程执行,在达到最大线程数后,再放到任务队列中,可以使用Tomcat的线程池。

# 实现原理

  1. 实现自己的任务队列TaskQueue,将线程池作为TaskQueue的属性字段,使得队列能够感知到线程池的线程数量

  2. 重写队列的offer方法,当未达到最大线程数时,offer方法返回失败,这样线程池就会创建新线程执行任务。当达到最大线程数时,向队列中添加任务。

  3. 创建新线程时可能已经达到最大线程数,触发拒绝策略,所以线程池需要捕获拒绝策略抛出的异常,此时向队列中添加任务。

# 核心代码

# TaskQueue

该类继承了LinkedBlockingQueue, 将线程池作为属性字段,重写了offer方法对应原理的一、二点:

private transient volatile ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
    //没有传入线程池,那么无法判断线程数,直接保留原功能
    if (parent==null) return super.offer(o);
    //已经达到最大线程数,添加到队列
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //提交任务数小于工作线程数量,说明存在空闲线程,添加到队列中,让空闲线程去消费。 getSubmittedCount方法是Tomcat线程池扩展的方法,统计工作中的线程数,这里算是一个小优化,避免过度创建线程
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //未达到最大线程数,返回false,不添加到队列
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    //并发情况下可能上述条件都绕过了,那么兜底就是添加到队列
    return super.offer(o);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# ThreadPoolExecutor

TomcatThreadPoolExecutor继承自java.util.concurrent.ThreadPoolExecutor,重写了execute方法,对应原理中的第三点:

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        //捕获拒绝策略抛出的异常,向队列中添加任务
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 使用

TaskQueue taskQueue = new TaskQueue(200);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 20, 10L, TimeUnit.MILLISECONDS,taskQueue);
taskQueue.setParent(executor);
1
2
3

需要注意ThreadPoolExecutor使用的是org.apache.tomcat.util.threads.ThreadPoolExecutor

上次更新: 2024/10/21, 16:30:50
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>