Python 并发编程之使用多线程和多处理器

jopen 5年前

在Python编码中我们经常讨论的一个方面就是如何优化模拟执行的性能。尽管在考虑量化代码时NumPy、SciPy和pandas在这方面已然非常有用,但在构建事件驱动系统时我们无法有效地使用这些工具。有没有可以加速我们代码的其他办法?答案是肯定的,但需要留意!

在这篇文章中,我们看一种不同的模型-并发,我们可以将它引入我们Python程序中。这种模型在模拟中工作地特别好,它不需要共享状态。Monte Carlo模拟器可以用来做期权定价以及检验算法交易等类型的各种参数的模拟。

我们将特别考虑Threading库和Multiprocessing库。

Python并发

当Python初学者探索多线程的代码为了计算密集型优化时,问得最多的问题之一是:”当我用多线程的时候,为什么我的程序变慢了?“

在多核机器上,我们期望多线程的代码使用额外的核,从而提高整体性能。不幸的是,主Python解释器(CPython)的内部并不是真正的多线程,是通过一个全局解释锁(GIL)来进行处理的。

GIL是必须的,因为Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/0操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高,但是当它使用多处理库时,性能可以获得提高。

并行库的实现

现在,我们将使用上面所提到的两个库来实现对一个“小”问题进行并发优化。

线程库

上面我们提到: 运行CPython解释器的Python不会支持通过多线程来实现多核处理。不过,Python确实有一个线程库。那么如果我们(可能)不能使用多个核心进行处理,那么使用这个库能取得什么好处呢?

许多程序,尤其是那些与网络通信或者数据输入/输出(I/O)相关的程序,都经常受到网络性能或者输入/输出(I/O)性能的限制。这样Python解释器就会等待哪些从诸如网络地址或者硬盘等“远端”数据源读写数据的函数调用返回。因此这样的数据访问比从本地内存或者CPU缓冲区读取数据要慢的多。

因此,如果许多数据源都是通过这种方式访问的,那么就有一种方式对这种数据访问进行性能提高,那就是对每个需要访问的数据项都产生一个线程 。

举个例子,假设有一段Python代码,它用来对许多站点的URL进行扒取。再假定下载每个URL所需时间远远超过计算机CPU对它的处理时间,那么仅使用一个线程来实现就会大大地受到输入/输出(I/O)性能限制。

通过给每个下载资源生成一个新的线程,这段代码就会并行地对多个数据源进行下载,在所有下载都结束的时候再对结果进行组合。这就意味着每个后续下载都不会等待前一个网页下载完成。此时,这段代码就受收到客户/服务端带宽的限制。

不过,许多与财务相关的应用都受到CPU性能的限制,这是因为这样的应用都是高度集中式的对数字进行处理。这样的应用都会进行大型线性代数计算或者数值的随机统计,比如进行蒙地卡罗模拟统计。所以只要对这样的应用使用Python和全局解释锁(GIL),此时使用Python线程库就不会有任何性能的提高。

Python实现

下面这段依次添加数字到列表的“玩具”代码,举例说明了多线程的实现。每个线程创建一个新的列表并随机添加一些数字到列表中。这个已选的“玩具”例子对CPU的消耗非常高。

下面的代码概述了线程库的接口,但是他不会比我们用单线程实现的速度更快。当我们对下面的代码用多处理库时,我们将看到它会显著的降低总的运行时间。

让我们检查一下代码是怎样工作的。首先我们导入threading库。然后我们创建一个带有三个参数的函数list_append。第一个参数count定义了创建列表的大小。第二个参数id是“工作”(用于我们输出debug信息到控制台)的ID。第三个参数out_list是追加随机数的列表。

__main__函数创建了一个107的size,并用两个threads执行工作。然后创建了一个jobs列表,用于存储分离的线程。threading.Thread对象将list_append函数作为参数,并将它附加到jobs列表。

