时间:2021-07-01 10:21:17 帮助过:41人阅读
Kafka 是一种高吞吐量的分布式发布订阅消息系统
producer:生产者。
consumer:消费者。
topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分类, 每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic), 并从Broker拉数据,从而消费这些已发布的消息。
1. 一个主题下的分区不能小于消费者数量,即一个主题下消费者数量不能大于分区属,大了就浪费了空闲了
2. 一个主题下的一个分区可以同时被不同消费组其中某一个消费者消费
3. 一个主题下的一个分区只能被同一个消费组的一个消费者消费
Kafka producer的ack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。
0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。
1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。
-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。
此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。
三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。
1. earliest:自动将偏移重置为最早的偏移量
2. latest:自动将偏移量重置为最新的偏移量(默认)
3. none:如果consumer group没有发现先前的偏移量,则向consumer抛出异常。
4. 其他的参数:向consumer抛出异常(无效参数)
# 官方下载地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0
# 需先启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
# 创建一个话题,test话题2个分区 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test". # 显示所有话题 bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 显示话题信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 启动一个生产者(输入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待输入自己的内容 出现>输入即可] >i am a new msg ! >i am a good msg ? # 启动一个生产者(等待消息) # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install vim [php]/php.ini extension=rdkafka.so
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); $option = 'qkl'; for ($i = 0; $i < 20; $i++) { //RD_KAFKA_PARTITION_UA自动选择分区 //$option可选 $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }
php producer.php # output int(20) int(20) int(20) int(20) int(0) # 你可以查看你刚才上面启动的消费者shell应该会输出消息 qkl . 0 qkl . 1 qkl . 2 qkl . 3 qkl . 4 qkl . 5 qkl . 6 qkl . 7 qkl . 8 qkl . 9 qkl . 10 qkl . 11 qkl . 12 qkl . 13 qkl . 14 qkl . 15 qkl . 16 qkl . 17 qkl . 18 qkl . 19
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); //设置消费组 $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('request.required.acks', 1); //在interval.ms的时间内自动提交确认、建议不要启动 //$topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100); // 设置offset的存储为file //$topicConf->set('offset.store.method', 'file'); // 设置offset的存储为broker $topicConf->set('offset.store.method', 'broker'); //$topicConf->set('offset.store.path', __DIR__); //smallest:简单理解为从头开始消费,其实等价于上面的 earliest //largest:简单理解为从最新的开始消费,其实等价于上面的 latest //$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("test", $topicConf); // 参数1消费分区0 // RD_KAFKA_OFFSET_BEGINNING 重头开始消费 // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费 // RD_KAFKA_OFFSET_END 最后一条消费 $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //参数1表示消费分区,这里是分区0 //参数2表示同步阻塞多久 $message = $topic->consume(0, 12 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason); }); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $allInfo = $rk->metadata(true, NULL, 60e3); $topics = $allInfo->getTopics(); echo rd_kafka_offset_tail(100); echo "--"; echo count($topics); echo "--"; foreach ($topics as $topic) { $topicName = $topic->getTopic(); if ($topicName == "__consumer_offsets") { continue ; } $partitions = $topic->getPartitions(); foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die(); $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId()); echo "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - "; echo "offset:" . ($topPartition->getOffset()) . PHP_EOL; } }
相关推荐:
kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展
kafka装配及Kafka-PHP扩展的使用
以上就是Kafka的介绍以及基于PHP的kafka的安装和测试的详细内容,更多请关注Gxl网其它相关文章!