线程池详解(1):基础原理
# 核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# corePoolSize
线程池常驻核心线程数。 添加任务时,如果线程池当前线程数量小于核心线程数,会创建新的线程来执行任务。如果超过核心线程数,会先尝试将任务缓存到队列中。
# maximumPoolSize
最大线程数。 如果队列满了,线程池中线程数小于最大线程数,则新建线程执行任务。 最大线程数的设置需要考虑任务的类型,如果是计算密集型,一般数量是CPU+1,保证最大化利用CPU的同时又要避免不必要的线程切换。如果是IO密集型,则线程数量可以适当增大。
# keepAliveTime
当线程数超过核心线程数时,这是多余线程在终止前等待新任务的最长时间。
# workQueue
BlockingQueue的任务队列。线程数达到核心线程数,之后submit的任务将被优先放到队列中。
# threadFactory
线程池用于创建线程的工厂,可以通过自定义threadFactory来指定线程名称。
# RejectedExecutionHandler
拒绝策略。当队列已满时的执行策略。
ThreadPoolExecutor.AbortPolicy 抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
1
2
3ThreadPoolExecutor.DiscardPolicy 丢弃,啥也不做。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
1
2ThreadPoolExecutor.DiscardOldestPolicy 删除队列头部的任务,将当前任务加入队列
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
1
2
3
4
5
6ThreadPoolExecutor.CallerRunsPolicy 由调用者线程来执行被拒绝的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
1
2
3
4
5
# 线程池的状态流转
线程池使用一个AtomicInteger字段来记录线程池的状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
4
5
6
7
8
9
10
11
12
13
14
15
高三位为状态,剩余29位用来计数线程数量。
# RUNNING
运行状态,可以接收新任务和处理队列中的任务
# SHUTDOWN
关闭状态,不接受新任务,但是会继续处理阻塞队列中的任务。
# STOP
不接受新任务,也不再处理队列中的任务,中断正在处理任务的线程。
# TIDYING
所有任务都已终止,线程数为0。会调用terminated()
方法。
# TERMINATED
terminated()
方法执行完后进入该状态。
# 执行过程
# 提交任务
AbstractExecutorService
提供了三种submit
方法,来支持Runnable
和Callable
类型的任务。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
submit(Runnable task, T result)
方法中result是在任务执行成功后返回的数据。
AbstractExecutorService
将任务封装成RunnableFuture
,交给实现execute
方法的子类ThreadPoolExecutor
。
# 执行任务
在线程池中,用于处理任务的线程被封装成Worker。 这里的执行任务其实是新建Worker,或者将当前任务添加到队列中。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//工作线程小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//添加新的工作现场,并将当前任务作为它的第一个处理任务
if (addWorker(command, true))
return;
//添加失败,说明有并发,重新获取控制参数
c = ctl.get();
}
//检查线程池处于运行状态且能将任务添加到队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池运行状态,因为检查状态和向队列添加任务不是原子的,可能向队列添加任务的时候,其他线程关闭了线程池,这时需要清除添加的任务,执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于运行状态,那么检查一下当前是否有工作线程,因为可能线程池核心线程数为0,工作线程被回收。
else if (workerCountOf(recheck) == 0)
//新添加的工作线程,不指定任务,因为不确定刚添加的任务是不是已经被其他工作线程执行了。
addWorker(null, false);
}
//走到这里可能是线程池关闭了,也可能是队列满了。对于前者,添加工作线程会被拒绝。对于后者,尝试新增工作线程,如果工作线程超过最大线程数也会被拒绝。两种情况都执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
由于检查线程池状态、创建工作线程、添加任务到队列这三个操作组合起来是非原子的,所以操作前后需要两次检查线程池状态。
拒绝情况有两种:
线程池关闭
工作线程数已达最大值
# 添加Worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 这种情况下虽然线程池关闭,但是任然需要创建工作线程
// 从上文excute方法解析可以知道,firstTask == null的发生场景是先检测到线程池处于运行状态,添加任务到队列成功,之后再检查线程池状态任然是运行状态。
// 此时检测到线程池关闭了,那么这个关闭事件是发生在之后的,根据SHUTDOWN的状态定义:不再接收新任务,原有任务还会执行,所以继续创建工作线程。
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS机制
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程池状态变化了,回到开始的地方,重新判断
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加锁后再次检查状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 线程不能处于启动状态
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Worker运行
Worker
实现了Runnable
的run
方法,run
方法调用runWorker
方法执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 初始化时state=-1,state>=0才能中断worker线程,这里设置state为0,允许中断线程
boolean completedAbruptly = true;
try {
//如果任务为空,那么从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池STOP状态,要中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空实现,可以留给用户继承线程池类做一些定制操作,如打印日志
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空实现
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//走到这里,说明没有任务了,正常情况下的执行完成
completedAbruptly = false;
} finally {
//工作线程退出
processWorkerExit(w, completedAbruptly);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
在while循环中,工作线程不断从队列中获取任务并调用任务的run方法执行任务。
# Worker从队列中获取任务
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池关闭 队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 核心线程是否设置存活时间 默认false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//进入循环前timedOut是false,队列poll超时时会赋值为true,说明没有任务了,那么工作线程数减1,返回空,外部会执行工作线程的退出。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
allowCoreThreadTimeOut
如果为false(默认),则即使在空闲时,核心线程仍保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。
# Worker结束执行
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 异常情况下结束的,worker数量还没有减1,这里减一下worker数量
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//看看线程池状态是否可以更新为TIDYING
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//正常结束的情况下
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
//至少保留一个工作线程
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 线程池关闭
线程池关闭分为两种
shutdown 线程池状态修改为
SHUTDOWN
,中断闲置的Worker(阻塞在getTask的Worker),当闲置Worker被中断时,该Worker会进入processWorkerExit
方法。shutdownNow 线程池状态修改为
STOP
,中断所有Worker,返回未执行的任务。