疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java基础系列Executor框架


 

 
一、Executor框架介绍
 
  Executor框架是JDK1.5之后出现的,位于juc包中,是并发程序设计的工具之一。各个版本以来一直在进行修正。
 
  Executor是执行者之意,表示任务的执行者,这里的任务指的是新的线程任务(实现Runnable接口的执行任务)。
 
  整个Executor执行者框架包括多个接口和类,甚至还涉及到阻塞队列的使用,协同实现任务的执行。
 
  下面是简单的Executor框架的类结构:
 
  
 
  从上面的类结构中我们可以看到Executor接口是整个框架的祖接口,它大致规划了框架的结构,并定义了执行方法execute(),这个方法需要一个Runnable作为入参,表示执行一个线程任务。从这里也可以看出来这个框架的主要思想:将要执行的任务和具体的执行进行解耦,任务的内容单独定义为一个线程,任务的执行交给该框架进行,只需要将任务提交给框架即可(这个后面会提到)。Runnable入参就表示定义为单独线程的任务内容,execute方法则是执行任务,整个框架定义的就是这样一个任务执行器。
 
二、Executor接口
 
  Executor接口是整个框架的总接口,正如上面所述,它描述了框架的主要实现思想:任务内容与执行的解耦。其源码很短,我们可以看看:
 
复制代码
 1 package java.util.concurrent;
 2 public interface Executor {
 3 
 4     /**
 5      * Executes the given command at some time in the future.  The command
 6      * may execute in a new thread, in a pooled thread, or in the calling
 7      * thread, at the discretion of the {@code Executor} implementation.
 8      *
 9      * @param command the runnable task
10      * @throws RejectedExecutionException if this task cannot be
11      * accepted for execution
12      * @throws NullPointerException if command is null
13      */
14     void execute(Runnable command);
15 }
复制代码
  这是一个单独的接口,其内部只有一个execute()方法。我们看看其注释:在将来某一时刻执行给定的指令,可能会在一个新的线程、或一个线程池中的线程、或在正调用的线程中执行,这取决于Executor接口的具体实现。
 
  注意:这个方法中的入参任务指令是必不可少的,不可传null,否则会报NullPointerException(空指针异常)。
 
三、ExecutorService接口
 
  ExecutorService接口继承了Executor接口,Executor接口仅仅描述了思想,定义了一个执行器,ExecutorService接口在其基础上进一步丰富了框架的接口,为框架定义了更多内容,包括:任务的提交,执行器的终止关闭等。
 
3.1 终止方法
 
1     void shutdown();
2     List<Runnable> shutdownNow();
  如上源码,ExecutorService中定义了两个终止方法,这两个方法并不完全相同,第一个方法shutDown()的作用是终止新任务的接收,已接收的任务却需要继续执行。这是保证已提交任务全部执行的终止方法。第二个shutDownNow()方法属于强效终止方法,它会试图停止正在执行的线程任务,并且不再执行处于等待状态的其他任务,并且会将这些任务以列表的方式返回。
 
  注意:第二个方法的试图停止,并不一定会停止,因为其实现会使用Thread.interrupt()方法来进行线程任务中断执行,但是如果任务线程不会响应该中断,则不会被终止。
 
3.2 任务提交方法
 
1     <T> Future<T> submit(Callable<T> task);
2     <T> Future<T> submit(Runnable task, T result);
3     Future<?> submit(Runnable task);
  这三个任务提交方法采用方法重载的方式定义,其实均是对execute方法的再封装,对其进行扩展而来。因为execute方法只能接受Runnable入参,切无返回值。submit提交任务却拥有返回值,而且可以接收两种格式的任务,Callable和Runnable两种。不同的方法参数和返回值也略有不同。
 
  第一种方法接收一个Callable入参,任务执行成功会返回一个表示该任务的Future,通过其get方法可获取到在Callable任务中指定的返回内容。
 
  第二种方法接收一个Runnable入参和一个指定的返回值,任务执行成功会返回一个表示该任务的Future,通过其get方法可以获取到之前的入参result的值,即入参result即为预设的返回值。
 
  第三种方法接收一个Runnable入参,任务执行成功会返回一个表示该任务的Future,通过get方法可得到null。
 
  我们通过下面的实例来进行验证:
 
