Java High Level REST Client支持下面的文档APIS
单文档APIs
Index APIGet APIDelete APIUpdate API多文档操作 APIs
Bulk APIMulti-Get API一个IndexRequest就像下面案例一样:
IndexRequest request = new IndexRequest( "posts", // Index "doc", // Type "1"); // Document id //JSON字符串请求体 String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON);除了上面所示的字符串示例之外,还可以以不同的方式构建文档请求体。
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(jsonMap);用Map构建文档请求体会自动转为JSON格式。
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.timeField("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(builder);ElasticSearch内置XContentBuilder可以用来帮我们构建JSON请求体。
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");用键值对构建请求体也会自动转为JSON格式。
下面的案例展示功能配置
request.routing("routing"); //Routing value request.parent("parent"); //Parent value //等待主分片响应的超时时间 request.timeout(TimeValue.timeValueSeconds(1)); //字符串配置主分片响应的超时时间 request.timeout("1s"); //设置刷新策略为WriteRequest.RefreshPolicy request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //字符设置 request.setRefreshPolicy("wait_for"); //版本设置 request.version(2); //设置版本类型 request.versionType(VersionType.EXTERNAL); //设置操作类型为DocWriteRequest.OpType request.opType(DocWriteRequest.OpType.CREATE); //字符串设置,可以配置create or update (default) request.opType("create"); //设置在索引文档之前要执行的摄取管道的名称 request.setPipeline("pipeline");索引请求的异步执行需要将IndexRequest实例和ActionListener实例传递给异步方法:
client.indexAsync(request/*需要执行的IndexRequest*/, RequestOptions.DEFAULT, listener/*执行完成之后的回调*/);异步执行不会堵塞并且立即返回,一旦完成,如果执行成功完成,则使用onResponse方法回调ActionListener,如果执行失败,则使用onFailure方法回调ActionListener。
IndexResponse 典型的ActionListener例如:
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(IndexResponse indexResponse) { } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { } };从IndexResponse获取返回的响应信息方式如下:
String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); //文档创建成功操作 if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { //文档更新成功操作 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); //检查成功的分片是不是等于总分片 if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { //获取分片失败的原因 String reason = failure.reason(); } }如果版本冲突,我们会得到这样的ElasticsearchException:
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { //版本冲突错误 if (e.status() == RestStatus.CONFLICT) { } }当我们把opType 设置为 create 但是存在相同的 index, type 和 id :
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { } }配置Maven pom.xml依赖(改为对应ES版本):
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.2.4</version> </dependency> <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.2.4</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>6.2.4</version> </dependency>这里,我们使用 ObjectMapper将JavaBean转为JSON请求体,对应有以下依赖:
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> //创建连接 //创建连接 RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("192.168.199.18", 9200, "http"), new HttpHost("192.168.199.118", 9200, "http"))); //创建请求 IndexRequest request = new IndexRequest( "posts", // Index "doc", // Type "1"); // Document id //构建请求体(这里演示ObjectMapper将JavaBean转为JSON请求体) JavaBean bean = new JavaBean(); ObjectMapper objectMapper = new ObjectMapper();//create once reuse // generate json String json = null; try { json = objectMapper.writeValueAsString(bean); } catch (JsonProcessingException e) { e.printStackTrace(); } request.source(json, XContentType.JSON); //设置操作类型为DocWriteRequest.OpType request.opType(DocWriteRequest.OpType.CREATE); ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { //调用成功时回调,返回信息作为参数传入 @Override public void onResponse(IndexResponse indexResponse) { System.out.println("异步回调成功"); String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); //文档创建成功操作 if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { System.out.println(index+ type+ id+ version); //文档更新成功操作 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { } } //调用失败时回调,错误信息作为参数传入 @Override public void onFailure(Exception e) { e.printStackTrace(); } }; //异步操作 //client.indexAsync(request/*需要执行的IndexRequest*/, listener/*执行完成之后的回调*/); //同步操作 try { client.index(request); } catch (IOException e) { e.printStackTrace(); } //关闭连接 if(client !=null){ try { client.close(); } catch (IOException e) { e.printStackTrace(); } }