数据地图搜索功能模块项目总结「springBoot+Elasticsearch」

Java
260
0
0
2023-07-22

项目介绍

这个模块主要是对近期所做的一个项目的一个总结,主要是针对 数据地图 中的一个搜索功能做一个优化。那么什么是数据地图呢?这里简单上网查了点资料:

什么是数据地图

针对此次项目中的数据地图,数据地图是基于元数据中心构建的一站式企业数据资产目录,可以看作是 元数据 中心的界面。数据开发、 分析师 、数据运营、 算法工程师 可以在数据地图上完成数据的检索,解决了不知道有哪些数据?到哪里找数据?如何准确地理解数据的难题。

数据地图提供了多维度的检索功能,使用者可以按照表名、列名、注释、主题域、分层、指标进行检索,结果按照匹配相关度进行排序。考虑到数据中台中有一些表是数仓维护的表,有一些表数仓已经不再维护,在结果排序的时候,增加了数仓维护的表优先展示的规则。同时数据地图还提供了按照主题域、业务过程导览,可以帮助使用者快速了解当前有哪些表可以使用。

当使用者定位到某一个表打开时,会进入详情页,详情页中会展示表的基础信息,字段信息、分区信息、产出信息以及数据血缘。数据血缘可以帮助使用者了解这个表的来源和去向,这个表可能影响的下游应用和报表,这个表的数据来源。

数据地图同时还提供了数据预览的功能,考虑到安全性因素,只允许预览 10 条数据,用于判断数据是否符合使用者的预期。数据地图提供的收藏功能, 方便使用者快速找到自己经常使用的表。当数据开发、分析师、数据运营找到自己需要的表时,在数据地图上可以直接发起申请对该表的权限申请。数据地图对于提高数据发现的效率,实现非技术人员自助取数有重要作用。

项目中具体任务介绍

在此次项目中,主要是原来的数据地图在搜索功能做的并不是很友好,比如关键字检索所需要的信息时,仅仅支持单个关键字的检索,也就是说在进行关键字检索的时候,后端的处理时将前端返回的关键字提取出来后,在后台处理也就是做了一个模糊匹配,并且对查询到的数据做的排序也仅仅是对查询到的数据通过id由高到低的排序(由高到低的排序,个人觉得是由于在创建 索引 时由于大部分表都是自增id,所以将比较大的id排序在前面算是基本符合条件的),综上描述可以知道整体的搜索功能还是有所欠缺的,一个数据地图的搜索功能至少要支持的操作便是对搜索关键字进行分析处理,提取出关键信息,在与数据进行匹配,最后结果按照匹配相关度进行排序。

分析设计

针对上面的介绍,以及对其他大厂的数据地图的分析,可以得知数据地图要满足一下条件

  • 查询速度要快
  • 数据匹配度要高
  • 方便排序
  • 返回结构高亮

由此可知要满足以上条件, ElasticSearch 是个很不错的选择,通过对elasticsearch的使用,我总结了一下的它的特点:

  • Elasticsearch 易于安装和配置,学习和使用成本较低,开箱即用。
  • Elasticsearch 支持单机也支持分布式,内置分布式协调管理功能,天生集群。
  • Elasticsearch 提供了分片和副本机制,一个索引可以分成多个分片,一个分片可以设置多个复制分片,提高效率和高可用。
  • Elasticsearch 注重于核心功能实现,高级功能多有第三方插件提供,例如图形化界面 Kibana 的支撑。
  • Elasticsearch 建立索引快,实时性查询快,适用于新兴的实时搜索应用,面对海量数据也毫不逊色,速度快,负载强。
  • Elasticseach 有强大的文本分析功能(分词)和倒排索引机制,进一步提升搜索速度。

而要做到多个关键字的搜索功能,elssticsearch也提供了分词的功能,下面我使用“中华人民共和国国歌”这个关键词来进行对比elasticsearch的三种分词方法:

  • 单字分词:
  • 如:“中华人民共和国国歌”
  • 效果:“中”、“华”、“人”、“民”、“共”、“和”、“国”、“歌”
  • 二分法分词:
  • 按两个字进行切分。
  • 如:“中华人民共和国国歌”
  • 效果:“中华”、“华人”、“人民”、“民共”、“共和”、“和国”、“国国”、“国歌”。
  • 词库分词:
  • 在这里我使用的是ik分词器,至于ik分词器的安装,可以自行搜索安装教程,也是比较简单地在这里我就不说了,ik分词器主要提供了两种分词模式分别是:
  • ik_max_word:会将文本做最细粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,中华人民,中华,华人,人民共和国,人民,人,民,共和国,共和,和,国国,国歌”,会穷尽各种可能的组合,适合 Term Query。
  • ik_smart:会将文本做最粗粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,国歌”,适合 Phrase Query。

