推ter开源库Twemcache分析

openkk 12年前

      Twemcache(发音: “two-em-cache”),是推ter公司使用的内存缓存技术,在2012.7.17号向github提交了推ter定制过的memcached,命名为twemcache;并在上周其官网首次出现了对twemcache的介绍。
      与memcache相比,twemcache更轻便,定制性更强,无论从代码结构还是模块设计,都是很优雅的。
      源代下载网址 git clone
https://github.com/推ter/twemcache.git
      官方特性:
            支持完整memcached的ASCII协议
            支持TCP,UDP,UNIX域套接字
            可观测性(stats / klogger)
            可替换淘汰策略
            简单调试性(assertion / logging)

      Twemcache代码只有150 000行,其模块组织结构十分清晰,如下表所示:
            主程序    mc.c
            线程模型 mc_thread.h / mc_thread.c
            内存管理 mc_items.h / mc_items.c / mc_slabs.h / mc_slabs.c
            存储模型 mc_assoc.h / mc_assoc.c
            网络接口 mc_connection.h / mc_connection.c
            命令解析 mc_ascii.h / mc_ascii.c
            log相关   mc_log.h / mc_log.c / mc_klog.h / mc_klog.c
            其它        mc_time.h / mc_queue.h / mc_hash.h / mc_cache.h / mc_util.h / mc_signal.h ……
 
Twemcache模块分析
      twemcache以服务器形式存在,可以接收来自TCP/UDP/UNIX域套接字的请求,默认的TCP/UDP端口都是11211。关于服务器的模型上,twemcache都使用了libevent库来处理各种网络IO事件,同时又使用了多线程来提升性能,异步与多线程结合都是使用的经典的网络模型,因此网络通信这一块很通用,可以作为很好的服务器通信模块。

线程模型
      Twemcache中大量使用了多线程,将任务的分配明晰化,每样任务由一个线程去完成。Aggregator线程负责采集twemcache的运行状态,客户端可以使用stats命令进行查询,线程在初始化时启动,包含独立的事件域ag->base,并注册了定时器事件,默认间隔100ms,每次定时器触发,线程去采集运行数据;
      Klogger线程负责打印log信息,这些log信息由log_XXX簇函数打印,线程在初始化时启动,包含独立事件域k->base,并注册了定时器事件,默认间隔1000ms,每次定时器触发,线程收集所有工作线程的log信息,并打印到指定描述符。这里的log打印使用了缓冲策略,每1000ms的信息缓存在线程的buffer中,并不进行打印,最终由klogger进行统一收集并打印。
      主进程负责所有的初始化工作,拥有独立的事件域main_base,注册了定时器事件和网络IO事件,其中定时器事件用于提供时间服务,程序中会大量用到当前时间,会导致大量的time()系统调用开销,定时器事件提供了秒级的精确,避免了time()调用;网络IO事件主要用于TCP/UNIX域的监听listen,所有连接的处理都交由工作线程完成。
      Worker线程负责连接的处理工作,线程在初始化时启动,包含独立事件域t->base,并注册了管道IO事件和网络IO事件。管道IO事件用于主进程通知其有连接到来,网络IO事件用于处理到来的连接。
      这里的多线程使用了线程池的概念,但工作的方式不再是用cond_signal,而是用管道IO事件代替,这是为了保证线程内也是非阻塞的,可以并发执行多个任务。这里没有提及的是hashtable维护线程,它负责hashtable扩容时数据的迁移工作。

推ter开源库Twemcache分析

网络模型
      网络通信twemcache将TCP/UDP/UNIX域进行统一,其中TCP和UNIX域套接字流程是一样的,UDP因为不用建立连接则少了listen+accept的过程,下面以TCP和UDP进行说明。
      TCP:与主进程相关联的事件域是main_base,首先会向main_base中注册一个listen事件,用于监听连接的到达。当连接到来后,listen事件触发并调用accept()接收,并为这个新的cli_fd分配一个新的连接标识conn,此时conn的事件域是main_base,然后选择一个线程t去处理这个连接,这里主进程与线程间通信是通过管道来实现的,向选定的t->thread_send_fd发送一个字符。而工作线程都会监听管道IO事件,事件触发后会注册连接conn的读事件并变更conn的事件域为t->base,从而将一个连接交由线程去处理。
      这里的模型与传统模型一样,主进程监听,连接到来后交由线程去处理。每到来一个连接,就触发一个线程的管道IO事件去处理连接,并且这里的线程选择是用的RoundRobin算法,所有每个线程是均匀分配任务的。

