时间:2021-07-01 10:21:17 帮助过:32人阅读
PHP 版本大于 5.5
Kafka Server 版本大于 0.8.0
消费模块 Kafka Server 版本需要大于 0.9.0
添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());
// 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md)
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
return array(
array(
'topic' => 'test',
'value' => 'test....message.',
'key' => 'testkey',
),
);
});
$producer->setLogger($logger);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode, $context) {
var_dump($errorCode);
});
$producer->send();
<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(array('test'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});以上内容就是使用 PHP 编写的 Kafka 客户端 教程,希望能帮助到大家。
相关推荐:
python中Socket之客户端与服务端握手的实例
php获取访客(客户端)IP和地理位置的文字教程
怎么快速开发webservice客户端?
以上就是使用 PHP 编写的 Kafka 客户端 的详细内容,更多请关注Gxl网其它相关文章!