ElasticSearch学习笔记之三十二 JAVA Client 之 Exists Delete Update APIs

xiaoxiao2025-08-26  72

ElasticSearch学习笔记之三十二 JAVA Client 之 Exists Delete Update APIs

Exists APIExists RequestSynchronous Execution(同步执行)Asynchronous Execution(异步执行) Delete APIDelete RequestOptional argumentsSynchronous Execution(同步执行)Asynchronous Execution(异步执行)Delete Response Update APIUpdate RequestUpdates with a script(脚本更新)Updates with a partial document(更新部分文档)UpsertsOptional argumentsSynchronous Execution(同步执行)Asynchronous Execution(异步执行)Update Response

Exists API

如果文档存在的化exists API 会返回true,否则false。

Exists Request

Exists Request就像Get API一样使用GetRequest. 也支持Get API的所有功能参数. 由于exists() 只返回true 或者 false,我们建议关闭fetching _source和 stored fields 从而让我们的请求更加轻量级。

GetRequest getRequest = new GetRequest( "posts", //Index "doc", // Type "1"); // Document id //禁用fetching _source getRequest.fetchSourceContext(new FetchSourceContext(false)); //禁用fetching stored fields getRequest.storedFields("_none_");

Synchronous Execution(同步执行)

boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);

Asynchronous Execution(异步执行)

索引请求的异步执行需要将GetRequest实例和ActionListener实例传递给异步方法:

client.existsAsync(getRequest/*需要执行的GetRequest*/, RequestOptions.DEFAULT, listener/*执行完成之后的回调*/);

异步执行不会堵塞并且立即返回,一旦完成,如果执行成功完成,则使用onResponse方法回调ActionListener,如果执行失败,则使用onFailure方法回调ActionListener。

典型的GetResponse:

ActionListener<Boolean> listener = new ActionListener<Boolean>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(Boolean exists) { } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { } };

Delete API

Delete Request

DeleteRequest形如:

DeleteRequest request = new DeleteRequest( "posts", //Index "doc", //Type "1"); //Document id

Optional arguments

下面的案例展示功能配置:

//Routing value request.routing("routing"); //Parent value request.parent("parent"); //等待主分片响应的超时时间 request.timeout(TimeValue.timeValueSeconds(1)); //字符串配置主分片响应的超时时间 request.timeout("1s"); //设置刷新策略为WriteRequest.RefreshPolicy request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //字符设置 request.setRefreshPolicy("wait_for"); //版本设置 request.version(2); //设置版本类型 request.versionType(VersionType.EXTERNAL);

Synchronous Execution(同步执行)

DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

Asynchronous Execution(异步执行)

索引请求的异步执行需要将DeleteRequest实例和ActionListener实例传递给异步方法:

client.deleteAsync(request/*需要执行的DeleteRequest*/, RequestOptions.DEFAULT, listener/*执行完成之后的回调*/);

异步执行不会堵塞并且立即返回,一旦完成,如果执行成功完成,则使用onResponse方法回调ActionListener,如果执行失败,则使用onFailure方法回调ActionListener。

DeleteResponse典型案例

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(DeleteResponse deleteResponse) { } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { } };

Delete Response

从DeleteResponse获取返回的响应信息方式如下:

String index = deleteResponse.getIndex(); String type = deleteResponse.getType(); String id = deleteResponse.getId(); long version = deleteResponse.getVersion(); ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); //检查成功的分片是不是等于总分片 if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { //获取分片失败的原因 String reason = failure.reason(); } }

也可以用于检查文档是否找到:

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist"); DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); //如果文档没有找到 if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { }

如果版本冲突,我们会得到这样的ElasticsearchException:

try { DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2); DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { } }

Update API

Update Request

UpdateRequest形如:

UpdateRequest request = new UpdateRequest( "posts", //Index "doc", //Type "1"); //Document id

Update API允许通过使用脚本或通过传递部分文档来更新现有文档。

Updates with a script(脚本更新)

脚本案例如下:

//Map Map<String, Object> parameters = singletonMap("count", 4); //用painless语言和Map参数构建脚本 Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters); //把脚本设置给UpdateRequest request.script(inline);

或者可以作为存储脚本:

Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters); //把脚本设置给UpdateRequest request.script(stored);

