时间:2021-07-01 10:21:17 帮助过:3人阅读
kafka 目录下新建文件: mysql.properties
- name=<span style="color: #000000;">mysql
- connector.</span><span style="color: #0000ff;">class</span>=<span style="color: #000000;">io.debezium.connector.mysql.MySqlConnector
- database.hostname</span>=<span style="color: #800080;">192.168</span>.<span style="color: #800080;">3.125</span><span style="color: #000000;">
- database.port</span>=<span style="color: #800080;">3306</span><span style="color: #000000;">
- database.user</span>=<span style="color: #000000;">root
- database.password</span>=<span style="color: #800080;">123456</span><span style="color: #000000;">
- database.server.id</span>=<span style="color: #800080;">112233</span><span style="color: #000000;">
- database.server.name</span>=<span style="color: #000000;">test
- database.whitelist</span>=<span style="color: #000000;">orders,users
- database.history.kafka.bootstrap.servers</span>=<span style="color: #800080;">192.168</span>.<span style="color: #800080;">91.25</span>:<span style="color: #800080;">9092</span><span style="color: #000000;">
- database.history.kafka.topic</span>=<span style="color: #000000;">history.test
- include.schema.changes</span>=<span style="color: #0000ff;">true</span><span style="color: #000000;">
- include.query</span>=<span style="color: #0000ff;">true</span><span style="color: #000000;">
- # options: adaptive_time_microseconds(</span><span style="color: #0000ff;">default</span><span style="color: #000000;">)adaptive(deprecated) connect()
- time.precision.mode</span>=<span style="color: #000000;">connect
- # options: precise(</span><span style="color: #0000ff;">default</span>) <span style="color: #0000ff;">double</span> <span style="color: #0000ff;">string</span>
- <span style="color: #0000ff;">decimal</span>.handling.mode=<span style="color: #0000ff;">string</span><span style="color: #000000;">
- # options: </span><span style="color: #0000ff;">long</span>(<span style="color: #0000ff;">default</span><span style="color: #000000;">) precise
- bigint.unsigned.handling.mode</span>=<span style="color: #0000ff;">long</span>
修改 kafka conf目录下 server.properties
- broker.id=<span style="color: #800080;">1</span><span style="color: #000000;">
- listeners</span>=PLAINTEXT:<span style="color: #008000;">//</span><span style="color: #008000;">192.168.91.25:9092</span>
- advertised.listeners=PLAINTEXT:<span style="color: #008000;">//</span><span style="color: #008000;">192.168.91.25:9092</span>
- log.dirs=/opt/data/kafka-logs
修改 confluent conf 目录下connect-standalone.properties
- plugin.path=/usr/local/share/kafka/plugins
启动 zk
- bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
- ./bin/kafka-server-start.sh config/server.properties
启动连接器 confluent
- /bin/connect-standalone.sh config/connect-standalone.properties mysql.properties
get : 192.168.91.25:8083/connectors 获取connectors
post: 192.168.91.25:8083/connectors 加入connector
- <span style="color: #000000;">{
- </span><span style="color: #800000;">"</span><span style="color: #800000;">name</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">config</span><span style="color: #800000;">"</span><span style="color: #000000;">: {
- </span><span style="color: #800000;">"</span><span style="color: #800000;">connector.class</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">io.debezium.connector.mysql.MySqlConnector</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.hostname</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">192.168.3.125</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.port</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">3306</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.user</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">root</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.password</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">123456</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.server.id</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">184054</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.server.name</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">my</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.whitelist</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.history.kafka.bootstrap.servers</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">192.168.91.25:9092</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">database.history.kafka.topic</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">his.entity</span><span style="color: #800000;">"</span><span style="color: #000000;">,
- </span><span style="color: #800000;">"</span><span style="color: #800000;">include.schema.changes</span><span style="color: #800000;">"</span>: <span style="color: #800000;">"</span><span style="color: #800000;">true</span><span style="color: #800000;">"</span><span style="color: #000000;">
- }}</span>
Linux 启动消费端
- ./bin/kafka-console-consumer.sh --bootstrap-server <span style="color: #800080;">192.168</span>.<span style="color: #800080;">91.25</span>:<span style="color: #800080;">9092</span> --topic my.entity.user --<span style="color: #0000ff;">from</span>-beginning
Java 端消费
View Code
- <span style="color: #0000ff;">package</span><span style="color: #000000;"> kafka;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerConfig;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerRecord;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.ConsumerRecords;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> org.apache.kafka.clients.consumer.KafkaConsumer;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> java.util.Arrays;
- </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> java.util.Properties;
- </span><span style="color: #008000;">/**</span><span style="color: #008000;">
- * Created by baizhuang on 2019/10/25 10:39.
- </span><span style="color: #008000;">*/</span>
- <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> MyConsumer {
- </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> main(String []args){
- </span><span style="color: #008000;">//</span><span style="color: #008000;">1.创建 kafka 生产者配置信息。</span>
- Properties properties = <span style="color: #0000ff;">new</span><span style="color: #000000;"> Properties();
- </span><span style="color: #008000;">//</span><span style="color: #008000;">2.指定 kafka 集群</span>
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.25:9092"<span style="color: #000000;">);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,</span><span style="color: #0000ff;">true</span><span style="color: #000000;">);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,</span>"1000"<span style="color: #000000;">);
- </span><span style="color: #008000;">//</span><span style="color: #008000;">key,value 反序列化</span>
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"<span style="color: #000000;">);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,</span>"org.apache.kafka.common.serialization.StringDeserializer"<span style="color: #000000;">);
- properties.put(</span>"group.id","test"<span style="color: #000000;">);
- KafkaConsumer</span><String,String> consumer = <span style="color: #0000ff;">new</span> KafkaConsumer<String,String><span style="color: #000000;">(properties);
- consumer.subscribe(Arrays.asList(</span>"my.entity.user"<span style="color: #000000;">));
- </span><span style="color: #0000ff;">while</span> (<span style="color: #0000ff;">true</span><span style="color: #000000;">) {
- ConsumerRecords</span><String, String> consumerRecords = consumer.poll(100<span style="color: #000000;">);
- </span><span style="color: #0000ff;">for</span> (ConsumerRecord<String, String><span style="color: #000000;"> consumerRecord : consumerRecords) {
- System.out.println(consumerRecord.key() </span>+ "-----" +<span style="color: #000000;"> consumerRecord.value());
- }
- }
- }
- }</span>
对数据库表操作 :Java 控制台
debezium、kafka connector 解析 mysql binlog 到 kafak
标签:配置文件 port ada ext linux nlog tab sub pass