转载java_eye(http://www.iteye.com/topic/711162)

13年前
一:分析题目
从题中可以看到“很大的List”以及“充分利用多核CPU”,这就已经充分告诉我们要采用多线程(任务)进行编写。具体怎么做呢?大概的思路就是分割List,每一小块的List采用一个线程(任务)进行计算其和,最后等待所有的线程(任务)都执行完后就可得到这个“很大的List”中所有整数的和。
二:具体分析和技术方案
既然我们已经决定采用多线程(任务),并且还要分割List,每一小块的List采用一个线程(任务)进行计算其和,那么我们必须要等待所有的线程(任务)完成之后才能得到正确的结果,那么怎么才能保证“等待所有的线程(任务)完成之后输出结果呢”?这就要靠java.util.concurrent包中的CyclicBarrier类了。它是一个同步辅助类,它允许一组线程(任务)互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程(任务)的程序中,这些线程(任务)必须不时地互相等待,此时 CyclicBarrier 很有用。简单的概括其适应场景就是:当一组线程(任务)并发的执行一件工作的时候,必须等待所有的线程(任务)都完成时才能进行下一个步骤。具体技术方案步骤如下:
  • 分割List,根据采用的线程(任务)数平均分配,即list.size()/threadCounts。
  • 定义一个记录“很大List”中所有整数和的变量sum,采用一个线程(任务)处理一个分割后的子List,计算子List中所有整数和(subSum),然后把和(subSum)累加到sum上。
  • 等待所有线程(任务)完成后输出总和(sum)的值。

示意图如下:

三:详细编码实现
代码中有很详细的注释,这里就不解释了。
Java代码 复制代码 收藏代码
  1. /**  
  2.  * 计算List中所有整数的和<br>  
  3.  * 采用多线程,分割List计算  
  4.  * @author 飞雪无情  
  5.  * @since 2010-7-12  
  6.  */  
  7. public class CountListIntegerSum {   
  8.     private long sum;//存放整数的和   
  9.     private CyclicBarrier barrier;//障栅集合点(同步器)   
  10.     private List<Integer> list;//整数集合List   
  11.     private int threadCounts;//使用的线程数   
  12.     public CountListIntegerSum(List<Integer> list,int threadCounts) {   
  13.         this.list=list;   
  14.         this.threadCounts=threadCounts;   
  15.     }   
  16.     /**  
  17.      * 获取List中所有整数的和  
  18.      * @return  
  19.      */  
  20.     public long getIntegerSum(){   
  21.         ExecutorService exec=Executors.newFixedThreadPool(threadCounts);   
  22.         int len=list.size()/threadCounts;//平均分割List   
  23.         //List中的数量没有线程数多(很少存在)   
  24.         if(len==0){   
  25.             threadCounts=list.size();//采用一个线程处理List中的一个元素   
  26.             len=list.size()/threadCounts;//重新平均分割List   
  27.         }   
  28.         barrier=new CyclicBarrier(threadCounts+1);   
  29.         for(int i=0;i<threadCounts;i++){   
  30.             //创建线程任务   
  31.             if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算   
  32.                 exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));   
  33.             }else{   
  34.                 exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));   
  35.             }   
  36.         }   
  37.         try {   
  38.             barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处   
  39.         } catch (InterruptedException e) {   
  40.             System.out.println(Thread.currentThread().getName()+":Interrupted");   
  41.         } catch (BrokenBarrierException e) {   
  42.             System.out.println(Thread.currentThread().getName()+":BrokenBarrier");   
  43.         }   
  44.         exec.shutdown();   
  45.         return sum;   
  46.     }   
  47.     /**  
  48.      * 分割计算List整数和的线程任务  
  49.      * @author lishuai  
  50.      *  
  51.      */  
  52.     public class SubIntegerSumTask implements Runnable{   
  53.         private List<Integer> subList;   
  54.         public SubIntegerSumTask(List<Integer> subList) {   
  55.             this.subList=subList;   
  56.         }   
  57.         public void run() {   
  58.             long subSum=0L;   
  59.             for (Integer i : subList) {   
  60.                 subSum += i;   
  61.             }     
  62.             synchronized(CountListIntegerSum.this){//在CountListIntegerSum对象上同步   
  63.                 sum+=subSum;   
  64.             }   
  65.             try {   
  66.                 barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处   
  67.             } catch (InterruptedException e) {   
  68.                 System.out.println(Thread.currentThread().getName()+":Interrupted");   
  69.             } catch (BrokenBarrierException e) {   
  70.                 System.out.println(Thread.currentThread().getName()+":BrokenBarrier");   
  71.             }   
  72.             System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);   
  73.         }   
  74.            
  75.     }   
  76.        
  77. }  