由于elasticsearch的以上特性,我便使用了它作为了数据的存储

数据迁移

首先由于对elasticsearch的不熟悉,以及工作性质的原因,数据要想从 mysql 中同步到elasticsearch,无法使用logstash等工具进行数据同步,因此我只能自己定义接口,将自己所需要的数据表全部同步到一个索引字段中(当时写的很累),其实也就是查询所有的数据到一张表中,让一个对象来接住所有的数据,然后再一个对象一行 json 数据,调用添加数据的接口,将数据插入elasticsearch。下面主要来说说再这个过程中,自己原来接触很少的知识点吧。

  • 在创建elasticsearch里面的映射,也就是mysql中的字段时,针对不同的字段,可以做出不同的处理(这里的处理包括对字段的存储分词模式以及查询分词模式,最好在创建映射时就定义好,先定义好映射再传入数据),比如当需要存储一个字段为一个对象时,我们可以直接将这个字段设置成一个对象,当然它在后端中对应的也是一个对象。
  • 针对上面整个索引结构,如果我们需要在mysql中查询怎么办?在这里我用到了以前从未使用过一个方法,在这里我就不细说了,如果有类似的情况的,可以搜索@Many注解

springBoot集成elasticsearch

虽然原来也有写过类似的集成博文,但是由于这次是公司项目,有指定elasticsearch的版本必须为5.5.1,个人认为ES 5、6、7的改动还是挺大的,所以这次还是将整个集成过程写出来吧!

相关依赖

 <dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<version>.5.1</version>
		</dependency> 

application.yml配置文件

 spring:
  elasticsearch:
    host-name: 自己配置的elasticsearch地址
    port: 端口
    clusterName: 集群的名字
    pool: 线程数量 

config配置类

 @Configuration
@Slfj
public class ElasticsearchConfig {
 

    @Value("${spring.elasticsearch.host-name}")
     private  String hostName;
    @Value("${spring.elasticsearch.clusterName}")
    private String clusterName;
    @Value("${spring.elasticsearch.port}")
    private Integer port;
    @Value("${spring.elasticsearch.pool}")
    private Integer pool;
    
    @Bean(name = "transportClient")
    public TransportClient transportClient(){
 
        TransportClient transportClient = null;
        try {
 
            Settings settings = Settings.builder().put("culster.name", cluster Name)
                    .put("client.transport.sniff",false)
                    .put("thread_pool.search.size",pool).build();
            transportClient = new PreBuiltTransportClient(settings);
            TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName(hostName),port)
        }catch ( Exception  e){
 
            
            log.error("连接失败",e);
        }
        return transportClient;
    }
} 

ElasticsearchUtil工具类

 @Component
@Slfj
public class ElasticSearchUtil {
 

    @Autowired
    private TransportClient transportClient;

    private  static  TransportClient client;

    /**
     * @PostContruct是spring框架的注解
     *  spring 容器初始化的时候执行该方法
     */
    @PostConstruct
    public  void  init() {
 
        client = this.transportClient;
    }

    /**
     * 创建索引
     *
     * @param index
     * @return
     */
    public static boolean createIndex(String index) {
 
         if (!isIndexExist(index)){
 
            log.info("Index is not exits!");
        }else {
 
            log.info("Index is exits!");
        }
        CreateIndexResponse indexResponse = client.admin().indices().prepareCreate(index).execute().actionGet();
        log.info("执行建立成功?" + indexResponse.isAcknowledged());
        return indexResponse.isAcknowledged();
    }


    /**
     * 删除索引
     *
     * @param index
     * @return
     */
    public static boolean deleteIndex(String index) {
 
        if(!isIndexExist(index)) {
 
            log.info("Index is not exits!");
        }
        //        AcknowledgedResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
        DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
        if (dResponse.isAcknowledged()) {
 
            log.info("delete index " + index + "  successfully!");
        } else {
 
            log.info("Fail to delete index " + index);
        }
        return dResponse.isAcknowledged();
    }

