线程池源码剖析

使用

我们 JDK 中提供了一些封装好的线程池提供直接使用,比如

  • newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池
1
2
3
4
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

  • newSingleThreadExecutor:返回一个核心线程数为 1 的线程池
1
2
3
4
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

  • newCachedThreadPool:大同小异
1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建

阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池,因为可能会出现OOM的问题(提问:出现OOM应该怎么排查问题?),所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
200, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit.SECONDS, // 非核心工作线程在阻塞队列位置等待的单位
new LinkedBlockingQueue<>(), // 阻塞队列,存放任务的地方
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种
);

for (int i = 0; i < 10; i++) {
MyTask task = new MyTask();
executor.execute(task);
}

// 关闭线程
executor.shutdown();

}
}

class MyTask implements Runnable {
@Override
public void run() {
System.out.println("我被执行了....");
}
}

源码

执行流程:

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

// 执行ThreadPoolExecutor的构造方法进行初始化
// corePoolSize: 核心线程数
// maximumPoolSize: 最大线程数
// keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间
// unit: 非核心工作线程在阻塞队列位置等待的时间单位
// workQueue: 存放任务的阻塞队列
// threadFactory: 线程工厂(生产线程的地方)
// RejectedExecutionHandler: 拒绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 核心线程数可以为0
// 最大线程数不为0
// 最大线程数 大于 核心线程数
// 等待时间大于等于0
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 将当前的入参赋值给成员变量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

我们上面初始化的过程主要对入参做了一些校验,然后将方法的入参赋予给成员变量

拒绝策略

  • AbortPolicy(默认策略)

简单粗暴,直接抛出异常

1
2
3
4
5
6
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
  • CallerRunsPolicy

当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理(适合用于处理比较重要的任务,不可丢失,但是实习的时候进行CR的时候听说可能会阻塞主线程)

1
2
3
4
5
6
7
8
9
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果当前的
if (!e.isShutdown()) {
r.run();
}
}
}
  • DiscardOldestPolicy

如果当前的阻塞队列满了,弹出时间最久的

1
2
3
4
5
6
7
8
9
10
11
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 获取阻塞队列,弹出一个时间最久的
e.getQueue().poll();
// 执行当前的
e.execute(r);
}
}
}
  • DiscardPolicy

简单粗暴,不做任何操作

1
2
3
4
5
6
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

  • 自定义拒绝策略
1
2
3
4
5
6
7
public static class MyRejectedExecution implements RejectedExecutionHandler{

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("这是我自己的拒绝策略");
}
}

其余变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 该数值代表两个意思:
// 高3位表示当前线程池的状态
// 低29位表示当前线程池工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;

// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }

线程池的状态变化流程图