推ter开源库Twemcache分析

      UDP:UDP中只有一个接收fd,初始化时主进程会对每个工作线程tX的管道tX->thread_send_fd写字符,从而触发所有工作线程,注册conn的读事件[此时conn不代表一个完整的连接,只含有服务端信息],并变更conn的事件域为tX->base,这样所有线程都监听fd的报文。
      这里看出线程策略上与TCP的不同,TCP是均匀的分配任务给线程,UDP则是启动所有线程去监听fd并竞争接收报文,任务的分配并不保证均匀。

推ter开源库Twemcache分析

存储模型
      为了解决内存malloc/free带来的性能开销,twemcache使用了slab来管理内存,具体如下图所示:

推ter开源库Twemcache分析

       slabclass是一个数组,每个槽对应不同大小的item大小,size表示该槽的item大小;需要slabclass数组的原因是item是变长的,预分配全部大小相同的item会造成大量碎片。从上至下,item的大小依次增加,每个slab是一组item的集合,slab的大小是固定的(slab_size),可以通过参数--slab_size配置;当一个slab用完后,新的分配请求要来,则分配一个新的slab,比较重要的属性是free_item和nfree_itemq,free_item指向当前槽中第一个空闲的item,nfree_itemq是一个链表,所有使用过被删除了的item会放入其中重复利用。
      item是实际存储的数据单元,因此这里slab管理以item为单位,当需要分配一个item时,并不直接malloc,而是从slabclass中取一个已经分配好的,因为每个槽代表了一种item大小且是有序的(递增),可以用二分查找到最接近要分配item大小的槽,并从中直接获取。
      同时,slab是由heapinfo来进行管理的,nslab表示当前slab的数目,每分配一个新的slab(malloc了一块内存),就会顺序的插入到slab_table中,即slab_table[nslab++]=slab。在淘汰slab时使用的是LRU算法,heapinfo->slab_lruq维护了这样一个slab的LRU链表,每次被修改过的slab会移到链表尾,淘汰时选择链表头。

推ter开源库Twemcache分析

      item则是由item_lruq[]管理的,item_lruq是一个数组,与slabclass数组一一对应,即item_lruq[id]链接了slabclass[id]中被使用的item,当item需要进行淘汰时,采用的同样是LRU算法。

推ter开源库Twemcache分析

      上面都是内存的管理,item真正存储的数据结构是hashtable,因为twemcache要实现的是{key, value}的映射。primary_hashtable就是存储的数据结构,只有插入hashtable的item才算作被使用,需要被链到item_lruq中。而同时存在的old_hashtable则是当primary_hashtable需要进行扩容时使用的:当primary_hashtable中item数据超过1.5倍hashtable大小时,进行扩容,此时old_hashtable指向primary_hashtable,在扩容和迁移数据期间所有的item操作转移到old_hashtable中,同时唤醒assoc_maintenance_thread进行数据迁移工作,将所有old_hashtable中的item迁移到扩容后的primary_hashtable中。

推ter开源库Twemcache分析

Slab分配策略
      slab的分配策略是写覆盖,当有新的slab分配请求,不断分配slab,直到达到上限max_nslab,此时slab不再重新分配,而是从已在使用的slab中找出一个淘汰掉,并作为此次的要用的slab,分配的策略是由slab_get()完成的,决策过程如下:
      1. 分配新的slab,若失败则2
      2. 如设置LRU淘汰策略,则淘汰最近未使用的,若失败则3
      3. 如设置RANDOM淘汰策略,则随机淘汰一个

slab_get()
      大体上来说,slab_get_new()是决策1,slab_evict_lru()是决策2,slab_evict_rand()是决策3,经过决策后,如果分配到了新的slab,则重新初始化它,并添加到slab_table和slab_lruq中,这是由slab_add_one()完成的。下面分析下slab_evict_lru()淘汰规则。

slab = slab_get_new();  if (slab == NULL && (settings.evict_opt & EVICT_LS)) {      slab = slab_evict_lru(id);  }  if (slab == NULL && (settings.evict_opt & EVICT_RS)) {      slab = slab_evict_rand();  }  if (slab != NULL) {      stats_slab_settime(id, slab_new_ts, time_now());      slab_add_one(slab, id);      status = MC_OK;  } 


slab_evict_lru()
      所有使用的slab都会添加到slab_lurq中(即slab_lruq_head()),找到最近未使用的即是查找LRU链表中最靠前的slab且其refcount==0,为了避免遍历链表的时间消耗,tries限制了至多遍历slab_lruq的前SLAB_LRU_MAX_TRIES个元素,如果找到了这样的slab,则掉用slab_evict_one()将它淘汰,淘汰包括将它从slab_lruq上删除,其中所有item从item_lruq上删除,从slabclass相应槽中删除。

for (tries = SLAB_LRU_MAX_TRIES, slab = slab_lruq_head();      tries > 0 && slab != NULL;      tries--, slab = TAILQ_NEXT(slab, s_tqe)) {      if (slab->refcount == 0) {          break;      }  }  ……  slab_evict_one(slab);


Item分配策略
      当需要新的item时,会经过一组决策来决定新分配的item取自哪里,这组决策都是在_item_alloc()中完成的,决策过程如下:
       1. 查找一个过时的item,如无则2
       2. 查找一个slab上空闲的item,如无则3
         a. 当前slab上有空闲item
         b. 当前slab上没有空闲item,分配新的slab
      3. 淘汰一个item

_item_alloc()
      大体来说,item_get_from_lruq()是决策1,slab_get_item()是决策2,item_reuse()是决策3。这里的uit是LRU上最近未使用的一个item,如果设置了EVICT_LRU即LRU淘汰策略的话,则在决策1和2未成功时执行3。除了决策2,其它两个都是对不会再使用的item的复用,过程是先在item_lruq[id]中查找是否有已超时的,有则返回给it;没有则试图从slab上分配一个item,有则返回给it;没有则试图复用最近未使用的uit。下面分析item_get_from_lruq()和slab_get_item()。

it = item_get_from_lruq(id); /* expired / unexpired lru item */    if (it != NULL && item_expired(it)) {      stats_slab_incr(id, item_expire);      stats_slab_settime(id, item_expire_ts, it->exptime);      item_reuse(it);      goto done;  }    uit = (settings.evict_opt & EVICT_LRU)? it : NULL; /* keep if can be used */  it = slab_get_item(id);  if (it != NULL) {      goto done;  }    if (uit != NULL) {      it = uit;      stats_slab_incr(id, item_evict);      stats_slab_settime(id, item_evict_ts, time_now());        item_reuse(it);      goto done;  }


item_get_from_lruq()
      函数从item_lruq中查找已超时的item记录为it,最近未使用的item记录为uit。id是根据item的大小所对应的槽id,槽中所有使用的item都会链在item_lruq[id]上,遍历item_lruq[id]上的item,如果it->refcount!=0则表示还在被使用,这样的item不能复用,直接跳过;对于refcount==0的item,如果找到超时的,则直接返回它,在查找过程中,记录第一个refcount==0并且未超时的item(即最近未使用),作为决策3淘汰的对象。
      tries限制了遍历的长度不能超过ITEM_LRUQ_MAX_TRIES,这样节省了大量链表遍历的时间,并且按LRU的性质,越靠近链表头的元素越有可能作为淘汰对象,所有遍历前ITEM_LRUQ_MAX_TRIES已经覆盖了大部分情况。

for (tries = ITEM_LRUQ_MAX_TRIES, it = TAILQ_FIRST(&item_lruq[id]),       uit = NULL; it != NULL && tries > 0;       tries--, it = TAILQ_NEXT(it, i_tqe)) {        if (it->refcount != 0) {       ……          continue;      }        if (item_expired(it)) {          return it;      } else if (uit == NULL) {          uit = it;      }  }


slab_get_item() -> _slab_get_item()
       slab_get_item_from_freeq()从slabclass[id]中查找是否有空闲的item可用(即p->free_itemq),有则返回it,p->free_itemq上记录的是使用过后被删除的item;如果没有这样的item,则从p->free_item上取,它记录还未使用过的item的首地址;如果没有这样的item,则表示当前slab已经用满了,需要分配新的slab,slab_get()使用slab分配策略分配一个新的slab,此时slab中的item都未使用,都记录到p->free_item中。最后从p->free_item中最得一个item返回就可以了。

p = &slabclass[id];  it = slab_get_item_from_freeq(id);  if (it != NULL)  return it;  if (p->free_item == NULL && (slab_get(id) != MC_OK)) {      return NULL;  }  it = p->free_item;  if (--p->nfree_item != 0) {      p->free_item = (struct item *)(((uint8_t *)p->free_item) + p->size);  } else {      p->free_item = NULL;  }


Hashtable策略
      作为核心的存储结构,twemcache使用的是链式哈希表,其主体由mc_assoc.c实现,hashtable初始大小为64K,在需要时进行扩容,在操作上与平时使用的hashtable并无差别,下面仅分析插入时assoc_insert()及扩容时assoc_expand()。
assoc_insert()
      assoc_get_bucket()获取当前需要插入的桶,里面封装了对hashtable的选择,在存储模型里已经说明了primary_hashtable和old_hashtable的不同作用,当hashtable正在扩容时,expanding==1(并且expand_bucket小于hashtable大小),返回old_hashtable;否则返回primary_hashtable。SLIST_INSERT_HEAD将新的item插入到桶中,nhash_item表示hashtable中item的数目,当其达到hashtable大小的1.5倍时,调用assoc_expand()进行扩容。注意这里的插入操作不用去查找是否已有item存在,这里使用的策略是先删除已存在的item,再插入新的item,所有查找已存在操作会存在于删除操作中,不会存在于插入操作中。

bucket = assoc_get_bucket(item_key(it), it->nkey);  SLIST_INSERT_HEAD(bucket, it, h_sle);  nhash_item++;  if ((expanding == 0) && (nhash_item > (HASHSIZE(hash_power) * 3) / 2)) {      assoc_expand();  }


assoc_expand()
      函数进行hashtable的扩容,hash_power表示表大小的2次幂,当需要扩容时,hash_power + 1表示扩容一倍,old_hashtable指向primary_hashtable,primary_hashtable则指向新创建的hashtable,最近发送信号量给maintenance线程,这个线程一直等待在maintenance_cond信号量上,它负责将old_hashtable中的”所有”item插入到新的primary_hashtable。
      这里要注意的是,在扩容期间,新的item会插入到old_hashtable,这样不断有item到来,扩容线程可能永远也无法将item完全从old_hashtable迁移到primary_hashtable。这里使用了expland_bucket,它标识扩容了多少个桶,当expland_bucket > HASHSIZE(hash_power - 1)时(即超过了扩容前hashtable大小)时,这时新的item不再会插入到old_hashtable,而是插入到primary_hashtable,从而保证数据迁移一定可以在有限时间内完成。

uint32_t hashtable_sz = HASHSIZE(hash_power + 1);  old_hashtable = primary_hashtable;  primary_hashtable = assoc_create_table(hashtable_sz);  ……  hash_power++;  expanding = 1;  expand_bucket = 0;  pthread_cond_signal(&maintenance_cond);


状态机
      非阻塞自然会与状态机相关联,twemcache也使用了状态机来结合epoll调用,状态机的核心处理函数是core_drive_machine(),下面的所有状态迁移入口都是以该函数为入口的,它的大致结构如下,每次循环结束代表一次事件处理完成,在一次事件中可能发生多个状态迁移。

while (!stop) {      switch (c->state) {      case CONN_LISTEN:  ……      case CONN_WAIT:  ……      case CONN_READ:  ……      case CONN_PARSE:  ……      case CONN_NEW_CMD:  ……      case CONN_NREAD:  ……      case CONN_SWALLOW:  ……      case CONN_WRITE:  ……      case CONN_MWRITE:  ……      case CONN_CLOSE:  ……      default:  ……      }  }

       TCP/UNIX域和UDP的流程稍有不同,前者多了客户端建立连接的过程,它们的流程图如下所示,图中用蓝色虚线圈住的是一次连接的状态转移,在一个循环中,它们拥有相同的连接标识conn。

推ter开源库Twemcache分析

TCP状态机

推ter开源库Twemcache分析

UDP状态机


Twemcache实例分析
 系统初始化

core_init() // 初始化  core_loop() // 系统启动


core_init()
       下面是提取的core_init的核心代码段,main_base是创建的主进程的事件域,assoc_init()初始化核心存储结构hashtable,item_init()初始化了管理item的item_lruq,slab_init()决定了每个slabclass槽的item大小,并预分配的内存,time_init()则向main_base中注册了定时器事件clockevent,为系统提供秒级的当前时间,thread_init()分配并启动了线程模型中描述的各类线程。下面就重要的slab_init()和thread_init()详细分析下。

status = log_init(settings.verbose, settings.log_filename);  status = signal_init();  pthread_mutex_init(&accept_lock, NULL);  STAILQ_INIT(&listen_connq);  main_base = event_base_new();  status = assoc_init();  conn_init();  item_init();  status = slab_init();  stats_init();  status = klog_init();  time_init();  status = thread_init(main_base);


slab_init()
      执行两步操作:slab_slabclass_init() / slab_heapinfo_init()。

slab_slabclass_init()
      它的作用是设置slabclass每个槽中item的大小,这里的nitem是slabclass中item的个数,item_sz是item的大小,free_itemq链接被删除的item,nfree_item记录空闲的item个数,free_item指向第一个空闲的item。这里决定item大小很重要的因素是profile => settings.profile,它记录了每个槽item的大小,在mc_generate_profile()中设置。

for (id = SLABCLASS_MIN_ID; id <= slabclass_max_id; id++) {      struct slabclass *p; /* slabclass */      uint32_t nitem;      /* # item per slabclass */      size_t item_sz;      /* item size */        nitem = slab_size() / profile[id];      item_sz = profile[id];      p = &slabclass[id];        p->nitem = nitem;      p->size = item_sz;        p->nfree_itemq = 0;      TAILQ_INIT(&p->free_itemq);        p->nfree_item = 0;      p->free_item = NULL;  }


slab_heapinfo_init()
       nslab表示当前分配的slab,max_nslab表示最多能分配的slab,base表示slab内存的基址,如果是预分配策略的话,则一次性全部分配,否则则在每次需要时分配slab;curr表示当前指向的slab,slab_table记录所有分配使用的slab,slab_lruq链接所有分配使用的slab,并在需要时用LRU算法进行淘汰。

heapinfo.nslab = 0;  heapinfo.max_nslab = settings.maxbytes / settings.slab_size;    heapinfo.base = NULL;  if (settings.prealloc) {      heapinfo.base = mc_alloc(heapinfo.max_nslab * settings.slab_size);      ......  }  heapinfo.curr = heapinfo.base;  heapinfo.slab_table = mc_alloc(sizeof(*heapinfo.slab_table) * heapinfo.max_nslab);  ....  TAILQ_INIT(&heapinfo.slab_lruq);


thread_init()
       分配线程,nworkers代表工作线程的数目,1是主进程,即这里的dispatcher。以后在使用线程时,threads + id * sizeof(*threads)即为每id个线程。

threads = mc_zalloc(sizeof(*threads) * (1 + nworkers));  if (threads == NULL) {      return MC_ENOMEM;  }  dispatcher = &threads[nworkers];

       对于每个工作线程,建立一个管道,fds[0]用于工作线程接收来自主进程的数据,fds[1]用于主进程向工作线程发送数据(这里的数据只做信号作用),thread_setup()则为每个事件创建一个独立的事件域t->base,并在t->base中注册了管道IO事件,监听fds[0]的读事件,读事件触发则执行thread_libevent_process(),它负责完成由主进程转来的客户端连接conn。

for (i = 0; i < nworkers; i++) {      int fds[2];      status = pipe(fds);      if (status < 0) {          log_error("pipe failed: %s", strerror(errno));          return status;      }      threads[i].notify_receive_fd = fds[0];      threads[i].notify_send_fd = fds[1];      status = thread_setup(&threads[i]);      if (status != MC_OK) {          return status;      }  }

       然后启动所有工作线程,thread_worker_main执行的操作很简单 – 开始事件的监听(event_base_loop)。

for (i = 0; i < nworkers; i++) {      status = thread_create(thread_worker_main, &threads[i]);      if (status != MC_OK) {          return status;      }  }

      最后,还会设置和启动aggregator线程和klogger线程,在线程模型中已有描述,两个线程都有独立的事件域,并在其上注册了定时器事件,前者用于采集状态数据,后者用于输出log信息,启动线程后执行的操作与工作线程一样 – 开始事件的监听event_base_dispatch()。

status = thread_setup_aggregator();  status = thread_create(thread_aggregator_main, NULL);  status = thread_setup_klogger();  status = thread_create(thread_klogger_main, NULL);


core_loop()
       core_create_socket创建服务器的套接字,然后event_base_loop()开始监听事件,下面详细分析core_create_socket()。

status = core_create_socket();  event_base_loop(main_base, 0);


core_create_socket()    -> core_create_inet_socket()
       如果是udp端口,则没有listen()和accept()的过程,thread_dispatch()向每个工作线程的管道写入字符,触发工作线程执行连接sd的监听事件,竞争地读取客户端发往sd的请求报文。
如果是tcp端口,conn_set_event()向主进程main_base中注册sd的监听事件,当sd有连接到来由主进程经过accept()后再交由指定的线程去处理。
       这里的conn代表了一个连接的标识,用完的conn会放入free_connq中,当下次需要conn时就不用重新分配内存了,而会直接从free_connq中复用。

if (udp) {      int c;      for (c = 0; c < settings.num_workers; c++) {          status = thread_dispatch(sd, CONN_READ, EV_READ | EV_PERSIST, 1);          ......      }  } else {      conn = conn_get(sd, CONN_LISTEN, EV_READ | EV_PERSIST, 1, 0);      ......      STAILQ_INSERT_HEAD(&listen_connq, conn, c_tqe);        status = conn_set_event(conn, main_base);      ......  }


实例 [TCP连接,客户端请求”set foo bar”]
CONN_LISTEN -> CONN_NEW_CMD
      当客户端连接到达后,c->sd监听事件触发,调用core_event_handler() -> core_accept()接收客户端连接。
core_accept()
      它的核心代码如下,accept()完成与客户端的三次握手建立连接,返回socket sd,然后主进程将这个连接sd交由一个工作线程去处理,这是由thread_dispatch()完成的。

sd = accept(c->sd, NULL, NULL);  ......  status = thread_dispatch(sd, CONN_NEW_CMD, EV_READ | EV_PERSIST, 0);


thread_dispatch()
      conn_get()获取一个连接conn并将它初始化为本次连接的标识,接下来tid是选择要处理这个连接的线程,选择的算法是Round Robin,即每次循环递增一个id号,然后将连接标识c压入选定的线程t->new_cq中,它存储了线程要处理的所有连接,最后向t->notify_send_fd写一个字符,触发工作线程t的管道IO事件,让其处理新的连接。此时,连接由主进程由给了工作线程t,表现在事件域发生了变更,接下来c的处理都在工作线程中,直到连接关闭。

c = conn_get(sd, state, ev_flags, rsize, udp);  ......  tid = (last_thread + 1) % settings.num_workers;  t = threads + tid;  last_thread = tid;    conn_cq_push(&t->new_cq, c);  n = write(t->notify_send_fd, "", 1);


CONN_NEW_CMD -> CONN_WAIT
      nreqs是一次事件中,能处理的最大请求数目,避免工作线程被某个连接完全占用,core_reset_cmd_handler()会重新初始化连接相关的数据如req_type, item等,最后设置状态为CONN_WAIT。

--nreqs;  if (nreqs >= 0) {   core_reset_cmd_handler(c);  }


CONN_WAIT -> CONN_READ
      更新事件为监听可读事件,并设置状态为CONN_READ,stop是个标志,所有的状态迁移在一个while(!stop)循环中,只要stop未设为true,则这次状态迁移还要继续,只有当stop为true时才代表一次处理完成,重新回到epoll进入监听状态。

status = core_update(c, EV_READ | EV_PERSIST);  if (status != MC_OK) {      log_error("update on c %d failed: %s", c->sd, strerror(errno));      conn_set_state(c, CONN_CLOSE);      break;  }  conn_set_state(c, CONN_READ);  stop = true;


CONN_READ -> CONN_PARSE
      状态CONN_READ作用是完成客户端命令读取。假设是TCP连接,core_read()调用core_read_tcp()完成命令读取,并根据读取结果设置连接状态,读取完整会设为CONN_PARSE状态。

case CONN_READ:      core_read(c);      break;


