# 数据聚合

# 聚合的定义

** 聚合(aggregations** 可以让我们极其方便的实现对数据的统计、分析、运算

# 聚合的种类

# 桶(Bucket)聚合

用于对文档作分组

  • TermAggegation:按照文档字段值分组
  • Date Histogram:按照日期阶梯分组

# 度量(Metric)聚合

用来计算一些值

  • Avg:平均值
  • Max:最大值
  • Min:最小值
  • Stats:同时求 max、min、sum

# 管道(pipeline)

以其它聚合结果为基础做基础

# DSL 实现聚合

# Bucket 聚合语法

GET /hotel/_search
{
  "size": 0,  // 设置 size 为 0,结果中不包含文档,只包含聚合结果
  "aggs": { // 定义聚合
    "brandAgg": { // 给聚合起个名字
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择 term
        "field": "brand", // 参与聚合的字段
        "size": 20 // 希望获取的聚合结果数量
      }
    }
  }
}

# 聚合结果排序

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" // 按照_count 升序排列
        },
        "size": 20
      }
    }
  }
}

# 限定聚合范围

默认情况下,Bucket 聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // 只对 200 元以下的文档聚合
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

# Metric 聚合语法

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { // 是 brands 聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": { // 聚合名称
          "stats": { // 聚合类型,这里 stats 可以计算 min、max、avg 等
            "field": "score" // 聚合字段,这里是 score
          }
        }
      }
    }
  }
}

# RestAPI 实现聚合

# API 语法

# 聚合条件

image-20210723173057733

# 聚合结果解析

image-20210723173215728

# 业务需求

# Controller 层

@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
    return hotelService.getFilters(params);
}

# Service 层接口

Map<String, List<String>> filters(RequestParams params);

# Service 层实现

@Override
public Map<String, List<String>> filters(RequestParams params) {
    try {
        // 1. 准备 Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        // 2.1.query
        buildBasicQuery(params, request);
        // 2.2. 设置 size
        request.source().size(0);
        // 2.3. 聚合
        buildAggregation(request);
        // 3. 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1. 根据品牌名称,获取品牌结果
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        result.put("品牌", brandList);
        // 4.2. 根据品牌名称,获取品牌结果
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        result.put("城市", cityList);
        // 4.3. 根据品牌名称,获取品牌结果
        List<String> starList = getAggByName(aggregations, "starAgg");
        result.put("星级", starList);
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
private void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
                                 .terms("brandAgg")
                                 .field("brand")
                                 .size(100)
                                );
    request.source().aggregation(AggregationBuilders
                                 .terms("cityAgg")
                                 .field("city")
                                 .size(100)
                                );
    request.source().aggregation(AggregationBuilders
                                 .terms("starAgg")
                                 .field("starName")
                                 .size(100)
                                );
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
    // 4.1. 根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get(aggName);
    // 4.2. 获取 buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    // 4.3. 遍历
    List<String> brandList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        // 4.4. 获取 key
        String key = bucket.getKeyAsString();
        brandList.add(key);
    }
    return brandList;
}

# 自动补全

# 拼音分词器

POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}

# 自定义分词器

# 定义

默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器

# elasticsearch 中分词器(analyzer)组成

  • character filters:在 tokenizer 之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如 keyword,就是不分词;还有 ik_smart
  • tokenizer filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

# 自定义分词器语法

PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { // 自定义分词器
        "my_analyzer": {  // 分词器名称
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": { // 自定义 tokenizer filter
        "py": { // 过滤器名称
          "type": "pinyin", // 过滤器类型,这里是 pinyin
		  "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

# 自动补全查询

# 定义

elasticsearch 提供了 Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束

  • 参与补全查询的字段必须是 completion 类型
  • 字段的内容一般是用来补全多个词条形成的数据

# 语法

# 创建索引库

// 创建索引库
PUT test
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}

# 插入数据

// 示例数据
POST test/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test/_doc
{
  "title": ["Nintendo", "switch"]
}

# 查询数据

// 自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s", // 关键字
      "completion": {
        "field": "title", // 补全查询的字段
        "skip_duplicates": true, // 跳过重复的
        "size": 10 // 获取前 10 条结果
      }
    }
  }
}

# 实现酒店搜索框自动补全

# 步骤

  1. 修改 hotel 索引库结构,设置自定义拼音分词器
  2. 修改索引库的 name、all 字段,使用自定义分词器
  3. 索引库添加一个新字段 suggestion,类型为 completion 类型,使用自定义的分词器
  4. 给 HotelDoc 类添加 suggestion 字段,内容包含 brand、business
  5. 重新导入数据到 hotel 库

# 修改映射

// 酒店数据索引库
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

# 修改 HotelDoc 实体

HotelDoc 中要添加一个字段,用来做自动补全,内容可以是酒店品牌、城市、商圈等信息。按照自动补全字段的要求,最好是这些字段的数组,在 HotelDoc 中添加一个 suggestion 字段,类型为 List<String> ,然后将 brand、city、business 等信息放到里面。

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    private Object distance;
    private Boolean isAD;
    private List<String> suggestion;
    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        // 组装 suggestion
        if(this.business.contains("/")){
            //business 有多个值,需要切割
            String[] arr = this.business.split("/");
            // 添加元素
            this.suggestion = new ArrayList<>();
            this.suggestion.add(this.brand);
            Collections.addAll(this.suggestion, arr);
        }else {
            this.suggestion = Arrays.asList(this.brand, this.business);
        }
    }
}

