当前位置:Gxlcms > PHP教程 > 如何基于Hyperf实现RabbitMQ+WebSocket消息推送

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

时间:2021-07-01 10:21:17 帮助过:135人阅读

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

  1. composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

  1. <?php
  2. return [
  3. 'mode' => SWOOLE_PROCESS,
  4. 'servers' => [
  5. [
  6. 'name' => 'http',
  7. 'type' => Server::SERVER_HTTP,
  8. 'host' => '0.0.0.0',
  9. 'port' => 11111,
  10. 'sock_type' => SWOOLE_SOCK_TCP,
  11. 'callbacks' => [
  12. SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
  13. ],
  14. ],
  15. [
  16. 'name' => 'ws',
  17. 'type' => Server::SERVER_WEBSOCKET,
  18. 'host' => '0.0.0.0',
  19. 'port' => 12222,
  20. 'sock_type' => SWOOLE_SOCK_TCP,
  21. 'callbacks' => [
  22. SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
  23. SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
  24. SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
  25. ],
  26. ],
  27. ],

WebSocket 服务器端代码示例

  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://doc.hyperf.io
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
  10. */
  11. namespace App\Controller;
  12. use Hyperf\Contract\OnCloseInterface;
  13. use Hyperf\Contract\OnMessageInterface;
  14. use Hyperf\Contract\OnOpenInterface;
  15. use Swoole\Http\Request;
  16. use Swoole\Server;
  17. use Swoole\Websocket\Frame;
  18. use Swoole\WebSocket\Server as WebSocketServer;
  19. class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  20. {
  21. /**
  22. * 发送消息
  23. * @param WebSocketServer $server
  24. * @param Frame $frame
  25. */
  26. public function onMessage(WebSocketServer $server, Frame $frame): void
  27. {
  28. //心跳刷新缓存
  29. $redis = $this->container->get(\Redis::class);
  30. //获取所有的客户端id
  31. $fdList = $redis->sMembers('websocket_sjd_1');
  32. //如果当前客户端在客户端集合中,就刷新
  33. if (in_array($frame->fd, $fdList)) {
  34. $redis->sAdd('websocket_sjd_1', $frame->fd);
  35. $redis->expire('websocket_sjd_1', 7200);
  36. }
  37. $server->push($frame->fd, 'Recv: ' . $frame->data);
  38. }
  39. /**
  40. * 客户端失去链接
  41. * @param Server $server
  42. * @param int $fd
  43. * @param int $reactorId
  44. */
  45. public function onClose(Server $server, int $fd, int $reactorId): void
  46. {
  47. //删掉客户端id
  48. $redis = $this->container->get(\Redis::class);
  49. //移除集合中指定的value
  50. $redis->sRem('websocket_sjd_1', $fd);
  51. var_dump('closed');
  52. }
  53. /**
  54. * 客户端链接
  55. * @param WebSocketServer $server
  56. * @param Request $request
  57. */
  58. public function onOpen(WebSocketServer $server, Request $request): void
  59. {
  60. //保存客户端id
  61. $redis = $this->container->get(\Redis::class);
  62. $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
  63. var_dump($res1);
  64. $res = $redis->expire('websocket_sjd_1', 7200);
  65. var_dump($res);
  66. $server->push($request->fd, 'Opened');
  67. }
  68. }

WebSocket 前端代码

  1. function WebSocketTest() {
  2. if ("WebSocket" in window) {
  3. console.log("您的浏览器支持 WebSocket!");
  4. var num = 0
  5. // 打开一个 web socket
  6. var ws = new WebSocket("ws://127.0.0.1:12222");
  7. ws.onopen = function () {
  8. // Web Socket 已连接上,使用 send() 方法发送数据
  9. //alert("数据发送中...");
  10. //ws.send("发送数据");
  11. };
  12. window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
  13. var ping = {"type": "ping"};
  14. ws.send(JSON.stringify(ping));
  15. }, 5000);
  16. ws.onmessage = function (evt) {
  17. var d = JSON.parse(evt.data);
  18. console.log(d);
  19. if (d.code == 300) {
  20. $(".address").text(d.address)
  21. }
  22. if (d.code == 200) {
  23. var v = d.data
  24. console.log(v);
  25. num++
  26. var str = `<div class="item">
  27. <p>${v.recordOutTime}</p>
  28. <p>${v.userOutName}</p>
  29. <p>${v.userOutNum}</p>
  30. <p>${v.doorOutName}</p>
  31. </div>`
  32. $(".tableHead").after(str)
  33. if (num > 7) {
  34. num--
  35. $(".table .item:nth-last-child(1)").remove()
  36. }
  37. }
  38. };
  39. ws.error = function (e) {
  40. console.log(e)
  41. alert(e)
  42. }
  43. ws.onclose = function () {
  44. // 关闭 websocket
  45. alert("连接已关闭...");
  46. };
  47. } else {
  48. alert("您的浏览器不支持 WebSocket!");
  49. }
  50. }

