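The Right Way to Do ES + Spring Boot (ES Series, Part 3)

Preface

In the previous posts we covered the basic concepts of ES and how to choose a data-migration approach for different scenarios. In this post we look at how to integrate ES with Spring Boot and, so that the move from MySQL to ES goes smoothly, how to "translate SQL".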

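I. Integrating ES with Spring Boot

The first step is to integrate Spring Boot with ES. In Spring Boot there are two main options: the Java REST Client and spring-data-elasticsearch. I recommend the Java High Level REST Client officially provided by Elasticsearch, which also makes it easy to use Alibaba Cloud's ES service in production. The key version information is: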

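  • ES cluster: 7.3.0
  • ES dependencies: 7.3.0

Two points need attention here:

  • The High Level Client is forward compatible: within the same major version, it is guaranteed to communicate with any cluster whose minor version is greater than or equal to its own. For example, a 7.3.0 Java High Level REST Client can talk to 7.x Elasticsearch clusters at version 7.3.0 or later. To get the most out of the latest client features, keep the High Level Client version the same as the cluster version.
  • You may hit a few pitfalls during integration, because the Spring Boot version, the ES cluster version, and the High Level Client version are "coupled" to one another. If the demo will not run, about all you can do is try a few other High Level Client versions.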
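The compatibility rule in the first bullet can be written down as a small check. This is only a sketch under stated assumptions: `ClientCompat` and its methods are hypothetical names, not part of any Elasticsearch library, and the check looks only at the major and minor numbers of a `major.minor.patch` version string.

```java
// Hypothetical helper, not an Elasticsearch API. It encodes the rule above:
// same major version, and cluster minor version >= client minor version.
final class ClientCompat {

    /** Parses "7.3.0" into {7, 3}; the patch number is ignored. */
    static int[] majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected major.minor[.patch]: " + version);
        }
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    /** True if a client of clientVersion is expected to work with the given cluster. */
    static boolean canTalkTo(String clientVersion, String clusterVersion) {
        int[] client = majorMinor(clientVersion);
        int[] cluster = majorMinor(clusterVersion);
        return client[0] == cluster[0] && cluster[1] >= client[1];
    }

    public static void main(String[] args) {
        System.out.println(canTalkTo("7.3.0", "7.3.0")); // true
        System.out.println(canTalkTo("7.3.0", "7.6.2")); // true
        System.out.println(canTalkTo("7.3.0", "7.1.1")); // false: cluster minor is older
        System.out.println(canTalkTo("7.3.0", "6.8.0")); // false: different major
    }
}
```

A check like this could run at startup to fail fast with a clear message instead of an obscure serialization error later.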

1. pom dependencies

```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>rank-eval-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>lang-mustache-client</artifactId>
    <version>7.3.0</version>
</dependency>
```

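2. Initializing the client

```java
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {

    public static final String SCHEME = "http";

    @Value("${elasticsearch.host}")
    public String host;

    /**
     * The old transport client used port 9300; the High Level REST Client uses port 9200.
     */
    @Value("${elasticsearch.port:9200}")
    public int port;

    @Value("${elasticsearch.username:admin}")
    public String username;

    @Value("${elasticsearch.authenticationPassword}")
    public String authenticationPassword;

    @Bean(name = "remoteHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        // HTTP basic authentication with the configured user name and password
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, authenticationPassword));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, SCHEME))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}
```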

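Note that the username and authenticationPassword credentials in the code above are the ones set up in Kibana.

II. Java API

All of the snippets below run as unit tests; they use a field named client, which is the RestHighLevelClient injected into the test class. Before running them, we first create an index template (_template). You can execute it in the Dev Tools console that Kibana provides.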

```json
PUT _template/hero_template
{
  "index_patterns": [
    "hero*"
  ],
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "keyword"
      },
      "country": {
        "type": "keyword"
      },
      "birthday": {
        "type": "keyword"
      },
      "longevity": {
        "type": "integer"
      }
    }
  }
}
```

1. Creating an index

```java
@Test
public void createIndex() throws IOException {
    // Index the first document; the "hero" index is created on the fly if it does not exist
    IndexRequest request = new IndexRequest("hero");
    request.id("1");
    Map<String, String> map = new HashMap<>();
    map.put("id", "1");
    map.put("name", "曹操");
    map.put("country", "魏");
    map.put("birthday", "公元155年");
    map.put("longevity", "65");
    request.source(map);
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    long version = indexResponse.getVersion();
    // A brand-new document reports CREATED and starts at version 1
    assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
    assertEquals(1, version);
}
```

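In ES an index is the logical unit in which we store and query data; since ES 7.0 it corresponds to the concept of a table in MySQL. The code above targets an index named hero (ES creates it automatically on the first write, applying the template defined earlier), builds a map as the first document to insert, and sets it on the IndexRequest object.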

2. Bulk insert

```java
@Test
public void bulkRequestTest() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("hero").id("2")
            .source(XContentType.JSON, "id", "2", "name", "刘备", "country", "蜀", "birthday", "公元161年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("3")
            .source(XContentType.JSON, "id", "3", "name", "孙权", "country", "吴", "birthday", "公元182年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("4")
            .source(XContentType.JSON, "id", "4", "name", "诸葛亮", "country", "蜀", "birthday", "公元181年", "longevity", "53"));
    request.add(new IndexRequest("hero").id("5")
            .source(XContentType.JSON, "id", "5", "name", "司马懿", "country", "魏", "birthday", "公元179年", "longevity", "72"));
    request.add(new IndexRequest("hero").id("6")
            .source(XContentType.JSON, "id", "6", "name", "荀彧", "country", "魏", "birthday", "公元163年", "longevity", "49"));
    request.add(new IndexRequest("hero").id("7")
            .source(XContentType.JSON, "id", "7", "name", "关羽", "country", "蜀", "birthday", "公元160年", "longevity", "60"));
    request.add(new IndexRequest("hero").id("8")
            .source(XContentType.JSON, "id", "8", "name", "周瑜", "country", "吴", "birthday", "公元175年", "longevity", "35"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    assertFalse(bulkResponse.hasFailures());
}
```

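The data as queried in Kibana:

[Figure: Kibana query result listing the hero documents]

All of the query, update, and other operations that follow are based on this data.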

3. Updating data

```java
@Test
public void updateTest() throws IOException {
    // Partial update: only the fields in the map are changed
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("country", "魏");
    UpdateRequest request = new UpdateRequest("hero", "7").doc(jsonMap);
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
}
```

Expressed in SQL, the code above looks like this:

```sql
update hero set country='魏' where id=7;
```

4. Inserting/updating data

```java
@Test
public void insertOrUpdateOne() {
    // Hero is a plain POJO with id, name, country, birthday and longevity (not shown here)
    Hero hero = new Hero();
    hero.setId(5);
    hero.setName("曹丕");
    hero.setCountry("魏");
    hero.setBirthday("公元187年");
    hero.setLongevity(39);
    IndexRequest request = new IndexRequest("hero");
    request.id(hero.getId().toString());
    // JSON here appears to be fastjson's com.alibaba.fastjson.JSON
    request.source(JSON.toJSONString(hero), XContentType.JSON);
    try {
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); // 1
        // id=5 already exists from the bulk insert, so the result is UPDATED
        assertEquals(DocWriteResponse.Result.UPDATED, indexResponse.getResult());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```

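Look at the line marked 1 in the code above: doesn't it look a lot like the earlier index-creation example? With the index() method we get index creation, insert, and update in one call: if the target index does not exist it is created, if the document does not exist it is inserted, and if it exists it is updated. More precisely, in the last case the stored document is replaced in full by the new source, unlike the partial update in the previous section.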
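The difference between index() and a partial update is easy to miss, so here is a toy in-memory model of the two behaviours. It is only an illustration: `ToyIndex` is a hypothetical class that never talks to Elasticsearch, it merges fields shallowly, and it ignores versions, but the result values mirror the CREATED, UPDATED, and NOOP results the client reports.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only, NOT the Elasticsearch client: documents live in a HashMap.
final class ToyIndex {

    enum Result { CREATED, UPDATED, NOOP }

    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /** Models index(): the stored document is replaced wholesale by the new source. */
    Result index(String id, Map<String, Object> source) {
        Map<String, Object> previous = docs.put(id, new HashMap<>(source));
        return previous == null ? Result.CREATED : Result.UPDATED;
    }

    /** Models update(...).doc(...): only the given fields are merged into the document. */
    Result update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            // The real client also fails when the document to update is missing
            throw new IllegalStateException("document missing: " + id);
        }
        if (existing.entrySet().containsAll(partialDoc.entrySet())) {
            return Result.NOOP; // nothing would change
        }
        existing.putAll(partialDoc);
        return Result.UPDATED;
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }
}
```

In this model, indexing a document that only carries `name` over an existing one drops its `country` field, whereas updating with the same map keeps it. That is why insertOrUpdateOne above sets every field of the Hero before calling index().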

5. Deleting data

```java
@Test
public void deleteByIdTest() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("hero");
    deleteRequest.id("1");
    DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
}
```

Above we deleted the document with id=1 that we created earlier. The corresponding SQL is:

```sql
delete from hero where id=1;
```

Of course, in ES we are not limited to deleting by primary key; we can also delete by conditions on other fields.

```java
@Test
public void deleteByQueryRequestTest() throws IOException {
    DeleteByQueryRequest request = new DeleteByQueryRequest("hero");
    // Keep going on version conflicts instead of aborting the whole request
    request.setConflicts("proceed");
    request.setQuery(new TermQueryBuilder("country", "吴"));
    BulkByScrollResponse bulkResponse =
            client.deleteByQuery(request, RequestOptions.DEFAULT);
    assertEquals(0, bulkResponse.getBulkFailures().size());
}
```

The corresponding SQL:

```sql
delete from hero where country='吴';
```

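6. Mixed operations

Each of the create, update, and delete calls above performs only one type of operation at a time, but ES also lets us perform several types of operation in a single request, as in the code below.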

```java
@Test
public void bulkDiffRequestTest() throws IOException {
    // Assumes documents 3 and 7 still exist; a missing document would not return OK
    BulkRequest request = new BulkRequest();
    request.add(new DeleteRequest("hero", "3"));
    request.add(new UpdateRequest("hero", "7")
            .doc(XContentType.JSON, "longevity", "70"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    BulkItemResponse[] bulkItemResponses = bulkResponse.getItems();
    for (BulkItemResponse item : bulkItemResponses) {
        DocWriteResponse itemResponse = item.getResponse();
        switch (item.getOpType()) {
            case UPDATE:
                // Handle the result of the update here
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                break;
            case DELETE:
                // Handle the result of the delete here
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                break;
            default:
                break;
        }
        assertEquals(RestStatus.OK, item.status());
    }
}
```

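We use a BulkRequest object, add both the DeleteRequest and the UpdateRequest to it, and then handle the returned BulkItemResponse[] array according to each item's operation type. As far as I know, MySQL currently has no comparable syntax; if it does, please leave a comment and correct me.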
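For orientation, the REST `_bulk` API that sits underneath takes newline-delimited JSON: one action line per operation, followed by a source line when the operation needs one (a delete has none, an update wraps its fields in `doc`). The sketch below builds roughly the body that corresponds to the test above. `BulkBodySketch` is a hypothetical helper written for this post, it does no JSON escaping, and the Java client normally builds this body for you.

```java
// Hypothetical helper: hand-builds the newline-delimited body of a _bulk request.
// No JSON escaping is done, so it is only suitable for simple ids and index names.
final class BulkBodySketch {

    /** A delete is a single action line with no source line after it. */
    static String deleteAction(String index, String id) {
        return "{\"delete\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n";
    }

    /** An update is an action line followed by a line holding the partial document. */
    static String updateAction(String index, String id, String partialDocJson) {
        return "{\"update\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n"
                + "{\"doc\":" + partialDocJson + "}\n";
    }

    /** Body equivalent to the delete + update pair in the test above. */
    static String exampleBody() {
        return deleteAction("hero", "3")
                + updateAction("hero", "7", "{\"longevity\":70}");
    }

    public static void main(String[] args) {
        // Prints three lines; every line, including the last, ends with a newline
        System.out.print(exampleBody());
    }
}
```

Seeing the wire format also explains why each item gets its own entry in BulkItemResponse[]: every action line is executed and reported independently.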

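7. Queries

This is where our real focus lies. ES supports many kinds of queries, such as "exact" queries (which differ somewhat from an RDBMS), fuzzy queries, relevance queries, range queries, full-text search, paginated queries, sorting, and aggregations. Most of the query features of MySQL can be implemented in ES. It also lets us choose between synchronous and asynchronous execution.

Single-condition query + limit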

```java
@Test
public void selectByUserTest() {
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder builder = new SearchSourceBuilder();
    builder.query(new TermQueryBuilder("country", "魏"));
    // Equivalent to "limit 1" in MySQL
    builder.size(1);
    request.source(builder);
    try {
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        assertEquals(1, hits.length);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```

In the unit test above we use country as the query condition and limit the number of rows returned. The equivalent SQL is:

```sql
select * from hero where country='魏' limit 1;
```

Multi-condition query + sorting + pagination

```java
// termQuery and rangeQuery are static imports from org.elasticsearch.index.query.QueryBuilders
@Test
public void boolQueryTest() {
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    boolQueryBuilder.must(termQuery("country", "魏"));
    boolQueryBuilder.must(rangeQuery("longevity").gte(50));
    sourceBuilder.query(boolQueryBuilder);
    // First page, two hits per page
    sourceBuilder.from(0).size(2);
    sourceBuilder.sort("longevity", SortOrder.DESC);
    request.source(sourceBuilder);
    SearchResponse response = null;
    try {
        response = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Query by Condition execution failed: {}", e.getMessage(), e);
    }
    assertNotNull(response);
    assertEquals(0, response.getShardFailures().length);
    SearchHit[] hits = response.getHits().getHits();
    List<Hero> herosList = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        // Map each hit's _source JSON back to a Hero object
        herosList.add(JSON.parseObject(hit.getSourceAsString(), Hero.class));
    }
    log.info("print info: {}, size: {}", herosList.toString(), herosList.size());
}
```

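The query above finds the heroes of the Cao Wei camp who lived to 50 or older, sorts them by lifespan from high to low, and keeps only two. The corresponding SQL:

```sql
select * from hero where country='魏' and longevity >= 50 order by longevity DESC limit 2;
```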
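The from(0).size(2) pair in the test plays the role of SQL's offset and row count. When the caller thinks in page numbers, the offset has to be computed, and it is worth remembering that with default index settings ES rejects a search where from + size exceeds 10,000 (the `index.max_result_window` setting). A minimal sketch of that arithmetic follows; `PageWindow` is a hypothetical helper name, and it assumes the default window has not been changed on the index.

```java
// Hypothetical helper: turns a 1-based page number into the "from" offset
// and guards against the default index.max_result_window of 10,000.
final class PageWindow {

    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long avoids int overflow on huge pages
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size = " + (from + size) + " exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(fromOffset(1, 2));    // 0, the first page used in the test above
        System.out.println(fromOffset(3, 2));    // 4
        System.out.println(fromOffset(500, 20)); // 9980, the last page that still fits
    }
}
```

The result would be passed to sourceBuilder.from(...). For reading further than the window allows, ES offers other mechanisms such as scroll or search_after, which are outside the scope of this post.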

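Note that when we run a multi-condition query through the ES API, the conditions must be wrapped in a BoolQueryBuilder object, which supports the following clause types: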

```java
private static final String MUSTNOT = "mustNot";
private static final String MUST_NOT = "must_not";
private static final String FILTER = "filter";
private static final String SHOULD = "should";
private static final String MUST = "must";
```

See the official documentation for a detailed explanation.
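As a rough guide when translating SQL: must behaves like AND and contributes to the relevance score, filter is an AND that does not score, should behaves like OR, and must_not like NOT. To make that concrete, the sketch below assembles the core of the JSON query that a bool query corresponds to. `BoolDslSketch` is a hypothetical class written for this post, not the real BoolQueryBuilder, which also emits default fields such as boost that are omitted here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Elasticsearch BoolQueryBuilder: collects JSON
// fragments per clause type and prints the core of the resulting bool query.
final class BoolDslSketch {

    // Clause name -> query fragments, kept in insertion order
    private final Map<String, List<String>> clauses = new LinkedHashMap<>();

    BoolDslSketch must(String queryJson)    { return add("must", queryJson); }     // AND, scored
    BoolDslSketch filter(String queryJson)  { return add("filter", queryJson); }   // AND, not scored
    BoolDslSketch should(String queryJson)  { return add("should", queryJson); }   // OR
    BoolDslSketch mustNot(String queryJson) { return add("must_not", queryJson); } // NOT

    private BoolDslSketch add(String clause, String queryJson) {
        clauses.computeIfAbsent(clause, k -> new ArrayList<>()).add(queryJson);
        return this;
    }

    String toJson() {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : clauses.entrySet()) {
            parts.add("\"" + e.getKey() + "\":[" + String.join(",", e.getValue()) + "]");
        }
        return "{\"bool\":{" + String.join(",", parts) + "}}";
    }

    public static void main(String[] args) {
        // where country = <Wei> and longevity >= 50; \u9b4f is the country value used in this post
        String json = new BoolDslSketch()
                .must("{\"term\":{\"country\":\"\u9b4f\"}}")
                .must("{\"range\":{\"longevity\":{\"gte\":50}}}")
                .toJson();
        System.out.println(json);
    }
}
```

Since country and longevity are exact-match conditions where scoring adds nothing, the same two clauses could also go into filter instead of must.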

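Summary

In this part we first shared how to integrate Spring Boot with ES, along with the recommended practice of building our API on the Java High Level REST Client, and then covered the required dependencies and how to initialize the client.

Next we used the High Level REST Client to implement index creation, bulk insert, update, insert/update, delete, and mixed operations, and finally implemented queries with two simple examples. Many more query examples were not shown here; consult the official site for the usage that fits your own needs.

References

Java High Level REST Client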

This article is reposted from: Juejin
