Ruby并发框架简介


Ruby并发框架简介 @ShiningRay 曹⼒ https://gitcafe.com/ShiningRay Ruby并发差吗? 并发是什么? 同时进⾏多个任务 单个任务的阻塞不会影响其他任务 ⾼并发 != ⾼性能 Ruby != Rails ⼀般的 Rails:并发靠进程数 根据应⽤场景 选择正确的并发模型! 选择正确的并发模型 • node.js —— Reactor模型 • Erlang —— Actor模型 • Clojure —— Software Transactional Memory • Go —— CSP / pi-calculus Ruby⼀样可以实现他们! EventMachine http://rubyeventmachine.com/ Reactor模型 require 'eventmachine' ! module EchoServer def post_init puts "-- someone connected to the echo server!" end def receive_data data send_data ">>>you sent: #{data}" close_connection if data =~ /quit/i end ! def unbind puts "-- someone disconnected from the echo server!" end end ! EventMachine.run { EventMachine.start_server "127.0.0.1", 8081, EchoServer } EventMachine EchoServer EventMachine通过回调模块的继承提供了常⽤的协议解 析,也便于进⾏协议的扩展 各种协议 • HeaderAndContentProtocol • LineAndTextProtocol • Memcache • ObjectProtocol • Stomp • SmtpServer SmtpClient • SASLauth • etc 常⽤数据结构 • Channel • Deferrable • Queue • Timer • PeriodicTimer • Pool • ProcessWatcher • FileStreamer • FileWatcher • Iterator EM-Synchrony https://github.com/igrigorik/em-synchrony EventMachine + Fiber EventMachine.run { page = EventMachine::HttpRequest.new('http://google.ca/').get page.errback { p "Google is down! terminate?" } page.callback { } } EventMachine中进⾏异步 HTTP请求 EventMachine.synchrony do page = EventMachine::HttpRequest.new("http:// www.google.com").get ! p "No callbacks! Fetched page: #{page}" EventMachine.stop end EM-Synchonry中进⾏同步 HTTP请求 Celluloid http://celluloid.io Actor-based concurrent object framework for Ruby Painless multithreaded programming for Ruby Actor 模型 –Alan Kay, creator of Smalltalk, on the meaning of "object oriented programming" “I thought of objects being like biological cells and/or individual computers on a network, only able to communicate with messages” Actor基本理念 • Actor是并发的单元 • Actor之间没有任何状态共享 • Actor之间通过异步的消息进⾏通讯 • 消息放在 Actor的邮箱中逐个处理 Actor模型的理念与 Ruby 的⾯向对象理念不谋⽽ 合 class Sheen include Celluloid ! def initialize(name) @name = name end ! def set_status(status) @status = status end ! def report "#{@name} is #{@status}" end end ⽤ Celluloid写⼀个 Actor >> charlie = Sheen.new "Charlie Sheen" => # >> charlie.set_status "winning!" => "winning!" >> charlie.report => "Charlie Sheen is winning!" >> charlie.async.set_status "asynchronously winning!" => nil >> charlie.report => "Charlie Sheen is asynchronously winning!" Celluloid相关库 • celluloid-io⾼性能事件 IO库,兼容 Ruby标准库的同 步接⼝ • dcell • reel HTTP应⽤服务器 • Sidekiq ⾼性能后台任务处理框架 使⽤ Celluloid::IO的 Echo Server require 'celluloid/io' require 'celluloid/autostart' ! class EchoServer include Celluloid::IO finalizer :shutdown ! def initialize(host, port) puts "*** Starting echo server on #{host}:#{port}" ! @server = TCPServer.new(host, port) async.run end ! def shutdown @server.close if @server end ! def run loop { async.handle_connection @server.accept } end ! def handle_connection(socket) _, port, host = socket.peeraddr puts "*** Received connection from #{host}:#{port}" loop { socket.write socket.readpartial(4096) } rescue EOFError puts "*** #{host}:#{port} disconnected" socket.close end end Fault-Tolerant • 采⽤ Erlang的核⼼理念 • 实现了 Linking, Supervisor, Supervisor Groups Fault-Tolerant Fault-Tolerant One-For-One restart One-For-All restart Fault-Tolerant EchoServer.supervise_as :echo_server EchoServer实例 Crash后, Supervisor⾃动重启新的实例 基于 Reel,⽀持 WebSocket的 EchoServer require 'reel' ! Reel::Server::HTTP.supervise("0.0.0.0", 3000) do |connection| # Support multiple keep-alive requests per connection connection.each_request do |request| # WebSocket support if request.websocket? puts "Client made a WebSocket request to: #{request.url}" websocket = request.websocket ! websocket << "Hello everyone out there in WebSocket land" websocket.close else puts "Client requested: #{request.method} #{request.url}" request.respond :ok, "Hello, world!" end end end ! sleep Concurrent Ruby http://concurrent-ruby.com/ 丰富的并发数据结构 • 异步并发结构: Agent, Async, Future/Promise ScheduledTask, TimerTask。甚⾄包括⼀个轻量级的 Actor模型的实现。 • 各种线程安全的变量结构,如 I结构, M结构以及软 件事务内存。 • Java中借来的各种并发⼯具, executors, thread pool等等 Software Transactional Memory 软件事务内存 使⽤与数据库中类似的⽅式处理共享数据的并发 简单的转账例⼦ 确保原⼦性 a = BankAccount.new(100_000) b = BankAccount.new(100) ! a.value -= 10 b.value += 10 a = BankAccount.new(100_000) b = BankAccount.new(100) ! original_a = a.value a.value -= 10 ! begin b.value += 10 rescue e => a.value = original_a raise e end 确保线程安全 lock.synchronize do a.value -= 10 b.value += 10 end a.lock.synchronize do b.lock.synchronize do a.value -= 10 b.value += 10 end end locks_needed = [a.lock, b.lock] locks_in_order = locks_needed.sort{ |x, y| x.number <=> y.number } ! locks_in_order[0].synchronize do locks_in_order[1].synchronize do a.value -= 10 b.value += 10 end end 使⽤ TVar和 atomically a = TVar.new(100_000) b = TVar.new(100) ! Concurrent::atomically do a.value -= 10 b.value += 10 end 软件事务内存 • ⽆需明确使⽤任何锁操作 • 在 atomically的代码块中, STM会根据所⽤到的 TVar ⾃动进⾏加锁 • 如果出现错误,则会把 TVar的状态恢复到之前版本 Agent https://github.com/igrigorik/agent Example: Goroutine Generator c = channel!(Integer) ! go! do i = 0 loop { c << (i += 1) } end ! p c.receive.first # => 1 p c.receive.first # => 2 Example: Multi-channel selector cw = channel!(Integer, 1) cr = channel!(Integer, 1) ! select! do |s| s.case(cr, :receive) { |value| do_something(value) } s.case(cw, :send, 3) end cr = channel!(Integer, 1) ! select! do |s| s.case(cr, :receive) { |value| do_something(value) } s.timeout(1.0) { puts :timeout } end 仅供参考 End 谢谢⼤家
还剩45页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 8 金币 [ 分享pdf获得金币 ] 1 人已下载

下载pdf

pdf贡献者

n5bn

贡献于2016-02-20

下载需要 8 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf