zookeeper应用场景练习(分布式锁)

casonstart 8年前

来自: http://blog.csdn.net/luckyzhoustar/article/details/50628419


 

 在平常的高并发的程序中,为了保证数据的一致性,因此都会用到锁,来对当前的线程进行锁定。在单机操作中,很好做到,比如可以采用Synchronized、Lock或者其他的读写多来锁定当前的线程。但是在分布式的系统中,就很难做到这一点。因此可以采用zookeeper中节点的特性来满足这一点。大致实现的思路如下。

 1.每个客户端都去zookeeper上创建临时的顺序节点

 2.客户端判断当前自己创建的节点是不是最小的

 3.如果是的话,就获得了执行当前任务的锁

 4.如果不是的话,就找到比自己小的节点,然后进行监听,如果被删除的话,就可以获得锁


 上面就是大致的实现思路,下面我们来通过代码来实现一下。

 

package com.test;    import java.util.ArrayList;  import java.util.Collections;  import java.util.List;  import java.util.concurrent.CountDownLatch;    import org.apache.curator.framework.CuratorFramework;  import org.apache.curator.framework.CuratorFrameworkFactory;  import org.apache.curator.framework.api.CuratorWatcher;  import org.apache.curator.framework.state.ConnectionState;  import org.apache.curator.framework.state.ConnectionStateListener;  import org.apache.curator.retry.RetryNTimes;  import org.apache.zookeeper.CreateMode;  import org.apache.zookeeper.WatchedEvent;  import org.apache.zookeeper.Watcher.Event.EventType;  import org.apache.zookeeper.data.Stat;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    public class DistributedLock {     private String lockName;   private final int timeOut = 3000;   private final String root = "/locks";   private String myZnode;// 代表当前节点信息   private String waitZnode;   private static Logger logger = LoggerFactory     .getLogger(DistributedLock.class);   private CuratorFramework client;   private CountDownLatch latch = new CountDownLatch(1);     public DistributedLock(String connectString, String lockName) {    this.lockName = lockName;    client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut)      .connectString(connectString)      .retryPolicy(new RetryNTimes(3, 3000)).build();    ConnectionStateListener listener = new ConnectionStateListener() {       public void stateChanged(CuratorFramework client,       ConnectionState newState) {      if (newState == ConnectionState.CONNECTED) {       logger.info("连接成功了");       latch.countDown();      }     }    };      client.getConnectionStateListenable().addListener(listener);    client.start();    try {     latch.await();     createRoot();    } catch (InterruptedException e) {     // TODO Auto-generated catch block     e.printStackTrace();    }     }     /**    * @Title: 创建根节点root    * @Description: TODO    * @param    * @return void    * @throws    */   private void createRoot() {    try {     Stat stat = client.checkExists().forPath(root);     if (stat != null) {      logger.info("root has already exists");     } else {      // 创建跟节点      client.create().creatingParentsIfNeeded().forPath(root);       }    } catch (Exception e) {     // TODO Auto-generated catch block     e.printStackTrace();    }   }     public void getLocks() {      try {     myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)       .forPath(root + "/" + lockName);     logger.info(myZnode + "has created");     // 取出所有的子节点,然后找出比自己小的节点,进行监听的设置     List<String> subNodes = client.getChildren().forPath(root);     // 取出所有带有lockname的节点信息     List<String> lockObjNodes = new ArrayList<String>();     for (String node : subNodes) {      if (node.contains(lockName)) {       lockObjNodes.add(node);      }     }     // 对当前节点进行排序     Collections.sort(lockObjNodes);     // 判断当前的节点是不是最小的节点     if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {      doAction();     } else {      // 找到比自己节点大一的节点进行监听      String subMyZone = myZnode        .substring(myZnode.lastIndexOf("/") + 1);      waitZnode = lockObjNodes.get(Collections.binarySearch(        lockObjNodes, subMyZone) - 1);      // 对节点进行监听      Stat stat = client.checkExists()        .usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode);      if (stat != null) {       System.out.println(Thread.currentThread().getName()         + "处于等待状态");      } else {       doAction();      }     }    } catch (Exception e) {     logger.error(e.getMessage());    }   }     // 删除节点的事件监听   CuratorWatcher deleteNodeWatcher = new CuratorWatcher() {      public void process(WatchedEvent event) throws Exception {       if (event.getType() == EventType.NodeDeleted) {      doAction();     }    }   };     private void doAction() {    System.out.println(Thread.currentThread().getName() + "开始执行");    client.close();   }  }


 下面来测试一下

 

/**        * @FileName: TestCurrentZk.java      * @Package:com.test      * @Description: TODO     * @author: LUCKY       * @date:2016年2月2日 下午11:36:04      * @version V1.0        */  package com.test;    /**   * @ClassName: TestCurrentZk   * @Description: TODO   * @author: LUCKY   * @date:2016年2月2日 下午11:36:04   */  public class TestCurrentZk {     public static void main(String[] args) throws Exception {    Thread threads[] = new Thread[10];    for (int i = 0; i < threads.length; i++) {     threads[i] = new Thread(new Runnable() {      public void run() {       ClientTest clientTest = new ClientTest(         "100.66.162.36:2181", "locknametest");       clientTest.getLocks();      }     });       threads[i].start();      }    Thread.sleep(Integer.MAX_VALUE);   }  }