线程池详解(2):异常处理
# 问题描述
工作中写了一个批量处理数据的任务,由于数据量较大,使用了线程池加快处理速度。代码如下:
long start = System.currentTimeMillis();
try {
for (int i = 0; i < array.size(); i++) {
threadPoolExecutor.submit(()->{
//业务代码
});
}
}catch (Exception e){
logger.error("数据处理失败",e);
throw e;
}
logger.info("数据处理结束耗时{}ms",System.currentTimeMillis()-start);
2
3
4
5
6
7
8
9
10
11
12
测试时搜索日志,一看到“数据处理结束耗时”,以为处理完成,实际上线程池执行的任务有业务代码报错,这些报错并不会被主线程捕获,给发现问题带来一定难度。
后来通过submit
方法返回的Future
对象的get
方法触发内部抛出的异常。
为了加深理解,避免下次出错,决定再次阅读线程池源码。
# 任务封装
向线程池提交任务代码如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
2
3
这三种方式都将入参封装成RunnableFuture
对象,该对象是继承了Runnable
和Future
的接口,。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
2
3
通过newTaskFor
方法可知其实现类为FutureTask
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
2
3
4
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
将任务封装成Callable
对象,保存在FutureTask
中。之后这个FutureTask
被提交到线程池中。
到这里我们发现,我们向线程池提交的Runnable
任务有run
方法,封装成的FutureTask
对象实现了Runnable
接口也有run
方法,最终任务封装到Callable
对象还有个call
方法,那么这三个执行方法到底是在何时被调用呢?
# 执行谁的任务
线程池详解(1) (opens new window)中介绍了线程池如何调度任务。
当工作线程获取一个待执行任务对象时,这个对象并不是我们最初通过submit
方法向线程池提交的任务对象,而是经过封装的FutureTask
对象。
所以当调用任务对象的run
方法时,并不是直接调用我们传进去的Runnable
对象的run
方法,而是FutureTask.run()
。
查看FutureTask#run
源码:
public void run() {
//判断任务未执行,CAS方式设置任务执行线程为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
//任务封装在该对象内
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//call方法内调用由开发者提供的run方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//封装异常
setException(ex);
}
if (ran)
//如果执行成功,封装结果
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
可以看到FutureTask
的run
方法对执行结果做了封装,如果执行出错,会捕获异常并封装,并不会再向外部抛出。这也就是主线程没有感知到异常的原因。
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
最终将结果放到了FutureTask.outcome
上,并设置了相应的最终状态。
关于FutureTask源码的详细分析在另一篇博客中,点击跳转 (opens new window)
另外,在回顾ThreadPoolExecutor.runWorker (opens new window)方法时发现,线程池预留了beforeExecute
和afterExecute
两个方法,分别在任务执行前和执行后调用,且afterExecute
方法入参会传入捕获的异常,但是从上面的分析可知这里调用的是FutureTask.run
方法,并不会向外抛出异常。
实际断点测试也发现,这里不会捕获到异常。