likes
comments
collection
share

Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)

作者站长头像
站长
· 阅读数 16

⭐️前面的话⭐️

✉️坚持和努力一定能换来诗与远方! 💭推荐书籍:📚《王道408》,📚《深入理解 Java 虚拟机-周志明》,📚《Java 核心技术卷》 💬算法刷题:✅力扣🌐牛客网 🎈Github 🎈码云Gitee


学习 Elasticsearch 的第2天,通过上一篇文章 ElasticSearch(ES)(NoSQL解决方案3) 我们学习了 Elasticsearch 的核心概念和基础操作,下来实战一下使用 Elasticsearch 定时同步 MySQL 数据进而实现全文检索。

🍓欢迎点赞,评论交流,错误指正。一起进步!

🍓刚接触Elasticsearch,有项目实战经验的大佬欢迎评论区指正、指导、交流。我马上也要接触这块的工作了,抓紧学习中!

全文关键词:Elasticsearch、MySQL、定时任务、Json、SpringBoot、检索、大数据

1 环境准备

  • elasticsearch-7.12.1
  • kibana-7.12.1
  • SpringBoot 应用
  • MySQL 库

可参考:juejin.cn/post/713312…

Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)

2 相关坐标、类、API整理

2.1 坐标

  • 有如下相关坐标👇

    • spring-boot-starter-data-elasticsearch(org.springframework.boot)
    • spring-data-elasticsearch(org.springframework.data)
    • elasticsearch-rest-high-level-client(org.elasticsearch.client)
    • elasticsearch(org.elasticsearch)
  • 坐标关系👇

Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)

Spring Data Release TrainSpring Data ElasticsearchElasticsearchSpring FrameworkSpring Boot
2021.2 (Raj)4.4.x7.17.45.3.x2.7.x
2021.1 (Q)4.3.x7.15.25.3.x2.6.x
2021.0 (Pascal)4.2.x[1]7.12.05.3.x2.5.x
2020.0 (Ockham)[1]4.1.x[1]7.9.35.3.22.4.x
Neumann[1]4.0.x[1]7.6.25.2.122.3.x
Moore[1]3.2.x[1]6.8.125.2.122.2.x
Lovelace[1]3.1.x[1]6.2.25.1.192.1.x
Kay[1]3.0.x[1]5.5.05.0.132.0.x
Ingalls[1]2.1.x[1]2.4.04.3.251.5.x

2.2 创建查询 request

SearchRequest

org.elasticsearch.action.search.SearchRequest;

针对一个或多个索引(或全部)执行搜索的请求。最好使用org.elasticsearch.client.Requests.searchRequest(String...)创建。

请注意,搜索source(SearchSourceBuilder)是必需的。

SearchRequest request = Requests.searchRequest("officialinfo");
//SearchRequest request = new SearchRequest("officialinfo");

2.3 构建查询条件并设置到 request

SearchSourceBuilder 一个搜索源构建器

  • 所在路径
org.elasticsearch.search.builder.SearchSourceBuilder
  • 文档注释和构造方法

A search source builder allowing to easily build search source

SearchSourceBuilder builder = new SearchSourceBuilder();
  • 使用
public SearchSourceBuilder query(QueryBuilder query)
builder.query(QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR));
request.source(builder);

QueryBuilders 设置此请求的搜索查询

  • 所在路径
org.elasticsearch.index.query.QueryBuilders
  • 使用
public static MatchQueryBuilder matchQuery(String name, Object text)
QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR)

2.4 执行查询

RestHighLevelClient(客户端)

  • 所在路径

elasticsearch-rest-high-level-client 坐标下

org.elasticsearch.client.RestHighLevelClient

包装低级RestClient实例并允许构建请求和读取响应的高级 REST 客户端。

  • 搜索请求
// Executes a search request using the Search API.
// 使用搜索 API 执行搜索请求。请参阅elastic.co 上的搜索 API 
public final SearchResponse search(SearchRequest searchRequest, RequestOptions options)
SearchResponse response = client.search(request,RequestOptions.DEFAULT);
  • 依赖注入、初始化、销毁
