Python3 的多线程学习笔记


Python3 Python3 Python3 Python3 的的的的threading threading threading threading 学习笔记学习笔记学习笔记学习笔记 作者 :罗 寅兵 日期 : 2012-11-14 一、一、一、一、线程基础线程基础线程基础线程基础 1111、、、、线程状态线程状态线程状态线程状态 新建就绪 死亡运行 同步阻塞 | 等待阻塞 | I O 阻塞 调度结束 满足阻塞条件 启动 解除阻塞 2222、、、、线程互斥锁同步控制线程互斥锁同步控制线程互斥锁同步控制线程互斥锁同步控制 多 个线 程需 要修 改同 一共 享数 据时 ,需 要进 行同 步控 制, 引入 了锁 的概 念。 锁定池访问共享数据 请求锁定 访问完成 释放锁定 锁定 线 程 获得锁定 a、同 一时 间可 能有 多个 线程 在锁 定池 中, 它们 处于 同步 阻塞 状态 竞争 锁定 ; b、同 一时 间只 能有 一个 线程 获得 锁定 处于 运行 状态 。 3333、、、、条件变量(线程通信)条件变量(线程通信)条件变量(线程通信)条件变量(线程通信) 有 的线 程需 要预 备条 件才 能干 活。 锁定池访问共享数据 请求锁定 访问完成 释放锁定 锁定 需 要 条 件 的 线 程 等待池 条件不足 等待通知 收到通知 请求锁定 获得锁定 锁定池访问共享数据 请求锁定 访问完成 释放锁定 锁定 创 造 条 件 的 线 程 等待池获得锁定 创造条件 发送通知 二、二、二、二、threadingthreadingthreadingthreading:线程创建、启动、睡眠、退出:线程创建、启动、睡眠、退出:线程创建、启动、睡眠、退出:线程创建、启动、睡眠、退出 1111、方法一:将要执行的函数作为参数传递给 、方法一:将要执行的函数作为参数传递给 、方法一:将要执行的函数作为参数传递给 、方法一:将要执行的函数作为参数传递给 threading.Thread()threading.Thread()threading.Thread()threading.Thread() 以上例子创建了5个线程去执行func 函数。获得的结果可能是5000500050005000,但也有时候会出现错误,解决方法请继续阅 读 下文 。 要 点: a. threading.Thread(target=func, args=(10,)):func 为 函数 名, args 为 函数 参数 (必 须以 元组 的形 式传 递 ); b. t.start(): 启 动函 数, 等待 操作 系统 调度 ; c. 函 数运 行结 束, 线程 也就 结束 ; d. time.sleep(): 线 程进 入睡 眠, 处于 IO 阻 塞状 态。 1111、方法二:继承 、方法二:继承 、方法二:继承 、方法二:继承 threading.Thread()threading.Thread()threading.Thread()threading.Thread()类,并重写 类,并重写 类,并重写 类,并重写 run()run()run()run()【推荐使用】【推荐使用】【推荐使用】【推荐使用】 #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time def func(n): global count time.sleep(0.1) for i in range(n): count += 1 if __name__==’__main__’: count = 0 threads = [] for i in range(5): threads.append(threading.Thread(target=func, args=(1000,))) for t in threads: t.start() time.sleep(5) print(‘count:’,count) #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time class myThread(threading.Thread): def __init__(self, n): threading.Thread.__init__(self) self.myThread_n = n def run(self): global count for i in range(self.myThread_n): count += 1 if __name__==’__main__’: count = 0 threads = [] for i in range(5): threads.append(myThread(1000)) for t in threads: t.start() time.sleep(5) print(‘count:’,count) 要 点: a. threading.Thread.__init__(self):回 调父 类方 法 。如 果你 重写 了 __init__()方法,这 一步 是必 须的 ,否 则出 错 。 三、三、三、三、threadingthreadingthreadingthreading:线程同步锁互斥控制:线程同步锁互斥控制:线程同步锁互斥控制:线程同步锁互斥控制 因 为线 程是 乱序 执行 的, 在某 种情 况下 上面 的两 个例 子, 输出 的结 果可 能不 是预 期的 值。 我 将第 2例 的线 程类 修改 下, 让问 题更 加突 出, 然后 你每 次运 行的 时候 再把 线程 数也 修改 下, 并计 算出 预期 结果 和运 行 结 果对 比。 一定 要多 试几 次哦 。 是 不是 输出 的结 果和 预期 结果 不一 致啊 ,呵 呵! 因为 多个 线程 都在 同时 操作 同一 个共 享资 源, 所以 造成 资源 破坏 。不 过 我 们可 以通 过同 步锁 来解 决这 种问 题: 要 点: a. lock = threading.Lock(): 创建 锁 ; b. lock.acquire([timeount]): 请求 锁定 , 如 果设 定了 timeout, 则在 超时 后通 过返 回值 可以 判断 是否 得到 了锁 , class myThread(threading.Thread): def __init__(self, n): threading.Thread.__init__(self) self.myThread_n = n def run(self): global count for i in range(self.myThread_n): __Temp = count time.sleep(0.0001) count = __Temp + 1 #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time class myThread(threading.Thread): def __init__(self, n): threading.Thread.__init__(self) self.myThread_n = n def run(self): global count for i in range(self.myThread_n): if lock.acquire(): __Temp = count time.sleep(0.0001) count = __Temp + 1 lock.release() if __name__==’__main__’: count = 0 lock = threading.Lock() #同 步锁 ,也 称互 斥量 threads = [] for i in range(5): threads.append(myThread(1000)) for t in threads: t.start() time.sleep(5) print(‘count:’,count) 从 而可 以进 行一 些其 他的 处理 ; c. lock.release(): 释放 锁定 。 四、四、四、四、threadingthreadingthreadingthreading:线程死锁和递归锁:线程死锁和递归锁:线程死锁和递归锁:线程死锁和递归锁 1111、、、、死锁死锁死锁死锁 a. 在 线程 间共 享多 个资 源的 时候 ,如 果两 个线 程分 别占 有一 部分 资源 并且 同时 等待 对方 的资 源, 就会 造成 死锁 , 因 为系 统判 断这 部分 资源 都正 在使 用 ,所 有这 两个 线程 在无 外力 作用 下将 一直 等待 下去 。下 面是 一个 死锁 的例 子: b. 当 一个 线程已 经获 得了 锁, 再此 请求 锁也 会出 现死 锁, 请看 下面 的例 子: #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time class myThread(threading.Thread): def doA(self): if lockA.acquire(): print(self.name,”got lockA.”) time.sleep(0.0001) if lockB.acquire(): print(self.name,”got lockB”) lockB.release() lockA.release() def doB(self): if lockB.acquire(): print(self.name,”got lockB.”) time.sleep(0.0001) if lockA.acquire(): print(self.name,”got lockA”) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__==’__main__’: lockA = threading.Lock() lockB = threading.Lock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() #等 待线 程结 束, 后面 再讲 。 2222、、、、递归锁(也称可重入锁)递归锁(也称可重入锁)递归锁(也称可重入锁)递归锁(也称可重入锁) 上 一例 子的 死锁 ,我 们可 以用 递归 锁解 决: 为 了支 持在 同一 线程 中多 次请 求同 一资 源 ,python 提 供了 “可 重入 锁 ”:threading.RLock。RLock 内 部维 护着 一个 Lock 和 一个 counter 变 量, counter 记 录了 acquire 的 次数 ,从 而使 得资 源可 以被 多次 require。 直到 一个 线程 所有 的 acquire 都 被release, 其他 的线 程才 能获 得资 源。 递 归锁 一般 应用 于复 杂的 逻辑 。 五、五、五、五、threadingthreadingthreadingthreading:条件变量同步:条件变量同步:条件变量同步:条件变量同步 有一类线程需要满足条件之后才能够继续执行,Python 提供了threading.Condition 对象用于条件变量线程的支持,它除 了 能提 供 RLock()或Lock()的 方法 外, 还提 供了 wait()、notify()、notifyAll()方 法。 lock_con = threading.Condition([Lock/Rlock]): 锁是 可选 选项 ,不 传人 锁, 对象 自动 创建 一个 RLock()。 wait(): 条件 不满 足时 调用 ,线 程会 释放 锁并 进入 等待 阻塞 ; notify(): 条件 创造 后调 用, 通知 等待 池激 活一 个线 程; #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time import random class myThread(threading.Thread): def toHex(self): global L if lock.acquire(1): for i in range(len(L)): if type(L[i]) is int: L[i]=”{:X}”.format(L[i]) lock.release() else: print(self.name,“错 误, 系统 忙 ”) def run(self): global L if lock.acquire(1): L.append(random.randint(0, 15)) self.toHex() lock.release() else: print(self.name,“错 误, 系统 忙 ”) if __name__==’__main__’: L = [] lock = threading.Lock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() #等 待线 程结 束, 后面 再讲 。 #lock = threading.Lock() 注 释掉 此行 ,并 加入 下行 。 lock = threading.RLock() notifyAll(): 条件 创造 后调 用, 通知 等待 池激 活所 有线 程。 六、六、六、六、threadingthreadingthreadingthreading:条件同步:条件同步:条件同步:条件同步 条 件同 步和 条件 变量 同步 差不 多意 思, 只是 少了 锁功 能, 因为 条件 同步 设计 于不 访问 共享 资源 的条 件环 境。 event = threading.Event(): 条件 环境 对象 ,初 始值 为 False; event.isSet(): 返回 event 的 状态 值; event.wait(): 如果 event.isSet()==False 将 阻塞 线程 ; event.set(): 设置 event 的 状态 值为 True, 所有 阻塞 池的 线程 激活 进入 就绪 状态 ,等 待操 作系 统调 度; event.clear(): 恢复 event 的 状态 值为 False。 实 例: #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time from random import randint class Producer(threading.Thread): def run(self): global L while True: val = randint(0,100) print(self.name, ”:Append ”+str(val), L) if lock_con.acquire(): l.append(val) lock_con.notify() lock_con.release() time.sleep(3) class Consumer(threading.Thread): global L while True: if lock_con.acquire(): if len(L)==0: lock_con.wait() else: print(self.name, “:DELETE”+L[0],L) del L[0] lock_con.release() time.sleep(0.25) if __name__==’__main__’: L = [] lock_con = threading.Condition() threads = [] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join() 七、七、七、七、threadingthreadingthreadingthreading:线程合并与后台线程:线程合并与后台线程:线程合并与后台线程:线程合并与后台线程 1111、、、、线程合并线程合并线程合并线程合并 join()方法,使得一个线程可以等待另一个线程执行结束后再继续运行。这个方法还可以设定一个timeout 参数,避 免 无休 止的 等待 。因 为两 个线 程顺 序完 成, 看起 来象 一个 线程 ,所 以称 为线 程的 合并 。【 前面 有演 示, 就不 多写 了】 2222、、、、后台线程后台线程后台线程后台线程 默 认情 况下 ,主 线程 在退 出时 会等 待所 有子 线程 的结 束。 如果 希望 主线 程不 等待 子线 程, 而是 在退 出时 自动 结束 所 有 的子 线程 ,就 需要 设置 子线 程为 后台 线程 :setDaemon(True) ((((在Windows Windows Windows Windows 下的Py3.3 Py3.3 Py3.3 Py3.3 调 试失 败, Linux Linux Linux Linux 下面OK)OK)OK)OK) #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time class Boss(threading.Thread): def run(self): print(“BOSS: 今晚 大家 都要 加班 到 22:00。”) event.isSet() or event.set() time.sleep(5) print(“BOSS:<22:00>可 以下 班了 。 ”) event.isSet() or event.set() class Worker(threading.Thread): def run(self): event.wait() print(“Worker:哎……命 苦啊 ! ”) time.sleep(0.25) event.clear() event.wait() print(“Worker:Oh Yeah !”) if __name__==’__main__’: event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() import threading, time class myThread(threading.Thread): def run(self): print(self.name, “is Start.”) time.sleep(5) print(self.name, “is Finished..”) if __name__==’__main__’: t = myThread() t.setDaemon(True) t.start() print(“main thread is finished.”) 八、八、八、八、threadingthreadingthreadingthreading:信号量:信号量:信号量:信号量 信 号量 用来 控制 线程 并发 数的 , BoundedSemaphore 或Semaphore 管 理一 个内 置的 计数 器, 每当 调用 acquire()时-1, 调用release() 时+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用 release()。 BoundedSemaphore 与Semaphore的 唯一 区别 在于 前者 将在 调 用release()时 检查 计数 器的 值是 否超 过了 计数 器的 初始 值 ,如 果超 过了 将抛 出一 个异 常。 九、九、九、九、threadingthreadingthreadingthreading:定时器:定时器:定时器:定时器 timer()是Thread 的 派生 类, 用于 在指 定时 间后 调用 一个 方法 。【 我不 知道 用继 承的 方法 写】 十、十、十、十、threadingthreadingthreadingthreading:线程局部数据:线程局部数据:线程局部数据:线程局部数据 local()类 用于 存储 线程 局部 数据 ,即 线程 和其 他线 程之 间直 接无 法共 享, 互相 不影 响使 用。 #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading, time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__==’__main__’: semaphore = threading.Semaphore(5) thrs = [] for i in range(100): thrs.append(myThread()) for t in thrs: t.start() #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading def func(): print(“Hello World.”) t = threading.Timer(5, func) t.start() import threading class myThread(threading.Thread): def run(self): local.name = “Obama” print(local.name) if __name__==”__main__”: local = threading.local() local.name = “Lady Gaga” myThread().start() print(local.name) 十一、十一、十一、十一、threadingthreadingthreadingthreading:其他函数:其他函数:其他函数:其他函数 threading.isAlive() 判 断线 程是 否还 在运 行中 threading.isDaemon() 返 回线 程的 daemon 标志 threading.getName() 返 回线 程名 threading.current_thread() 返 回当 前线 程句 柄 threading.enumerate() 返 回正 在运 行的 线程 列表 threading.activeCount() 返 回正 在运 行的 线程 数量 十二、十二、十二、十二、多线程利器:列队同步(多线程利器:列队同步(多线程利器:列队同步(多线程利器:列队同步(queuequeuequeuequeue)))) python 有queue 模 块方 便线 程的 生产 者与 使用 者信 息列 队的 安全 交换 。 queue 定 义了 3种 信息 列队 模式 类: a. Queue([maxsize]):FIFO 列 队模 式, [maxsize]定 义列 队容 量, 缺省 为 0, 即无 穷大 ; b. LifoQueue([maxsize]):LIFO 列 队模 式; c. PriorityQueue([maxsize]):优 先级 列队 模式 ,使 用此 列队 时, 项目 应该 是 (priority,data)的 形式 。 queue 列 队类 的方 法: a. q.put(item[,block,[timeout]]):将item 放 入列 队 ,如果block 设 置为 True[默认]时,列 队满 了 ,调 用者 将被 阻塞 , 否 则抛 出 Full 异 常。 timeout 设 置阻 塞超 时; b. q.get([block,[timeout]]):从 列队 中取 出一 个项 目; c. q.qsize():返 回列 队大 小; d. q.full():如 果列 队满 了返 回 True, 否则 为 False; e. q.empty():如 果列 队为 空返 回 True, 否则 为 False; f. q.queue.*:列 队项 目操 作方 法集 合 <和 列表 方法 很类 似 >; queue 列 队实 例: #!/usr/bin/env python3 #-*- coding:utf-8 -*- import threading from time import sleep from random import randint class Production(threading.Thread): def run(self): while True: q.put(randint(0, 100)) Sleep(3) class Proces(threading.Thread): def run(self): while True: re = q.get() if re % 2 == 0: print(re) if __name__==’__main__’: q = queue.Queue(10) threads = [Production(), Proces()] for t in threads: t.start()
还剩8页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享pdf获得金币 ] 1 人已下载

下载pdf

pdf贡献者

wxgwin

贡献于2013-06-04

下载需要 10 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf