Date: 2021-07-01 10:21:17
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
    //......
    private void initPositionDefault() throws Exception {
        // Prefer the position recorded in the last message published to MQ.
        try {
            initPositionFromMqTail();
        } catch (Exception e) {
            logger.error("Init position from mq error.", e);
        }
        // Fall back to the current tail of the MySQL binlog if MQ yielded nothing.
        if (binlogFilename == null || nextPosition == null) {
            initPositionFromBinlogTail();
        }
    }
    //......
}
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
    //......
    private void initPositionFromBinlogTail() throws SQLException {
        String sql = "SHOW MASTER STATUS";
        Connection conn = null;
        ResultSet rs = null;
        try {
            // Note: the connection is stored in a new local, so conn above stays
            // null and the finally block never closes it.
            Connection connection = dataSource.getConnection();
            rs = connection.createStatement().executeQuery(sql);
            // Record the binlog file name and position reported by MySQL.
            while (rs.next()) {
                binlogFilename = rs.getString("File");
                nextPosition = rs.getLong("Position");
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
            if (rs != null) {
                rs.close();
            }
        }
    }
    //......
}
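In the snippet above, conn is declared but the connection is actually held in a separate local variable, so the finally block only closes the ResultSet. As a rough sketch of one way to tighten this (not the project's code), the same query can be written with try-with-resources. The class name MasterStatusSketch and the DataSource parameter are assumptions made for illustration.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;

// Hypothetical stand-in for the position-holding part of BinlogPositionManager.
class MasterStatusSketch {
    String binlogFilename;
    Long nextPosition;

    // Reads the current binlog file and position; the ResultSet, Statement and
    // Connection are closed automatically, in that order, when the block exits.
    void initPositionFromBinlogTail(DataSource dataSource) throws SQLException {
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            while (rs.next()) {
                binlogFilename = rs.getString("File");
                nextPosition = rs.getLong("Position");
            }
        }
    }
}
```

The behavior is otherwise unchanged: if the query returns no row, both fields keep their previous values.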
initPositionFromBinlogTail runs SHOW MASTER STATUS to obtain the latest binlogFilename and nextPosition.
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
    //......
    private void initPositionFromMqTail() throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
        consumer.setNamesrvAddr(config.mqNamesrvAddr);
        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
        consumer.start();
        // Only one queue of the topic (the first the set iterator returns) is inspected.
        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
        MessageQueue queue = queues.iterator().next();
        if (queue != null) {
            // Step back one from the max offset to reach the last stored message.
            Long offset = consumer.maxOffset(queue);
            if (offset > 0)
                offset--;
            PullResult pullResult = consumer.pull(queue, "*", offset, 100);
            if (pullResult.getPullStatus() == PullStatus.FOUND) {
                // The message body is JSON carrying binlogFilename and nextPosition.
                MessageExt msg = pullResult.getMsgFoundList().get(0);
                String json = new String(msg.getBody(), "UTF-8");
                JSONObject js = JSON.parseObject(json);
                binlogFilename = (String) js.get("binlogFilename");
                nextPosition = js.getLong("nextPosition");
            }
        }
    }
    //......
}
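The offset arithmetic in initPositionFromMqTail is easy to misread, so here is a small sketch of it in isolation. It assumes that maxOffset is one past the last stored message, which is what the decrement in the snippet suggests. TailOffsetSketch and the list-backed queue are hypothetical stand-ins, not RocketMQ types.

```java
import java.util.List;

// Hypothetical illustration of "read the last message of a queue".
class TailOffsetSketch {
    // Mirrors the snippet: step back one from the max offset unless the queue is empty.
    static long lastMessageOffset(long maxOffset) {
        return maxOffset > 0 ? maxOffset - 1 : maxOffset;
    }

    // Simulates the pull against an in-memory list; the list size plays the
    // role of maxOffset. Returns null when there is nothing to read.
    static String pullLast(List<String> queue) {
        long offset = lastMessageOffset(queue.size());
        if (offset >= queue.size()) {
            return null; // empty queue: nothing stored at offset 0 yet
        }
        return queue.get((int) offset);
    }
}
```

With an empty queue the pull finds nothing, so binlogFilename and nextPosition stay unset, which is the case the fallback in initPositionDefault handles.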
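BinlogPositionManager provides the initBeginPosition, getBinlogFilename, and getPosition methods. initBeginPosition runs different logic depending on config.startType: DEFAULT runs initPositionDefault, NEW_EVENT runs initPositionFromBinlogTail, LAST_PROCESSED runs initPositionFromMqTail, and SPECIFIED sets the specified binlogFilename and nextPosition. initPositionDefault first tries initPositionFromMqTail and, if binlogFilename or nextPosition is still null afterwards, falls back to initPositionFromBinlogTail.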
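The body of initBeginPosition is not quoted in this article, so the following is only a sketch of how the dispatch described above might look. The StartType enum, the method parameters, the steps list, and the sample file names are all assumptions made for illustration; the two init methods are stubbed so that the example runs without MySQL or RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BinlogPositionManager: it records which step ran
// instead of talking to MySQL or RocketMQ.
class PositionInitSketch {
    // Assumed enum; the text only names these four startType values.
    enum StartType { DEFAULT, NEW_EVENT, LAST_PROCESSED, SPECIFIED }

    String binlogFilename;
    Long nextPosition;
    final List<String> steps = new ArrayList<>();
    // Illustrative switch: whether the simulated MQ tail holds a position.
    boolean mqHasPosition;

    void initPositionFromMqTail() {
        steps.add("mqTail");
        if (mqHasPosition) {
            binlogFilename = "mysql-bin.000007"; // made-up value
            nextPosition = 1024L;
        }
    }

    void initPositionFromBinlogTail() {
        steps.add("binlogTail");
        binlogFilename = "mysql-bin.000009"; // made-up value
        nextPosition = 4L;
    }

    // Same fallback as in the article: MQ first, then the binlog tail.
    void initPositionDefault() {
        initPositionFromMqTail();
        if (binlogFilename == null || nextPosition == null) {
            initPositionFromBinlogTail();
        }
    }

    void initBeginPosition(StartType type, String specifiedFile, Long specifiedPosition) {
        switch (type) {
            case DEFAULT:
                initPositionDefault();
                break;
            case NEW_EVENT:
                initPositionFromBinlogTail();
                break;
            case LAST_PROCESSED:
                initPositionFromMqTail();
                break;
            case SPECIFIED:
                binlogFilename = specifiedFile;
                nextPosition = specifiedPosition;
                break;
        }
    }
}
```

Calling initBeginPosition(StartType.DEFAULT, null, null) on a fresh instance with mqHasPosition left false records the steps mqTail and then binlogTail, which is the fallback path.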
A look at BinlogPositionManager in rocketmq-mysql