线程池的execute方法

  • Step1:当前的线程池个数低于核心线程数,直接添加核心线程即可

  • Step2:当前的线程池个数大于核心线程数,将任务添加至阻塞队列中

  • Step3:如果添加阻塞队列失败,则需要添加非核心线程数处理任务

  • Step4:如果添加非核心线程数失败(满了),执行拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void execute(Runnable command) {
// 如果当前传过来的任务是null,直接抛出异常即可
if (command == null)
throw new NullPointerException();
// 获取当前的数据值
int c = ctl.get();

//==========================线程池第一阶段:启动核心线程数开始==================================================
// Step1:获取ctl低29位的数值,与我们的核心线程数相比
if (workerCountOf(c) < corePoolSize) {
// Step2:添加一个核心线程
if (addWorker(command, true)){
return;
}
// 更新一下当前值
c = ctl.get();
}
//==========================线程池第一阶段:启动核心线程数结束==================================================

// 如果走到下面会有两种情况:
// 1、核心线程数满了,需要往阻塞队列里面扔任务
// 2、核心线程数满了,阻塞队列也满了,执行拒绝策略

//==========================线程池第二阶段:任务放至阻塞队列开始==================================================
// 判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
// 如果是Running的状态,则可以将任务放至阻塞队列中
// 这里如果放阻塞队列失败了,证明阻塞队列满了
if (isRunning(c) && workQueue.offer(command)) {
// 再次更新数值
int recheck = ctl.get();
// 再次校验当前的线程池状态是不是Running
// 如果线程池状态不是Running的话,需要删除掉刚刚放的任务
if (!isRunning(recheck) && remove(command)){
// 执行拒绝策略
reject(command);
}
// 如果到这里,说明上面阻塞队列中已经有数据了
// 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务
// 不能让人家堵塞着
else if (workerCountOf(recheck) == 0){
addWorker(null, false);
}
}
//==========================线程池第二阶段:任务放至阻塞队列结束==================================================

// 如果走到这里的逻辑,证明上面的逻辑没走通,有以下两种情况:
// 1、线程池的状态不是Running
// 1.1 如果是这种情况,下面的添加非核心工作线程失败执行拒绝策略,但这个并不是这个逻辑的重点
// 2、阻塞队列添加任务失败(阻塞队列满了)
// 2.1 这种情况才是我们需要关心的
// 2.2 阻塞队列满了,添加非核心工作线程
// 2.3 若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
//==========================线程池第三阶段:启动非核心线程数开始==================================================
// 添加一个非核心工作线程
else if (!addWorker(command, false))
// 工作队列中添加任务失败,执行拒绝策略
reject(command);
//==========================线程池第三阶段:启动非核心线程数结束==================================================
}

线程池的addWorker方法

