python高性能代码之多线程优化

gpob3582 8年前
   <p>以常见的端口扫描器为实例端口扫描器的原理很简单,操作socket来判断连接状态确定主机端口的开放情况。</p>    <pre>  <code class="language-python">import socket   def scan(port):     s = socket.socket()     if s.connect_ex(('localhost', port)) == 0:       print port, 'open'     s.close()   if __name__ == '__main__':     map(scan,range(1,65536))   </code></pre>    <p>这是一个socket扫描器的基本代码。</p>    <p>但是如果直接运行会等待很长时间都没有反应,这是因为socket是阻塞的,到等待每个连接超时后才会进入下一个连接。</p>    <p>给这段代码加一个超时</p>    <pre>  <code class="language-python">s.settimeout(0.1)  </code></pre>    <p>完整的代码如下</p>    <pre>  <code class="language-python">import socket   def scan(port):     s = socket.socket()     s = settimeont(0.1)    if s.connect_ex(('localhost', port)) == 0:       print port, 'open'     s.close()   if __name__ == '__main__':     map(scan,range(1,65536))   </code></pre>    <p>本文的重点不在于扫描器功能部分。而重点在于代码质量的提升和优化从而提升代码的运行效率。</p>    <h3><strong>多线程版本:</strong></h3>    <pre>  <code class="language-python">import socket   import threading   def scan(port):     s = socket.socket()     s.settimeout(0.1)     if s.connect_ex(('localhost', port)) == 0:       print port, 'open'     s.close()      if __name__ == '__main__':     threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)]     map(lambda x:x.start(),threads)   </code></pre>    <p>Run起来,速度确实快了不少,但是抛出了异常:thread.error: can't start new thread</p>    <p>这个进程开启了65535个线程,有两种可能,一种是超过最大线程数了,一种是超过最大socket句柄数了。在linux可以通过ulimit来修改。</p>    <p>如果不修改最大限制,怎么用多线程不报错呢?</p>    <p>加个queue,变成生产者-消费者模式,开固定线程。</p>    <h3><strong>多线程+队列版本:</strong></h3>    <pre>  <code class="language-python">import socket   import threading   from Queue import Queue   def scan(port):     s = socket.socket()     s.settimeout(0.1)     if s.connect_ex(('localhost', port)) == 0:       print port, 'open'     s.close()      def worker():     while not q.empty():       port = q.get()       try:         scan(port)       finally:         q.task_done()      if __name__ == '__main__':     q = Queue()     map(q.put,xrange(1,65535))     threads = [threading.Thread(target=worker) for i in xrange(500)]     map(lambda x:x.start(),threads)     q.join()   </code></pre>    <p>开500个线程,不停的从队列中取出任务来进行...</p>    <h3><strong>multiprocessing + 队列版本:</strong></h3>    <p>总不能开65535个进程吧?还是用生产者消费者模式</p>    <pre>  <code class="language-python">import multiprocessing   def scan(port):     s = socket.socket()     s.settimeout(0.1)     if s.connect_ex(('localhost', port)) == 0:       print port, 'open'     s.close()      def worker(q):     while not q.empty():       port = q.get()       try:         scan(port)       finally:         q.task_done()      if __name__ == '__main__':     q = multiprocessing.JoinableQueue()     map(q.put,xrange(1,65535))     jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)]     map(lambda x:x.start(),jobs)   </code></pre>    <p>注意这里把队列作为一个参数传入到worker中去,因为是process safe的queue,不然会报错。</p>    <p>还有用的是JoinableQueue(),顾名思义就是可以join()的。</p>    <h3><strong>gevent的spawn版本:</strong></h3>    <pre>  <code class="language-python">from gevent import monkey; monkey.patch_all();   import gevent   import socket   ...   if __name__ == '__main__':     threads = [gevent.spawn(scan, i) for i in xrange(1,65536)]     gevent.joinall(threads)   </code></pre>    <p>注意monkey patch必须在被patch的东西之前import,不然会Exception KeyError.比如不能先import threading,再monkey patch.</p>    <h3><strong>gevent的Pool版本:</strong></h3>    <pre>  <code class="language-python">from gevent import monkey; monkey.patch_all();   import socket   from gevent.pool import Pool   ...   if __name__ == '__main__':     pool = Pool(500)     pool.map(scan,xrange(1,65536))     pool.join()   </code></pre>    <h3><strong>concurrent.futures版本:</strong></h3>    <pre>  <code class="language-python">import socket   from Queue import Queue   from concurrent.futures import ThreadPoolExecutor   ...   if __name__ == '__main__':     q = Queue()     map(q.put,xrange(1,65536))     with ThreadPoolExecutor(max_workers=500) as executor:       for i in range(500):         executor.submit(worker,q)   </code></pre>    <p> </p>    <p>来自:http://www.cnblogs.com/lfoder/p/5883143.html</p>    <p> </p>