Java多线程设计模式

sweetbaybe 贡献于2011-06-08

作者 微软中国  创建于2010-08-27 03:57:00   修改者微软中国  修改于2010-08-27 04:08:00字数18854

文档摘要:线程的创建和启动,Java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法()。
关键词:

Java多线程设计模式 http://www.liaoxuefeng.com/it-d225a33ad6e947cea997cc02b1826e7f-1 线程的创建和启动 Java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。 Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法: view sourceprint? 1.Thread t = new Thread(); 2.t.start(); start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。 因此,有两个方法可以实现自己的线程: 方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如: view sourceprint? 1.public class MyThread extends Thread { 2.    public run() { 3.        System.out.println("MyThread.run()"); 4.    } 5.} 在合适的地方启动线程:new MyThread().start(); 方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口: view sourceprint? 1.public class MyThread extends OtherClass implements Runnable { 2.    public run() { 3.        System.out.println("MyThread.run()"); 4.    } 5.} 为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例: view sourceprint? 1.MyThread myt = new MyThread(); 2.Thread t = new Thread(myt); 3.t.start(); 事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码: view sourceprint? 1.public void run() { 2.    if (target != null) { 3.        target.run(); 4.    } 5.} 线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。 线程的同步 由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。 最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。 但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。 此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。 Java锁机制 多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。 Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如: view sourceprint? 1.public class SharedResource { 2.    private int count = 0; 3.   4.    public int getCount() { return count; } 5.   6.    public synchronized void setCount(int count) { this.count = count; } 7.} 注意,如果将synchronized关键字标记在方法上,例如上面的: view sourceprint? 1.public synchronized void setCount(int count) { ... } 那么,锁住的是哪个对象呢?答案是this对象,因此,以上方法事实上完全等同于下面的写法: view sourceprint? 1.public void setCount(int count) { 2.    synchronized(this) { // 在此获得this锁 3.         this.count = count; 4.    } // 在此释放this锁 5.} synchronized {}括号内的部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。 退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。 为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量: view sourceprint? 1.public class SharedResouce { 2.    private int a = 0; 3.    private int b = 0; 4.   5.    public synchronized void setA(int a) { this.a = a; } 6.    public synchronized void setB(int b) { this.b = b; } 7.} 若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁: view sourceprint? 01.public class SharedResouce { 02.    private int a = 0; 03.    private int b = 0; 04.    private Object sync_a = new Object(); 05.    private Object sync_b = new Object(); 06.   07.    public void setA(int a) { 08.        synchronized(sync_a) { 09.            this.a = a; 10.        } 11.    } 12.   13.    public synchronized void setB(int b) { 14.        synchronized(sync_b) { 15.            this.b = b; 16.        } 17.    } 18.} 如果将synchronized关键字标记在静态方法上,由于静态方法不可能访问this实例,那么,锁住的是哪个对象呢?答案是当前类的Class对象,原因是每个对象的Class实例是唯一且不可变的。比如: view sourceprint? 1.public synchronized static void sync() { ... } 事实上完全等同于下面的写法: view sourceprint? 1.public static void sync() { 2.    synchronized(SharedResource.class) { 3.         ... 4.    } 5.} wait/notify机制 通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。 以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如: view sourceprint? 1.synchronized(obj) { 2.    while(!condition) { 3.        obj.wait(); 4.    } 5.    obj.doSomething(); 6.} 当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。 在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A: view sourceprint? 1.synchronized(obj) { 2.    condition = true; 3.    obj.notify(); 4.} 需要注意的概念是: # 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。 # 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。 # 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。 # 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。 # obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。 # 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。 wait/sleep的区别 Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是:sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。 但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。 如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。 需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。 Worker Pattern 前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。 然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。 每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。 Worker Pattern实现了类似线程池的功能。首先定义Task接口: view sourceprint? 1.public interface Task { 2.    void execute(); 3.} 线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。 具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask: view sourceprint? 01.// CalculateTask.java 02.public class CalculateTask implements Task { 03.    private static int count = 0; 04.    private int num = count; 05.    public CalculateTask() { 06.        count++; 07.    } 08.    public void execute() { 09.        System.out.println("[CalculateTask " + num + "] start..."); 10.        try { 11.            Thread.sleep(3000); 12.        } 13.        catch(InterruptedException ie) {} 14.        System.out.println("[CalculateTask " + num + "] done."); 15.    } 16.} 17.   18.// TimerTask.java 19.public class TimerTask implements Task { 20.    private static int count = 0; 21.    private int num = count; 22.    public TimerTask() { 23.        count++; 24.    } 25.    public void execute() { 26.        System.out.println("[TimerTask " + num + "] start..."); 27.        try { 28.            Thread.sleep(2000); 29.        } 30.        catch(InterruptedException ie) {} 31.        System.out.println("[TimerTask " + num + "] done."); 32.    } 33.} 以上任务均简单的sleep若干秒。 TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务: view sourceprint? 01.import java.util.*; 02.   03.public class TaskQueue { 04.    private List queue = new LinkedList(); 05.    public synchronized Task getTask() { 06.        while(queue.size()==0) { 07.            try { 08.                this.wait(); 09.            } 10.            catch(InterruptedException ie) { 11.                return null; 12.            } 13.        } 14.        return (Task)queue.remove(0); 15.    } 16.    public synchronized void putTask(Task task) { 17.        queue.add(task); 18.        this.notifyAll(); 19.    } 20.} 终于到了真正的WorkerThread,这是真正执行任务的服务器线程: view sourceprint? 01.public class WorkerThread extends Thread { 02.    private static int count = 0; 03.    private boolean busy = false; 04.    private boolean stop = false; 05.    private TaskQueue queue; 06.    public WorkerThread(ThreadGroup group, TaskQueue queue) { 07.        super(group, "worker-" + count); 08.        count++; 09.        this.queue = queue; 10.    } 11.    public void shutdown() { 12.        stop = true; 13.        this.interrupt(); 14.        try { 15.            this.join(); 16.        } 17.        catch(InterruptedException ie) {} 18.    } 19.    public boolean isIdle() { 20.        return !busy; 21.    } 22.    public void run() { 23.        System.out.println(getName() + " start.");         24.        while(!stop) { 25.            Task task = queue.getTask(); 26.            if(task!=null) { 27.                busy = true; 28.                task.execute(); 29.                busy = false; 30.            } 31.        } 32.        System.out.println(getName() + " end."); 33.    } 34.} 前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。 最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数: view sourceprint? 01.import java.util.*; 02.   03.public class ThreadPool extends ThreadGroup { 04.    private List threads = new LinkedList(); 05.    private TaskQueue queue; 06.    public ThreadPool(TaskQueue queue) { 07.        super("Thread-Pool"); 08.        this.queue = queue; 09.    } 10.    public synchronized void addWorkerThread() { 11.        Thread t = new WorkerThread(this, queue); 12.        threads.add(t); 13.        t.start(); 14.    } 15.    public synchronized void removeWorkerThread() { 16.        if(threads.size()>0) { 17.            WorkerThread t = (WorkerThread)threads.remove(0); 18.            t.shutdown(); 19.        } 20.    } 21.    public synchronized void currentStatus() { 22.        System.out.println("-----------------------------------------------"); 23.        System.out.println("Thread count = " + threads.size()); 24.        Iterator it = threads.iterator(); 25.        while(it.hasNext()) { 26.            WorkerThread t = (WorkerThread)it.next(); 27.            System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy")); 28.        } 29.        System.out.println("-----------------------------------------------"); 30.    } 31.} currentStatus()方法是为了方便调试,打印出所有线程的当前状态。 最后,Main负责完成main()方法: view sourceprint? 01.public class Main { 02.    public static void main(String[] args) { 03.        TaskQueue queue = new TaskQueue(); 04.        ThreadPool pool = new ThreadPool(queue); 05.        for(int i=0; i<10; i++) { 06.            queue.putTask(new CalculateTask()); 07.            queue.putTask(new TimerTask()); 08.        } 09.        pool.addWorkerThread(); 10.        pool.addWorkerThread(); 11.        doSleep(8000); 12.        pool.currentStatus(); 13.        pool.addWorkerThread(); 14.        pool.addWorkerThread(); 15.        pool.addWorkerThread(); 16.        pool.addWorkerThread(); 17.        pool.addWorkerThread(); 18.        doSleep(5000); 19.        pool.currentStatus(); 20.    } 21.    private static void doSleep(long ms) { 22.        try { 23.            Thread.sleep(ms); 24.        } 25.        catch(InterruptedException ie) {} 26.    } 27.} main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下: worker-0 start. [CalculateTask 0] start... worker-1 start. [TimerTask 0] start... [TimerTask 0] done. [CalculateTask 1] start... [CalculateTask 0] done. [TimerTask 1] start... [CalculateTask 1] done. [CalculateTask 2] start... [TimerTask 1] done. [TimerTask 2] start... [TimerTask 2] done. [CalculateTask 3] start... ----------------------------------------------- Thread count = 2 worker-0: busy worker-1: busy ----------------------------------------------- [CalculateTask 2] done. [TimerTask 3] start... worker-2 start. [CalculateTask 4] start... worker-3 start. [TimerTask 4] start... worker-4 start. [CalculateTask 5] start... worker-5 start. [TimerTask 5] start... worker-6 start. [CalculateTask 6] start... [CalculateTask 3] done. [TimerTask 6] start... [TimerTask 3] done. [CalculateTask 7] start... [TimerTask 4] done. [TimerTask 7] start... [TimerTask 5] done. [CalculateTask 8] start... [CalculateTask 4] done. [TimerTask 8] start... [CalculateTask 5] done. [CalculateTask 9] start... [CalculateTask 6] done. [TimerTask 9] start... [TimerTask 6] done. [TimerTask 7] done. ----------------------------------------------- Thread count = 7 worker-0: idle worker-1: busy worker-2: busy worker-3: idle worker-4: busy worker-5: busy worker-6: busy ----------------------------------------------- [CalculateTask 7] done. [CalculateTask 8] done. [TimerTask 8] done. [TimerTask 9] done. [CalculateTask 9] done. 仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,7个线程中的两个状态变成idle,说明处于wait()状态。 思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。 如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE 从JDK 5开始,java.util.concurrent包已经内置了Worker线程模式(即java.util.concurrent.Executors),无需我们手动编写上述代码。不过,理解Worker模式的原理非常重要。 ReadWriteLock模式 多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。 要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明: DataHandler对象保存了一个可读写的char[]数组: view sourceprint? 01.public class DataHandler { 02.    // store data: 03.    private char[] buffer = "AAAAAAAAAA".toCharArray(); 04.   05.    private char[] doRead() { 06.        char[] ret = new char[buffer.length]; 07.        for(int i=0; i0 || (preferWrite && waitingThreads>0)) 09.            this.wait(); 10.        readingThreads++; 11.    } 12.   13.    public synchronized void readUnlock() { 14.        readingThreads--; 15.        preferWrite = true; 16.        notifyAll(); 17.    } 18.   19.    public synchronized void writeLock() throws InterruptedException { 20.        waitingThreads++; 21.        try { 22.            while(readingThreads>0 || writingThreads>0) 23.                this.wait(); 24.        } 25.        finally { 26.            waitingThreads--; 27.        } 28.        writingThreads++; 29.    } 30.   31.    public synchronized void writeUnlock() { 32.        writingThreads--; 33.        preferWrite = false; 34.        notifyAll(); 35.    } 36.} readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock: view sourceprint? 01.public class DataHandler { 02.    // store data: 03.    private char[] buffer = "AAAAAAAAAA".toCharArray(); 04.    // lock: 05.    private ReadWriteLock lock = new ReadWriteLock(); 06.   07.    public char[] read(String name) throws InterruptedException { 08.        System.out.println(name + " waiting for read..."); 09.        lock.readLock(); 10.        try { 11.            char[] data = doRead(); 12.            System.out.println(name + " reads data: " + new String(data)); 13.            return data; 14.        } 15.        finally { 16.            lock.readUnlock(); 17.        } 18.    } 19.   20.    public void write(String name, char[] data) throws InterruptedException { 21.        System.out.println(name + " waiting for write..."); 22.        lock.writeLock(); 23.        try { 24.            System.out.println(name + " wrote data: " + new String(data)); 25.            doWrite(data); 26.        } 27.        finally { 28.            lock.writeUnlock(); 29.        } 30.    } 31.   32.    private char[] doRead() { 33.        char[] ret = new char[buffer.length]; 34.        for(int i=0; i

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 15 金币 [ 分享文档获得金币 ] 4 人已下载

下载文档