基于Zookeeper的分布式共享锁

y35w 9年前

首先,说说我们的场景:订单服务是做成集群的,当两个以上结点同时收到同一个订单的创建指令时,并发就产生了,系统就会重复创建订单。类似的场景还有很多。这时,分布式共享锁就闪亮登场了。

共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了,而借助Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不再赘述了。

官网资料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文资料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

详见Locks章节。

原理都知道了,网上一搜索,发现Apache已经提供了现成的实现,既然已经有轮子了,那我们也没必要重复造轮子了吧!直接使用 Curator 。但是,我们在测试中发现,用于共享锁的结点无法自动回收:除了最末一级的临时结点会在锁释放或session超时的时候自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:

首先,创建一个Maven工程,在pom文件里导入下面的包:

<dependencies>
    <!-- ZooKeeper client used underneath Curator. -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

    <!-- Curator: low-level client, lock recipes (InterProcessMutex) and the CuratorFramework API. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.8.0</version>
    </dependency>

    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.9.2</version>
    </dependency>
    <!--
        The lock client imports org.apache.commons.collections.CollectionUtils directly,
        so commons-collections is declared explicitly instead of relying on it arriving
        transitively through commons-beanutils.
    -->
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

LockZookeeperClient接口:

package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * Access point to the ZooKeeper connection and settings shared by the lock classes.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public interface LockZookeeperClient {

    /**
     * Returns the ZooKeeper path under which lock nodes are created.
     *
     * @return the base path for lock nodes
     */
    String getBasePath();

    /**
     * Returns the Curator client used to talk to ZooKeeper.
     *
     * @return the Curator client
     */
    CuratorFramework getCuratorFramework();

    /**
     * Hands a lock node path to the garbage collector.
     *
     * @param gcPath ZooKeeper path of the node to be considered for collection
     */
    void gc(String gcPath);
}

LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.XXX.framework.lock;