最后,jobs分别开始并分别“joined”。join()方法阻塞了调用的线程(例如主Python解释器线程)直到线程终止。在打印完整的信息到控制台之前,确认所有的线程执行完成。

# thread_test.pyimport randomimport threadingdef list_append(count, id, out_list):   """   Creates an empty list and then appends a    random number to the list 'count' number   of times. A CPU-heavy operation!   """   for i in range(count):    out_list.append(random.random())if __name__ == "__main__":   size = 10000000   # Number of random numbers to add   threads = 2   # Number of threads to create     # Create a list of jobs and then iterate through   # the number of threads appending each thread to   # the job list    jobs = []   for i in range(0, threads):    out_list = list()    thread = threading.Thread(target=list_append(size, i, out_list))    jobs.append(thread)     # Start the threads (i.e. calculate the random number lists)   for j in jobs:    j.start()     # Ensure all of the threads have finished   for j in jobs:    j.join()     print "List processing complete."

我们能在控制台中调用如下的命令time这段代码

time python thread_test.py

将产生如下的输出

List processing complete.  real    0m2.003s  user    0m1.838s  sys     0m0.161s

注意user时间和sys时间相加大致等于real时间。这表明我们使用线程库没有获得性能的提升。我们期待real时间显著的降低。在并发编程的这些概念中分别被称为CPU时间和挂钟时间(wall-clock time)

多进程处理库

为了充分地使用所有现代处理器所能提供的多个核心 ,我们就要使用多进程处理库 。它的工作方式与线程库完全不同 ,不过两种库的语法却非常相似 。

多进程处理库事实上对每个并行任务都会生成多个操作系统进程。通过给每个进程赋予单独的Python解释器和单独的全局解释锁(GIL)十分巧妙地规避了一个全局解释锁所带来的问题。而且每个进程还可独自占有一个处理器核心,在所有进程处理都结束的时候再对结果进行重组。

不过也存在一些缺陷。生成许多进程就会带来很多I/O管理问题,这是因为多个处理器对数据的处理会引起数据混乱 。这就会导致整个运行时间增多 。不过,假设把数据限制在每个进程内部 ,那么就可能大大的提高性能 。当然,再怎么提高也不会超过阿姆达尔法则所规定的极限值。

Python实现

使用Multiprocessing实现仅仅需要修改导入行和multiprocessing.Process行。这里单独的向目标函数传参数。除了这些,代码几乎和使用Threading实现的一样:

# multiproc_test.pyimport randomimport multiprocessingdef list_append(count, id, out_list):   """   Creates an empty list and then appends a    random number to the list 'count' number   of times. A CPU-heavy operation!   """   for i in range(count):    out_list.append(random.random())if __name__ == "__main__":   size = 10000000   # Number of random numbers to add   procs = 2   # Number of processes to create     # Create a list of jobs and then iterate through   # the number of processes appending each process to   # the job list    jobs = []   for i in range(0, procs):    out_list = list()    process = multiprocessing.Process(target=list_append,                                    args=(size, i, out_list))    jobs.append(process)     # Start the processes (i.e. calculate the random number lists)     for j in jobs:    j.start()     # Ensure all of the processes have finished   for j in jobs:    j.join()     print "List processing complete."

控制台测试运行时间:

time python multiproc_test.py

得到如下输出:

List processing complete.  real    0m1.045s  user    0m1.824s  sys     0m0.231s

在这个例子中可以看到user和sys时间基本相同,而real下降了近两倍。之所以会这样是因为我们使用了两个进程。扩展到四个进程或者将列表的长度减半结果如下(假设你的电脑至少是四核的):

List processing complete.  real    0m0.540s  user    0m1.792s  sys     0m0.269s

使用四个进程差不多提高了3.8倍速度。但是,在将这个规律推广到更大范围,更复杂的程序上时要小心。数据转换,硬件cacha层次以及其他一些问题会减弱加快的速度。

在下一篇文章中我们会将Event-Driben Basketer并行化,从而提高其运行多维参数寻优的能力。

相关阅读: