# 基于 Redis 实现共享 session 登录

# 基于 Redis 实现共享 session 登录

# 发送短信验证码

# 思路图解

image-20230326210924929

# 代码实现

@Override
public Result sendCode(String phone, HttpSession session) {
    // 1. 判断手机号格式是否正确
    if (RegexUtils.isPhoneInvalid(phone)) {
        // 1.1 手机号格式不正确,提示信息
        return Result.fail("手机号格式错误");
    }
    // 2. 手机号格式正确,生成验证码
    String code = RandomUtil.randomNumbers(6);
    // 3. 将验证码存到 Redis
    stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
    // 4. 模拟发送短信验证码
    log.debug("发送验证码成功,验证码为:{}",code);
    return Result.ok();
}

# 校验登录状态

# 思路图解

image-20230326211036384

# 代码实现

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
    // 1. 校验手机号
    String phone = loginForm.getPhone();
    if (RegexUtils.isPhoneInvalid(phone)) {
        return Result.fail("手机号格式错误");
    }
    // 2. 从 Redis 中获取验证码
    String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
    String code = loginForm.getCode();
    // 3. 判断验证码是否存在以及是否正确
    if (cacheCode == null || !cacheCode.equals(code)) {
        return Result.fail("验证码错误,请重新验证");
    }
	// 4. 查询用户,看是否存在
    User user = query().eq("phone",phone).one();
    // 5. 不存在,创建 user 用户并保存
    if (user == null) {
        user = createUserWithPhone(phone);
    }
    // 6. 存在,保存用户信息到 Redis
    String token = UUID.randomUUID().toString(true);
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
    stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);
    stringRedisTemplate.expire(LOGIN_USER_KEY,30,TimeUnit.MINUTES);
    return Result.ok(token);
}
private User createUserWithPhone(String phone) {
    User user = new User();
    user.setPhone(phone);
    user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
    save(user);
    return user;
}

# 解决状态登录刷新问题

# 登录拦截器优化(思路)

image-20230326211555584

# 代码实现

# 第一层拦截器 LoginInterceptor

public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 判断是否需要拦截 ThreadLocal 中是否有用户
        if (UserHolder.getUser() == null) {
            response.setStatus(401);
            return false;
        }
        // 2. 有用户则放行
        return true;
    }
}

# 第二层拦截器 RefreshTokenInterceptor

public class RefreshTokenInterceptor implements HandlerInterceptor {
    private StringRedisTemplate stringRedisTemplate;
    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 获取请求头中 token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2. 基于 token 获取用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
        // 3. 判断用户是否存在
        if (userMap.isEmpty()) {
            // 4. 不存在,拦截
            return true;
        }
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 5. 存在,保存用户信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 6. 刷新 token 有效期
        stringRedisTemplate.expire(LOGIN_USER_KEY + token, 30, TimeUnit.MINUTES);
        // 7. 放行
        return true;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户
        UserHolder.removeUser();
    }
}

# MvcConfig 配置拦截器

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 登录拦截器
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
        //token 刷新拦截器
        registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
                "/shop/**",
                "/voucher/**",
                "/shop-type/**",
                "/blog/hot",
                "/user/code",
                "/user/login"
        ).order(1);
    }
}

# 商户查询缓存

# 缓存介绍

缓存就是数据交换的缓冲区(称作 Cache),是存贮数据的临时地方,一般读写性能较高

# 缓存作用

  1. 降低后端负载
  2. 提高读写效率,降低响应时间

# 缓存的成本

  1. 数据一致性成本
  2. 代码维护成本
  3. 运维成本

# 添加 Redis 缓存

# 业务流程分析

image-20230327094824667

# 代码实现

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Object queryById(Long id) {
        // 1. 设置 redis 缓存 key
        String key = "cache:shop:" + id;
        // 2. 从 Redis 缓存,通过 key 查询店铺信息
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 3.Redis 中查找店铺信息,返回店铺
        if (StrUtil.isNotBlank(shopJson)) {
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        // 4.Redis 中未查找到店铺信息,返回提示信息,从数据库中查询信息
        Shop shop = getById(id);
        if (shop == null) {
            return Result.fail("商铺信息不存在");
        }
        // 5. 将从数据库查询到的信息缓存到 Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
        // 6. 返回商铺信息
        return Result.ok(shop);
    }
}

