基于EHcache实现高并发数据缓存池

jopen 8年前

在高并发的场景里面经常会使用到localcache内容,但是一直没有一个很好的内存管理工具。在开发的时候发现了ehcache,这么一个开源的工具。唯一的缺点就是无法对于多块数据单元进行一个有效的管理,并且在数据过期的时候无法提供有效的更新机制,所以这里写了一个数据缓存池来满足这个需求。

下面是设计组织结构:

153022_tuje_917881.jpg

这里主要是在数据实体内部封装了数据更新器,这样在数据过期的时候可以调用更新器的方法。

1. Ehcache数据缓冲的具体代码:(主要是get方法内部进行数据更新,使用对象锁的方式来进行数据过期的并发控制,缺点是可能在非常高的并发里面会出现数据阻塞的现象,但是因为这里大部分都是内存的运算操作,所以相对来说阻塞的效果还好)

package com.tmall.lafite.core.manager.localcache;    import java.util.List;    import net.sf.ehcache.Cache;  import net.sf.ehcache.CacheManager;  import net.sf.ehcache.Element;  import net.sf.ehcache.concurrent.LockType;  import net.sf.ehcache.concurrent.ReadWriteLockSync;  import net.sf.ehcache.config.CacheConfiguration;  import net.sf.ehcache.store.MemoryStoreEvictionPolicy;    import com.tmall.lafite.core.LafiteResult;  import com.tmall.lafite.core.ResultCode;    /**   * cache数据实体   * @author wangxiao   *   */  public class LafiteCache {      private CacheManager cacheManager = null;   private Cache cacheImpl = null;      ReadWriteLockSync rwLock = new ReadWriteLockSync();      private int capability = 30;   private long expireTime = 30;      public static final int DEFAULT_CAPABILITY = 30;   public static final int DEFAULT_EXPIRETIME = 30;      private String cacheName = "Tair Local Cache";      public LafiteCache(String id, int capability, long expireTimeMS) {    this.cacheName = id;    this.capability = capability;    this.expireTime = expireTimeMS;   }      public void setExpireTime(long expireTimeMS) {    this.expireTime = expireTimeMS;   }      public void setCapacity(int cap) {    cacheImpl.getCacheConfiguration().setMaxEntriesLocalHeap(cap);   }      public long getExpireTime() {    return expireTime;   }     @SuppressWarnings("deprecation")   public void initialize() {    CacheConfiguration cacheConfiguration = new CacheConfiguration();    cacheConfiguration.setDiskPersistent(false);        cacheConfiguration.name(cacheName)          .maxEntriesLocalHeap(capability)             .diskPersistent(false)             .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);            // .timeToLiveSeconds(expireTime);    //cacheConfiguration.transactionalMode("LOCAL");    //Configuration config = new Configuration().name(cacheName).cache(cacheConfiguration);        cacheManager = CacheManager.create();        cacheImpl =  new Cache(cacheConfiguration);    cacheManager.addCache(cacheImpl);    //cache = MemoryStore.create(cacheImpl, new UnboundedPool());       }      public int size() {    return (int) cacheImpl.getSize();   }      public void destroy() {    cacheImpl.dispose();    //cacheManager.removeCache(cacheName);    cacheManager.shutdown();   }      public void clear() {    rwLock.lock(LockType.WRITE);    try {     cacheImpl.removeAll();    } finally {     rwLock.unlock(LockType.WRITE);    }   }      public void del(Object key) {    cacheImpl.remove(key);    return;   }      public void put(Object key, Object value) {     cacheImpl.put(new Element(key, value));    return ;   }      public LafiteResult get(Object key) {    LafiteResult lafiteResult = new LafiteResult();         Element element = cacheImpl.get(key);    if (element == null) {     lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);     return lafiteResult;    }    long now = System.currentTimeMillis();        long pastTime = now - element.getLastUpdateTime();    if (pastTime >= expireTime) {     // double check     synchronized (element) {      pastTime = now - element.getLastUpdateTime();      if (pastTime >= expireTime) {       // expired, update entry       element.updateUpdateStatistics();       lafiteResult.setError(ResultCode.Error.Cache.DATA_OVERDUE);      }     }    }    // element object value never null;    lafiteResult.setDefaultModel(element.getObjectValue());    return lafiteResult;   }      @SuppressWarnings("unchecked")   public List<Object> getKeys() {    List<Object> keys = cacheImpl.getKeys();    return keys;   }  }

2. 数据逻辑单元定义(这里封装了数据容器单元,把原来的数据池方法传入,在数据更新的时候使用namespace来获取内存实体,进而来获取数据。)

(注:这里目前还没有想好是否把数据的初始化放在容器当中,这里暂时不放入。只是在容器里面进行初始化方法的调用,真正的数据设置方式由逻辑单元获取数据池进行自身的put)

package com.tmall.lafite.core.manager.localcache.util;    import org.springframework.beans.factory.annotation.Autowired;    import com.tmall.lafite.core.LafiteResult;  import com.tmall.lafite.core.manager.localcache.LafiteContainer;    /**   * 缓存逻辑单元   * @author wangxiao   *   */  public abstract class LogicCenter {   @Autowired   private LafiteContainer lafiteContainer;      /**    * 初始化方法    * @return    */   public abstract Object initialize();      /**    * 回调函数    * @param lafiteCache    * @param key    * @return    */   public abstract Object callBack(String namespace, Object key, LafiteResult lafiteResult);     public LafiteContainer getLafiteContainer() {    return lafiteContainer;   }  }
3. 数据池(数据池,使用init来循环调用内存实体里的逻辑单元进行数据的初始化)
package com.tmall.lafite.core.manager.localcache;      import java.util.ArrayList;  import java.util.List;  import java.util.Map;  import java.util.concurrent.ConcurrentHashMap;    import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    import com.tmall.lafite.core.LafiteResult;  import com.tmall.lafite.core.ResultCode;  import com.tmall.lafite.core.manager.localcache.entity.CacheEntity;  import com.tmall.lafite.core.manager.localcache.util.LogicCenter;    /**   * cache容器   * @author wangxiao   *   */  public class LafiteContainer {      protected final Logger logger = LoggerFactory.getLogger(LafiteContainer.class);     private Map<String, CacheEntity> cacheMap = new ConcurrentHashMap<String, CacheEntity>();      /**    * 注册缓存对象    * @param namespace    * @param key    * @param lafiteCache 缓存对象    * @param logicCenter 逻辑对象 (包含:初始化方法和callback方法)    * @return    */  // public String register(String namespace, String key, LafiteCache lafiteCache, LogicCenter logicCenter) {  //  if(namespace != null && StringUtils.isEmpty(key) && lafiteCache != null) {  //   if(cacheMap.containsKey(namespace)) {  //    return ResultCode.Error.Cache.NAMESPACE_REPETITION;  //   }  //     //   CacheEntity cacheEntity = new CacheEntity(lafiteCache, logicCenter);  //   try {  //    cacheEntity.initialize();  //   } catch (Exception e) {  //    logger.error("LafiteContainer.register ", e);  //   }  //   cacheMap.put(namespace, cacheEntity);  //   return null;  //  }   //  return ResultCode.Error.Cache.COMMON_PARAM_LOST;  // }      /**    * 获取cache内的数据    * @param namespace    * @param key    * @return    */   public LafiteResult get(String namespace, Object key) {    LafiteResult lafiteResult = new LafiteResult();    CacheEntity cacheEntity = cacheMap.get(namespace);//获取缓存实体    if(cacheEntity == null) {     lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);    } else {     LafiteCache lafiteCache = cacheEntity.getLafiteCache();     LafiteResult result = lafiteCache.get(key);     if(ResultCode.Error.Cache.NO_DATA.equals(result.getError())        || ResultCode.Error.Cache.DATA_OVERDUE.equals(result.getError())) {      cacheEntity.getLogicCenter().callBack(namespace, key, result);//数据过期触发callback事件     }     lafiteResult = result;    }        return lafiteResult;   }      /**    * 获取指定命名空间的全部数据    *  这里采用的是逐条遍历的方式    * @param namespace    * @return    */   public LafiteResult getAll(String namespace) {    LafiteResult lafiteResult = new LafiteResult();    List<Object> objects = new ArrayList<Object>();        CacheEntity cacheEntity = cacheMap.get(namespace);    if(cacheEntity == null) {     lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);    } else {     LafiteCache lafiteCache = cacheEntity.getLafiteCache();     List<Object> keys = lafiteCache.getKeys();     if(keys.isEmpty()) {      lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);     } else {      for(Object key : keys) {       LafiteResult lr = get(namespace, key);       objects.add(lr.getDefaultModel());      }     }    }    lafiteResult.setDefaultModel(objects);    return lafiteResult;   }      /**    * 设置数据对象内容    * @param namespace    * @param key    * @param value    * @return    */   public LafiteResult put(String namespace, Object key, Object value) {    LafiteResult lafiteResult = new LafiteResult();    CacheEntity cacheEntity = cacheMap.get(namespace);    if(cacheEntity == null) {     lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);    } else {     LafiteCache lafiteCache = cacheEntity.getLafiteCache();     lafiteCache.put(key, value);    }    return lafiteResult;   }     public void setCacheMap(Map<String, CacheEntity> cacheMap) {    this.cacheMap = cacheMap;   }     public Map<String, CacheEntity> getCacheMap() {    return cacheMap;   }      public void initialize() {        new Thread(new Runnable() {     @Override     public void run() {      for(String key : cacheMap.keySet()) {       try {        CacheEntity cacheEntity = cacheMap.get(key);        if(cacheEntity != null) {         LogicCenter logicCenter = cacheEntity.getLogicCenter();         if(logicCenter != null) {          logicCenter.initialize();         }        }       }catch (Exception e) {        e.printStackTrace();       }      }           }    }).start();       }  }
4. spring初始化方式
<!-- 权限角色缓存   <bean id="permitRoleCache" class="com.tmall.lafite.core.manager.localcache.LafiteCache" init-method="initialize">   <constructor-arg value="_permit_role_cache_"/>   <constructor-arg value="100"/>   <constructor-arg value="2000"/>   </bean>   <bean id="permitRoleLogicCenter" class="com.tmall.lafite.core.manager.permit.cache.PermitRoleLogicCenter"/>   <bean id="permitRoleCacheEntity" class="com.tmall.lafite.core.manager.localcache.entity.CacheEntity">   <property name="logicCenter" ref="permitRoleLogicCenter" />   <property name="lafiteCache" ref="permitRoleCache" />   </bean>   -->       <!-- 缓存容器 -->   <bean id="lafiteContainer" class="com.tmall.lafite.core.manager.localcache.LafiteContainer" init-method="initialize">   <!--    <property name="cacheMap">       <map>        <entry key="PermitCommon" value-ref="permitCommonCacheEntity" />        <entry key="PermitAlgorithm" value-ref="permitAlgorithmCacheEntity"/>        <entry key="PermitRole" value-ref="permitRoleCacheEntity"/>       </map>      </property>       -->   </bean>

5. 使用示例

@Autowired   private LafiteContainer lafiteContainer;      private String namespace = LafiteNameSpace.PermitCommon;     @SuppressWarnings("unchecked")   @Transactional   public List<PermitCommonDO> getPermitDOCache() {    LafiteResult lafiteResult = lafiteContainer.getAll(namespace);    if(lafiteResult.getDefaultModel() != null) {     return (List<PermitCommonDO>) lafiteResult.getDefaultModel();    }    return null;   }