
Syncing MySQL data to ES 5.5.0 with canal

Date: 2021-07-01 10:21:17

Environment:

  • canal-1.1.4
  • ES 5.5.0
  • Connection to ES over transport

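All configuration options can be taken directly from the canal official documentation. Because 1.1.4 supports ES 6.x and above, any other version requires replacing the dependencies and recompiling the client-adapter.elasticsearch module. What follows is a compatibility approach for the older ES 5.5.0, along with the pitfalls I ran into.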

Dependency changes:

Edit the pom.xml of the client-adapter module and change the ES dependencies to 5.5.0, the version that matches the ES server.

<code class="language-xml"><dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.4.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>
</code>

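Because there is no 5.5.0 release of the REST high-level client, only the transport-related versions are changed and the REST dependencies stay at 6.4.3. Everything below was tested only with the TCP (transport) connection to ES; compatibility of the REST mode is unverified.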

Code compatibility:

<code class="language-java">// ESConnection.java, original (ES 6.x API):
transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
    Integer.parseInt(host.substring(i + 1))));

// Change to (ES 5.x API; import org.elasticsearch.common.transport.InetSocketTransportAddress):
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
    Integer.parseInt(host.substring(i + 1))));
</code>
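
The `host.substring(0, i)` / `host.substring(i + 1)` split above can be tried in isolation with a minimal JDK-only sketch. `HostPortParser` is a hypothetical helper, not part of canal, and the use of `lastIndexOf(':')` to find `i` is an assumption, since the snippet above does not show how `i` is computed.

```java
import java.net.InetSocketAddress;

// Hypothetical helper (not canal code): splits a "host:port" entry the way
// the ESConnection snippet does, using only JDK classes.
class HostPortParser {

    static InetSocketAddress parse(String hostPort) {
        // Assumption: the separator index is the last ':' in the entry.
        int i = hostPort.lastIndexOf(':');
        if (i <= 0 || i == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, i);
        int port = Integer.parseInt(hostPort.substring(i + 1));
        // createUnresolved skips the DNS lookup, so the sketch also runs offline.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parse("127.0.0.1:9300");
        System.out.println(address.getHostString() + " port " + address.getPort());
    }
}
```

In the real adapter the parsed host and port are passed to `InetSocketTransportAddress`, as shown in the modified snippet; the sketch only covers the string handling.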

Start the build

<code class="language-log">mvn clean install -Dmaven.test.skip -Denv=release
</code>

REST compatibility issues

Issue 1

<code class="language-log">[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure: Compilation failure:
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] unreported exception java.io.IOException; must be caught or declared to be thrown
[ERROR] canal/client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java:[24,13] invalid method reference
</code>

The 5.x TransportClient is not compatible with the rest-client, so comment out the REST code that causes the errors.

<code class="language-java">// RestHighLevelClientExt::getMapping
@Deprecated
public static GetMappingsResponse getMapping(RestHighLevelClient restHighLevelClient,
                                             GetMappingsRequest getMappingsRequest,
                                             RequestOptions options) throws IOException, IllegalAccessException {
    throw new IllegalAccessException("es 5.x unsupport this method, use tcp mode");
}

// ESConnection::getMapping
...
if (mode == ESClientMode.TRANSPORT) {
    ...
} else {
    try {
        GetMappingsRequest request = new GetMappingsRequest();
        request.indices(index);
        GetMappingsResponse response;
        // try {
        //     response = restHighLevelClient
        //         .indices()
        //         .getMapping(request, RequestOptions.DEFAULT);
        //     // calling this API directly fails on versions below 6.4
        // } catch (Exception e) {
        //     logger.warn("Low ElasticSearch version for getMapping");
        response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
        // }
        mappings = response.mappings();
    } catch (NullPointerException e) {
        throw new IllegalArgumentException("Not found the mapping info of index: " + index);
    } catch (IOException | IllegalAccessException e) { // extra catch clause added here
        logger.error(e.getMessage(), e);
        return null;
    }
    ...
}
</code>

Issue 2

<code class="language-log">[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] unreported exception java.io.IOException; must be caught or declared to be thrown
</code>

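The cause: in 6.4.3 the getSourceAsMap method throws a RuntimeException (ElasticsearchParseException is a subclass), whereas in 5.5.0 it throws IOException, a checked exception that must be caught explicitly.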

<code class="language-java">// 6.4.3: the declared exception is a RuntimeException
public Map<String, Object> getSourceAsMap() throws ElasticsearchParseException {
    return this.sourceAsMap();
}

// 5.5.0: the declared exception is the checked IOException
public Map<String, Object> getSourceAsMap() throws IOException {
    return sourceAsMap();
}
</code>

Changing the getEsType method of ESTemplate to catch the exception is enough:

<code class="language-java">// ESTemplate::getEsType
Map<String, Object> sourceMap = null;
try {
    sourceMap = mappingMetaData.getSourceAsMap();
} catch (IOException e) {
    logger.error(e.getMessage(), e);
    return null;
}
</code>
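
The difference between the two signatures can be reproduced without Elasticsearch on the classpath. The sketch below uses hypothetical stand-in methods (they are not the Elasticsearch classes) to show why the same call site compiles against an unchecked exception but needs a try/catch once the method declares `IOException`.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins for the two getSourceAsMap() signatures.
class CheckedExceptionSketch {

    // Same shape as the 5.5.0 signature: IOException is checked,
    // so every caller must catch it or declare it.
    static Map<String, Object> sourceAsMapChecked(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated parse failure");
        }
        return Collections.<String, Object>singletonMap("name", "test");
    }

    // Same shape as the 6.4.3 signature: a RuntimeException subclass,
    // so callers compile without any try/catch.
    static Map<String, Object> sourceAsMapUnchecked(boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated parse failure");
        }
        return Collections.<String, Object>singletonMap("name", "test");
    }

    // Same pattern as the getEsType fix: catch, log, return null.
    static Map<String, Object> readOrNull(boolean fail) {
        try {
            return sourceAsMapChecked(fail);
        } catch (IOException e) {
            System.err.println("getSourceAsMap failed: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(readOrNull(false));
        System.out.println(readOrNull(true));
    }
}
```

Removing the try/catch from `readOrNull` reproduces the "unreported exception java.io.IOException" compile error from the build log.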

After the build, replace the client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar file under canal.adapter-1.1.4\plugin with the newly built one.

Then run the deploy and adapter startup scripts.

Configuration issue

After startup, the following error is reported:

<code class="language-log">2020-07-07 14:36:08.223 [main] INFO org.elasticsearch.plugins.PluginsService - loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2020-07-07 14:36:08.473 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es failed
java.lang.RuntimeException: java.lang.IllegalArgumentException: unknown setting [mode] please check that any required plugins are installed, or check the breaking changes documentation for removed settings
    at com.alibaba.otter.canal.client.adapter.es.ESAdapter.init(ESAdapter.java:137)
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:172)
</code>

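The canal source contains no code that produces this exception message. Searching the dependency jars shows that it is thrown by ES while creating the TransportClient. My guess was that the mode parameter in the canal-adapter configuration was being passed into the settings used to create the TransportClient and made the creation fail, so I commented it out and restarted.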

<code class="language-yml">- name: es
  hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
  properties:
    # The mode line below is commented out. This is a pitfall in 1.1.4: every entry
    # under properties is passed into the TransportClient settings (rest mode does not
    # do this), so in transport mode any entry other than cluster.name makes the ES
    # connection fail to be created.
    # mode: transport # transport # or rest
    # security.auth: test:123456 # only used for rest mode
    cluster.name: elasticsearch
</code>
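
An alternative to commenting the line out would be to drop adapter-only keys before the properties reach the client settings. The following is a minimal sketch of that idea on a plain `Map`; `TransportSettingsFilter` and its key list are hypothetical and are not canal's implementation. The two keys are the ones shown in the configuration above, and treating them as the full set of adapter-only keys is an assumption.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not canal code): keeps only the properties that are
// meant for the ES transport client.
class TransportSettingsFilter {

    // Assumption: these keys are consumed by the adapter itself and are
    // not valid Elasticsearch client settings.
    static final Set<String> ADAPTER_ONLY_KEYS =
            new HashSet<>(Arrays.asList("mode", "security.auth"));

    static Map<String, String> forTransportClient(Map<String, String> properties) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!ADAPTER_ONLY_KEYS.contains(entry.getKey())) {
                settings.put(entry.getKey(), entry.getValue());
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("mode", "transport");
        properties.put("cluster.name", "elasticsearch");
        // Only the entries that survive the filter would be handed to the client.
        System.out.println(forTransportClient(properties));
    }
}
```

With a filter of this kind the `mode` entry could stay in the YAML file, at the cost of patching and rebuilding the adapter once more.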

Test:

After the restart, insert a row into MySQL; the adapter prints the following log:

<code class="language-log">[pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":21,"name":"测试用户","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777991,"type":"INSERT"}
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Destination: example_instance, database:canal, table:class, type:INSERT, affected index count: 1
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Prepared to sync index: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Single table insert to es index, destination:example_instance, table: class, index: canal_test, id: 21
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 1 ms,destination: example_instance, es index: canal_test
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync completed: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 2 ms, affected indexes count:1, destination: example_instance
[pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":21,"name":"测试用户","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777993,"type":"INSERT"}
Affected indexes: canal_test
</code>

Check the data in ES:

<code class="language-shell">curl 127.0.0.1:9200/canal_test/canal/21
{
  "_index": "canal_test",
  "_type": "canal",
  "_id": "21",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "测试用户"
  }
}
</code>

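Summary:

  • canal-adapter does not support index names, so this approach is not suitable if you need frequent full rebuilds
  • Lookups during updates do not support non-numeric primary keys (a consequence of building the SQL by string concatenation)
  • Every table update is synced to ES. With one-to-many joins, a single record change may trigger a batch update of the index. If the index stores snapshot data, consider listening for the changes and developing an adapter that carries the business logic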
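
The second point can be illustrated with a minimal sketch. The query shape below is hypothetical and is not canal's actual code; the table `class` and key column `id` are borrowed from the test log above. It only shows why an unquoted, concatenated value works for numeric keys and breaks for string keys, and how a `?` placeholder sidesteps the problem.

```java
// Hypothetical illustration (not canal code) of the string-concatenation pitfall.
class PkConditionSketch {

    // Naive concatenation: the value is pasted into the SQL text unquoted.
    static String concatenated(String column, Object pk) {
        return "SELECT * FROM class WHERE " + column + "=" + pk;
    }

    // Placeholder variant: the SQL text never contains the value; it would be
    // bound separately, e.g. with java.sql.PreparedStatement.setObject(1, pk).
    static String parameterized(String column) {
        return "SELECT * FROM class WHERE " + column + "=?";
    }

    public static void main(String[] args) {
        // Numeric key: the generated text is a valid condition.
        System.out.println(concatenated("id", 21));
        // String key: the value lands in the SQL without quotes,
        // so the database reads it as an identifier, not a literal.
        System.out.println(concatenated("id", "abc"));
        // Same text for any key type.
        System.out.println(parameterized("id"));
    }
}
```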

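Related documents:

Implementation approaches for real-time MySQL-to-Elasticsearch sync (MySQL实时同步到Elasticsearch实现方案)
canal official documentation

Enabling binlog in MySQL (mysql启动binlog)

Incremental sync of MySQL data to ES with canal (canal增量同步mysql信息到ES)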
