时间:2021-07-01 10:21:17 帮助过:22人阅读
1 import socket 2 import sys 3 4 messages = [ b‘This is the message. ‘, 5 b‘It will be sent ‘, 6 b‘in parts.‘, 7 ] 8 server_address = (‘localhost‘, 10000) 9 10 # Create a TCP/IP socket 11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 12 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 13 ] 14 15 # Connect the socket to the port where the server is listening 16 print(‘connecting to %s port %s‘ % server_address) 17 for s in socks: 18 s.connect(server_address) 19 20 for message in messages: 21 22 # Send messages on both sockets 23 for s in socks: 24 print(‘%s: sending "%s"‘ % (s.getsockname(), message) ) 25 s.send(message) 26 27 # Read responses on both sockets 28 for s in socks: 29 data = s.recv(1024) 30 print( ‘%s: received "%s"‘ % (s.getsockname(), data) ) 31 if not data: 32 print(sys.stderr, ‘closing socket‘, s.getsockname() )client
selectors模块
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print(‘accepted‘, conn, ‘from‘, addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print(‘echoing‘, repr(data), ‘to‘, conn) 16 conn.send(data) # Hope it won‘t block 17 else: 18 print(‘closing‘, conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind((‘localhost‘, 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)selectors
开发堡垒机之前,先来学习Python的paramiko模块,该模块机遇SSH用于连接远程服务器并执行相关操作
SSHClient
用于连接远程服务器并执行基本命令
基于用户名密码连接:
1 import paramiko 2 3 # 创建SSH对象 4 ssh = paramiko.SSHClient() 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, password=‘123‘) 9 10 # 执行命令 11 stdin, stdout, stderr = ssh.exec_command(‘df‘) 12 # 获取命令结果 13 result = stdout.read() 14 15 # 关闭连接 16 ssh.close()
1 import paramiko 2 3 transport = paramiko.Transport((‘hostname‘, 22)) 4 transport.connect(username=‘wupeiqi‘, password=‘123‘) 5 6 ssh = paramiko.SSHClient() 7 ssh._transport = transport 8 9 stdin, stdout, stderr = ssh.exec_command(‘df‘) 10 print stdout.read() 11 12 transport.close()
基于公钥密钥连接:
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 # 创建SSH对象 6 ssh = paramiko.SSHClient() 7 # 允许连接不在know_hosts文件中的主机 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 连接服务器 10 ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, key=private_key) 11 12 # 执行命令 13 stdin, stdout, stderr = ssh.exec_command(‘df‘) 14 # 获取命令结果 15 result = stdout.read() 16 17 # 关闭连接 18 ssh.close()
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 transport = paramiko.Transport((‘hostname‘, 22)) 6 transport.connect(username=‘wupeiqi‘, pkey=private_key) 7 8 ssh = paramiko.SSHClient() 9 ssh._transport = transport 10 11 stdin, stdout, stderr = ssh.exec_command(‘df‘) 12 13 transport.close()SSHClient 封装 Transport
1 import paramiko 2 from io import StringIO 3 4 key_str = """-----BEGIN RSA PRIVATE KEY----- 5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8 6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans 7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e 8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC 9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP 10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A 11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+ 12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh 13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8 14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz 15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6 16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC 17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl 18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq 19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65 20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA 21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM 22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH 23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8 24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg 25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/ 26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj 27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw 28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI 29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0= 30 -----END RSA PRIVATE KEY-----""" 31 32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) 33 transport = paramiko.Transport((‘10.0.1.40‘, 22)) 34 transport.connect(username=‘wupeiqi‘, pkey=private_key) 35 36 ssh = paramiko.SSHClient() 37 ssh._transport = transport 38 39 stdin, stdout, stderr = ssh.exec_command(‘df‘) 40 result = stdout.read() 41 42 transport.close() 43 44 print(result)基于私钥字符串进行连接
SFTPClient
用于连接远程服务器并执行上传下载
基于用户名密码上传下载
1 import paramiko 2 3 transport = paramiko.Transport((‘hostname‘,22)) 4 transport.connect(username=‘wupeiqi‘,password=‘123‘) 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 将location.py 上传至服务器 /tmp/test.py 8 sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘) 9 # 将remove_path 下载到本地 local_path 10 sftp.get(‘remove_path‘, ‘local_path‘) 11 12 transport.close()
基于公钥密钥上传下载
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 transport = paramiko.Transport((‘hostname‘, 22)) 6 transport.connect(username=‘wupeiqi‘, pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 将location.py 上传至服务器 /tmp/test.py 10 sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘) 11 # 将remove_path 下载到本地 local_path 12 sftp.get(‘remove_path‘, ‘local_path‘) 13 14 transport.close()
安装 rabbitMA
http://www.cnblogs.com/ericli-ericli/p/5902270.html
http://blog.csdn.net/zyz511919766/article/details/41946521
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host=‘localhost‘)) 4 channel = connection.channel() 5 6 channel.queue_declare(queue=‘hello‘) 7 8 channel.basic_publish(exchange=‘‘, 9 routing_key=‘hello‘, 10 body=‘Hello World!‘) 11 print(" [x] Sent ‘Hello World!‘") 12 connection.close()生产者
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host=‘localhost‘)) 4 channel = connection.channel() 5 6 channel.queue_declare(queue=‘hello‘) 7 8 def callback(ch, method, properties, body): 9 print(" [x] Received %r" % body) 10 11 channel.basic_consume(callback, 12 queue=‘hello‘, 13 no_ack=True) 14 15 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 16 channel.start_consuming()消费者
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘10.211.55.4‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello‘) 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print ‘ok‘ 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 16 channel.basic_consume(callback, 17 queue=‘hello‘, 18 no_ack=False) 19 20 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 21 channel.start_consuming()消费者
2、durable 消息不丢失
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent ‘Hello World!‘") connection.close()生产者
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue=‘hello‘, durable=True) 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print ‘ok‘ 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_consume(callback, 18 queue=‘hello‘, 19 no_ack=False) 20 21 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 22 channel.start_consuming()消费者
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,但是在消费者端,配置prefetch_count
=
1
,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()消费者
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=‘localhost‘)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=‘logs‘, 9 type=‘fanout‘) 10 11 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange=‘logs‘, 13 routing_key=‘‘, 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()发布者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()订阅者
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=‘localhost‘)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=‘direct_logs‘, 9 type=‘direct‘) 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 17 sys.exit(1