# 缓存更新策略

# 三种缓存更新策略

内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用 Redis 的内存淘汰机制,当内存不够时自动淘汰部分数据,下次查询时更新缓存 给缓存数据添加 TTL 时间,到期后自动删除缓存,下次查询时更新缓存 编写业务逻辑,在修改数据库的同时,更新缓存
一致性 一般
维护成本
  1. 低一致性需求:使用内存淘汰机制,例如店铺类型查询缓存
  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存

# 主动更新策略

# Cache Aside Pattern

由缓存调用者,在更新数据库的同时更新缓存

  1. 删除缓存还是更新缓存(推荐 2)
    1. 更新缓存:每次更新数据库都更新缓存,无效写操作过多
    2. 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
  2. 如何保证缓存与数据库的操作的同时成功或失败
    1. 单体系统,将缓存与数据库操作放在一个事务
    2. 分布式系统:利用 TCC 等分布式事务方案
  3. 先操作缓存还是先操作数据库
    1. 先删除缓存,再操作数据库
    2. 先操作数据库,再操作缓存(推荐,线程安全问题出现概率小)

image-20230327104227512

# Read/White Through Pattern

缓存与数据库整合为一个服务,由服务来维护一致性,调用者调用该服务,无需关系缓存一致性问题

# Write Behind Caching Pattern

调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致

# 代码实现

@Override
public Result update(Shop shop) {
    // 1. 判断 id 是否存在
    if (shop.getId() == null) {
        return Result.fail("id值不存在");
    }
    // 2. 更新数据库
    updateById(shop);
    // 3. 更新缓存
    stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());
    return Result.ok();
}

# 缓存穿透

# 概念

缓存穿透是指客户请求的数据在缓存中数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

# 解决思路

# 缓存空对象

  1. 优点:实现简单,维护方便
  2. 缺点:
    1. 额外的内存消耗
    2. 可能造成短期的不一致

image-20230327162403282

# 布隆过滤

  1. 优点:内存占用较少,没有多余的 key
  2. 缺点
    1. 实现复杂
    2. 存在误判可能

image-20230327162821846

# 代码实现

# 业务逻辑

image-20230327163211858

# 代码

@Override
public Object queryShopById(Long id) {
    // 1. 设置 redis 缓存 key
    String key = "cache:shop:" + id;
    // 2. 从 Redis 缓存,通过 key 查询店铺信息
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 3.Redis 中查找店铺信息,返回店铺
    if (StrUtil.isNotBlank(shopJson)) {
        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
        return Result.ok(shop);
    }
    // 判断命中是否是空值
    if (shopJson != null) {
        return Result.fail("店铺存在");
    }
    // 4.Redis 中未查找到店铺信息,返回提示信息,从数据库中查询信息
    Shop shop = getById(id);
    if (shop == null) {
        // 将空值写入 Redis
        stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
        // 返回错误信息
        return Result.fail("商铺信息不存在");
    }
    // 5. 将从数据库查询到的信息缓存到 Redis
    stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
    // 6. 返回商铺信息
    return Result.ok(shop);
}

# 缓存雪崩

# 概念

缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库带来巨大压力

image-20230327164900189

# 解决方案

  1. 给不同的 Key 的 TTL 添加随机值
  2. 利用 Redis 集群提高服务的可用性
  3. 给缓存业务添加降级限流策略
  4. 给业务添加多级缓存

# 缓存击穿

# 概念

缓存击穿问题也叫热点 key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大冲击

image-20230327170015848

# 解决方案

# 互斥锁

image-20230327170237194

# 逻辑过期

image-20230327170649532

# 互斥锁与逻辑过期比较

