Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Maryellen94 8年前

来自: http://my.oschina.net/eddylinux/blog/607074


Memcached  Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。  Memcached安装和基本使用  Memcached安装:  wget http://memcached.org/latest  tar -zxvf memcached-1.x.x.tar.gz  cd memcached-1.x.x  ./configure && make && make test && sudo make install     PS:依赖libevent         yum install libevent-devel         apt-get install libevent-dev  启动Memcached   memcached -d -m 10    -u root -l 10.211.55.4 -p 12000 -c 256 -P /tmp/memcached.pid     参数说明:      -d 是启动一个守护进程      -m 是分配给Memcache使用的内存数量,单位是MB      -u 是运行Memcache的用户      -l 是监听的服务器IP地址      -p 是设置Memcache监听的端口,最好是1024以上的端口      -c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定      -P 是设置保存Memcache的pid文件   Memcached命令    存储命令: set/add/replace/append/prepend/cas  获取命令: get/gets  其他命令: delete/stats..   Python操作Memcached  安装API  python操作Memcached使用Python-memcached模块  下载安装:https://pypi.python.org/pypi/python-memcached   1、第一次操作       import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)  mc.set("foo", "bar")  ret = mc.get('foo')  print ret  Ps:debug = True 表示运行出现错误时,现实错误信息,上线后移除该参数。  2、天生支持集群  python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比   主机    权重      1.1.1.1   1      1.1.1.2   2      1.1.1.3   1     那么在内存中主机列表为:      host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]  如果用户根据如果要在内存中创建一个键值对(如:k1 = "v1"),那么要执行一下步骤:  根据算法将 k1 转换成一个数字  将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )  在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]  连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中  代码实现如下:    mc = memcache.Client([('1.1.1.1:12000', 1), ('1.1.1.2:12000', 2), ('1.1.1.3:12000', 1)], debug=True)     mc.set('k1', 'v1')    3、add  添加一条键值对,如果已经存在的 key,重复执行add操作异常  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)  mc.add('k1', 'v1')  # mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!  4、replace  replace 修改某个key的值,如果key不存在,则异常  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)  # 如果memcache中存在kkkk,则替换成功,否则一场  mc.replace('kkkk','999')  5、set 和 set_multi  set            设置一个键值对,如果key不存在,则创建,如果key存在,则修改  set_multi   设置多个键值对,如果key不存在,则创建,如果key存在,则修改  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)     mc.set('key0', 'eddy')     mc.set_multi({'key1': 'val1', 'key2': 'val2'})  6、delete 和 delete_multi  delete          在Memcached中删除指定的一个键值对  delete_multi    在Memcached中删除指定的多个键值对  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)     mc.delete('key0')  mc.delete_multi(['key1', 'key2'])  7、get 和 get_multi  get            获取一个键值对  get_multi   获取多一个键值对  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)     val = mc.get('key0')  item_dict = mc.get_multi(["key1", "key2", "key3"])  8、append 和 prepend  append    修改指定key的值,在该值 后面 追加内容  prepend   修改指定key的值,在该值 前面 插入内容  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)  # k1 = "v1"     mc.append('k1', 'after')  # k1 = "v1after"     mc.prepend('k1', 'before')  # k1 = "beforev1after"  9、decr 和 incr    incr  自增,将Memcached中的某一个值增加 N ( N默认为1 )  decr 自减,将Memcached中的某一个值减少 N ( N默认为1 )  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache     mc = memcache.Client(['10.211.55.4:12000'], debug=True)  mc.set('k1', '777')     mc.incr('k1')  # k1 = 778     mc.incr('k1', 10)  # k1 = 788     mc.decr('k1')  # k1 = 787     mc.decr('k1', 10)  # k1 = 777  10、gets 和 cas  如商城商品剩余个数,假设改值保存在memcache中,product_count = 900  A用户刷新页面从memcache中读取到product_count = 900  B用户刷新页面从memcache中读取到product_count = 900  如果A、B用户均购买商品  A用户修改商品剩余个数 product_count=899  B用户修改商品剩余个数 product_count=899  如此一来缓存内的数据便不在正确,两个用户购买商品后,商品剩余还是 899  如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!  如果想要避免此情况的发生,只要使用 gets 和 cas 即可,如:  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import memcache  mc = memcache.Client(['10.211.55.4:12000'], debug=True, cache_cas=True)     v = mc.gets('product_count')  # ...  # 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生  mc.cas('product_count', "899")  Ps:本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出现非正常数据,则不允许修改。  Memcached 真的过时了吗?  Redis  redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。  Redis安装和基本使用  wget http://download.redis.io/releases/redis-3.0.6.tar.gz  tar xzf redis-3.0.6.tar.gz  cd redis-3.0.6  make  启动服务端  src/redis-server  启动客户端  src/redis-cli  redis> set foo bar  OK  redis> get foo  "bar"  Python操作Redis  安装API  sudo pip install redis  or  sudo easy_install redis  or  源码安装     详见:  常用操作  1、操作模式  redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。   #!/usr/bin/env python  # -*- coding:utf-8 -*-     import redis     r = redis.Redis(host='10.211.55.4', port=6379)  r.set('foo', 'Bar')  print r.get('foo')  2、连接池  redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。  #!/usr/bin/env python  # -*- coding:utf-8 -*-     import redis     pool = redis.ConnectionPool(host='10.211.55.4', port=6379)     r = redis.Redis(connection_pool=pool)  r.set('foo', 'Bar')  print r.get('foo')  3、管道  redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。  #!/usr/bin/env python  # -*- coding:utf-8 -*-     import redis     pool = redis.ConnectionPool(host='10.211.55.4', port=6379)     r = redis.Redis(connection_pool=pool)     # pipe = r.pipeline(transaction=False)  pipe = r.pipeline(transaction=True)     r.set('name', 'alex')  r.set('role', 'sb')     pipe.execute()  4、发布订阅  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import redisclass RedisHelper:          def __init__(self):          self.__conn = redis.Redis(host='10.211.55.4')          self.chan_sub = 'fm104.5'          self.chan_pub = 'fm104.5'        def public(self, msg):          self.__conn.publish(self.chan_pub, msg)                  return True          def subscribe(self):          pub = self.__conn.pubsub()          pub.subscribe(self.chan_sub)          pub.parse_response()                  return pub  订阅者:     #!/usr/bin/env python  # -*- coding:utf-8 -*-     from monitor.RedisHelper import RedisHelper     obj = RedisHelper()  redis_sub = obj.subscribe()     while True:      msg= redis_sub.parse_response()      print msg       发布者:      #!/usr/bin/env python  # -*- coding:utf-8 -*-     from monitor.RedisHelper import RedisHelper     obj = RedisHelper()  obj.public('hello')  更多参见:https://github.com/andymccurdy/redis-py/  RabbitMQ  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。  RabbitMQ安装  安装配置epel源     $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm     安装erlang     $ yum -y install erlang     安装RabbitMQ     $ yum -y install rabbitmq-server  注意:service rabbitmq-server start/stop  安装API    pip install pika  or  easy_install pika  or  源码     https://pypi.python.org/pypi/pika   使用API操作RabbitMQ  基于Queue实现生产者消费者模型  #!/usr/bin/env python  # -*- coding:utf-8 -*-  import Queue  import threading      message = Queue.Queue(10)      def producer(i):      while True:          message.put(i)      def consumer(i):      while True:          msg = message.get()      for i in range(12):      t = threading.Thread(target=producer, args=(i,))      t.start()    for i in range(10):      t = threading.Thread(target=consumer, args=(i,))      t.start()  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。  #!/usr/bin/env python  import pika     # ######################### 生产者 #########################     connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()     channel.queue_declare(queue='hello')     channel.basic_publish(exchange='',                        routing_key='hello',                        body='Hello World!')  print(" [x] Sent 'Hello World!'")  connection.close()    #!/usr/bin/env python  import pika     # ########################## 消费者 ##########################     connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()     channel.queue_declare(queue='hello')     def callback(ch, method, properties, body):      print(" [x] Received %r" % body)     channel.basic_consume(callback,                        queue='hello',                        no_ack=True)     print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()  1、acknowledgment 消息不丢失  no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。    import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='10.211.55.4'))  channel = connection.channel()    channel.queue_declare(queue='hello')    def callback(ch, method, properties, body):      print(" [x] Received %r" % body)      import time      time.sleep(10)      print 'ok'      ch.basic_ack(delivery_tag = method.delivery_tag)    channel.basic_consume(callback,                        queue='hello',                        no_ack=False)    print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()    消费者    2、durable   消息不丢失  #!/usr/bin/env python  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))  channel = connection.channel()    # make message persistent  channel.queue_declare(queue='hello', durable=True)    channel.basic_publish(exchange='',                        routing_key='hello',                        body='Hello World!',                        properties=pika.BasicProperties(                            delivery_mode=2, # make message persistent                        ))  print(" [x] Sent 'Hello World!'")  connection.close()    生产者    #!/usr/bin/env python  # -*- coding:utf-8 -*-  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))  channel = connection.channel()    # make message persistent  channel.queue_declare(queue='hello', durable=True)      def callback(ch, method, properties, body):      print(" [x] Received %r" % body)      import time      time.sleep(10)      print 'ok'      ch.basic_ack(delivery_tag = method.delivery_tag)    channel.basic_consume(callback,                        queue='hello',                        no_ack=False)    print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()    消费者    3、消息获取顺序  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。  channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列    #!/usr/bin/env python  # -*- coding:utf-8 -*-  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))  channel = connection.channel()    # make message persistent  channel.queue_declare(queue='hello')      def callback(ch, method, properties, body):      print(" [x] Received %r" % body)      import time      time.sleep(10)      print 'ok'      ch.basic_ack(delivery_tag = method.delivery_tag)    channel.basic_qos(prefetch_count=1)    channel.basic_consume(callback,                        queue='hello',                        no_ack=False)    print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()    消费者    4、发布订阅  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。   exchange type = fanout   #!/usr/bin/env python  import pika  import sys    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='logs',                           type='fanout')    message = ' '.join(sys.argv[1:]) or "info: Hello World!"  channel.basic_publish(exchange='logs',                        routing_key='',                        body=message)  print(" [x] Sent %r" % message)  connection.close()    发布者    #!/usr/bin/env python  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='logs',                           type='fanout')    result = channel.queue_declare(exclusive=True)  queue_name = result.method.queue    channel.queue_bind(exchange='logs',                     queue=queue_name)    print(' [*] Waiting for logs. To exit press CTRL+C')    def callback(ch, method, properties, body):      print(" [x] %r" % body)    channel.basic_consume(callback,                        queue=queue_name,                        no_ack=True)    channel.start_consuming()    订阅者    5、关键字发送   exchange type = direct  之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。  #!/usr/bin/env python  import pika  import sys    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='direct_logs',                           type='direct')    result = channel.queue_declare(exclusive=True)  queue_name = result.method.queue    severities = sys.argv[1:]  if not severities:      sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])      sys.exit(1)    for severity in severities:      channel.queue_bind(exchange='direct_logs',                         queue=queue_name,                         routing_key=severity)    print(' [*] Waiting for logs. To exit press CTRL+C')    def callback(ch, method, properties, body):      print(" [x] %r:%r" % (method.routing_key, body))    channel.basic_consume(callback,                        queue=queue_name,                        no_ack=True)    channel.start_consuming()    消费者    #!/usr/bin/env python  import pika  import sys    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='direct_logs',                           type='direct')    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  message = ' '.join(sys.argv[2:]) or 'Hello World!'  channel.basic_publish(exchange='direct_logs',                        routing_key=severity,                        body=message)  print(" [x] Sent %r:%r" % (severity, message))  connection.close()    生产者    6、模糊匹配   exchange type = topic  在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。  # 表示可以匹配 0 个 或 多个 单词  *  表示只能匹配 一个 单词  发送者路由值              队列中  old.eddy.python          old.*  -- 不匹配  old.eddy.python          old.#  -- 匹配  #!/usr/bin/env python  import pika  import sys    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='topic_logs',                           type='topic')    result = channel.queue_declare(exclusive=True)  queue_name = result.method.queue    binding_keys = sys.argv[1:]  if not binding_keys:      sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])      sys.exit(1)    for binding_key in binding_keys:      channel.queue_bind(exchange='topic_logs',                         queue=queue_name,                         routing_key=binding_key)    print(' [*] Waiting for logs. To exit press CTRL+C')    def callback(ch, method, properties, body):      print(" [x] %r:%r" % (method.routing_key, body))    channel.basic_consume(callback,                        queue=queue_name,                        no_ack=True)    channel.start_consuming()    消费者    #!/usr/bin/env python  import pika  import sys    connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()    channel.exchange_declare(exchange='topic_logs',                           type='topic')    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'  message = ' '.join(sys.argv[2:]) or 'Hello World!'  channel.basic_publish(exchange='topic_logs',                        routing_key=routing_key,                        body=message)  print(" [x] Sent %r:%r" % (routing_key, message))  connection.close()    生产者    SQLAlchemy  SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。  Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:  MySQL-Python      mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>     pymysql      mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]     MySQL-Connector      mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>     cx_Oracle      oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]     更多详见:   步骤一:  使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。  #!/usr/bin/env python  # -*- coding:utf-8 -*-     from sqlalchemy import create_engine        engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)     engine.execute(      "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"  )     engine.execute(       "INSERT INTO ts_test (a, b) VALUES (%s, %s)",      ((555, "v1"),(666, "v1"),)  )  engine.execute(      "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",      id=999, name="v1"  )     result = engine.execute('select * from ts_test')  result.fetchall()  #!/usr/bin/env python  # -*- coding:utf-8 -*-    from sqlalchemy import create_engine      engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)      # 事务操作  with engine.begin() as conn:      conn.execute("insert into table (x, y, z) values (1, 2, 3)")      conn.execute("my_special_procedure(5)")              conn = engine.connect()  # 事务操作   with conn.begin():         conn.execute("some statement", {'x':5, 'y':10})    事务操作  注:查看数据库连接:show status like 'Threads%';  步骤二:  使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。  !/usr/bin/env python  # -*- coding:utf-8 -*-     from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey     metadata = MetaData()     user = Table('user', metadata,      Column('id', Integer, primary_key=True),      Column('name', String(20)),  )     color = Table('color', metadata,      Column('id', Integer, primary_key=True),      Column('name', String(20)),  )  engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)     metadata.create_all(engine)  # metadata.clear()  # metadata.remove()    #!/usr/bin/env python  # -*- coding:utf-8 -*-    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey    metadata = MetaData()    user = Table('user', metadata,      Column('id', Integer, primary_key=True),      Column('name', String(20)),  )    color = Table('color', metadata,      Column('id', Integer, primary_key=True),      Column('name', String(20)),  )  engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)    conn = engine.connect()    # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)  conn.execute(user.insert(),{'id':7,'name':'seven'})  conn.close()    # sql = user.insert().values(id=123, name='eddy')  # conn.execute(sql)  # conn.close()    # sql = user.delete().where(user.c.id > 1)    # sql = user.update().values(fullname=user.c.name)  # sql = user.update().where(user.c.name == 'jack').values(name='ed')    # sql = select([user, ])  # sql = select([user.c.id, ])  # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)  # sql = select([user.c.name]).order_by(user.c.name)  # sql = select([user]).group_by(user.c.name)    # result = conn.execute(sql)  # print result.fetchall()  # conn.close()    增删改查    更多内容详见:      http://www.jianshu.com/p/e6bba189fcbd      http://docs.sqlalchemy.org/en/latest/core/expression_api.html  注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。  步骤三:  使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。  #!/usr/bin/env python  # -*- coding:utf-8 -*-     from sqlalchemy.ext.declarative import declarative_base  from sqlalchemy import Column, Integer, String  from sqlalchemy.orm import sessionmaker  from sqlalchemy import create_engine     engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)     Base = declarative_base()        class User(Base):      __tablename__ = 'users'      id = Column(Integer, primary_key=True)      name = Column(String(50))     # 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息  # Base.metadata.create_all(engine)     Session = sessionmaker(bind=engine)  session = Session()        # ########## 增 ##########  # u = User(id=2, name='sb')  # session.add(u)  # session.add_all([  #     User(id=3, name='sb'),  #     User(id=4, name='sb')  # ])  # session.commit()     # ########## 删除 ##########  # session.query(User).filter(User.id > 2).delete()  # session.commit()     # ########## 修改 ##########  # session.query(User).filter(User.id > 2).update({'cluster_id' : 0})  # session.commit()  # ########## 查 ##########  # ret = session.query(User).filter_by(name='sb').first()     # ret = session.query(User).filter_by(name='sb').all()  # print ret     # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()  # print ret     # ret = session.query(User.name.label('name_label')).all()  # print ret,type(ret)     # ret = session.query(User).order_by(User.id).all()  # print ret     # ret = session.query(User).order_by(User.id)[1:3]  # print ret  # session.commit()