线程池详解(3):tomcat对ThreadPoolExecutor扩展
# JDK线程池
线程池详解(1) (opens new window)中已经详细介绍了JDK线程池的基本原理,JDK线程池的核心类是ThreadPoolExecutor
。在提交任务后,线程池会根据核心线程数、最大线程数、任务队列等参数来决定任务的执行方式:
- 如果当前线程数小于核心线程数,会创建新线程执行任务。
- 如果当前线程数大于核心线程数,任务会被放到任务队列中(执行队列的
offer
方法)。 - 如果任务队列满了(
offer
方法返回false
),且当前线程数小于最大线程数,会创建新线程执行任务。 - 如果当前线程数大于最大线程数,会执行拒绝策略。
# Tomcat线程池
如果希望在达到核心线程数后,任务不会被放到任务队列中,而是直接创建新线程执行,在达到最大线程数后,再放到任务队列中,可以使用Tomcat的线程池。
# 实现原理
实现自己的任务队列
TaskQueue
,将线程池作为TaskQueue
的属性字段,使得队列能够感知到线程池的线程数量重写队列的
offer
方法,当未达到最大线程数时,offer
方法返回失败,这样线程池就会创建新线程执行任务。当达到最大线程数时,向队列中添加任务。创建新线程时可能已经达到最大线程数,触发拒绝策略,所以线程池需要捕获拒绝策略抛出的异常,此时向队列中添加任务。
# 核心代码
# TaskQueue
该类继承了LinkedBlockingQueue
, 将线程池作为属性字段,重写了offer
方法对应原理的一、二点:
private transient volatile ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//没有传入线程池,那么无法判断线程数,直接保留原功能
if (parent==null) return super.offer(o);
//已经达到最大线程数,添加到队列
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//提交任务数小于工作线程数量,说明存在空闲线程,添加到队列中,让空闲线程去消费。 getSubmittedCount方法是Tomcat线程池扩展的方法,统计工作中的线程数,这里算是一个小优化,避免过度创建线程
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//未达到最大线程数,返回false,不添加到队列
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//并发情况下可能上述条件都绕过了,那么兜底就是添加到队列
return super.offer(o);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# ThreadPoolExecutor
Tomcat
的ThreadPoolExecutor
继承自java.util.concurrent.ThreadPoolExecutor
,重写了execute
方法,对应原理中的第三点:
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//捕获拒绝策略抛出的异常,向队列中添加任务
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 使用
TaskQueue taskQueue = new TaskQueue(200);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 20, 10L, TimeUnit.MILLISECONDS,taskQueue);
taskQueue.setParent(executor);
1
2
3
2
3
需要注意ThreadPoolExecutor
使用的是org.apache.tomcat.util.threads.ThreadPoolExecutor
。
编辑 (opens new window)
上次更新: 2024/10/21, 16:30:50