Elasticsearch Java API: Index CRUD Operations

Previous in this series: Elasticsearch Java API - Client Connection (TransportClient, PreBuiltXPackTransportClient) (Part 1)

This section covers the following CRUD APIs:

Single-document APIs

  • Index API
  • Get API
  • Delete API
  • Delete By Query API
  • Update API

Multi-document APIs

  • Multi Get API
  • Bulk API

Note: for all single-document CRUD APIs, the index parameter accepts only a single index name, or an alias that points to a single index.

Index API

The Index API lets you store a JSON document in an index and make it searchable. A document is uniquely identified by its index, type, and id. You can supply an id yourself, or let the Index API generate one for you.
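For example, a minimal sketch (the twitter/tweet names are placeholders, and json is assumed to be a JSON source string like the ones shown below): passing an id to prepareIndex() stores the document under that id, while omitting it makes Elasticsearch generate one, which you can read from the response:

// index with an explicit id: the document becomes twitter/tweet/1
IndexResponse withId = client.prepareIndex("twitter", "tweet", "1")
        .setSource(json)
        .get();

// index without an id: Elasticsearch generates one
IndexResponse autoId = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();
String generatedId = autoId.getId(); // the auto-generated id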

There are several different ways to produce a JSON document:

  • Manually, using a native byte[] or String
  • Using a Map, which is automatically converted to its JSON equivalent
  • Using a third-party library such as Jackson to serialize your beans
  • Using the built-in helper XContentFactory.jsonBuilder()

Manual approach

Data format:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
Example:
/**
 * Build the JSON by hand
 */
@Test
public void createJson() {

    String json = "{" +
            "\"user\":\"fendo\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"Hello world\"" +
            "}";

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}

Map approach

A Map is a key/value structure and maps naturally onto a JSON document.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out Elasticsearch");
Example:
/**
 * Build the document from a Map
 */
@Test
public void createList() {

    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user", "kimchy");
    json.put("postDate", "2013-01-30");
    json.put("message", "trying out Elasticsearch");

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}

Serialization approach

Elasticsearch already ships with Jackson, so you can use it directly to turn a Java bean into JSON.

import com.fasterxml.jackson.databind.*;

// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate JSON
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Example:
/**
 * Serialize a bean with Jackson
 * @throws Exception
 */
@Test
public void createJackson() throws Exception {

    // CsdnBlog is the author's own bean (definition not shown here)
    CsdnBlog csdn = new CsdnBlog();
    csdn.setAuthor("fendo");
    csdn.setContent("This is a Java book");
    csdn.setTag("C");
    csdn.setView("100");
    csdn.setTitile("Programming");
    csdn.setDate(new Date().toString());

    // instantiate a JSON mapper
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse

    // generate JSON
    byte[] json = mapper.writeValueAsBytes(csdn);

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}

XContentBuilder helper approach

Elasticsearch provides a built-in helper, XContentBuilder, for generating JSON documents. Whichever approach you use to build the source, the IndexResponse that comes back exposes the metadata of the indexed document:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status() returns the REST status of the operation (e.g. CREATED)
RestStatus status = response.status();
Example:
/**
 * Use the Elasticsearch XContentBuilder helper
 * @throws IOException
 */
@Test
public void createXContentBuilder() throws IOException {

    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("user", "ccse")
                .field("postDate", new Date())
                .field("message", "this is Elasticsearch")
            .endObject();

    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
    System.out.println(response.getResult());
}

Complete example

import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CreateIndex {

    private TransportClient client;

    @Before
    public void getClient() throws Exception {
        // set the cluster name
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        // create the client
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Build the JSON by hand
     */
    @Test
    public void createJson() {

        String json = "{" +
                "\"user\":\"fendo\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"Hello world\"" +
                "}";

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Build the document from a Map
     */
    @Test
    public void createList() {

        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", "2013-01-30");
        json.put("message", "trying out Elasticsearch");

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize a bean with Jackson
     * @throws Exception
     */
    @Test
    public void createJackson() throws Exception {

        // CsdnBlog is the author's own bean (definition not shown here)
        CsdnBlog csdn = new CsdnBlog();
        csdn.setAuthor("fendo");
        csdn.setContent("This is a Java book");
        csdn.setTag("C");
        csdn.setView("100");
        csdn.setTitile("Programming");
        csdn.setDate(new Date().toString());

        // instantiate a JSON mapper
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse

        // generate JSON
        byte[] json = mapper.writeValueAsBytes(csdn);

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Use the Elasticsearch XContentBuilder helper
     * @throws IOException
     */
    @Test
    public void createXContentBuilder() throws IOException {

        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                    .field("user", "ccse")
                    .field("postDate", new Date())
                    .field("message", "this is Elasticsearch")
                .endObject();

        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
        System.out.println(response.getResult());
    }

}

You can also add arrays with the startArray(String) and endArray() methods. The field() method accepts many object types: you can pass it numbers, dates, and even other XContentBuilder objects.
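For instance, a short sketch (the field names here are illustrative) that nests an array of strings and then inspects the generated JSON; on the 5.x client used in this article, builder.string() returns the JSON built so far:

XContentBuilder builder = XContentFactory.jsonBuilder()
        .startObject()
            .field("user", "kimchy")
            .startArray("tags")          // produces "tags": ["elasticsearch", "java"]
                .value("elasticsearch")
                .value("java")
            .endArray()
        .endObject();
String jsonString = builder.string();    // the generated JSON, handy for debugging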

Get API

Fetch a document by its id:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

For more details, see the REST get API documentation.
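A quick sketch of reading the result back (assuming the twitter/tweet/1 document above exists; Map is java.util.Map):

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
if (response.isExists()) {                                  // the document was found
    String json = response.getSourceAsString();             // the _source as a JSON string
    Map<String, Object> source = response.getSourceAsMap(); // or as a Map
    long version = response.getVersion();                   // current version of the document
}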

Configuring the operation thread

Setting operationThreaded to true executes the operation on a different thread.

The following example sets operationThreaded to false:

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete API

Delete a document by its id:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

For more details, see the delete API documentation.
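A quick check of the outcome (a hedged sketch: on 5.x clients the write responses expose getResult(); Result lives in org.elasticsearch.action.DocWriteResponse):

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // no document with that id existed
}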

Configuring the operation thread

Setting operationThreaded to true executes the operation on a different thread.

The following example sets operationThreaded to false:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete By Query API

Delete all documents that match a query:

BulkByScrollResponse response =
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("gender", "male")) // the query
                .source("persons")                                  // the index name
                .get();                                             // execute synchronously

long deleted = response.getDeleted();                               // number of documents deleted

If the operation is likely to take a long time, you can run it asynchronously and read the result in a callback:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))   // the query
        .source("persons")                                    // the index name
        .execute(new ActionListener<BulkByScrollResponse>() { // callback listener
            @Override
            public void onResponse(BulkByScrollResponse response) {
                long deleted = response.getDeleted();         // number of documents deleted
            }

            @Override
            public void onFailure(Exception e) {
                // handle the exception
            }
        });

Update API

There are two ways to update a document:

  • create an UpdateRequest and send it with the client;
  • use the prepareUpdate() method.

Using UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

Using the prepareUpdate() method

The official example here is broken (the new Script() arguments are wrong), so the code below is the author's own (2017/11/10):

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\"", ScriptService.ScriptType.INLINE, null, null))
        // the script can also be stored in a file; for file-based scripts use ScriptService.ScriptType.FILE
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder() // merged into the existing document
                .startObject()
                    .field("gender", "male")
                .endObject())
        .get();
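Note that the Script constructor above matches the signature of the older 2.x client. If you are on a 5.x client (as in the complete example earlier), the equivalent call would be something like the following sketch:

import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import java.util.Collections;

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script(ScriptType.INLINE,         // inline (not file/stored) script
                "painless",                              // the default scripting language in 5.x
                "ctx._source.gender = \"male\"",         // the script source
                Collections.emptyMap()))                 // no parameters
        .get();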

Update by script

Update a document with a script:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

Merge the given document into the existing one:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject());
client.update(updateRequest).get();

Upsert

Upsert: if the document already exists it is updated, otherwise the content of the indexRequest is inserted as a new document.

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
                .startObject()
                    .field("name", "Joe Smith")
                    .field("gender", "male")
                .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject())
        .upsert(indexRequest); // if the document does not exist, indexRequest is added
client.update(updateRequest).get();

If the document index/type/1 already exists, the doc is merged into it, and after the update the document looks like this:

{
    "name" : "Joe Dalton",
    "gender": "male"
}

If it does not exist, a new document is inserted from the indexRequest:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

Fetch multiple documents in a single call:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("twitter", "tweet", "1")           // by a single id
        .add("twitter", "tweet", "2", "3", "4") // or by several ids
        .add("another", "type", "foo")          // you can also fetch from another index
        .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { // iterate over the results
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      // check that the document exists
        String json = response.getSourceAsString(); // the _source field
    }
}

For more details, see the REST multi get documentation.

Bulk API

The Bulk API lets you index and delete several documents in a single request:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

Using the Bulk Processor

BulkProcessor offers a simple interface that flushes bulk operations automatically, based on the number or size of the queued requests, or on a fixed schedule.

Creating the BulkProcessor instance

First, create the BulkProcessor:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, // your Elasticsearch client
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk is executed; e.g. request.numberOfActions()
                // gives the number of actions about to be sent
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // called after each bulk has executed; e.g. response.hasFailures()
                // tells you whether any action failed
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // called when a bulk failed and raised a Throwable
            }
        })
        .setBulkActions(10000) // flush the bulk every 10,000 requests
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush the bulk every 5 MB
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds, regardless of the number of queued requests
        .setConcurrentRequests(1) // number of concurrent requests: 0 means only a single request may execute, 1 means one concurrent request is allowed while new bulks accumulate
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                // custom retry policy for bulks that fail with an EsRejectedExecutionException
                // (i.e. too few compute resources were available): initially wait 100 ms,
                // double the wait on each retry, retry at most 3 times;
                // disable retries with BackoffPolicy.noBackoff()
        .build();

The BulkProcessor defaults:

  • bulkActions: 1000
  • bulkSize: 5 MB
  • flushInterval: not set
  • concurrentRequests: 1, i.e. flushes are executed asynchronously
  • backoffPolicy: exponential backoff with 8 retries and a 50 ms initial delay
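A minimal construction that keeps all of these defaults therefore only needs the client and a listener, e.g.:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
}).build();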

Adding requests

Then add your requests to the BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

Closing the Bulk Processor

When all documents have been processed, close the BulkProcessor with the awaitClose or close method. awaitClose flushes the remaining requests and waits up to the given timeout for in-flight bulks to complete, returning whether they all did; close flushes and returns immediately:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

or

bulkProcessor.close();

Using the Bulk Processor in tests

If you are using the BulkProcessor in tests, set concurrentRequests to 0 so that flushes are executed synchronously:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// add your requests
bulkProcessor.add(/* your requests */);

// flush any remaining requests
bulkProcessor.flush();

// or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// refresh your indices
client.admin().indices().prepareRefresh().get();

// now you can start searching!
client.prepareSearch().get();

All the examples have been uploaded to Git.

For more, see the spring-boot-starter-es open-source project.

Reposted from Juejin.
