时间:2021-07-01 10:21:17 帮助过:69人阅读
当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。
这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化
- mysql> desc article;
- +-------------+--------------+------+-----+--------------------------------+-------+
- | Field | Type | Null | Key | Default | Extra |
- +-------------+--------------+------+-----+--------------------------------+-------+
- | id | int(11) | NO | | 0 | |
- | title | mediumtext | NO | | NULL | |
- | description | mediumtext | YES | | NULL | |
- | author | varchar(100) | YES | | NULL | |
- | source | varchar(100) | YES | | NULL | |
- | content | longtext | YES | | NULL | |
- | status | enum('Y','N')| NO | | 'N' | |
- | ctime | timestamp | NO | | CURRENT_TIMESTAMP | |
- | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |
- +-------------+--------------+------+-----+--------------------------------+-------+
- 7 rows in set (0.00 sec)
logstash 增加 mtime 的查询规则
- jdbc {
- jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
- jdbc_driver_class => "com.mysql.jdbc.Driver"
- jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
- jdbc_user => "cms"
- jdbc_password => "password"
- schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
- statement => "select * from article where mtime > :sql_last_value"
- use_column_value => true
- tracking_column => "mtime"
- tracking_column_type => "timestamp"
- record_last_run => true
- last_run_metadata_path => "/var/tmp/article-mtime.last"
- }
创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。
- CREATE TABLE `elasticsearch_trash` (
- `id` int(11) NOT NULL,
- `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
为 article 表创建触发器
- CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
- BEGIN
- -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
- IF NEW.status = 'N' THEN
- insert into elasticsearch_trash(id) values(OLD.id);
- END IF;
- -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
- IF NEW.status = 'Y' THEN
- delete from elasticsearch_trash where id = OLD.id;
- END IF;
- END
- CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
- BEGIN
- -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
- insert into elasticsearch_trash(id) values(OLD.id);
- END
接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。
你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。
实体
- package cn.netkiller.api.domain.elasticsearch;
- import java.util.Date;
- import javax.persistence.Column;
- import javax.persistence.Entity;
- import javax.persistence.Id;
- import javax.persistence.Table;
- @Entity
- @Table
- public class ElasticsearchTrash {
- @Id
- private int id;
- @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
- private Date ctime;
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public Date getCtime() {
- return ctime;
- }
- public void setCtime(Date ctime) {
- this.ctime = ctime;
- }
- }
仓库
- package cn.netkiller.api.repository.elasticsearch;
- import org.springframework.data.repository.CrudRepository;
- import com.example.api.domain.elasticsearch.ElasticsearchTrash;
- public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{
- }
定时任务
- package cn.netkiller.api.schedule;
- import org.elasticsearch.action.delete.DeleteResponse;
- import org.elasticsearch.client.transport.TransportClient;
- import org.elasticsearch.rest.RestStatus;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import com.example.api.domain.elasticsearch.ElasticsearchTrash;
- import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;
- @Component
- public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
- @Autowired
- private TransportClient client;
- @Autowired
- private ElasticsearchTrashRepository alasticsearchTrashRepository;
- public ScheduledTasks() {
- }
- @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
- public void cleanTrash() {
- for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
- DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
- RestStatus status = response.status();
- logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
- if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
- alasticsearchTrashRepository.delete(elasticsearchTrash);
- }
- }
- }
- }
Spring boot 启动主程序。
- package cn.netkiller.api;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.scheduling.annotation.EnableScheduling;
- @SpringBootApplication
- @EnableScheduling
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- }
相关推荐:
Elasticsearch是什么?Elasticsearch 能够被用在什么地方?
Elasticsearch索引和文档操作实例教程
详解spring中使用Elasticsearch的实例教程
以上就是实例详解MySQL 与 Elasticsearch 数据不对称问题的详细内容,更多请关注Gxl网其它相关文章!