Python并行处理

fsyanyong 6年前
   <h2>简介</h2>    <p>当你在机器上启动某个程序时,它只是在自己的“bubble”里面运行,这个气泡的作用就是用来将同一时刻运行的所有程序进行分离。这个“bubble”也可以称之为 <a href="/misc/goto?guid=4959755276955714440" rel="nofollow,noindex">进程</a> ,包含了管理该程序调用所需要的一切。</p>    <p>例如,这个所谓的进程环境包括该进程使用的 <a href="/misc/goto?guid=4959755277043590477" rel="nofollow,noindex">内存页</a> ,处理该进程打开的文件,用户和组的访问权限,以及它的整个命令行调用,包括给定的参数。</p>    <p>此信息保存在UNIX/Linux系统的流程文件系统中,该系统是一个虚拟文件系统,可通过 <a href="/misc/goto?guid=4959755277135053014" rel="nofollow,noindex">/proc</a> 目录进行访问。条目都已经根据进程ID排过序了,该ID是每个进程的唯一标识符。示例1显示了具有进程ID#177的任意选择的进程。</p>    <p>示例1:可用于进程的信息</p>    <p><img src="https://simg.open-open.com/show/0e5dd248a9467e6a6c815d549d5fe189.png"></p>    <h2>构建程序代码以及数据</h2>    <p>程序越复杂,就越有助于将其分成较小的模块。不仅仅源代码是这样,在机器上执行的代码也同样适用于这条规则。该规则的典型范例就是使用子进程并行执行。这背后的想法就是:</p>    <ul>     <li>单个进程包含了可以单独运行的代码段</li>     <li>某些代码段可以同时运行,因此原则上允许并行</li>     <li>使用现代处理器和操作系统的特性,例如可以使用处理器的所有核心,这样就可以减少程序的总执行时间</li>     <li>减少程序/代码的复杂性,并将工作外包专门的代理</li>    </ul>    <p>使用子进程需要重新考虑程序的执行方式,从线性到并行。它类似于将公司的工作视角从普通员工转变为经理——你必须关注谁在做什么,某个步骤需要多长时间,以及中间结果之间的依赖关系。</p>    <p>这有利于将代码分割成更小的部分,这些更小的部分可以由专门用于此任务的代理执行。如果还没有想清楚,试想一下数据集的构造原理,它也是同样的道理,这样就可以由单个代理进行有效的处理。但是这也引出了一些问题:</p>    <ul>     <li>为什么要将代码并行化?落实到具体案例中或者在努力的过程中,思考这个问题有意义吗?</li>     <li>程序是否打算只运行一次,还是会定期运行在类似的数据集上?</li>     <li>能把算法分成几个单独的执行步骤吗?</li>     <li>数据是否允许并行化?如果不允许,那么数据组织将以何种方式进行调整?</li>     <li>计算的中间结果是否相互依赖?</li>     <li>需要对硬件进行调整吗?</li>     <li>在硬件或算法中是否存在瓶颈,如何避免或者最小化这些因素的影响?</li>     <li>并行化的其他副作用有哪些?</li>    </ul>    <p>可能的用例就是主进程,以及后台运行的等待被激活的守护进程(主/从)。此外,这可能是启动按需运行的工作进程的一个主要过程。在实践中,主要的过程是一个馈线过程,它控制两个或多个被馈送数据部分的代理,并在给定的部分进行计算。</p>    <p>请记住,由于操作系统所需要的子进程的开销,并行操作既昂贵又耗时。与以线性方式运行两个或多个任务相比,在并行的情况下,根据您的用例,可以在每个子过程中节省25%到30%的时间。例如,如果在系列中执行了两项消耗5秒的任务,那么总共需要10秒的时间,并且在并行化的情况下,在多核机器上平均需要8秒。有3秒是用于各种开销,即这部分是无法压缩和优化的,所以速度提高是有极限的。</p>    <h2>运行与Python并行的函数</h2>    <p>Python提供了四种可能的处理方式。首先可以使用 <a href="/misc/goto?guid=4959755277229811508" rel="nofollow,noindex">multiprocessing</a> 模块并行执行功能。第二,进程的替代方法是线程。从技术上讲,这些都是轻量级的进程,不在本文的范围之内。想了解更加详细的内容,可以看看Python的 <a href="/misc/goto?guid=4959755277312428101" rel="nofollow,noindex">线程模块</a> 。第三,可以使用 os 模块的 system() 方法或 subprocess 模块提供的方法调用外部程序,然后收集结果。</p>    <p>multiprocessing 模块涵盖了一系列方法来处理并行执行例程。这包括进程,代理池,队列以及管道。</p>    <p><em>清单1</em> 使用了五个代理程序池,同时处理三个值的块。对于代理的数量和对 chunksize 的值都是任意选择的,用于演示目的。根据处理器中核心的数量来调整这些值。</p>    <p>Pool.map() 方法需要三个参数 - 在数据集的每个元素上调用的函数,数据集本身和 chunksize 。在清单1中,我们使用 square 函数,并计算给定整数值的平方。此外, chunksize 不是必须的。如果未明确设置,则默认 chunksize 为1。</p>    <p>请注意,代理商的执行订单不能保证,但结果集的顺序是正确的。它根据原始数据集的元素的顺序包含平方值。</p>    <p>清单1:并行运行函数</p>    <p><img src="https://simg.open-open.com/show/5b792c909b911ce5f01c171807c1edad.png"></p>    <p>运行此代码应该产生以下输出:</p>    <p><img src="https://simg.open-open.com/show/efce62c084c28a235e2141c73b92764c.png"></p>    <p>注意:我们将使用Python 3作为这些例子。</p>    <h2>使用队列运行多个函数</h2>    <p>作为数据结构,队列是非常普遍的,并且以多种方式存在。 它被组织为 <a href="/misc/goto?guid=4959755277403379465" rel="nofollow,noindex">先进先出</a> (FIFO)或先进先出(LIFO)/ <a href="/misc/goto?guid=4959755277491333244" rel="nofollow,noindex">堆栈</a> ,以及有和没有优先级(优先级队列)。 数据结构被实现为具有固定数量条目的数组,或作为包含可变数量的单个元素的列表。</p>    <p>在 <em>列表2.1-2.7</em> 中,我们使用FIFO队列。 它被实现为已经由来自 multiprocessing 模块的相应类提供的列表。此外, time 模块被加载并用于模拟工作负载。</p>    <p>清单2.1:要使用的模块</p>    <p><img src="https://simg.open-open.com/show/726704ae9109d8182abd5b21a4500eee.png"></p>    <p>接下来,定义一个worker函数( <em>清单2.2</em> )。 该函数实际上代表代理,需要三个参数。进程名称指示它是哪个进程, tasks 和 results 都指向相应的队列。</p>    <p>在工作函数里面是一个 while 循环。 tasks 和 results 都是在主程序中定义的队列。 tasks.get() 从要处理的任务队列中返回当前任务。小于0的任务值退出 while 循环,返回值为-1。任何其他任务值都将执行一个计算(平方),并返回此值。将值返回到主程序实现为 result.put() 。这将在 results 队列的末尾添加计算值。</p>    <p>清单2.2:worker函数</p>    <p><img src="https://simg.open-open.com/show/f7108759d62d911436d64aeded9edfb4.png"></p>    <p>下一步是主循环(参见 <em>清单2.3</em> )。首先,定义了 <a href="/misc/goto?guid=4959011575212588183" rel="nofollow,noindex">进程间通信</a> (IPC)的经理。接下来,添加两个队列,一个保留任务,另一个用于结果。</p>    <p>清单2.3:IPC和队列</p>    <p><img src="https://simg.open-open.com/show/17e626fd9d17c122370890a6fc9038e4.png"></p>    <p>完成此设置后,我们定义一个具有四个工作进程(代理)的进程池。我们使用类 multiprocessing.Pool() ,并创建一个它的实例。 接下来,我们定义一个空的进程列表( <em>见清单2.4</em> )。</p>    <p>清单2.4:定义一个进程池</p>    <p><img src="https://simg.open-open.com/show/6f8c748530451bec87caf421c11d4fc8.png"></p>    <p>作为以下步骤,我们启动了四个工作进程(代理)。 为了简单起见,它们被命名为“P0”到“P3”。使用 multiprocessing.Pool() 完成创建四个工作进程。这将它们中的每一个连接到worker功能以及任务和结果队列。 最后,我们在进程列表的末尾添加新初始化的进程,并使用 new_process.start() 启动新进程(参见 <em>清单2.5</em> )。</p>    <p>清单2.5:准备worker进程</p>    <p><img src="https://simg.open-open.com/show/c1c3394ad143c7226c66feede17d21d6.png"></p>    <p>我们的工作进程正在等待工作。我们定义一个任务列表,在我们的例子中是任意选择的整数。这些值将使用 tasks.put() 添加到任务列表中。每个工作进程等待任务,并从任务列表中选择下一个可用任务。 这由队列本身处理(见 <em>清单2.6</em> )。</p>    <p>清单2.6:准备任务队列</p>    <p><img src="https://simg.open-open.com/show/538224263f6824a802e011d9a71698b7.png"></p>    <p>过了一会儿,我们希望我们的代理完成。 每个工作进程对值为-1的任务做出反应。 它将此值解释为终止信号,此后死亡。 这就是为什么我们在任务队列中放置尽可能多的-1,因为我们有进程运行。 在死机之前,终止的进程会在结果队列中放置-1。 这意味着是代理正在终止的主循环的确认信号。</p>    <p>在主循环中,我们从该队列读取,并计数-1。 一旦我们计算了我们有过程的终止确认数量,主循环就会退出。 否则,我们从队列中输出计算结果。</p>    <p>清单2.7:结果的终止和输出</p>    <p><img src="https://simg.open-open.com/show/33f6bb5071992f53c32e6912f0905fae.png"></p>    <p><em>示例2</em> 显示了Python程序的输出。 运行程序不止一次,您可能会注意到,工作进程启动的顺序与从队列中选择任务的进程本身不可预测。 但是,一旦完成结果队列的元素的顺序与任务队列的元素的顺序相匹配。</p>    <p>示例2</p>    <p><img src="https://simg.open-open.com/show/5f496ec18d6d0f4097b014b3b7107780.png"></p>    <p>注意:如前所述,由于执行顺序不可预测,您的输出可能与上面显示的输出不一致。</p>    <h2>使用os.system()方法</h2>    <p>system() 方法是 <a href="/misc/goto?guid=4959755277603461211" rel="nofollow,noindex">os模块</a> 的一部分,它允许在与Python程序的单独进程中执行外部命令行程序。 system() 方法是一个阻塞调用,你必须等到调用完成并返回。 作为UNIX / Linux拜物教徒,您知道可以在后台运行命令,并将计算结果写入重定向到这样的文件的输出流(参见 <em>示例3</em> ):</p>    <p>示例3:带有输出重定向的命令</p>    <p><img src="https://simg.open-open.com/show/81c0b5e8d1faba9da9fc92979fdbadad.png"></p>    <p>在Python程序中,您只需简单地封装此调用,如下所示:</p>    <p>清单3:使用os模块进行简单的系统调用</p>    <p><img src="https://simg.open-open.com/show/f2e159c8cb9cf7feb66a26c31a960286.png"></p>    <p>此系统调用创建一个与当前Python程序并行运行的进程。 获取结果可能会变得有点棘手,因为这个调用可能会在你的Python程序结束后终止 - 你永远都不会知道。</p>    <p>使用这种方法比我描述的先前方法要贵得多。 首先,开销要大得多(进程切换),其次,它将数据写入物理内存,比如一个需要更长时间的磁盘。 虽然这是一个更好的选择,你的内存有限(像RAM),而是可以将大量输出数据写入固态磁盘。</p>    <h2>使用子进程模块</h2>    <p>该模块旨在替换 os.system() 和 os.spawn() 调用。 <a href="/misc/goto?guid=4959755277693594581" rel="nofollow,noindex">子过程</a> 的想法是简化产卵过程,通过管道和信号与他们进行通信,并收集他们生成的输出包括错误消息。</p>    <p>从Python 3.5开始,子进程包含方法 subprocess.run() 来启动一个外部命令,它是底层 subprocess.Popen() 类的包装器。 作为示例,我们启动UNIX/Linux命令 df -h ,以查找机器的 / home 分区上仍然有多少磁盘空间。在Python程序中,您可以执行如下所示的调用( <em>清单4</em> )。</p>    <p>清单4:运行外部命令的基本示例</p>    <p><img src="https://simg.open-open.com/show/90e22c6be5b10a0de2a2f00e720489ef.png"></p>    <p>这是基本的调用,非常类似于在终端中执行的命令 df -h / home 。请注意,参数被分隔为列表而不是单个字符串。输出将与示例4相似。与此模块的官方Python文档相比,除了调用的返回值之外,它将调用结果输出到 stdout 。</p>    <p><em>示例4</em> 显示了我们的呼叫的输出。输出的最后一行显示命令的成功执行。调用 subprocess.run() 返回一个类 CompletedProcess 的实例,它有两个名为 args (命令行参数)的属性和 returncode (命令的返回值)。</p>    <p>示例4:运行清单4中的Python脚本</p>    <p><img src="https://simg.open-open.com/show/d2efa30c6a05824abd3eba766fb9e3fb.png"></p>    <p>要抑制输出到 stdout ,并捕获输出和返回值进行进一步的评估, subprocess.run() 的调用必须稍作修改。没有进一步修改, subprocess.run() 将执行的命令的输出发送到 stdout ,这是底层Python进程的输出通道。 要获取输出,我们必须更改此值,并将输出通道设置为预定义值 subprocess.PIPE 。清单5显示了如何做到这一点。</p>    <p>清单5:抓取管道中的输出</p>    <p><img src="https://simg.open-open.com/show/c3874b7d323729f35fd2f6aa1c8d39e2.png"></p>    <p>如前所述, subprocess.run() 返回一个类 CompletedProcess 的实例。在清单5中,这个实例是一个简单命名为 output 的变量。该命令的返回码保存在属性 output.returncode 中,打印到 stdout 的输出可以在属性 output.stdout 中找到。 请注意,这不包括处理错误消息,因为我们没有更改输出渠道。</p>    <h2>结论</h2>    <p>由于现在的硬件已经很厉害了,因此也给并行处理提供了绝佳的机会。Python也使得用户即使在非常复杂的级别,也可以访问这些方法。正如在 multiprocessing 和 subprocess 模块之前看到的那样,可以让你很轻松的对该主题有很深入的了解。</p>    <p> </p>    <p>来自:http://blog.csdn.net/dev_csdn/article/details/78424704</p>    <p> </p>