线程池详解(1):基础原理

# 核心参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# corePoolSize

线程池常驻核心线程数。 添加任务时,如果线程池当前线程数量小于核心线程数,会创建新的线程来执行任务。如果超过核心线程数,会先尝试将任务缓存到队列中。

# maximumPoolSize

最大线程数。 如果队列满了,线程池中线程数小于最大线程数,则新建线程执行任务。 最大线程数的设置需要考虑任务的类型,如果是计算密集型,一般数量是CPU+1,保证最大化利用CPU的同时又要避免不必要的线程切换。如果是IO密集型,则线程数量可以适当增大。

# keepAliveTime

当线程数超过核心线程数时,这是多余线程在终止前等待新任务的最长时间。

# workQueue

BlockingQueue的任务队列。线程数达到核心线程数,之后submit的任务将被优先放到队列中。

# threadFactory

线程池用于创建线程的工厂,可以通过自定义threadFactory来指定线程名称。

# RejectedExecutionHandler

拒绝策略。当队列已满时的执行策略。

  1. ThreadPoolExecutor.AbortPolicy 抛出异常

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
    
    1
    2
    3
  2. ThreadPoolExecutor.DiscardPolicy 丢弃,啥也不做。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    
    1
    2
  3. ThreadPoolExecutor.DiscardOldestPolicy 删除队列头部的任务,将当前任务加入队列

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
    
    1
    2
    3
    4
    5
    6
  4. ThreadPoolExecutor.CallerRunsPolicy 由调用者线程来执行被拒绝的任务

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
    
    1
    2
    3
    4
    5

# 线程池的状态流转

线程池使用一个AtomicInteger字段来记录线程池的状态和线程数量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

高三位为状态,剩余29位用来计数线程数量。

# RUNNING

运行状态,可以接收新任务和处理队列中的任务

# SHUTDOWN

关闭状态,不接受新任务,但是会继续处理阻塞队列中的任务。

# STOP

不接受新任务,也不再处理队列中的任务,中断正在处理任务的线程。

# TIDYING

所有任务都已终止,线程数为0。会调用terminated()方法。

# TERMINATED

terminated()方法执行完后进入该状态。

# 执行过程

# 提交任务

AbstractExecutorService提供了三种submit方法,来支持RunnableCallable类型的任务。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

submit(Runnable task, T result)方法中result是在任务执行成功后返回的数据。

AbstractExecutorService将任务封装成RunnableFuture,交给实现execute方法的子类ThreadPoolExecutor

# 执行任务

在线程池中,用于处理任务的线程被封装成Worker。 这里的执行任务其实是新建Worker,或者将当前任务添加到队列中。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //工作线程小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //添加新的工作现场,并将当前任务作为它的第一个处理任务
        if (addWorker(command, true))
            return;
        //添加失败,说明有并发,重新获取控制参数
        c = ctl.get();
    }
    //检查线程池处于运行状态且能将任务添加到队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次检查线程池运行状态,因为检查状态和向队列添加任务不是原子的,可能向队列添加任务的时候,其他线程关闭了线程池,这时需要清除添加的任务,执行拒绝策略。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //线程池处于运行状态,那么检查一下当前是否有工作线程,因为可能线程池核心线程数为0,工作线程被回收。
        else if (workerCountOf(recheck) == 0)
            //新添加的工作线程,不指定任务,因为不确定刚添加的任务是不是已经被其他工作线程执行了。
            addWorker(null, false);
    }
    //走到这里可能是线程池关闭了,也可能是队列满了。对于前者,添加工作线程会被拒绝。对于后者,尝试新增工作线程,如果工作线程超过最大线程数也会被拒绝。两种情况都执行拒绝策略。
    else if (!addWorker(command, false))
        reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

由于检查线程池状态、创建工作线程、添加任务到队列这三个操作组合起来是非原子的,所以操作前后需要两次检查线程池状态。

拒绝情况有两种:

  • 线程池关闭

  • 工作线程数已达最大值

# 添加Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
        // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 这种情况下虽然线程池关闭,但是任然需要创建工作线程
        // 从上文excute方法解析可以知道,firstTask == null的发生场景是先检测到线程池处于运行状态,添加任务到队列成功,之后再检查线程池状态任然是运行状态。
        // 此时检测到线程池关闭了,那么这个关闭事件是发生在之后的,根据SHUTDOWN的状态定义:不再接收新任务,原有任务还会执行,所以继续创建工作线程。
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS机制
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //线程池状态变化了,回到开始的地方,重新判断
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁后再次检查状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 线程不能处于启动状态
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

# Worker运行

Worker实现了Runnablerun方法,run方法调用runWorker方法执行任务

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 初始化时state=-1,state>=0才能中断worker线程,这里设置state为0,允许中断线程
    boolean completedAbruptly = true;
    try {
        //如果任务为空,那么从队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池STOP状态,要中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //空实现,可以留给用户继承线程池类做一些定制操作,如打印日志
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //空实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //走到这里,说明没有任务了,正常情况下的执行完成
        completedAbruptly = false;
    } finally {
        //工作线程退出
        processWorkerExit(w, completedAbruptly);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

在while循环中,工作线程不断从队列中获取任务并调用任务的run方法执行任务。

# Worker从队列中获取任务

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池关闭 队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 核心线程是否设置存活时间 默认false
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //进入循环前timedOut是false,队列poll超时时会赋值为true,说明没有任务了,那么工作线程数减1,返回空,外部会执行工作线程的退出。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

allowCoreThreadTimeOut如果为false(默认),则即使在空闲时,核心线程仍保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。

# Worker结束执行

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 异常情况下结束的,worker数量还没有减1,这里减一下worker数量
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //看看线程池状态是否可以更新为TIDYING
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            //正常结束的情况下
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                //至少保留一个工作线程
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 线程池关闭

线程池关闭分为两种

  • shutdown 线程池状态修改为SHUTDOWN,中断闲置的Worker(阻塞在getTask的Worker),当闲置Worker被中断时,该Worker会进入processWorkerExit方法。

  • shutdownNow 线程池状态修改为STOP,中断所有Worker,返回未执行的任务。

上次更新: 2024/10/19, 04:02:06
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>