使用codecs自定义编/解码方案

BorisGoldie 7年前
   <p>相信很多使用Python的同学熟悉编解码,比如:</p>    <pre>  <code class="language-python">In : print u'\U0001F3F4'.encode('utf-8')  ?  In : '哈哈'.decode('utf8')  Out: u'\u54c8\u54c8'  </code></pre>    <p>这样可以在字符串和unicode之前转换,不过细心的同学可能发现了,我使用了「utf-8」和「utf8」,这2个词长得很像。事实上都能正常使用是由于他们都是「utf_8」的别名,这些别名的对应关系可以用如下方法找到:</p>    <pre>  <code class="language-python">In : import encodings    In : encodings.aliases.aliases['utf8']  Out: 'utf_8'    In : '哈哈'.decode('u8')  Out: u'\u54c8\u54c8'    In : '哈哈'.decode('utf')  Out: u'\u54c8\u54c8'    In : '哈哈'.decode('utf8_ucs2')  Out: u'\u54c8\u54c8'    In : '哈哈'.decode('utf8_ucs4')  Out: u'\u54c8\u54c8'  </code></pre>    <p>encodings是标准库中自带的编码库,其中包含了上百个标准的编码转换方案。但是通常并不需要使用encodings,而是使用codecs。</p>    <p>codecs包含了编码解码器的注册和其他基本的类,开发者还可以通过codecs提供的接口自定义编/解码方案,也就是可以创造一个新的编解码转换方案,使用encode(‘XX’)和decode(‘XX’)的方式使用。今天我给大家演示直接进行Fernet对称加密的例子。</p>    <p>互联网安全的重要性不必在复述了,大家都应该接触过一些加密技术,可能听过M2Crypto、PyCrypto、Cryptography之类的库。在这里歪个楼,现在的主流是使用Cryptography,它的出现就是为了替代之前的那些库,具体的可以看官方文档。我们使用Cryptography提供的Fernet类来实现,首先实现一个Codec类:</p>    <pre>  <code class="language-python">import codecs    from cryptography.fernet import Fernet  from cryptography.fernet import InvalidToken    KEY = Fernet.generate_key()  f = Fernet(KEY)      classFernetCodec(codecs.Codec):      defencode(self, input, errors='fernet.strict'):          return f.encrypt(input), len(input)        defdecode(self, input, errors='fernet.strict'):          try:              return f.decrypt(input), len(input)          except InvalidToken:              error = codecs.lookup_error(errors)              return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,                                              'Invalid Token'))  </code></pre>    <p>当然也不必在类中实现encode和decode方法,单独的2个函数也可以。我这里是为了给之后的演示到的类复用。</p>    <p>如果你看过内置的字符串encode的方法,它的文档说还接收第二个参数,让你告诉它当出现出错的时候如何去处理,默认是strict,直接就会抛出来错误。</p>    <p>其余可选的还有ignore、replace、xmlcharrefreplace:</p>    <pre>  <code class="language-python">In [35]: '\x80abc'.decode('utf-8', 'strict')  ---------------------------------------------------------------------------  UnicodeDecodeError                        Traceback (most recent call last)  <ipython-input-35-974ebc908d50> in <module>()  ----> 1 '\x80abc'.decode('utf-8', 'strict')    /Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)       14       15 def decode(input, errors='strict'):  ---> 16     return codecs.utf_8_decode(input, errors, True)       17       18 class IncrementalEncoder(codecs.IncrementalEncoder):    UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte    In [36]: '\x80abc'.decode('utf-8', 'replace')  Out[36]: u'\uffabc'    In [37]: '\x80abc'.decode('utf-8', 'ignore')  Out[37]: u'abc'  </code></pre>    <p>事实上Python还内置了其他的选项,如backslashreplace、namereplace、surrogatepass、surrogateescape等,有兴趣的可以看 <a href="/misc/goto?guid=4959748303016804242" rel="nofollow,noindex">源码实现</a> .</p>    <p>我也会定义2种错误函数,因为在解密(执行decrypt)的时候可能会报InvalidToken错误,但是InvalidToken不包含任何参数,而对错误处理的时候需要知道起始和结束的位置,所以我就直接抛一个UnicodeDecodeError错误了。</p>    <p>接着我们定义递增式和流式的编码类:</p>    <pre>  <code class="language-python">class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):      pass      class IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):      pass      class StreamReader(FernetCodec, codecs.StreamReader):      pass      class StreamWriter(FernetCodec, codecs.StreamWriter):      pass  </code></pre>    <p>由于这个要加密的字符串不会很长,没有必要实现这些类,我就简单的继承然后pass了。接着开开放入口:</p>    <pre>  <code class="language-python">defgetregentry():      return codecs.CodecInfo(          name='fernet',          encode=FernetCodec().encode,          decode=FernetCodec().decode,          incrementalencoder=IncrementalEncoder,          incrementaldecoder=IncrementalDecoder,          streamwriter=StreamWriter,          streamreader=StreamReader,      )  </code></pre>    <p>其实incrementalencoder、streamwriter这些参数不传递也是可以的,默认是None,今天只是为了让大家知道是有这部分接口的。然后是注册这个入口,为了提供更好的性能,我创建一个函数加上缓存功能:</p>    <pre>  <code class="language-python">_cache = {}  _unknown = '--unknown--'    defsearch_function(encoding):      import encodings      encoding = encodings.normalize_encoding(encoding)      entry = _cache.get(encoding, _unknown)      if entry is not _unknown:          return entry        if encoding == 'fernet':          entry = getregentry()          _cache[encoding] = entry          return entry                                                    codecs.register(search_function)  </code></pre>    <p>这里简单介绍下normalize_encoding的意义,之前我说到了别名,我们再感受下:</p>    <pre>  <code class="language-python">In : u'哈哈'.encode('utf-8')  Out: '\xe5\x93\x88\xe5\x93\x88'    In : u'哈哈'.encode('utf_8')  Out: '\xe5\x93\x88\xe5\x93\x88'  </code></pre>    <p>这种中划线和下划线最后无差别对待,就是通过这个函数标准化的。</p>    <p>codecs模块底层维护了一个搜索函数的列表,通过调用codecs.register方法就把上述函数append进去了。</p>    <p>最后我们注册2个错误处理的函数:</p>    <pre>  <code class="language-python">defstrict_errors(exc):      if isinstance(exc, UnicodeDecodeError):          raise TypeError('Invalid Token')      deffallback_errors(exc):      s = []      for c in exc.object[exc.start:exc.end]:  # 只是为了演示提供的接口,实施上就是返回原来的input          s.append(c)      return ''.join(s), exc.end    codecs.register_error('fernet.strict', strict_errors)  codecs.register_error('fernet.fallback', fallback_errors)  </code></pre>    <p>默认使用的是fernet.strict这种处理方案,也可以使用fallback模式。ok,我们现在感受一下:</p>    <pre>  <code class="language-python">In [1]: import fernet    In [2]: input = 'hah'    In [3]: output = input.encode('fernet')    In [4]: output  # 已经是加密后的结果了  Out[4]: 'gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA=='    In [5]: output.decode('fernet')  # 解密后还原成原来的字符串  Out[5]: 'hah'    In [6]: input.decode('fernet')  'fernet' codec can't decode bytes in position 0-3: Invalid Token  ---------------------------------------------------------------------------  TypeError                                 Traceback (most recent call last)  <ipython-input-6-1c20dc246c52> in <module>()  ----> 1 input.decode('fernet')    /Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)       20             error = codecs.lookup_error(errors)       21             return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,  ---> 22                                             'Invalid Token'))       23       24    /Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)       68     print exc       69     if isinstance(exc, UnicodeDecodeError):  ---> 70         raise TypeError('Invalid Token')       71       72    TypeError: Invalid Token  # 抛了个自定义错误    In [7]: input.decode('fernet', 'fernet.fallback')  Out[7]: 'hah'  # 解密不成功,返回原来的字符串  </code></pre>    <p>好了,自定义的加解密方案完成了,整理全部的代码如下:</p>    <pre>  <code class="language-python">import codecs    from cryptography.fernet import Fernet  from cryptography.fernet import InvalidToken    KEY = Fernet.generate_key()  f = Fernet(KEY)  _cache = {}  _unknown = '--unknown--'      classFernetCodec(codecs.Codec):      defencode(self, input, errors='fernet.strict'):          return f.encrypt(input), len(input)        defdecode(self, input, errors='fernet.strict'):          try:              return f.decrypt(input), len(input)          except InvalidToken:              error = codecs.lookup_error(errors)              return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,                                              'Invalid Token'))      classIncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):      pass      classIncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):      pass      classStreamReader(FernetCodec, codecs.StreamReader):      pass      classStreamWriter(FernetCodec, codecs.StreamWriter):      pass        defgetregentry():      return codecs.CodecInfo(          name='fernet',          encode=FernetCodec().encode,          decode=FernetCodec().decode,          incrementalencoder=IncrementalEncoder,          incrementaldecoder=IncrementalDecoder,          streamwriter=StreamWriter,          streamreader=StreamReader,      )      defsearch_function(encoding):      import encodings      encoding = encodings.normalize_encoding(encoding)      entry = _cache.get(encoding, _unknown)      if entry is not _unknown:          return entry        if encoding == 'fernet':          entry = getregentry()          _cache[encoding] = entry          return entry      defstrict_errors(exc):      if isinstance(exc, UnicodeDecodeError):          raise TypeError('Invalid Token')      deffallback_errors(exc):      s = []      for c in exc.object[exc.start:exc.end]:          s.append(c)      return ''.join(s), exc.end      codecs.register(search_function)  codecs.register_error('fernet.strict', strict_errors)  codecs.register_error('fernet.fallback', fallback_errors)  </code></pre>    <p> </p>    <p>来自:http://www.dongwm.com/archives/使用codecs自定义编-解码方案/</p>    <p> </p>