当前位置:Gxlcms > 数据库问题 > debezium、kafka connector 解析 mysql binlog 到 kafak

debezium、kafka connector 解析 mysql binlog 到 kafak

时间:2021-07-01 10:21:17 帮助过:3人阅读

=/opt/data/zookeeper/logs

kafka 目录下新建文件: mysql.properties

  1. name=<span style="color: #000000;">mysql
  2. connector.</span><span style="color: #0000ff;">class</span>=<span style="color: #000000;">io.debezium.connector.mysql.MySqlConnector
  3. database.hostname</span>=<span style="color: #800080;">192.168</span>.<span style="color: #800080;">3.125</span><span style="color: #000000;">
  4. database.port</span>=<span style="color: #800080;">3306</span><span style="color: #000000;">
  5. database.user</span>=<span style="color: #000000;">root
  6. database.password</span>=<span style="color: #800080;">123456</span><span style="color: #000000;">
  7. database.server.id</span>=<span style="color: #800080;">112233</span><span style="color: #000000;">
  8. database.server.name</span>=<span style="color: #000000;">test
  9. database.whitelist</span>=<span style="color: #000000;">orders,users
  10. database.history.kafka.bootstrap.servers</span>=<span style="color: #800080;">192.168</span>.<span style="color: #800080;">91.25</span>:<span style="color: #800080;">9092</span><span style="color: #000000;">
  11. database.history.kafka.topic</span>=<span style="color: #000000;">history.test
  12. include.schema.changes</span>=<span style="color: #0000ff;">true</span><span style="color: #000000;">
  13. include.query</span>=<span style="color: #0000ff;">true</span><span style="color: #000000;">
  14. # options: adaptive_time_microseconds(</span><span style="color: #0000ff;">default</span><span style="color: #000000;">)adaptive(deprecated) connect()
  15. time.precision.mode</span>=<span style="color: #000000;">connect
  16. # options: precise(</span><span style="color: #0000ff;">default</span>) <span style="color: #0000ff;">double</span> <span style="color: #0000ff;">string</span>
  17. <span style="color: #0000ff;">decimal</span>.handling.mode=<span style="color: #0000ff;">string</span><span style="color: #000000;">
  18. # options: </span><span style="color: #0000ff;">long</span>(<span style="color: #0000ff;">default</span><span style="color: #000000;">) precise
  19. bigint.unsigned.handling.mode</span>=<span style="color: #0000ff;">long</span>

修改 kafka  conf目录下 server.properties

  1. broker.id=<span style="color: #800080;">1</span><span style="color: #000000;">
  2. listeners</span>=PLAINTEXT:<span style="color: #008000;">//</span><span style="color: #008000;">192.168.91.25:9092</span>
  3. advertised.listeners=PLAINTEXT:<span style="color: #008000;">//</span><span style="color: #008000;">192.168.91.25:9092</span>
  4. log.dirs=/opt/data/kafka-logs

修改 confluent  conf 目录下connect-standalone.properties

  1. plugin.path=/usr/local/share/kafka/plugins

三、分别启动 zk 、kafka、 confluent

启动 zk

  1. bin/zookeeper-server-start.sh config/zookeeper.properties  

启动kafka

  1. ./bin/kafka-server-start.sh config/server.properties

启动连接器 confluent

  1. /bin/connect-standalone.sh config/connect-standalone.properties mysql.properties

四、加入 mysql 配置,启动消费端

get : 192.168.91.25:8083/connectors  获取connectors

post:   192.168.91.25:8083/connectors  加入connector

  1. <span style="color: #000000;">{
  2. </span><span style="color: #800000;">"</span><span style="color: #800000;">name</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  3. </span><span style="color: #800000;">"</span><span style="color: #800000;">config</span><span style="color: #800000;">"</span><span style="color: #000000;">: {
  4. </span><span style="color: #800000;">"</span><span style="color: #800000;">connector.class</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">io.debezium.connector.mysql.MySqlConnector</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  5. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.hostname</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">192.168.3.125</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  6. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.port</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">3306</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  7. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.user</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">root</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  8. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.password</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">123456</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  9. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.server.id</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">184054</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  10. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.server.name</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">my</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  11. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.whitelist</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  12. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.history.kafka.bootstrap.servers</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">192.168.91.25:9092</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  13. </span><span style="color: #800000;">"</span><span style="color: #800000;">database.history.kafka.topic</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">his.entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
  14. </span><span style="color: #800000;">"</span><span style="color: #800000;">include.schema.changes</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">true</span><span style="color: #800000;">"</span><span style="color: #000000;">
  15. }}</span>

Linux 启动消费端

  1. ./bin/kafka-console-consumer.sh --bootstrap-server <span style="color: #800080;">192.168</span>.<span style="color: #800080;">91.25</span>:<span style="color: #800080;">9092</span> --topic my.entity.user --<span style="color: #0000ff;">from</span>-beginning

Java 端消费

技术图片
  1. <span style="color: #0000ff;">package</span><span style="color: #000000;"> kafka;
  2. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerConfig;
  3. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerRecord;
  4. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerRecords;
  5. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.KafkaConsumer;
  6. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> java.util.Arrays;
  7. </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> java.util.Properties;
  8. </span><span style="color: #008000;">/**</span><span style="color: #008000;">
  9. * Created by baizhuang on 2019/10/25 10:39.
  10. </span><span style="color: #008000;">*/</span>
  11. <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> MyConsumer {
  12. </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> main(String []args){
  13. </span><span style="color: #008000;">//</span><span style="color: #008000;">1.创建 kafka 生产者配置信息。</span>
  14. Properties properties = <span style="color: #0000ff;">new</span><span style="color: #000000;"> Properties();
  15. </span><span style="color: #008000;">//</span><span style="color: #008000;">2.指定 kafka 集群</span>
  16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.25:9092"<span style="color: #000000;">);
  17. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,</span><span style="color: #0000ff;">true</span><span style="color: #000000;">);
  18. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,</span>"1000"<span style="color: #000000;">);
  19. </span><span style="color: #008000;">//</span><span style="color: #008000;">key,value 反序列化</span>
  20. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"<span style="color: #000000;">);
  21. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,</span>"org.apache.kafka.common.serialization.StringDeserializer"<span style="color: #000000;">);
  22. properties.put(</span>"group.id","test"<span style="color: #000000;">);
  23. KafkaConsumer</span><String,String> consumer = <span style="color: #0000ff;">new</span> KafkaConsumer<String,String><span style="color: #000000;">(properties);
  24. consumer.subscribe(Arrays.asList(</span>"my.entity.user"<span style="color: #000000;">));
  25. </span><span style="color: #0000ff;">while</span> (<span style="color: #0000ff;">true</span><span style="color: #000000;">) {
  26. ConsumerRecords</span><String, String> consumerRecords = consumer.poll(100<span style="color: #000000;">);
  27. </span><span style="color: #0000ff;">for</span> (ConsumerRecord<String, String><span style="color: #000000;"> consumerRecord : consumerRecords) {
  28. System.out.println(consumerRecord.key() </span>+ "-----" +<span style="color: #000000;"> consumerRecord.value());
  29. }
  30. }
  31. }
  32. }</span>
View Code

对数据库表操作 :Java 控制台

技术图片

 

debezium、kafka connector 解析 mysql binlog 到 kafak

标签:配置文件   port   ada   ext   linux   nlog   tab   sub   pass   

人气教程排行