# 基于 Redis 实现共享 session 登录
# 基于 Redis 实现共享 session 登录
# 发送短信验证码
# 思路图解
# 代码实现
@Override | |
public Result sendCode(String phone, HttpSession session) { | |
// 1. 判断手机号格式是否正确 | |
if (RegexUtils.isPhoneInvalid(phone)) { | |
// 1.1 手机号格式不正确,提示信息 | |
return Result.fail("手机号格式错误"); | |
} | |
// 2. 手机号格式正确,生成验证码 | |
String code = RandomUtil.randomNumbers(6); | |
// 3. 将验证码存到 Redis | |
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES); | |
// 4. 模拟发送短信验证码 | |
log.debug("发送验证码成功,验证码为:{}",code); | |
return Result.ok(); | |
} |
# 校验登录状态
# 思路图解
# 代码实现
@Override | |
public Result login(LoginFormDTO loginForm, HttpSession session) { | |
// 1. 校验手机号 | |
String phone = loginForm.getPhone(); | |
if (RegexUtils.isPhoneInvalid(phone)) { | |
return Result.fail("手机号格式错误"); | |
} | |
// 2. 从 Redis 中获取验证码 | |
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); | |
String code = loginForm.getCode(); | |
// 3. 判断验证码是否存在以及是否正确 | |
if (cacheCode == null || !cacheCode.equals(code)) { | |
return Result.fail("验证码错误,请重新验证"); | |
} | |
// 4. 查询用户,看是否存在 | |
User user = query().eq("phone",phone).one(); | |
// 5. 不存在,创建 user 用户并保存 | |
if (user == null) { | |
user = createUserWithPhone(phone); | |
} | |
// 6. 存在,保存用户信息到 Redis | |
String token = UUID.randomUUID().toString(true); | |
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); | |
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); | |
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap); | |
stringRedisTemplate.expire(LOGIN_USER_KEY,30,TimeUnit.MINUTES); | |
return Result.ok(token); | |
} | |
private User createUserWithPhone(String phone) { | |
User user = new User(); | |
user.setPhone(phone); | |
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10)); | |
save(user); | |
return user; | |
} |
# 解决状态登录刷新问题
# 登录拦截器优化(思路)
# 代码实现
# 第一层拦截器 LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor { | |
@Override | |
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { | |
// 1. 判断是否需要拦截 ThreadLocal 中是否有用户 | |
if (UserHolder.getUser() == null) { | |
response.setStatus(401); | |
return false; | |
} | |
// 2. 有用户则放行 | |
return true; | |
} | |
} |
# 第二层拦截器 RefreshTokenInterceptor
public class RefreshTokenInterceptor implements HandlerInterceptor { | |
private StringRedisTemplate stringRedisTemplate; | |
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { | |
this.stringRedisTemplate = stringRedisTemplate; | |
} | |
@Override | |
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { | |
// 1. 获取请求头中 token | |
String token = request.getHeader("authorization"); | |
if (StrUtil.isBlank(token)) { | |
return true; | |
} | |
// 2. 基于 token 获取用户 | |
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token); | |
// 3. 判断用户是否存在 | |
if (userMap.isEmpty()) { | |
// 4. 不存在,拦截 | |
return true; | |
} | |
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); | |
// 5. 存在,保存用户信息到 ThreadLocal | |
UserHolder.saveUser(userDTO); | |
// 6. 刷新 token 有效期 | |
stringRedisTemplate.expire(LOGIN_USER_KEY + token, 30, TimeUnit.MINUTES); | |
// 7. 放行 | |
return true; | |
} | |
@Override | |
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { | |
// 移除用户 | |
UserHolder.removeUser(); | |
} | |
} |
# MvcConfig 配置拦截器
@Configuration | |
public class MvcConfig implements WebMvcConfigurer { | |
@Resource | |
private StringRedisTemplate stringRedisTemplate; | |
@Override | |
public void addInterceptors(InterceptorRegistry registry) { | |
// 登录拦截器 | |
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0); | |
//token 刷新拦截器 | |
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns( | |
"/shop/**", | |
"/voucher/**", | |
"/shop-type/**", | |
"/blog/hot", | |
"/user/code", | |
"/user/login" | |
).order(1); | |
} | |
} |
# 商户查询缓存
# 缓存介绍
缓存就是数据交换的缓冲区(称作 Cache),是存贮数据的临时地方,一般读写性能较高
# 缓存作用
- 降低后端负载
- 提高读写效率,降低响应时间
# 缓存的成本
- 数据一致性成本
- 代码维护成本
- 运维成本
# 添加 Redis 缓存
# 业务流程分析
# 代码实现
@Service | |
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { | |
@Resource | |
private StringRedisTemplate stringRedisTemplate; | |
@Override | |
public Object queryById(Long id) { | |
// 1. 设置 redis 缓存 key | |
String key = "cache:shop:" + id; | |
// 2. 从 Redis 缓存,通过 key 查询店铺信息 | |
String shopJson = stringRedisTemplate.opsForValue().get(key); | |
// 3.Redis 中查找店铺信息,返回店铺 | |
if (StrUtil.isNotBlank(shopJson)) { | |
Shop shop = JSONUtil.toBean(shopJson, Shop.class); | |
return Result.ok(shop); | |
} | |
// 4.Redis 中未查找到店铺信息,返回提示信息,从数据库中查询信息 | |
Shop shop = getById(id); | |
if (shop == null) { | |
return Result.fail("商铺信息不存在"); | |
} | |
// 5. 将从数据库查询到的信息缓存到 Redis | |
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); | |
// 6. 返回商铺信息 | |
return Result.ok(shop); | |
} | |
} |
# 缓存更新策略
# 三种缓存更新策略
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不用自己维护,利用 Redis 的内存淘汰机制,当内存不够时自动淘汰部分数据,下次查询时更新缓存 | 给缓存数据添加 TTL 时间,到期后自动删除缓存,下次查询时更新缓存 | 编写业务逻辑,在修改数据库的同时,更新缓存 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
- 低一致性需求:使用内存淘汰机制,例如店铺类型查询缓存
- 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存
# 主动更新策略
# Cache Aside Pattern
由缓存调用者,在更新数据库的同时更新缓存
- 删除缓存还是更新缓存(推荐 2)
- 更新缓存:每次更新数据库都更新缓存,无效写操作过多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
- 如何保证缓存与数据库的操作的同时成功或失败
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统:利用 TCC 等分布式事务方案
- 先操作缓存还是先操作数据库
- 先删除缓存,再操作数据库
- 先操作数据库,再操作缓存(推荐,线程安全问题出现概率小)
# Read/White Through Pattern
缓存与数据库整合为一个服务,由服务来维护一致性,调用者调用该服务,无需关系缓存一致性问题
# Write Behind Caching Pattern
调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致
# 代码实现
@Override | |
public Result update(Shop shop) { | |
// 1. 判断 id 是否存在 | |
if (shop.getId() == null) { | |
return Result.fail("id值不存在"); | |
} | |
// 2. 更新数据库 | |
updateById(shop); | |
// 3. 更新缓存 | |
stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId()); | |
return Result.ok(); | |
} |
# 缓存穿透
# 概念
缓存穿透是指客户请求的数据在缓存中数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
# 解决思路
# 缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的不一致
# 布隆过滤
- 优点:内存占用较少,没有多余的 key
- 缺点
- 实现复杂
- 存在误判可能
# 代码实现
# 业务逻辑
# 代码
@Override | |
public Object queryShopById(Long id) { | |
// 1. 设置 redis 缓存 key | |
String key = "cache:shop:" + id; | |
// 2. 从 Redis 缓存,通过 key 查询店铺信息 | |
String shopJson = stringRedisTemplate.opsForValue().get(key); | |
// 3.Redis 中查找店铺信息,返回店铺 | |
if (StrUtil.isNotBlank(shopJson)) { | |
Shop shop = JSONUtil.toBean(shopJson, Shop.class); | |
return Result.ok(shop); | |
} | |
// 判断命中是否是空值 | |
if (shopJson != null) { | |
return Result.fail("店铺存在"); | |
} | |
// 4.Redis 中未查找到店铺信息,返回提示信息,从数据库中查询信息 | |
Shop shop = getById(id); | |
if (shop == null) { | |
// 将空值写入 Redis | |
stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES); | |
// 返回错误信息 | |
return Result.fail("商铺信息不存在"); | |
} | |
// 5. 将从数据库查询到的信息缓存到 Redis | |
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES); | |
// 6. 返回商铺信息 | |
return Result.ok(shop); | |
} |
# 缓存雪崩
# 概念
缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库带来巨大压力
# 解决方案
- 给不同的 Key 的 TTL 添加随机值
- 利用 Redis 集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
# 缓存击穿
# 概念
缓存击穿问题也叫热点 key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大冲击
# 解决方案
# 互斥锁
# 逻辑过期
# 互斥锁与逻辑过期比较
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | 没有额外的内存消耗、保证一致性、实现简单 | 线程需要等待,性能收到影响、可能有死锁风险 |
逻辑过期 | 线程无需等待,性能较好 | 不保证一致性、 有额外内存消耗、实现复杂 |
# 基于互斥锁解决缓存击穿问题
# 业务逻辑
# 代码实现
@Override | |
public Result queryShopById(Long id) throws InterruptedException { | |
Shop shop = queryWithMutex(id); | |
if (shop == null) { | |
return Result.fail("店铺不存在"); | |
} | |
return Result.ok(shop); | |
} | |
public Shop queryWithMutex(Long id) throws InterruptedException { | |
// 1. 设置 redis 缓存 key | |
String key = "cache:shop:" + id; | |
// 2. 从 Redis 缓存,通过 key 查询店铺信息 | |
String shopJson = stringRedisTemplate.opsForValue().get(key); | |
// 3.Redis 中查找店铺信息,返回店铺 | |
if (StrUtil.isNotBlank(shopJson)) { | |
Shop shop = JSONUtil.toBean(shopJson, Shop.class); | |
return shop; | |
} | |
// 判断命中是否是空值 | |
if (shopJson != null) { | |
return null; | |
} | |
String lockKey = null; | |
Shop shop = null; | |
try { | |
// 4. 开始实现缓存重建 | |
// 4.1 获取互斥锁 | |
lockKey = "lock:shop:" + id; | |
boolean isLock = tryLock(lockKey); | |
// 4.2 判断是否获取成功 | |
if (!isLock) { | |
// 4.3 失败,休眠重试 | |
Thread.sleep(50); | |
return queryWithMutex(id); | |
} | |
// 4.4 成功,根据 id 查询数据库 | |
shop = getById(id); | |
if (shop == null) { | |
// 将空值写入 Redis | |
stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES); | |
// 返回错误信息 | |
return null; | |
} | |
// 5. 将从数据库查询到的信息缓存到 Redis | |
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
// 6. 释放互斥锁 | |
unlock(lockKey); | |
} | |
// 7. 返回商铺信息 | |
return shop; | |
} | |
// 实现获取锁(加锁) | |
private boolean tryLock(String key) { | |
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); | |
return BooleanUtil.isTrue(flag); | |
} | |
// 实现释放锁 | |
private void unlock(String key) { | |
stringRedisTemplate.delete(key); | |
} |
# 基于逻辑过期解决缓存击穿问题
# 业务逻辑
# 代码实现
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); | |
public void saveShop2Redis(Long id, Long expireSeconds) { | |
// 1. 查询店铺信息 | |
Shop shop = getById(id); | |
// 2. 封装逻辑过期时间 | |
RedisData redisData = new RedisData(); | |
redisData.setData(shop); | |
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); | |
// 3. 写入 Redis | |
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData)); | |
} | |
public Shop queryWithLogicalExpire(Long id) { | |
// 1. 设置 redis 缓存 key | |
String key = "cache:shop:" + id; | |
// 2. 从 Redis 缓存,通过 key 查询店铺信息 | |
String shopJson = stringRedisTemplate.opsForValue().get(key); | |
// 3.Redis 中查找店铺信息,判断是否命中 | |
if (StrUtil.isBlank(shopJson)) { | |
// 未命中,返回空 | |
return null; | |
} | |
// 4. 命中,需要将 json 反序列化为对象 | |
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); | |
JSONObject data = (JSONObject) redisData.getData(); | |
Shop shop = JSONUtil.toBean(data, Shop.class); | |
LocalDateTime expireTime = redisData.getExpireTime(); | |
// 5. 判断是否过期 | |
if (expireTime.isAfter(LocalDateTime.now())) { | |
// 5.1 未过期,直接返回店铺信息 | |
return shop; | |
} | |
// 5.2 已过期需要缓存重建 | |
// 6. 缓存重建 | |
String lockKey = "lock:shop:" + id; | |
// 6.1 获取互斥锁 | |
boolean isLock = tryLock(lockKey); | |
// 6.2 判断是否获取锁成功 | |
if (isLock) { | |
// 6.3 成功,开启独立线程,开启缓存重建 | |
CACHE_REBUILD_EXECUTOR.submit(() -> { | |
// 重建缓存 | |
try { | |
this.saveShop2Redis(id,20L); | |
}catch (Exception e) { | |
throw new RuntimeException(e); | |
} finally { | |
// 释放锁 | |
unlock(lockKey); | |
} | |
}); | |
} | |
// 6.4 返回过期商铺信息 | |
return shop; | |
} |
# 缓存工具封装(解决缓存击穿问题)
@Slf4j | |
@Component | |
public class CacheClient { | |
@Resource | |
private StringRedisTemplate stringRedisTemplate; | |
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); | |
public void set(String key, Object value, Long time, TimeUnit unit) { | |
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit); | |
} | |
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) { | |
// 设置逻辑过期 | |
RedisData redisData = new RedisData(); | |
redisData.setData(value); | |
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); | |
// 写入 Redis | |
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); | |
} | |
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time,TimeUnit unit) { | |
String key = keyPrefix + id; | |
String json = stringRedisTemplate.opsForValue().get(key); | |
if (StrUtil.isNotBlank(json)) { | |
return JSONUtil.toBean(json, type); | |
} | |
if (json != null) { | |
return null; | |
} | |
R r = dbFallback.apply(id); | |
if (r == null) { | |
stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES); | |
return null; | |
} | |
this.set(key,r,time,unit); | |
return r; | |
} | |
public <R, ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type, Function<ID, R> dbFallback, Long time,TimeUnit unit) { | |
String key = keyPrefix + id; | |
String shopJson = stringRedisTemplate.opsForValue().get(key); | |
if (StrUtil.isBlank(shopJson)) { | |
return null; | |
} | |
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); | |
JSONObject data = (JSONObject) redisData.getData(); | |
R r = JSONUtil.toBean(data, type); | |
LocalDateTime expireTime = redisData.getExpireTime(); | |
if (expireTime.isAfter(LocalDateTime.now())) { | |
return r; | |
} | |
String lockKey = "lock:shop:" + id; | |
boolean isLock = tryLock(lockKey); | |
if (isLock) { | |
CACHE_REBUILD_EXECUTOR.submit(() -> { | |
try { | |
R r1 = dbFallback.apply(id); | |
this.setWithLogicalExpire(key,r1,time,unit); | |
}catch (Exception e) { | |
throw new RuntimeException(e); | |
} finally { | |
unlock(lockKey); | |
} | |
}); | |
} | |
return r; | |
} | |
// 实现获取锁(加锁) | |
private boolean tryLock(String key) { | |
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); | |
return BooleanUtil.isTrue(flag); | |
} | |
// 实现释放锁 | |
private void unlock(String key) { | |
stringRedisTemplate.delete(key); | |
} | |
} |
# 优惠卷秒杀
# 全局唯一 ID
# 概念
全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
# 1.2 ID 组成部分
- 符号位:1bit,永远为 0
- 时间戳:31bit,以秒为单位
- 序列号:32bit,秒内的计数器,支持每秒产生 个不同 ID
# 代码实现
@Component | |
public class RedisIdWorker { | |
private static final Long BEGIN_TIMESTAMP = 1640995200L; | |
private static final int COUNT_BITS = 32; | |
@Resource | |
private StringRedisTemplate stringRedisTemplate; | |
public long nextId(String ketPrefix) { | |
// 1. 生成时间戳 | |
LocalDateTime now = LocalDateTime.now(); | |
long nowSecond = now.toEpochSecond(ZoneOffset.UTC); | |
long timestamp = nowSecond - BEGIN_TIMESTAMP; | |
// 2. 生成序列号 | |
// 2.1 获取当前日期 | |
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); | |
// 2.2 自增长 | |
long count = stringRedisTemplate.opsForValue().increment("icr:" + ketPrefix + ":" + date); | |
// 3. 拼接并返回 | |
return timestamp << COUNT_BITS | count; | |
} | |
} |
# 添加优惠券
# VoucherController
/** | |
* 新增普通券 | |
* @param voucher 优惠券信息 | |
* @return 优惠券id | |
*/ | |
@PostMapping | |
public Result addVoucher(@RequestBody Voucher voucher) { | |
voucherService.save(voucher); | |
return Result.ok(voucher.getId()); | |
} | |
/** | |
* 新增秒杀券 | |
* @param voucher 优惠券信息,包含秒杀信息 | |
* @return 优惠券id | |
*/ | |
@PostMapping("seckill") | |
public Result addSeckillVoucher(@RequestBody Voucher voucher) { | |
voucherService.addSeckillVoucher(voucher); | |
return Result.ok(voucher.getId()); | |
} |
# VoucherServiceImpl
@Override | |
@Transactional | |
public void addSeckillVoucher(Voucher voucher) { | |
// 保存优惠券 | |
save(voucher); | |
// 保存秒杀信息 | |
SeckillVoucher seckillVoucher = new SeckillVoucher(); | |
seckillVoucher.setVoucherId(voucher.getId()); | |
seckillVoucher.setStock(voucher.getStock()); | |
seckillVoucher.setBeginTime(voucher.getBeginTime()); | |
seckillVoucher.setEndTime(voucher.getEndTime()); | |
seckillVoucherService.save(seckillVoucher); | |
} |
# 秒杀下单
# 业务逻辑
# 库存超卖
# 问题产生原因
# 悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行
Synchronized、Lock 都属于悲观锁
# 乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改
- 如果没有修改则认为是安全的,自己才更新数据
- 如果已经被其它线程修改说明发生了安全问题,此时可以重试或者异常
# 版本号法
# CAS 法
# 一人一单
# 业务逻辑
# 集群模式下一人一单
# 一人一单并发安全问题
# 分布式锁
# 分布式锁概念
满足分布式系统或集群下多进程可见并且互斥的锁
# 分布式锁实现
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用 mysql 本身的互斥锁机制 | 利用 setnx 这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
# 实现思路
# Redisson
# 概念
Redisson 是一个 Redis 的基础上实现的 Java 驻内存数据网格,它不仅提供了一系列的分布式的 Java 常用对象,还提供了很多分布式服务,其中就包含了各种分布式锁的实现
# Redisson 入门
引入依赖
<dependency> | |
<groupId>org.redisson</groupId> | |
<artfactId>redisson</artfactId> | |
</dependency> |
配置 Redisson 客户端
@Configuration | |
public class RedisConfig { | |
@Bean | |
public RedissonClient redissonClient() { | |
// 配置类 | |
Config config = new Config(); | |
// 添加 redis 地址,这里添加了单点地址,也可以使用 config.useClusterServers () 添加集群地址 | |
config.useSingleServer().setAddress("redis://192.168.116.129:6379").setPassword("root"); | |
// 创建客户端 | |
return Redisson.create(config); | |
} | |
} |
使用 Redisson 的分布式锁
@Resource | |
private RedissonClient redissonClient; | |
@Test | |
void testRedisson() throws InterruptedException { | |
// 获取锁(可重入),指定锁的名称 | |
RLock lock = redissonClient.getLock("anyLock"); | |
// 尝试获取锁,参数分别时:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 | |
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS); | |
// 判断锁是否获取成功 | |
if(isLock) { | |
try { | |
System.out.println("执行业务"); | |
} finally { | |
// 释放锁 | |
lock.unclock(); | |
} | |
} | |
} |
# Redisson 可重入锁原理
# Redisson 分布式锁原理
# Redisson 分布式锁主从一致性问题
- 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高,实现复杂
# Redis 优化秒杀
# 业务分析
# Redis 消息队列实现异步秒杀
# 消息队列概述
消息队列(Message Queue),字面意思就是存放信息的队列,最简单的消息队列包含 3 个角色
- 消息队列:存储和管理消息,也称为消息代理(Message Borker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列狐猴去信息并处理信息
# Redis 提供三种方式实现消息队列
- list 结构:基于 List 结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
# 基于 List 实现消息队列
- Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果
- 当队列中没有信息时 RPOP 或者 LPOP 操作会返回 null,并不像 JVM 阻塞队列那样会阻塞等待消息,因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果
- 优点:
- 利用 Redis 存储,不受限于 JVM 内存上限
- 基于 Redis 的持久化机制,数据安全性有保障
- 可以满足消息的有序性
- 缺点:
- 无法避免消息丢失
- 只支持单消费者
# PubSub 消息队列
- PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型,顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关信息
- SUBSCRIBE channel [channel]:订阅到一个或多个频道
- PUBLISH channel msg:向一个频道发送消息
- PSUBSCRIBE pattern [pattern]:订阅与 pattern 格式匹配的所有频道
- 优点:
- 发布订阅模型,支持多生产、多消费
- 缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
# 基于 Stream 的消息队列
- Stream 是 Redis5.0 引入的一种新的数据类型,可以实现一个功能非常完善的消息队列
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
# 发送消息命令
# 读取消息命令
阻塞方法读取最新消息
# 基于 Stream 的消息队列 —— 消费者组
# 消费者组
- 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列
- 消息可回溯
- 可以多消费者争抢,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
# 创建消费者组
- key:队列名称
- groupName:消费者组名称
- ID:起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
常见命令
# 从消费者组读取消息
- group:消费者组名称
- consumer:消费者名称,如果消费者不存在,会创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动 ACK,获取消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始 ID
- “>”:从下一个未消费的消息开始
- 其它:根据指定 id 从 pending-list 中获取已消费但未确定的消息