零基础编写Python Redis Client(一)

jopen 8年前

什么是AIO

AIO是Asynchronous Input/Output的简写,也就是异步IO。不过在谈什么是AIO之前,我们可能要先介绍一下BIO。那么什么是BIO呢?简单的说,BIO是Blocking Input/Output,也就是阻塞IO,他实现的通常是在线程池中找出一个线程处理IO,在IO过程中,其他线程都需要等待IO完成后才可以从中选取一个线程占用IO。这样最大的问题是,当线程数量较多,并且需要大量的IO操作时,就会造成一个大量的阻塞,因为实际上每次只有一个线程在处理IO。

那么如何解决这个时候的问题呢?这时候就提出了AIO的概念。通常在IO处理过程中也会伴有一些其他的处理操作,假如把所有的操作都浪费在了等待IO释放上,线程池中的线程利用率也太低了,因此我们需要一种方式,在申请IO处理之后,就去继续做其他的事情,等IO操作完成了,然后通知我们已经OK,我们可以继续处理了。这也就是我们常说的AIO的原型。

AIO的情况也说明了它适用的场景:长连接场景,或者重度的IO操作等等的情况。

如果找软件来做案例,我们可以找一个可能大家熟知的:NGINX。正如我们所知,NGINX采用了 异步、事件驱动的方法来处理连接 。这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或者线程,而是在一个工作进程中处理多个连接和请求。为此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue这样有效的方法。

这部分的内容,在 NGINX引入线程池 性能提升9倍 中进行了详细的介绍,包含了NGINX的异步应用经验,同时介绍了NGINX中引入了阻塞的线程池用于解决某些特定场景问题下的效率。

如何实现Python的异步IO

这篇文章会以最新的Python 3.5为基础来介绍实现一个异步的Python Redis Client。不过在此之前,我们先来看一下,怎么实现Python的aio。

Python的aio官方封装了一个比较合适的基础库 asyncio 。

从一个例子开始简单认识一下如何实现一个异步的aio client。这里以官方文档中的例子为例:

import asyncio    async def tcp_echo_client(message, loop):      reader, writer = await asyncio.open_connection('127.0.0.1', 8888,                                                     loop=loop)        print('Send: %r' % message)      writer.write(message.encode())        data = await reader.read(100)      print('Received: %r' % data.decode())        print('Close the socket')      writer.close()    message = 'Hello World!'  loop = asyncio.get_event_loop()  loop.run_until_complete(tcp_echo_client(message, loop))  loop.close()

这里面用到的Python 3.5中引入的 async/await 关键字,还有 asyncio 库。这里面 asyncio.open_connection 会返回一个coroutine,这个可以使用await进行一个aio的调用,即,在收到返回信号之前,程序可以继续去处理其他的任务。这里面真正核心的就是 EventLoop ,它负责监视发送这些信号,并且返回数据,它可以通过 asyncio.get_event_loop 获取到。然后他会真正返回的数据是一个读取 StreamReader 和写入 StreamWriter 的对象。

接下来,就可以通过这个 reader 和 writer 进行数据的读取和写入。 writer 是可以直接写入的,如果是 reader 的话,就需要aio的方式等待受到数据后返回。这样看起来更接近于普通的socket编程。不过关闭连接时,仅仅需要关闭 writer 就足够了。

从socket通讯到redis通讯

本质上来说,所有的网络请求都可以看成是SocketIO的请求,因此,我们可以把Redis的请求当做是一个socket的通讯来进行,这样就很方便了。

不过先等一等,那么通讯的数据格式怎么办?没关系,这里我们使用 hiredis-py 来解决协议解析的问题。不过,从库设计的角度来说,我们需要封装一个RedisConnection的类出来解决Redis的通讯协议。它可能传入的参数包含,一个 StreamReader 、一个 StreamWriter ,一个 EventLoop ,哦,别忘记还有编码 encoding 。其他的我们就用一个 * 来表示好了。

class RedisConnection(object):      '''Redis Connection'''      def __init__(self, reader, writer, *, encoding=None, loop=None):          if loop is None:              loop = asyncio.get_event_loop()          self._reader = reader          self._writer = writer          self._encoding = encoding          self._loop = loop          self._db = 0        def __repr__(self):          return '<RedisConnection [db:{}]>'.format(self._db)

记得加上 __repr__ 用来描述这个对象,这个可是一个好习惯。接下来就需要完善这个类了,比如,我们需要添加一个关闭连接的方法,这需要至少一个参数用于标记连接是否关闭,一个用于执行关闭操作,比如我们需要这样子的:

    def close(self):          """Close connection."""          self._do_close(None)        def _do_close(self, exc):          if self._closed:              return          self._closed = True          self._closing = False          # 关闭写入          self._writer.transport.close()          # 取消读取任务          self._reader_task.cancel()          self._reader_task = None          self._writer = None          self._reader = None        @property      def closed(self):          """True if connection is closed."""          closed = self._closing or self._closed          if not closed and self._reader and self._reader.at_eof():              self._closing = closed = True              self._loop.call_soon(self._do_close, None)          return closed

