Java并发:JUC-Executor

Exeutor执行器框架

  • Future
    • RunnableFuture
  • Callable
  • Executor
    • ThreadPoolExecutor
  • CompletionService
  • RejectedExecutionhandler
    • ThreadPoolExecutor.DiscardPolicy
  • TimeUnit

Executor

对于并发执行的任务,Executor提供了大量可调节的选项,例如创建、关闭线程的策略,处理队列任务的策略,饱和策略,并且提供了几个钩子方法进行扩展行为。

但是与大部分功能强大的框架一样,有一部分参数并不能很好的工作,某些类型的任务需要特定的执行策略,一些参数组合则可能产生奇怪的结果。

概述

JUC提供了一种灵活的线程池实现作为Executor框架的一部分。线程池基于生产者-消费者模式,提交任务的执行者是生产者,执行任务的线程是消费者。

1
2
3
public interface Executot{
void execute(Runnable command);
}
  • 为灵活且强大的异步任务执行框架提供了基础
  • 该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦
  • 提供了对生命周期的支持,以及统计信息收集、应用程序管理机制以及性能监控等机制

示例

使用一个固定100线程的线程池

1550988548365

而且我们可以很容易地修改提交任务的方式,其对服务器的影响非常小:

1
2
3
4
5
public class ThreadPerTaskExecutor implements Executors{
public void execute(Runnable r){
new Thread(x).start();
}
}

执行策略

任务的提交与任务的执行解耦,可以更简单地为一个类给定的任务制定执行策略。执行策略是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求,通过限制并发任务的数量,可用确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。

将任务的提交与任务的执行分离,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。一个执行策略包含“what where when how”的因素

  • 任务在什么线程执行 what
  • 任务以什么顺序执行 what (FIFO,LIFO,优先级)
  • 可以由多少个任务并发执行 how many
  • 可以有多少个任务进入等待执行队列 how many
  • 如果系统过载,需要放弃一个任务,选择哪一个任务? which
    • 如何通知应用程序有任务被拒绝 HOW
  • 在一个任务执行之前与之后,应该做什么处理 what

生命周期

由于exectuor异步执行任务,所以之前提交的任务状态不能立即可见(有些已经完成,有些正在运行,有些在队列当中),如果将其关闭,可能出现各种问题。

Executor扩展了ExecutorService,其有三种状态:运行、关闭和已终止。

方法

  • void shutdown()。执行平缓的关闭,不再接受新任务,并等待已提交的任务执行完成
  • List<Runnable> shutdownNow()。执行粗暴的关闭,尝试取消所有运行中的任务,并不再启动队列中尚未开始的任务
  • boolean isShutdown()
  • boolean isTerminated()。轮询线程池终止
  • boolean awaitTermination(long timeout, TimeUnit unit) thorws InterruptedException。等待线程池到达终止,调用后通常会立即调用shutdown

当线程池关闭后,提交的任务将由“拒绝执行处理器Rejected Execution Handler”处理,将抛弃任务,或者排除一个未检查异常。

附加功能

使用Executor可用实现各种调优、管理、监视、记录日志、错误报告和其他功能。

适用性

  • 只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。
    • 如果将运行时较长于运行时间较短的任务混合一起,除非线程池很大,否则很可能拥塞
    • 如果提交的任务依赖于其他任务,除非线程池无限大,否则可能死锁。
  • 如果执行时间较长的任务存在很大的影响,则可以限定任务等待资源的时间,而不是无限制等待。

为什么要用

new Thread弊端

  • 每次新建对象,性能差
  • 线程缺乏统一管理,可能无限新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
  • 缺少更多功能,如更多执行、定期执行、线程中断

线程池好处

  • 重用存在的线程,减少对象创建、消亡的开销
  • 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
  • 提供定时执行、定期执行、单线程、并发控制等功能

在任务与执行策略间的隐性耦合