core_read_tcp()
      c代表了客户端连接,读取数据到c->rbuf中,根据返回值会有三种情况:
        1. n<0&&(errno==EGAIN||errno==EWOULDBLOCK)    连接不可读,返回等待下次读取
        2. 0         3. n==size    数据占满了c->rbuf,但仍未读完,重新分配rbuf大小,并再次读取数据到c->rbuf中,直到读取完成。

for (;;) {      ......      size = c->rsize - c->rbytes;      n = read(c->sd, c->rbuf + c->rbytes, size);      if (n > 0) {          stats_thread_incr_by(data_read, n);          gotdata = READ_DATA_RECEIVED;          c->rbytes += n;          if (n == size) {              continue;          } else {              break;          }      }      ……      if (errno == EAGAIN || errno == EWOULDBLOCK) {          log_debug(LOG_VERB, "recv on c %d not ready - eagain", c->sd);          break;      }  }


CONN_PARSE -> CONN_NREAD
      状态CONN_PARSE作用是完成客户端命令的分析(命令的分析并不包括附带的数据),调用core_parse()完成[假设客户端命令是”set foo bar”]。
      core_parse() -> asc_parse() -> asc_dispatch() -> asc_process_update()
asc_process_update()
      之前的函数对命令进行了解析,假设客户端命令是”set foo bar”,则收到数据与解析后的结果如图所示(其中0x20是空格,0x0D 0x0A是回车换行符):

推ter开源库Twemcache分析

      item_alloc()按前面的item分配策略为本次命令分配了一个item – it,并设置了c->ritem指向item的数据(即value),rlbytes表示仍未读取的命令长部,即数据部分(“bar”),最后设置状态为CONN_NREAD。

it = item_alloc(key, nkey, flags, time_reltime(exptime), vlen);  ......  c->item = it;  c->ritem = item_data(it);  c->rlbytes = it->nbyte;  conn_set_state(c, CONN_NREAD);


CONN_NREAD -> CONN_WRITE
      状态CONN_NREAD完成命令数据部分的分析,这个状态至少要循环两次,前面几次将c->rlbytes(数据部分长度)读入到c->ritem中,这部分数据可能在CONN_READ时已读入到c->rbuf中,那么此时c->rbytes > 0,直接从缓冲区取这部分数据就可以了,即第二个if语句段;这部分数据可能还没有读取,那么调用read()从c->sd中读取。读取的数据放到c->ritem即数据区,并更新c->rlbytes,它表示数据部分还有多少字节未读取,当读取完后最后一次进入循环,c->rlbytes == 0,此时调用core_complete_nread()完成数据部分的存储,下面分析这个函数。

if (c->rlbytes == 0) {   core_complete_nread(c);   break;  }    if (c->rbytes > 0) {   int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;   if (c->ritem != c->rcurr) {    memmove(c->ritem, c->rcurr, tocopy);   }   c->ritem += tocopy;   c->rlbytes -= tocopy;   c->rcurr += tocopy;   c->rbytes -= tocopy;   if (c->rlbytes == 0) {    break;   }  }    n = read(c->sd, c->ritem, c->rlbytes);  if (n > 0) {   stats_thread_incr_by(data_read, n);   if (c->rcurr == c->ritem) {    c->rcurr += n;   }   c->ritem += n;   c->rlbytes -= n;   break;  }


core_complete_nread() -> asc_complete_nread()
      item_store()将读取的数据部分(即value)存入相应的item,根据返回值,成功存入则返回STORED,执行asc_write_stored()将回送信息”STORED”写入连接c的发送缓冲区c->wbuf,并设置状态为CONN_WRITE,c->write_and_go = CONN_NEW_CMD,这个变量指示了CONN_WRITE状态后要迁移到哪个状态。至此,这次请求对item的使用已经完成了,调用item_remove()减小计数,因为item还链在item_lruq上,所以不并实际删除,然后c->item = NULL,表示此次请求不再使用item。下面详细分析item的value存储函数item_store()。

ret = item_store(it, c->req_type, c);  switch (ret) {  case STORED:   asc_write_stored(c);   break;   ......  }  ......  item_remove(c->item);  c->item = NULL;


item_store() -> _item_store()
      在存储模型中已经描述,item最终存储的数据结构是hashtable,_item_get()从hashtable中以键值key(即foo)查找相应的item。
      如果没有找到item,则是首次插入,调用_item_link()将它插入到hashtable中;
      如果找到item,则调用_item_replace()替代之前的item。

