zookeeper 分布式锁的实现

jopen 9年前

zookeeper 分布式锁的实现

临时顺序节点,这种类型的节点有几下几个特性:

  • 节点的生命周期和客户端会话绑定,即创建节点的客户端会话一旦失效,那么这个节点也会被清除。

  • 每个父节点都会负责维护其子节点创建的先后顺序,并且如果创建的是顺序节点(SEQUENTIAL)的话,父节点会自动为这个节点分配一个整形数值,以后缀的形式自动追加到节点名中,作为这个节点最终的节点名。

利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:

  1. 客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL

  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。

  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。

  4. 如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

以上信息来自:http://jm-blog.aliapp.com/?p=2554

根据这个思路,来实现基于zookeeper的分布式锁。

直接贴代码,如下,如果有不合适的或需要改进的地方,请指教。

package com.usfot;    import org.apache.zookeeper.CreateMode;  import org.apache.zookeeper.KeeperException;  import org.apache.zookeeper.WatchedEvent;  import org.apache.zookeeper.Watcher;  import org.apache.zookeeper.ZooDefs;  import org.apache.zookeeper.ZooKeeper;  import org.apache.zookeeper.data.Stat;    import java.io.IOException;  import java.nio.ByteBuffer;  import java.util.List;  import java.util.SortedSet;  import java.util.TreeSet;  import java.util.concurrent.ThreadLocalRandom;    /**   * Shared意味着锁是全局可见的,客户端都可以请求锁。   * DistributedSharedLock 应该是线程安全的。有待验证   * Created by liyanxin on 2015/3/18.   */  public class DistributedSharedLock implements Watcher {        private static final String ADDR = "127.0.0.1:2181";      private static final String LOCK_NODE = "guid-lock-";      private String rootLockNode; //锁目录      private ZooKeeper zk = null;      private Integer mutex;      private Integer currentLock;        /**       * 构造函数实现       * 连接zk服务器       * 创建zk锁目录       *       * @param rootLockNode       */      public DistributedSharedLock(String rootLockNode) {          this.rootLockNode = rootLockNode;          try {              //连接zk服务器              zk = new ZooKeeper(ADDR, 10 * 10000, this);          } catch (IOException e) {              e.printStackTrace();          }          mutex = new Integer(-1);          // Create ZK node name          if (zk != null) {              try {                  //建立根目录节点                  Stat s = zk.exists(rootLockNode, false);                  if (s == null) {                      zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,                              CreateMode.PERSISTENT);                  }              } catch (KeeperException e) {                  System.out.println("Keeper exception when instantiating queue: " + e.toString());              } catch (InterruptedException e) {                  System.out.println("Interrupted exception");              }          }      }        /**       * 请求zk服务器,获得锁       *       * @throws KeeperException       * @throws InterruptedException       */      public void acquire() throws KeeperException, InterruptedException {          ByteBuffer b = ByteBuffer.allocate(4);          byte[] value;          // Add child with value i          b.putInt(ThreadLocalRandom.current().nextInt(10));          value = b.array();            // 创建锁节点          String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,                  CreateMode.EPHEMERAL_SEQUENTIAL);            synchronized (mutex) {              while (true) {                  // 获得当前锁节点的number,和所有的锁节点比较                  Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1));                  List<String> childLockNode = zk.getChildren(rootLockNode, true);                    SortedSet<Integer> sortedLock = new TreeSet<Integer>();                  for (String temp : childLockNode) {                      Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1));                      sortedLock.add(tempLockNumber);                  }                    currentLock = sortedLock.first();                    //如果当前创建的锁的序号是最小的那么认为这个客户端获得了锁                  if (currentLock >= acquireLock) {                      System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock);                      return;                  } else {                      //没有获得锁则等待下次事件的发生                      System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock);                      mutex.wait();                  }              }          }      }          /**       * 释放锁       *       * @throws KeeperException       * @throws InterruptedException       */      public void release() throws KeeperException, InterruptedException {          String lockName = String.format("%010d", currentLock);          zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1);          System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock);      }        @Override      public void process(WatchedEvent event) {          synchronized (mutex) {              mutex.notify();          }      }  }

测试代码如下,

package com.usfot;    import org.apache.zookeeper.KeeperException;    /**   * 启动10个线程,都获得一个锁对象,每个线程都有一个锁对象。   * 锁对象请求zk服务器获得锁。如果不能获得锁,则等待。   * 当一个线程获得锁时,其他线程将等待锁的释放。   * Created by liyanxin on 2015/3/18.   */  public class DistributedSharedLockTest {        public static void main(String args[]) {          for (int i = 0; i < 10; i++) {              Thread t = new Thread(new Runnable() {                  @Override                  public void run() {                      DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");                      try {                          lock.acquire();                          Thread.sleep(1000); //获得锁之后可以进行相应的处理                          System.out.println("======获得锁后进行相应的操作======");                          lock.release();                          System.err.println("=============================");                      } catch (KeeperException e) {                          e.printStackTrace();                      } catch (InterruptedException e) {                          e.printStackTrace();                      }                  }              });              t.start();          }      }  }

看一下测试情况,

thread_name=Thread-5|wait lcok|lock_num=300  thread_name=Thread-1|attend lcok|lock_num=300  thread_name=Thread-9|wait lcok|lock_num=300  thread_name=Thread-3|wait lcok|lock_num=300  thread_name=Thread-7|wait lcok|lock_num=300  thread_name=Thread-6|wait lcok|lock_num=300  thread_name=Thread-2|wait lcok|lock_num=300  thread_name=Thread-0|wait lcok|lock_num=300  thread_name=Thread-8|wait lcok|lock_num=300  thread_name=Thread-4|wait lcok|lock_num=300  ======获得锁后进行相应的操作======  thread_name=Thread-2|wait lcok|lock_num=301  thread_name=Thread-6|wait lcok|lock_num=301  thread_name=Thread-1|release lcok|lock_num=300  =============================  .................................

这是一部分的打印日志。

在换一种方式测试一下锁对象的线程安全性。如下测试代码,

package com.usfot;    import org.apache.zookeeper.KeeperException;    /**   * Created by liyanxin on 2015/3/18.   */  public class DistributedSharedLockTest2 {          public static void main(String args[]) {          final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");            /**           * 所有的线程都共享一个锁对象,验证锁对象的线程安全性           * 锁是阻塞的           */          for (int i = 0; i < 10; i++) {              Thread t = new Thread(new Runnable() {                  @Override                  public void run() {                      try {                          lock.acquire();                          Thread.sleep(1000); //获得锁之后可以进行相应的处理                          System.out.println("======获得锁后进行相应的操作======");                          lock.release();                          System.err.println("=============================");                      } catch (KeeperException e) {                          e.printStackTrace();                      } catch (InterruptedException e) {                          e.printStackTrace();                      }                  }              });              t.start();          }      }  }

经过测试,发现了死锁deadlock的问题,这个问题如何导致的呢?是因为多个线程都会在mutex对象的内置锁上发生竞争。当线程A获得mutex对象的内置锁时,会进入到同步代码块,进行获取zk服务器的分布式锁的操作,当获得分布式锁后,退出同步代码块,mutex的内置锁也就被线程A释放。大量的线程都在竞争mutex对象的内置锁。这时,线程B获得mutex的内置锁,进入同步代码块,由于没有获得分布式锁,线程B等待。然后这时线程A释放分布式锁,删除zk服务器锁节点,此时触发watcher事件,唤醒mutex对象内置锁上等待的线程,

注意使用notify唤醒。notify大家应该知道,只能唤醒所有等待线程的其中一个,或许刚好此时唤醒的不是线程B,那么deadlock就来了。

怎么解决?我把notify换成notifyAll试了下,程序能顺利执行,没有死锁的现象。

重新运行代码,如下日志,

thread_name=Thread-9|wait lcok|lock_num=360  thread_name=Thread-0|wait lcok|lock_num=360  thread_name=Thread-1|wait lcok|lock_num=360  thread_name=Thread-5|wait lcok|lock_num=360  thread_name=Thread-7|wait lcok|lock_num=360  thread_name=Thread-3|wait lcok|lock_num=360  thread_name=Thread-6|wait lcok|lock_num=360  thread_name=Thread-2|attend lcok|lock_num=360  thread_name=Thread-4|wait lcok|lock_num=360  thread_name=Thread-8|wait lcok|lock_num=360  ======获得锁后进行相应的操作======  thread_name=Thread-2|release lcok|lock_num=360  =============================  thread_name=Thread-8|attend lcok|lock_num=361  thread_name=Thread-4|wait lcok|lock_num=361  thread_name=Thread-6|wait lcok|lock_num=361  ..............

一部分的打印日志

参考:http://blog.csdn.net/java2000_wl/article/details/8694270

=====================================================================

改进后的分布式锁实现

下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”_locknode_”节点下序号比自己小的那个节点是否存在即可。实现如下:

  1. 客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。

  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。

  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。

  4. 如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。

  5. 之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。

=================================END=================================