
Debezium and Kafka Connect: parsing the MySQL binlog into Kafka

Date: 2021-07-01 10:21:17

=/opt/data/zookeeper/logs

Create a new file in the Kafka directory: mysql.properties

name=mysql
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.3.125
database.port=3306
database.user=root
database.password=123456
database.server.id=112233
database.server.name=test
database.whitelist=orders,users
database.history.kafka.bootstrap.servers=192.168.91.25:9092
database.history.kafka.topic=history.test
include.schema.changes=true
include.query=true
# options: adaptive_time_microseconds (default), adaptive (deprecated), connect
time.precision.mode=connect
# options: precise (default), double, string
decimal.handling.mode=string
# options: long (default), precise
bigint.unsigned.handling.mode=long
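
Before starting Kafka Connect, it can help to confirm that the file contains the keys used above. The following is only a minimal sketch, not part of the original setup: the class name ConnectorConfigCheck is hypothetical, and the list of keys treated as expected is an assumption copied from the file above rather than an official list of required settings.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports which expected keys are absent or blank
// in a connector properties file such as mysql.properties.
public class ConnectorConfigCheck {

    // Assumption: keys taken from the mysql.properties example above.
    static final List<String> EXPECTED_KEYS = Arrays.asList(
            "name",
            "connector.class",
            "database.hostname",
            "database.user",
            "database.password",
            "database.server.id",
            "database.server.name",
            "database.history.kafka.bootstrap.servers",
            "database.history.kafka.topic");

    // Returns the expected keys that have no non-blank value in props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Path defaults to the file name used in this article.
        String path = args.length > 0 ? args[0] : "mysql.properties";
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            props.load(reader);
        }
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

Run it from the Kafka directory so that the default path resolves to the file created above.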

Edit server.properties in the Kafka config directory

broker.id=1
listeners=PLAINTEXT://192.168.91.25:9092
advertised.listeners=PLAINTEXT://192.168.91.25:9092
log.dirs=/opt/data/kafka-logs

Edit connect-standalone.properties in the Confluent config directory

plugin.path=/usr/local/share/kafka/plugins

3. Start ZooKeeper, Kafka, and Confluent in turn

Start ZooKeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

./bin/kafka-server-start.sh  config/server.properties

Start the Confluent connector (Kafka Connect in standalone mode)

./bin/connect-standalone.sh config/connect-standalone.properties mysql.properties

4. Add the MySQL configuration and start a consumer

GET 192.168.91.25:8083/connectors  lists the registered connectors

POST 192.168.91.25:8083/connectors  adds a connector, with the following JSON as the request body

{
  "name": "entity",  
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
    "database.hostname": "192.168.3.125", 
    "database.port": "3306", 
    "database.user": "root", 
    "database.password": "123456", 
    "database.server.id": "184054", 
    "database.server.name": "my", 
    "database.whitelist": "entity", 
    "database.history.kafka.bootstrap.servers": "192.168.91.25:9092", 
    "database.history.kafka.topic": "his.entity", 
    "include.schema.changes": "true" 
  }
}
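
The two REST calls above can also be issued from Java. This is a minimal sketch under stated assumptions: it uses the java.net.http client, which needs Java 11 or later; the class name ConnectRestClient and the idea of reading the JSON body from a file passed as the first argument are illustrative and not part of the original setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical client for the Kafka Connect REST endpoint used in this article.
public class ConnectRestClient {

    // GET <baseUrl>/connectors : list the registered connectors.
    static HttpRequest listConnectors(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // POST <baseUrl>/connectors : register a connector from a JSON document.
    static HttpRequest createConnector(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String baseUrl = "http://192.168.91.25:8083";
        // Assumption: args[0] is the path of a file holding the JSON shown above.
        String json = Files.readString(Paths.get(args[0]));

        HttpClient client = HttpClient.newHttpClient();

        HttpResponse<String> created =
                client.send(createConnector(baseUrl, json), HttpResponse.BodyHandlers.ofString());
        System.out.println(created.statusCode() + " " + created.body());

        HttpResponse<String> listed =
                client.send(listConnectors(baseUrl), HttpResponse.BodyHandlers.ofString());
        System.out.println(listed.statusCode() + " " + listed.body());
    }
}
```

Keeping the request construction in separate methods makes it possible to inspect the method, URI, and headers without a running Connect worker.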

Start a consumer on Linux

./bin/kafka-console-consumer.sh  --bootstrap-server 192.168.91.25:9092 --topic my.entity.user --from-beginning
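
The topic name in the command above is built from the connector settings: with this connector configuration, table change events go to a topic named after the database.server.name value, the database, and the table, joined by dots. The sketch below only illustrates that naming; the class TopicNames and its method are hypothetical helpers, not part of Debezium.

```java
// Hypothetical helper showing how the topic name used in this article is composed.
public class TopicNames {

    // serverName is the database.server.name value from the connector configuration.
    static String topicFor(String serverName, String database, String table) {
        return String.join(".", serverName, database, table);
    }

    public static void main(String[] args) {
        // Values from the JSON configuration above: server name "my", database "entity".
        System.out.println(topicFor("my", "entity", "user")); // prints my.entity.user
    }
}
```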

Consuming from Java

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/25 10:39.
 */

public class MyConsumer {
    public static void main(String[] args) {

        // 1. Create the Kafka consumer configuration.
        Properties properties = new Properties();

        // 2. Specify the Kafka cluster.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");

        // Commit offsets automatically, once per second.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Key and value deserializers.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer group.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the change topic for the user table.
        consumer.subscribe(Arrays.asList("my.entity.user"));

        while (true) {
            // poll(Duration) replaces the deprecated poll(long); it needs kafka-clients 2.0 or later.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "-----" + consumerRecord.value());
            }
        }
    }
}

Modify rows in the database table, and the resulting change events are printed to the Java console.
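
Each value printed by the consumer is a Debezium change event, and its op field tells which kind of change produced it: c for an insert, u for an update, d for a delete, and r for a row read during the initial snapshot. The sketch below maps those codes to readable labels. The class ChangeOps is a hypothetical helper, and it assumes the op value has already been extracted from the event JSON with a JSON library of your choice.

```java
// Hypothetical helper: turns a Debezium op code into a readable label.
public class ChangeOps {

    static String describe(String op) {
        if (op == null) {
            return "unknown (null)";
        }
        switch (op) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown (" + op + ")";
        }
    }

    public static void main(String[] args) {
        // Print the label for each op code a consumer may encounter.
        for (String op : new String[] {"c", "u", "d", "r"}) {
            System.out.println(op + " -> " + describe(op));
        }
    }
}
```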
