Elasticsearch定时同步MySQL数据实现全文检索(详细实战教程)
⭐️前面的话⭐️
✉️坚持和努力一定能换来诗与远方! 💭推荐书籍:📚《王道408》,📚《深入理解 Java 虚拟机-周志明》,📚《Java 核心技术卷》 💬算法刷题:✅力扣🌐牛客网 🎈Github 🎈码云Gitee
学习 Elasticsearch 的第2天,通过上一篇文章 ElasticSearch(ES)(NoSQL解决方案3) 我们学习了 Elasticsearch 的核心概念和基础操作,下来实战一下使用 Elasticsearch 定时同步 MySQL 数据进而实现全文检索。
🍓欢迎点赞,评论交流,错误指正。一起进步!
🍓刚接触Elasticsearch,有项目实战经验的大佬欢迎评论区指正、指导、交流。我马上也要接触这块的工作了,抓紧学习中!
全文关键词:Elasticsearch、MySQL、定时任务、Json、SpringBoot、检索、大数据
1 环境准备
- elasticsearch-7.12.1
- kibana-7.12.1
- SpringBoot 应用
- MySQL 库
2 相关坐标、类、API整理
2.1 坐标
-
有如下相关坐标👇
- spring-boot-starter-data-elasticsearch(org.springframework.boot)
- spring-data-elasticsearch(org.springframework.data)
- elasticsearch-rest-high-level-client(org.elasticsearch.client)
- elasticsearch(org.elasticsearch)
-
坐标关系👇
- 版本选择(参照官网 ,注意版本兼容性)
Spring Data Release Train | Spring Data Elasticsearch | Elasticsearch | Spring Framework | Spring Boot |
---|---|---|---|---|
2021.2 (Raj) | 4.4.x | 7.17.4 | 5.3.x | 2.7.x |
2021.1 (Q) | 4.3.x | 7.15.2 | 5.3.x | 2.6.x |
2021.0 (Pascal) | 4.2.x[1] | 7.12.0 | 5.3.x | 2.5.x |
2020.0 (Ockham)[1] | 4.1.x[1] | 7.9.3 | 5.3.2 | 2.4.x |
Neumann[1] | 4.0.x[1] | 7.6.2 | 5.2.12 | 2.3.x |
Moore[1] | 3.2.x[1] | 6.8.12 | 5.2.12 | 2.2.x |
Lovelace[1] | 3.1.x[1] | 6.2.2 | 5.1.19 | 2.1.x |
Kay[1] | 3.0.x[1] | 5.5.0 | 5.0.13 | 2.0.x |
Ingalls[1] | 2.1.x[1] | 2.4.0 | 4.3.25 | 1.5.x |
2.2 创建查询 request
SearchRequest
org.elasticsearch.action.search.SearchRequest;
针对一个或多个索引(或全部)执行搜索的请求。最好使用org.elasticsearch.client.Requests.searchRequest(String...)创建。
请注意,搜索source(SearchSourceBuilder)是必需的。
SearchRequest request = Requests.searchRequest("officialinfo");
//SearchRequest request = new SearchRequest("officialinfo");
2.3 构建查询条件并设置到 request
SearchSourceBuilder 一个搜索源构建器
- 所在路径
org.elasticsearch.search.builder.SearchSourceBuilder
- 文档注释和构造方法
A search source builder allowing to easily build search source
SearchSourceBuilder builder = new SearchSourceBuilder();
- 使用
public SearchSourceBuilder query(QueryBuilder query)
builder.query(QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR));
request.source(builder);
QueryBuilders 设置此请求的搜索查询
- 所在路径
org.elasticsearch.index.query.QueryBuilders
- 使用
public static MatchQueryBuilder matchQuery(String name, Object text)
QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR)
2.4 执行查询
RestHighLevelClient(客户端)
- 所在路径
elasticsearch-rest-high-level-client 坐标下
org.elasticsearch.client.RestHighLevelClient
包装低级RestClient实例并允许构建请求和读取响应的高级 REST 客户端。
- 搜索请求
// Executes a search request using the Search API.
// 使用搜索 API 执行搜索请求。请参阅elastic.co 上的搜索 API
public final SearchResponse search(SearchRequest searchRequest, RequestOptions options)
SearchResponse response = client.search(request,RequestOptions.DEFAULT);
- 依赖注入、初始化、销毁
@Autowired
RestHighLevelClient client;
@BeforeEach
public void init() {
// 初始化客户端
log.info("客户端client信息" + client);
HttpHost serverHost = new HttpHost("localhost",9200);
client = new RestHighLevelClient(RestClient.builder(serverHost));
}
@AfterEach
public void afterDo() {
// 销毁客户端 【同上,理论上应该用 @After 注解实现】
if (client != null) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5 结果返回
SearchResponse
- 所在路径
org.elasticsearch.action.search.SearchResponse
- 使用
private final SearchResponseSections internalResponse;
public SearchHits getHits() {
return internalResponse.hits();
}
SearchHit[] hits = response.getHits().getHits();
System.out.println("------------------------------hit--------------------------------");
for (SearchHit hit : hits) {
System.out.println(hit);
}
System.out.println("------------------------------hit.getSourceAsMap()--------------------------------");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsMap());
}
SearchResponseSections
第一个 getHits()
实际调用的是此类的方法
SearchHit 第1个getHits()的返回结果
org.elasticsearch.search.SearchHit
public final class SearchHits implements Writeable, ToXContentFragment, Iterable<SearchHit> {
public static final SearchHit[] EMPTY = new SearchHit[0];
private final SearchHit[] hits;
private final TotalHits totalHits;
private final float maxScore;
@Nullable
private final SortField[] sortFields;
@Nullable
private final String collapseField;
@Nullable
private final Object[] collapseValues;
}
SearchHit 第2个getHits()的返回结果
public final class SearchHit implements Writeable, ToXContentObject, Iterable<DocumentField> {
private final transient int docId;
private static final float DEFAULT_SCORE = Float.NaN;
private float score = DEFAULT_SCORE;
private final Text id;
private final Text type;
private final NestedIdentity nestedIdentity;
private long version = -1;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private BytesReference source;
private Map<String, DocumentField> documentFields;
private final Map<String, DocumentField> metaFields;
private Map<String, HighlightField> highlightFields = null;
private SearchSortValues sortValues = SearchSortValues.EMPTY;
private String[] matchedQueries = Strings.EMPTY_ARRAY;
private Explanation explanation;
@Nullable
private SearchShardTarget shard;
//These two fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
//serialized over the wire. When parsing hits back from xcontent though, in most of the cases (whenever explanation is disabled)
//we can't rebuild the shard target object so we need to set these manually for users retrieval.
private transient String index;
private transient String clusterAlias;
private Map<String, Object> sourceAsMap;
private Map<String, SearchHits> innerHits;
}
3 代码实现(全)
3.1 pom.xml
<!-- __________________________________elasticsearch相关的坐标__________________________________ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!--<dependency>-->
<!-- <groupId>org.springframework.data</groupId>-->
<!-- <artifactId>spring-data-elasticsearch</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!-- <groupId>org.elasticsearch.client</groupId>-->
<!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- __________________________________web、test、mybatis、mybatis-plus、druid、mysql、lombok__________________________________ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.4</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
3.2 elasticsearch 配置类
配置 RestHighLevelClient
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Elasticsearch的配置类【配置es链接】
@Configuration
public class ElasticSearchClientConfig {
// 这一段感觉应该是初始化了一个 RequestOptions 变量
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
COMMON_OPTIONS = builder.build();
}
/**
* 无账号密码登录
* @return RestHighLevelClient
*/
@Bean
public static RestHighLevelClient restHighLevelClient() {
// 集群配置法
RestHighLevelClient client = new RestHighLevelClient
(RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
return client;
}
}
3.3 MySQL数据库的数据 定时同步到es
@EnableScheduling 注解开启定时任务
@Scheduled 注解可以控制方法定时执行
-
@Scheduled 注解有参数
- cron 表达式 设置每天什么时候执行
- fixedDelay 控制方法执行的间隔时间,以上一次方法执行完开始算起。
- fixedRate 按照一定的速率执行,从上一次方法执行开始的时间算起。
- 如果上一次方法阻塞住了,下一次也是不会执行,但是在阻塞这段时间内累计应该执行的次数,当不再阻塞时,一下子把这些全部执行掉,而后再按照固定速率继续执行。
- initialDelay 如: @Scheduled(initialDelay = 10000,fixedRate = 15000),容器启动后延迟10秒执行一次定时器,s以后每15秒再执行一次该定时器。
-
cron 表达式语法
- 每一位的含义
- 第一位,表示秒,取值0-59 - 第二位,表示分,取值0-59 - 第三位,表示小时,取值0-23 - 第四位,日期天/日,取值1-31 - 第五位,日期月份,取值1-12 - 第六位,星期,取值1-7,星期一,星期二...,注:不是第1周,第二周的意思; 另外:1表示星期天,2表示星期一。 - 第7为,年份,可以留空,取值1970-2099
- 一些特殊符号
(*) 星号:可以理解为每的意思,每秒,每分,每天,每月,每年... (?) 问号:问号只能出现在日期和星期这两个位置。 (-) 减号:表达一个范围,如在小时字段中使用“10-12”,则表示从10到12点,即10,11,12 (,) 逗号:表达一个列表值,如在星期字段中使用“1,2,4”,则表示星期一,星期二,星期四 (/) 斜杠:如:x/y,x是开始值,y是步长,比如在第一位(秒) 0/15就是,从0秒开始,每15秒, 最后就是0,15,30,45,60 另:*/y,等同于0/y
- 举例如下
0 0 3 * * ? 每天3点执行 0 5 3 * * ? 每天3点5分执行 0 5 3 ? * * 每天3点5分执行,与上面作用相同 0 5/10 3 * * ? 每天3点的 5分,15分,25分,35分,45分,55分这几个时间点执行 0 10 3 ? * 1 每周星期天,3点10分 执行,注:1表示星期天 0 10 3 ? * 1#3 每个月的第三个星期,星期天 执行,#号只能出现在星期的位置
-
Scheduled定时任务会导致内存泄漏吗?
要看你的定时任务是做什么的,只是调用无状态的方法是不会占用内存的,就像你用for循环执行一个任务很多次一样。但是,如果你在定时任务里进行一些影响全局的操作,例如不停地往一个List里append新的内容,当然会造成内存泄漏。
在使用定时器的过程中,需要注意定时器里的内存溢出问题。定时器本身不会导致内存溢出,就是怕定时器里的代码处理不好,加入无法被GC回收的对象,堆积导致内存溢出。
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
@EnableScheduling // 定时器
@Component
public class ElasticsearchImpl {
@Autowired
@Resource
// 这个是要被同步的数据表所对应的Mapper
private PogOfficialInfoMapper pogOfficialInfoMapper;
// 定时器注解,每过去5秒执行这个方法
@Scheduled(cron = "0/5 * * * * ?")
public void Escalculating() {
// 查询 tb_newbee_mall_goods_category 表中的所有数据,待会这些数据全部放在 es 里
List<GoodsCategory> categoryList = pogOfficialInfoMapper.selectList(null);
// 调用高层对象
// ElasticSearchClientConfig 为之前写的 Elasticsearch 配置类,
// restHighLevelClient() 是其中的静态方法
RestHighLevelClient client = ElasticSearchClientConfig.restHighLevelClient();
// 存储刚刚 categoryList 里获得的数据
for (GoodsCategory category : categoryList) {
// 创建构造器指定index索引
IndexRequest indexRequest = new IndexRequest("officialinfo"); // 索引的名字
indexRequest.id(category.getCategoryId().toString());
// 创建批量操作的对象
BulkRequest request = new BulkRequest();
// 将查询到的数据转化为Json
indexRequest.source(JSONObject.toJSONString(category), XContentType.JSON);
// Json数据放入批量操作对象中
request.add(indexRequest);
// 执行操作
try {
client.bulk(request, ElasticSearchClientConfig.COMMON_OPTIONS);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(category);
}
}
}
3.4 Elasticsearch查询(使用Kibana)
数据库里查询也是48个
3.5 Java代码检索
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.Arrays;
// 参考 :https://www.cnblogs.com/Muhuai/p/16334246.html
@SpringBootTest
@Slf4j
public class ElasticsearchApplicationTest {
// 连接客户端
@Autowired
RestHighLevelClient client;
@BeforeEach
public void init() {
// 初始化客户端
log.info("客户端client信息" + client);
HttpHost serverHost = new HttpHost("localhost",9200);
client = new RestHighLevelClient(RestClient.builder(serverHost));
}
@AfterEach
public void afterDo() {
// 销毁客户端 【同上,理论上应该用 @After 注解实现】
if (client != null) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void testIndex() throws IOException {
// 验证索引是否正常 - OK
// 查询索引信息
GetIndexRequest request = new GetIndexRequest("officialinfo");// officialinfo 为索引名称
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
// 获取别名
System.out.println(response.getAliases().toString());//{officialinfo=[]}
// 获取默认设置
System.out.println(response.getDefaultSettings().toString());//{}
// 获取索引信息
System.out.println(Arrays.toString(response.getIndices()));//[officialinfo]
// 获取映射信息
System.out.println(response.getMappings().toString());
//{officialinfo=org.elasticsearch.cluster.metadata.MappingMetadata@65b780fb}
}
// 文档查询【查询 id 为 1 的数据】
@Test
public void testClientSearchById() throws IOException {
// 验证文档是否正常 - OK
GetRequest request = new GetRequest().index("officialinfo").id("20");
GetResponse response = client.get(request,RequestOptions.DEFAULT);
System.out.println(response);
System.out.println(response.getSourceAsMap());
}
// 模糊查询
@Test
public void testClientSearch() throws IOException {
// 1、创建查询request
SearchRequest request = new SearchRequest("officialinfo");
// 这个request.type() 可能因为 Elasticsearch 版本的问题,已经不用了,如果用的话,会报错
// 【[types removal] Specifying types in search requests is deprecated."]】
// request.types("text");
// 2、指定查询条件
SearchSourceBuilder builder = new SearchSourceBuilder();
// 选择模糊查询匹配的模式是 and 还是 or
// 也可以不加后面的 .operator(Operator.OR) ,如果不加,就是直接匹配
builder.query(QueryBuilders.matchQuery("categoryRank","0").operator(Operator.OR));
request.source(builder);
// 3、执行查询
SearchResponse response = client.search(request,RequestOptions.DEFAULT);
// 结果打印【为什么用 Hit ,需要去看 Elasticsearch 查询的结果,看结果很容易就明白了】
SearchHit[] hits = response.getHits().getHits();
System.out.println("------------------------------hit--------------------------------");
for (SearchHit hit : hits) {
System.out.println(hit);
}
System.out.println("------------------------------hit.getSourceAsMap()--------------------------------");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsMap());
}
}
}
4 其它
4.1 yml
配置
spring:
datasource:
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/newbee_mall_db_v2?serverTimezone=UTC
username: root
password: 123456
elasticsearch:
rest:
uris: http://localhost:9200
mybatis-plus:
global-config:
db-config:
table-prefix: tbl_
id-type: auto
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
4.2 参考
转载自:https://juejin.cn/post/7133521095886307335