@Autowired
RestHighLevelClient client;

@BeforeEach
public void init() {
    // 初始化客户端
    log.info("客户端client信息" + client);
    HttpHost serverHost = new HttpHost("localhost",9200);
    client = new RestHighLevelClient(RestClient.builder(serverHost));
}

@AfterEach
public void afterDo() {
    // 销毁客户端 【同上,理论上应该用 @After 注解实现】
    if (client != null) {
        try {
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.5 结果返回

SearchResponse

  • 所在路径
org.elasticsearch.action.search.SearchResponse
  • 使用
private final SearchResponseSections internalResponse;

public SearchHits getHits() {
    return internalResponse.hits();
}
SearchHit[] hits = response.getHits().getHits();
System.out.println("------------------------------hit--------------------------------");
for (SearchHit hit : hits) {
    System.out.println(hit);
}

System.out.println("------------------------------hit.getSourceAsMap()--------------------------------");
for (SearchHit hit : hits) {
    System.out.println(hit.getSourceAsMap());
}

SearchResponseSections

第一个 getHits() 实际调用的是此类的方法

SearchHit 第1个getHits()的返回结果

org.elasticsearch.search.SearchHit
public final class SearchHits implements Writeable, ToXContentFragment, Iterable<SearchHit> {
    public static final SearchHit[] EMPTY = new SearchHit[0];

    private final SearchHit[] hits;
    private final TotalHits totalHits;
    private final float maxScore;
    @Nullable
    private final SortField[] sortFields;
    @Nullable
    private final String collapseField;
    @Nullable
    private final Object[] collapseValues;
}

SearchHit 第2个getHits()的返回结果

public final class SearchHit implements Writeable, ToXContentObject, Iterable<DocumentField> {

    private final transient int docId;

    private static final float DEFAULT_SCORE = Float.NaN;
    private float score = DEFAULT_SCORE;

    private final Text id;
    private final Text type;

    private final NestedIdentity nestedIdentity;

    private long version = -1;
    private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
    private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;

    private BytesReference source;

    private Map<String, DocumentField> documentFields;
    private final Map<String, DocumentField> metaFields;

    private Map<String, HighlightField> highlightFields = null;

    private SearchSortValues sortValues = SearchSortValues.EMPTY;

    private String[] matchedQueries = Strings.EMPTY_ARRAY;

    private Explanation explanation;

    @Nullable
    private SearchShardTarget shard;

    //These two fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
    //serialized over the wire. When parsing hits back from xcontent though, in most of the cases (whenever explanation is disabled)
    //we can't rebuild the shard target object so we need to set these manually for users retrieval.
    private transient String index;
    private transient String clusterAlias;

    private Map<String, Object> sourceAsMap;

    private Map<String, SearchHits> innerHits;
}

3 代码实现(全)

3.1 pom.xml

<!-- __________________________________elasticsearch相关的坐标__________________________________ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<!--<dependency>-->
<!--    <groupId>org.springframework.data</groupId>-->
<!--    <artifactId>spring-data-elasticsearch</artifactId>-->
<!--</dependency>-->

<!--<dependency>-->
<!--    <groupId>org.elasticsearch.client</groupId>-->
<!--    <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!--</dependency>-->

<!--<dependency>-->
<!--    <groupId>org.elasticsearch</groupId>-->
<!--    <artifactId>elasticsearch</artifactId>-->
<!--</dependency>-->

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
</dependency>

<!-- __________________________________web、test、mybatis、mybatis-plus、druid、mysql、lombok__________________________________ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.3.4</version>
</dependency>

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.2</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.8</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

3.2 elasticsearch 配置类

配置 RestHighLevelClient

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Elasticsearch的配置类【配置es链接】
@Configuration
public class ElasticSearchClientConfig {
    // 这一段感觉应该是初始化了一个 RequestOptions 变量
    public static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    /**
     * 无账号密码登录
     * @return RestHighLevelClient
     */
    @Bean
    public static RestHighLevelClient restHighLevelClient() {
        // 集群配置法
        RestHighLevelClient client = new RestHighLevelClient
                (RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        return client;
    }
}

3.3 MySQL数据库的数据 定时同步到es

@EnableScheduling 注解开启定时任务

@Scheduled 注解可以控制方法定时执行

  • @Scheduled 注解有参数

    • cron 表达式 设置每天什么时候执行
    • fixedDelay 控制方法执行的间隔时间,以上一次方法执行完开始算起。
    • fixedRate 按照一定的速率执行,从上一次方法执行开始的时间算起。
      • 如果上一次方法阻塞住了,下一次也是不会执行,但是在阻塞这段时间内累计应该执行的次数,当不再阻塞时,一下子把这些全部执行掉,而后再按照固定速率继续执行。
    • initialDelay 如: @Scheduled(initialDelay = 10000,fixedRate = 15000),容器启动后延迟10秒执行一次定时器,s以后每15秒再执行一次该定时器。
  • cron 表达式语法

    • 每一位的含义
    - 第一位,表示秒,取值0-59
    - 第二位,表示分,取值0-59
    - 第三位,表示小时,取值0-23
    - 第四位,日期天/日,取值1-31
    - 第五位,日期月份,取值1-12
    - 第六位,星期,取值1-7,星期一,星期二...,注:不是第1周,第二周的意思;
    另外:1表示星期天,2表示星期一。
    - 第7为,年份,可以留空,取值1970-2099
    
    • 一些特殊符号
    (*) 星号:可以理解为每的意思,每秒,每分,每天,每月,每年...
    (?) 问号:问号只能出现在日期和星期这两个位置。
    (-) 减号:表达一个范围,如在小时字段中使用“10-12”,则表示从10到12点,即10,11,12
    (,) 逗号:表达一个列表值,如在星期字段中使用“1,2,4”,则表示星期一,星期二,星期四
    (/) 斜杠:如:x/y,x是开始值,y是步长,比如在第一位(秒) 0/15就是,从0秒开始,每15秒,
    最后就是0,15,30,45,60    另:*/y,等同于0/y
    
    • 举例如下
    0 0 3 * * ?     每天3点执行
    0 5 3 * * ?     每天3点5分执行
    0 5 3 ? * *     每天3点5分执行,与上面作用相同
    0 5/10 3 * * ?  每天3点的 5分,15分,25分,35分,45分,55分这几个时间点执行
    0 10 3 ? * 1    每周星期天,3点10分 执行,注:1表示星期天    
    0 10 3 ? * 1#3  每个月的第三个星期,星期天 执行,#号只能出现在星期的位置
    
  • Scheduled定时任务会导致内存泄漏吗?

要看你的定时任务是做什么的,只是调用无状态的方法是不会占用内存的,就像你用for循环执行一个任务很多次一样。但是,如果你在定时任务里进行一些影响全局的操作,例如不停地往一个List里append新的内容,当然会造成内存泄漏。

在使用定时器的过程中,需要注意定时器里的内存溢出问题。定时器本身不会导致内存溢出,就是怕定时器里的代码处理不好,加入无法被GC回收的对象,堆积导致内存溢出。

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;

@EnableScheduling // 定时器
@Component
public class ElasticsearchImpl {
    @Autowired
    @Resource
    // 这个是要被同步的数据表所对应的Mapper
    private PogOfficialInfoMapper pogOfficialInfoMapper;

    // 定时器注解,每过去5秒执行这个方法
    @Scheduled(cron = "0/5 * * * * ?")
    public void Escalculating() {
        // 查询 tb_newbee_mall_goods_category 表中的所有数据,待会这些数据全部放在 es 里
        List<GoodsCategory> categoryList = pogOfficialInfoMapper.selectList(null);

        // 调用高层对象
        // ElasticSearchClientConfig 为之前写的 Elasticsearch 配置类,
        // restHighLevelClient() 是其中的静态方法
        RestHighLevelClient client = ElasticSearchClientConfig.restHighLevelClient();

        // 存储刚刚 categoryList 里获得的数据
        for (GoodsCategory category : categoryList) {
            // 创建构造器指定index索引
            IndexRequest indexRequest = new IndexRequest("officialinfo");  // 索引的名字
            indexRequest.id(category.getCategoryId().toString());

            // 创建批量操作的对象
            BulkRequest request = new BulkRequest();

            // 将查询到的数据转化为Json
            indexRequest.source(JSONObject.toJSONString(category), XContentType.JSON);
            // Json数据放入批量操作对象中
            request.add(indexRequest);

            // 执行操作
            try {
                client.bulk(request, ElasticSearchClientConfig.COMMON_OPTIONS);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(category);
        }
    }
}

3.4 Elasticsearch查询(使用Kibana)

Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)

数据库里查询也是48个

Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)

3.5 Java代码检索

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.Arrays;

// 参考 :https://www.cnblogs.com/Muhuai/p/16334246.html
@SpringBootTest
@Slf4j
public class ElasticsearchApplicationTest {
    // 连接客户端
    @Autowired
    RestHighLevelClient client;


    @BeforeEach
    public void init() {
        // 初始化客户端
        log.info("客户端client信息" + client);
        HttpHost serverHost = new HttpHost("localhost",9200);
        client = new RestHighLevelClient(RestClient.builder(serverHost));
    }

    @AfterEach
    public void afterDo() {
        // 销毁客户端 【同上,理论上应该用 @After 注解实现】
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void testIndex() throws IOException {
        // 验证索引是否正常 - OK
        // 查询索引信息
        GetIndexRequest request = new GetIndexRequest("officialinfo");// officialinfo 为索引名称
        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
        // 获取别名
        System.out.println(response.getAliases().toString());//{officialinfo=[]}
        // 获取默认设置
        System.out.println(response.getDefaultSettings().toString());//{}
        // 获取索引信息
        System.out.println(Arrays.toString(response.getIndices()));//[officialinfo]
        // 获取映射信息
        System.out.println(response.getMappings().toString());
        //{officialinfo=org.elasticsearch.cluster.metadata.MappingMetadata@65b780fb}
    }

    // 文档查询【查询 id 为 1 的数据】
    @Test
    public void testClientSearchById() throws IOException {
        // 验证文档是否正常 - OK
        GetRequest request = new GetRequest().index("officialinfo").id("20");
        GetResponse response = client.get(request,RequestOptions.DEFAULT);
        System.out.println(response);
        System.out.println(response.getSourceAsMap());
    }

    // 模糊查询
    @Test
    public void testClientSearch() throws IOException {
        // 1、创建查询request
        SearchRequest request = new SearchRequest("officialinfo");
        // 这个request.type() 可能因为 Elasticsearch 版本的问题,已经不用了,如果用的话,会报错
        // 【[types removal] Specifying types in search requests is deprecated."]】
        // request.types("text");

        // 2、指定查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        // 选择模糊查询匹配的模式是 and 还是 or
        // 也可以不加后面的 .operator(Operator.OR) ,如果不加,就是直接匹配
        builder.query(QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR));
        request.source(builder);
        // 3、执行查询
        SearchResponse response = client.search(request,RequestOptions.DEFAULT);

        // 结果打印【为什么用 Hit ,需要去看 Elasticsearch 查询的结果,看结果很容易就明白了】
        SearchHit[] hits = response.getHits().getHits();
        System.out.println("------------------------------hit--------------------------------");
        for (SearchHit hit : hits) {
            System.out.println(hit);
        }

        System.out.println("------------------------------hit.getSourceAsMap()--------------------------------");
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsMap());
        }
    }
}

4 其它

4.1 yml 配置

spring:
  datasource:
    druid:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/newbee_mall_db_v2?serverTimezone=UTC
      username: root
      password: 123456
  elasticsearch:
    rest:
      uris: http://localhost:9200

mybatis-plus:
  global-config:
    db-config:
      table-prefix: tbl_
      id-type: auto
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4.2 参考

转载自:https://juejin.cn/post/7133521095886307335
评论
请登录