高性能的消息框架 go-disruptor

GleQav 8年前
   <p>Java程序员都知道, <a href="/misc/goto?guid=4958857406893331306" rel="nofollow,noindex">Disruptor</a> 是一个高性能的线程间通信的框架,即在同一个JVM进程中的多线程间消息传递,由LMAX开发。</p>    <p>Disruptor性能是如此之高,LMAX利用它可以处理每秒6百万订单,用1微秒的延迟获得吞吐量为100K+。那么Go语言生态圈中有没有这样的库呢?</p>    <p><a href="/misc/goto?guid=4959675740128920947" rel="nofollow,noindex">go-disruptor</a> 就是对Java Disruptor的移植,它也提供了与Java Disruptor类似的API设计,使用起来也算不上麻烦。</p>    <p>至于性能呢,下面就会介绍,这也是本文的重点。</p>    <p>因为Disruptor的高性能, 好多人对它都有所关注, 有一系列的文章介绍Disruptor,比如下列的文章和资源:</p>    <ul>     <li><a href="/misc/goto?guid=4959675740227817536" rel="nofollow,noindex">Disruptor Google Group</a></li>     <li><a href="/misc/goto?guid=4959675740305666559" rel="nofollow,noindex">Bad Concurrency</a> (Michael Barker)</li>     <li><a href="/misc/goto?guid=4959675740392487096" rel="nofollow,noindex">LMAX</a> (Planet)</li>     <li><a href="/misc/goto?guid=4959675740488208661" rel="nofollow,noindex">LMAX Exchange</a></li>     <li><a href="/misc/goto?guid=4959619587769777873" rel="nofollow,noindex">Disruptor presentation @ QCon SF</a></li>     <li><a href="/misc/goto?guid=4959675740603254384" rel="nofollow,noindex">Disruptor Technical Paper</a></li>     <li><a href="/misc/goto?guid=4958529908407552812" rel="nofollow,noindex">Mechanical Sympathy</a> (Martin Thompson)</li>     <li><a href="/misc/goto?guid=4959675740723841408" rel="nofollow,noindex">Martin Fowler's Technical Review</a></li>     <li><a href="/misc/goto?guid=4959675740801040282" rel="nofollow,noindex">.NET Disruptor Port</a></li>     <li><a href="/misc/goto?guid=4959675740875558604" rel="nofollow,noindex">Introduction to the Disruptor</a></li>     <li><a href="/misc/goto?guid=4959675740970316034" rel="nofollow,noindex">Disruptor wiki</a></li>    </ul>    <p>也有一些中文的翻译和介绍,比如 <a href="/misc/goto?guid=4959618569401777925" rel="nofollow,noindex">并发编程网的Disrutpor专题</a> 。</p>    <p><a href="/misc/goto?guid=4959675741085571633" rel="nofollow,noindex">阿里巴巴封仲淹:如何优雅地使用Disruptor</a> 。</p>    <p>Disruptor由LMAX开发,LMAX目标是要称为世界上最快的交易平台,为了取得低延迟和高吞吐率的目标,它们不得不开发一套高性能的生产者-消费者的消息框架。Java自己的Queue的性能还是有所延迟的,下图就是Disruptor和JDK ArrayBlockingQueue的性能比较。</p>    <p><img src="https://simg.open-open.com/show/6ccaefdb9b6ea3e856579cfa8cbdb328.png"></p>    <p>X轴显示的是延迟时间,Y轴是操作次数。可以看到Disruptor的延迟小,吞吐率高。</p>    <p>Disruptor有多种使用模型和配置,官方的一些模型的测试结果的链接在 <a href="/misc/goto?guid=4959618569235981727" rel="nofollow,noindex">这里</a> 。</p>    <p>我想做的其实就是go-disruptor和官方的Java Disruptor的性能比较。因为Disruptor有多种配置方式,单生产者和多生产者,单消费者和多消费者,配置的不同性能差别还是蛮大的,所以公平地讲,两者的比较应该使用相同的配置,尽管它们是由不同的编程语言开发的。</p>    <p>我选取的一个测试方案是:3个生产者和一个消费者,如果使用一个生产者Java Disruptor的性能会成倍的提升。</p>    <h2>Java Disruptor</h2>    <p>Java的测试主类如下:</p>    <pre>  <code class="language-java">publicclassMain{  privatestaticfinalintNUM_PUBLISHERS =3;//Runtime.getRuntime().availableProcessors();  privatestaticfinalintBUFFER_SIZE =1024*64;  privatestaticfinallongITERATIONS =1000L *1000L *20L;  privatefinalExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS +1, DaemonThreadFactory.INSTANCE);  privatefinalCyclicBarrier cyclicBarrier =newCyclicBarrier(NUM_PUBLISHERS +1);      privatefinalRingBuffer<ValueEvent> ringBuffer = createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE,newBusySpinWaitStrategy());    privatefinalSequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  privatefinalValueAdditionEventHandler handler =newValueAdditionEventHandler();  privatefinalBatchEventProcessor<ValueEvent> batchEventProcessor =newBatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);  privatefinalValueBatchPublisher[] valuePublishers =newValueBatchPublisher[NUM_PUBLISHERS];     {  for(inti =0; i < NUM_PUBLISHERS; i++)   {   valuePublishers[i] = newValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS,16);   }     ringBuffer.addGatingSequences(batchEventProcessor.getSequence());   }      publiclongrunDisruptorPass()throwsException   {  finalCountDownLatch latch =newCountDownLatch(1);   handler.reset(latch, batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));     Future<?>[] futures = newFuture[NUM_PUBLISHERS];  for(inti =0; i < NUM_PUBLISHERS; i++)   {   futures[i] = executor.submit(valuePublishers[i]);   }   executor.submit(batchEventProcessor);      longstart = System.currentTimeMillis();   cyclicBarrier.await(); //start test    for(inti =0; i < NUM_PUBLISHERS; i++)   {   futures[i].get();   } //all published     latch.await(); //all handled    longopsPerSecond = (ITERATIONS *1000L) / (System.currentTimeMillis() - start);   batchEventProcessor.halt();    returnopsPerSecond;   }    publicstaticvoidmain(String[] args)throwsException   {   Main m = newMain();   System.out.println("opsPerSecond:"+ m.runDisruptorPass());   }  }  </code></pre>    <p>生产者和消费者类如下:</p>    <pre>  <code class="language-java">publicfinalclassValueAdditionEventHandlerimplementsEventHandler<ValueEvent>  {  privatelongvalue =0;  privatelongcount;  privateCountDownLatch latch;    publiclonggetValue()   {  returnvalue;   }    publicvoidreset(finalCountDownLatch latch,finallongexpectedCount)   {   value = 0;  this.latch = latch;   count = expectedCount;   }    @Override  publicvoidonEvent(finalValueEvent event,finallongsequence,finalbooleanendOfBatch)throwsException   {   value = event.getValue();    if(count == sequence)   {   latch.countDown();   }   }  }  </code></pre>    <pre>  <code class="language-java">publicfinalclassValueBatchPublisherimplementsRunnable  {  privatefinalCyclicBarrier cyclicBarrier;  privatefinalRingBuffer<ValueEvent> ringBuffer;  privatefinallongiterations;  privatefinalintbatchSize;    publicValueBatchPublisher(  finalCyclicBarrier cyclicBarrier,  finalRingBuffer<ValueEvent> ringBuffer,  finallongiterations,  finalintbatchSize)   {  this.cyclicBarrier = cyclicBarrier;  this.ringBuffer = ringBuffer;  this.iterations = iterations;  this.batchSize = batchSize;   }    @Override  publicvoidrun()   {  try   {   cyclicBarrier.await();    for(longi =0; i < iterations; i += batchSize)   {  longhi = ringBuffer.next(batchSize);  longlo = hi - (batchSize -1);  for(longl = lo; l <= hi; l++)   {   ValueEvent event = ringBuffer.get(l);   event.setValue(l);   }   ringBuffer.publish(lo, hi);   }   }  catch(Exception ex)   {  thrownewRuntimeException(ex);   }   }  }  </code></pre>    <pre>  <code class="language-java">publicfinalclassValueEvent  {  privatelongvalue;    publiclonggetValue()   {  returnvalue;   }    publicvoidsetValue(finallongvalue)   {  this.value = value;   }    publicstaticfinalEventFactory<ValueEvent> EVENT_FACTORY =newEventFactory<ValueEvent>()   {  publicValueEventnewInstance()   {  returnnewValueEvent();   }   };  }  </code></pre>    <p>生产者使用三个线程去写数据,一个消费者进行处理。生产者运行在三个线程中,批处理写入,每次写16个数据。</p>    <p>实际测试每秒能达到 <strong>183486238</strong> 的吞吐率, 也就是1.8亿的吞吐率。</p>    <h2>go-disruptor</h2>    <p>下面看看go-disruptor的性能能达到多少。</p>    <p>我们知道,Go语言内置的goroutine之间的消息传递是通过channel实现的,go-disruptor官方网站上比较了go-disruptor和channel的性能,明显go-disruptor要比channel要好:</p>    <table>     <thead>      <tr>       <th>cenario</th>       <th>Per Operation Time</th>      </tr>     </thead>     <tbody>      <tr>       <td>Channels: Buffered, Blocking, GOMAXPROCS=1</td>       <td>58.6 ns</td>      </tr>      <tr>       <td>Channels: Buffered, Blocking, GOMAXPROCS=2</td>       <td>86.6 ns</td>      </tr>      <tr>       <td>Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write</td>       <td>194 ns</td>      </tr>      <tr>       <td>Channels: Buffered, Non-blocking, GOMAXPROCS=1</td>       <td>26.4 ns</td>      </tr>      <tr>       <td>Channels: Buffered, Non-blocking, GOMAXPROCS=2</td>       <td>29.2 ns</td>      </tr>      <tr>       <td>Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write</td>       <td>110 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Reserve One</td>       <td>4.3 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Reserve Many</td>       <td>1.0 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Reserve One, Multiple Readers</td>       <td>4.5 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Reserve Many, Multiple Readers</td>       <td>0.9 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Await One</td>       <td>3.0 ns</td>      </tr>      <tr>       <td>Disruptor: Writer, Await Many</td>       <td>0.7 ns</td>      </tr>      <tr>       <td>Disruptor: SharedWriter, Reserve One</td>       <td>13.6 ns</td>      </tr>      <tr>       <td>Disruptor: SharedWriter, Reserve Many</td>       <td>2.5 ns</td>      </tr>      <tr>       <td>Disruptor: SharedWriter, Reserve One, Contended Write</td>       <td>56.9 ns</td>      </tr>      <tr>       <td>Disruptor: SharedWriter, Reserve Many, Contended Write</td>       <td>3.1 ns</td>      </tr>     </tbody>    </table>    <p>在与Java Disruptor相同的测试条件下go-disruptor的性能呢?</p>    <p>下面是测试代码:</p>    <pre>  <code class="language-java">packagemain    import(  "fmt"  "runtime"  "sync"  "time"     disruptor "github.com/smartystreets/go-disruptor"  )    const(   RingBufferSize =1024*64   RingBufferMask = RingBufferSize -1   ReserveOne =1   ReserveMany =16   ReserveManyDelta = ReserveMany -1   DisruptorCleanup = time.Millisecond *10  )    varringBuffer = [RingBufferSize]int64{}    funcmain() {   NumPublishers :=3//runtime.NumCPU()   totalIterations := int64(1000*1000*20)   iterations := totalIterations / int64(NumPublishers)   totalIterations = iterations * int64(NumPublishers)     fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n", totalIterations, iterations, NumPublishers)     runtime.GOMAXPROCS(NumPublishers)  varconsumer = &countConsumer{TotalIterations: totalIterations, Count:0}   consumer.WG.Add(1)     controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()   controller.Start()  defercontroller.Stop()    varwg sync.WaitGroup   wg.Add(NumPublishers +1)    varsendWG sync.WaitGroup   sendWG.Add(NumPublishers)    fori :=0; i < NumPublishers; i++ {  gofunc() {   writer := controller.Writer()   wg.Done()   wg.Wait()   current := disruptor.InitialSequenceValue  forcurrent < totalIterations {   current = writer.Reserve(ReserveMany)    forj := current - ReserveMany; j <= current; j++ {   ringBuffer[j&RingBufferMask] = j   }   writer.Commit(current-ReserveMany, current)   }     sendWG.Done()   }()   }     wg.Done()   t := time.Now().UnixNano()   wg.Wait() //waiting for ready as a barrier   fmt.Println("start to publish")     sendWG.Wait()   fmt.Println("Finished to publish")     consumer.WG.Wait()   fmt.Println("Finished to consume")//waiting for consumer     t = (time.Now().UnixNano() - t) /1000000//ms     fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)  }    typecountConsumerstruct{   Count int64   TotalIterations int64   WG sync.WaitGroup  }    func(cc *countConsumer) Consume(lower, upperint64) {  forlower <= upper {   message := ringBuffer[lower&RingBufferMask]  ifmessage != lower {   warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)   fmt.Printf(warning)  panic(warning)   }   lower++   cc.Count++  //fmt.Printf("count: %d, message: %d\n", cc.Count-1, message)  ifcc.Count == cc.TotalIterations {   cc.WG.Done()  return   }   }  }  </code></pre>    <p>实际测试go-disruptor的每秒的吞吐率达到 <strong>137931020</strong> 。</p>    <p>好了,至少我们在相同的测试case情况下得到了两组数据,另外我还做了相同case情况的go channel的测试,所以一共三组数据:</p>    <ul>     <li>Java Disruptor : 183486238 ops/s</li>     <li>go-disruptor : 137931020 ops/s</li>     <li>go channel : 6995452 ops/s</li>    </ul>    <p>可以看到go-disruptor的性能要略微低于Java Disruptor,但是也已经足够高了,达到1.4亿/秒,所以它还是值的我们关注的。go channel的性能远远不如前两者。</p>    <h2>Go Channel</h2>    <p>如果通过Go Channel实现,每秒的吞吐率为 6995452。</p>    <p>代码如下:</p>    <pre>  <code class="language-java">funcmain() {   NumPublishers :=3//runtime.NumCPU()   totalIterations := int64(1000*1000*20)   iterations := totalIterations / int64(NumPublishers)   totalIterations = iterations * int64(NumPublishers)   channel := make(chanint64,1024*64)    varwg sync.WaitGroup   wg.Add(NumPublishers +1)    varreaderWG sync.WaitGroup   readerWG.Add(1)    fori :=0; i < NumPublishers; i++ {  gofunc() {   wg.Done()   wg.Wait()  fori :=int64(0); i < iterations; {  select{  casechannel <- i:   i++  default:  continue   }   }   }()   }    gofunc() {  fori :=int64(0); i < totalIterations; i++ {  select{  casemsg := <-channel:  ifNumPublishers ==1&& msg != i {  //panic("Out of sequence")   }  default:  continue   }   }     readerWG.Done()   }()     wg.Done()   t := time.Now().UnixNano()   wg.Wait()     readerWG.Wait()   t = (time.Now().UnixNano() - t) /1000000//ms     fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)  }  </code></pre>    <p> </p>    <p>来自:http://colobu.com/2016/07/22/using-go-disruptor/</p>    <p> </p>