当前位置:Gxlcms > 数据库问题 > 异步IO\数据库\队列\缓存

异步IO\数据库\队列\缓存

时间:2021-07-01 10:21:17 帮助过:22人阅读

import select 2 import socket 3 import sys 4 import queue 5 6 7 server = socket.socket() 8 server.setblocking(0) 9 10 server_addr = (localhost,10000) 11 12 print(starting up on %s port %s % server_addr) 13 server.bind(server_addr) 14 15 server.listen(5) 16 17 18 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd 19 outputs = [] 20 21 message_queues = {} 22 23 while True: 24 print("waiting for next event...") 25 26 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里 27 28 for s in readable: #每个s就是一个socket 29 30 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了, 31 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀 32 #新连接进来了,接受这个连接 33 conn, client_addr = s.accept() 34 print("new connection from",client_addr) 35 conn.setblocking(0) 36 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接 37 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到 38 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的 39 40 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送 41 42 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了 43 #客户端的数据过来了,在这接收 44 data = s.recv(1024) 45 if data: 46 print("收到来自[%s]的数据:" % s.getpeername()[0], data) 47 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端 48 if s not in outputs: 49 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端 50 51 52 else:#如果收不到data代表什么呢? 代表客户端断开了呀 53 print("客户端断开了",s) 54 55 if s in outputs: 56 outputs.remove(s) #清理已断开的连接 57 58 inputs.remove(s) #清理已断开的连接 59 60 del message_queues[s] ##清理已断开的连接 61 62 63 for s in writeable: 64 try : 65 next_msg = message_queues[s].get_nowait() 66 67 except queue.Empty: 68 print("client [%s]" %s.getpeername()[0], "queue is empty..") 69 outputs.remove(s) 70 71 else: 72 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 73 s.send(next_msg.upper()) 74 75 76 for s in exeptional: 77 print("handling exception for ",s.getpeername()) 78 inputs.remove(s) 79 if s in outputs: 80 outputs.remove(s) 81 s.close() 82 83 del message_queues[s] server 技术分享
 1 import socket
 2 import sys
 3 
 4 messages = [ bThis is the message. ,
 5              bIt will be sent ,
 6              bin parts.,
 7              ]
 8 server_address = (localhost, 10000)
 9 
10 # Create a TCP/IP socket
11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
12           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
13           ]
14 
15 # Connect the socket to the port where the server is listening
16 print(connecting to %s port %s % server_address)
17 for s in socks:
18     s.connect(server_address)
19 
20 for message in messages:
21 
22     # Send messages on both sockets
23     for s in socks:
24         print(%s: sending "%s" % (s.getsockname(), message) )
25         s.send(message)
26 
27     # Read responses on both sockets
28     for s in socks:
29         data = s.recv(1024)
30         print( %s: received "%s" % (s.getsockname(), data) )
31         if not data:
32             print(sys.stderr, closing socket, s.getsockname() )
client

selectors模块

技术分享
 1 import selectors
 2 import socket
 3  
 4 sel = selectors.DefaultSelector()
 5  
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()  # Should be ready
 8     print(accepted, conn, from, addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11  
12 def read(conn, mask):
13     data = conn.recv(1000)  # Should be ready
14     if data:
15         print(echoing, repr(data), to, conn)
16         conn.send(data)  # Hope it won‘t block
17     else:
18         print(closing, conn)
19         sel.unregister(conn)
20         conn.close()
21  
22 sock = socket.socket()
23 sock.bind((localhost, 10000))
24 sock.listen(100)
25 sock.setblocking(False)
26 sel.register(sock, selectors.EVENT_READ, accept)
27  
28 while True:
29     events = sel.select()
30     for key, mask in events:
31         callback = key.data
32         callback(key.fileobj, mask)
selectors

堡垒机前戏

开发堡垒机之前,先来学习Python的paramiko模块,该模块机遇SSH用于连接远程服务器并执行相关操作

SSHClient

用于连接远程服务器并执行基本命令

基于用户名密码连接:

 1 import paramiko
 2   
 3 # 创建SSH对象
 4 ssh = paramiko.SSHClient()
 5 # 允许连接不在know_hosts文件中的主机
 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 7 # 连接服务器
 8 ssh.connect(hostname=c1.salt.com, port=22, username=wupeiqi, password=123)
 9   
10 # 执行命令
11 stdin, stdout, stderr = ssh.exec_command(df)
12 # 获取命令结果
13 result = stdout.read()
14   
15 # 关闭连接
16 ssh.close()
 1 import paramiko
 2 
 3 transport = paramiko.Transport((hostname, 22))
 4 transport.connect(username=wupeiqi, password=123)
 5 
 6 ssh = paramiko.SSHClient()
 7 ssh._transport = transport
 8 
 9 stdin, stdout, stderr = ssh.exec_command(df)
10 print stdout.read()
11 
12 transport.close()

基于公钥密钥连接:

 1 import paramiko
 2  
 3 private_key = paramiko.RSAKey.from_private_key_file(/home/auto/.ssh/id_rsa)
 4  
 5 # 创建SSH对象
 6 ssh = paramiko.SSHClient()
 7 # 允许连接不在know_hosts文件中的主机
 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 9 # 连接服务器
10 ssh.connect(hostname=c1.salt.com, port=22, username=wupeiqi, key=private_key)
11  
12 # 执行命令
13 stdin, stdout, stderr = ssh.exec_command(df)
14 # 获取命令结果
15 result = stdout.read()
16  
17 # 关闭连接
18 ssh.close()
技术分享
 1 import paramiko
 2 
 3 private_key = paramiko.RSAKey.from_private_key_file(/home/auto/.ssh/id_rsa)
 4 
 5 transport = paramiko.Transport((hostname, 22))
 6 transport.connect(username=wupeiqi, pkey=private_key)
 7 
 8 ssh = paramiko.SSHClient()
 9 ssh._transport = transport
10 
11 stdin, stdout, stderr = ssh.exec_command(df)
12 
13 transport.close()
SSHClient 封装 Transport 技术分享
 1 import paramiko
 2 from io import StringIO
 3 
 4 key_str = """-----BEGIN RSA PRIVATE KEY-----
 5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
 6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
 7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
 8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
 9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
30 -----END RSA PRIVATE KEY-----"""
31 
32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
33 transport = paramiko.Transport((10.0.1.40, 22))
34 transport.connect(username=wupeiqi, pkey=private_key)
35 
36 ssh = paramiko.SSHClient()
37 ssh._transport = transport
38 
39 stdin, stdout, stderr = ssh.exec_command(df)
40 result = stdout.read()
41 
42 transport.close()
43 
44 print(result)
基于私钥字符串进行连接

SFTPClient

用于连接远程服务器并执行上传下载

基于用户名密码上传下载

 1 import paramiko
 2  
 3 transport = paramiko.Transport((hostname,22))
 4 transport.connect(username=wupeiqi,password=123)
 5  
 6 sftp = paramiko.SFTPClient.from_transport(transport)
 7 # 将location.py 上传至服务器 /tmp/test.py
 8 sftp.put(/tmp/location.py, /tmp/test.py)
 9 # 将remove_path 下载到本地 local_path
10 sftp.get(remove_path, local_path)
11  
12 transport.close()

基于公钥密钥上传下载

 1 import paramiko
 2  
 3 private_key = paramiko.RSAKey.from_private_key_file(/home/auto/.ssh/id_rsa)
 4  
 5 transport = paramiko.Transport((hostname, 22))
 6 transport.connect(username=wupeiqi, pkey=private_key )
 7  
 8 sftp = paramiko.SFTPClient.from_transport(transport)
 9 # 将location.py 上传至服务器 /tmp/test.py
10 sftp.put(/tmp/location.py, /tmp/test.py)
11 # 将remove_path 下载到本地 local_path
12 sftp.get(remove_path, local_path)
13  
14 transport.close()

RabbitMQ队列  

安装 rabbitMA

http://www.cnblogs.com/ericli-ericli/p/5902270.html

http://blog.csdn.net/zyz511919766/article/details/41946521

Work Queues

技术分享

技术分享
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3         host=localhost))
 4 channel = connection.channel()
 5  
 6 channel.queue_declare(queue=hello)
 7  
 8 channel.basic_publish(exchange=‘‘,
 9                       routing_key=hello,
10                       body=Hello World!)
11 print(" [x] Sent ‘Hello World!‘")
12 connection.close()
生产者 技术分享
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3         host=localhost))
 4 channel = connection.channel()
 5  
 6 channel.queue_declare(queue=hello)
 7  
 8 def callback(ch, method, properties, body):
 9     print(" [x] Received %r" % body)
10  
11 channel.basic_consume(callback,
12                       queue=hello,
13                       no_ack=True)
14  
15 print( [*] Waiting for messages. To exit press CTRL+C)
16 channel.start_consuming()
消费者

1、acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

技术分享
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host=10.211.55.4))
 5 channel = connection.channel()
 6 
 7 channel.queue_declare(queue=hello)
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % body)
11     import time
12     time.sleep(10)
13     print ok
14     ch.basic_ack(delivery_tag = method.delivery_tag)
15 
16 channel.basic_consume(callback,
17                       queue=hello,
18                       no_ack=False)
19 
20 print( [*] Waiting for messages. To exit press CTRL+C)
21 channel.start_consuming()
消费者

2、durable   消息不丢失

技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=hello, durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body=Hello World!,
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent ‘Hello World!‘")
connection.close()
生产者 技术分享
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4))
 4 channel = connection.channel()
 5 
 6 # make message persistent
 7 channel.queue_declare(queue=hello, durable=True)
 8 
 9 
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12     import time
13     time.sleep(10)
14     print ok
15     ch.basic_ack(delivery_tag = method.delivery_tag)
16 
17 channel.basic_consume(callback,
18                       queue=hello,
19                       no_ack=False)
20 
21 print( [*] Waiting for messages. To exit press CTRL+C)
22 channel.start_consuming()
消费者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,但是在消费者端,配置prefetch_count=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=hello)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ok
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue=hello,
                      no_ack=False)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
消费者

发布订阅

技术分享

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

 exchange type = fanout

技术分享
 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host=localhost))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange=logs,
 9                          type=fanout)
10 
11 message =  .join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange=logs,
13                       routing_key=‘‘,
14                       body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
发布者 技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=logs,
                         type=fanout)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=logs,
                   queue=queue_name)

print( [*] Waiting for logs. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
订阅者

有选择的接收消息

技术分享

 

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

技术分享
 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host=localhost))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange=direct_logs,
 9                          type=direct)
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17     sys.exit(1                    

人气教程排行