解决方案 优点 缺点
互斥锁 没有额外的内存消耗、保证一致性、实现简单 线程需要等待,性能收到影响、可能有死锁风险
逻辑过期 线程无需等待,性能较好 不保证一致性、 有额外内存消耗、实现复杂

# 基于互斥锁解决缓存击穿问题

# 业务逻辑

image-20230328095107316

# 代码实现

@Override
public Result queryShopById(Long id) throws InterruptedException {
    Shop shop = queryWithMutex(id);
    if (shop == null) {
        return Result.fail("店铺不存在");
    }
    return Result.ok(shop);
}
public Shop queryWithMutex(Long id) throws InterruptedException {
    // 1. 设置 redis 缓存 key
    String key = "cache:shop:" + id;
    // 2. 从 Redis 缓存,通过 key 查询店铺信息
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 3.Redis 中查找店铺信息,返回店铺
    if (StrUtil.isNotBlank(shopJson)) {
        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
        return shop;
    }
    // 判断命中是否是空值
    if (shopJson != null) {
        return null;
    }
    String lockKey = null;
    Shop shop = null;
    try {
        // 4. 开始实现缓存重建
        // 4.1 获取互斥锁
        lockKey = "lock:shop:" + id;
        boolean isLock = tryLock(lockKey);
        // 4.2 判断是否获取成功
        if (!isLock) {
            // 4.3 失败,休眠重试
            Thread.sleep(50);
            return queryWithMutex(id);
        }
        // 4.4 成功,根据 id 查询数据库
        shop = getById(id);
        if (shop == null) {
            // 将空值写入 Redis
            stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回错误信息
            return null;
        }
        // 5. 将从数据库查询到的信息缓存到 Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        // 6. 释放互斥锁
        unlock(lockKey);
    }
    // 7. 返回商铺信息
    return shop;
}
// 实现获取锁(加锁)
private boolean tryLock(String key) {
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}
// 实现释放锁
private void unlock(String key) {
    stringRedisTemplate.delete(key);
}

# 基于逻辑过期解决缓存击穿问题

# 业务逻辑

image-20230328141118976

# 代码实现

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public void saveShop2Redis(Long id, Long expireSeconds) {
    // 1. 查询店铺信息
    Shop shop = getById(id);
    // 2. 封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    // 3. 写入 Redis
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
public Shop queryWithLogicalExpire(Long id) {
    // 1. 设置 redis 缓存 key
    String key = "cache:shop:" + id;
    // 2. 从 Redis 缓存,通过 key 查询店铺信息
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 3.Redis 中查找店铺信息,判断是否命中
    if (StrUtil.isBlank(shopJson)) {
        // 未命中,返回空
        return null;
    }
    // 4. 命中,需要将 json 反序列化为对象
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    JSONObject data = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(data, Shop.class);
    LocalDateTime expireTime = redisData.getExpireTime();
    // 5. 判断是否过期
    if (expireTime.isAfter(LocalDateTime.now())) {
        // 5.1 未过期,直接返回店铺信息
        return shop;
    }
    // 5.2 已过期需要缓存重建
    // 6. 缓存重建
    String lockKey = "lock:shop:" + id;
    // 6.1 获取互斥锁
    boolean isLock = tryLock(lockKey);
    // 6.2 判断是否获取锁成功
    if (isLock) {
        // 6.3 成功,开启独立线程,开启缓存重建
        CACHE_REBUILD_EXECUTOR.submit(() -> {
            // 重建缓存
            try {
                this.saveShop2Redis(id,20L);
            }catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 释放锁
                unlock(lockKey);
            }
        });
    }
    // 6.4 返回过期商铺信息
    return shop;
}

# 缓存工具封装(解决缓存击穿问题)