    /**
     * 判断索引是否存在
     *
     * @param index
     * @return
     */
    public static boolean isIndexExist(String index) {
 
        Indices exists Response inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet();
        if (inExistsResponse.isExists()) {
 
            log.info("Index [" + index + "] is exist!");
        } else {
 
            log.info("Index [" + index + "] is not exist!");
        }
        return inExistsResponse.isExists();
    }

    /**
     * 数据添加,正定ID
     *
     * @param jsonObject 要增加的数据
     * @param index      索引,类似数据库
     * @param type       类型,类似表
     * @param id         数据ID
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type, String id) {
 
        IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get();
        log.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());
        return response.getId();
    }
    /**
     * 数据添加
     *
     * @param jsonObject 要增加的数据
     * @param index      索引,类似数据库
     * @param type       类型,类似表
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type) {
 
        return addData(jsonObject, index, type, UUID. random UUID().toString().replaceAll("-", "").toUpperCase());
    }

    /**
     * 通过ID删除数据
     *
     * @param index 索引,类似数据库
     * @param type  类型,类似表
     * @param id    数据ID
     */
    public static void deleteDataById(String index, String type, String id) {
 
        DeleteResponse response = client.prepareDelete(index, type, id).execute().actionGet();
        log.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId());
    }

    /**
     * 通过ID 更新数据
     *
     * @param jsonObject 要增加的数据
     * @param index      索引,类似数据库
     * @param type       类型,类似表
     * @param id         数据ID
     * @return
     */
    public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {
 
        Update Request  updateRequest = new UpdateRequest();
        updateRequest.index(index).type(type).id(id).doc(jsonObject);
        client.update(updateRequest);
    }

    /**
     * 通过ID获取数据
     *
     * @param index  索引,类似数据库
     * @param type   类型,类似表
     * @param id     数据ID
     * @param fields 需要显示的字段, 逗号 分隔(缺省为全部字段)
     * @return
     */
    public static Map<String,Object> searchDataById(String index, String type, String id, String fields) {
 
        GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id);
        if (StringUtils.isNotEmpty(fields)) {
 
            getRequestBuilder.setFetchSource(fields.split(","), null);
        }
        GetResponse getResponse = getRequestBuilder.execute().actionGet();
        return getResponse.getSource();
    }

    /**
     * 使用分词查询,并分页
     *
     * @param index          索引名称
     * @param type           类型名称,可传入多个type逗号分隔
     * @param startPage    当前页
     * @param pageSize       每页显示条数
     * @param query          查询条件
     * @param fields         需要显示的字段,逗号分隔(缺省为全部字段)
     * @param sortField      排序字段
     * @param highlightField 高亮字段
     * @return
     */
    public static EsPage searchDataPage(String index, String type, int startPage, int pageSize, QueryBuilder query, String fields, String  Sort Field, String highlightField) {
 
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
 
            searchRequestBuilder.setTypes(type.split(","));
        }
        searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
        // 需要显示的字段,逗号分隔(缺省为全部字段)
        if (StringUtils.isNotEmpty(fields)) {
 
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }

        //排序字段
        if (StringUtils.isNotEmpty(sortField)) {
 
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }

        // 高亮(xxx=,aaa=222)
        if (StringUtils.isNotEmpty(highlightField)) {
 
            HighlightBuilder highlightBuilder = new HighlightBuilder();

            //highlightBuilder.preTags("<span style='color:red' >");//设置前缀
            //highlightBuilder.postTags("</span>");//设置后缀

            // 设置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }

        //searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
        searchRequestBuilder.setQuery(query);

        // 分页应用
        searchRequestBuilder.setFrom(startPage).setSize(pageSize);

        // 设置是否按查询匹配度排序
        searchRequestBuilder.setExplain(true);

        //打印的内容 可以在 Elasticsearch head 和 Kibana  上执行查询
        log.info("\n{}", searchRequestBuilder);

        // 执行搜索,返回搜索响应信息
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long totalHits = searchResponse.getHits().getTotalHits();
        long length = searchResponse.getHits().getHits().length;

        log.debug("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length);

        if (searchResponse.status().getStatus() ==) {
 
            // 解析对象
            List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField);

            return new EsPage(startPage, pageSize, (int) totalHits, sourceList);
        }

        return null;

    }

    /**
     * 使用分词查询
     *
     * @param index          索引名称
     * @param type           类型名称,可传入多个type逗号分隔
     * @param query          查询条件
     * @param size           文档大小限制
     * @param fields         需要显示的字段,逗号分隔(缺省为全部字段)
     * @param sortField      排序字段
     * @param highlightField 高亮字段
     * @return
     */
    public static List<Map<String, Object>> searchListData(String index, String type, QueryBuilder query, Integer size, String fields, String sortField, String highlightField) {
 

        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
 
            searchRequestBuilder.setTypes(type.split(","));
        }

        if (StringUtils.isNotEmpty(highlightField)) {
 
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // 设置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }

        searchRequestBuilder.setQuery(query);

        if (StringUtils.isNotEmpty(fields)) {
 
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);

        if (StringUtils.isNotEmpty(sortField)) {
 
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }

        if (size != null && size >) {
 
            searchRequestBuilder.setSize(size);
        }

        // 打印的内容 可以在 Elasticsearch head 和 Kibana  上执行查询
        log.info("\n{}", searchRequestBuilder);

        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long  totalHits = searchResponse.getHits().getTotalHits();
        long length = searchResponse.getHits().getHits().length;

        log.info("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length);

        if (searchResponse.status().getStatus() ==) {
 
            // 解析对象
            return setSearchResponse(searchResponse, highlightField);
        }
        return null;
    }
    /**
     * 高亮结果集 特殊处理
     *
     * @param searchResponse
     * @param highlightField
     */
    private static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
 
        List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
        StringBuffer stringBuffer = new StringBuffer();

        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
 
            searchHit.getSourceAsMap().put("id", searchHit.getId());

            if (StringUtils.isNotEmpty(highlightField)) {
 

                System.out.println("遍历 高亮结果集,覆盖 正常结果集" + searchHit.getSourceAsMap());
                Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();

                if (text != null) {
 
                    for (Text str : text) {
 
                        stringBuffer.append(str.string());
                    }
                    //遍历 高亮结果集,覆盖 正常结果集
                    searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                }
            }
            sourceList.add(searchHit.getSourceAsMap());
        }
        return sourceList;
    }
} 

