当前位置:Gxlcms > 数据库问题 > 用Swoole4 打造高并发的PHP协程Mysql连接池

用Swoole4 打造高并发的PHP协程Mysql连接池

时间:2021-07-01 10:21:17 帮助过:8人阅读

  •   use \Swoole\Coroutine\Channel;
  •   $chan = new Channel();
  •   go(function () use ($chan) {
  •   echo "我是第一个协程,等待3秒内有push就执行返回" . PHP_EOL;
  •   $p = $chan->pop(2);#1
  •   echo "pop返回结果" . PHP_EOL;
  •   var_dump($p);
  •   });
  •   go(function () use ($chan) {
  •   co::sleep(1);#2
  •   $chan->push(1);
  •   });
  •   echo "main" . PHP_EOL;
  • #1处代码会首先执行,然后遇到pop(),因为channel还是空,会等待2s。此时协程会让出cpu,跳到第二个协程执行,然后#2出睡眠1秒,push变量1进去channel后返回#1处继续执行,成功取车通过中刚push的值1.运行结果为:

    技术图片

    如果把#2处的睡眠时间换成大于pop()的等待时间,结果是:

    技术图片

    • 根据这些特性最终实现连接池的抽象封装类为:
    1.   <?php
    2.   /**
    3.   * 连接池封装.
    4.   * User: user
    5.   * Date: 2018/9/1
    6.   * Time: 13:36
    7.   */
    8.    
    9.   use Swoole\Coroutine\Channel;
    10.    
    11.   abstract class AbstractPool
    12.   {
    13.   private $min;//最少连接数
    14.   private $max;//最大连接数
    15.   private $count;//当前连接数
    16.   private $connections;//连接池组
    17.   protected $spareTime;//用于空闲连接回收判断
    18.   //数据库配置
    19.   protected $dbConfig = array(
    20.   ‘host‘ => ‘10.0.2.2‘,
    21.   ‘port‘ => 3306,
    22.   ‘user‘ => ‘root‘,
    23.   ‘password‘ => ‘root‘,
    24.   ‘database‘ => ‘test‘,
    25.   ‘charset‘ => ‘utf8‘,
    26.   ‘timeout‘ => 2,
    27.   );
    28.    
    29.   private $inited = false;
    30.    
    31.   protected abstract function createDb();
    32.    
    33.   public function __construct()
    34.   {
    35.   $this->min = 10;
    36.   $this->max = 100;
    37.   $this->spareTime = 10 * 3600;
    38.   $this->connections = new Channel($this->max + 1);
    39.   }
    40.    
    41.   protected function createObject()
    42.   {
    43.   $obj = null;
    44.   $db = $this->createDb();
    45.   if ($db) {
    46.   $obj = [
    47.   ‘last_used_time‘ => time(),
    48.   ‘db‘ => $db,
    49.   ];
    50.   }
    51.   return $obj;
    52.   }
    53.    
    54.   /**
    55.   * 初始换最小数量连接池
    56.   * @return $this|null
    57.   */
    58.   public function init()
    59.   {
    60.   if ($this->inited) {
    61.   return null;
    62.   }
    63.   for ($i = 0; $i < $this->min; $i++) {
    64.   $obj = $this->createObject();
    65.   $this->count++;
    66.   $this->connections->push($obj);
    67.   }
    68.   return $this;
    69.   }
    70.    
    71.   public function getConnection($timeOut = 3)
    72.   {
    73.   $obj = null;
    74.   if ($this->connections->isEmpty()) {
    75.   if ($this->count < $this->max) {//连接数没达到最大,新建连接入池
    76.   $this->count++;
    77.   $obj = $this->createObject();
    78.   } else {
    79.   $obj = $this->connections->pop($timeOut);//timeout为出队的最大的等待时间
    80.   }
    81.   } else {
    82.   $obj = $this->connections->pop($timeOut);
    83.   }
    84.   return $obj;
    85.   }
    86.    
    87.   public function free($obj)
    88.   {
    89.   if ($obj) {
    90.   $this->connections->push($obj);
    91.   }
    92.   }
    93.    
    94.   /**
    95.   * 处理空闲连接
    96.   */
    97.   public function gcSpareObject()
    98.   {
    99.   //大约2分钟检测一次连接
    100.   swoole_timer_tick(120000, function () {
    101.   $list = [];
    102.   /*echo "开始检测回收空闲链接" . $this->connections->length() . PHP_EOL;*/
    103.   if ($this->connections->length() < intval($this->max * 0.5)) {
    104.   echo "请求连接数还比较多,暂不回收空闲连接\n";
    105.   }#1
    106.   while (true) {
    107.   if (!$this->connections->isEmpty()) {
    108.   $obj = $this->connections->pop(0.001);
    109.   $last_used_time = $obj[‘last_used_time‘];
    110.   if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收
    111.   $this->count--;
    112.   } else {
    113.   array_push($list, $obj);
    114.   }
    115.   } else {
    116.   break;
    117.   }
    118.   }
    119.   foreach ($list as $item) {
    120.   $this->connections->push($item);
    121.   }
    122.   unset($list);
    123.   });
    124.   }
    125.   }
    126.    
    • 同步PDO客户端下实现

    1.   <?php
    2.   /**
    3.   * 数据库连接池PDO方式
    4.   * User: user
    5.   * Date: 2018/9/8
    6.   * Time: 11:30
    7.   */
    8.   require "AbstractPool.php";
    9.    
    10.   class MysqlPoolPdo extends AbstractPool
    11.   {
    12.   protected $dbConfig = array(
    13.   ‘host‘ => ‘mysql:host=10.0.2.2:3306;dbname=test‘,
    14.   ‘port‘ => 3306,
    15.   ‘user‘ => ‘root‘,
    16.   ‘password‘ => ‘root‘,
    17.   ‘database‘ => ‘test‘,
    18.   ‘charset‘ => ‘utf8‘,
    19.   ‘timeout‘ => 2,
    20.   );
    21.   public static $instance;
    22.    
    23.   public static function getInstance()
    24.   {
    25.   if (is_null(self::$instance)) {
    26.   self::$instance = new MysqlPoolPdo();
    27.   }
    28.   return self::$instance;
    29.   }
    30.    
    31.   protected function createDb()
    32.   {
    33.   return new PDO($this->dbConfig[‘host‘], $this->dbConfig[‘user‘], $this->dbConfig[‘password‘]);
    34.   }
    35.   }
    36.    
    37.   $httpServer = new swoole_http_server(‘0.0.0.0‘, 9501);
    38.   $httpServer->set(
    39.   [‘worker_num‘ => 1]
    40.   );
    41.   $httpServer->on("WorkerStart", function () {
    42.   MysqlPoolPdo::getInstance()->init();
    43.   });
    44.   $httpServer->on("request", function ($request, $response) {
    45.   $db = null;
    46.   $obj = MysqlPoolPdo::getInstance()->getConnection();
    47.   if (!empty($obj)) {
    48.   $db = $obj ? $obj[‘db‘] : null;
    49.   }
    50.   if ($db) {
    51.   $db->query("select sleep(2)");
    52.   $ret = $db->query("select * from guestbook limit 1");
    53.   MysqlPoolPdo::getInstance()->free($obj);
    54.   $response->end(json_encode($ret));
    55.   }
    56.   });
    57.   $httpServer->start();

    代码调用过程详解:
    1、server启动时,调用init()方法初始化最少数量(min指定)的连接对象,放进类型为channelle的connections对象中。在init中循环调用中,依赖了createObject()返回连接对象,而createObject()
    中是调用了本来实现的抽象方法,初始化返回一个PDO db连接。所以此时,连接池connections中有min个对象。

    2、server监听用户请求,当接收发请求时,调用连接数的getConnection()方法从connections通道中pop()一个对象。此时如果并发了10个请求,server因为配置了1个worker,所以再pop到一个对象返回时,遇到sleep()的查询,因为用的连接对象是pdo的查询,此时的woker进程只能等待,完成后才能进入下一个请求。因此,池中的其余连接其实是多余的,同步客户端的请求速度只能和woker的数量有关。
    3、查询结束后,调用free()方法把连接对象放回connections池中。

    ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,同步客户端方式总共运行时间为20s以上,而且mysql的连接始终维持在一条。结果如下:

    技术图片

    • 协程客户端Coroutine\MySQL方式的调用
    1.   <?php
    2.   /**
    3.   * 数据库连接池协程方式
    4.   * User: user
    5.   * Date: 2018/9/8
    6.   * Time: 11:30
    7.   */
    8.   require "AbstractPool.php";
    9.    
    10.   class MysqlPoolCoroutine extends AbstractPool
    11.   {
    12.   protected $dbConfig = array(
    13.   ‘host‘ => ‘10.0.2.2‘,
    14.   ‘port‘ => 3306,
    15.   ‘user‘ => ‘root‘,
    16.   ‘password‘ => ‘root‘,
    17.   ‘database‘ => ‘test‘,
    18.   ‘charset‘ => ‘utf8‘,
    19.   ‘timeout‘ => 10,
    20.   );
    21.   public static $instance;
    22.    
    23.   public static function getInstance()
    24.   {
    25.   if (is_null(self::$instance)) {
    26.   self::$instance = new MysqlPoolCoroutine();
    27.   }
    28.   return self::$instance;
    29.   }
    30.    
    31.   protected function createDb()
    32.   {
    33.   $db = new Swoole\Coroutine\Mysql();
    34.   $db->connect(
    35.   $this->dbConfig
    36.   );
    37.   return $db;
    38.   }
    39.   }
    40.    
    41.   $httpServer = new swoole_http_server(‘0.0.0.0‘, 9501);
    42.   $httpServer->set(
    43.   [‘worker_num‘ => 1]
    44.   );
    45.   $httpServer->on("WorkerStart", function () {
    46.   //MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();
    47.   MysqlPoolCoroutine::getInstance()->init();
    48.   });
    49.    
    50.   $httpServer->on("request", function ($request, $response) {
    51.   $db = null;
    52.   $obj = MysqlPoolCoroutine::getInstance()->getConnection();
    53.   if (!empty($obj)) {
    54.   $db = $obj ? $obj[‘db‘] : null;
    55.   }
    56.   if ($db) {
    57.   $db->query("select sleep(2)");
    58.   $ret = $db->query("select * from guestbook limit 1");
    59.   MysqlPoolCoroutine::getInstance()->free($obj);
    60.   $response->end(json_encode($ret));
    61.   }
    62.   });
    63.   $httpServer->start();

    代码调用过程详解
    1、同样的,协程客户端方式下的调用,也是实现了之前封装好的连接池类AbstractPool.php。只是createDb()的抽象方法用了swoole内置的协程客户端去实现。
    2、server启动后,初始化都和同步一样。不一样的在获取连接对象的时候,此时如果并发了10个请求,同样是配置了1个worker进程在处理,但是在第一请求到达,pop出池中的一个连接对象,执行到query()方法,遇上sleep阻塞时,此时,woker进程不是在等待select的完成,而是切换到另外的协程去处理下一个请求。完成后同样释放对象到池中。当中有重点解释的代码段中getConnection()中。

    1.   public function getConnection($timeOut = 3)
    2.   {
    3.   $obj = null;
    4.   if ($this->connections->isEmpty()) {
    5.   if ($this->count < $this->max) {//连接数没达到最大,新建连接入池
    6.   $this->count++;
    7.   $obj = $this->createObject();#1
    8.   } else {
    9.   $obj = $this->connections->pop($timeOut);#2
    10.   }
    11.   } else {
    12.   $obj = $this->connections->pop($timeOut);#3
    13.   }
    14.   return $obj;
    15.   }

    当调用到getConnection()时,如果此时由于大量并发请求过多,连接池connections为空,而没达到最大连接max数量时时,代码运行到#1处,调用了createObject(),新建连接返回;但如果连接池connections为空,而到达了最大连接数max时,代码运行到了#2处,也就是$this->connections->pop($timeOut),此时会阻塞$timeOut的时间,如果期间有链接释放了,会成功获取到,然后协程返回。超时没获取到,则返回false。

    3、最后说一下协程Mysql客户端一项重要配置,那就是代码里$dbConfig中timeout值的配置。这个配置是意思是最长的查询等待时间。可以看一个例子说明下:

    1.   go(function () {
    2.   $start = microtime(true);
    3.   $db = new Swoole\Coroutine\MySQL();
    4.   $db->connect([
    5.   ‘host‘ => ‘10.0.2.2‘,
    6.   ‘port‘ => 3306,
    7.   ‘user‘ => ‘root‘,
    8.   ‘password‘ => ‘root‘,
    9.   ‘database‘ => ‘test‘,
    10.   ‘timeout‘ => 4#1
    11.   ]);
    12.   $db->query("select sleep(5)");
    13.   echo "我是第一个sleep五秒之后\n";
    14.   $ret = $db->query("select user from guestbook limit 1");#2
    15.   var_dump($ret);
    16.   $use = microtime(true) - $start;
    17.   echo "协程mysql输出用时:" . $use . PHP_EOL;
    18.   });

    #1处代码,如果timeout配了4s查询超时,而第一条查询select sleep(5)阻塞后,协程切换到下一条sql的执行,其实$db并不能执行成功,因为用一个连接,同一个协程中,其实执行是同步的,所以此时第二条查询在等待4s超时后,没获取到db的连接执行,就会执行失败。而如果第一条查询执行的时间少于这个timeout,那么会执行查询成功。猜猜上面执行用时多少?结果如下:

    技术图片

    如果把timeout换成6s呢,结果如下:

    技术图片

    所以要注意的是,协程的客户端内执行其实是同步的,不要理解为异步,它只是遇到IO阻塞时能让出执行权,切换到其他协程而已,不能和异步混淆。

    ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,协程客户端方式总共运行时间为2s多。结果如下:

    技术图片

    数据库此时的连接数为10条(show full PROCESSLIST):

    技术图片

    再尝试 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多个并发的处理,时间是20多秒,mysql连接数达到指定的最大值100个。结果如下:

    技术图片

     

    四、后言

    现在连接池基本实现了高并发时的连接分配和控制,但是还有一些细节要处理,例如:

    • 并发时,建立了max个池对象,不能一直在池中维护这么多,要在请求空闲时,把连接池的数量维持在一个空闲值内。这里是简单做了gcSpareObject()的方法实现空闲处理。直接在初始化woker的时候调用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就会定时检测回收。问题是如何判断程序比较空闲,值得再去优化。
    • 定时检测连接时候是活的,剔除死链
    • 假如程序忘记调用free()释放对象到池,是否有更好方法避免这种情况?

    对于以上,希望各大神看到后,能提供不错的意见!

    转载于:https://my.oschina.net/u/2394701/blog/2046414

    用Swoole4 打造高并发的PHP协程Mysql连接池

    标签:htm   date   缺点   多少   xtend   getc   帮助   extends   不必要   

    人气教程排行