无锁数据结构:队列

rm378667 7年前
   <p>队列多种多样,不同之处在于消息生产者、消费的数量不同;在于是基于预先分配的buffer有界队列,还是基于List的无界队列;在于是否支持优先级;在于是无锁非阻塞,还是有锁;在于严格遵守FIFO,公平还是非公平等等。</p>    <p>众所周知,更多特定的队列需求,势必需要更加有效的算法。本文中,只考虑队列最常见的版本,多个生产者对多个消费者,无界并发队列,因此不考虑优先级。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/3601b7e0b1bc57edb0c584e8b237f90c.jpg"></p>    <p>我猜队列想必是研究人员最喜欢的数据结构,因为它简单,但却比栈复杂,因为它有两端而非一端。正是因为有两端,那么一个有趣的问题就出来了:如何在多线程环境下管理它们呢?各种版本的队列算法纷纷发表,想要做一个全面的描述是不可能的了。我提炼其中一些最流行的算法简要介绍一下,首先从经典队列开始。</p>    <h2>经典队列</h2>    <p>经典队列是一个带有两端,即头和尾的列表。从头部读取数据,从尾部写入数据。</p>    <h2>一个标准的简单队列</h2>    <p>下面的代码拷贝自《无锁数据结构:简介》</p>    <pre>  <code class="language-cpp">struct Node {          Node * m_pNext ;  };  class queue {          Node * m_pHead ;          Node * m_pTail ;  public:          queue(): m_pHead( NULL ), m_pTail( NULL ) {}      void enqueue( Node * p )      {          p->m_pNext = nullptr;          if ( m_pTail )            m_pTail->m_pNext = p;          else              m_pHead = p ;          m_pTail = p ;      }      Node * dequeue()      {          if ( !m_pHead ) return nullptr ;          Node * p = m_pHead ;          m_pHead = p->m_pNext ;          if ( !m_pHead )              m_pTail = nullptr ;          return p ;      }  };  </code></pre>    <p>这里就不要过多纠结于此,它不适用于并发,列出来只是为了印证主题,说明该队列有多简单。本文会向大家展示,该队列适用于并发场景时,其简单算法做了哪些变动。</p>    <p>Michael和Scott的算法被认为是无锁队列的经典算法。</p>    <p>以下代码来自libcds库,它是经典算法的一种简单实现。若想查看全部代码,请看cds::intrusive::MSQueue类。代码中包含有注释,避免大家读起来乏味:</p>    <pre>  <code class="language-cpp">bool enqueue( value_type& val )  {      /*           实现细节:NODE_TYPE和VALUE_TYPE-          是不一样的,因此需要类型转换。         为了简单起见,我们假定node_traits :: to_node_ptr -        仅仅是的static_cast<node_type *>( &val )     */           node_type * pNew = node_traits::to_node_ptr( val )  ;        typenamegc::Guardguard; // A guard, for example, Hazard Pointer        // Back-off strategy (of the template-argument class)        back_offbkoff;        node_type * t;        // As always in lock-free, we’ll deal with it, till we make the right thing...        while ( true ) {            /*              保护m_pTail, 在读取该字段时            可以规避已删除内存被读的情形            */            t = guard.protect( m_pTail, node_to_value() );            node_type * pNext = t->m_pNext.load(                      memory_model::memory_order_acquire);            /*              有趣的是:该算法假定              m_pTail不能指向尾部(Tail),              而是希望通过进一步的调用实现对Tail正确的设置。              多线程互助就是一个典型的例子              */            if ( pNext != nullptr ) {                // 在接下来的线程之后                  // 有必要有效地清理Tail                  m_pTail.compare_exchange_weak( t, pNext,                        std::memory_order_release, std::memory_order_relaxed);                  /*                    全部必须从新开始,即使CAS不成功;                    CAS不成功的,意味着在我们读取m_pTail之前,它已经被改变                    */                  continue    ;            }            node_type * tmp = nullptr;            if ( t->m_pNext.compare_exchange_strong( tmp, pNew,                                  std::memory_order_release,                                  std::memory_order_relaxed ))            {          // Have successfully added a new element to the queue.                      break    ;            }          /*            执行失败- 即CAS运算没有成功            这意味着有人先我们到达            检测到有并发,为了不恶化CAS            调用back_off函数            我们及时地做一个短时间的回退            */            bkoff();      }        /*         通常,我们可以利用元素计数器        显而易见,此计数器是不很准确:        元素早已添加,我们现在的才进行计数        这样的计数器只能作为元素数量的清单,        不能作为一个空队列符号        */      ++m_ItemCounter    ;      /*        最后,试着改变m_pTail的Tail。        不用在意成功与否,        如果没有成功,        它们会在dequeue()方法的Oops!注解前后被清理掉        */      m_pTail.compare_exchange_strong( t, pNew,                std::memory_order_acq_rel, std::memory_order_relaxed );      /*      本算法返回必然是true。      其它算法,比如有界队列,      当队列已满时,可以返回false      为了保证接口的一致性,enqueue()      总是返回成功的标志      */      return true;        }  value_type * dequeue()  {      node_type * pNext;      back_offbkoff;      // We need 2 Hazard Pointers for dequeue      typenamegc::templateGuardArray<2>  guards;        node_type * h;        // Keep trying till we can execute it…        while ( true ) {            // Read and guard our Head m_pHead            h = guards.protect( 0, m_pHead, node_to_value() );            // and the element that follows the Head            pNext = guards.protect( 1, h->m_pNext, node_to_value() );            // Check: is what we have just read            // remains bounded?..            if ( m_pHead.load(std::memory_order_relaxed) != h ) {                  // no, - someone has managed to spoil everything...                  // Start all over again                  continue;            }            /*              此标记显示队列已空              注意与tail不同的是,              H=head始终不会错的              */            if ( pNext == nullptr )                  return nullptr;    // the queue is empty            /*              此处读取tail,但未采用Hazard Pointer对其进行保护              我们对其指向的上下文(数据结构中字段)不感兴趣              */            node_type * t = m_pTail.load(std::memory_order_acquire);            if ( h == t ) {                  /*                    糟糕,有头无尾                    元素紧随头之后,                    并且尾指向头。                    我想到了应该纠错的时候了                    */                m_pTail.compare_exchange_strong( t, pNext,                          std::memory_order_release, std::memory_order_relaxed);                // After helping them, we have to start over.                // Therefore, the CAS result is not important for us                continue;            }            // The most important thing is to link a new Head            // That is, we move down the list            if ( m_pHead.compare_exchange_strong( h, pNext,                       std::memory_order_release, std::memory_order_relaxed ))            {                      // Success! Terminate our infinite loop                      break;            }            /*              倘若失败,意味着有其干预;              不干扰它们,退回去小歇片刻              */            bkoff()    ;      }      // Change the not very useful counter of elements,      // see the comment in enqueue      --m_ItemCounter;      // It’s the call of 'remove the h element' functor      dispose_node( h );      /*     /*     有趣的是,     返回[ex] Head后面的元素,     但pNext仍然在队列中-     这是队列的新头部!    */      */      return pNext;  }  </code></pre>    <p>正如大家所看到的,队列由一个有头有尾的单链表组成。</p>    <p>算法的核心是什么呢?通过常规的CAS控制两个指针——这俩指针分别指向头部的和尾部。实际上得到的队列永远不为空。查看代码,是否有任何一处对头和尾做了nullptr检查?没有吧。非空的队列构造器中,添加哑元素(dummy element)给它,作为头和尾。出队返回一个元素,该元素作为一个新的头哑元素,其前面的哑元素被移除。</p>    <p>(译者注:所谓哑元素,仅是为了占一个位置,让链表永远不为空,从而简化判断的边缘条件,其数据部分没有任何意义)</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/c1c7063cf5616dcf9b70cd495dcd1189.png"></p>    <p>在设计侵入式队列时必须考虑,返回指针是队列的一部分,仅在下一次出队时可以移除它。</p>    <p>其次,算法假定尾部指针不指向最后一个元素。每一次读取尾部,需检查尾部是否包含下一个m_pNext元素。倘若该指针不为nullptr , 说明tail位置不对,应该后移。但这里有另外一个陷阱:或许tail会指向head前面的元素。为了避免这一点,出队方法中对m_pTail->m_pNext做了隐式地检查:先读取head,m_pHead->m_pNext元素紧随其后,确保pNext != nullptr。接着看到head等于tail,tail后面必然还有元素,即pNext,此时应该后移tail。这是一个典型的线程互助案例,它在无锁编程中很常见。</p>    <p>2000年,小范围的算法优化被提出 。 该观点认为出队方法中的MSQueue算法,在每一次的循环迭代中,读取tail是没有必要的;只有在成功更新head时,才有必要读取tail,验证tail是否真的指向最后一个元素。因此,可以在某种程度上减少加载m_pTail的压力。这个优化参见libcds库中的cds::intrusive::MoirQueue类。</p>    <h2>菜篮队列</h2>    <p>2007年,一个MSQueue有趣的 <a href="/misc/goto?guid=4959734619193410580" rel="nofollow,noindex">变体</a> 被引入。无锁领域久负盛名的研究者Nir Shavit和他的助理们,采用不同的方法优化了Michael和Scott经典的无锁队列。</p>    <p>Nir Shavit将队列作为一组逻辑菜篮,短时间内,每一个都可以插入一个新元素。一旦这个时间点过了,一个新的菜篮就会被创建。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/e4ac8755e8206adbe022a6ba176fceef.png"></p>    <p>每个菜篮包含一组无序元素,这种定义看似违反了队列-FIFO的基本特性;也就是说队列变成了非公平。FIFO是针对菜篮的,而非菜篮中的元素。倘若菜篮用来插入的时间段非常短暂,我们可以忽视菜篮中无序项。(译者注:时间短意味着,没放几个就创建了新的菜篮,因此可以近似地看做是FIFO)</p>    <p>如何确定时间段的长短呢?菜篮队列作者认为,实际上,无需确定该时间短长短。让我们回头看一眼MSQueue队列,在入队运算中(enqueue),当并发很高时,CAS改变尾部(tail)无法正常工作;这就是为什么在MSQueue调用回退(back-off)的原因,在并发情况下加入元素,无法保证队列中元素项的排序。就是这个逻辑菜篮。正好说明,抽象的逻辑菜篮就是一种回退策略。</p>    <p>在此,我不想过多地谈论代码实现,因此就不提供具体代码了。MSQueue例子已经很好的向我阐述了,无锁代码确实相当的简洁。有计划查看代码实现的,请参看libcds库中cds::intrusive::BasketQueue类,cds/intrusive/basket_queue.h文件。同时,为了解释本算法,我从Nir Shavit及其同事的工作中拷贝了另一张图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/03e2157da74185ca7e2668955afcf2cf.png"></p>    <p>1、A、B、C三个线程打算往队列中插入项。它们看到尾部(tail)在正确的位置,并试图并发改变tail(还记得在MSQueue中,尾部(tail)可以不指向队列中最后的元素)。</p>    <p>2、A线程获胜,成功插入一个新项。B和C则失败了——它们的tail的CAS运算没有成功执行。因此 , 它俩开始基于之前的读到的tail旧值,往菜篮中插入各自的项。</p>    <p>3、B线程先一步成功插入,与此同时,D线程调用入队(enqueue),成功完成项插入,并改变了尾部(tail)。</p>    <p>4、C线程此后也完成了插入,请看,它将项插入队列中间位置。在这个插入过程中,C使用的指向旧tail的指针,在线程进入运算但未成功执行CAS时,就已经读取此指针了。</p>    <p>需要注意的是,在插入过程中,插入项可能被放入队列head前面。比如图NR 4中C前面的项:当C线程执行入队(enqueue)时,其它线程删除C前面的项。(译者注,旧的头部被删除了)</p>    <p>为了防止此类情形出现,建议采用逻辑删除,即用一个特殊删除标签标记待删除元素。这就要求指向项的标签和指针必须为原子性读取,我们在指向pNext项指针的最低有效位(lsb)中存入此标签。这是可以接受的,现代系统中内存分配都是以4个字节对齐,因为指针最低有效位的2个比特一直为零。所以我们创造了标记指针方法,该方法被广泛地应用于无锁数据结构中。当然未来我们会多次碰到此方法。应用逻辑删除,即在CAS帮助下,将pNext最低位比特值设为1,这样就可以避免插入项在head前面的情形出现。这样插入依旧由CAS来完成,与此同时,待删除项在最低位值为1.因此,CAS可能会失败。(当然,在插入项时,我们无需获取整个标记指针,只有最高有效位(msb)包含地址;我们假定最低有效位等于零)。</p>    <p>菜篮队列最后一项改进是删除项实体,据观察,在每次成功调用出队时,改变头部令人不爽,因为CAS会被调用,正如你所知道的那样,这个操作太笨重了。因此,我们仅在存在好几个逻辑删除元素之后,才会改变头部。(在libcds库中,缺省值是三)。同样,当队列为空时,我们也可以改变头部(head)。可以说,在菜篮队列中,头部是变化跳动的。</p>    <p>所有对经典无锁队列优化设计都是在高并发这个背景下展开的。</p>    <h2>乐观方式</h2>    <p>2004年 Nir Shavit和Edya Ladan Mozes在MSQueue引入另一种叫做 乐观的优化方式 。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/a48346fb6abfbb1bd7f8c7044d7bcbff.png"></p>    <p>他们注意到Michael和Scott的算法中,出队运算仅需要一个CAS,而入队需要两个CAS,如上图所示。</p>    <p>而入队中第二个CAS甚至在低加载时。能显著降低性能,因为在现代处理器中,CAS是一个相当重量级运算。是否在某种程度上可以处理掉这个不足呢?</p>    <p>试想MSQueue::enqueue中存在两个CAS会怎样?第一个CAS添加新项到tail , 使得pTail->pNext。第二个CAS将尾部向后移动。那么可否用一个原子性记录而非CAS改变pNext字段呢?确实可以,假定单链表的方向与以往不一样,并非从头到尾,而是从尾到头。因此可以采用原子性store(pNew->pNext = pTail)完成pNew->pNext任务,接着再通过CAS改变pTail。不过一旦改变了单链表方向,接下来如何进行出队运算呢?因为方向改变,pHead->pNext 必然不会存在了。</p>    <p>乐观队列作者建议改用一个双链表。</p>    <p>但问题是,针对CAS的无锁双链表有效算法迄今还未可知。已知的算法有DCAS,但没有对应的硬件实现。针对CAS的MCAS(CAS for M unbounded memory cells)仿真算法,但没那么有效(需要2M + 1 CAS),充其量就是一个理论的玩意。</p>    <p>作者给出了以下方法:链表从尾部到头部的链接依旧是一致的(队列中并不需要next链接,但它可以处理入队第一个CAS)。正是由于从头到尾相反的方向,最重要的链接-prev-并不能真正的一致,意味着允许出现这种违例的。找出此类违例,我们就可以重建正确的表,放在next引用后面。如何检测此类违例了?事实上,这个相当简单:pHead->prev->next != pHead。如果这个不等在出队(dequeue)被发现, fix_list这个辅助处理过程就会被调用。</p>    <p>摘自libcds库cds::intrusive::OptimisticQueue类</p>    <pre>  <code class="language-cpp">void fix_list( node_type * pTail, node_type * pHead )  {                // pTail and pHead are already protected by Hazard Pointers    node_type * pCurNode;    node_type * pCurNodeNext;    typename gc::template GuardArray<2> guards;    pCurNode = pTail;    while ( pCurNode != pHead ) { // Till we reach the head      pCurNodeNext = guards.protect(0, pCurNode->m_pNext, node_to_value() );      if ( pHead != m_pHead.load(std::memory_order_relaxed) )          break;      pCurNodeNext->m_pPrev.store( pCurNode, std::memory_order_release );      guards.assign( 1, node_traits::to_value_ptr( pCurNode = pCurNodeNext ));    }  }  </code></pre>    <p>fix_list从队列的尾查至头,用正确的pNext引用,正确的pPrev。</p>    <p>列表从头至尾的违例也是有可能的,更多的是因为延迟,而非重加载。延迟是因为操作系统替换或线程中断。具体请看 OptimisticQueue::enqueue中的代码:</p>    <pre>  <code class="language-cpp">bool enqueue( value_type& val )  {    node_type * pNew = node_traits::to_node_ptr( val );    typename gc::template GuardArray<2> guards;    back_offbkoff;    guards.assign( 1, &val);    node_type * pTail = guards.protect( 0, m_pTail, node_to_value());    while( true ) {      // 从尾至头形成一个直接列表      pNew->m_pNext.store( pTail, std::memory_order_release );      // Trying to change the tail      if ( m_pTail.compare_exchange_strong( pTail, pNew,          std::memory_order_release, std::memory_order_relaxed ))       {          /*            从头到尾形成一个相反的列表            操作系统可以中断、替换。            结果,pTail从队列中被替换掉,即出队            (不用担心,pTail会被Hazard Pointer保护,因此具有不可被移除性)             */          pTail->m_pPrev.store( pNew, std::memory_order_release );          break ;                            // Enqueue done!      }      /*          CAS没有成功,pTail已被改变,(记住C++11中CAS特点:          第一个元素传的是引用)          Hazard Pointer保护新的pTail             */      guards.assign( 0, node_traits::to_value_ptr( pTail ));      // High contention – let’s step back      bkoff();    }    return true;  }  </code></pre>    <p>这段代码证明了我们所做出的优化:建立pPrev列表(对我们最重要了),希望能成功。倘若发现直接列表和反向列表之间有错位,我们不得不花时间确认了(运行fix_list)。</p>    <p>那么,底线在哪里?入队和出队各自都有一个CAS。代价就是一旦列表被检测出违例,就会运行fix_list。代价究竟有多大?实验结果会告诉我们。</p>    <p>大家可以在cds/intrusive.optimistic_queue.h文件,以及libcds库中的cds::intrusive::OptimisticQueue类中找到源代码。</p>    <h2>无等待队列</h2>    <p>为了完整地阐述经典队列,我们应该谈谈无等待队列算法。</p>    <p>无等待几乎是算法中最严格的,算法的执行时间必须可限定并且可预测。在实践中,无等待算法通常比诸如无锁、无干扰算法性能要低。但它们数量众多,实现起来也不复杂。</p>    <p>许多无等待算法结构是相当标准:不是执行一运算(例如入队/出队),而是先声明 —— 存储带参数的运算描述于一些可公开访问的共享存储中 , 接着不停地开启并发线程。接着它们浏览存储中的描述符,并试着执行该代码。结果,很多线程以很高的负载执行相同的任务,仅有一个线程成功。</p>    <p>诸如此类的C++算法实现复杂度,主要涉及如何实现存储,以及处理描述符的内存分配。</p>    <p>libcds库没有实现无等待队列,是因为该队列作者在其研究中,性能测试结果不尽人意。</p>    <h2>测试结果</h2>    <p>本文中,我决定提供以上算法的测试结果。</p>    <p>测试是综合性的,测试机为双核处理器,Debian Linux,Intel Dual Xeon X5670 2.93 GHz, 6 cores per processor + hyperthreading,总共24逻辑处理器。测试过程中,机器闲置达百分之九十。</p>    <p>编译器为GCC4.8.2,优化参数为-O3 -march=native -mtune=native。</p>    <p>测试队列来自cds::container命名空间,因此,它们是非侵入式的,即每个元素执行各种的内存分配。随后我们会将队列与采用互斥量(mutex)的std::queue<T, std::deque<T>>和std::queue<T, std::list<T>>标准同步实现做比较。</p>    <p>T类型为两个整型的数据结构。所有无锁队列都基于Hazard Pointer。</p>    <h2>持久性测试</h2>    <p>该测试相当特殊,有一千万对入队/出队运算。第一部分,测试执行一千万入队,75%为入队运算,剩余25%为出队运算,即一千万的入队运算,二百五十万的出队运算)。第二部分,出队运算执行七百五十万次,直到队列为空。</p>    <p>测试遵循以下理念:减小缓存分配器的不利影响,当然前提是分配器含有缓存。</p>    <p>测试时间的绝对值:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/2b85b4e89dcc40787cec2dd1fc7da483.png"></p>    <p>大家看到了什么?</p>    <p>首先映入眼帘的是,有锁std::queue<T, std::deque<T>>被证明是最快的。怎么可能呢?我认为这个跟内存有关:std::deque以N元素的块来分配内存,而非每个元素。这暗示我们应该移除测试中分配器的影响,这会带来相当长的延迟,另外,还有互斥量。当然, libcds的所有侵入式容器版本,没有为元素分配内存。理应对它们进行测试。</p>    <p>显而易见,无锁队列,针对MSQueue所有缜密的优化开出了丰硕的果实,即便不是那么完美。</p>    <h2>生产者和消费者测试</h2>    <p>这个测试相当切合实际,队列中包含N个生产者,N个消费者,分别执行百万条写运算,百万条读运算。图表中的线程数,为生产者和消费者的线程数之和。</p>    <p>测试时间的绝对值:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/2b85b4e89dcc40787cec2dd1fc7da483.png"></p>    <p>此处可以看出无锁队列是相当优雅,胜出者为OpimisticQueue。这就是说该无锁数据结构的所有假设被证明都是正确的。</p>    <p>本测试接近实际情形,队列中没有出现大量元素堆积现象。为什么呢?个人认为,分配器内在的优化在起作用。确实如此,在最后阶段,队列的存在不是为了大量元素堆积,而是削峰,通常队列中是不存在元素的。</p>    <h2>关于栈的补充说明</h2>    <p>既然谈到测试,就来谈谈栈。</p>    <p>本文以及前文所涉及的无锁栈,针对Treiber栈,我移除了回退(back-off)。不论实现,亦或者伪码描述、C++完整产品实现,理应单独作为一篇文章。不过我可能永远不会写,因为其中所涉及的代码是在太多。实际上,你会发现移除回退(back-off)之后,若你查看源码,完全不同。截止目前,只有libcds库里有。</p>    <p>同样,我也提供了综合测试结果,测试机器和前面的一样。</p>    <p>生产者和消费者测试:一些线程会写入栈中即压栈,而另一些线程会读取栈即弹栈。一组相同数量的生产者和消费者,生产者和消费者的线程总数都是百万级。标准的栈,其同步由互斥量完成。</p>    <p>测试时间的绝对值:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/cdb442d42c5e3c52e64b379ef108f090.png"></p>    <p>显而易见,图表本身就可以很好地说明事实。</p>    <p>移除回退(back-off)之后,什么促使性能的显著增加?好像是因为压栈、弹栈相互抵消。然而,我们查看内部统计,就会发现百万个执行仅有十万到十五万个压栈、弹栈相互抵消,大约为0.1%。而移除回退整个进入数大约为三十五万。这说明移除回退最大的优势就是一些线程在负载高的时候休眠,进而自动降低了整个栈的负载。现实的例子,移除回退(back-off)的休眠线程会持续大约5毫秒。</p>    <h2>总结</h2>    <p>本文阐述了经典无锁队列,展示了列表元素。该对列显著的特点就是存在两个并发端-头部和尾部。接着缜密地阐述了Michael和Scott经典算法的一些改进。我希望你会对此感兴趣,并能在每天的生活中用到它。</p>    <p>从测试结果看,尽管队列是无锁的,但神奇的CAS并没有带来任何特别的性能提升。因此,很有必要寻找其它一些方法消除瓶颈即头部和尾部,在某种程度上实现队列并行工作。</p>    <p> </p>    <p> </p>    <p> </p>    <p>来自:http://blog.jobbole.com/109648/</p>    <p> </p>