Python 多线程编程(6寸)


Python Python Python Python 多线程编程多线程编程多线程编程多线程编程 1. 引言 2. 线程和进程 3. 线程和 Python 4. thread 模块 5. threading 模块 6. 生产者-消费者问题和 Queue 模块 7. 相关模块 本节中,我们将探索在 Python 中,用多线程编 程技术实现代码并行性的几种不同的方法。在前面几 节中,我们将介绍进程与线程的区别。然后介绍多线 程编程的概念。(已经熟悉多线程编程的读者可以直 接跳到第 1.3.5 节)。本章的最后几节将演示在 Python 中如何使用 threading 和Queue 模块来实现 多线程编程。 1.1 引言 在多线程(MT)编程出现之前,电脑程序的运行 由一个执行序列组成,执行序列按顺序在主机的中央 处理器(CPU)中运行。无论是任务本身要求顺序执 行还是整个程序是由多个子任务组成,程序都是按这 种方式执行的。即使子任务相互独立,互相无关 (即,一个子任务的结果不影响其它子任务的结果) 时也是这样。这样是不是有点不合逻辑?会不会想要 并行运行这些相互独立的子任务呢?这样的并行处理 可以大幅度地提升整个任务的效率。这就是多线程编 程的目的。 多线程编程对于某些任务来说,是最理想的。这 些任务具有以下特点:它们本质上就是异步的,需要 有多个并发事务,各个事务的运行顺序可以是不确定 的,随机的,不可预测的。这样的编程任务可以被分 成多个执行流,每个流都有一个要完成的目标。根据 应用的不同,这些子任务可能都要计算出一个中间结 果,用于合并得到最后的结果。 运算密集型的任务一般都比较容易分隔成多个子 任务,可以顺序执行或以多线程的方式执行。单线程 处理多个外部输入源的的任务就不是那么容易了。这 种编程任务如果不用多线程的方式处理,则一定要使 用一个或多个计时器来实现。 一个顺序执行的程序要从每个 I/O(输入/输出) 终端信道检查用户的输入时,程序无论如何也不能在 读取 I/O 终端信道的时候阻塞。因为用户输入的到达 是不确定的,阻塞会导致其它 I/O 信息的数据不能被 处理。顺序执行的程序必须使用非阻塞 I/O,或是带 有计时器的阻塞 I/O(这样才能保证阻塞只是暂时 的)。 由于顺序执行的程序只有一个线程在运行。它要 保证它要做的多任务,不会有某个任务占用太多的时 间,而且要合理地分配用户的响应时间。执行多任务 的顺序执行的程序一般程序控制流程都很复杂,难以 理解。 使用多线程编程和一个共享的数据结构如 Queue (本章后面会介绍的一种多线程队列数据结构),这 种程序任务可以用几个功能单一的线程来组织: UserRequestThread: 负责读取客户的输入,可 能是一个 I/O 信道。程序可能创建多个线程,每个客 户一个,请求会被放入队列中。 RequestProcessor: 一个负责从队列中获取并处 理请求的线程,它为下面那种线程提供输出。 ReplyThread: 负责把给用户的输出取出来,如 果是网络应用程序就把结果发送出去,否则就保存到 本地文件系统或数据库中。 把这种编程任务用多线程来组织可以降低程序的 复杂度,并使得干净,有效和具有良好组织地程序结 构实现变得可能。每个线程的逻辑都不会很复杂,因 为它要做的事情很清楚。例如 UserRequestThread 只是从用户或某个数据源读取数据,放到一个队列 中,等待其它线程进一步的处理,等等,每个线程都 有自己明确的任务。你只要设计好每个线程要做什 么,并把要做的事做好就可以了。对某些任务使用线 程跟亨利福特制造汽车时使用的装配线模型有些相 似。 1.2 线程和进程 1.2.1 什么是进程? 计算机程序只不过是磁盘中可执行的,二进制 (或其它类型)的数据。它们只有在被读取到内存 中,被操作系统调用的时候才开始它们的生命期。进 程(有时被称为重量级进程)是程序的一次执行。每 个进程都有自己的地址空间,内存,数据栈以及其它 记录其运行轨迹的辅助数据。操作系统管理在其上运 行的所有进程,并为这些进程公平地分配时间。进程 也可以通过 fork 和spawn 操作来完成其它的任务。 不过各个进程有自己的内存空间,数据栈等,所以只 能使用进程间通讯(IPC),而不能直接共享信息。 1.2.2 什么是线程? 线程(有时被称为轻量级进程)跟进程有些相 似,不同的是,所有的线程运行在同一个进程中,共 享相同的运行环境。它们可以想像成是在主进程或 “主线程”中并行运行的“迷你进程”。 线程有开始,顺序执行和结束三部分。它有一个 自己的指令指针,记录自己运行到什么地方。线程的 运行可能被抢占(中断),或暂时的被挂起(也叫睡 眠),让其它的线程运行,这叫做让步。一个进程中 的各个线程之间共享同一片数据空间,所以线程之间 可以比进程之间更方便地共享数据以及相互通讯。线 程一般都是并发执行的,正是由于这种并行和数据共 享的机制使得多个任务的合作变为可能。实际上,在 单 CPU 的系统中,真正的并发是不可能的,每个线 程会被安排成每次只运行一小会,然后就把 CPU 让 出来,让其它的线程去运行。在进程的整个运行过程 中,每个线程都只做自己的事,在需要的时候跟其它 的线程共享运行的结果。 当然,这样的共享并不是完全没有危险的。如果 多个线程共同访问同一片数据,则由于数据访问的顺 序不一样,有可能导致数据结果的不一致的问题。这 叫做竞态条件(race condition)。幸运的是,大多数 线程库都带有一系列的同步原语,来控制线程的执行 和数据的访问。 另一个要注意的地方是,由于有的函数会在完成 之前阻塞住,在没有特别为多线程做修改的情况下, 这种“贪婪”的函数会让 CPU 的时间分配有所倾 斜。导致各个线程分配到的运行时间可能不尽相同, 不尽公平。 1.3 Python、线程和全局解释器锁 1.3.1 全局解释器锁(GIL) Python 代码的执行由 Python 虚拟机(也叫解释 器主循环)来控制。Python 在设计之初就考虑到要在 主循环中,同时只有一个线程在执行,就像单 CPU 的系统中运行多个进程那样,内存中可以存放多个程 序,但任意时刻,只有一个程序在 CPU 中运行。同样 地,虽然 Python 解释器中可以“运行”多个线程, 但在任意时刻,只有一个线程在解释器中运行。 对 Python 虚拟机的访问由全局解释器锁 (GIL)来控制,正是这个锁能保证同一时刻只有一 个线程在运行。在多线程环境中,Python 虚拟机按 以下方式执行: 1. 设置 GIL 2. 切换到一个线程去运行 3. 运行: a. 指定数量的字节码指令 b. 线程主动让出控制(可以调用 time.sleep(0)) 4. 把线程设置为睡眠状态 5. 解锁 GIL 6. 再次重复以上所有步骤 在调用外部代码(如 C/C++扩展函数)的时候, GIL 将会被锁定,直到这个函数结束为止(由于在这 期间没有 Python 的字节码被运行,所以不会做线程 切换)。编写扩展的程序员可以主动解锁 GIL。 不过,Python 的开发人员则不用担心在这些情况下 你的 Python 代码会被锁住。 例如,对所有面向 I/O 的(会调用内建的操作系 统C代码的)程序来说,GIL 会在这个 I/O 调用之前 被释放,以允许其它的线程在这个线程等待 I/O 的时 候运行。如果某线程并未使用很多 I/O 操作,它会在 自己的时间片内一直占用处理器(和 GIL)。也就是 说,I/O 密集型的 Python 程序比计算密集型的程序 更能充分利用多线程环境的好处。 对源代码,解释器主循环和 GIL 感兴趣的人,可 以看看 Python/ceval.c 文件。 1.3.2 退出线程 当一个线程结束计算,它就退出了。线程可以调 用thread.exit()之类的退出函数,也可以使用 Python 退出进程的标准方法,如 sys.exit()或抛出 一个 SystemExit 异常等。不过,你不可以直接“杀 掉”("kill")一个线程。 在下面一节中,我们将要讨论两个跟线程有关的 模块。这两个模块中,我们不建议使用 thread 模 块。这样做有很多原因,很明显的一个原因是,当主 线程退出的时候,所有其它线程没有被清除就退出 了。但另一个模块 threading 就能确保所有“重要 的”子线程都退出后,进程才会结束。(我们等一会 会详细说明什么叫“重要的”,请参阅守护线程的核 心提示)。 主线程应该是一个好的管理者,它要了解每个线 程都要做些什么事,线程都需要什么数据和什么参 数,以及在线程结束的时候,它们都提供了什么结 果。这样,主线程就可以把各个线程的结果组合成一 个有意义的最后结果。 1.3.3 在Python 中使用线程 在Win32 和Linux, Solaris, MacOS, *BSD 等大 多数类 Unix 系统上运行时,Python 支持多线程编 程。Python 使用 POSIX 兼容的线程,即 pthreads。 默认情况下,从源代码编译的(2.0 及以上版本 的)Python 以及 Win32 的安装包里,线程支持是打开 的。想要从解释器里判断线程是否可用,只要简单的 在交互式解释器里尝试导入 thread 模块就行了,只 要没出现错误就表示线程可用。 >>> import thread >>> 如果你的 Python 解释器在编译时,没有打开线 程支持,导入模块会失败: >>> import thread Traceback (innermost last): File "", line 1, in ? ImportError: No module named thread 这种情况下,你就要重新编译你的 Python 解释 器才能使用线程。你可以在运行配置脚本的时候,加 上“--with-thread”参数。参考你的发布版的 README 文件,以获取如何编译支持线程的 Python 的 相关信息。 1.3.4 没有线程支持的情况 第一个例子中,我们会使用 time.sleep()函数 来演示线程是怎样工作的。time.sleep()需要一个浮 点型的参数,来指定“睡眠”的时间(单位秒)。这 就意味着,程序的运行会被挂起指定的时间。 我们要创建两个“计时循环”。一个睡眠 4秒 种,一个睡眠 2秒种,分别是 loop0()和loop1()。 (我们命名为“loop0”和“loop1”表示我们将有一 个循环的序列)。如果我们像例 1.1 的onethr.py 中 那样,在一个进程或一个线程中,顺序地执行 loop0()和loop1(),那运行的总时间为 6秒。在启 动 loop0(),loop1(),和其它的代码时,也要花去 一些时间,所以,我们看到的总时间也有可能会是 7 秒钟。 例1.1 单线程中运行的循环 (onethr.py) 在单线程中顺序执行两个循环。一定要一个循环 结束后,另一个才能开始。总时间是各个循环运行时 间之和。 1 #!/usr/bin/env python 2 3 from time import sleep, ctime 4 5 def loop0(): 6 print 'start loop 0 at:', ctime() 7 leep(4) 8 print 'loop 0 done at:', ctime() 9 10 def loop1(): 11 print 'start loop 1 at:', ctime() 12 sleep(2) 13 print 'loop 1 done at:', ctime() 14 15 def main(): 16 print 'starting at:', ctime() 17 loop0() 18 loop1() 19 print 'all DONE at:', ctime() 20 21 if __name__ == '__main__': 22 main() 我们可以通过运行 onethr.py 来验证这一点,下 面是运行的输出: $ onethr.py starting at: Sun Aug 13 05:03:34 2006 start loop 0 at: Sun Aug 13 05:03:34 2006 loop 0 done at: Sun Aug 13 05:03:38 2006 start loop 1 at: Sun Aug 13 05:03:38 2006 loop 1 done at: Sun Aug 13 05:03:40 2006 all DONE at: Sun Aug 13 05:03:40 2006 假定 loop0()和 loop1()里做的不是睡眠,而是 各自独立的,不相关的运算,各自的运算结果到最后 将会汇总成一个最终的结果。这时,如果能让这些计 算并行执行的话,那不是可以减少总的运行时间吗? 这就是我们现在要介绍的多线程编程的前提条件。 1.3.5 Python 的threading 模块 Python 提供了几个用于多线程编程的模块,包 括thread, threading 和Queue 等。thread 和 threading 模块允许程序员创建和管理线程。thread 模块提供了基本的线程和锁的支持,而 threading 提 供了更高级别,功能更强的线程管理的功能。Queue 模块允许用户创建一个可以用于多个线程之间共享数 据的队列数据结构。我们将分别介绍这几个模块,并 给出一些例子和中等大小的应用。 核心提示:避免使用 thread 模块 出于以下几点考虑,我们不建议您使用 thread 模块。首先,更高级别的 threading 模块更为先进, 对线程的支持更为完善,而且使用 thread 模块里的 属性有可能会与 threading 出现冲突。其次,低级别 的thread 模块的同步原语很少(实际上只有一 个),而 threading 模块则有很多。 不过,出于对学习 Python 和线程的兴趣,我们 将给出一点使用 thread 模块的例子。这些代码只用 于学习目的,让你对为什么应该避免使用 thread 模 块有更深的认识,以及让你了解在把代码改为使用 threading 和Queue 模块时,我们能获得多大的便 利。 另一个不要使用 thread 原因是,对于你的进程 什么时候应该结束完全没有控制,当主线程结束时, 所有的线程都会被强制结束掉,没有警告也不会有正 常的清除工作。我们之前说过,至少 threading 模块 能确保重要的子线程退出后进程才退出。 只建议那些有经验的专家在想访问线程的底层结 构的时候,才使用 thread 模块。而使用线程的新手 们则应该看看我们是如何把线程应用到我们的第一个 程序,从而增加代码的可读性,以及第一段例子如何 进化到我们本章的主要的代码的。如果可以的话,你 的第一个多线程程序应该尽可能地使用 threading 等 高级别的线程模块。 1.4 thread 模块 我们先看看 thread 模块都提供了些什么。除了 产生线程外,thread 模块也提供了基本的同步数据 结构锁对象(lock object,也叫原语锁,简单锁,互 斥锁,互斥量,二值信号量)。如之前所说,同步原 语与线程的管理是密不可分的。 表1.1 中所列的是常用的线程函数以及 LockType 类型的锁对象的方法。 start_new_thread()函数是 thread 模块的一个 关键函数,它的语法与内建的 apply()函数完全一 样,其参数为:函数,函数的参数以及可选的关键字 参数。不同的是,函数不是在主线程里运行,而是产 生一个新的线程来运行这个函数。 现在,把线程加入到我们的 onethr.py 例子中。 稍微改变一下 loop*()函数的调用方法,我们得到了 例1.2 的mtsleep1.py。 表1.1 thread 模块和锁对象 函数 描述 thread 模块函数 start_new_thread(function,args, kwargs=None) 产生一个新的线程,在新线程中用指定的参数和可选 的。Kwargs 来调用这个函数。 allocate_lock() 分配一个 LockType 类型 的锁对象 exit() 让线程退出 LockType 类型锁对象方法 acquire(wait=None) 尝试获取锁对象 locked() 如果获取了锁对象返回 True,否 则返回 False release() 释放锁 例1.2 使用 thread 模块 (mtsleep1.py) 这儿执行的是和 onethr.py 中一样的循环,不同 的是,这一次我们使用的是 thread 模块提供的简单 的多线程的机制。两个循环并发地被执行(显然,短 的那个先结束)。总的运行时间为最慢的那个线程的 运行时间,而不是所有的线程的运行时间之和。 1 #!/usr/bin/env python 2 3 import thread 4 from time import sleep, ctime 5 6 def loop0(): 7 print 'start loop 0 at:', ctime() 8 sleep(4) 9 print 'loop 0 done at:', ctime() 10 11 def loop1(): 12 print 'start loop 1 at:', ctime() 13 sleep(2) 14 print 'loop 1 done at:', ctime() 15 16 def main(): 17 print 'starting at:', ctime() 18 thread.start_new_thread(loop0, ()) 19 thread.start_new_thread(loop1, ()) 20 sleep(6) 21 print 'all DONE at:', ctime() 22 23 if __name__ == '__main__': 24 main() start_new_thread()要求一定要有前两个参数。 所以,就算我们想要运行的函数不要参数,我们也要 传一个空的元组。 这个程序的输出与之前的输出大不相同,之前是 运行了 6,7秒,而现在则是 4秒,是最长的循环的 运行时间与其它的代码的时间总和。 $ mtsleep1.py starting at: Sun Aug 13 05:04:50 2006 start loop 0 at: Sun Aug 13 05:04:50 2006 start loop 1 at: Sun Aug 13 05:04:50 2006 loop 1 done at: Sun Aug 13 05:04:52 2006 loop 0 done at: Sun Aug 13 05:04:54 2006 all DONE at: Sun Aug 13 05:04:56 2006 睡眠 4秒和 2秒的代码现在是并发执行的。这 样,就使得总的运行时间被缩短了。你可以看到, loop1 甚至在 loop0 前面就结束了。程序的一大不同 之处就是多了一个“sleep(6)”的函数调用。为什么 要加上这一句呢?因为,如果我们没有让主线程停下 来,那主线程就会运行下一条语句,显示“all done”,然后就关闭运行着 loop0()和loop1()的两 个线程,退出了。 我们没有写让主线程停下来等所有子线程结束之 后再继续运行的代码。这就是我们之前说线程需要同 步的原因。在这里,我们使用了 sleep()函数做为我 们的同步机制。我们使用 6秒是因为我们已经知道, 两个线程(你知道,一个要 4秒,一个要 2秒)在主 线程等待 6秒后应该已经结束了。 你也许在想,应该有什么好的管理线程的方法, 而不是在主线程里做一个额外的延时 6 秒的操作。 因为这样一来,我们的总的运行时间并不比单线程的 版本来得少。而且,像这样使用 sleep()函数做线程 的同步操作是不可靠的。如果我们的循环的执行时间 不能事先确定的话,那怎么办呢?这可能造成主线程 过早或过晚退出。这就是锁的用武之地了。 上一次修改程序,我们去掉了 loop 函数,现 在,我们要再一次修改程序为例 1.3 的 mtsleep2.py,引入锁的概念。运行它,我们看到, 其输出与 mtsleep1.py 很相似,唯一的区别是我们 不用为线程什么时候结束再做额外的等待。使用了 锁,我们就可以在两个线程都退出后,马上退出。 $ mtsleep2.py starting at: Sun Aug 13 16:34:41 2006 start loop 0 at: Sun Aug 13 16:34:41 2006 start loop 1 at: Sun Aug 13 16:34:41 2006 loop 1 done at: Sun Aug 13 16:34:43 2006 loop 0 done at: Sun Aug 13 16:34:45 2006 all DONE at: Sun Aug 13 16:34:45 2006 我们是怎么通过锁来完成任务的呢?先看一看代 码吧。 例1.3 使用线程和锁 (mtsleep2.py) 这里,使用锁比 mtsleep1.py 那里在主线程中使 用sleep()函数更合理。 1 #!/usr/bin/env python 2 3 import thread 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec, lock): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 lock.release() 13 14 def main(): 15 print 'starting at:', ctime() 16 locks = [] 17 nloops = range(len(loops)) 18 19 for i in nloops: 20 lock = thread.allocate_lock() 21 lock.acquire() 22 locks.append(lock) 23 24 for i in nloops: 25 thread.start_new_thread(loop, 26 (i, loops[i], locks[i])) 27 28 for i in nloops: 29 while locks[i].locked(): pass 30 31 print 'all DONE at:', ctime() 32 33 if __name__ == '__main__': 34 main() 逐行解释 1-6 行 在Unix 启动信息行后面,我们导入了 thread 模 块和 time 模块里我们早已熟悉的几个函数。我们不 再在函数里写死要等 4秒和 2秒,而是使用一个 loop()函数,把这些常量放在一个列表 loops 里。 8-12 行 loop()函数替换了我们之前的那几个 loop*()函 数。在 loop()函数里,增加了一些锁的操作。一个 很明显的改变是,我们现在要在函数中记录下循环的 号码和要睡眠的时间。最后一个不一样的地方就是那 个锁了。每个线程都会被分配一个事先已经获得的 锁,在 sleep()的时间到了之后就释放相应的锁以通 知主线程,这个线程已经结束了。 14-34 行 主要的工作在包含三个循环的 main()函数中完 成。我们先调用 thread.allocate_lock()函数创建 一个锁的列表,并分别调用各个锁的 acquire()函数 获得锁。获得锁表示“把锁锁上”。锁上后,我们就 把锁放到锁列表 locks 中。下一个循环创建线程, 每个线程都用各自的循环号,睡眠时间和锁为参数去 调用 loop()函数。为什么我们不在创建锁的循环里 创建线程呢?有以下几个原因:(1) 我们想到实现线 程的同步,所以要让“所有的马同时冲出栅栏”。(2) 获取锁要花一些时间,如果你的线程退出得“太 快”,可能会导致还没有获得锁,线程就已经结束了 的情况。 在线程结束的时候,线程要自己去做解锁操作。 最后一个循环只是坐在那一直等(达到暂停主线程的 目的),直到两个锁都被解锁为止才继续运行。由于 我们顺序检查每一个锁,所以我们可能会要长时间地 等待运行时间长且放在前面的线程,当这些线程的锁 释放之后,后面的锁可能早就释放了(表示对应的线 程已经运行完了)。结果主线程只能毫不停歇地完成 对后面这些锁的检查。最后两行代码的意思你应该已 经知道了,就是只有在我们直接运行这个脚本时,才 运行 main()函数 在核心笔记中我们就已经说过,使用 thread 模 块只是为了给读者演示如何进行多线程编程。你的多 线程程序应该使用更高级别的模块,如 threading 等。现在我们就开始讨论它。 1.5 threading 模块 接下来,我们要介绍的是更高级别的 threading 模块,它不仅提供了 Thread 类,还提供了各种非常 好用的同步机制。表 1.2 列出了 threading 模块里所 有的对象。 在这一节中,我们会演示如何使用 Thread 类来 实现多线程。之前已经介绍过锁的基本概念,这里我 们将不会提到锁原语。而 Thread 类也有某种同步机 制,所以,没有必要详细介绍锁原语。 表1.2 threading 模块对象 threading 模块对象 描述 Thread 表示一个线程的执行的对象 Lock 锁原语对象(跟 thread 模块里的锁 对象相同) RLock 可重入锁对象。使单线程可以再次 获得已经获得了的锁(递归锁定)。 Condition 条件变量对象能让一个线程停下 来,等待其它线程满足了某个“条件”。 如,状态的改变或值的改变。 Event 通用的条件变量。多个线程可以 等待某个事件的发生,在事件发生后, 所有的线程都会被激活。 Semaphore 为等待锁的线程提供一个类似“等 候室”的结构 BoundedSemaphore 与Semaphore 类似,只是它 不允许超过初始值 Timer 与Thread 相似,只是,它要等待一段时 间后才开始运行。 核心提示:守护线程 另一个避免使用 thread 模块的原因是,它不支 持守护线程。当主线程退出时,所有的子线程不论它 们是否还在工作,都会被强行退出。有时,我们并不 期望这种行为,这时,就引入了守护线程的概念 threading 模块支持守护线程,它们是这样工作 的:守护线程一般是一个等待客户请求的服务器,如 果没有客户提出请求,它就在那等着。如果你设定一 个线程为守护线程,就表示你在说这个线程是不重要 的,在进程退出的时候,不用等待这个线程退出。就 像你在第 16 章网络编程看到的,服务器线程运行在 一个无限循环中,一般不会退出。 如果你的主线程要退出的时候,不用等待那些子 线程完成,那就设定这些线程的 daemon 属性。即, 在线程开始(调用 thread.start())之前,调用 setDaemon()函数设定线程的 daemon 标志 (thread.setDaemon(True))就表示这个线程“不重 要” 如果你想要等待子线程完成再退出,那就什么都 不用做,或者显式地调用 thread.setDaemon(False) 以保证其 daemon 标志为 False。你可以调用 thread.isDaemon()函数来判断其 daemon 标志的值。 新的子线程会继承其父线程的 daemon 标志。整个 Python 会在所有的非守护线程退出后才会结束,即进 程中没有非守护线程存在的时候才结束。 1.5.1 Thread 类 threading 的Thread 类是你主要的运行对象。 它有很多 thread 模块里没有的函数,详见表 1.3。 用Thread 类,你可以用多种方法来创建线程。 我们在这里介绍三种比较相像的方法。你可以任选一 种你喜欢的,或最适合你的程序以及最能满足程序可 扩展性的(我们一般比较喜欢最后一个选择): 创建一个 Thread 的实例,传给它一个函数 创建一个 Thread 的实例,传给它一个可调用的 类对象 从Thread 派生出一个子类,创建一个这个子类 的实例 表1.3 Thread 对象的函数 函数 描述 start() 开始线程的执行 run() 定义线程的功能的函数(一般会被子 类重写) join(timeout=None) 程序挂起,直到线程结 束;如果给了 timeout,则最多阻塞 timeout 秒 getName() 返回线程的名字 setName(name) 设置线程的名字 isAlive() 布尔标志,表示这个线程是否还 在运行中 isDaemon() 返回线程的 daemon 标志 setDaemon(daemonic) 把线程的 daemon 标志设 为daemonic 一定要在调用 start()函数前调用) 创建一个 Thread 的实例,传给它一个函数 第一个例子的中,我们将初始化一个 Thread 对 象,把函数(及其参数)像上一个例子那样传进去。 在线程开始执行的时候,这个函数会被执行。把 mtsleep2.py 脚本拿过来,做一些调整加入 Thread 对象的使用,就成了例 1.4 中的 mtsleep3.py。 运行的输出跟之前很相似: $ mtsleep3.py starting at: Sun Aug 13 18:16:38 2006 start loop 0 at: Sun Aug 13 18:16:38 2006 start loop 1 at: Sun Aug 13 18:16:38 2006 loop 1 done at: Sun Aug 13 18:16:40 2006 loop 0 done at: Sun Aug 13 18:16:42 2006 all DONE at: Sun Aug 13 18:16:42 2006 那么,都做了些什么修改呢?在使用 thread 模 块时使用的锁没有了。新加了一些 Thread 对象。在 实例化每个 Thread 对象的时候,我们把函数 (target)和参数(args)传进去,得到返回的 Thread 实例。实例化一个 Thread(调用 Thread()) 与调用 thread.start_new_thread()之间最大的区别 就是,新的线程不会立即开始。在你创建线程对象, 但不想马上开始运行线程的时候,这是一个很有用的 同步特性。 例1.4 使用 threading 模块 (mtsleep3.py) threading 模块的 Thread 类有一个 join()函 数,允许主线程等待线程的结束。 1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 13 def main(): 14 print 'starting at:', ctime() 15 threads = [] 16 nloops = range(len(loops)) 17 18 for i in nloops: 19 t = threading.Thread(target=loop, 20 args=(i, loops[i])) 21 threads.append(t) 22 23 for i in nloops: # start threads 24 threads[i].start() 25 26 for i in nloops: # wait for all 27 threads[i].join() # threads to finish 28 29 print 'all DONE at:', ctime() 30 31 if __name__ == '__main__': 32 main() 所有的线程都创建了之后,再一起调用 start() 函数启动,而不是创建一个启动一个。而且,不用再 管理一堆锁(分配锁,获得锁,释放锁,检查锁的状 态等),只要简单地对每个线程调用 join()函数就 可以了。 join()会等到线程结束,或者在给了 timeout 参 数的时候,等到超时为止。使用 join()看上去会比 使用一个等待锁释放的无限循环清楚一些(这种锁也 被称为"spinlock") join()的另一个比较重要的方面是它可以完全不 用调用。一旦线程启动后,就会一直运行,直到线程 的函数结束,退出为止。如果你的主线程除了等线程 结束外,还有其它的事情要做(如处理或等待其它的 客户请求),那就不用调用 join(),只有在你要等 待线程结束的时候才要调用 join()。 创建一个 Thread 的实例,传给它一个可调用的 类对象 与传一个函数很相似的另一个方法是在创建线程 的时候,传一个可调用的类的实例供线程启动的时候 执行——这是多线程编程的一个更为面向对象的方 法。相对于一个或几个函数来说,由于类对象里可以 使用类的强大的功能,可以保存更多的信息,这种方 法更为灵活。 把ThreadFunc 类加入到 mtsleep3.py 代码中, 并做一些其它的小修改后,就得到了例 1.5 中的 mtsleep4.py。运行它,就会得到如下的输出: $ mtsleep4.py starting at: Sun Aug 13 18:49:17 2006 start loop 0 at: Sun Aug 13 18:49:17 2006 start loop 1 at: Sun Aug 13 18:49:17 2006 loop 1 done at: Sun Aug 13 18:49:19 2006 loop 0 done at: Sun Aug 13 18:49:21 2006 all DONE at: Sun Aug 13 18:49:21 2006 那么,这次又改了些什么呢?主要是增加了 ThreadFunc 类和创建 Thread 对象时会实例化一个可 调用类 ThreadFunc 的类对象。也就是说,我们实例 化了两个对象。下面,来仔细地看一看 ThreadFunc 类吧。 我们想让这个类在调用什么函数方面尽量地通 用,并不局限于那个 loop()函数。所以,我们加了 一些修改,如,这个类保存了函数的参数,函数本身 以及函数的名字字符串。构造函数__init__()里做了 这些值的赋值工作。 创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。由于我们已经有了要用的参数,所以就 不用再传到 Thread()的构造函数中。由于我们有一 个参数的元组,这时要在代码中使用 apply()函数。 如果你使用的是 Python1.6 或是更高版本,你可以使 用11.6.3 节中所说的新的调用语法,而不用像第 16 行那样使用 apply()函数: self.res = self.func(*self.args) 例1.5 使用可调用的类 (mtsleep4.py) 此例中,我们传了一个可调用的类(的实例),而 不是仅传一个函数。相对 mtsleep3.py 中的方法来 说,这样做更具面向对象的概念。 1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 class ThreadFunc(object): 9 10 def __init__(self, func, args, name=''): 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def __call__(self): 16 apply(self.func, self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: # create all threads 29 t = threading.Thread( 30 target=ThreadFunc(loop, (i, loops[i]), 31 loop.__name__)) 32 threads.append(t) 33 34 for i in nloops: # start all threads 35 threads[i].start() 36 37 for i in nloops: # wait for completion 38 threads[i].join() 39 40 print 'all DONE at:', ctime() 41 42 if __name__ == '__main__': 43 main() 从Thread 派生出一个子类,创建一个这个子类 的实例 最后一个例子介绍如何子类化 Thread 类,这与 上一个例子中的创建一个可调用的类非常像。使用子 类化创建线程(第 29-30 行)使代码看上去更清晰明 了。我们将在例 1.6 中给出 mtsleep5.py 的代码,以 及代码运行的输出。比较 mtsleep5.py 和 mtsleep4.py 的任务则留给读者做为练习。 下面是 mtsleep5.py 的输出,同样,跟我们的期 望一致: $ mtsleep5.py starting at: Sun Aug 13 19:14:26 2006 start loop 0 at: Sun Aug 13 19:14:26 2006 start loop 1 at: Sun Aug 13 19:14:26 2006 loop 1 done at: Sun Aug 13 19:14:28 2006 loop 0 done at: Sun Aug 13 19:14:30 2006 all DONE at: Sun Aug 13 19:14:30 2006 在读者比较 mtsleep4 和mtsleep5 两个模块的代 码之前,我们想指出最重要的两点改变:(1)我们的 MyThread 子类的构造函数一定要先调用基类的构造 函数(第 9 行),(2)之前的特殊函数__call__()在子 类中,名字要改为 run()。 现在,在 MyThread 类中,加入一些用于调试的 输出信息,把代码保存到 myThread 模块中(见例 1.7),并在下面的例子中,导入这个类。除了简单地 使用 apply()函数来运行这些函数之外,我们还把结 果保存到实现的 self.res 属性中,并创建一个新的 函数 getResult()来得到结果。 1.5.4 斐波那契,阶乘和累加和 例1.8 中的 mtfacfib.py 脚本比较了递归求斐波 那契,阶乘和累加和函数的运行。脚本先在单线程中 运行这三个函数,然后在多线程中做同样的事,以说 明多线程的好处。 例1.6 子类化 Thread (mtsleep5.py) 我们现在要子类化 Thread 类,而不是创建它的 实例。这样做可以更灵活地定制我们的线程对象,而 且在创建线程的时候也更简单。 1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = (4, 2) 7 8 class MyThread(threading.Thread): 9 def __init__(self, func, args, name=''): 10 threading.Thread.__init__(self) 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def run(self): 16 apply(self.func, self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: 29 t = MyThread(loop, (i, loops[i]), 30 loop.__name__) 31 threads.append(t) 32 33 for i in nloops: 34 threads[i].start() 35 36 for i in nloops: 37 threads[i].join() 38 39 print 'all DONE at:', ctime()' 40 41 if __name__ == '__main__': 42 main() 例1.7 MyThread 子类化 Thread (myThread.py) 为了让 mtsleep5.py 中,Thread 的子类更为通 用,我们把子类单独放在一个模块中,并加上一个 getResult()函数用以返回函数的运行结果。 1 #!/usr/bin/env python 2 3 import threading 4 from time import ctime 5 6 class MyThread(threading.Thread): 7 def __init__(self, func, args, name=''): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.func = func 11 self.args = args 12 13 def getResult(self): 14 return self.res 15 16 def run(self): 17 print 'starting', self.name, 'at:', \ 18 ctime() 19 self.res = apply(self.func, self.args) 20 print self.name, 'finished at:', \ 21 ctime() 在单线程中运行只要简单地逐个调用这些函数, 在函数结束后,显示对应的结果。在多线程中,我们 不马上显示结果。由于我们想让 MyThread 类尽可能 地通用(能同时适应有输出和没输出的函数),我们会 等到要结束时才会调用 getResult()函数,并在最后 显示每个函数的结果。 由于这些函数运行得很快(斐波那契函数会慢一 些),你会看到,我们得在每个函数中加上一个 sleep()函数,让函数慢下来,以便于我们能方便地 看到多线程能在多大程度上加速程序的运行。不过实 际工作中,你一般不会想在程序中加上 sleep()函数 的。下面是程序的输出: $ mtfacfib.py *** SINGLE THREAD starting fib at: Sun Jun 18 19:52:20 2006 233 fib finished at: Sun Jun 18 19:52:24 2006 例1.8 斐波那契,阶乘和累加和 (mtfacfib.py) 在这个多线程程序中,我们会分别在单线程和多 线程环境中,运行三个递归函数。 1 #!/usr/bin/env python 2 3 from myThread import MyThread 4 from time import ctime, sleep 5 6 def fib(x): 7 sleep(0.005) 8 if x < 2: return 1 9 return (fib(x-2) + fib(x-1)) 10 11 def fac(x): 12 sleep(0.1) 13 if x < 2: return 1 14 return (x * fac(x-1)) 15 16 def sum(x): 17 sleep(0.1) 18 if x < 2: return 1 19 return (x + sum(x-1)) 20 21 funcs = [fib, fac, sum] 22 n = 12 23 24 def main(): 25 nfuncs = range(len(funcs)) 26 27 print '*** SINGLE THREAD' 28 for i in nfuncs: 29 print 'starting', funcs[i].__name__, 'at:', \ 30 ctime() 31 print funcs[i](n) 32 print funcs[i].__name__, 'finished at:', \ 33 ctime() 34 35 print '\n*** MULTIPLE THREADS' 36 threads = [] 37 for i in nfuncs: 38 t = MyThread(funcs[i], (n,), 39 funcs[i].__name__) 40 threads.append(t) 41 42 for i in nfuncs: 43 threads[i].start() 44 45 for i in nfuncs: 46 threads[i].join() 47 print threads[i].getResult() 48 49 print 'all DONE' 50 51 if __name__ == '__main__': 52 main() starting fac at: Sun Jun 18 19:52:24 2006 479001600 fac finished at: Sun Jun 18 19:52:26 2006 starting sum at: Sun Jun 18 19:52:26 2006 78 sum finished at: Sun Jun 18 19:52:27 2006 *** MULTIPLE THREADS starting fib at: Sun Jun 18 19:52:27 2006 starting fac at: Sun Jun 18 19:52:27 2006 starting sum at: Sun Jun 18 19:52:27 2006 233 479001600 78 all DONE 1.5.5 threading 模块中的其它函数 除了各种同步对象和线程对象外,threading 模块还 提供了一些函数。见表 1.4。表 1.4 threading 模块 的函数 函数 描述 activeCount() 当前活动的线程对象的数量 currentThread() 返回当前线程对象 enumerate() 返回当前活动线程的列表 settrace(func) 为所有线程设置一个跟踪函数 a setprofile(func) a 为所有线程设置一个 profile 函数 a. Python 2.3 新增 1.5.5 生产者-消费者问题和 Queue 模块 最后一个例子演示了生产者和消费者的场景。生 产者生产货物,然后把货物放到一个队列 之类的数据结构中,生产货物所要花费的时间无 法预先确定。消费者消耗生产者生产 的货物的时间也是不确定的。 表1.5 常用的 Queue 模块的属性 函数 描述 Queue 模块函数 queue(size) 创建一个大小为 size 的Queue 对象 Queue 对象函数 qsize() 返回队列的大小(由于在返回的时 候,队列可能会被其它线程修改,所以这个 值是近似值) empty() 如果队列为空返回 True,否则 返回 False full() 如果队列已满返回 True,否则返回 False put(item, block=0) 把item 放到队列中,如果给了 block(不为 0),函数会一直阻塞到队列中有 空间为止 get(block=0) 从队列中取一个对象,如果给 了 block(不为 0),函数会一直阻塞到 队列中有对象为止 Queue 模块可以用来进行线程间通讯,让各个线 程之间共享数据。现在,我们创建一个队列,让生产 者(线程)把新生产的货物放进去供消费者(线程) 使用。要达到这个目的,我们要使用到 Queue 模块的 以下属性(见表 1.5)。 很容易地,我们就能写出例 1.9 的prodcons.py 的代码。 下面是这个脚本的运行输出: $ prodcons.py starting writer at: Sun Jun 18 20:27:07 2006 producing object for Q... size now 1 starting reader at: Sun Jun 18 20:27:07 2006 consumed object from Q... size now 0 producing object for Q... size now 1 consumed object from Q... size now 0 producing object for Q... size now 1 producing object for Q... size now 2 producing object for Q... size now 3 consumed object from Q... size now 2 consumed object from Q... size now 1 writer finished at: Sun Jun 18 20:27:17 2006 consumed object from Q... size now 0 reader finished at: Sun Jun 18 20:27:25 2006 all DONE 如你所见,生产者和消费者不一定是轮流执行的 (多亏有了随机数!)。实际上,真实生活总是充满 了随机性和不确定性。 逐行解释 1-6 行 在这个模块中,我们要使用 Queue.Queue 对象 以及我们在例 1.7 中给出的的线程类 myThread.MyThread。我们将使用 random.randint() 函数来随机的进行生产和消耗。并从 time 模块中导 入了常用的属性。 8-16 行 writeQ()和 readQ()函数分别用来把对象放入队 列和消耗队列中的一个对象。在这里我们使用字符串 'xxx'来表示队列中的对象。 18-26 行 writer()函数只做一件事,就是一次往队列中放 入一个对象,等待一会,然后再做同样的事,一共做 指定的次数,这个次数是由脚本运行时随机生成的。 reader()函数做的事比较类似,只是它是用来消耗对 象的。 你会注意到,writer 睡眠的时间一般会比 reader 睡眠的时间短。这可以减少 reader 尝试从空 队列中取数据的机会。writer 的睡眠时间短,那 reader 在想要数据的时候总是能拿到数据。 28-29 行 设置有多少个线程要被运行。 例1.9 生产者-消费者问题 (prodcons.py) 这个实现中使用了 Queue 对象和随机地生产 (和消耗)货物的方式。生产者和消费者相互独立并 且并发地运行。 1 #!/usr/bin/env python 2 3 from random import randint 4 from time import sleep 5 from Queue import Queue 6 from myThread import MyThread 7 8 def writeQ(queue): 9 print 'producing object for Q...', 10 queue.put('xxx', 1) 11 print "size now", queue.qsize() 12 13 def readQ(queue): 14 val = queue.get(1) 15 print 'consumed object from Q... size now', \ 16 queue.qsize() 17 18 def writer(queue, loops): 19 for i in range(loops): 20 writeQ(queue) 21 sleep(randint(1, 3)) 22 23 def reader(queue, loops): 24 for i in range(loops): 25 readQ(queue) 26 sleep(randint(2, 5)) 27 28 funcs = [writer, reader] 29 nfuncs = range(len(funcs)) 30 31 def main(): 32 nloops = randint(2, 5) 33 q = Queue(32) 34 35 threads = [] 36 for i in nfuncs: 37 t = MyThread(funcs[i], (q, nloops), 38 funcs[i].__name__) 39 threads.append(t) 40 41 for i in nfuncs: 42 threads[i].start() 43 44 for i in nfuncs: 45 threads[i].join() 46 47 print 'all DONE' 48 49 if __name__ == '__main__': 50 main() 31-47 行 最后,就到了 main()函数,它与之前的所有脚 本的 main()函数都很像。先是创建所有的线程,然 后运行它们,最后,等两个线程都结束后,得到最后 的运行结果。 从本例中,我们可以了解到,一个要完成多项任 务的程序,可以考虑每个任务使用一个线程。这样的 程序在设计上相对于单线程做所有事的程序来说,更 为清晰明了。 本章中,我们看到了单线程的程序在程序性能上 的限制。尤其在有相互独立的,运行时间不确定的多 个任务的程序里,把多个任务分隔成多个线程同时运 行会比顺序运行速度更快。由于 Python 解释器是单 线程的,所以不是所有的程序都能从多线程中得到好 处。不过,你已经对 Python 下的多线程有所了解, 在适当的时候,可以利用它来改善程序的性能。 1.6 相关模块 下表列出了一些多线程编程中可能用得到的模 块: 表1.6 多线程相关的标准库模块 模块 描述 thread 基本的,底级别的线程模块 threading 高级别的线程和同步对象 Queue 供多线程使用的同步先进先出(FIFO) 队列 mutex 互斥对象 SocketServer 具有线程控制的 TCP 和UDP 管 理器
还剩44页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 6 金币 [ 分享pdf获得金币 ] 0 人已下载

下载pdf

pdf贡献者

qingmingsq

贡献于2013-07-09

下载需要 6 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf