client = redis.StrictRedis()  client.setex("key", 10, "value")

接下来作为一个程序的客户端,需要去做的就是封装出一个Redis Client。比如setex方法:

    def setex(self, key, seconds, value):          """Set the value and expiration of a key.            :raises TypeError: if seconds is neither int          """          if not isinstance(seconds, int):              raise TypeError("milliseconds argument must be int")          fut = self._conn.execute(b'SETEX', key, seconds, value)          return wait_ok(fut)







class RedisPool:      """Redis connections pool.      """        def __init__(self, address, db=0, password=None, encoding=None,                   *, minsize, maxsize, commands_factory, loop=None):          if loop is None:              loop = asyncio.get_event_loop()          self._address = address          self._db = db          self._password = password          self._encoding = encoding          self._minsize = minsize          self._factory = commands_factory          self._loop = loop  # 连接池数组          self._pool = collections.deque(maxlen=maxsize)          self._used = set()          self._acquiring = 0          self._cond = asyncio.Condition(loop=loop)        def _create_new_connection(self):          return create_redis(self._address,                              db=self._db,                              password=self._password,                              encoding=self._encoding,                              commands_factory=self._factory,                              loop=self._loop)


    async def create_pool(self, *, override_min):          # todo: drop closed connections first          # 判断是否达到了连接池数量限制          while not self._pool and self.size < self.maxsize:              self._acquiring += 1              try:                  conn = await self._create_new_connection()                  self._pool.append(conn)              finally:                  self._acquiring -= 1                  # connection may be closed at yeild point                  self._drop_closed()


    @asyncio.coroutine      def acquire(self):          """Acquires a connection from free pool.          Creates new connection if needed.          """          with await self._cond:              while True:                  await self._fill_free(override_min=True)                  if self.freesize:                      conn = self._pool.popleft()                      assert not conn.closed, conn                      assert conn not in self._used, (conn, self._used)                      self._used.add(conn)                      return conn                  else:                      await self._cond.wait()


   def release(self, conn):          """Returns used connection back into pool.          When returned connection has db index that differs from one in pool          the connection will be closed and dropped.          When queue of free connections is full the connection will be dropped.          """          assert conn in self._used, "Invalid connection, maybe from other pool"          self._used.remove(conn)          if not conn.closed:              if conn.in_transaction:                  logger.warning("Connection %r in transaction, closing it.",                                 conn)                  conn.close()              elif conn.db == self.db:                  if self.maxsize and self.freesize < self.maxsize:                      self._pool.append(conn)                  else:                      # consider this connection as old and close it.                      conn.close()              else:                  conn.close()          # FIXME: check event loop is not closed          asyncio.async(self._wakeup(), loop=self._loop)


注: 文中Redis库参考了aio-lib/aioredis库。

来自: http://ipfans.github.io/2015/10/write-aio-python-redis-client-as-dummy-2/
