- 1. ZTQ异步任务队列潘俊勇
易度云办公 everydo.com
- 2. web服务中的耗时操作生成PDF
网页抓取
游戏数据备份
邮件发送
短信发送
主线程卡死!
- 3. 解决之道:异步执行ABCD同步ABCD异步:在另外的协程、线程、进程、服务器运行
- 4. 语言级实现Scala
Golang
erlang
- 5. 异步队列工作原理任务队列应用
(producer)Worker1Worker2WorkeN。。。pushpop 分布式:Worker可位于不同的机器运行
冲突处理:写操作频繁
可靠:异常,队列数据能保存
性能:worker取数据,等待Block,非轮询
- 6. 异步队列的更多场合性能优化:尽可能异步
日志记录
消息推送
串行化:避免冲突xapian索引只能单写
延时/定时运行
并行计算:分割多个任务并行执行
- 7. 队列选型之路数据库方案(ZODB : zc.async):
轮询查,低效!
频繁写,冲突!
RabbitMQ:非常复杂的消息模型
ZeroMQ:不支持Persistent
Beanstalkd:需要引入专门的服务器
Redis:提供List,支持队列
- 8. RedisRedis:
瑞士军刀!
已经用在Session、Cache
List直接支持队列:
push
brpop/plpop:阻塞式取数据,避免轮询
Persistent
Master/Slave
- 9. Redis List: 太简单底层,使用不方便
错误处理
监控
定时执行
不能查找任务
多个worker之间的工作调度
- 10. Redis之上的队列方案RedisMQ:需要另外一个server
Resque:github之作,Ruby
Pyres:
Resque的Python Clone
使用复杂,不够pythonic
Celery:
目标太大,潜在维护成本
- 11. ZTQ:Z - Task Queue基于Redis
For Python
开源
来自生产系统
易度云查看
易度云办公
文档转换、索引、日志记录、消息发送、邮件发送、短信发送、垃圾清理、redis压缩
- 12. 设计目标实现简单
容易使用
可靠
可管理:拥塞、出错
容易调试
灵活调度,高效利用服务器
- 13. 模块关系Redis
(数据存储、进程通信)Worker1WorkerNWorker2监控后台下达命令执行命令报告状态查看状态应用放入任务处理任务查询、优先错误处理
- 14. 组成包ztq_workerworker服务ztq_console 监控服务(可选)ztq_core核心API队列任务
(应用)pyramidredis配置
- 15. 安装pip install ztq_core
pip install ztq_worker
- 16. 首先:定义队列任务# ztq_demo/tasks.py
import time
from ztq_core import async
@async # 使用默认队列default
def send(body):
print ‘START: ‘, body
time.sleep(5)
print ‘END:’, body
@async(queue=‘mail’) # 使用队列mail
def send_failed(body):
print ‘FAIL START’, body
raise Exception(‘connection error’)
- 17. 接下来:运行worker# 运行:bin/ztq_worker worker.ini
[server]
host = localhost
port = 6379
db = 0
alias = w01
active_config = false
modules = ztq_demo.tasks # 所有需要import的task模块,每个一行
[queues]
default= 0 # default队列,起1个处理线程
mail = 0, 0 # mail队列,起2个处理线程
[log]
handler_file = ./ztq_worker.log
level = ERROR
- 18. 最后:测试异步运行import ztq_core
from ztq_demo.tasks import send
# 设置 Redis 连接
ztq_core.setup_redis(‘default’, ‘localhost’, 6379, 0)
send(‘hello, world’)
# 动态指定queue
send(‘hello world from mail’, ztq_queue=‘mail’)
- 19. 好,喘口气小窥下监控后台
- 20. 安装运行pip install ztq_console
bin/pserve app.ini
- 21. 当前worker状态
- 22. 队列情况
- 23. 错误处理
- 24. 队列执行日志
- 25. Worker运行日志
- 26. 更多特性。。。
- 27. 抢占式执行# 后插入先执行
# 如果任务已经在队列,会优先
send (body, ztq_first=True)
- 28. Ping: 探测任务状态# running: 运行; queue:排队中;
# error: 出错; none: 不存在
ztq_core.ping_task(send, body)
# ztq_first存在就优先; ztq_run不存在就运行
ztq_core.ping_task(send, body,
ztq_first=True, ztq_run=True)
- 29. 事务:transactionimport transaction
ztq_core.enable_transaction(True)
send_mail(from1, to1, body1)
send_mail(from2, to2, body2)
transaction.commit()
send_mail(from2, to2, body2, ztq_transaction=False) # 非事务
- 30. Cron:定时任务from async import async
import redis_wrap
from ztq_core import has_cron, add_cron
@async(queue='clock-0')
def bgrewriteaof():
""" 将redis的AOF文件压缩 """
redis = redis_wrap.get_redis()
redis.bgrewriteaof()
# 自动定时压缩reids
if not has_cron(bgrewriteaof):
add_cron({'hour':1}, bgrewriteaof)
- 31. 任务串行:callbackfrom ztq_core import prepare_task
callback = prepare_task(send, body)
send_mail(body,
ztq_callback=callback)
- 32. 多级串行from ztq_core import prepare_task
callback1 = prepare_task(send, body)
callback2 = prepare_task(send2, body, ztq_callback=callback1)
send (body, ztq_callback=callback2)
- 33. 异常处理:fcallbackfrom ztq_core import prepare_task
@async(queue='mail')
def fail_callback(return_code, return_msg):
print return_code, return_msg
fcallback = prepare_task(send2)
send(body, ztq_fcallback=fcallback)
- 34. 进度回调:pcallbackfrom ztq_core import prepare_task
pcallback = prepare_task(send2, body)
send_mail(body, ztq_pcallback=pcallback)
- 35. 抛出进度信息import ztq_worker
@async(queue=‘xxx’)
def doc2pdf(filename):
…
# 可被进度回调函数调用
ztq_worker.report_progress(page=2)
- 36. 拥塞:批处理加速# 为提升性能,需要多个xapian索引操作,一次性提交数据库
@async(queue=‘xapian’)
def index(data):
pass
def do_commit():
xapian_conn.commit()
# 每执行20个索引任务之后,一次性提交数据库
# 不够20个,但队列空的时候,也会提交
register_batch_queue(‘xapian’, 20, batch_func=do_commit)
- 37. 内部原理更多血淋淋的细节
- 38. 任务的序列化异步任务注册表
‘send_mail’ -> send_mail@async
def send_mail(from, to, body)注册send(from, to, body){‘func_name’:’send_mail’,
‘args’: (from,to,body),
‘kw’:{},
‘callback’:’’,
‘callback_args’: (),
‘callback_kw’: {},
}Worker执行生成Json,加入队列
- 39. 完整的任务信息
- 40. 任务ID?直接根据任务JSON,生成MD5,作为ID
方便查询任务是否已经存在
避免出现重复的任务
也是问题:不可插入完全相同的任务!
附加一个参数,来区分
- 41. 任务队列的Redis存储设计md5Task(json)任务md5索引
Hash任务队列1 (List)
故障队列1 (List)任务队列2
故障队列2Push任务队列和错误队列一一对应,方便管理
- 42. Worker模型工作线程1工作线程 2工作线程 N工作线程管理器指令线程(Command thread)Config.ini指令队列工作队列调度错误队列状态报告workerRedis
task报告报告pullpushworker指令线程:工作机状态报告:线程工作调度;杀死/取消进程
Task可报告工作进程的pid,监控后台可下指令杀死卡死的进程
- 43. 智能调度脚本管理上百台worker服务器?
工作是否饱和程度
自动调整工作安排
是可能的!
读取worker的CPU、内存情况
根据闲忙,调整任务的分配
- 44. TODO延时执行
支持协程,用于下载
优化监控后台的代码
改进cron
需要TestCase
- 45. 总结Redis:分布式计算的通信中心
感谢PyCONChina,让我们有时间开源
- 46. 项目信息Github:https://github.com/everydo/ztq
主要作者
徐陶哲http://weibo.com/xutaozhe
潘俊勇http://weibo.com/panjunyong