Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)
本节介绍以下 CRUD API:
单文档 APIs
多文档 APIs
Multi Get API Bulk API
注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。
Index API
Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。
这里有几种不同的方式来产生JSON格式的文档(document):
- 手动方式,使用原生的byte[]或者String
- 使用Map方式,会自动转换成与之等价的JSON
- 使用第三方库来序列化beans,如Jackson
- 使用内置的帮助类 XContentFactory.jsonBuilder()
手动方式
1 | 复制代码String json = "{" + |
实例
1 | 复制代码/** |
Map方式
Map是key:value数据类型,可以代表json结构.
1 | 复制代码Map<String, Object> json = new HashMap<String, Object>(); |
实例
1 | 复制代码 /** |
序列化方式
ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.
1 | 复制代码import com.fasterxml.jackson.databind.*; |
实例
1 | 复制代码/** |
XContentBuilder帮助类方式
ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档
1 | 复制代码// Index name |
实例
1 | 复制代码/** |
综合实例
1 | 复制代码 |
你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。
Get API
根据id查看文档:
1 | 复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1").get(); |
更多请查看 rest get API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
1 | 复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1") |
Delete API
根据ID删除:
1 | 复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get(); |
更多请查看 delete API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
1 | 复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1") |
1 | 复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") |
Delete By Query API
通过查询条件删除
1 | 复制代码BulkByScrollResponse response = |
如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取
1 | 复制代码DeleteByQueryAction.INSTANCE.newRequestBuilder(client) |
Update API
有两种方式更新索引:
- 创建
UpdateRequest
,通过client发送; - 使用
prepareUpdate()
方法;
使用UpdateRequest
1 | 复制代码UpdateRequest updateRequest = new UpdateRequest(); |
使用 prepareUpdate()
方法
这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)
1 | 复制代码client.prepareUpdate("ttl", "doc", "1") |
Update by script
使用脚本更新文档
1 | 复制代码UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") |
Update by merging documents
合并文档
1 | 复制代码UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") |
Upsert
更新插入,如果存在文档就更新,如果不存在就插入
1 | 复制代码IndexRequest indexRequest = new IndexRequest("index", "type", "1") |
如果 index/type/1
存在,类似下面的文档:
1 | 复制代码{ |
如果不存在,会插入新的文档:
1 | 复制代码{ |
Multi Get API
一次获取多个文档
1 | 复制代码MultiGetResponse multiGetItemResponses = client.prepareMultiGet() |
更多请浏览REST multi get 文档
Bulk API
Bulk API,批量插入:
1 | 复制代码import static org.elasticsearch.common.xcontent.XContentFactory.*; |
1 | 复制代码BulkRequestBuilder bulkRequest = client.prepareBulk(); |
使用 Bulk Processor
BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求
创建BulkProcessor
实例
首先创建BulkProcessor
实例
1 | 复制代码import org.elasticsearch.action.bulk.BackoffPolicy; |
1 | 复制代码BulkProcessor bulkProcessor = BulkProcessor.builder( |
BulkProcessor 默认设置
- bulkActions 1000
- bulkSize 5mb
- 不设置flushInterval
- concurrentRequests 为 1 ,异步执行
- backoffPolicy 重试 8次,等待50毫秒
增加requests
然后增加requests
到BulkProcessor
1 | 复制代码bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); |
关闭 Bulk Processor
当所有文档都处理完成,使用awaitClose
或 close
方法关闭BulkProcessor
:
1 | 复制代码bulkProcessor.awaitClose(10, TimeUnit.MINUTES); |
或
1 | 复制代码bulkProcessor.close(); |
在测试中使用Bulk Processor
如果你在测试种使用Bulk Processor
可以执行同步方法
1 | 复制代码BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) |
所有实例 已经上传到Git
更多请浏览 spring-boot-starter-es 开源项目
本文转载自: 掘金