Elasticsearch TransportClient5.4 Spring集成

xiaoxiao2021-02-28  98

elasticsearch官方client5.4版本没有现成的Spring data集成方案。 这里改装了一个,用到了TransportClient5.4  集成 Spring FactoryBean,想的是在产生client的后不要每次都去close掉,用Spring的bean管理,这样效率更高。 下面有对应的源文件大家可以看下。 自己写的结合了Spring和TransportClient方案 pom.xml <dependency>     <groupId>org.elasticsearch.client</groupId>     <artifactId>transport</artifactId>     <version>5.4.0</version> </dependency> <dependency>    <groupId>org.apache.logging.log4j</groupId>    <artifactId>log4j-api</artifactId>     <version>2.8.2</version> </dependency> <dependency>    <groupId>org.apache.logging.log4j</groupId>    <artifactId>log4j-core</artifactId>    <version>2.8.2</version> </dependency>   spring-context.xml     <!-- 自定义的elasticsearch客户端 -->     <bean id="transportClient" class="com.hc.common.service.TransportClientFactoryBean">         <propertyname="clusterNodes"value="127.0.0.1:9300"/>         <propertyname="clusterName"value="elasticsearch"/>     </bean>         <bean id="transportClientRepository"class="com.hc.common.persistence.TransportClientRepository">         <constructor-argref="transportClient"/>     </bean> log4j2.xml <?xml version="1.0"encoding="utf-8"?> <configurationstatus="warn">    <appenders>        <console name="console" target="SYSTEM_OUT">             <patternlayout                pattern="%d{hh:mm:ss.sss} [%t] %-5level %logger{36} -%msg%n" />        </console>    </appenders>    <loggers>        <logger name="org.elasticsearch" level="info">        </logger>        <logger name="cn.sunit" level="debug">        </logger>        <root level="error">             <appender-ref ref="console" />        </root>    </loggers> </configuration> TransportClientFactoryBean.java   package com.hc.common.service;   import staticorg.apache.commons.lang.StringUtils.*;   import java.net.InetAddress; import java.util.Properties;   importorg.elasticsearch.client.transport.TransportClient; importorg.elasticsearch.common.settings.Settings; importorg.elasticsearch.common.transport.InetSocketTransportAddress; importorg.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; importorg.springframework.beans.factory.DisposableBean; importorg.springframework.beans.factory.FactoryBean; importorg.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert;   /**  *TransportClientFactoryBean  *  *@author Rizwan Idrees  *@author Mohsin Husen  *@author Jakub Vavrik  *@author Piotr Betkier  */ public class TransportClientFactoryBeanimplements FactoryBean<TransportClient>, InitializingBean, DisposableBean{      private static final Logger logger =LoggerFactory.getLogger(TransportClientFactoryBean.class);    private String clusterNodes = "127.0.0.1:9300";    private String clusterName = "elasticsearch";    private Boolean clientTransportSniff = true;    private Boolean clientIgnoreClusterName = Boolean.FALSE;    private String clientPingTimeout = "5s";    private String clientNodesSamplerInterval = "5s";    private TransportClient client;    private Properties properties;    static final String COLON = ":";    static final String COMMA = ",";      @Override    public void destroy() throws Exception {        try {            logger.info("Closing elasticSearch client");            if (client != null) {                 client.close();            }        } catch (final Exception e) {            logger.error("Error closing ElasticSearch client: ", e);        }     }      @Override    public TransportClient getObject() throws Exception {        return client;     }      @Override    public Class<TransportClient> getObjectType() {        return TransportClient.class;     }      @Override    public boolean isSingleton() {        return false;     }      @Override    public void afterPropertiesSet() throws Exception {        buildClient();     }      protected void buildClient() throws Exception {        client = new PreBuiltTransportClient(settings());        //client = TransportClient.builder().settings(settings()).build();        Assert.hasText(clusterNodes, "[Assertion failed] clusterNodessettings missing.");        for (String clusterNode : split(clusterNodes, COMMA)) {            String hostName =substringBeforeLast(clusterNode, COLON);            String port = substringAfterLast(clusterNode, COLON);            Assert.hasText(hostName, "[Assertion failed] missing host name in'clusterNodes'");            Assert.hasText(port, "[Assertion failed] missing port in'clusterNodes'");            logger.info("adding transport node : " + clusterNode);            client.addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName(hostName),Integer.valueOf(port)));         }        client.connectedNodes();     }      private Settings settings() {        if (properties != null) {            return Settings.builder().put(properties).build();        }        return Settings.EMPTY; /*       return Settings.builder()                 .put("cluster.name",clusterName)                .put("client.transport.sniff", clientTransportSniff)                .put("client.transport.ignore_cluster_name",clientIgnoreClusterName)                .put("client.transport.ping_timeout", clientPingTimeout)                 .put("client.transport.nodes_sampler_interval",clientNodesSamplerInterval)                 .build();*/     }      public void setClusterNodes(String clusterNodes) {        this.clusterNodes = clusterNodes;     }       publicvoid setClusterName(String clusterName) {        this.clusterName = clusterName;     }      public void setClientTransportSniff(Boolean clientTransportSniff) {        this.clientTransportSniff = clientTransportSniff;     }      public String getClientNodesSamplerInterval() {        return clientNodesSamplerInterval;     }      public void setClientNodesSamplerInterval(StringclientNodesSamplerInterval) {        this.clientNodesSamplerInterval = clientNodesSamplerInterval;     }      public String getClientPingTimeout() {        return clientPingTimeout;     }      public void setClientPingTimeout(String clientPingTimeout) {        this.clientPingTimeout = clientPingTimeout;     }      public Boolean getClientIgnoreClusterName() {        return clientIgnoreClusterName;     }      public void setClientIgnoreClusterName(Boolean clientIgnoreClusterName){        this.clientIgnoreClusterName = clientIgnoreClusterName;     }      public void setProperties(Properties properties) {         this.properties = properties;     } } TransportClientRepository.java package com.hc.common.persistence;   import java.beans.PropertyDescriptor; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; importorg.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; importorg.elasticsearch.action.index.IndexResponse; importorg.elasticsearch.action.search.SearchResponse; importorg.elasticsearch.action.update.UpdateResponse; importorg.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.text.Text; importorg.elasticsearch.common.unit.TimeValue; importorg.elasticsearch.common.xcontent.XContentBuilder; importorg.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilder; importorg.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; importorg.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; importorg.elasticsearch.search.fetch.subphase.highlight.HighlightField; importorg.elasticsearch.search.sort.FieldSortBuilder; importorg.elasticsearch.search.sort.SortOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; importcom.hc.common.annotation.ESearchTypeColumn; import com.hc.modules.cms.entity.Article;   public class TransportClientRepository {    private static final Logger log =LoggerFactory.getLogger(TransportClientRepository.class);      private TransportClient client;      public TransportClientRepository(TransportClient client) {        super();        this.client = client;     }      /**     * 创建搜索引擎文档     * @param index 索引名称     * @param type 索引类型     * @param id 索引id     * @param doc     * @return     */    public String saveDoc(String index, String type, String id, Object doc){        IndexResponse response = client.prepareIndex(index, type,id).setSource(getXContentBuilderKeyValue(doc)).get();        return response.getId();     }      /**     * 更新文档     *     * @param index     * @param type     * @param id     * @param doc     * @return     */    public String updateDoc(String index, String type, String id, Objectdoc) {        UpdateResponse response = client.prepareUpdate(index, type, id).setDoc(getXContentBuilderKeyValue(doc)).get();        return response.getId();     }      /**     * 删除索引     *     * @param index     * @param type     * @param id     * @return     */    public String deleteById(String index, String type, String id) {        DeleteResponse response = client.prepareDelete(index, type, id).get();        return response.getId();     }      /**     * 获取索引对应的存储内容     *     * @param index     * @param type     * @param id     * @return     */    public String getIdx(String index, String type, String id) {        GetResponse response = client.prepareGet(index, type, id).get();        if (response.isExists()) {            return response.getSourceAsString();        }        return null;     }      /**     * 对象转换     *     * @param t     * @param src     * @return     */    @SuppressWarnings("unchecked")    private <T> T parseObject(T t, String src) {        try {            return (T) JSON.parseObject(src, t.getClass());        } catch (Exception e) {            log.error("解析失败,{}", e.getMessage());        }        return null;     }      /**     * 获取索引对应的存储内容自动转换成对象的方式     *     * @param index     * @param type     * @param id     * @param t     * @return      */    public <T> T getIdx(String index, String type, String id, T t) {        return parseObject(t, getIdx(index, type, id));     }      public void searchFullText(String filed, String queryValue, int pageNum,int pageSize, String... indexs) {        QueryBuilder builder = QueryBuilders.matchQuery(filed, queryValue);        SearchResponse scrollResp =client.prepareSearch(indexs).addSort(FieldSortBuilder.DOC_FIELD_NAME,SortOrder.ASC)                 .setFrom(pageNum *pageSize).setSize(pageSize).setScroll(newTimeValue(60000)).setQuery(builder).get();        do {            for (SearchHit hit : scrollResp.getHits().getHits()) {                System.out.println("result:+++++" + hit.getSourceAsString());            }            scrollResp =client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(newTimeValue(60000)).execute()                     .actionGet();        } while (scrollResp.getHits().getHits().length != 0);     }      /**     * 全文搜索     *      *@param param     * @param pageNum     * @param pageSize     * @param indexs     * @return     */    public <T> ElasticSearchPage<T> searchFullText(T param,ElasticSearchPage<T> page, String... indexs) {        QueryBuilder builder = null;        Map<String, Object> map = getObjectMap(param);        if (map == null)            return null;        for (Map.Entry<String, Object> entry : map.entrySet()) {            if (entry.getValue() != null) {                 // builder =QueryBuilders.wildcardQuery( entry.getKey(), "*" +                 // entry.getValue().toString()+ "*" );                 builder =QueryBuilders.matchQuery(entry.getKey(), entry.getValue());                 // builder =QueryBuilders.multiMatchQuery(text, fieldNames)(                // entry.getKey(),entry.getValue());            }        }        HighlightBuilder highlight = new HighlightBuilder();        highlight.field("title").field("description");        SearchResponse scrollResp =client.prepareSearch(indexs).setFrom(page.getPageNum() * page.getPageSize())                .highlighter(highlight).setSize(page.getPageSize())                 // .setScroll(newTimeValue(60000))                 .setQuery(builder).get();        List<T> result = new ArrayList<>();        // ElasticSearchPage<T> ret = new ElasticSearchPage<>();        for (SearchHit hit : scrollResp.getHits().getHits()) {            try {                 Map<String,HighlightField> highlightResult = hit.getHighlightFields();                highlightResult.get("description");                 result.add(parseObject(param,hit.getSourceAsString()));            } catch (Exception e) {                 e.printStackTrace();            }         }        page.setTotal(scrollResp.getHits().totalHits);        page.setParam(param);        page.setRetList(result);        return page;     }    /**     * 全文本搜索加高亮显示,没法用泛型,只能设置死返回类型     * @param param     * @param page     * @param highlight     * @param indexs     * @return     */    public ElasticSearchPage<Article> searchFullText(Article param,ElasticSearchPage<Article> page, HighlightBuilder highlight, String...indexs) {        QueryBuilder builder = null;        Map<String, Object> map = getObjectMap(param);        if (map == null)            return null;        for (Map.Entry<String, Object> entry : map.entrySet()) {            if (entry.getValue() != null) {                 builder = QueryBuilders.matchQuery(entry.getKey(),entry.getValue());            }        }        SearchResponse scrollResp =client.prepareSearch(indexs).setFrom(page.getPageNum() * page.getPageSize())                .highlighter(highlight).setSize(page.getPageSize())                .setQuery(builder).get();        List<Article> result = new ArrayList<>();        for (SearchHit hit : scrollResp.getHits().getHits()) {            try {                 Map<String,HighlightField> highlightResult = hit.getHighlightFields();                 Article articleSearch =parseObject(param, hit.getSourceAsString());                 String titleAdd = "";                 for(Text textTemp :highlightResult.get("description").fragments()){                     titleAdd += textTemp;                }                articleSearch.setTitle(titleAdd);                 result.add(articleSearch);            } catch (Exception e) {                 e.printStackTrace();            }        }        page.setTotal(scrollResp.getHits().totalHits);        page.setParam(param);        page.setRetList(result);        return page;     }      public static Map<String, Object> getObjectMap(Object o) {        List<Field> fieldList = new ArrayList<Field>();        @SuppressWarnings("rawtypes")        Class tempClass = o.getClass();        while (tempClass != null) {            fieldList.addAll(Arrays.asList(tempClass.getDeclaredFields()));            tempClass = tempClass.getSuperclass();        }        try {            Map<String, Object> result = new HashMap<>();            for (Field field : fieldList) {                 if(field.isAnnotationPresent(ESearchTypeColumn.class)) {                     PropertyDescriptordescriptor = new PropertyDescriptor(field.getName(), o.getClass());                    result.put(field.getName(),descriptor.getReadMethod().invoke(o));                 }            }            return result;        } catch (Exception e) {            e.printStackTrace();            return null;        }     }      /**     * 判断某个索引下type是否存在     *     * @param index     * @param type     * @return     */    public boolean isTypeExist(String index, String type) {        returnclient.admin().indices().prepareTypesExists(index).setTypes(type).execute().actionGet().isExists();     }      /**     * 判断索引是否存在     *     * @param index     * @return     */    public boolean isIndexExist(String index) {        returnclient.admin().indices().prepareExists(index).execute().actionGet().isExists();     }      /**     * 创建type(存在则进行更新)     *     * @param index     *            索引名称     * @param type     *            type名称     * @param o     *            要设置type的object     * @return     */    public boolean createType(String index, String type, Object o) {        if (!isIndexExist(index)) {            log.error("{}索引不存在", index);            return false;        }        try {            // 若type存在则可通过该方法更新type            return client.admin().indices().preparePutMapping(index).setType(type).setSource(o).get().isAcknowledged();        } catch (Exception e) {            log.error("创建type失败,{}", e.getMessage());            e.printStackTrace();            return false;        }     }      public static XContentBuilder getXContentBuilderKeyValue(Object o) {        try {            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();            List<Field> fieldList = new ArrayList<Field>();            @SuppressWarnings("rawtypes")            Class tempClass = o.getClass();            while (tempClass != null) {// 当父类为null的时候说明到达了最上层的父类(Object类).                fieldList.addAll(Arrays.asList(tempClass.getDeclaredFields()));                 tempClass = tempClass.getSuperclass();// 得到父类,然后赋给自己            }            for (Field field : fieldList) {                 if(field.isAnnotationPresent(ESearchTypeColumn.class)) {                     PropertyDescriptordescriptor = new PropertyDescriptor(field.getName(), o.getClass());                     Object value =descriptor.getReadMethod().invoke(o);                     if (value != null) {                         builder.field(field.getName(),value.toString());                     }                 }            }            builder.endObject();            log.debug(builder.string());            return builder;        } catch (Exception e) {            log.error("获取object key-value失败,{}", e.getMessage());        }        return null;     } } ESearchTypeColumn.java package com.hc.common.annotation;   import java.lang.annotation.ElementType; import java.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;   /**  * 构建为elasticsearch  * 方便使用的jsonBuilder对象  *@author huangcheng  */ @Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface ESearchTypeColumn {     /**     * 字段类型     *     * @return     */    String type() default "string";      /**     * 是否分词     *     * @return     */    boolean analyze() default false;   } Article.java public classArticle{       publicstaticfinalString DEFAULT_TEMPLATE ="frontViewArticle";               privatestaticfinallong serialVersionUID= 1L;        privateCategory category;//分类编号               @ESearchTypeColumn        privateString title;       // 标题     privateString link;  // 外部链接        privateString color;     // 标题颜色(red:红色;green:绿色;blue:蓝色;yellow:黄色;orange:橙色)        privateString image;    // 文章图片        privateString keywords;//关键字               @ESearchTypeColumn        privateString description;//描述、摘要 ElasticSearchPage.java package com.hc.common.persistence;   import java.util.List;   /**  * ES搜索引擎分页类  * @authorhuangcheng  * @param<T>  */ public classElasticSearchPage <T> {       privateString scrollId;       privatelongtotal;       privateintpageSize;       privateintpageNum;       privateT param;       privateList<T> retList;       privateList<String> scrollIds;       publicList<String> getScrollIds() {         returnscrollIds;     }       publicvoid setScrollIds(List<String>scrollIds) {         this.scrollIds =scrollIds;     }       publicList<T> getRetList() {         returnretList;     }       publicvoid setRetList(List<T>retList) {         this.retList =retList;     }       publicString getScrollId() {         returnscrollId;     }       publicvoid setScrollId(StringscrollId) {         this.scrollId =scrollId;     }       publiclong getTotal() {         returntotal;     }       publicvoid setTotal(longtotal){         this.total =total;     }       publicint getPageSize() {         if(pageSize > 50){             return50;         }else{             returnpageSize;         }     }       publicvoid setPageSize(intpageSize){         this.pageSize =pageSize;     }       publicint getPageNum() {         if(pageNum <=0){             return0;         }else{             returnpageNum -= 1;         }     }       publicvoid setPageNum(intpageNum){         this.pageNum =pageNum;     }       publicT getParam() {         returnparam;     }       publicvoid setParam(Tparam) {         this.param =param;     }       @Override     publicString toString() {         return"Page{" +                 "scrollId='"+ scrollId + '\''+                 ",total=" + total +                 ",pageSize=" + pageSize +                 ",pageNum=" + pageNum +                 ",param=" + param +                 ",retList=" + retList +                 ",scrollIds=" + scrollIds +                 '}';     } }   Test SearchBaseTest.java packagecom.hc.search;   importorg.junit.runner.RunWith; importorg.springframework.test.context.ContextConfiguration; importorg.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;   @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration({"classpath:spring-context.xml"}) publicabstract class SearchBaseTest extends AbstractJUnit4SpringContextTests{ } ArticleRepositoryTest.java package com.hc.search;   import javax.annotation.Resource; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.junit.Test; import com.hc.common.persistence.ElasticSearchPage; import com.hc.common.persistence.TransportClientRepository; import com.hc.modules.cms.entity.Article;   public classArticleRepositoryTest extendsSearchBaseTest{     @Resource(name="transportClientRepository")    TransportClientRepository client;      @Test     public voidfindByNameAndPrice() throwsException{        System.out.println(client.getIdx("blog1","type","2001"));     }      @Test    public voidsaveDoc() throws Exception{       Article article = new Article();       article.setDescription("东南太阳汽车出新车了DX9");       article.setTitle("旗帜迎风飘扬");       article.setId("2006");       article.setHits(5);       article.setBeginDateString("2017/6/7");       System.out.println(client.saveDoc("blog1","type",article.getId(),article));    }       @Test    public voidsearchFullText(){       Article param = new Article();       param.setDescription("太阳");       ElasticSearchPage<Article> page= newElasticSearchPage<Article>();       page.setPageSize(10);       HighlightBuilder highlight = new HighlightBuilder();       highlight.field("description").preTags("<span style=\"color:red\">").postTags("</span>");       page = client.searchFullText(param,page, highlight,"blog1");       for(Article aa : page.getRetList()){           System.out.println(aa.getId() +"===="+aa.getDescription()+"===title:=="+aa.getTitle());       }       System.out.println(page.getTotal());    } }

转载请注明原文地址: https://www.6miu.com/read-55949.html

最新回复(0)