Java 线程池
什么是线程池
前言
大家可能都听说过线程池、连接池、内存池、对象池等。那究竟这些池到底是什么呢?
其实就是一种池化技术,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。
所谓的线程池也是池化技术的一种具体实现。下面将以线程池为例做总结。
创建一个线程
Java创建一个线程通常有两种方法,一种是实现Runnable接口,另一种是继承Thread类。还有两种是衍生的方法,一种是通过Callable和FutureTask创建线程,另外一种是通过线程池创建线程。
下面以实现Runnable接口为例,做说明。
public class App {
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程运行中");
}
}).start();
}
}
新线程被创建了之后就可以利用多核CPU,处理更多线程了。当一个任务结束后,当前线程就回收。
但很多时候,我们不止会执行一个任务。如果每次都是如此的创建线程->执行任务->销毁线程,会造成很大的性能开销。
那能否一个线程创建后,执行完一个任务后,又去执行另一个任务,而不是销毁。这就是线程池。
这也就是池化技术的思想,通过预先创建好多个线程,放在池中,这样可以在需要使用线程的时候直接获取,避免多次重复创建、销毁带来的开销。
线程池的简单使用
Java创建线程池
import java.util.concurrent.*;
public class App {
public static void main(String[] args) throws Exception {
ExecutorService executorService = new ThreadPoolExecutor(1, 1,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程池");
}
});
executorService.shutdown();
}
}
通过上面代码不难理解,Java创建线程池很简单,直接通过JDK提供的ThreadPoolExecutor构造就可以。也可以通过Executors静态工厂构建,但一般不建议。
线程池构造函数
ThreadPoolExecutor部分源码展示
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
如果把线程池比作一个公司。公司会有正式员工处理正常业务,如果工作量大的话,会雇佣外包人员来工作。
闲时就可以释放外包人员以减少公司管理开销。一个公司因为成本关系,雇佣的人员始终是有最大数。
如果这时候还有任务处理不过来,就走需求池排任务。
-
acc : 获取调用上下文
-
corePoolSize: 核心线程数量,可以类比正式员工数量,常驻线程数量。
-
maximumPoolSize: 最大的线程数量,公司最多雇佣员工数量。常驻+临时线程数量。
-
workQueue:多余任务等待队列,再多的人都处理不过来了,需要等着,在这个地方等。
-
keepAliveTime:非核心线程空闲时间,就是外包人员等了多久,如果还没有活干,解雇了。
-
threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。每个公司对员工的要求不一样,恩,在这里设置员工的属性。
-
handler:线程池拒绝策略,什么意思呢?就是当任务实在是太多,人也不够,需求池也排满了,还有任务咋办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了。
添加一个任务
接着,我们看一下线程池中比较重要的execute方法,该方法用于向线程池中添加一个任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 第一个
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 第二个
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 第三个
reject(command);
}
第一个判断:workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;
第二个判断:判断线程池是否在运行,如果在,任务队列是否允许插入,插入成功再次验证线程池是否运行,如果不在运行,移除插入的任务,然后抛出拒绝策略。如果在运行,没有线程了,就启用一个线程。
第三个判断:如果添加非核心线程失败,就直接拒绝了。
添加worker线程
从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码如下(这里代码有点长,没关系,也是分块的,总共有5个关键的代码块):
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
这段代码是做是否能够添加工作线程条件过滤: 判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
这段代码是做做自旋,更新创建线程数量: 通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程
有人或许会疑问 retry 是什么?这个是java中的goto语法。只能运用在break和continue后面。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
上面代码获取线程池主锁。 线程池的工作线程通过Woker类实现,通过ReentrantLock锁保证线程安全。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
上面代码添加线程到workers中(线程池中)。
if (workerAdded) {
t.start();
workerStarted = true;
}
上面代码启动新建的线程。
看完了上面的代码,估计大家还有一个疑问,workers是什么? 看看源码就知道其实是一个hashSet。所以,线程池底层的存储结构其实就是一个HashSet。
private final HashSet<Worker> workers = new HashSet<Worker>();
worker线程处理队列任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 第一处
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 第二处
Throwable thrown = null;
try {
task.run(); // 第三处
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 第四处
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
-
第一处:是否是第一次执行任务,或者从队列中可以获取到任务。
-
第二处:获取到任务后,执行任务开始前操作钩子。
-
第三处:执行任务。
-
第四处:执行任务后钩子。
这两个钩子(beforeExecute,afterExecute)允许我们自己继承线程池,做任务执行前后处理。
到这里,源代码分析到此为止。
总结
-
所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中。
-
只有当阻塞队列满了后,才会触发非核心线程的创建。所以非核心线程只是临时过来打杂的。直到空闲了,然后自己关闭了。
-
线程池提供了两个钩子(beforeExecute,afterExecute)给我们,我们继承线程池,在执行任务前后做一些事情。
-
线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)
声明
转载自 https://mp.weixin.qq.com/s/-89-CcDnSLBYy3THmcLEdQ
如果觉得对你有帮助,我就没白写,如果需要交流,可以留言,或者加我wx