有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。
Java代码 复制代码 收藏代码
  1. /**  
  2.  * 计算List中所有整数的和测试类  
  3.  * @author 飞雪无情  
  4.  * @since 2010-7-12  
  5.  */  
  6. public class CountListIntegerSumMain {   
  7.   
  8.     /**  
  9.      * @param args  
  10.      */  
  11.     public static void main(String[] args) {   
  12.         List<Integer> list = new ArrayList<Integer>();   
  13.         int threadCounts = 10;//采用的线程数   
  14.         //生成的List数据   
  15.         for (int i = 1; i <= 1000000; i++) {   
  16.             list.add(i);   
  17.         }   
  18.         CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);   
  19.         long sum=countListIntegerSum.getIntegerSum();   
  20.         System.out.println("List中所有整数的和为:"+sum);   
  21.     }   
  22.   
  23. }  

四:总结
本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。

附mathfox提到的ExecutorService.invokeAll()方法的实现
这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
Java代码 复制代码 收藏代码
  1. /**  
  2.  * 使用ExecutorService的invokeAll方法计算  
  3.  * @author 飞雪无情  
  4.  *  
  5.  */  
  6. public class CountSumWithCallable {   
  7.   
  8.     /**  
  9.      * @param args  
  10.      * @throws InterruptedException   
  11.      * @throws ExecutionException   
  12.      */  
  13.     public static void main(String[] args) throws InterruptedException, ExecutionException {   
  14.         int threadCounts =19;//使用的线程数   
  15.         long sum=0;   
  16.         ExecutorService exec=Executors.newFixedThreadPool(threadCounts);   
  17.         List<Callable<Long>> callList=new ArrayList<Callable<Long>>();   
  18.         //生成很大的List   
  19.         List<Integer> list = new ArrayList<Integer>();   
  20.         for (int i = 0; i <= 1000000; i++) {   
  21.             list.add(i);   
  22.         }   
  23.         int len=list.size()/threadCounts;//平均分割List   
  24.         //List中的数量没有线程数多(很少存在)   
  25.         if(len==0){   
  26.             threadCounts=list.size();//采用一个线程处理List中的一个元素   
  27.             len=list.size()/threadCounts;//重新平均分割List   
  28.         }   
  29.         for(int i=0;i<threadCounts;i++){   
  30.             final List<Integer> subList;   
  31.             if(i==threadCounts-1){   
  32.                 subList=list.subList(i*len,list.size());   
  33.             }else{   
  34.                 subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));   
  35.             }   
  36.             //采用匿名内部类实现   
  37.             callList.add(new Callable<Long>(){   
  38.                 public Long call() throws Exception {   
  39.                     long subSum=0L;   
  40.                     for(Integer i:subList){   
  41.                         subSum+=i;   
  42.                     }   
  43.                     System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);   
  44.                     return subSum;   
  45.                 }   
  46.             });   
  47.         }   
  48.         List<Future<Long>> futureList=exec.invokeAll(callList);   
  49.         for(Future<Long> future:futureList){   
  50.             sum+=future.get();   
  51.         }   
  52.         exec.shutdown();   
  53.         System.out.println(sum);   
  54.     }   
  55.   
  56. }