复制代码
 1     public static void main(String[] args) throws Exception {
 2         ExecutorService executor = Executors.newCachedThreadPool();
 3         //第一种方法:入参为Callable
 4         Future<String> result1 = executor.submit(new Callable<String>() {
 5             @Override
 6             public String call() throws Exception {
 7                 return "task2";//此处的task2即为返回的内容,即future.get()的值
 8             }
 9         });
10         //第二种方法:入参为Runnable和T
11         Future<String> result2 = executor.submit(new Runnable() {
12             @Override
13             public void run() {
14                 System.out.println("mmp");
15             }
16         },"task1");//此处的task1即为返回的内容,即future.get()的值
17         //第三种方法:入参为Runnable
18         Future<?> result3 = executor.submit(new Runnable() {
19             @Override
20             public void run() {
21                 System.out.println("nnd");
22             }
23         });
24         System.out.println(result1.get());
25         System.out.println(result2.get());
26         System.out.println(result3.get());
27     }
复制代码
  执行结果为:
 
 View Code
  从上面的结果中也可以看出三个方法的不同之处。
 
3.3 invokeAny方法
 
1     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
2     <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  第一种方法表示执行给定的任务列表中的任务,如果某个任务成功完成,没有任何异常,则将该任务的结果返回,一旦成功或者异常被返回之后,任务列表中其他任务则取消执行,那么可以看出返回的必定是第一个执行成功的任务的结果或者最后一个任务的执行异常。
 
  第二个方法是在第一个方法的基础上加上一个超时限制,如果在超时期满之前完成了某个任务则返回该任务的结果,其余同上。
 
  注意:这里写到一旦成功或者异常被返回,其实这里如果第一个任务执行的时候出现了异常,则同样会被返回,同样其他任务取消执行。
 
  例子:
 
复制代码
 1     public static void main(String[] args) throws Exception {
 2         ExecutorService executor = Executors.newCachedThreadPool();
 3         List<Callable<String>> callables = new ArrayList<>();
 4         callables.add(new Callable<String>() {
 5             @Override
 6             public String call() throws Exception {
 7                 return "task1";
 8             }
 9         });
10         callables.add(new Callable<String>() {
11             @Override
12             public String call() throws Exception {
13                 return "task2";
14             }
15         });
16         callables.add(new Callable<String>() {
17             @Override
18             public String call() throws Exception {
19                 return "task3";
20             }
21         });
22         callables.add(new Callable<String>() {
23             @Override
24             public String call() throws Exception {
25                 return "task4";
26             }
27         });
28         String s = executor.invokeAny(callables);
29         System.out.println(s);
30     }
复制代码
  执行结果:
 
 View Code
3.4 invokeAll方法
 
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
  这两个方法和3.3的两个方法结构类似,第一个方法表示执行给定的任务列表中的任务,当列表中的所有任务执行完毕之后,返回所有任务的结果组成的列表,此时列表中所有的Future中的isDone均为true,表示所有任务均被执行。
 
  第二个方法同样在第一个方法的基础上加上了时限限制,表示在所有任务完成或者时限到期之后将所有任务的结果组成列表返回,此时列表中所有的Future中的isDone均为true
 
  注意:如果时限期满导致返回结果的话,那些未执行的任务的结果中是null,而isDone仍然为true,状态为6-INTERRUPTED(中断),而执行成功的任务的结果状态为2-COMPLETING(完成)
 
  例子:
 