@Slf4j
@Component
public class CacheClient {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入 Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }
    public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time,TimeUnit unit) {
        String key = keyPrefix + id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(json)) {
            return JSONUtil.toBean(json, type);
        }
        if (json != null) {
            return null;
        }
        R r = dbFallback.apply(id);
        if (r == null) {
            stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }
        this.set(key,r,time,unit);
        return r;
    }
    public <R, ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type, Function<ID, R> dbFallback, Long time,TimeUnit unit) {
        String key = keyPrefix + id;
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isBlank(shopJson)) {
            return null;
        }
        RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
        JSONObject data = (JSONObject) redisData.getData();
        R r = JSONUtil.toBean(data, type);
        LocalDateTime expireTime = redisData.getExpireTime();
        if (expireTime.isAfter(LocalDateTime.now())) {
            return r;
        }
        String lockKey = "lock:shop:" + id;
        boolean isLock = tryLock(lockKey);
        if (isLock) {
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    R r1 = dbFallback.apply(id);
                    this.setWithLogicalExpire(key,r1,time,unit);
                }catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    unlock(lockKey);
                }
            });
        }
        return r;
    }
    // 实现获取锁(加锁)
    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }
    // 实现释放锁
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

# 优惠卷秒杀

# 全局唯一 ID

# 概念

全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具

  1. 唯一性
  2. 高可用
  3. 高性能
  4. 递增性
  5. 安全性

# 1.2 ID 组成部分

image-20230329095732696

  1. 符号位:1bit,永远为 0
  2. 时间戳:31bit,以秒为单位
  3. 序列号:32bit,秒内的计数器,支持每秒产生2322^{32} 个不同 ID

# 代码实现

@Component
public class RedisIdWorker {
    private static final Long BEGIN_TIMESTAMP = 1640995200L;
    private static final int COUNT_BITS = 32;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    public long nextId(String ketPrefix) {
        // 1. 生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        // 2. 生成序列号
        // 2.1 获取当前日期
        String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        // 2.2 自增长
        long count = stringRedisTemplate.opsForValue().increment("icr:" + ketPrefix + ":" + date);
        // 3. 拼接并返回
        return timestamp << COUNT_BITS | count;
    }
}

# 添加优惠券

# VoucherController

/**
 * 新增普通券
 * @param voucher 优惠券信息
 * @return 优惠券id
 */
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
    voucherService.save(voucher);
    return Result.ok(voucher.getId());
}
/**
 * 新增秒杀券
 * @param voucher 优惠券信息,包含秒杀信息
 * @return 优惠券id
 */
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
    voucherService.addSeckillVoucher(voucher);
    return Result.ok(voucher.getId());
}

# VoucherServiceImpl

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
}

# 秒杀下单

# 业务逻辑

image-20230329162851192

# 库存超卖

# 问题产生原因

image-20230329163924374

# 悲观锁

认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行

Synchronized、Lock 都属于悲观锁

# 乐观锁

认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改

  1. 如果没有修改则认为是安全的,自己才更新数据
  2. 如果已经被其它线程修改说明发生了安全问题,此时可以重试或者异常

# 版本号法

image-20230329164811115

# CAS 法

image-20230329164959697

# 一人一单

# 业务逻辑

image-20230329202121156

# 集群模式下一人一单

# 一人一单并发安全问题

image-20230329204515757

# 分布式锁

# 分布式锁概念

满足分布式系统或集群下多进程可见并且互斥的锁

image-20230402190403516

# 分布式锁实现

MySQL Redis Zookeeper
互斥 利用 mysql 本身的互斥锁机制 利用 setnx 这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

# 实现思路

image-20230402193954216

# Redisson

# 概念

Redisson 是一个 Redis 的基础上实现的 Java 驻内存数据网格,它不仅提供了一系列的分布式的 Java 常用对象,还提供了很多分布式服务,其中就包含了各种分布式锁的实现

# Redisson 入门

引入依赖

<dependency>
	<groupId>org.redisson</groupId>
    <artfactId>redisson</artfactId>
</dependency>

配置 Redisson 客户端

@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissonClient() {
        // 配置类
        Config config = new Config();
        // 添加 redis 地址,这里添加了单点地址,也可以使用 config.useClusterServers () 添加集群地址
        config.useSingleServer().setAddress("redis://192.168.116.129:6379").setPassword("root");
        // 创建客户端
        return Redisson.create(config);
    }
}