AMQP 组件

  1. composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

  1. <?php
  2. return [
  3. 'default' => [
  4. 'host' => 'localhost',
  5. 'port' => 5672,
  6. 'user' => 'guest',
  7. 'password' => 'guest',
  8. 'vhost' => '/',
  9. 'pool' => [
  10. 'min_connections' => 1,
  11. 'max_connections' => 10,
  12. 'connect_timeout' => 10.0,
  13. 'wait_timeout' => 3.0,
  14. 'heartbeat' => -1,
  15. ],
  16. 'params' => [
  17. 'insist' => false,
  18. 'login_method' => 'AMQPLAIN',
  19. 'login_response' => null,
  20. 'locale' => 'en_US',
  21. 'connection_timeout' => 3.0,
  22. 'read_write_timeout' => 6.0,
  23. 'context' => null,
  24. 'keepalive' => false,
  25. 'heartbeat' => 3,
  26. ],
  27. ],
  28. ];

MQ 消费者代码

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumer;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. use Hyperf\Server\Server;
  8. use Hyperf\Server\ServerFactory;
  9. /**
  10. * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
  11. */
  12. class DemoConsumer extends ConsumerMessage
  13. {
  14. /**
  15. * rabbmitMQ消费端代码
  16. * @param $data
  17. * @return string
  18. */
  19. public function consume($data): string
  20. {
  21. print_r($data);
  22. //获取集合中所有的value
  23. $redis = $this->container->get(\Redis::class);
  24. $fdList=$redis->sMembers('websocket_sjd_1');
  25. $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
  26. foreach($fdList as $key=>$v){
  27. if(!empty($v)){
  28. $server->push((int)$v, $data);
  29. }
  30. }
  31. return Result::ACK;
  32. }
  33. }

控制器代码

  1. /**
  2. * test
  3. * @return array
  4. */
  5. public function test()
  6. {
  7. $data = array(
  8. 'code' => 200,
  9. 'data' => [
  10. 'userOutName' => 'ccflow',
  11. 'userOutNum' => '9999',
  12. 'recordOutTime' => date("Y-m-d H:i:s", time()),
  13. 'doorOutName' => '教师公寓',
  14. ]
  15. );
  16. $data = \GuzzleHttp\json_encode($data);
  17. $message = new DemoProducer($data);
  18. $producer = ApplicationContext::getContainer()->get(Producer::class);
  19. $result = $producer->produce($message);
  20. var_dump($result);
  21. $user = $this->request->input('user', 'Hyperf');
  22. $method = $this->request->getMethod();
  23. return [
  24. 'method' => $method,
  25. 'message' => "{$user}.",
  26. ];
  27. }

最终效果

ab7e49780093484c182c1baf0dbedce.png

推荐:《PHP教程》

以上就是如何基于Hyperf实现RabbitMQ+WebSocket消息推送的详细内容,更多请关注Gxlcms其它相关文章!

人气教程排行