Java线程池编程

RemixVer 贡献于2011-05-11

作者 雨林木风  创建于2010-11-26 03:29:00   修改者USER  修改于2011-05-11 12:37:00字数22612

文档摘要:一般的服务器都需要线程池,比如Web、FTP等服务器,不过它们一般都自己实现了线程池,比如以前介绍过的Tomcat、Resin和Jetty等,现在有了JDK5,我们就没有必要重复造车轮了,直接使用就可以,何况使用也很方便,性能也非常高。
关键词:

线程池编程 java.util.concurrent 多线程框架---线程池编程(一) 一般的服务器都需要线程池,比如Web、FTP等服务器,不过它们一般都自己实现了线程池,比如以前介绍过的Tomcat、Resin和Jetty等,现在有了JDK5,我们就没有必要重复造车轮了,直接使用就可以,何况使用也很方便,性能也非常高。  1 package concurrent;  2 import java.util.concurrent.ExecutorService;   3 import java.util.concurrent.Executors;   4 public class TestThreadPool   5 {   6     public static void main(String args[]) throws InterruptedException  7     {   8         // only two threads   9         ExecutorService exec = Executors.newFixedThreadPool(2);  10         for(int index = 0; index < 100; index++) 11         {  12             Runnable run = new Runnable() 13             {  14                 public void run()  15                 {  16                     long time = (long) (Math.random() * 1000);  17                     System.out.println("Sleeping " + time + "ms"); 18                     try 19                     { 20                         Thread.sleep(time);  21                          22                     }  23                     catch (InterruptedException e)  24                     {  25                          26                     }  27                      28                 } 29                  30             }; 31                      32             exec.execute(run);  33                      34         } // must shutdown 35                      36         exec.shutdown();  37          38     }  39     //} 40     } 41 //} 42    上面是一个简单的例子,使用了2个大小的线程池来处理100个线程。但有一个问题:在for循环的过程中,会等待线程池有空闲的线程,所以主线程会阻塞的。为了解决这个问题,一般启动一个线程来做for循环,就是为了避免由于线程池满了造成主线程阻塞。不过在这里我没有这样处理。[重要修正:经过测试,即使线程池大小小于实际线程数大小,线程池也不会阻塞的,这与Tomcat的线程池不同,它将Runnable实例放到一个“无限”的BlockingQueue中,所以就不用一个线程启动for循环,Doug Lea果然厉害] 另外它使用了Executors的静态函数生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即使它是Idle。这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像Tomcat的线程池一样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。通过这些可以基本上替换Tomcat的线程池实现方案。 需要注意的是线程池必须使用shutdown来显式关闭,否则主线程就无法退出。shutdown也不会阻塞主线程。 许多长时间运行的应用有时候需要定时运行任务完成一些诸如统计、优化等工作,比如在电信行业中处理用户话单时,需要每隔1分钟处理话单;网站每天凌晨统计用户访问量、用户数;大型超时凌晨3点统计当天销售额、以及最热卖的商品;每周日进行数据库备份;公司每个月的10号计算工资并进行转帐等,这些都是定时任务。通过 java的并发库concurrent可以轻松的完成这些任务,而且非常的简单。    1 package concurrent;  2 import static java.util.concurrent.TimeUnit.SECONDS;  3 import java.util.Date;   4 import java.util.concurrent.Executors;   5 import java.util.concurrent.ScheduledExecutorService;  6 import java.util.concurrent.ScheduledFuture;   7   8 public class TestScheduledThread   9 {  10     public static void main(String[] args)  11     {  12         final ScheduledExecutorService scheduler = Executors .newScheduledThreadPool(2);  13         final Runnable beeper = new Runnable()  14         {  15             int count = 0;  16             public void run()  17             {  18                 System.out.println(new Date() + "beep "+ (++count)); 19             }  20         }; // 1秒钟后运行,并每隔2秒运行一次 21         final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate( beeper, 1, 2, SECONDS); 22         // 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行  23         final ScheduledFuture beeperHandle2 = scheduler .scheduleWithFixedDelay(beeper, 2, 5, SECONDS);  24         // 30秒后结束关闭任务,并且关闭Scheduler  25         scheduler.schedule( 26             new Runnable()  27             { 28                 public void run()  29                 {  30                     beeperHandle.cancel(true); 31                     beeperHandle2.cancel(true); 32                     scheduler.shutdown();  33                 }  34             }, 30, SECONDS);  35     }  36 } 37  38  39    为了退出进程,上面的代码中加入了关闭Scheduler的操作。而对于24小时运行的应用而言,是没有必要关闭Scheduler的。 在实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等待其他线程都完成某一阶段后再执行,等所有线程都到达某一个阶段后再统一执行。 比如有几个旅行团需要途经深圳、广州、韶关、长沙最后到达武汉。旅行团中有自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要等待其他旅行团到达此地后再同时出发,直到都到达终点站武汉。 这时候CyclicBarrier就可以派上用场。CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。   package concurrent;  import java.text.SimpleDateFormat; import java.util.Date;  import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;  class TestCyclicBarrier  {      // 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan      public static int[] timeWalk = { 5, 8, 15, 15, 10 };      // 自驾游      public static int[] timeSelf = { 1, 3, 4, 4, 5 };     // 旅游大巴      public static int[] timeBus = { 2, 4, 6, 6, 7 };      public static String now()      {          SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");          return sdf.format(new Date()) + ": ";      }  } public class Tour implements Runnable  {      private int[] times;      private CyclicBarrier barrier;     private String tourName;      public Tour(CyclicBarrier barrier, String tourName, int[] times)      {          this.times = times;          this.tourName = tourName;          this.barrier = barrier;     }      public void run()      {          try          {              Thread.sleep(times[0] * 1000);              System.out.println(TestCyclicBarrier.now() + tourName + "Reached Shenzhen");             barrier.await();             Thread.sleep(times[1] * 1000);             System.out.println(TestCyclicBarrier.now() + tourName + " Reached Guangzhou");              barrier.await(); Thread.sleep(times[2] * 1000);             System.out.println(TestCyclicBarrier.now() + tourName + " Reached Shaoguan");             barrier.await();             Thread.sleep(times[3] * 1000);              System.out.println(TestCyclicBarrier.now() + tourName + " Reached Changsha");              barrier.await();              Thread.sleep(times[4] * 1000);             System.out.println(TestCyclicBarrier.now() + tourName + " Reached Wuhan");              barrier.await();          }          catch (InterruptedException e)          { }          catch (BrokenBarrierException e)          { }      }      public static void main(String[] args)      { // 三个旅行团          CyclicBarrier barrier = new CyclicBarrier(3);         ExecutorService exec = Executors.newFixedThreadPool(3);         exec.submit(new Tour(barrier, "WalkTour", TestCyclicBarrier.timeWalk));         exec.submit(new Tour(barrier, "SelfTour", TestCyclicBarrier.timeSelf));         exec.submit(new Tour(barrier, "BusTour", TestCyclicBarrier.timeBus));         exec.shutdown();     } }      运行结果: 00:02:25: SelfTour Reached Shenzhen 00:02:25: BusTour Reached Shenzhen 00:02:27: WalkTour Reached Shenzhen 00:02:30: SelfTour Reached Guangzhou 00:02:31: BusTour Reached Guangzhou 00:02:35: WalkTour Reached Guangzhou 00:02:39: SelfTour Reached Shaoguan 00:02:41: BusTour Reached Shaoguan 并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。 下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,特在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。 当然线程池也要显式退出了。 线程池--java.util.concurrent 多线程框架(二)   当然线程池也要显式退出了。 package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueue queue = new LinkedBlockingQueue(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File(“F:\\JavaLib”); // 完成标志 final File exitFile = new File(“”); // 读个数 final AtomicInteger rc = new AtomicInteger(); // 写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(“.java”); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println(“Read0: ” + index + ” “ + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = “Write” + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加”标志”,以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + “: ” + index + ” “ + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } } 从名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。   CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。 一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier。 下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。 同样,线程池需要显式shutdown。 package concurrent;   import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException { // 开始的倒数锁 final CountDownLatch begin = new CountDownLatch(1); // 结束的倒数锁 final CountDownLatch end = new CountDownLatch(10); // 十名选手 final ExecutorService exec = Executors.newFixedThreadPool(10); for(int index = 0; index < 10; index++) { final int NO = index + 1; Runnable run = new Runnable(){ public void run() { try { begin.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println(“No.” + NO + ” arrived”); } catch (InterruptedException e) { } finally { end.countDown(); } } }; exec.submit(run); } System.out.println(“Game Start”); begin.countDown(); end.await(); System.out.println(“Game Over”); exec.shutdown(); } } 运行结果: Game Start No.4 arrived No.1 arrived No.7 arrived No.9 arrived No.3 arrived No.2 arrived No.8 arrived No.10 arrived No.6 arrived No.5 arrived Game Over 有时候在实际应用中,某些操作很耗时,但又不是不可或缺的步骤。比如用网页浏览器浏览新闻时,最重要的是要显示文字内容,至于与新闻相匹配的图片就没有那么重要的,所以此时首先保证文字信息先显示,而图片信息会后显示,但又不能不显示,由于下载图片是一个耗时的操作,所以必须一开始就得下载。   Java的并发库的Future类就可以满足这个要求。Future的重要方法包括get()和cancel(),get()获取数据对象,如果数据没有加载,就会阻塞直到取到数据,而 cancel()是取消数据加载。另外一个get(timeout)操作,表示如果在timeout时间内没有取到就失败返回,而不再阻塞。 下面的Demo简单的说明了Future的使用方法:一个非常耗时的操作必须一开始启动,但又不能一直等待;其他重要的事情又必须做,等完成后,就可以做不重要的事情。 package concurrent;   import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestFutureTask { public static void main(String[] args)throws InterruptedException, ExecutionException { final ExecutorService exec = Executors.newFixedThreadPool(5); Callable call = new Callable() { public String call() throws Exception { Thread.sleep(1000 * 5); return “Other less important but longtime things.”; } }; Future task = exec.submit(call); // 重要的事情 Thread.sleep(1000 * 3); System.out.println(“Let’s do important things.”); // 其他不重要的事情 String obj = task.get(); System.out.println(obj); // 关闭线程池 exec.shutdown(); } } 运行结果: Let’s do important things. Other less important but longtime things. 考虑以下场景:浏览网页时,浏览器了5个线程下载网页中的图片文件,由于图片大小、网站访问速度等诸多因素的影响,完成图片下载的时间就会有很大的不同。如果先下载完成的图片就会被先显示到界面上,反之,后下载的图片就后显示。   Java的并发库的CompletionService可以满足这种场景要求。该接口有两个重要方法:submit()和take()。submit用于提交一个runnable或者callable,一般会提交给一个线程池处理;而take就是取出已经执行完毕runnable或者callable实例的Future对象,如果没有满足要求的,就等待了。 CompletionService还有一个对应的方法poll,该方法与take类似,只是不会等待,如果没有满足要求,就返回null对象。 package concurrent;   import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestCompletionService { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = Executors.newFixedThreadPool(10); CompletionService serv = new ExecutorCompletionService(exec); for (int index = 0; index < 5; index++) { final int NO = index; Callable downImg = new Callable() { public String call() throws Exception { Thread.sleep((long) (Math.random() * 10000)); return “Downloaded Image ” + NO; } }; serv.submit(downImg); } Thread.sleep(1000 * 2); System.out.println(“Show web content”); for (int index = 0; index < 5; index++) { Future task = serv.take(); String img = task.get(); System.out.println(img); } System.out.println(“End”); // 关闭线程池 exec.shutdown(); } } 运行结果: Show web content Downloaded Image 1 Downloaded Image 2 Downloaded Image 4 Downloaded Image 0 Downloaded Image 3 End 操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库的Semaphore可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,acquire()获取一个许可,如果没有就等待,而release()释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。 Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存“无限”的节点,用Semaphore可以实现有限大小的链表。另外重入锁ReentrantLock也可以实现该功能,但实现上要负责些,代码也要复杂些。 下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。 package concurrent;   import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class TestSemaphore { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println(“Accessing: ” + NO); Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); } catch (InterruptedException e) { } } }; exec.execute(run); } // 退出线程池 exec.shutdown(); } } 运行结果: Accessing: 0 Accessing: 1 Accessing: 2 Accessing: 3 Accessing: 4 Accessing: 5 Accessing: 6 Accessing: 7 Accessing: 8 Accessing: 9 Accessing: 10 Accessing: 11 Accessing: 12 Accessing: 13 Accessing: 14 Accessing: 15 Accessing: 16 Accessing: 17 Accessing: 18 Accessing: 19 java.util.concurrent 多线程框架---线程池编程(三) 1 引言 在软件项目开发中,许多后台服务程序的处理动作流程都具有一个相同点,就是:接受客户端发来的请求,对请求进行一些相关的处理,最后将处理结果返回给客户 端。这些请求的来源和方式可能会各不相同,但是它们常常都有一个共同点:数量巨大,处理时间短。这类服务器在实际应用中具有较大的普遍性,如web服务 器,短信服务器,DNS服务器等等。因此,研究如何提高此类后台程序的性能,如何保证服务器的稳定性以及安全性都具有重要的实用价值。 2 后台服务程序设计 2.1 关于设计原型 构建服务器应用程序的一个简单的模型是:启动一个无限循环,循环里放一个监听线程监听某个地址端口。每当一个请求到达就创建一个新线程,然后新线程为请求服务,监听线程返回继续监听。   //简单举例如下: import java.net.*; public class MyServer extends Thread{     public void run(){     try{ ServerSocket server=null; Socket clientconnection=null; server = new ServerSocket(8008);//监听某地址端口对 while(true){进入无限循环 clientconnection =server.accept();//收取请求 new ServeRequest(clientconnection).start();//启动一个新服务线程进行服务 …… } }catch(Exception e){ System.err.println("Unable to start serve listen:"+e.getMessage()); e.printStackTrace(); } } } 实际上,这只是个简单的原型,如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。 首先,为每个请求创建一个新线程的开销很大,为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源, 往往有时候要比花在处理实际的用户请求的时间和资源更多。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提 高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数。这样综合看来,系统的性能瓶颈就在于线程的创建开销。 其次,除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻运行的处理 线程数目,以防止服务器被“压死”的情况发生。所以在设计后台程序的时候,一般需要提前根据服务器的内存、CPU等硬件情况设定一个线程数量的上限值。 如果创建和销毁线程的时间相对于服务时间占用的比例较大,那末假设在一个较短的时间内有成千上万的请求到达,想象一下,服务器的时间和资源将会大量的花在 创建和销毁线程上,而真正用于处理请求的时间却相对较少,这种情况下,服务器性能瓶颈就在于创建和销毁线程的时间。按照这个模型写一个简单的程序测试一下 即可看出,由于篇幅关系,此处略。如果把(服务时间/创建和销毁线程的时间)作为衡量服务器性能的一个参数,那末这个比值越大,服务器的性能就越高。 应此,解决此类问题的实质就是尽量减少创建和销毁线程的时间,把服务器的资源尽可能多地用到处理请求上来,从而发挥多线程的优点(并发),避免多线程的缺点(创建和销毁的时空开销)。 线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时 线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也 就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。 3 JAVA线程池原理 3.1 原理以及实现 在实践中,关于线程池的实现常常有不同的方法,但是它们的基本思路大都是相似的:服务器预先存放一定数目的 “热”的线程,并发程序需要使用线程的时候,从 服务器取用一条已经创建好的线程(如果线程池为空则等待),使用该线程对请求服务,使用结束后,该线程并不删除,而是返回线程池中,以备复用,这样可以避 免对每一个请求都生成和删除线程的昂贵操作。 一个比较简单的线程池至少应包含线程池管理器、工作线程、任务队列、任务接口等部分。其中线程池管理器(ThreadPool Manager)的作用是创建、销毁并管理线程池,将工作线程放入线程池中;工作线程是一个可以循环执行任务的线程,在没有任务时进行等待;任务队列的作 用是提供一种缓冲机制,将没有处理的任务放在任务队列中;任务接口是每个任务必须实现的接口,主要用来规定任务的入口、任务执行完后的收尾工作、任务的执 行状态等,工作线程通过该接口调度任务的执行。下面的代码实现了创建一个线程池: public class ThreadPool {  private Stack threadpool = new Stack(); private int poolSize; private int currSize=0; public void setSize(int n) {  poolSize = n; } public void run() { for(int i=0;i   4.2    框架与结构 下面让我们来看看util.concurrent的框架结构。关于这个工具包概述的e文原版链接地址是http: //gee.cs.oswego.edu/dl/cpjslides/util.pdf。该工具包主要包括三大部分:同步、通道和线程池执行器。第一部分 主要是用来定制锁,资源管理,其他的同步用途;通道则主要是为缓冲和队列服务的;线程池执行器则提供了一组完善的复杂的线程池实现。 --主要的结构如下图所示 4.2.1 Sync acquire/release协议的主要接口 - 用来定制锁,资源管理,其他的同步用途 - 高层抽象接口 - 没有区分不同的加锁用法 实现 -Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore, FIFOSemaphore, PrioritySemaphore 还有,有几个简单的实现,例如ObservableSync, LayeredSync 举例:如果我们要在程序中获得一独占锁,可以用如下简单方式: try { lock.acquire(); try { action(); } finally { lock.release(); } }catch(Exception e){ } 程序中,使用lock对象的acquire()方法获得一独占锁,然后执行您的操作,锁用完后,使用release()方法释放之即可。呵呵,简单吧,想 想看,如果您亲自撰写独占锁,大概会考虑到哪些问题?如果关键的锁得不到怎末办?用起来是不是会复杂很多?而现在,以往的很多细节和特殊异常情况在这里都 无需多考虑,您尽可以把精力花在解决您的应用问题上去。 4.2.2 通道(Channel) 为缓冲,队列等服务的主接口 具体实现 LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue, SynchronousChannel, Slot 通道例子 在后台服务器中,缓冲和队列都是最常用到的。试想,如果对所有远端的请求不排个队列,让它们一拥而上的去争夺cpu、内存、资源,那服务器瞬间不当掉才怪。而在这里,成熟的队列和缓冲实现已经提供,您只需要对其进行正确初始化并使用即可,大大缩短了开发时间。 4.2.3执行器(Executor) Executor是这里最重要、也是我们往往最终写程序要用到的,下面重点对其进行介绍。 类似线程的类的主接口 - 线程池 - 轻量级运行框架 - 可以定制调度算法 只需要支持execute(Runnable r) - 同Thread.start类似 实现 - PooledExecutor, ThreadedExecutor, QueuedExecutor, FJTaskRunnerGroup PooledExecutor(线程池执行器)是个最常用到的类,以它为例: 可修改得属性如下: - 任务队列的类型 - 最大线程数 - 最小线程数 - 预热(预分配)和立即(分配)线程 - 保持活跃直到工作线程结束 -- 以后如果需要可能被一个新的代替 - 饱和(Saturation)协议 -- 阻塞,丢弃,生产者运行,等等 可不要小看上面这数条属性,对这些属性的设置完全可以等同于您自己撰写的线程池的成百上千行代码。下面以笔者撰写过得一个GIS服务器为例: 该GIS服务器是一个典型的 “请求-服务”类型的服务器,遵循后端程序设计的一般框架。首先对所有的请求按照先来先服务排入一个请求队列,如果瞬间到达的 请求超过了请求队列的容量,则将溢出的请求转移至一个临时队列。如果临时队列也排满了,则对以后达到的请求给予一个“服务器忙”的提示后将其简单抛弃。这 个就够忙活一阵的了。 然后,结合链表结构实现一个线程池,给池一个初始容量。如果该池满,以x2的策略将池的容量动态增加一倍,依此类推,直到总线程数服务达到系统能力上限, 之后线程池容量不在增加,所有请求将等待一个空余的返回线程。每从池中得到一个线程,该线程就开始最请求进行GIS信息的服务,如取坐标、取地图,等等。 服务完成后,该线程返回线程池继续为请求队列离地后续请求服务,周而复始。当时用矢量链表来暂存请求,用wait()、 notify() 和 synchronized等原语结合矢量链表实现线程池,总共约600行程序,而且在运行时间较长的情况下服务器不稳定,线程池被取用的线程有异常消失的 情况发生。而使用util.concurrent相关类之后,仅用了几十行程序就完成了相同的工作而且服务器运行稳定,线程池没有丢失线程的情况发生。由 此可见util.concurrent包极大的提高了开发效率,为项目节省了大量的时间。   Code 使用PooledExecutor例子 import java.net.*; /** *  Title:  *  Description: 负责初始化线程池以及启动服务器 *  Copyright: Copyright (c) 2003 *  Company:  * @author not attributable * @version 1.0 */ public class MainServer { //初始化常量 public static final int MAX_CLIENT=100; //系统最大同时服务客户数 //初始化线程池 public static final PooledExecutor pool = new PooledExecutor(new BoundedBuffer(10), MAX_CLIENT); //chanel容量为10, //在这里为线程池初始化了一个 //长度为10的任务缓冲队列。 public MainServer() { //设置线程池运行参数 pool.setMinimumPoolSize(5); //设置线程池初始容量为5个线程 pool.discardOldestWhenBlocked();//对于超出队列的请求,使用了抛弃策略。 pool.createThreads(2); //在线程池启动的时候,初始化了具有一定生命周期的2个“热”线程 } public static void main(String[] args) { MainServer MainServer1 = new MainServer(); new HTTPListener().start();//启动服务器监听和处理线程 new manageServer().start();//启动管理线程 } } 类HTTPListener import java.net.*; /** *  Title:  *  Description: 负责监听端口以及将任务交给线程池处理 *  Copyright: Copyright (c) 2003 *  Company:  * @author not attributable * @version 1.0 */ public class HTTPListener extends Thread{ public HTTPListener() { } public void run(){ try{ ServerSocket server=null; Socket clientconnection=null; server = new ServerSocket(8008);//服务套接字监听某地址端口对 while(true){//无限循环 clientconnection =server.accept(); System.out.println("Client connected in!"); //使用线程池启动服务 MainServer.pool.execute(new HTTPRequest(clientconnection));//如果收到一个请求,则从线程池中取一个线程进行服务,任务完成后,该线程自动返还线程池 } }catch(Exception e){ System.err.println("Unable to start serve listen:"+e.getMessage()); e.printStackTrace(); } } } class Service { //  final Channel msgQ = new LinkedQueue(); public void serve() throws InterruptedException { String status = doService(); msgQ.put(status); } public Service() { // start background thread Runnable logger = new Runnable() { public void run() { try { for(;;) System.out.println(msqQ.take()); } catch(InterruptedException ie) {} } }; new Thread(logger).start(); } }   关于util.concurrent工具包就有选择的介绍到这,更详细的信息可以阅读这些java源代码的API文档。Doug Lea是个很具有“open”精神的作者,他将util.concurrent工具包的java源代码全部公布出来,有兴趣的读者可以下载这些源代码并细 细品味。 5    结束语 以上内容介绍了线程池基本原理以及设计后台服务程序应考虑到的问题,并结合实例详细介绍了重要的多线程开发工具包util.concurrent的构架和使用。结合使用已有完善的开发包,后端服务程序的开发周期将大大缩短,同时程序性能也有了保障。 java.util.concurrent 多线程框架---线程池编程(四) java.util.concurrent 结构   Sync:获得/释放(acquire/release) 协议。同步(定制锁、资源管理、其他同步) Channel:放置/取走(put/take) 协议。通信(缓冲和队列服务) Executor:执行Runnable任务。线程池执行器(线程池的实现一些实现了Executor接口的)     Sync -- acquire/release协议的主要接口 -用来定制锁,资源管理,其他的同步用途 - 高层抽象接口 - 没有区分不同的加锁用法 --实现 -Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore n   还有,有几个简单的实现,例如ObservableSync, LayeredSync     独占锁 try { lock.acquire(); try { action(); } finally { lock.release(); } } catch (InterruptedException ie) { ... } -- Java同步块不适用的时候使用它 - 超时,回退(back-off) - 确保可中断 - 大量迅速锁定 - 创建Posix风格应用(condvar)     独占例子 class ParticleUsingMutex { int x; int y; final Random rng = new Random(); final Mutex mutex = new Mutex(); public void move() { try { mutex.acquire(); try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; } finally { mutex.release(); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } public void draw(Graphics g) { int lx, ly; try { mutex.acquire(); try { lx = x; ly = y; } finally { mutex.release(); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return; } g.drawRect(lx, ly, 10, 10); } }     回退(Backoff)例子 class CellUsingBackoff { private long val; private final Mutex mutex = new Mutex(); void swapVal(CellUsingBackoff other) throws InterruptedException { if (this == other) return; // alias check for (;;) { mutex.acquire(); try { I f (other.mutex.attempt(0)) { try { long t = val; val = other.val; other.val = t; return; } finally { other.mutex.release(); } } } finally { mutex.release(); }; Thread.sleep(100); // heuristic retry interval } } }     读写锁 interface ReadWriteLock { Sync readLock(); Sync writeLock(); } -- 管理一对锁 - 和普通的锁一样的使用习惯 -- 对集合类很有用 -半自动的方式实现SyncSet, SyncMap, ... -- 实现者使用不同的锁策略 - WriterPreference, ReentrantWriterPreference, ReaderPreference, FIFO     ReadWriteLock例子 -- 示范在读写锁中执行任何Runnable的包装类 class WithRWLock { final ReadWriteLock rw; public WithRWLock(ReadWriteLock l) { rw = l; } public void performRead(Runnable readCommand) throws InterruptedException { rw.readLock().acquire(); try { readCommand.run(); } finally { rw.readlock().release(); } } public void performWrite(...) // similar }     闭锁(Latch) -- 闭锁是开始时设置为false,但一旦被设置为true,他将永远保持true状态 - 初始化标志 - 流结束定位 - 线程中断 - 事件出发指示器 -- CountDown和他有点类似,不同的是,CountDown需要一定数量的触发设置,而不是一次 -- 非常简单,但是广泛使用的类 - 替换容易犯错的开发代码     Latch Example 闭锁例子 class Worker implements Runnable { Latch startSignal; Worker(Latch l) { startSignal = l; } public void run() { startSignal.acquire(); // ... doWork(); } } class Driver { // ... void main() { Latch ss = new Latch(); for (int i = 0; i < N; ++i) // make threads new Thread(new Worker(ss)).start(); doSomethingElse(); // don’t let run yet ss.release(); // now let all threads proceed } }     信号(Semaphores) -- 服务于数量有限的占有者 - 使用许可数量构造对象(通常是0) - 如果需要一个许可才能获取,等待,然后取走一个许可 - 释放的时候将许可添加回来 -- 但是真正的许可并没有转移(But no actual permits change hands.) - 信号量仅仅保留当前的计数值 -- 应用程序 - 锁:一个信号量可以被用作互斥体(mutex) - 一个独立的等待缓存或者资源控制的操作 - 设计系统是想忽略底层的系统信号 -- (phores ‘remember’ past signals)记住已经消失的信号量     信号量例子 class Pool { ArrayList items = new ArrayList(); HashSet busy = new HashSet(); final Semaphore available; public Pool(int n) { available = new Semaphore(n); // ... somehow initialize n items ...; } public Object getItem() throws InterruptedException { available.acquire(); return doGet(); } public void returnItem(Object x) { if (doReturn(x)) available.release(); } synchronized Object doGet() { Object x = items.remove(items.size()-1); busy.add(x); // put in set to check returns return x; } synchronized boolean doReturn(Object x) { return busy.remove(x); // true if was present } }     屏障(Barrier) -- 多部分同步接口 - 每一部分都必须等待其他的分不撞倒屏障 -- CyclicBarrier类 - CountDown的一个可以重新设置的版本 - 对于反复划分算法很有用(iterative partitioning algorithms) -- Rendezvous类 - 一个每部分都能够和其他部分交换信息的屏障 - 行为类似同时的在一个同步通道上put和take - 对于资源交换协议很有用(resource-exchange protocols)       通道(Channel) --为缓冲,队列等服务的主接口 -- 具体实现 - LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot     通道属性 -- 被定义为Puttable和Takable的子接口 - 允许安装生产者/消费者模式执行 -- 支持可超时的操作offer和poll - 当超时值是0时,可能会被阻塞 - 所有的方法能够抛出InterruptedException异常 -- 没有接口需要size方法 - 但是一些实现定义了这个方法 - BoundedChannel有capacity方法     通道例子 class Service { // ... final Channel msgQ = new LinkedQueue(); public void serve() throws InterruptedException { String status = doService(); msgQ.put(status); } public Service() { // start background thread Runnable logger = new Runnable() { public void run() { try { for(;;) System.out.println(msqQ.take()); } catch(InterruptedException ie) {} } }; new Thread(logger).start(); } }       运行器(Executor) -- 类似线程的类的主接口 - 线程池 - 轻量级运行框架 - 可以定制调度算法 -- 只需要支持execute(Runnable r) - 同Thread.start类似 -- 实现 - PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup - 相关的ThreadFactory类允许大多数的运行器通过定制属性使用线程       PooledExecutor -- 一个可调的工作者线程池,可修改得属性如下: - 任务队列的类型 - 最大线程数 - 最小线程数 - 预热(预分配)和立即(分配)线程 - 保持活跃直到工作线程结束 -- 以后如果需要可能被一个新的代替 - 饱和(Saturation)协议 -- 阻塞,丢弃,生产者运行,等等       PooledExecutor例子 class WebService { public static void main(String[] args) { PooledExecutor pool = new PooledExecutor(new BoundedBuffer(10), 20); pool.createThreads(4); try { ServerSocket socket = new ServerSocket(9999); for (;;) { final Socket connection = socket.accept(); pool.execute(new Runnable() { public void run() { new Handler().process(connection); }}); } } catch(Exception e) { } // die } } class Handler { void process(Socket s); }     前景(Future)和可调用(Callable) -- Callabe是类似于Runnable的接口,用来作为参数和传递结果 interface Callable { Object call(Object arg) throws Exception; } -- FutureResult管理Callable的异步执行 class FutureResult { // ... // block caller until result is ready public Object get() throws InterruptedException, InvocationTargetException; public void set(Object result); // unblocks get // create Runnable that can be used with an Executor public Runnable setter(Callable function); }     FutureResult例子 class ImageRenderer { Image render(byte[] raw); } class App { // ... Executor executor = ...; // any executor ImageRenderer renderer = new ImageRenderer(); public void display(byte[] rawimage) { try { FutureResult futureImage = new FutureResult(); Runnable cmd = futureImage.setter(new Callable(){ public Object call() { return renderer.render(rawImage); }}); executor.execute(cmd); drawBorders(); // do other things while executing drawCaption(); drawImage((Image)(futureImage.get())); // use future } catch (Exception ex) { cleanup(); return; } } }     其他的类 -- CopyOnWriteArrayList - 支持整个集合复制时每一个修改的无锁访问 - 适合大多数的多路广播应用程序 -- 工具包还包括了一个java.beans多路广播类的COW版本 -- SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc - 类似于java.lang.Double,提供可变操作的同步版本.例如,addTo,inc - 添加了一些象swap,commit这样的实用操作     未来计划 -- 并发数据构架 - 一组繁重线程连接环境下有用的工具集合 --支持侧重I/O的程序 - 事件机制的IO系统 -- 小版本的实现 - 例如SingleSourceQueue --小幅度的改善 - 使运行器更容易使用 -- 替换 - JDK1.3 java.util.Timer 被ClockDaemon取代 java.util.concurrent 多线程框架---线程池编程(五)

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 15 金币 [ 分享文档获得金币 ] 15 人已下载

下载文档