HTTP Load Testing with Twisted Python and Treq

jopen, 8 years ago

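Working on APIs is challenging, and keeping the system stable and robust during peak load is one of those challenges. That is why we do a lot of load testing at Mailgun.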

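Over time we have tried many approaches, from simple ApacheBench to more elaborate custom test suites. This post describes a "quick and dirty" yet very flexible way to do load testing with Python.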

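When writing HTTP clients in Python, we love the Requests library, and it is what we recommend to our API users. Requests is powerful, but it has one drawback: it is a blocking, one-call-per-thread library, so it is hard or even impossible to use it to quickly generate thousands of requests.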

Introducing Treq on Twisted

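To solve this problem we brought in Treq (GitHub repository). Treq is an HTTP client library influenced by Requests, but it runs on Twisted and has Twisted's typical strength: it handles network I/O asynchronously and with high concurrency.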

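Treq is not limited to load testing: it is a good tool for writing highly concurrent HTTP clients, such as web scrapers. Treq is elegant, easy to use, and powerful. Here is an example: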

>>> from treq import get

>>> def done(response):
...     print(response.code)
...     reactor.stop()

>>> get("http://www.github.com").addCallback(done)

>>> from twisted.internet import reactor
>>> reactor.run()
200

A simple test script

Below is a simple script that uses Treq to bombard a single URL with as many requests as possible.

#!/usr/bin/env python
# epollreactor is Linux-only and must be installed before the reactor is imported
from twisted.internet import epollreactor
epollreactor.install()

from twisted.internet import reactor, task
from twisted.web.client import HTTPConnectionPool
import treq
from datetime import datetime

req_generated = 0
req_made = 0
req_done = 0

cooperator = task.Cooperator()

pool = HTTPConnectionPool(reactor)

def counter():
    '''This function gets called once a second and prints the progress at one
    second intervals.
    '''
    # the counters are reassigned below, so they must be declared global
    global req_generated, req_made, req_done
    print("{} Requests: {} generated; {} sent; {} received".format(
            datetime.now().strftime("%Y-%m-%d %H:%M"),
            req_generated, req_made, req_done))
    # reset the counters and reschedule ourselves
    req_generated = req_made = req_done = 0
    reactor.callLater(1, counter)

def body_received(body):
    global req_done
    req_done += 1

def request_done(response):
    global req_made
    deferred = treq.json_content(response)
    req_made += 1
    deferred.addCallback(body_received)
    deferred.addErrback(lambda x: None)  # ignore errors
    return deferred

def request():
    deferred = treq.post('http://api.host/v2/loadtest/messages',
                         auth=('api', 'api-key'),
                         data={'from': 'Loadtest <test@example.com>',
                               'to': 'to@example.org',
                               'subject': "test"},
                         pool=pool)
    deferred.addCallback(request_done)
    return deferred

def requests_generator():
    global req_generated
    while True:
        deferred = request()
        req_generated += 1
        # do not yield deferred here so cooperator won't pause until
        # response is received
        yield None

if __name__ == '__main__':
    # make cooperator work on spawning requests
    cooperator.cooperate(requests_generator())

    # run the counter that will be reporting sending speed once a second
    reactor.callLater(1, counter)

    # run the reactor
    reactor.run()

Output:

2013-04-25 09:30 Requests: 327 generated; 153 sent; 153 received
2013-04-25 09:30 Requests: 306 generated; 156 sent; 156 received
2013-04-25 09:30 Requests: 318 generated; 184 sent; 154 received

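The "generated" number is the count of requests that have been prepared by the Twisted reactor but not yet sent. For brevity, this script ignores all error handling. Adding timeout statistics is left as an exercise for the reader.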

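This script can serve as a starting point that you extend with processing logic specific to your application. When you improve it, consider using collections.Counter in place of the ugly global variables. The script runs in a single thread; to squeeze the maximum number of requests out of one machine, you can use something like multiprocessing.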
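As a rough illustration of the multiprocessing idea, the sketch below starts several worker processes and merges the Counter each one returns. Everything here is an assumption for illustration: worker, merge, and run_workers are hypothetical names, and the worker body is a placeholder loop, not real load generation. In a real test each worker would install and run its own Twisted reactor inside the child process and return its counters when finished.

```python
import multiprocessing
from collections import Counter


def worker(worker_id, n_requests):
    """Placeholder for one load-generating process.

    A real worker would run its own reactor here; this stand-in only
    counts, so the merging logic can be shown in isolation.
    """
    stats = Counter()
    for _ in range(n_requests):
        stats["generated"] += 1
    return stats


def merge(results):
    """Sum the per-process counters into one total."""
    total = Counter()
    for stats in results:
        total.update(stats)
    return total


def run_workers(n_workers, n_requests):
    """Run `n_workers` processes and return their combined counters."""
    args = [(i, n_requests) for i in range(n_workers)]
    with multiprocessing.Pool(n_workers) as pool:
        # starmap unpacks each (worker_id, n_requests) tuple
        results = pool.starmap(worker, args)
    return merge(results)
```

run_workers should be called from under an if __name__ == '__main__': guard, which multiprocessing needs on platforms that spawn rather than fork child processes. One process per CPU core is a reasonable starting point.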

Happy load testing!


Source: http://python.jobbole.com/84156/