当前位置:Gxlcms > 数据库问题 > Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

时间:2021-07-01 10:21:17 帮助过:19人阅读

订阅者:

?
1 2 3 4 5 6 7 8 9 10 11 #!/usr/bin/env python # -*- coding:utf-8 -*-   from monitor.RedisHelper import RedisHelper   obj = RedisHelper() redis_sub = obj.subscribe()   while True:     msg= redis_sub.parse_response()     print msg

发布者:

?
1 2 3 4 5 6 7 #!/usr/bin/env python # -*- coding:utf-8 -*-   from monitor.RedisHelper import RedisHelper   obj = RedisHelper() obj.public(‘hello‘)

更多参见:https://github.com/andymccurdy/redis-py/

http://doc.redisfans.com/

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ安装

?
1 2 3 4 5 6 7 8 安装配置epel源    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm   安装erlang    $ yum -y install erlang   安装RabbitMQ    $ yum -y install rabbitmq-server

注意:service rabbitmq-server start/stop

安装API

?
1 2 3 4 5 6 7 pip install pika or easy_install pika or 源码   https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

技术分享技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
View Code

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #!/usr/bin/env python import pika   # ######################### 生产者 #########################   connection = pika.BlockingConnection(pika.ConnectionParameters(         host=‘localhost‘)) channel = connection.channel()   channel.queue_declare(queue=‘hello‘)   channel.basic_publish(exchange=‘‘,                       routing_key=‘hello‘,                       body=‘Hello World!‘) print(" [x] Sent ‘Hello World!‘") connection.close()
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #!/usr/bin/env python import pika   # ########################## 消费者 ##########################   connection = pika.BlockingConnection(pika.ConnectionParameters(         host=‘localhost‘)) channel = connection.channel()   channel.queue_declare(queue=‘hello‘)   def callback(ch, method, properties, body):     print(" [x] Received %r" % body)   channel.basic_consume(callback,                       queue=‘hello‘,                       no_ack=True)   print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()

1、acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

技术分享技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘10.211.55.4‘))
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
消费者

2、durable   消息不丢失

技术分享技术分享
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=‘hello‘, durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘,
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent ‘Hello World!‘")
connection.close()
生产者 技术分享技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=‘hello‘, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
消费者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

技术分享技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=‘hello‘)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
消费者

4、发布订阅

技术分享

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

 exchange type = fanout

技术分享技术分享
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
发布者 技术分享技术分享
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=‘logs‘,
                   queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
订阅者

5、关键字发送

技术分享

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

技术分享技术分享
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         type=‘direct‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消费者 技术分享技术分享
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         type=‘direct‘)

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
生产者

6、模糊匹配

技术分享

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
?
1 2 3 发送者路由值              队列中 old.boy.python          old.*  -- 不匹配 old.boy.python          old.#  -- 匹配
技术分享技术分享
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         type=‘topic‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=‘topic_logs‘,
                       queue=queue_name,
                       routing_key=binding_key)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消费者 技术分享技术分享
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         type=‘topic‘)

routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生产者

注意:

 

技术分享技术分享
sudo rabbitmqctl add_user alex 123
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags alex administrator
# 设置权限
sudo rabbitmqctl set_permissions -p "/" alex ‘.‘‘.‘‘.‘

# 然后重启rabbiMQ服务
sudo /etc/init.d/rabbitmq-server restart
 
# 然后可以使用刚才的用户远程连接rabbitmq server了。


------------------------------
credentials = pika.PlainCredentials("alex","123")

connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.14.47‘,credentials=credentials))
View Code

SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

技术分享

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 MySQL-Python     mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>   pymysql     mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]   MySQL-Connector     mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>   cx_Oracle     oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]   更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

步骤一:

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #!/usr/bin/env python # -*- coding:utf-8 -*-   from sqlalchemy import create_engine     engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)   engine.execute(     "INSERT INTO ts_test (a, b) VALUES (‘2‘, ‘v1‘)" )   engine.execute(      "INSERT INTO ts_test (a, b) VALUES (%s, %s)",     ((555, "v1"),(666, "v1"),) ) engine.execute(     "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",     id=999, name="v1" )   result = engine.execute(‘select * from ts_test‘) result.fetchall()
技术分享技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from sqlalchemy import create_engine


engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)


# 事务操作
with engine.begin() as conn:
    conn.execute("insert into table (x, y, z) values (1, 2, 3)")
    conn.execute("my_special_procedure(5)")
    
    
conn = engine.connect()
# 事务操作 
with conn.begin():
       conn.execute("some statement", {‘x‘:5, ‘y‘:10})
事务操作

注:查看数据库连接:show status like ‘Threads%‘;

步骤二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #!/usr/bin/env python # -*- coding:utf-8 -*-   from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey   metadata = MetaData()   user = Table(‘user‘, metadata,     Column(‘id‘, Integer, primary_key=True),     Column(‘name‘, String(20)), )   color = Table(‘color‘, metadata,     Column(‘id‘, Integer, primary_key=True),     Column(‘name‘, String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)   metadata.create_all(engine) # metadata.clear() # metadata.remove()
技术分享技术分享
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

user = Table(‘user‘, metadata,
    Column(‘id‘, Integer, primary_key=True),
    Column(‘name‘, String(20)),
)

color = Table(‘color‘, metadata,
    Column(‘id‘, Integer, primary_key=True),
    Column(‘name‘, String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

conn = engine.connect()

# 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{‘id‘:7,‘name‘:‘seven‘})
conn.close()

# sql = user.insert().values(id=123, name=‘wu‘)
# conn.execute(sql)
# conn.close()

# sql = user.delete().where(user.c.id > 1)

# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == ‘jack‘).values(name=‘ed‘)

# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)

# result = conn.execute(sql)
# print result.fetchall()
# conn.close()
增删改查

更多内容详见:

    http://www.jianshu.com/p/e6bba189fcbd

    http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。

步骤三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #!/usr/bin/env python # -*- coding:utf-8 -*-   from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine   engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)   Base = declarative_base()     class User(Base):     __tablename__ = ‘users‘     id = Column(Integer, primary_key=True)     name = Column(String(50))   # 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息 # Base.metadata.create_all(engine)   Session = sessionmaker(bind=engine) session = Session()     # ########## 增 ########## # u = User(id=2, name=‘sb‘) # session.add(u) # session.add_all([ #     User(id=3, name=‘sb‘), #     User(id=4, name=‘sb‘) # ]) # session.commit()   # ########## 删除 ########## # session.query(User).filter(User.id > 2).delete() # session.commit()   # ########## 修改 ########## # session.query(User).filter(User.id > 2).update({‘cluster_id‘ : 0}) # session.commit() # ########## 查 ########## # ret = session.query(User).filter_by(name=‘sb‘).first()   # ret = session.query(User).filter_by(name=‘sb‘).all() # print ret   # ret = session.query(User).filter(User.name.in_([‘sb‘,‘bb‘])).all() # print ret   # ret = session.query(User.name.label(‘name_label‘)).all() # print ret,type(ret)   # ret = session.query(User).order_by(User.id).all() # print ret   # ret = session.query(User).order_by(User.id)[1:3] # print ret # session.commit()

Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

标签:param   net   https   produce   html   min   ref   插入   update   

人气教程排行