虽然Executor框架可以将任务的提交与任务的执行策略解耦开来,但是这种论断有一点言过其实。虽然Executor为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。有些类型的任务需要明确指定执行策略

  • 依赖性任务。如果提交给线程池的任务需要依赖其他任务,那么就隐含地给执行策略带来了约束,此时必须小心维持这些执行策略以避免产生活跃性问题
  • 使用线程封闭机制的任务。单线程的Executor能够对并发性做出更强的承诺,能确保任务不会并发执行,使你放宽代码对线程安全的要求。则此时当线程池更改将失去线程安全性
  • 对响应时间敏感的任务。如果将应该运行时间较长的任务提交给单线程的Executor将导致性能下降
  • 使用ThreadLocal的任务。ThreadLocal使得线程可以拥有某个变量的私有版本。但是只要条件允许,Executor可以自由重用这些线程。因此只有当线程本地值的生命周期受限于任务的生命周期,线程池中使用ThreadLocal才有意义。

线程池

  • 可用重用现有的线程而部署创建新线程,可用在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销
  • 当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行
  • 可通过调整线程池大小,创建足够的线程以便使处理器保持忙碌,并防止多线程相互竞争导致

是什么

线程池是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销

线程池包括四个基本组成部分

  • 线程管理器:用于创建并管理线程池,包括创建线程、销毁线程池、添加新任务
  • 工作线程:线程池中线程,在没有任务时处于等待状态,可以循环地执行任务
  • 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,主要规定了任务的入口,任务执行完成的收尾工作,任务的执行状态等。
  • 任务队列:用于存放没有处理的任务,提供一种缓冲机制。

线程池实例的状态

  • running
    • 能接受新提交的任务,并能处理阻塞队列的任务
  • shutdown
    • 关闭状态,不能接受新提交的任务,但能处理阻塞队列的任务
  • stop
    • 不能接受新提交的任务,也不能处理阻塞队列的任务
    • 会中断正在处理任务的线程
  • tidying
    • 如果所有任务都已经中止
  • terminated

1551616609267

分类

newFixedThreadPool

创建一个固定长度的线程池,每当提交一个任务就创建一个新的线程,直到达到线程池的最大数量

newCachedThreadPool

创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,将回收空闲线程,当需求增加时,则可用添加新的线程,其规模不存在限制

当任务间存在依赖时,可以选择。

newSingleThreadExecutor

是一个单线程的Executor,创建单个工作线程执行任务,如果线程异常结束,则创建另一个线程来替代。能够确保任务在队列中的顺序来串行执行。

单线程的Executor提供了大量的内部同步机制,确保了任务执行的任何内存写入操作对于后续任务都是可见的,即使这个线程会不时被另一个线程替代,但对象总是可以安全地封闭在任务线程中。

newScheduledThreadExecutor

创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。而Timer存在缺陷,应该考虑使用ScheduledThreadPoolExecutor代替。

Timer

负责管理延迟任务,例如在100ms后执行该任务,以及周期任务每10ms执行一次该任务。

  • Timer在执行任务时只会创建应该线程,如果某个任务的执行事件过长,将破坏其他TimerTask的定时精确性。
  • TimerTask如果抛出了一个未检查异常,而Timer线程并不会捕获异常,此时将终止定时线程,并且Timer也不恢复线程的执行,而是认为整个Timer都被取消了。

方法

  • execute
    • 提交任务,交给线程池执行
  • submit
    • 提交任务,能够返回执行结果
  • shutdown
    • 关闭线程池,等待任务都执行完
  • shutdownNow
    • 关闭线程池,不等待任务执行完
  • getTaskCount
    • 线程池已执行和未执行的任务总数
  • getCompletedTaskCount
    • 已完成的任务数
  • getPoolSize
    • 线程池当前的线程数量
  • getActiveCount
    • 当前线程池当中正在执行任务的线程数量