# 自动补全查询的 JavaAPI

# 自动补全代码实现

image-20210723213759922

# 解析

image-20210723213917524

# 实现搜索自动补全

# Controller 层接受请求

@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix) {
    return hotelService.getSuggestions(prefix);
}

# Service 层接口

List<String> getSuggestions(String prefix);

# Service 层实现

@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1. 准备 Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
            .prefix(prefix)
            .skipDuplicates(true)
            .size(10)
        ));
        // 3. 发起请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Suggest suggest = response.getSuggest();
        // 4.1. 根据补全查询名称,获取补全结果
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        // 4.2. 获取 options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3. 遍历
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            list.add(text);
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

# 数据同步

# 定义

elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步

image-20210723214758392

# 思路分析

# 同步调用

image-20210723214931869

  • hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据
  • 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,

# 异步通知

image-20210723215140735

  • hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息
  • hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改

# 监听 binlog

image-20210723215518541

  • 给 mysql 开启 binlog 功能
  • mysql 完成增、删、改操作都会记录在 binlog 中
  • hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容

# 实现数据同步

# 步骤

  • 声明 exchange、queue、RoutingKey

  • 在 hotel-admin 中的增、删、改业务中完成消息发送

  • 在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据

  • 启动并测试数据同步功能

# 声明交换机、队列

image-20210723215850307

# 引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

# 声明队列交换机名称

新建一个类 MqConstants

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

# 声明队列交换机

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }
    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

# 发送 MQ 消息

image-20210723221843816

# 接收 MQ 消息

  • 新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库
  • 删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据

# Service 层接口

void deleteById(Long id);
void insertById(Long id);

# Service 层实现

@Override
public void deleteById(Long id) {
    try {
        // 1. 准备 Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. 发送请求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
@Override
public void insertById(Long id) {
    try {
        // 0. 根据 id 查询酒店数据
        Hotel hotel = getById(id);
        // 转换为文档类型
        HotelDoc hotelDoc = new HotelDoc(hotel);
        // 1. 准备 Request 对象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. 准备 Json 文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. 发送请求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

# 编写监听器

@Component
public class HotelListener {
    @Autowired
    private IHotelService hotelService;
    /**
     * 监听酒店新增或修改的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
        hotelService.insertById(id);
    }
    /**
     * 监听酒店删除的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

# es 集群

# 定义

# 介绍

单机的 elasticsearch 做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题

  • 海量数据存储问题:将索引库从逻辑上拆分为 N 个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份(replica )

# ES 集群相关概念

  • 集群(cluster):一组拥有共同的 cluster name 的 节点
  • <font color="red"> 节点(node)</font> :集群中的一个 Elasticearch 实例
  • <font color="red"> 分片(shard)</font>:索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中

image-20230630080227716

  • 主分片(Primary shard):相对于副本分片的定义

  • 副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样

# 分片规则

image-20230630080406653

# 集群脑裂问题

# 集群机制划分

image-20210723223008967

  • master 节点:对 CPU 要求高,但是内存要求第
  • data 节点:对 CPU 和内存要求都高
  • coordinating 节点:对网络带宽、CPU 要求高

# es 职责划分图

image-20210723223629142

# 脑裂问题

脑裂是因为集群中的节点失联导致的

例如一个集群中,主节点与其它节点失联:

image-20210723223804995

此时,node2 和 node3 认为 node1 宕机,就会重新选主:

image-20210723223845754

当 node3 当选后,集群继续对外提供服务,node2 和 node3 自成集群,node1 自成集群,两个集群数据不同步,出现数据差异。

当网络恢复后,因为集群中有两个 master 节点,集群状态的不一致,出现脑裂的情况:

image-20210723224000555

解决脑裂的方案是,要求选票超过 ( eligible 节点数量 + 1 )/ 2 才能当选为主,因此 eligible 节点数量最好是奇数。对应配置项是 discovery.zen.minimum_master_nodes,在 es7.0 以后,已经成为默认配置,因此一般不会发生脑裂问题

# 集群分布式存储

# 分片存储原理

elasticsearch 会通过 hash 算法来计算文档应该存储到哪个分片:

image-20210723224354904

# 集群分布式查询

# elasticsearch 查询的两个阶段

  • scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node 汇总 data node 的搜索结果,并处理为最终结果集返回给用户

image-20210723225809848

# 集群故障转移

# 原理

集群的 master 节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移

# 示例分析

1)例如一个集群结构如图:

image-20210723225945963

现在,node1 是主节点,其它两个节点是从节点。

2)突然,node1 发生了故障:

image-20210723230020574

宕机后的第一件事,需要重新选主,例如选中了 node2:

image-20210723230055974

node2 成为主节点后,会检测集群监控状态,发现:shard-1、shard-0 没有副本节点。因此需要将 node1 上的数据迁移到 node2、node3:

image-20210723230216642

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Baozi 微信支付

微信支付

Baozi 支付宝

支付宝

Baozi 微信

微信