使用 Redisson 的分布式锁

@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    // 尝试获取锁,参数分别时:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    // 判断锁是否获取成功
    if(isLock) {
        try {
            System.out.println("执行业务");
        } finally {
            // 释放锁
            lock.unclock();
        }
    }
}

# Redisson 可重入锁原理

image-20230402203723653

# Redisson 分布式锁原理

image-20230402205618015

# Redisson 分布式锁主从一致性问题

image-20230402211429800

  1. 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
  2. 缺陷:运维成本高,实现复杂

# Redis 优化秒杀

# 业务分析

image-20230403100703600

image-20230403101146693

# Redis 消息队列实现异步秒杀

# 消息队列概述

消息队列(Message Queue),字面意思就是存放信息的队列,最简单的消息队列包含 3 个角色

  1. 消息队列:存储和管理消息,也称为消息代理(Message Borker)
  2. 生产者:发送消息到消息队列
  3. 消费者:从消息队列狐猴去信息并处理信息

image-20230403154904751

# Redis 提供三种方式实现消息队列

  1. list 结构:基于 List 结构模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:比较完善的消息队列模型

# 基于 List 实现消息队列

  1. Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果
  2. 当队列中没有信息时 RPOP 或者 LPOP 操作会返回 null,并不像 JVM 阻塞队列那样会阻塞等待消息,因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果
  3. 优点:
    1. 利用 Redis 存储,不受限于 JVM 内存上限
    2. 基于 Redis 的持久化机制,数据安全性有保障
    3. 可以满足消息的有序性
  4. 缺点:
    1. 无法避免消息丢失
    2. 只支持单消费者

image-20230403160911540

# PubSub 消息队列

  1. PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型,顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关信息
  2. SUBSCRIBE channel [channel]:订阅到一个或多个频道
  3. PUBLISH channel msg:向一个频道发送消息
  4. PSUBSCRIBE pattern [pattern]:订阅与 pattern 格式匹配的所有频道
  5. 优点:
    1. 发布订阅模型,支持多生产、多消费
  6. 缺点:
    1. 不支持数据持久化
    2. 无法避免消息丢失
    3. 消息堆积有上限,超出时数据丢失

image-20230403161639654

# 基于 Stream 的消息队列

  1. Stream 是 Redis5.0 引入的一种新的数据类型,可以实现一个功能非常完善的消息队列
  2. 消息可回溯
  3. 一个消息可以被多个消费者读取
  4. 可以阻塞读取
  5. 有消息漏读的风险

# 发送消息命令

image-20230403162513112

image-20230403162522097

# 读取消息命令

image-20230403163145175

image-20230403163200333

阻塞方法读取最新消息

image-20230403163231650

image-20230403163342348

# 基于 Stream 的消息队列 —— 消费者组

# 消费者组

  1. 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列
  2. 消息可回溯
  3. 可以多消费者争抢,加快消费速度
  4. 可以阻塞读取
  5. 没有消息漏读的风险
  6. 有消息确认机制,保证消息至少被消费一次

image-20230403163723998

# 创建消费者组

image-20230403163750169

  1. key:队列名称
  2. groupName:消费者组名称
  3. ID:起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息
  4. MKSTREAM:队列不存在时自动创建队列

常见命令

image-20230403163954388

# 从消费者组读取消息

image-20230403164113382

  1. group:消费者组名称
  2. consumer:消费者名称,如果消费者不存在,会创建一个消费者
  3. count:本次查询的最大数量
  4. BLOCK milliseconds:当没有消息时最长等待时间
  5. NOACK:无需手动 ACK,获取消息后自动确认
  6. STREAMS key:指定队列名称
  7. ID:获取消息的起始 ID
    1. “>”:从下一个未消费的消息开始
    2. 其它:根据指定 id 从 pending-list 中获取已消费但未确定的消息
更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Baozi 微信支付

微信支付

Baozi 支付宝

支付宝

Baozi 微信

微信