ElasticSearch学习笔记之三十 JAVA Client 之 Document APIs

xiaoxiao2025-07-06  7

ElasticSearch学习笔记之三十 JAVA Client 之 文档请求概述

Document APIs(文档APIS)Index APIIndex Request(索引请求)Providing the document source(构建文档请求体)Optional arguments(功能参数)Synchronous Execution(同步执行)Asynchronous Execution(异步执行)Index Response(索引返回) 完整案例如下:

Document APIs(文档APIS)

Java High Level REST Client支持下面的文档APIS

单文档APIs

Index APIGet APIDelete APIUpdate API

多文档操作 APIs

Bulk APIMulti-Get API

Index API

Index Request(索引请求)

一个IndexRequest就像下面案例一样:

IndexRequest request = new IndexRequest( "posts", // Index "doc", // Type "1"); // Document id //JSON字符串请求体 String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON);

Providing the document source(构建文档请求体)

除了上面所示的字符串示例之外,还可以以不同的方式构建文档请求体。

Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(jsonMap);

用Map构建文档请求体会自动转为JSON格式。

XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.timeField("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(builder);

ElasticSearch内置XContentBuilder可以用来帮我们构建JSON请求体。

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");

用键值对构建请求体也会自动转为JSON格式。

Optional arguments(功能参数)

下面的案例展示功能配置

request.routing("routing"); //Routing value request.parent("parent"); //Parent value //等待主分片响应的超时时间 request.timeout(TimeValue.timeValueSeconds(1)); //字符串配置主分片响应的超时时间 request.timeout("1s"); //设置刷新策略为WriteRequest.RefreshPolicy request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //字符设置 request.setRefreshPolicy("wait_for"); //版本设置 request.version(2); //设置版本类型 request.versionType(VersionType.EXTERNAL); //设置操作类型为DocWriteRequest.OpType request.opType(DocWriteRequest.OpType.CREATE); //字符串设置,可以配置create or update (default) request.opType("create"); //设置在索引文档之前要执行的摄取管道的名称 request.setPipeline("pipeline");

Synchronous Execution(同步执行)

IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

Asynchronous Execution(异步执行)

索引请求的异步执行需要将IndexRequest实例和ActionListener实例传递给异步方法:

client.indexAsync(request/*需要执行的IndexRequest*/, RequestOptions.DEFAULT, listener/*执行完成之后的回调*/);

异步执行不会堵塞并且立即返回,一旦完成,如果执行成功完成,则使用onResponse方法回调ActionListener,如果执行失败,则使用onFailure方法回调ActionListener。

IndexResponse 典型的ActionListener例如:

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(IndexResponse indexResponse) { } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { } };

Index Response(索引返回)

从IndexResponse获取返回的响应信息方式如下:

String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); //文档创建成功操作 if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { //文档更新成功操作 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); //检查成功的分片是不是等于总分片 if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { //获取分片失败的原因 String reason = failure.reason(); } }

如果版本冲突,我们会得到这样的ElasticsearchException:

IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { //版本冲突错误 if (e.status() == RestStatus.CONFLICT) { } }

当我们把opType 设置为 create 但是存在相同的 index, type 和 id :

IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { } }

完整案例如下:

配置Maven pom.xml依赖(改为对应ES版本):

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.2.4</version> </dependency> <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.2.4</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>6.2.4</version> </dependency>

这里,我们使用 ObjectMapper将JavaBean转为JSON请求体,对应有以下依赖:

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> //创建连接 //创建连接 RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("192.168.199.18", 9200, "http"), new HttpHost("192.168.199.118", 9200, "http"))); //创建请求 IndexRequest request = new IndexRequest( "posts", // Index "doc", // Type "1"); // Document id //构建请求体(这里演示ObjectMapper将JavaBean转为JSON请求体) JavaBean bean = new JavaBean(); ObjectMapper objectMapper = new ObjectMapper();//create once reuse // generate json String json = null; try { json = objectMapper.writeValueAsString(bean); } catch (JsonProcessingException e) { e.printStackTrace(); } request.source(json, XContentType.JSON); //设置操作类型为DocWriteRequest.OpType request.opType(DocWriteRequest.OpType.CREATE); ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(IndexResponse indexResponse) { System.out.println("异步回调成功"); String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); //文档创建成功操作 if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { System.out.println(index+ type+ id+ version); //文档更新成功操作 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { } } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { e.printStackTrace(); } }; //异步操作 //client.indexAsync(request/*需要执行的IndexRequest*/, listener/*执行完成之后的回调*/); //同步操作 try { client.index(request); } catch (IOException e) { e.printStackTrace(); } //关闭连接 if(client !=null){ try { client.close(); } catch (IOException e) { e.printStackTrace(); } }
转载请注明原文地址: https://www.6miu.com/read-5032662.html

最新回复(0)