参数

  • corePoolSize:核心线程数量
  • maxmumPoolSize:线程最大线程数
    • 当核心线程都在工作,且阻塞队列满,且线程数目小于最大线程数,则创建新线程
    • 如果线程数目等于最大线程数,通过拒绝策略来处理新到来的任务
  • workQueue:阻塞队列,存储等待执行的任务
    • 直接切换
    • 无界队列,基于链表,创建的最大线程即核心线程数目
    • 有界队列,创建的最大线程数目为最大线程数
  • keepAliveTime:线程没有任务执行时最多保持多久时间终止
  • unit:时间单位
  • threadFactory:线程工厂,创建线程
  • rejectHandler:拒绝处理任务时的策略
    • 直接抛出异常,默认
    • 用调用者所在的线程执行任务
    • 丢弃队列中最靠前的任务
    • 直接丢弃这个任务

降低系统资源消耗:较大队列容量,较小线程池容量

设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。而设计线程池只要避免过大或过小的极端即可。

如果过大,则线程将在相对少的CPU和内存上竞争,会导致更高的内存使用量,还可能耗尽资源。如果国小,将导致许多空闲的CPU无法执行工作,而降低吞吐率。

设置线程池的大小,需要分析

  • 计算环境。即部署的系统有多少CPU
  • 资源预算。即部署的系统有多大的内存
  • 任务的特性。任务时CPU密集型、IO密集型还是二者皆可。是否需要JDBC连接这样的稀缺资源。如果执行不同类别的任务,则应该考虑使用多个线程池
    • CPU密集型任务,需要尽量压榨CPU,参考值设置为NCPU(CPU数量)+1
    • IO密集型任务,参照值设置为2*NCPU

并且可以通过在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平:

1565179219646

配置ThreadPoolExecutor

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){...}

线程的创建与销毁

线程池的基本大小corePoolSize、最大大小maximumPoolSize、存活时间keepAliveTime等因素共同负责线程的创建与销毁。

当某个线程的空闲时间超过了存活时间,则会被标记为可回收的,并在大小超过基本大小时将其终止。

  • newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时
  • newCachedThreadPool工厂方法将线程池的最大大小设置为MAX_VALUE,基本大小为0,并超时时间为1分钟。

管理队列任务

在有限的线程池中会限制可并发执行的任务数量。单线程的线程池确保不会有任务并发执行。

即使使用固定大小的线程池,在高负载下,应用程序仍可能耗尽资源。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累计起来。

在线程池当中,累计的请求将在由Executor管理的Runnable队列张等待,而不会像线程那样竞争CPU资源。但是即使如此在请求的速率很高的时候依然可能会耗尽资源。并且在耗尽资源前,响应性能也随着任务队列的增长而增长。

ThreadPoolExecutor允许提供一个BlockingQueue用于保存等待执行的任务。基本的任务排列方法有:无界队列有界队列同步移交。队列的选择也与线程池的其他配置有关。

无界队列

无界的LinkedBlockingQueue是newFixedThreadPool与newSingleThreadExecutor的默认选择

有界队列

一种更为稳妥的资源管理策略是使用有界队列,例如有界的ArrayBlockingQueue,有界的LinkedBlockingQueuePriorityBlockingQueue,但是带来了新的问题:即当队列填满后如何解决新的任务。而且有界队列的大小需要与线程池的大小一同调节

  • 如果线程池较小而队列较大,有助于减少内存使用量,降低CPU利用率,并且减少上下文切换。但可能会限制吞吐率

同步移交

对于非常大的或无界的线程池,可以通过使用SynchronousQueue来避免排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程间进行移交的机制。

要将一个元素放入SynchronousQueue必须有另一个线程正在等待接受这个元素,如果没有线程等待,并且线程池的当前大小小于最大值,那么将创建一个新的线程,否则根据饱和策略将拒绝任务。

使用移交将更加高效,因为任务直接交给了线程而不是放入队列。

控制优先级

使用PriorityBlockingQueue将根据任务的优先级来安排任务。

饱和策略

当有界队列被填满,则饱和策略开始发挥作用,ThreadPoolExecutor.setRejectedExecutionHandler进行设置。如果任务被提交到一个已被关闭的Executor时也会使用饱和策略。

