线程池详解(2):异常处理

# 问题描述

工作中写了一个批量处理数据的任务,由于数据量较大,使用了线程池加快处理速度。代码如下:

long start = System.currentTimeMillis();
try {
    for (int i = 0; i < array.size(); i++) {
        threadPoolExecutor.submit(()->{
            //业务代码
        });
    }
}catch (Exception e){
    logger.error("数据处理失败",e);
    throw e;
}
logger.info("数据处理结束耗时{}ms",System.currentTimeMillis()-start);
1
2
3
4
5
6
7
8
9
10
11
12

测试时搜索日志,一看到“数据处理结束耗时”,以为处理完成,实际上线程池执行的任务有业务代码报错,这些报错并不会被主线程捕获,给发现问题带来一定难度。

后来通过submit方法返回的Future对象的get方法触发内部抛出的异常。

为了加深理解,避免下次出错,决定再次阅读线程池源码。

# 任务封装

向线程池提交任务代码如下:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
1
2
3

这三种方式都将入参封装成RunnableFuture对象,该对象是继承了RunnableFuture的接口,。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
1
2
3

通过newTaskFor方法可知其实现类为FutureTask

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
1
2
3
4
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

将任务封装成Callable对象,保存在FutureTask中。之后这个FutureTask被提交到线程池中。

到这里我们发现,我们向线程池提交的Runnable任务有run方法,封装成的FutureTask对象实现了Runnable接口也有run方法,最终任务封装到Callable对象还有个call方法,那么这三个执行方法到底是在何时被调用呢?

# 执行谁的任务

线程池详解(1) (opens new window)中介绍了线程池如何调度任务。

当工作线程获取一个待执行任务对象时,这个对象并不是我们最初通过submit方法向线程池提交的任务对象,而是经过封装的FutureTask对象。

所以当调用任务对象的run方法时,并不是直接调用我们传进去的Runnable对象的run方法,而是FutureTask.run()

查看FutureTask#run源码:

public void run() {
    //判断任务未执行,CAS方式设置任务执行线程为当前线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        //任务封装在该对象内
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //call方法内调用由开发者提供的run方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //封装异常
                setException(ex);
            }
            if (ran)
                //如果执行成功,封装结果
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

可以看到FutureTaskrun方法对执行结果做了封装,如果执行出错,会捕获异常并封装,并不会再向外部抛出。这也就是主线程没有感知到异常的原因。

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

最终将结果放到了FutureTask.outcome上,并设置了相应的最终状态。

关于FutureTask源码的详细分析在另一篇博客中,点击跳转 (opens new window)

另外,在回顾ThreadPoolExecutor.runWorker (opens new window)方法时发现,线程池预留了beforeExecuteafterExecute两个方法,分别在任务执行前和执行后调用,且afterExecute方法入参会传入捕获的异常,但是从上面的分析可知这里调用的是FutureTask.run方法,并不会向外抛出异常。

实际断点测试也发现,这里不会捕获到异常。

上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>