当前位置:Gxlcms > PHP教程 > osx-请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

osx-请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

时间:2021-07-01 10:21:17 帮助过:22人阅读

最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

 Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

回复内容:

最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

 Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

consumer应该只能通过循环来判断broker是否有变化,然后更新说。

人气教程排行