addWorker`方法也是一个很关键的方法, 添加线程到线程池,返回 true 表示创建 Worker 成功,且启动线程。

校验

  • 校验当前线程池的状态
  • 校验当前线程池工作线程的个数(核心线程数、最大工作线程数)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里主要是为了结束整个循环
retry:
for (;;) {
// 获取当前线程池的数值(ctl)
int c = ctl.get();

// runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值
int rs = runStateOf(c);

//==========================线程池状态判断=============================================================
// rs >= SHUTDOWN:代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常
// 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)
// 如果当前的状态是SHUTDOWN状态并且阻塞队列任务不为空且新任务为空
// 需要新起一个非核心工作线程去执行任务
// 如果不是前面的,直接返回false即可
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
return false;
}

//==========================工作线程个数判断==========================================================
for (;;) {
// 获取当前线程池中线程的个数
int wc = workerCountOf(c);
// 1、如果线程池线程的个数是否超过了工作线程的最大个数
// 2、core=true(核心线程)=false(工作线程)
// 2.1 根据当前core判断创建的是核心线程数(corePoolSize)还是非核心线程数(maximumPoolSize)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
return false;
}
// 尝试将线程池线程加一
if (compareAndIncrementWorkerCount(c)){
// CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
break retry;
}

// 获取当前线程池的数值(ctl)
c = ctl.get();
// 获取当前线程池的状态
// 判断当前线程池的状态等不等于我们上面的rs
// 我们线程池的状态被人更改了,需要重新跑整个for循环判断逻辑
if (runStateOf(c) != rs){
continue retry;
}
}
}
// 省略下面的代码
}

添加线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 // 下面开始真正创建线程了
// 运行标记,表示创建的 worker 是否已经启动,false未启动 true启动
boolean workerStarted = false;
// 添加标记,表示创建的 worker 是否添加到池子中了,默认false未添加,true是添加。
boolean workerAdded = false;
Worker w = null;
try {
//【创建 Worker,底层通过线程工厂 newThread 方法创建执行线程,指定了首先执行的任务】
w = new Worker(firstTask);
// 将新创建的 worker 节点中的线程赋值给 t
final Thread t = w.thread;
// 这里的判断为了防止 程序员自定义的 ThreadFactory 实现类有 bug,创造不出线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 这里为什么要加全局锁?因为下面的操作是添加worker,但是万一其他调用了shotdown()或者shotdownNow()方法
// 那我到底是添加还是不添加?这是一个纠结的事情,当我们点开shotdown()或者shotdownNow()方法时,会发现也有一个
// 全局lock锁,所以加锁的原因是不想添加worker与关闭线程池冲突!
mainLock.lock();
try {
// 获取最新线程池运行状态
int rs = runStateOf(ctl.get());
// 判断线程池是否为RUNNING状态,不是再【判断当前是否为SHUTDOWN状态且firstTask为空,特殊情况】
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当线程start后,线程isAlive会返回true,这里还没开始启动线程,如果被启动了就需要报错
if (t.isAlive())
throw new IllegalThreadStateException();
//将新建的 Worker 添加到线程池中
workers.add(w);
int s = workers.size();
// 当前池中的线程数量是一个新高,更新 largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// 添加标记置为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功就【启动线程执行任务】
if (workerAdded) {
// 启动线程
t.start();
// 运行标记置为 true
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
// 清理工作,比如从线程池中移除。
addWorkerFailed(w);
}
return workerStarted;
}

private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 持有线程池全局锁,因为操作的是线程池相关的东西
mainLock.lock();
try {
//条件成立需要将 worker 在 workers 中清理出去。
if (w != null)
workers.remove(w);
// 将线程池计数 -1,相当于归还令牌。
decrementWorkerCount();
// 尝试停止线程池
tryTerminate();
} finally {
//释放线程池全局锁。
mainLock.unlock();
}
}

  • 这里注意一个点,SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务。

  • 这里为什么要加全局锁?因为下面的操作是添加worker,但是万一其他调用了shotdown()或者shotdownNow()方法,那我到底是添加还是不添加?这是一个纠结的事情,当我们点开shotdown()或者shotdownNow()方法时,会发现也有一个全局lock锁,所以加锁的原因是不想添加worker与关闭线程池冲突!

线程池的 worker 源码

Woker类是ThreadPoolExecutor类的内部类,见明知意,它是承担了一个“工人”干活,也就是工作线程的责任。

  • Worker类

每个 Worker 对象有一个初始任务,启动 Worker 时优先执行,这也是造成线程池不公平的原因。Worker 继承自 AQS,本身具有锁的特性,采用独占锁模式,state = 0 表示未被占用,> 0 表示被占用,< 0 表示初始状态不能被抢锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// worker 内部封装的工作线程
final Thread thread;
// worker 第一个执行的任务,普通的 Runnable 实现类或者是 FutureTask
Runnable firstTask;
// 记录当前 worker 所完成任务数量
volatile long completedTasks;

// 构造方法
Worker(Runnable firstTask) {
// 设置AQS独占模式为初始化中状态,这个状态不能被抢占锁
setState(-1);
// firstTask不为空时,当worker启动后,内部线程会优先执行firstTask,执行完后会到queue中去获取下个任务
this.firstTask = firstTask;
// 使用线程工厂创建一个线程,并且【将当前worker指定为Runnable】,所以thread启动时会调用 worker.run()
this.thread = getThreadFactory().newThread(this);
}
// 不可重入锁,重写了AQS中的方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
// 设置state为0,开始抢锁
setState(0);
return true;
}
}

  • Worker的工作方法run
1
2
3
4
5
// Worker#run
public void run() {
// 调用自身的runWoker方法
runWorker(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Worker#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取 worker 的 firstTask
Runnable task = w.firstTask;
// 引用置空,【防止复用该线程时重复执行该任务】
w.firstTask = null;
// 初始化 worker 时设置 state = -1,表示不允许抢占锁
// 这里需要设置 state = 0 和 exclusiveOwnerThread = null,可以被中断
w.unlock();
// true 表示发生异常退出,false 表示正常退出。
boolean completedAbruptly = true;
try {
// firstTask 不是 null 就直接运行,否则去 queue 中获取任务
while (task != null || (task = getTask()) != null) {
// worker 加锁,shutdown 状态下不允许线程被中断
w.lock();
// 说明线程池状态大于 STOP,目前处于 STOP/TIDYING/TERMINATION,此时给线程一个中断信号
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
// 线程不是处于中断的情况
!wt.isInterrupted())
// 中断线程,设置线程的中断标志位为 true
wt.interrupt();
try {
// 任务执行前的回调,空实现,可以在子类中自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,【任务执行的后置处理】
afterExecute(task, thrown);
}
} finally {
// 将局部变量task置为null,代表任务执行完成
task = null;
// 更新worker完成任务数量
w.completedTasks++;
// 解锁
w.unlock();
}
}
// getTask()方法返回null时会走到这里,表示queue为空并且线程空闲超过保活时间,【当前线程执行退出逻辑】
completedAbruptly = false;
} finally {
// 正常退出 completedAbruptly = false
// 异常退出 completedAbruptly = true,【从 task.run() 内部抛出异常】时,跳到这一行
processWorkerExit(w, completedAbruptly);
}
}

processWorkerExit()工作线程退出方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 正常退出 completedAbruptly = false,异常退出为 true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 条件成立代表当前 worker 是发生异常退出的,task 任务执行过程中向上抛出异常了
if (completedAbruptly)
// 从异常时到这里 ctl 一直没有 -1,需要在这里 -1
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 将当前 worker 完成的 task 数量,汇总到线程池的 completedTaskCount
completedTaskCount += w.completedTasks;
// 将 worker 从线程池中移除
workers.remove(w);
} finally {
mainLock.unlock(); // 解锁
}
// 尝试停止线程池,唤醒下一个线程
tryTerminate();

int c = ctl.get();
// 线程池不是停止状态就应该有线程运行【担保机制】
if (runStateLessThan(c, STOP)) {
// 正常退出的逻辑,是对空闲线程回收,不是执行出错
if (!completedAbruptly) {
// 根据是否回收核心线程确定【线程池中的线程数量最小值】
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 最小值为 0,但是线程队列不为空,需要一个线程来完成任务担保机制
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池中的线程数量大于最小值可以直接返回
if (workerCountOf(c) >= min)
return;
}
// 执行 task 时发生异常,有个线程因为异常终止了,需要添加
// 或者线程池中的数量小于最小值,这里要创建一个新 worker 加进线程池
addWorker(null, false);
}
}

线程复用的原理

在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中不停的检查是否有任务需要被执行,如果有则直接执行,也就是调用任务中的 run 方法,将 run 方法当成一个普通的方法执行,通过这种方式将只使用固定的线程就将所有任务的 run 方法串联起来。

  • 源码剖析

其实就是runWorker()方法

省略掉部分和复用无关的代码之后,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 释放锁 设置work的state=0 允许中断
boolean completedAbruptly = true;
try {
//一直执行 如果task不为空 或者 从队列中获取的task不为空
while (task != null || (task = getTask()) != null) {
task.run();//执行task中的run方法
}
}
completedAbruptly = false;
} finally {
//1.将 worker 从数组 workers 里删除掉
//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组 workers
processWorkerExit(w, completedAbruptly);
}
}

可以看到,实现线程复用的逻辑主要在一个不停循环的 while 循环体中。

  • 通过获取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务
  • 直接通过 task.run() 来执行具体的任务(而不是新建线程)

在这里,我们找到了线程复用最终的实现,通过取 Worker 的 firstTask 或者 getTask 方法从 workQueue 中取出了新任务,并直接调用 Runnable 的 run 方法来执行任务,也就是如之前所说的,每个线程都始终在一个大循环中,反复获取任务,然后执行任务,从而实现了线程的复用。


线程池源码剖析
http://example.com/2023/10/12/线程池源码剖析/
Author
Posted on
October 12, 2023
Licensed under