key = item_key(it);  nit = NULL;  oit = _item_get(key, it->nkey);  ……  if (result == NOT_STORED && store_it) {      if (oit != NULL) {          _item_replace(oit, it);      } else {          _item_link(it);      }      result = STORED;  }

       此例中是第一次set foo,_item_get()会返回NULL,最终调用_item_link(),这个函数很简单,更改它的flags |= ITEM_LINKED表示被链接,assoc_insert()将这个item插入到hashtable中,item_link_q()将item链到item_lruq上。

it->flags |= ITEM_LINKED;  item_set_cas(it, item_next_cas());  assoc_insert(it);  item_link_q(it);


CONN_WRITE -> CONN_NEW_CMD
      CONN_WRITE状态完成客户端的应答,应答内容在CONN_NREAD状态下已经写入到c->wbuf中了,首先调用conn_add_iov()将c->wbuf中的内容组装成msgbuf的形式。

if (c->iov_used == 0 || (c->udp && c->iov_used == 1)) {   status = conn_add_iov(c, c->wcurr, c->wbytes);   ......  }

      然后由core_transmit()完成内容的发送,发送成功会返回TRANSMIT_COMPLETE(至少需要两次循环,同CONN_NREAD),因为此时c->state为CONN_WRITE,变迁状态至c->write_and_go(即CONN_NEW_CMD),从而完成了这一次请求。当然,core_transmit()也会失败,最大可能是因为socket当时并不可写,写socket的时机并不是由epoll的写事件触发的,这种情况下会返回TRANSMIT_SOFT_ERR,它置stop=true,表示此次事件处理完成,等待socket的可写事件到达。下面分析core_transmit()函数。

switch (core_transmit(c)) {  case TRANSMIT_COMPLETE:   if (c->state == CONN_MWRITE) {          ......    conn_set_state(c, CONN_NEW_CMD);   } else if (c->state == CONN_WRITE) {    if (c->write_and_free) {     mc_free(c->write_and_free);     c->write_and_free = 0;    }    conn_set_state(c, c->write_and_go);   } else {    log_debug(LOG_INFO, "unexpected state %d", c->state);    conn_set_state(c, CONN_CLOSE);   }   break;  case TRANSMIT_INCOMPLETE:  case TRANSMIT_HARD_ERROR:   break;  case TRANSMIT_SOFT_ERROR:   stop = true;   break;  }


core_transmit()
      msg_curr和msg_used对比表示是否还有数据需要发送,没有时返回TRANSMIT_COMPLETE;仍有数据则调用sendmsg()进行发送,res > 0表示发送成功,此时返回TRANSMIT_INCOMPLETE,这和CONN_NREAD状态下读取数据的做法是一样的,至少需要两次core_transmit,在发送完后最后一次进入会返回TRANSMIT_COMPLETE;res == -1及errno判断表示c->sd此时并不可写,我们是在读事件触发后直接写socket,不可写则core_update()更新c->sd上的监听事件为写事件,并返回TRANSMIT_SOFT_ERROR,它会导致此次事件处理结束,结果就是等待写事件的到来。

if (c->msg_curr < c->msg_used) {  ……  res = sendmsg(c->sd, m, 0);  if (res > 0) {    ......    return TRANSMIT_INCOMPLETE;  }  if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {    status = core_update(c, EV_WRITE | EV_PERSIST);    if (status != MC_OK) {     log_error("update on c %d failed: %s", c->sd, strerror(errno));     conn_set_state(c, CONN_CLOSE);     return TRANSMIT_HARD_ERROR;    }    return TRANSMIT_SOFT_ERROR;  }  } else {   return TRANSMIT_COMPLETE;  }


CONN_NEW_CMD
       这个状态代表了该连接上可以接受下一个请求了,即一次客户端请求结束。

      由上面的分析可见,所有的状态转移都是在core_dirve_machine()函数中完成的,并且并不是每个状态对应一个事件,twemcache对状态的划分是按功能来的,比如在读事件中就会完成读数据、分析数据两个功能,下面的图表示了各状态执行时所处的事件:

推ter开源库Twemcache分析

 

 

转自:http://blog.csdn.net/qy532846454/article/details/7899780