复制代码
 1     public static void main(String[] args) throws Exception {
 2         ExecutorService executor = Executors.newCachedThreadPool();
 3         List<Callable<String>> callables = new ArrayList<>();
 4         callables.add(new Callable() {
 5             @Override
 6             public Object call() throws Exception {
 7                 return "task1";
 8             }
 9         });
10         callables.add(new Callable() {
11             @Override
12             public Object call() throws Exception {
13                 return "task2";
14             }
15         });
16         callables.add(new Callable() {
17             @Override
18             public Object call() throws Exception {
19                 return "task3";
20             }
21         });
22         List<Future<String>> futures = executor.invokeAll(callables,1900L,TimeUnit.MICROSECONDS);
23         for(Future<String> future:futures){
24             System.out.println(future.get());
25         }
26     }
复制代码
  测试第一个方法就将22行第二个和第三个参数去掉即可,这里设置1900毫秒在我的电脑上正好能有几率测试到完成一部分就超时的情况,其执行结果为:
 
 View Code
其返回结果为:
 
 
 
 
  全部成功的结果为:
 
 View Code
  结果为:
 
 
四、AbstractExecutorService
 
  这是一个抽象类,实现了ExecutorService接口。这是ExecutorService的默认实现,我们来看下AbstractExecutorService中实现的方法:
 
4.1 newTaskFor方法
 
  可以从中看出,AbstractExecutorService实现了之前我们介绍的ExecutorService中的大部分方法,包括三个任务提交方法,两个invokeAny和两个invokeAll方法,其中doInvokeAny方法是私有方法,被invokeAny调用,只是多出了两个newTaskFor方法。
 
  newTaskFor方法是做什么的呢?
 
  我们来看看源码:
 
1     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
2         return new FutureTask<T>(runnable, value);
3     }
4     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
5         return new FutureTask<T>(callable);
6     }
  可以看出,这两个newTaskFor是使用给定的参数组建一个FutureTask实例并返回。所以它的作用就是提供任务执行结果Future,只是这里提供的是FutureTask类型的Future,如果我们需要使用别的RunnableFuture的实现类型(FutureTask就是RunnableFuture的实现之一),我们可以自定义。
 
  这两个方法被submit方法所调用,用于在任务执行之前,将其包装起来,然后调用execute执行即可,之前我们看过,execute的入参是Runnable类型,此处FutureTask的超接口RunnableFuture就实现了Runnable接口。所以可以直接将包装过的任务直接作为execute的入参进行执行。
 
4.2 submit提交方法
 
  下面我们就来看看三个submit方法:
 
复制代码
 1     public Future<?> submit(Runnable task) {
 2         if (task == null) throw new NullPointerException();
 3         RunnableFuture<Void> ftask = newTaskFor(task, null);
 4         execute(ftask);
 5         return ftask;
 6     }
 7 
 8     public <T> Future<T> submit(Runnable task, T result) {
 9         if (task == null) throw new NullPointerException();
10         RunnableFuture<T> ftask = newTaskFor(task, result);
11         execute(ftask);
12         return ftask;
13     }
14 
15     public <T> Future<T> submit(Callable<T> task) {
16         if (task == null) throw new NullPointerException();
17         RunnableFuture<T> ftask = newTaskFor(task);
18         execute(ftask);
19         return ftask;
20     }
复制代码
  参考之前ExecuteService中的介绍和实例,我们可以轻松理解这里代码的含义,首先判断任务是否为null,若是null,则抛出空指针,否则使用newTaskFor将任务(和返回值)封装成为FutureTask,再将其作为入参调用execute进行任务执行。最后将之前封装的FutureTask作为返回值返回。
 
4.3 invokeAny方法
 
  这里实现了invokeAny,核心是doInvokeAny方法,我们可以看下源码:
 