连接这类的方法已经处理完了,接下来就应该是执行Redis命令了,我们可以叫它 execute 。那他需要几个东西,一个是执行的指令 command ,一个是指令参数 *args ,还有一些其他的,比如编码 encoding 。这里为了节省时间,只是考虑一些Set和Get的基本操作。哦,不过等等,那么Redis的数据结构是什么样子的呢?我们还需要先把它编译成Redis-server可以识别的形式,那么需要一个 encode_command 方法。

_converters = {      bytes: lambda val: val,      bytearray: lambda val: val,      str: lambda val: val.encode('utf-8'),      int: lambda val: str(val).encode('utf-8'),      float: lambda val: str(val).encode('utf-8'),  }      def encode_command(*args):      """Encodes arguments into redis bulk-strings array.      Raises TypeError if any of args not of bytes, str, int or float type.      """      buf = bytearray()        def add(data):          return buf.extend(data + b'\r\n')        add(b'*' + _bytes_len(args))      for arg in args:          if type(arg) in _converters:              barg = _converters[type(arg)](arg)              add(b'$' + _bytes_len(barg))              add(barg)          else:              raise TypeError("Argument {!r} expected to be of bytes,"                              " str, int or float type".format(arg))      return buf

这样可以转化为可以识别的形式了,接下来还有一个问题,那么怎么让程序可以等待信号的生效呢?这里介绍一下 asyncio.Future 。这个 asyncio.Future 类是用于封装回调函数的类,包含了一些更加方便使用的方法。通过这个类,可以实现aio的通知机制,也就是回调。这个类实例可以通过 await 返回我们需要的结果。不过这样就还需要在项目中添加一些更多的变量,比如所有等待返回的 self._waiters 。

    def execute(self, command, *args, encoding=None):          """Executes redis command and returns Future waiting for the answer.          Raises:          * TypeError if any of args can not be encoded as bytes.          * ReplyError on redis '-ERR' resonses.          * ProtocolError when response can not be decoded meaning connection            is broken.          """          assert self._reader and not self._reader.at_eof(), (              "Connection closed or corrupted")          if command is None:              raise TypeError("command must not be None")          if None in set(args):              raise TypeError("args must not contain None")          # 这样小写也没有问题了          command = command.upper().strip()          if encoding is None:              encoding = self._encoding          fut = asyncio.Future(loop=self._loop)          self._writer.write(encode_command(command, *args))          self._waiters.append((fut, encoding, cb))          return fut

现在所有的命令都已经发送到了redis-server,接下来就需要读取对应的结果了。

    async def _read_data(self):          """Response reader task."""          while not self._reader.at_eof():              try:                  data = await self._reader.read(65536)              except asyncio.CancelledError:                  break              except Exception as exc:                  # XXX: for QUIT command connection error can be received                  #       before response                  logger.error("Exception on data read %r", exc, exc_info=True)                  break              self._parser.feed(data)              while True:                  try:                      obj = self._parser.gets()                  except ProtocolError as exc:                      # ProtocolError is fatal                      # so connection must be closed                      self._closing = True                      self._loop.call_soon(self._do_close, exc)                      if self._in_transaction:                          self._transaction_error = exc                      return                  else:                      if obj is False:                          break                      else:                          self._process_data(obj)          self._closing = True          self._loop.call_soon(self._do_close, None)        def _process_data(self, obj):          """Processes command results."""          waiter, encoding, cb = self._waiters.popleft()          if waiter.done():              logger.debug("Waiter future is already done %r", waiter)              assert waiter.cancelled(), (                  "waiting future is in wrong state", waiter, obj)              return          if isinstance(obj, RedisError):              waiter.set_exception(obj)              if self._in_transaction:                  self._transaction_error = obj          else:              if encoding is not None:                  try:                      obj = decode(obj, encoding)                  except Exception as exc:                      waiter.set_exception(exc)                      return              waiter.set_result(obj)              if cb is not None:                  cb(obj)

有了这些之后,我们就可以简单创建一个连接了:

async def create_connection(address, *, db=None, password=None,                        encoding=None, loop=None):      """Creates redis connection.      Opens connection to Redis server specified by address argument.      Address argument is similar to socket address argument, ie:      * when address is a tuple it represents (host, port) pair;      * when address is a str it represents unix domain socket path.      (no other address formats supported)      Encoding argument can be used to decode byte-replies to strings.      By default no decoding is done.      Return value is RedisConnection instance.      This function is a coroutine.      """      assert isinstance(address, (tuple, list, str)), "tuple or str expected"        if isinstance(address, (list, tuple)):          host, port = address          reader, writer = await asyncio.open_connection(              host, port, loop=loop)      else:          reader, writer = await asyncio.open_unix_connection(              address, loop=loop)      conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)        try:          if password is not None:              yield from conn.auth(password)          if db is not None:              yield from conn.select(db)      except Exception:          conn.close()      return conn

这样,连接部分的代码基本上已经处理完成了,接下来要做的就是实现基于这个连接的命令执行了,下面的内容会下一个文章中继续介绍,敬请期待。

来自: http://ipfans.github.io/2015/10/write-aio-python-redis-client-as-dummy-1/

</code></code></code></code></code></code></code>