use \Swoole\Coroutine\Channel;
$chan = new Channel();
go(function () use ($chan) {
echo "我是第一个协程,等待3秒内有push就执行返回" . PHP_EOL;
$p = $chan->pop(2);
echo "pop返回结果" . PHP_EOL;
var_dump($p);
});
go(function () use ($chan) {
co::sleep(1);
$chan->push(1);
});
echo "main" . PHP_EOL;
#1处代码会首先执行,然后遇到pop(),因为channel还是空,会等待2s。此时协程会让出cpu,跳到第二个协程执行,然后#2出睡眠1秒,push变量1进去channel后返回#1处继续执行,成功取车通过中刚push的值1.运行结果为:
如果把#2处的睡眠时间换成大于pop()的等待时间,结果是:
-
<?php
-
-
-
-
-
-
-
-
use Swoole\Coroutine\Channel;
-
-
abstract class AbstractPool
-
{
-
private $min;
-
private $max;
-
private $count;
-
private $connections;
-
protected $spareTime;
-
-
protected $dbConfig = array(
-
‘host‘ => ‘10.0.2.2‘,
-
‘port‘ => 3306,
-
‘user‘ => ‘root‘,
-
‘password‘ => ‘root‘,
-
‘database‘ => ‘test‘,
-
‘charset‘ => ‘utf8‘,
-
‘timeout‘ => 2,
-
);
-
-
private $inited = false;
-
-
protected abstract function createDb();
-
-
public function __construct()
-
{
-
$this->min = 10;
-
$this->max = 100;
-
$this->spareTime = 10 * 3600;
-
$this->connections = new Channel($this->max + 1);
-
}
-
-
protected function createObject()
-
{
-
$obj = null;
-
$db = $this->createDb();
-
if ($db) {
-
$obj = [
-
‘last_used_time‘ => time(),
-
‘db‘ => $db,
-
];
-
}
-
return $obj;
-
}
-
-
-
-
-
-
public function init()
-
{
-
if ($this->inited) {
-
return null;
-
}
-
for ($i = 0; $i < $this->min; $i++) {
-
$obj = $this->createObject();
-
$this->count++;
-
$this->connections->push($obj);
-
}
-
return $this;
-
}
-
-
public function getConnection($timeOut = 3)
-
{
-
$obj = null;
-
if ($this->connections->isEmpty()) {
-
if ($this->count < $this->max) {
-
$this->count++;
-
$obj = $this->createObject();
-
} else {
-
$obj = $this->connections->pop($timeOut);
-
}
-
} else {
-
$obj = $this->connections->pop($timeOut);
-
}
-
return $obj;
-
}
-
-
public function free($obj)
-
{
-
if ($obj) {
-
$this->connections->push($obj);
-
}
-
}
-
-
-
-
-
public function gcSpareObject()
-
{
-
-
swoole_timer_tick(120000, function () {
-
$list = [];
-
-
if ($this->connections->length() < intval($this->max * 0.5)) {
-
echo "请求连接数还比较多,暂不回收空闲连接\n";
-
}
-
while (true) {
-
if (!$this->connections->isEmpty()) {
-
$obj = $this->connections->pop(0.001);
-
$last_used_time = $obj[‘last_used_time‘];
-
if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {
-
$this->count--;
-
} else {
-
array_push($list, $obj);
-
}
-
} else {
-
break;
-
}
-
}
-
foreach ($list as $item) {
-
$this->connections->push($item);
-
}
-
unset($list);
-
});
-
}
-
}
-
-
<?php
-
-
-
-
-
-
-
require "AbstractPool.php";
-
-
class MysqlPoolPdo extends AbstractPool
-
{
-
protected $dbConfig = array(
-
‘host‘ => ‘mysql:host=10.0.2.2:3306;dbname=test‘,
-
‘port‘ => 3306,
-
‘user‘ => ‘root‘,
-
‘password‘ => ‘root‘,
-
‘database‘ => ‘test‘,
-
‘charset‘ => ‘utf8‘,
-
‘timeout‘ => 2,
-
);
-
public static $instance;
-
-
public static function getInstance()
-
{
-
if (is_null(self::$instance)) {
-
self::$instance = new MysqlPoolPdo();
-
}
-
return self::$instance;
-
}
-
-
protected function createDb()
-
{
-
return new PDO($this->dbConfig[‘host‘], $this->dbConfig[‘user‘], $this->dbConfig[‘password‘]);
-
}
-
}
-
-
$httpServer = new swoole_http_server(‘0.0.0.0‘, 9501);
-
$httpServer->set(
-
[‘worker_num‘ => 1]
-
);
-
$httpServer->on("WorkerStart", function () {
-
MysqlPoolPdo::getInstance()->init();
-
});
-
$httpServer->on("request", function ($request, $response) {
-
$db = null;
-
$obj = MysqlPoolPdo::getInstance()->getConnection();
-
if (!empty($obj)) {
-
$db = $obj ? $obj[‘db‘] : null;
-
}
-
if ($db) {
-
$db->query("select sleep(2)");
-
$ret = $db->query("select * from guestbook limit 1");
-
MysqlPoolPdo::getInstance()->free($obj);
-
$response->end(json_encode($ret));
-
}
-
});
-
$httpServer->start();
代码调用过程详解:
1、server启动时,调用init()方法初始化最少数量(min指定)的连接对象,放进类型为channelle的connections对象中。在init中循环调用中,依赖了createObject()返回连接对象,而createObject()
中是调用了本来实现的抽象方法,初始化返回一个PDO db连接。所以此时,连接池connections中有min个对象。
2、server监听用户请求,当接收发请求时,调用连接数的getConnection()方法从connections通道中pop()一个对象。此时如果并发了10个请求,server因为配置了1个worker,所以再pop到一个对象返回时,遇到sleep()的查询,因为用的连接对象是pdo的查询,此时的woker进程只能等待,完成后才能进入下一个请求。因此,池中的其余连接其实是多余的,同步客户端的请求速度只能和woker的数量有关。
3、查询结束后,调用free()方法把连接对象放回connections池中。
ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,同步客户端方式总共运行时间为20s以上,而且mysql的连接始终维持在一条。结果如下:
- 协程客户端Coroutine\MySQL方式的调用
-
<?php
-
-
-
-
-
-
-
require "AbstractPool.php";
-
-
class MysqlPoolCoroutine extends AbstractPool
-
{
-
protected $dbConfig = array(
-
‘host‘ => ‘10.0.2.2‘,
-
‘port‘ => 3306,
-
‘user‘ => ‘root‘,
-
‘password‘ => ‘root‘,
-
‘database‘ => ‘test‘,
-
‘charset‘ => ‘utf8‘,
-
‘timeout‘ => 10,
-
);
-
public static $instance;
-
-
public static function getInstance()
-
{
-
if (is_null(self::$instance)) {
-
self::$instance = new MysqlPoolCoroutine();
-
}
-
return self::$instance;
-
}
-
-
protected function createDb()
-
{
-
$db = new Swoole\Coroutine\Mysql();
-
$db->connect(
-
$this->dbConfig
-
);
-
return $db;
-
}
-
}
-
-
$httpServer = new swoole_http_server(‘0.0.0.0‘, 9501);
-
$httpServer->set(
-
[‘worker_num‘ => 1]
-
);
-
$httpServer->on("WorkerStart", function () {
-
-
MysqlPoolCoroutine::getInstance()->init();
-
});
-
-
$httpServer->on("request", function ($request, $response) {
-
$db = null;
-
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
-
if (!empty($obj)) {
-
$db = $obj ? $obj[‘db‘] : null;
-
}
-
if ($db) {
-
$db->query("select sleep(2)");
-
$ret = $db->query("select * from guestbook limit 1");
-
MysqlPoolCoroutine::getInstance()->free($obj);
-
$response->end(json_encode($ret));
-
}
-
});
-
$httpServer->start();
代码调用过程详解
1、同样的,协程客户端方式下的调用,也是实现了之前封装好的连接池类AbstractPool.php。只是createDb()的抽象方法用了swoole内置的协程客户端去实现。
2、server启动后,初始化都和同步一样。不一样的在获取连接对象的时候,此时如果并发了10个请求,同样是配置了1个worker进程在处理,但是在第一请求到达,pop出池中的一个连接对象,执行到query()方法,遇上sleep阻塞时,此时,woker进程不是在等待select的完成,而是切换到另外的协程去处理下一个请求。完成后同样释放对象到池中。当中有重点解释的代码段中getConnection()中。
-
public function getConnection($timeOut = 3)
-
{
-
$obj = null;
-
if ($this->connections->isEmpty()) {
-
if ($this->count < $this->max) {
-
$this->count++;
-
$obj = $this->createObject();
-
} else {
-
$obj = $this->connections->pop($timeOut);
-
}
-
} else {
-
$obj = $this->connections->pop($timeOut);
-
}
-
return $obj;
-
}
当调用到getConnection()时,如果此时由于大量并发请求过多,连接池connections为空,而没达到最大连接max数量时时,代码运行到#1处,调用了createObject(),新建连接返回;但如果连接池connections为空,而到达了最大连接数max时,代码运行到了#2处,也就是$this->connections->pop($timeOut),此时会阻塞$timeOut的时间,如果期间有链接释放了,会成功获取到,然后协程返回。超时没获取到,则返回false。
3、最后说一下协程Mysql客户端一项重要配置,那就是代码里$dbConfig中timeout值的配置。这个配置是意思是最长的查询等待时间。可以看一个例子说明下:
-
go(function () {
-
$start = microtime(true);
-
$db = new Swoole\Coroutine\MySQL();
-
$db->connect([
-
‘host‘ => ‘10.0.2.2‘,
-
‘port‘ => 3306,
-
‘user‘ => ‘root‘,
-
‘password‘ => ‘root‘,
-
‘database‘ => ‘test‘,
-
‘timeout‘ => 4
-
]);
-
$db->query("select sleep(5)");
-
echo "我是第一个sleep五秒之后\n";
-
$ret = $db->query("select user from guestbook limit 1");
-
var_dump($ret);
-
$use = microtime(true) - $start;
-
echo "协程mysql输出用时:" . $use . PHP_EOL;
-
});
#1处代码,如果timeout配了4s查询超时,而第一条查询select sleep(5)阻塞后,协程切换到下一条sql的执行,其实$db并不能执行成功,因为用一个连接,同一个协程中,其实执行是同步的,所以此时第二条查询在等待4s超时后,没获取到db的连接执行,就会执行失败。而如果第一条查询执行的时间少于这个timeout,那么会执行查询成功。猜猜上面执行用时多少?结果如下:
如果把timeout换成6s呢,结果如下:
所以要注意的是,协程的客户端内执行其实是同步的,不要理解为异步,它只是遇到IO阻塞时能让出执行权,切换到其他协程而已,不能和异步混淆。
ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,协程客户端方式总共运行时间为2s多。结果如下:
数据库此时的连接数为10条(show full PROCESSLIST):
再尝试 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多个并发的处理,时间是20多秒,mysql连接数达到指定的最大值100个。结果如下:
四、后言
现在连接池基本实现了高并发时的连接分配和控制,但是还有一些细节要处理,例如:
- 并发时,建立了max个池对象,不能一直在池中维护这么多,要在请求空闲时,把连接池的数量维持在一个空闲值内。这里是简单做了gcSpareObject()的方法实现空闲处理。直接在初始化woker的时候调用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就会定时检测回收。问题是如何判断程序比较空闲,值得再去优化。
- 定时检测连接时候是活的,剔除死链
- 假如程序忘记调用free()释放对象到池,是否有更好方法避免这种情况?
对于以上,希望各大神看到后,能提供不错的意见!
转载于:https://my.oschina.net/u/2394701/blog/2046414
用Swoole4 打造高并发的PHP协程Mysql连接池
标签:htm date 缺点 多少 xtend getc 帮助 extends 不必要