当前位置:Gxlcms > Python > Python基于Socket实现异步非阻塞

Python基于Socket实现异步非阻塞

时间:2021-07-01 10:21:17 帮助过:96人阅读

本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow。具有很好的参考价值,下面跟着小编一起来看下吧

Python的Web框架中Tornado以异步非阻塞而闻名。本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow。

一、源码

本文基于非阻塞的Socket以及IO多路复用从而实现异步非阻塞的Web框架,其中便是众多异步非阻塞Web框架内部原理。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import socket
import select
import time
class HttpResponse(object):
 """
 封装响应信息
 """
 def init(self, content=''):
  self.content = content
  self.headers = {}
  self.cookies = {}
 def response(self):
  return bytes(self.content, encoding='utf-8')
class HttpNotFound(HttpResponse):
 """
 404时的错误提示
 """
 def init(self):
  super(HttpNotFound, self).init('404 Not Found')
class HttpRequest(object):
 """
 用户封装用户请求信息
 """
 def init(self, conn):
  self.conn = conn
  self.header_bytes = bytes()
  self.header_dict = {}
  self.body_bytes = bytes()
  self.method = ""
  self.url = ""
  self.protocol = ""
  self.initialize()
  self.initialize_headers()
 def initialize(self):
  header_flag = False
  while True:
   try:
    received = self.conn.recv(8096)
   except Exception as e:
    received = None
   if not received:
    break
   if header_flag:
    self.body_bytes += received
    continue
   temp = received.split(b'\r\n\r\n', 1)
   if len(temp) == 1:
    self.header_bytes += temp
   else:
    h, b = temp
    self.header_bytes += h
    self.body_bytes += b
    header_flag = True
 @property
 def header_str(self):
  return str(self.header_bytes, encoding='utf-8')
 def initialize_headers(self):
  headers = self.header_str.split('\r\n')
  first_line = headers[0].split(' ')
  if len(first_line) == 3:
   self.method, self.url, self.protocol = headers[0].split(' ')
   for line in headers:
    kv = line.split(':')
    if len(kv) == 2:
     k, v = kv
     self.header_dict[k] = v
class Future(object):
 """
 异步非阻塞模式时封装回调函数以及是否准备就绪
 """
 def init(self, callback):
  self.callback = callback
  self._ready = False
  self.value = None
 def set_result(self, value=None):
  self.value = value
  self._ready = True
 @property
 def ready(self):
  return self._ready
class TimeoutFuture(Future):
 """
 异步非阻塞超时
 """
 def init(self, timeout):
  super(TimeoutFuture, self).init(callback=None)
  self.timeout = timeout
  self.start_time = time.time()
 @property
 def ready(self):
  current_time = time.time()
  if current_time > self.start_time + self.timeout:
   self._ready = True
  return self._ready
class Snow(object):
 """
 微型Web框架类
 """
 def init(self, routes):
  self.routes = routes
  self.inputs = set()
  self.request = None
  self.async_request_handler = {}
 def run(self, host='localhost', port=9999):
  """
  事件循环
  :param host:
  :param port:
  :return:
  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.bind((host, port,))
  sock.setblocking(False)
  sock.listen(128)
  sock.setblocking(0)
  self.inputs.add(sock)
  try:
   while True:
    readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
    for conn in readable_list:
     if sock == conn:
      client, address = conn.accept()
      client.setblocking(False)
      self.inputs.add(client)
     else:
      gen = self.process(conn)
      if isinstance(gen, HttpResponse):
       conn.sendall(gen.response())
       self.inputs.remove(conn)
       conn.close()
      else:
       yielded = next(gen)
self.async_request_handler[conn] = yielded
    self.polling_callback()
except Exception as e:
   pass
  finally:
   sock.close()
def polling_callback(self):
  """
  遍历触发异步非阻塞的回调函数
  :return:
  """
  for conn in list(self.async_request_handler.keys()):
   yielded = self.async_request_handler[conn]
   if not yielded.ready:
    continue
   if yielded.callback:
    ret = yielded.callback(self.request, yielded)
    conn.sendall(ret.response())
   self.inputs.remove(conn)
   del self.async_request_handler[conn]
   conn.close()
 def process(self, conn):
  """
  处理路由系统以及执行函数
  :param conn:
  :return:
  """
  self.request = HttpRequest(conn)
  func = None
  for route in self.routes:
   if re.match(route[0], self.request.url):
    func = route[1]
    break
  if not func:
   return HttpNotFound()
  else:
   return func(self.request)
snow.py

二、使用

1. 基本使用

from snow import Snow
from snow import HttpResponse
def index(request):
return HttpResponse('OK')
routes = [
 (r'/index/', index),
]
app = Snow(routes)
app.run(port=8012)

2.异步非阻塞:超时

from snow import Snow
from snow import HttpResponse
from snow import TimeoutFuture
request_list = []
def async(request):
 obj = TimeoutFuture(5)
 yield obj
def home(request):
 return HttpResponse('home')
routes = [
 (r'/home/', home),
 (r'/async/', async),
]
app = Snow(routes)
app.run(port=8012)

3.异步非阻塞:等待

基于等待模式可以完成自定制操作

from snow import Snow
from snow import HttpResponse
from snow import Future
request_list = []
def callback(request, future):
 return HttpResponse(future.value)
def req(request):
 obj = Future(callback=callback)
 request_list.append(obj)
 yield obj
def stop(request):
 obj = request_list[0]
 del request_list[0]
 obj.set_result('done')
 return HttpResponse('stop')
routes = [
 (r'/req/', req),
 (r'/stop/', stop),
]
app = Snow(routes)
app.run(port=8012)

【相关推荐】

1. Python免费视频教程

2. Python学习手册

3. Python面向对象视频教程

以上就是Python基于Socket实现异步非阻塞的详细内容,更多请关注Gxl网其它相关文章!

人气教程排行