Updates with a partial document(更新部分文档)

当使用部分文档进行更新时,部分文档将与现有文档合并。

部分文档的生成有如下方式:

JSON字符串数据

UpdateRequest request = new UpdateRequest("posts", "doc", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON);

Map转JSON

Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(jsonMap);

XContentBuilder工具转JSON

XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.timeField("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(builder);

键值对构建

UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update");

Upserts

当文档不存在的时候我们可以使用upsert方式新插入文档:

String jsonString = "{\"created\":\"2017-01-01\"}"; //字符串upsert request.upsert(jsonString, XContentType.JSON);

与部分文档更新类似,可以使用接受String、Map、XContentBuilder或Object键值对的方法来定义upsert文档的内容。

Optional arguments

下面的案例展示功能配置:

//Routing value request.routing("routing"); //Parent value request.parent("parent"); //等待主分片响应的超时时间 request.timeout(TimeValue.timeValueSeconds(1)); //字符串配置主分片响应的超时时间 request.timeout("1s"); //设置刷新策略为WriteRequest.RefreshPolicy request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //字符设置 request.setRefreshPolicy("wait_for"); //设置重试更新操作的次数(如果要更新的文档已经由更新操作的获取和索引阶段之间的另一个操作更改) request.retryOnConflict(3); //启用源检索,默认情况下禁用 request.fetchSource(true); String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; //配置source include字段 request.fetchSource(new FetchSourceContext(true, includes, excludes)); String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"updated"}; //配置source exclude字段 request.fetchSource(new FetchSourceContext(true, includes, excludes)); //版本设置 request.version(2); //禁用noop detection request.detectNoop(false); //设置无论文档是否存在,脚本都必须运行(如果文档不存在,则脚本负责创建文档)。 request.scriptedUpsert(true); //指明部分更新文档假如文档不存在的时候按照upsert处理 request.docAsUpsert(true); //设置在更新操作之前必须处于活跃状态的分片副本的数量。 request.waitForActiveShards(2); //活跃状态的分片副本的数量可以取值ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default) request.waitForActiveShards(ActiveShardCount.ALL);

Synchronous Execution(同步执行)

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

Asynchronous Execution(异步执行)

索引请求的异步执行需要将UpdateRequest实例和ActionListener实例传递给异步方法

client.updateAsync(request/*需要执行的UpdateRequest*/, RequestOptions.DEFAULT, listener/*执行完成之后的回调*/);

异步执行不会堵塞并且立即返回,一旦完成,如果执行成功完成,则使用onResponse方法回调ActionListener,如果执行失败,则使用onFailure方法回调ActionListener。

典型的UpdateResponse :

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(UpdateResponse updateResponse) { } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { } };

Update Response

从UpdateResponse获取返回的响应信息方式如下:

String index = updateResponse.getIndex(); String type = updateResponse.getType(); String id = updateResponse.getId(); long version = updateResponse.getVersion(); //文档第一次被创建9(upsert) if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { //文档被更新 } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { //文档被删除 } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { //处理文档未受更新影响的情况,即在文档上没有执行操作(NOOP) } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { }

当通过fetchSource方法在UpdateRequest中启用源检索时,响应包含更新的文档的源

//按GETResult返回UpdateRequest GetResult result = updateResponse.getGetResult(); if (result.isExists()) { //source做字符串返回 String sourceAsString = result.sourceAsString(); //做Map<String, Object>返回 Map<String, Object> sourceAsMap = result.sourceAsMap(); //按byte[]返回source byte[] sourceAsBytes = result.source(); //当文档的源不存在于响应中的情况(默认情况下是这样) } else { }

我们也可以获取分片失败的信息

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); //检查成功的分片是不是等于总分片 if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { //获取失败的原因 String reason = failure.reason(); } }

当UpdateRequest操作在一个不存在的文档的时候,我们会得到一个404响应码,和一下如下的ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist") .doc("field", "value"); try { UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); } catch (ElasticsearchException e) { //文档没有找到 if (e.status() == RestStatus.NOT_FOUND) { } }

如果出现版本冲突, 会得到下面的ElasticsearchException :

UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { //版本冲突 if (e.status() == RestStatus.CONFLICT) { } }
转载请注明原文地址: https://www.6miu.com/read-5035278.html

最新回复(0)