线程池是如何关闭非核心线程的

在 Java 中,多线程的核心实现类是 ThreadPoolExecutor,该类提供了多线程的几个参数,用于开发人员自定义自己的线程池。

线程池的参数

1
2
3
4
5
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//...
}

线程池一共有 7 个参数,其中跟本次相关的有三个,分别是 corePoolSize、maximumPoolSize、keepAliveTime,这三个参数代表的意思如下:

  • corePoolSize:当线程池中的任务数小于 corePoolSize 或者线程池中的任务数大于 corePoolSize 但是小于阻塞队列的最大长度,那么线程池中的核心线程数就睡 <= corePoolSize
  • maximumPoolSize:当线程池中的任务数已经达到队列上限并且线程池中的线程数 < maximumPoolSize,此时线程池就会将线程数增加至 maximumPoolSize
  • keepAliveTime:代表线程的空闲时间,也就是线程等待多久以后可以被销毁

如果一个线程池中,线程数已经达到了 maximumPoolSize,如果后续任务数减少,此时就会销毁多余的线程,具体保留多少线程数还是需要依据线程池的具体参数,
例如如果配置了 allowCoreThreadTimeOut,则所有线程都有被回收的可能。

线程池的状态

线程池一共有五种状态,按照源代码中的定义分别如下:

1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS;-536870912
private static final int SHUTDOWN = 0 << COUNT_BITS;0
private static final int STOP = 1 << COUNT_BITS;536870912
private static final int TIDYING = 2 << COUNT_BITS;1073741824
private static final int TERMINATED = 3 << COUNT_BITS;1610612736

其中状态的变化如下图:

  • RUNNING 表示该线程池一切正常,可以接受新的任务并且队列中的任务也会执行
  • SHUTDOWN 表示该线程池拒绝接受新的任务,处于队列中的任务也会执行完毕
  • STOP 表示该线程池拒绝接受新的任务,而且队列中的任务不会执行,并且会向正在执行中的任务发送一个中断指令
  • TIDYING 表示所有线程已经被终止,而且工作线程已经为 0,线程池即将去执行 terminated 钩子函数
  • TERMINATED terminated() 函数已经执行完毕,线程池被销毁

上面铺垫这么多了,现在可以回到主题了

线程池是如何关闭非核心线程的

首先,还记得第一个标题里面的 keepAliveTime、workQueue 吗,线程池中的线程如果需要被销毁,满足下面任意一个条件即可:

  1. 线程中的业务代码执行出现了异常
  2. 线程池中的线程在等待了 keepAliveTime 后还是拿不到任务

下面从线程开始获取任务的地方开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果允许核心线程被销毁或者此时总线程数 > core,可以进行线程的销毁
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 当 timed 为 true,那么会等待 timeout 时间,如果为 false,则直接阻塞,除非拿到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

在 getTask 方法中,第一步会判断线程池的状态,如果是处于 SHUTDOWN && 队列为空,或者是处于 STOP 状态,则可以直接减少当前的工作数量了,因为线程池的状态流转到 TIDYING 的条件是:线程数为 0 且队列为空,而此时队列已经是空了,所以只需要减少当前的工作线程数就可以完成状态的流转。

如果此时的线程池状态为 RUNNING,那么会进行两个判断:

  1. 是否允许 core 线程销毁或
  2. 当前的线程数是否大于 corePoolSize

这两个判断就决定当前线程是否会被销毁,线程池中的线程通过 poll(long timeout, TimeUnit unit) 这个函数来获取任务,在等待 timeout 时间以后,就会将 timedOut 设置为 true,表示已经等待了 keepAliveTime 时间了,但是还没有拿到任务。

此时 (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()) 这个判断就很容易通过了,于是会通过 CAS 操作原子性的减少工作线程数,一旦设置成功,则会直接返回 null,然后就会进行线程的销毁。

processWorkerExit

当拿不到任务的时候,就会执行 processWorkerExit,如果线程执行的过程中出现了异常也是从这个方法进入并进行销毁的,不过线程执行出错的情况下 completedAbruptly 是 true。

1
2
3
4
5
while (task != null || (task = getTask()) != null) {
// ...
} finally {
processWorkerExit(w, completedAbruptly);
}

processWorkerExit 主要是做了以下几个操作:

  1. 上报该 worker 已经执行完毕的任务数,并且从 workers 集合中移除该 worker。
  2. 执行 tryTerminate()
  3. 判断是否是异常导致的线程销毁,如果是的则补充一个 worker,如果不是的,则判断当前 worker 数量是否大于线程池允许的最小数,如果小于的话也会补充 worker。

其中最关键是这里面的第2步 tryTerminate 方法

tryTerminate

tryTerminate 会判断当前线程池的状态,如果符合一定的条件,那么就会让线程池向 TERMINATED 进行转换。

如果此时的线程池满足以下任意一个条件,则不会做任何的操作,直接返回:

  1. 线程池处于 RUNNING、TIDYING、TERMINATED。
  2. 正在处于 SHUTDOWN,但是队列中还有等待执行的任务。

当一个线程池不满足以上的情况的时候,那么剩下的就是:

  1. 处于 STOP
  2. 处于 SHUTDOWN,但是队列已空

从上面得知,STOP 状态下是队列中的任务是不会进行处理的,所以此时将线程池状态由 STOP 向 TERMINATED 是合理的。而如果处于 SHUTDOWN 且队列为空,此时也是可以向 TERMINATED 转移的。而且在转移的过程中,如果判断此时线程池中的 worker 数量大于 0,那么会直接随机中断一个线程,然后返回,保留线程池的当前状态。

这里之所以尝试中断一个线程,是因为假设线程池在执行了 shutdown 之后,如果队列中还有任务,此时并不会向 TERMINATED 转换,但是此时由于 shutdown 中的中断方法已经执行了,所以后续就需要再次中断 worker,而这里加一个判断,就是处理这种情况。

而如果上述的判断都是 false,此时证明已经可以转为 TERMINATED 了。转换的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}

修改线程状态为 TIDYING,然后执行 terminated() 函数,最后再唤醒所有的 termination()

termination

termination 这个变量是一个 condition,在 ThreadPoolExecutor 中只有两个地方出现过,一个是在 awaitTermination 中,另一个就是在 termination,不过在 awaitTermination 是等待获取线程池的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

而线程池已经到了 TERMINATED,自然可以唤醒 termination了。

作者

Somersames

发布于

2021-07-21

更新于

2021-12-05

许可协议

评论