# 数据聚合
# 聚合的定义
** 聚合(aggregations)** 可以让我们极其方便的实现对数据的统计、分析、运算
# 聚合的种类
# 桶(Bucket)聚合
用于对文档作分组
- TermAggegation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组
# 度量(Metric)聚合
用来计算一些值
- Avg:平均值
- Max:最大值
- Min:最小值
- Stats:同时求 max、min、sum
# 管道(pipeline)
以其它聚合结果为基础做基础
# DSL 实现聚合
# Bucket 聚合语法
GET /hotel/_search | |
{ | |
"size": 0, // 设置 size 为 0,结果中不包含文档,只包含聚合结果 | |
"aggs": { // 定义聚合 | |
"brandAgg": { // 给聚合起个名字 | |
"terms": { // 聚合的类型,按照品牌值聚合,所以选择 term | |
"field": "brand", // 参与聚合的字段 | |
"size": 20 // 希望获取的聚合结果数量 | |
} | |
} | |
} | |
} |
# 聚合结果排序
GET /hotel/_search | |
{ | |
"size": 0, | |
"aggs": { | |
"brandAgg": { | |
"terms": { | |
"field": "brand", | |
"order": { | |
"_count": "asc" // 按照_count 升序排列 | |
}, | |
"size": 20 | |
} | |
} | |
} | |
} |
# 限定聚合范围
默认情况下,Bucket 聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件
GET /hotel/_search | |
{ | |
"query": { | |
"range": { | |
"price": { | |
"lte": 200 // 只对 200 元以下的文档聚合 | |
} | |
} | |
}, | |
"size": 0, | |
"aggs": { | |
"brandAgg": { | |
"terms": { | |
"field": "brand", | |
"size": 20 | |
} | |
} | |
} | |
} |
# Metric 聚合语法
GET /hotel/_search | |
{ | |
"size": 0, | |
"aggs": { | |
"brandAgg": { | |
"terms": { | |
"field": "brand", | |
"size": 20 | |
}, | |
"aggs": { // 是 brands 聚合的子聚合,也就是分组后对每组分别计算 | |
"score_stats": { // 聚合名称 | |
"stats": { // 聚合类型,这里 stats 可以计算 min、max、avg 等 | |
"field": "score" // 聚合字段,这里是 score | |
} | |
} | |
} | |
} | |
} | |
} |
# RestAPI 实现聚合
# API 语法
# 聚合条件
# 聚合结果解析
# 业务需求
# Controller 层
@PostMapping("filters") | |
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){ | |
return hotelService.getFilters(params); | |
} |
# Service 层接口
Map<String, List<String>> filters(RequestParams params); |
# Service 层实现
@Override | |
public Map<String, List<String>> filters(RequestParams params) { | |
try { | |
// 1. 准备 Request | |
SearchRequest request = new SearchRequest("hotel"); | |
// 2. 准备 DSL | |
// 2.1.query | |
buildBasicQuery(params, request); | |
// 2.2. 设置 size | |
request.source().size(0); | |
// 2.3. 聚合 | |
buildAggregation(request); | |
// 3. 发出请求 | |
SearchResponse response = client.search(request, RequestOptions.DEFAULT); | |
// 4. 解析结果 | |
Map<String, List<String>> result = new HashMap<>(); | |
Aggregations aggregations = response.getAggregations(); | |
// 4.1. 根据品牌名称,获取品牌结果 | |
List<String> brandList = getAggByName(aggregations, "brandAgg"); | |
result.put("品牌", brandList); | |
// 4.2. 根据品牌名称,获取品牌结果 | |
List<String> cityList = getAggByName(aggregations, "cityAgg"); | |
result.put("城市", cityList); | |
// 4.3. 根据品牌名称,获取品牌结果 | |
List<String> starList = getAggByName(aggregations, "starAgg"); | |
result.put("星级", starList); | |
return result; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private void buildAggregation(SearchRequest request) { | |
request.source().aggregation(AggregationBuilders | |
.terms("brandAgg") | |
.field("brand") | |
.size(100) | |
); | |
request.source().aggregation(AggregationBuilders | |
.terms("cityAgg") | |
.field("city") | |
.size(100) | |
); | |
request.source().aggregation(AggregationBuilders | |
.terms("starAgg") | |
.field("starName") | |
.size(100) | |
); | |
} | |
private List<String> getAggByName(Aggregations aggregations, String aggName) { | |
// 4.1. 根据聚合名称获取聚合结果 | |
Terms brandTerms = aggregations.get(aggName); | |
// 4.2. 获取 buckets | |
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); | |
// 4.3. 遍历 | |
List<String> brandList = new ArrayList<>(); | |
for (Terms.Bucket bucket : buckets) { | |
// 4.4. 获取 key | |
String key = bucket.getKeyAsString(); | |
brandList.add(key); | |
} | |
return brandList; | |
} |
# 自动补全
# 拼音分词器
POST /_analyze | |
{ | |
"text": "如家酒店还不错", | |
"analyzer": "pinyin" | |
} |
# 自定义分词器
# 定义
默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器
# elasticsearch 中分词器(analyzer)组成
- character filters:在 tokenizer 之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如 keyword,就是不分词;还有 ik_smart
- tokenizer filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
# 自定义分词器语法
PUT /test | |
{ | |
"settings": { | |
"analysis": { | |
"analyzer": { // 自定义分词器 | |
"my_analyzer": { // 分词器名称 | |
"tokenizer": "ik_max_word", | |
"filter": "py" | |
} | |
}, | |
"filter": { // 自定义 tokenizer filter | |
"py": { // 过滤器名称 | |
"type": "pinyin", // 过滤器类型,这里是 pinyin | |
"keep_full_pinyin": false, | |
"keep_joined_full_pinyin": true, | |
"keep_original": true, | |
"limit_first_letter_length": 16, | |
"remove_duplicated_term": true, | |
"none_chinese_pinyin_tokenize": false | |
} | |
} | |
} | |
}, | |
"mappings": { | |
"properties": { | |
"name": { | |
"type": "text", | |
"analyzer": "my_analyzer", | |
"search_analyzer": "ik_smart" | |
} | |
} | |
} | |
} |
# 自动补全查询
# 定义
elasticsearch 提供了 Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束
- 参与补全查询的字段必须是 completion 类型
- 字段的内容一般是用来补全多个词条形成的数据
# 语法
# 创建索引库
// 创建索引库 | |
PUT test | |
{ | |
"mappings": { | |
"properties": { | |
"title":{ | |
"type": "completion" | |
} | |
} | |
} | |
} |
# 插入数据
// 示例数据 | |
POST test/_doc | |
{ | |
"title": ["Sony", "WH-1000XM3"] | |
} | |
POST test/_doc | |
{ | |
"title": ["SK-II", "PITERA"] | |
} | |
POST test/_doc | |
{ | |
"title": ["Nintendo", "switch"] | |
} |
# 查询数据
// 自动补全查询 | |
GET /test/_search | |
{ | |
"suggest": { | |
"title_suggest": { | |
"text": "s", // 关键字 | |
"completion": { | |
"field": "title", // 补全查询的字段 | |
"skip_duplicates": true, // 跳过重复的 | |
"size": 10 // 获取前 10 条结果 | |
} | |
} | |
} | |
} |
# 实现酒店搜索框自动补全
# 步骤
- 修改 hotel 索引库结构,设置自定义拼音分词器
- 修改索引库的 name、all 字段,使用自定义分词器
- 索引库添加一个新字段 suggestion,类型为 completion 类型,使用自定义的分词器
- 给 HotelDoc 类添加 suggestion 字段,内容包含 brand、business
- 重新导入数据到 hotel 库
# 修改映射
// 酒店数据索引库 | |
PUT /hotel | |
{ | |
"settings": { | |
"analysis": { | |
"analyzer": { | |
"text_anlyzer": { | |
"tokenizer": "ik_max_word", | |
"filter": "py" | |
}, | |
"completion_analyzer": { | |
"tokenizer": "keyword", | |
"filter": "py" | |
} | |
}, | |
"filter": { | |
"py": { | |
"type": "pinyin", | |
"keep_full_pinyin": false, | |
"keep_joined_full_pinyin": true, | |
"keep_original": true, | |
"limit_first_letter_length": 16, | |
"remove_duplicated_term": true, | |
"none_chinese_pinyin_tokenize": false | |
} | |
} | |
} | |
}, | |
"mappings": { | |
"properties": { | |
"id":{ | |
"type": "keyword" | |
}, | |
"name":{ | |
"type": "text", | |
"analyzer": "text_anlyzer", | |
"search_analyzer": "ik_smart", | |
"copy_to": "all" | |
}, | |
"address":{ | |
"type": "keyword", | |
"index": false | |
}, | |
"price":{ | |
"type": "integer" | |
}, | |
"score":{ | |
"type": "integer" | |
}, | |
"brand":{ | |
"type": "keyword", | |
"copy_to": "all" | |
}, | |
"city":{ | |
"type": "keyword" | |
}, | |
"starName":{ | |
"type": "keyword" | |
}, | |
"business":{ | |
"type": "keyword", | |
"copy_to": "all" | |
}, | |
"location":{ | |
"type": "geo_point" | |
}, | |
"pic":{ | |
"type": "keyword", | |
"index": false | |
}, | |
"all":{ | |
"type": "text", | |
"analyzer": "text_anlyzer", | |
"search_analyzer": "ik_smart" | |
}, | |
"suggestion":{ | |
"type": "completion", | |
"analyzer": "completion_analyzer" | |
} | |
} | |
} | |
} |
# 修改 HotelDoc 实体
HotelDoc 中要添加一个字段,用来做自动补全,内容可以是酒店品牌、城市、商圈等信息。按照自动补全字段的要求,最好是这些字段的数组,在 HotelDoc 中添加一个 suggestion 字段,类型为 List<String>
,然后将 brand、city、business 等信息放到里面。
@Data | |
@NoArgsConstructor | |
public class HotelDoc { | |
private Long id; | |
private String name; | |
private String address; | |
private Integer price; | |
private Integer score; | |
private String brand; | |
private String city; | |
private String starName; | |
private String business; | |
private String location; | |
private String pic; | |
private Object distance; | |
private Boolean isAD; | |
private List<String> suggestion; | |
public HotelDoc(Hotel hotel) { | |
this.id = hotel.getId(); | |
this.name = hotel.getName(); | |
this.address = hotel.getAddress(); | |
this.price = hotel.getPrice(); | |
this.score = hotel.getScore(); | |
this.brand = hotel.getBrand(); | |
this.city = hotel.getCity(); | |
this.starName = hotel.getStarName(); | |
this.business = hotel.getBusiness(); | |
this.location = hotel.getLatitude() + ", " + hotel.getLongitude(); | |
this.pic = hotel.getPic(); | |
// 组装 suggestion | |
if(this.business.contains("/")){ | |
//business 有多个值,需要切割 | |
String[] arr = this.business.split("/"); | |
// 添加元素 | |
this.suggestion = new ArrayList<>(); | |
this.suggestion.add(this.brand); | |
Collections.addAll(this.suggestion, arr); | |
}else { | |
this.suggestion = Arrays.asList(this.brand, this.business); | |
} | |
} | |
} |
# 自动补全查询的 JavaAPI
# 自动补全代码实现
# 解析
# 实现搜索自动补全
# Controller 层接受请求
@GetMapping("suggestion") | |
public List<String> getSuggestions(@RequestParam("key") String prefix) { | |
return hotelService.getSuggestions(prefix); | |
} |
# Service 层接口
List<String> getSuggestions(String prefix); |
# Service 层实现
@Override | |
public List<String> getSuggestions(String prefix) { | |
try { | |
// 1. 准备 Request | |
SearchRequest request = new SearchRequest("hotel"); | |
// 2. 准备 DSL | |
request.source().suggest(new SuggestBuilder().addSuggestion( | |
"suggestions", | |
SuggestBuilders.completionSuggestion("suggestion") | |
.prefix(prefix) | |
.skipDuplicates(true) | |
.size(10) | |
)); | |
// 3. 发起请求 | |
SearchResponse response = client.search(request, RequestOptions.DEFAULT); | |
// 4. 解析结果 | |
Suggest suggest = response.getSuggest(); | |
// 4.1. 根据补全查询名称,获取补全结果 | |
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); | |
// 4.2. 获取 options | |
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); | |
// 4.3. 遍历 | |
List<String> list = new ArrayList<>(options.size()); | |
for (CompletionSuggestion.Entry.Option option : options) { | |
String text = option.getText().toString(); | |
list.add(text); | |
} | |
return list; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} |
# 数据同步
# 定义
elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步
# 思路分析
# 同步调用
- hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据
- 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,
# 异步通知
- hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息
- hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改
# 监听 binlog
- 给 mysql 开启 binlog 功能
- mysql 完成增、删、改操作都会记录在 binlog 中
- hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容
# 实现数据同步
# 步骤
-
声明 exchange、queue、RoutingKey
-
在 hotel-admin 中的增、删、改业务中完成消息发送
-
在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据
-
启动并测试数据同步功能
# 声明交换机、队列
# 引入依赖
<!--amqp--> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-amqp</artifactId> | |
</dependency> |
# 声明队列交换机名称
新建一个类 MqConstants
public class MqConstants { | |
/** | |
* 交换机 | |
*/ | |
public final static String HOTEL_EXCHANGE = "hotel.topic"; | |
/** | |
* 监听新增和修改的队列 | |
*/ | |
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; | |
/** | |
* 监听删除的队列 | |
*/ | |
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; | |
/** | |
* 新增或修改的RoutingKey | |
*/ | |
public final static String HOTEL_INSERT_KEY = "hotel.insert"; | |
/** | |
* 删除的RoutingKey | |
*/ | |
public final static String HOTEL_DELETE_KEY = "hotel.delete"; | |
} |
# 声明队列交换机
@Configuration | |
public class MqConfig { | |
@Bean | |
public TopicExchange topicExchange(){ | |
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false); | |
} | |
@Bean | |
public Queue insertQueue(){ | |
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true); | |
} | |
@Bean | |
public Queue deleteQueue(){ | |
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true); | |
} | |
@Bean | |
public Binding insertQueueBinding(){ | |
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY); | |
} | |
@Bean | |
public Binding deleteQueueBinding(){ | |
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY); | |
} | |
} |
# 发送 MQ 消息
# 接收 MQ 消息
- 新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库
- 删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据
# Service 层接口
void deleteById(Long id); | |
void insertById(Long id); |
# Service 层实现
@Override | |
public void deleteById(Long id) { | |
try { | |
// 1. 准备 Request | |
DeleteRequest request = new DeleteRequest("hotel", id.toString()); | |
// 2. 发送请求 | |
client.delete(request, RequestOptions.DEFAULT); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
@Override | |
public void insertById(Long id) { | |
try { | |
// 0. 根据 id 查询酒店数据 | |
Hotel hotel = getById(id); | |
// 转换为文档类型 | |
HotelDoc hotelDoc = new HotelDoc(hotel); | |
// 1. 准备 Request 对象 | |
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); | |
// 2. 准备 Json 文档 | |
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); | |
// 3. 发送请求 | |
client.index(request, RequestOptions.DEFAULT); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} |
# 编写监听器
@Component | |
public class HotelListener { | |
@Autowired | |
private IHotelService hotelService; | |
/** | |
* 监听酒店新增或修改的业务 | |
* @param id 酒店id | |
*/ | |
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) | |
public void listenHotelInsertOrUpdate(Long id){ | |
hotelService.insertById(id); | |
} | |
/** | |
* 监听酒店删除的业务 | |
* @param id 酒店id | |
*/ | |
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) | |
public void listenHotelDelete(Long id){ | |
hotelService.deleteById(id); | |
} | |
} |
# es 集群
# 定义
# 介绍
单机的 elasticsearch 做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题
- 海量数据存储问题:将索引库从逻辑上拆分为 N 个分片(shard),存储到多个节点
- 单点故障问题:将分片数据在不同节点备份(replica )
# ES 集群相关概念
- 集群(cluster):一组拥有共同的 cluster name 的 节点
- <font color="red"> 节点(node)</font> :集群中的一个 Elasticearch 实例
- <font color="red"> 分片(shard)</font>:索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中
-
主分片(Primary shard):相对于副本分片的定义
-
副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样
# 分片规则
# 集群脑裂问题
# 集群机制划分
- master 节点:对 CPU 要求高,但是内存要求第
- data 节点:对 CPU 和内存要求都高
- coordinating 节点:对网络带宽、CPU 要求高
# es 职责划分图
# 脑裂问题
脑裂是因为集群中的节点失联导致的
例如一个集群中,主节点与其它节点失联:
此时,node2 和 node3 认为 node1 宕机,就会重新选主:
当 node3 当选后,集群继续对外提供服务,node2 和 node3 自成集群,node1 自成集群,两个集群数据不同步,出现数据差异。
当网络恢复后,因为集群中有两个 master 节点,集群状态的不一致,出现脑裂的情况:
解决脑裂的方案是,要求选票超过 ( eligible 节点数量 + 1 )/ 2 才能当选为主,因此 eligible 节点数量最好是奇数。对应配置项是 discovery.zen.minimum_master_nodes,在 es7.0 以后,已经成为默认配置,因此一般不会发生脑裂问题
# 集群分布式存储
# 分片存储原理
elasticsearch 会通过 hash 算法来计算文档应该存储到哪个分片:
# 集群分布式查询
# elasticsearch 查询的两个阶段
- scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片
- gather phase:聚集阶段,coordinating node 汇总 data node 的搜索结果,并处理为最终结果集返回给用户
# 集群故障转移
# 原理
集群的 master 节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移
# 示例分析
1)例如一个集群结构如图:
现在,node1 是主节点,其它两个节点是从节点。
2)突然,node1 发生了故障:
宕机后的第一件事,需要重新选主,例如选中了 node2:
node2 成为主节点后,会检测集群监控状态,发现:shard-1、shard-0 没有副本节点。因此需要将 node1 上的数据迁移到 node2、node3: