当前位置:Gxlcms > PHP教程 > PHP编程中尝试程序并发的几种方式总结_PHP

PHP编程中尝试程序并发的几种方式总结_PHP

时间:2021-07-01 10:21:17 帮助过:3人阅读

本文大约总结了PHP编程中的五种并发方式:
1.curl_multi_init
文档中说的是 Allows the processing of multiple cURL handles asynchronously,确实是异步。这里需要理解的是 curl_multi_select 这个方法,文档中是这么解释的:Blocks until there is activity on any of the curl_multi connections。了解一下常见的异步模型就应该能理解,select、epoll 都很有名。

  <?php
  // Start two HTTP requests concurrently with curl_multi, print the response
  // of whichever transfer completes first, then drop both handles so the
  // slower request is abandoned.

  // Build the individual requests, but do not execute them yet.
  $ch_1 = curl_init('http://www.bitsCN.com/');
  $ch_2 = curl_init('http://www.bitsCN.com/');
  curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true);

  // Build the multi handle and attach both easy handles to it.
  $mh = curl_multi_init();
  curl_multi_add_handle($mh, $ch_1);
  curl_multi_add_handle($mh, $ch_2);

  // Drive both transfers and stop as soon as one of them has completed.
  $running = null;
  do {
      $status = curl_multi_exec($mh, $running);
      if ($status !== CURLM_OK && $status !== CURLM_CALL_MULTI_PERFORM) {
          // The multi stack itself failed; nothing more can be driven.
          break;
      }

      // Check for a finished transfer after every exec call. Reading only
      // when select reports activity can miss a transfer that completed
      // inside the final curl_multi_exec.
      $info = curl_multi_info_read($mh);
      if ($info) {
          var_dump($info);
          $response_1 = curl_multi_getcontent($info['handle']);
          echo "$response_1 \n";
          break;
      }

      // Block until there is socket activity. On -1 (select failed or there
      // is nothing to wait on yet) back off briefly instead of spinning.
      if ($running > 0 && curl_multi_select($mh) === -1) {
          usleep(1000);
      }
  } while ($running > 0);

  // Detach and release the handles; this cancels the unfinished request.
  curl_multi_remove_handle($mh, $ch_1);
  curl_multi_remove_handle($mh, $ch_2);
  curl_multi_close($mh);

这里我设置的是,select得到结果,就退出循环,并且删除 curl resource, 从而达到取消http请求的目的。

2.swoole_client
swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21, 我还没升到这个版本,所以直接exit也可以。

  <?php
  // Asynchronous swoole_client example: send the same HTTP request three
  // times over one TCP connection and stop at the first chunk received.
  $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

  // Register the event callbacks.
  $client->on("connect", function ($cli) {
      // Build the request by concatenation so every header line ends in
      // exactly CRLF; a literal that spans several source lines would embed
      // extra raw newlines and produce a malformed request.
      $req = "GET / HTTP/1.1\r\n"
          . "Host: www.bitsCN.com\r\n"
          . "Connection: keep-alive\r\n"
          . "Cache-Control: no-cache\r\n"
          . "Pragma: no-cache\r\n\r\n";
      for ($i = 0; $i < 3; $i++) {
          $cli->send($req);
      }
  });

  $client->on("receive", function ($cli, $data) {
      echo "Received: " . $data . "\n";
      // Terminate on the first response. With swoole >= 1.7.21 one could
      // call $cli->sleep() here instead to stop receiving without exiting.
      exit(0);
  });

  $client->on("error", function ($cli) {
      echo "Connect failed\n";
  });

  $client->on("close", function ($cli) {
      echo "Connection close\n";
  });

  // Start the connection (third argument is the timeout passed to connect).
  $client->connect('183.207.95.145', 80, 1);

3.process
哎,竟然差点忘了 swoole_process, 这里就不用 pcntl 模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。

  <?php
  // Fork several swoole_process workers, let each write a reply to its pipe,
  // and print only the first reply the parent reads; later replies are read
  // and discarded.
  $workers = [];
  $worker_num = 3; // number of child processes to create
  $finished = false;

  for ($i = 0; $i < $worker_num; $i++) {
      $process = new swoole_process('process');
      $pid = $process->start();
      $workers[$pid] = $process;
  }

  foreach ($workers as $pid => $process) {
      // All callbacks run in the parent's event loop, one at a time, so the
      // shared $finished flag needs no lock.
      swoole_event_add($process->pipe, function ($pipe) use ($process, &$finished) {
          // Always drain the pipe and unregister it. Leaving unread data in
          // a registered pipe would keep it readable and keep the event
          // loop from ever going idle.
          $data = $process->read();
          swoole_event_del($pipe);

          if (!$finished) {
              $finished = true;
              echo "RECV: " . $data . PHP_EOL;
          }
      });
  }

  /**
   * Worker entry point: write a canned response to the parent through the
   * process pipe, then print this worker's pid and callback.
   *
   * @param swoole_process $process The worker's own process object.
   */
  function process(swoole_process $process)
  {
      $response = 'http response';
      $process->write($response);
      echo $process->pid, "\t", $process->callback . PHP_EOL;
  }

  // Reap every child so none is left as a zombie.
  for ($i = 0; $i < $worker_num; $i++) {
      $ret = swoole_process::wait();
      $pid = $ret['pid'];
      echo "Worker Exit, PID=" . $pid . PHP_EOL;
  }

4.pthreads
编译 pthreads 模块时,提示 PHP 编译时必须打开 ZTS,所以貌似必须使用 thread safe 版本才行。wamp 中的多个 PHP 版本正好是 TS 的,于是直接下载了 dll,按文档中的说明复制到对应目录,就在 Windows 下测试了。还没完全理解,查到的文章说 PHP 的 pthreads 和 POSIX pthreads 是完全不一样的。代码有些烂,还需要多看看文档,体会一下。

  <?php
  /**
   * Shared object handed to every worker; the first worker to run stores
   * its result in $response.
   */
  class Foo extends Stackable {
      public $url;
      public $response = null;

      public function __construct() {
          $this->url = 'http://www.bitsCN.com';
      }

      public function run() {}
  }

  /**
   * Worker that fills in the shared object's response if nobody has yet.
   */
  class Process extends Worker {
      private $text = "";
      private $object;

      /**
       * @param string $text   Label printed while this worker runs.
       * @param Foo    $object Shared object that receives the response.
       */
      public function __construct($text, $object) {
          $this->text = $text;
          $this->object = $object;
      }

      public function run() {
          // The loop body executes at most once per worker: it sets the
          // response, so the condition is false on the next check.
          while (is_null($this->object->response)) {
              print " Thread {$this->text} is running\n";
              $this->object->response = 'http response';
              sleep(1);
          }
      }
  }

  $foo = new Foo();
  $a = new Process("A", $foo);
  $a->start();
  $b = new Process("B", $foo);
  $b->start();

  // Wait for both workers before reading the result; echoing right after
  // start() races with the threads and may print nothing.
  $a->shutdown();
  $b->shutdown();

  echo $foo->response;

5.yield
以同步方式书写异步代码:

  <?php
  /**
   * Single-threaded UDP server that lets a request handler be written as a
   * generator: the handler yields an AsyncTask (a datagram to send to a
   * backend) and is resumed with the backend's reply, or with an Exception
   * on timeout or socket failure.
   */
  class AsyncServer {
      /** @var callable Request handler, invoked as ($socket, $data, $len, $ip, $port). */
      protected $handler;
      /** Listening UDP socket. */
      protected $socket;
      /** @var array Pending tasks: socket key => [backend socket, coroutine, deadline]. */
      protected $tasks = [];
      /** @var array Timeouts: deadline in whole ms => [socket key => socket key]. */
      protected $timers = [];

      /**
       * Create the non-blocking listening socket bound to 0.0.0.0:1234.
       * Terminates the script with the socket error text on failure.
       *
       * @param callable $handler Generator function handling one request.
       */
      public function __construct(callable $handler) {
          $this->handler = $handler;
          $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
          if (!$this->socket) {
              die(socket_strerror(socket_last_error())."\n");
          }
          if (!socket_set_nonblock($this->socket)) {
              die(socket_strerror(socket_last_error())."\n");
          }
          if (!socket_bind($this->socket, "0.0.0.0", 1234)) {
              die(socket_strerror(socket_last_error())."\n");
          }
      }

      /**
       * Array key for a socket. Sockets are resources on older PHP and
       * objects on PHP 8; neither may be used directly as an array offset.
       *
       * @param resource|object $socket
       * @return int|string
       */
      protected static function socketKey($socket) {
          return is_object($socket) ? spl_object_hash($socket) : (int) $socket;
      }

      /**
       * Event loop: expire timed-out tasks, wait up to 1 ms for readable
       * sockets, then dispatch incoming requests and backend replies.
       * Never returns.
       */
      public function Run() {
          while (true) {
              $now = microtime(true) * 1000;

              // Expire overdue tasks. Every bucket is checked because
              // deadlines are stored in insertion order, which is only
              // chronological when all tasks share the same timeout.
              foreach ($this->timers as $time => $keys) {
                  if ($time > $now) {
                      continue;
                  }
                  foreach ($keys as $key) {
                      list($socket, $coroutine) = $this->tasks[$key];
                      unset($this->tasks[$key]);
                      socket_close($socket);
                      $coroutine->throw(new Exception("Timeout"));
                  }
                  unset($this->timers[$time]);
              }

              $reads = [$this->socket];
              foreach ($this->tasks as $pending) {
                  $reads[] = $pending[0];
              }
              $writes = null;
              $excepts = null;
              // false (error) and 0 (nothing readable within 1 ms) both
              // mean there is nothing to dispatch in this iteration.
              if (!socket_select($reads, $writes, $excepts, 0, 1000)) {
                  continue;
              }

              foreach ($reads as $one) {
                  $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);
                  if (!$len) {
                      continue;
                  }

                  if ($one === $this->socket) {
                      // New client request: start the handler coroutine.
                      $handler = $this->handler;
                      $coroutine = $handler($one, $data, $len, $ip, $port);
                      if (!$coroutine instanceof Generator || !$coroutine->valid()) {
                          // The handler finished without yielding a task.
                          continue;
                      }
                      $task = $coroutine->current();

                      $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
                      if (!$socket) {
                          $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
                          continue;
                      }
                      if (!socket_set_nonblock($socket)) {
                          $errno = socket_last_error();
                          socket_close($socket); // do not leak the half-set-up socket
                          $coroutine->throw(new Exception(socket_strerror($errno), $errno));
                          continue;
                      }

                      socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);
                      // Whole milliseconds, so the deadline is a valid array key.
                      $deadline = (int) ($now + $task->timeout);
                      $key = self::socketKey($socket);
                      $this->tasks[$key] = [$socket, $coroutine, $deadline];
                      $this->timers[$deadline][$key] = $key;
                  } else {
                      // Backend reply: resume the coroutine waiting on this socket.
                      $key = self::socketKey($one);
                      if (!isset($this->tasks[$key])) {
                          continue;
                      }
                      list($socket, $coroutine, $deadline) = $this->tasks[$key];
                      unset($this->tasks[$key], $this->timers[$deadline][$key]);
                      if (empty($this->timers[$deadline])) {
                          unset($this->timers[$deadline]);
                      }
                      socket_close($socket);
                      $coroutine->send([$data, $len]);
                  }
              }
          }
      }
  }

  /**
   * Describes one datagram to send to a backend and how long to wait for
   * the reply.
   */
  class AsyncTask {
      public $data;
      public $len;
      public $ip;
      public $port;
      /** Timeout in milliseconds; added to the loop's millisecond clock. */
      public $timeout;

      public function __construct($data, $len, $ip, $port, $timeout) {
          $this->data = $data;
          $this->len = $len;
          $this->ip = $ip;
          $this->port = $port;
          $this->timeout = $timeout;
      }
  }

  /**
   * Build the task a handler yields to perform a send-then-receive round trip.
   *
   * @return AsyncTask
   */
  function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout) {
      return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);
  }

  /**
   * Example handler: forward the request to 127.0.0.1:2345 with a 3000 ms
   * timeout and send the reply, or the error message, back to the client.
   */
  function RequestHandler($socket, $req_buf, $req_len, $ip, $port) {
      try {
          list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000));
      } catch (Exception $ex) {
          $rsp_buf = $ex->getMessage();
          $rsp_len = strlen($rsp_buf);
      }
      socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);
  }

  // Pass the handler by name; a bare RequestHandler is an undefined constant.
  $server = new AsyncServer('RequestHandler');
  $server->Run();

代码解读:

借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;
封装AsyncSendRecv接口,调用形如yield AsyncSendRecv(),更加自然;
添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。

人气教程排行