时间:2021-07-01 10:21:17 帮助过:19人阅读
订阅者:
?1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from monitor.RedisHelper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True :
msg = redis_sub.parse_response()
print msg
|
发布者:
?1 2 3 4 5 6 7 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from monitor.RedisHelper import RedisHelper
obj = RedisHelper()
obj.public( ‘hello‘ )
|
更多参见:https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ安装
?1 2 3 4 5 6 7 8 |
安装配置epel源
$ rpm - ivh http: / / dl.fedoraproject.org / pub / epel / 6 / i386 / epel - release - 6 - 8.noarch .rpm
安装erlang
$ yum - y install erlang
安装RabbitMQ
$ yum - y install rabbitmq - server
|
注意:service rabbitmq-server start/stop
安装API
?1 2 3 4 5 6 7 |
pip install pika
or
easy_install pika
or
源码
https: / / pypi.python.org / pypi / pika
|
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()View Code
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/env python
import pika
# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(
host = ‘localhost‘ ))
channel = connection.channel()
channel.queue_declare(queue = ‘hello‘ )
channel.basic_publish(exchange = ‘‘,
routing_key = ‘hello‘ ,
body = ‘Hello World!‘ )
print ( " [x] Sent ‘Hello World!‘" )
connection.close()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#!/usr/bin/env python
import pika
# ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(
host = ‘localhost‘ ))
channel = connection.channel()
channel.queue_declare(queue = ‘hello‘ )
def callback(ch, method, properties, body):
print ( " [x] Received %r" % body)
channel.basic_consume(callback,
queue = ‘hello‘ ,
no_ack = True )
print ( ‘ [*] Waiting for messages. To exit press CTRL+C‘ )
channel.start_consuming()
|
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘10.211.55.4‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()消费者
2、durable 消息不丢失
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent ‘Hello World!‘") connection.close()生产者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘, durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()消费者
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()消费者
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close()发布者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()订阅者
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()消费者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()生产者
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
1 2 3 |
发送者路由值 队列中
old.boy.python old. * - - 不匹配
old.boy.python old. # -- 匹配
|
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()消费者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()生产者
注意:
sudo rabbitmqctl add_user alex 123 # 设置用户为administrator角色 sudo rabbitmqctl set_user_tags alex administrator # 设置权限 sudo rabbitmqctl set_permissions -p "/" alex ‘.‘‘.‘‘.‘ # 然后重启rabbiMQ服务 sudo /etc/init.d/rabbitmq-server restart # 然后可以使用刚才的用户远程连接rabbitmq server了。 ------------------------------ credentials = pika.PlainCredentials("alex","123") connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.14.47‘,credentials=credentials))View Code
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
?1 2 3 4 5 6 7 8 9 10 11 12 13 |
MySQL - Python
mysql + mysqldb: / / <user>:<password>@<host>[:<port>] / <dbname>
pymysql
mysql + pymysql: / / <username>:<password>@<host> / <dbname>[?<options>]
MySQL - Connector
mysql + mysqlconnector: / / <user>:<password>@<host>[:<port>] / <dbname>
cx_Oracle
oracle + cx_oracle: / / user: pass @host:port / dbname[?key = value&key = value...]
更多详见:http: / / docs.sqlalchemy.org / en / latest / dialects / index.html
|
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
engine = create_engine( "mysql+mysqldb://root:123@127.0.0.1:3306/s11" , max_overflow = 5 )
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (‘2‘, ‘v1‘)"
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)" ,
(( 555 , "v1" ),( 666 , "v1" ),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)" ,
id = 999 , name = "v1"
)
result = engine.execute( ‘select * from ts_test‘ )
result.fetchall()
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) # 事务操作 with engine.begin() as conn: conn.execute("insert into table (x, y, z) values (1, 2, 3)") conn.execute("my_special_procedure(5)") conn = engine.connect() # 事务操作 with conn.begin(): conn.execute("some statement", {‘x‘:5, ‘y‘:10})事务操作
注:查看数据库连接:show status like ‘Threads%‘;
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData()
user = Table( ‘user‘ , metadata,
Column( ‘id‘ , Integer, primary_key = True ),
Column( ‘name‘ , String( 20 )),
)
color = Table( ‘color‘ , metadata,
Column( ‘id‘ , Integer, primary_key = True ),
Column( ‘name‘ , String( 20 )),
)
engine = create_engine( "mysql+mysqldb://root:123@127.0.0.1:3306/s11" , max_overflow = 5 )
metadata.create_all(engine)
# metadata.clear()
# metadata.remove()
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table(‘user‘, metadata, Column(‘id‘, Integer, primary_key=True), Column(‘name‘, String(20)), ) color = Table(‘color‘, metadata, Column(‘id‘, Integer, primary_key=True), Column(‘name‘, String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name) conn.execute(user.insert(),{‘id‘:7,‘name‘:‘seven‘}) conn.close() # sql = user.insert().values(id=123, name=‘wu‘) # conn.execute(sql) # conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name) # sql = user.update().where(user.c.name == ‘jack‘).values(name=‘ed‘) # sql = select([user, ]) # sql = select([user.c.id, ]) # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id) # sql = select([user.c.name]).order_by(user.c.name) # sql = select([user]).group_by(user.c.name) # result = conn.execute(sql) # print result.fetchall() # conn.close()增删改查
更多内容详见:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
engine = create_engine( "mysql+mysqldb://root:123@127.0.0.1:3306/s11" , max_overflow = 5 )
Base = declarative_base()
class User(Base):
__tablename__ = ‘users‘
id = Column(Integer, primary_key = True )
name = Column(String( 50 ))
# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)
Session = sessionmaker(bind = engine)
session = Session()
# ########## 增 ##########
# u = User(id=2, name=‘sb‘)
# session.add(u)
# session.add_all([
# User(id=3, name=‘sb‘),
# User(id=4, name=‘sb‘)
# ])
# session.commit()
# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()
# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({‘cluster_id‘ : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name=‘sb‘).first()
# ret = session.query(User).filter_by(name=‘sb‘).all()
# print ret
# ret = session.query(User).filter(User.name.in_([‘sb‘,‘bb‘])).all()
# print ret
# ret = session.query(User.name.label(‘name_label‘)).all()
# print ret,type(ret)
# ret = session.query(User).order_by(User.id).all()
# print ret
# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()
|
Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy
标签:param net https produce html min ref 插入 update