时间:2021-07-01 10:21:17 帮助过:3人阅读
kafka 目录下新建文件: mysql.properties
name=mysql
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.3.125
database.port=3306
database.user=root
database.password=123456
database.server.id=112233
database.server.name=test
database.whitelist=orders,users
database.history.kafka.bootstrap.servers=192.168.91.25:9092
database.history.kafka.topic=history.test
include.schema.changes=true
include.query=true
# options: adaptive_time_microseconds(default) adaptive(deprecated) connect
time.precision.mode=connect
# options: precise(default) double string
decimal.handling.mode=string
# options: long(default) precise
bigint.unsigned.handling.mode=long
修改 kafka config 目录下的 server.properties
broker.id=1
listeners=PLAINTEXT://192.168.91.25:9092
advertised.listeners=PLAINTEXT://192.168.91.25:9092
log.dirs=/opt/data/kafka-logs
修改 confluent conf 目录下connect-standalone.properties
plugin.path=/usr/local/share/kafka/plugins
启动 zk
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
./bin/kafka-server-start.sh config/server.properties
启动连接器 confluent
./bin/connect-standalone.sh config/connect-standalone.properties mysql.properties
GET  http://192.168.91.25:8083/connectors 获取已注册的 connectors
POST http://192.168.91.25:8083/connectors 新增 connector(请求体为下面的 JSON)
{ "name": "entity", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "192.168.3.125", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "184054", "database.server.name": "my", "database.whitelist": "entity", "database.history.kafka.bootstrap.servers": "192.168.91.25:9092", "database.history.kafka.topic": "his.entity", "include.schema.changes": "true" }}
Linux 启动消费端
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.91.25:9092 --topic my.entity.user --from-beginning
Java 端消费
package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/25 10:39.
 *
 * <p>Minimal console consumer: subscribes to the {@code my.entity.user} topic and prints
 * every record as {@code key-----value} on standard output. Runs until the process is
 * killed or the consumer throws.
 */
public class MyConsumer {

    /**
     * Builds the consumer configuration, subscribes to the topic and polls forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration.
        Properties properties = new Properties();

        // 2. Point the consumer at the Kafka cluster.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");

        // Offsets are committed automatically, once per second.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Key / value deserializers.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // try-with-resources so the consumer is closed if poll() or printing throws;
        // the original never closed it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {
            consumer.subscribe(Arrays.asList("my.entity.user"));

            while (true) {
                // poll(Duration) replaces the deprecated poll(long), which could block
                // past the timeout while waiting for metadata.
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "-----" + consumerRecord.value());
                }
            }
        }
    }
}
对数据库表操作 :Java 控制台
debezium、kafka connector 解析 mysql binlog 到 kafka
标签:配置文件 port ada ext linux nlog tab sub pass