EsPage返回结果定义

 @Data
@NoArgsConstructor
@AllArgsConstructor
public class EsPage {
 

    /**
     * 当前页
     */
    private int currentPage;
    /**
     * 每页显示多少条
     */
    private int pageSize;

    /**
     * 总记录数
     */
    private int recordCount;
    /**
     * 本页的数据列表
     */
    private List<Map<String, Object>> recordList;

    /**
     * 总页数
     */
    private int pageCount;
    /**
     * 页码列表的开始索引(包含)
     */
    private int beginPageIndex;
    /**
     * 页码列表的结束索引(包含)
     */
    private int endPageIndex;

    /**
     * 只接受前个必要的属性,会自动的计算出其他3个属性的值
     *
     * @param currentPage
     * @param pageSize
     * @param recordCount
     * @param recordList
     */
    public EsPage(int currentPage, int pageSize, int recordCount, List<Map<String, Object>> recordList) {
 
        this.currentPage = currentPage;
        this.pageSize = pageSize;
        this.recordCount = recordCount;
        this.recordList = recordList;

        // 计算总页码
        pageCount = (recordCount + pageSize -) / pageSize;

        // 计算 beginPageIndex 和 endPageIndex
        // >> 总页数不多于页,则全部显示
        if (pageCount <=) {
 
            beginPageIndex =;
            endPageIndex = pageCount;
        }
        // >> 总页数多于页,则显示当前页附近的共10个页码
        else {
 
            // 当前页附近的共个页码(前4个 + 当前页 + 后5个)
            beginPageIndex = currentPage -;
            endPageIndex = currentPage +;
            // 当前面的页码不足个时,则显示前10个页码
            if (beginPageIndex <) {
 
                beginPageIndex =;
                endPageIndex =;
            }
            // 当后面的页码不足个时,则显示后10个页码
            if (endPageIndex > pageCount) {
 
                endPageIndex = pageCount;
                beginPageIndex = pageCount - + 1;
            }
        }
    }
} 

后续的操作可以根据自己的业务进行调整,主要的业务代码我就不粘贴出来了,这里可以提一个业务问题,比如说有个收藏功能需要实现,收藏的数据该如何存入es中呢?我在这里使用的方法是,再es中索引里面建立了一个是否被收藏的字段,如果有用户做出收藏操作时,我们可以将该用户的某个唯一字段比如eamil存储再这个字段里面,如果需要查询某个用户是否收藏这张表可以尝试用这个字段进行匹配。方法有些简陋,希望有大佬个我提提意见,有没有更好的实现方法。