import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * {@link LockZookeeperClient} implementation that owns a {@link CuratorFramework}
 * connection and periodically deletes childless lock nodes.
 *
 * <p>Typical life cycle: configure through the setters, call {@link #init()} once,
 * and call {@link #destroy()} on shutdown.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
    private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

    /** Delay, in milliseconds, between {@link #init()} and the first GC run. */
    private static final long GC_INITIAL_DELAY_MS = 10 * 1000L;

    private boolean hasGc = true;
    private Timer gcTimer;
    private TimerTask gcTimerTask;
    /** Paths waiting to be collected; never null so late callers cannot hit a NullPointerException. */
    private final ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
    private int gcIntervalSecond = 60;

    private CuratorFramework curatorFramework;
    private String zookeeperIpPort = "localhost:2181";
    private int sessionTimeoutMs = 10000;
    private int connectionTimeoutMs = 10000;
    private String basePath = "/locks";

    /**
     * Enables or disables garbage collection of lock nodes.
     *
     * @param hasGc {@code true} to collect childless lock nodes
     */
    public void setHasGc(boolean hasGc) {
        this.hasGc = hasGc;
    }

    /**
     * Sets the period between two GC runs.
     *
     * @param gcIntervalSecond period in seconds
     */
    public void setGcIntervalSecond(int gcIntervalSecond) {
        this.gcIntervalSecond = gcIntervalSecond;
    }

    /**
     * Sets the ZooKeeper connection string.
     *
     * @param zookeeperIpPort connection string passed to Curator, e.g. {@code host:port}
     */
    public void setZookeeperIpPort(String zookeeperIpPort) {
        this.zookeeperIpPort = zookeeperIpPort;
    }

    /**
     * Sets the ZooKeeper session timeout.
     *
     * @param sessionTimeoutMs timeout in milliseconds
     */
    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /**
     * Sets the ZooKeeper connection timeout.
     *
     * @param connectionTimeoutMs timeout in milliseconds
     */
    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    /**
     * Sets the path under which lock nodes are created. The value is trimmed and a
     * single trailing {@code /} is removed.
     *
     * @param basePath base path for lock nodes
     */
    public void setBasePath(String basePath) {
        basePath = basePath.trim();
        if (basePath.endsWith("/")) {
            basePath = basePath.substring(0, basePath.length() - 1);
        }

        this.basePath = basePath;
    }

    /**
     * Creates and starts the Curator client and, when GC is enabled, starts the GC timer.
     *
     * @throws NullPointerException if the ZooKeeper connection string is blank
     *         (exception type kept for backward compatibility)
     */
    public void init() {
        if (StringUtils.isBlank(zookeeperIpPort)) {
            throw new NullPointerException("zookeeperIpPort");
        }

        // Base sleep of 1000 ms between retries, at most 3 retries.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        curatorFramework.start();
        LOG.info("CuratorFramework initialise succeed.");

        if (hasGc) {
            gc();
        }
    }

    /**
     * Stops the GC timer, forgets all pending paths and closes the Curator client.
     * Safe to call when {@link #init()} was never called and safe to call more than once.
     */
    public synchronized void destroy() {
        // Stop the timer before touching shared state so that no new GC run is started.
        gcStop();
        gcPaths.clear();

        if (null != curatorFramework) {
            curatorFramework.close();
            curatorFramework = null;
        }
    }

    /**
     * Registers a path as a GC candidate. Ignored when GC is disabled or the path is blank.
     *
     * @param gcPath ZooKeeper path of the node to collect once it has no children
     */
    @Override
    public void gc(String gcPath) {
        if (hasGc && StringUtils.isNotBlank(gcPath)) {
            gcPaths.add(gcPath.trim());
        }
    }

    /**
     * @return the Curator client, or {@code null} before {@link #init()} / after {@link #destroy()}
     */
    @Override
    public CuratorFramework getCuratorFramework() {
        return this.curatorFramework;
    }

    /**
     * @return the base path under which lock nodes are created
     */
    @Override
    public String getBasePath() {
        return this.basePath;
    }

    /**
     * (Re)starts the GC timer: registers nodes that already exist under the base path,
     * then schedules {@link #doingGc()} at a fixed period.
     */
    private synchronized void gc() {
        gcStop();

        try {
            scanningGCNodes();
        } catch (Throwable e) {
            // Best effort: a failed initial scan must not prevent start-up.
            LOG.warn("scanning gc nodes error.", e);
        }

        gcTimerTask = new TimerTask() {
            @Override
            public void run() {
                doingGc();
            }
        };

        Date begin = new Date();
        begin.setTime(begin.getTime() + GC_INITIAL_DELAY_MS);
        gcTimer = new Timer("lock-gc", true);
        gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
    }

    /** Cancels the GC timer and its task, if any. */
    private synchronized void gcStop() {
        if (null != gcTimer) {
            gcTimer.cancel();
            gcTimer = null;
        }

        if (null != gcTimerTask) {
            gcTimerTask.cancel();
            gcTimerTask = null;
        }
    }

    /**
     * Registers the nodes that already exist directly under the base path as GC
     * candidates; registers the base path itself when it has no children.
     *
     * @throws Exception on ZooKeeper errors while reading the base path
     */
    private synchronized void scanningGCNodes() throws Exception {
        if (null == curatorFramework.checkExists().forPath(basePath)) {
            return;
        }

        List<String> paths = curatorFramework.getChildren().forPath(basePath);
        if (CollectionUtils.isEmpty(paths)) {
            gcPaths.add(basePath);
            return;
        }

        for (String path : paths) {
            try {
                String tmpPath = basePath + "/" + path;
                if (null == curatorFramework.checkExists().forPath(tmpPath)) {
                    continue;
                }

                gcPaths.add(tmpPath);
            } catch (Throwable e) {
                LOG.warn("scanning gc nodes error.", e);
            }
        }
    }

    /**
     * One GC run: deletes every registered node that exists and has no children, and
     * forgets registered paths that no longer exist. Nodes that still have children
     * stay registered for a later run.
     */
    private synchronized void doingGc() {
        LOG.debug("GC beginning.");

        // A run that was already queued when destroy() was called finds no client.
        CuratorFramework client = curatorFramework;
        if (null == client) {
            LOG.debug("GC ended.");
            return;
        }

        // ConcurrentSkipListSet iterators are weakly consistent, so removing while iterating is safe.
        for (String path : gcPaths) {
            try {
                if (null == client.checkExists().forPath(path)) {
                    gcPaths.remove(path);
                    continue;
                }

                if (CollectionUtils.isEmpty(client.getChildren().forPath(path))) {
                    client.delete().forPath(path);
                    gcPaths.remove(path);
                    LOG.debug("GC " + path);
                }
            } catch (Throwable e) {
                // Keep the path registered: failures such as a child created between the
                // children check and the delete, or a lost connection, are transient, and
                // dropping the path here would leave the node uncollected forever.
                // Throwable is caught so that the timer thread survives a failed run.
                LOG.warn("GC failed for " + path + ", will retry on the next run.", e);
            }
        }

        LOG.debug("GC ended.");
    }
}

SharedLock共享锁:

package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * Distributed lock for a single resource, backed by a Curator {@link InterProcessMutex}.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public class SharedLock {
    private InterProcessLock delegate;

    /**
     * Creates a lock on {@code <basePath>/<resourceId>} and registers that path with the
     * client's garbage collector.
     *
     * @param lockZookeeperClient supplies the Curator client and the base path
     * @param resourceId identifier of the resource to lock; surrounding whitespace is trimmed
     * @throws NullPointerException if {@code resourceId} is blank
     */
    public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
        if (StringUtils.isBlank(resourceId)) {
            throw new NullPointerException("resourceId");
        }

        String lockPath = lockZookeeperClient.getBasePath() + "/" + resourceId.trim();
        delegate = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), lockPath);
        lockZookeeperClient.gc(lockPath);
    }

    /**
     * Blocks until the lock is obtained. Every call must be paired with a call to
     * {@link #release()}.
     *
     * @throws Exception ZK errors, connection interruptions
     */
    public void acquire() throws Exception {
        delegate.acquire();
    }

    /**
     * Waits up to the given time for the lock. Every call that returns {@code true} must
     * be paired with a call to {@link #release()}.
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the lock was obtained, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return delegate.acquire(time, unit);
    }

    /**
     * Performs one release of the lock.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    public void release() throws Exception {
        delegate.release();
    }

    /**
     * Tells whether the lock is held by a thread in this JVM.
     *
     * @return true/false
     */
    public boolean isAcquiredInThisProcess() {
        return delegate.isAcquiredInThisProcess();
    }
}

到此代码已经完成。下面写一个简单的Demo:

//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写    LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();    lzc.setZookeeperIpPort("10.100.15.1:8900");    lzc.setBasePath("/locks/sharedLock/");    lzc.init();      SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");    try {     if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {      System.out.println("sharedLock1 get");     }    } catch (Exception e) {     e.printStackTrace();    } finally {     try {      sharedLock.release();     } catch (Exception e) {      e.printStackTrace();     }    }      lzc.destroy();

就这样,系统就会每隔一分钟去回收一次没有使用的结点。