复制代码
 1     /**
 2      * the main mechanics of invokeAny.
 3      */
 4     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 5                               boolean timed, long nanos)
 6         throws InterruptedException, ExecutionException, TimeoutException {
 7         if (tasks == null)
 8             throw new NullPointerException();
 9         int ntasks = tasks.size();
10         if (ntasks == 0)
11             throw new IllegalArgumentException();
12         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
13         ExecutorCompletionService<T> ecs =
14             new ExecutorCompletionService<T>(this);
15 
16         // For efficiency, especially in executors with limited
17         // parallelism, check to see if previously submitted tasks are
18         // done before submitting more of them. This interleaving
19         // plus the exception mechanics account for messiness of main
20         // loop.
21 
22         try {
23             // Record exceptions so that if we fail to obtain any
24             // result, we can throw the last exception we got.
25             ExecutionException ee = null;
26             final long deadline = timed ? System.nanoTime() + nanos : 0L;
27             Iterator<? extends Callable<T>> it = tasks.iterator();
28 
29             // Start one task for sure; the rest incrementally
30             futures.add(ecs.submit(it.next()));
31             --ntasks;
32             int active = 1;
33 
34             for (;;) {
35                 Future<T> f = ecs.poll();
36                 if (f == null) {
37                     if (ntasks > 0) {
38                         --ntasks;
39                         futures.add(ecs.submit(it.next()));
40                         ++active;
41                     }
42                     else if (active == 0)
43                         break;
44                     else if (timed) {
45                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
46                         if (f == null)
47                             throw new TimeoutException();
48                         nanos = deadline - System.nanoTime();
49                     }
50                     else
51                         f = ecs.take();
52                 }
53                 if (f != null) {
54                     --active;
55                     try {
56                         return f.get();
57                     } catch (ExecutionException eex) {
58                         ee = eex;
59                     } catch (RuntimeException rex) {
60                         ee = new ExecutionException(rex);
61                     }
62                 }
63             }
64 
65             if (ee == null)
66                 ee = new ExecutionException();
67             throw ee;
68 
69         } finally {
70             for (int i = 0, size = futures.size(); i < size; i++)
71                 futures.get(i).cancel(true);
72         }
73     }
复制代码
  解析:
 
    1.参数校验,主要是看任务列表是否存在任务
 
    2.优先执行一个任务,这个任务为任务列表tasks中的首个任务(30行),然后进入一个无限循环(当然会有退出条件)。
 
    3.从执行器Executor的阻塞队列中移除队头的元素,并将该元素返回,如果队列为空队列,则这里返回值为null,此时会将优先提交的任务元素返回(35行),然后执行第53行。
 
    4.执行器执行首个任务,执行56行,等待任务执行完成,如果任务执行成功,会在此处直接退出整个方法,一旦该任务执行出错,则会产生异常,并将异常保存在ee中(58行,60行),然后继续进行循环。
 
    5.再次执行35行代码发现返回值为null,则会判断任务列表中的未执行任务数ntasks(该值初始为任务列表总任务数,但会随着任务的提交执行而逐渐递减,它的值就是任务列表中未提交执行的任务的数量)是否为0,此时该值一定不为0,则会执行38-40行代码,再次提交一个执行任务,然后会再次下一次循环,这次循环类似第4点。
 
    6.一旦某个任务执行成功,就会将该任务的执行结果返回,但是一旦某个任务执行失败,则继续执行下个任务,如果所有任务都执行失败,则会将最后一个任务的失败异常抛出(这个异常将保存在ee中,一直到循环结束,由67行抛出)。
 
    7.最后取消其他正在执行的任务(70-71行)。
 
  总结:该方法会返回任务列表中第一个执行成功的任务的执行结果或者是抛出异常,一旦抛出了异常,表示任务全部被执行,但是全部失败。一旦某个任务执行成功,则剩下的任务将不会再执行,而且会取消其他正在执行的任务。
 
  注意:对于有超时限制的情况,会执行44-49行代码,每个任务执行时都会进行超时判断,一旦超时期满,则抛出超时异常,并在最后取消所有正在执行的任务(70-71行)。
 
 4.4 invokeAll方法
 
  AbstractExecutorService中的两个invokeAll是分开实现的。如前所述,该方法用于执行一个任务列表,确保所有任务全部执行,也即All之意。