JDK提供了几种不同的实现:AbortPolicyCallerRunsCallerRunPolicyDiscardPolicyDiscardOldestPolicy

Abort策略

默认的饱和策略,将抛出未检查的RejectedExecution,调用者可以捕获该异常,然后根据需求编写自己的处理代码。

Discard策略

当新提交的任务无法保存到队列中时,Discard策略将悄悄地抛弃任务

DiscardOldest策略

该策略会抛弃下一个即将执行的任务,然后重新提交新的任务。

CallerRuns策略

实现了一种调节机制,不会抛弃任务也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。即任务不会由线程池执行,而是由调用了execute的线程执行该任务,因此也会导致提交任务的线程在一段时间内无法提交任务。

线程工厂

当线程池需要创建一个线程时,都是提供线程工厂方法来完成的。默认的线程工厂方法会创建一个新的、非守护的线程,并且不包含特殊的配置信息。

可以指定一个线程工厂方法,定制线程池的配置信息。ThreadFactory.newThread()用于创建新线程。实现自己的线程工厂只需要实现该接口。

在调用构造函数后再定制ThreadPoolExecutor

即使构造完ThreadPoolExecutor也可以通过Setter修改大多数传递给它的构造函数的参数。

扩展ThreadPoolExecutor

任务

找出可利用的并行性

当使用线程池时,必须将任务描述为一个Runnable,多数服务器应用程序存在一个明显的任务边界:单个客户请求。而有些时候依然需要去发掘并行性,或者客户请求内部更深层次的并行性。

概述

任务的生命周期:

  • 创建
  • 提交
  • 开始
  • 完成

联系

Runnable与Callable都描述的是抽象的计算任务,这些任务通常有范围,即有一个明确的起始点,并且最终会结束。

任务设计

只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

而对于异构任务,很难通过并行化来获得重大的性能提升。

Runnable

Executor使用Runnable作为基本的任务表示形式,但它是一种具有很大局限的抽象,它不能返回一个值或者抛出一个受检查异常。

Callable

某些任务存在延迟的计算–执行数据库查询、从网络中获取资源、计算某个复杂的功能。Callable可以返回一个值,并抛出一个异常

方法

  • call。对于完成的任务,立即返回或抛出一个Exception。对于未完成任务,则阻塞直到任务完成。
    • 任务抛出异常,则将异常封装到ExecutionException。如果任务被取消,则CancellationException。

Future

表示一个任务的生命周期,并提供相应的方法判断是否已经完成或取消,以及获取任务的结果和取消任务。

在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退。Future是一个接口,它的功能:

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

创建Future

  • ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用于获得任务的执行结果或取消任务。
  • 显式指定Runnable或Callable实例化FutureTask

FutureTask实现了Runnable接口

方法

  • boolean cancel(boolean mayInterruptIfRunning);。用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
    • 参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true
  • boolean isCancelled();表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • boolean isDone();表示任务是否已经完成,若任务完成,则返回true;
  • V get() throwsInterruptedException, ExecutionException;用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • V get(longtimeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

时限

如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃。

在支持时限的Future.get中支持这种需求,当结果可用时会立即返回,如果在指定时限中没有计算出结果,将抛出TimeOutException

FutureTask

是什么

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

1
2
3
4
5
public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

FutureTask有3中状态:Waiting to Run、Running、Completed

应用

  • 用于异步获取执行结果或取消执行任务的场景,表示异步任务。当一个计算任务需要执行很长时间,那么就可以用FutureTask来封装这个任务,主线程在完成自己的任务之后再去获取结果。
  • 用做闭锁,即异步任务没有完成前,无法执行下一步

Code

FutureTask可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用FutureTask来封装这个任务,主线程在完成自己的任务之后再去获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class FutureTaskExample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});

Thread computeThread = new Thread(futureTask);
computeThread.start();

Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
other task is running...
4950

原理

  • Future.get():取决于任务的状态,如果任务已经完成,则会立即返回结果,否则阻塞直到任务完成,然后返回结果或抛出异常。

参考