网络知识 娱乐 第六章 交易性能优化技术之缓存库存

第六章 交易性能优化技术之缓存库存

第六章 交易性能优化技术之缓存库存

交易性能瓶颈

  • jmeter压测(对活动下单过程进行压测,采用post请求,设置传入参数,性能发现下单avarage大约2s,tps500,交易验证主要完全依赖数据库的操作)
  • 交易验证完全依赖数据库
  • 库存行锁
  • 后置处理逻辑

Untitled

交易验证优化

  • 用户风控策略优化:策略缓存模型化
    • 在开始交易后,针对活动实时信息和用户实时信息的验证,目的是为了风控策略。
    • 检查用户账号是否异常,是否异地登陆,策略是:通过异步的方式将用户模型写入缓存,与实时信息做一致性检验,做到风控策略。
  • 活动校验策略优化:引入活动发布流程,模型缓存化,紧急下线能力
    • 实时活动的缓存存在一个问题:如果后台修改活动信息(修改活动结束时间),但redis的缓存还处于正常有效期,用户依然可以以活动价格秒杀商品,因此需要有紧急下线的能力。
    • 对应的策略是:在活动开始前半个小时发布活动,对缓存预热,同时后台设计一个紧急下线的接口,清除redis缓存,那么用户下单时就会去数据库查询活动的最新信息了。

下单以及用户验证优化

我们每次下单都需要查询数据库中的用户信息以及商品信息,我们把下过单的用户以及商品信息进行缓存,减少对数据库的访问。

商品信息缓存

  • ItemService.java 中添加 getItemByIdInCache 方法
//验证item mode及promo mode缓存模型是否有效
ItemModel getItemByIdInCache(Integer id);
  • ItemServiceImpl.java 实现方法:
		// 从缓存中获取商品信息
    public ItemModel getItemByIdInCache(Integer id) {
        // 根据商品id去redis缓存中获取商品信息
        ItemModel itemModel = (ItemModel) redisTemplate.opsForValue().get("item_validate_" + id);
        // 如果商品信息为空,就去数据库中查询商品信息,并将商品信息存储中redis缓存中,并设置过期时间10分钟
        if (itemModel == null) {
            itemModel = this.getItemById(id);
            redisTemplate.opsForValue().set("item_validate_" + id, itemModel);
            // 设置redis缓存过期时间为10分钟
            redisTemplate.expire("item_validate_" + id, 10, TimeUnit.MINUTES);
        }
        return itemModel;
    }
  • 然后在下单的 OrderServiceImpl.java 中把从数据库拿变成去缓存中取:
public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
        //1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
        // ItemModel itemModel = itemService.getItemById(itemId);
        ItemModel itemModel = itemService.getItemByIdInCache(itemId); //根据商品id获取商品信息

用户信息缓存

  • UserService.java 中添加 getUserByIdInCache 方法
// 通过缓存获取用户对象
UserModel getUserByIdInCache(Integer id);
  • UserServiceImpl.java 实现方法:
		// 从缓存中获取用户信息
    public UserModel getUserByIdInCache(Integer id) {
        // 根据用户id去redis缓存中获取用户信息
        UserModel userModel = (UserModel) redisTemplate.opsForValue().get("user_validate_" + id);
        // 如果用户信息为空,就去数据库中查询用户信息,并将用户信息存储中redis缓存中,并设置过期时间10分钟
        if (userModel == null) {
            userModel = this.getUserById(id);
            redisTemplate.opsForValue().set("user_validate_" + id, userModel);
            redisTemplate.expire("user_validate_" + id, 10, TimeUnit.MINUTES);
        }
        return userModel;
    }
  • 然后在下单的 OrderServiceImpl.java 中把从数据库拿变成去缓存中取:
		@Transactional
    public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
        //1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
        // ItemModel itemModel = itemService.getItemById(itemId);
        ItemModel itemModel = itemService.getItemByIdInCache(itemId); //根据商品id获取商品信息
        if (itemModel == null) { // 商品是否为空
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR, "商品信息不存在");
        }
        //1.1校验当前下单用户是否是正常用户
        // UserModel userModel = userService.getUserById(userId);
        UserModel userModel = userService.getUserByIdInCache(userId);

性能压测

  • 没有做缓存优化:

Untitled

  • 做了缓存优化的:

Untitled

库存行锁优化(重点)

  • 我们之前减库存的操作:
		
    <update id="decreaseStock">
        update item_stock
        set stock = stock - #{amount}
        where item_id = #{itemId}
          and stock >= #{amount}
    </update>
  • 库存的数量就是 (stock - amount),条件是 item_id 存在 并且 stock 要大于等于 amount
  • item_id 要加上唯一索引,这样查询的时候会为数据库加上行锁,否则是数据库表锁
  • item_stock 表中的 item_id 字段加上唯一索引:

Untitled

Untitled

扣减库存缓存化(方案一)

  • 方案是:我们要将扣减库存的操作发生在缓存而不是数据库中,缓存的扣减时间相对较少
  • 流程:
    • 活动发布同时同步库存进缓存
    • 下单交易减缓存库存

具体操作步骤:

  • PromoService 接口中添加活动发布接口
		//活动发布
    void publishPromo(Integer promoId);
  • PromoServiceImpl 实现类(这里默认在获取活动id以及商品信息的时候库存不发生变化)
		@Override
    public void publishPromo(Integer promoId) {
        // 通过活动id获取活动
        PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
        // 如果没有活动直接返回即可
        if (promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0) {
            return;
        }
        ItemModel itemModel = itemService.getItemById(promoDO.getItemId());

        // 将活动商品库存同步到redis中
        redisTemplate.opsForValue().set("promo_item_stock_" + itemModel.getId(), itemModel.getStock());
    }
  • ItemController 中开发一个模拟前端发布活动的后端接口
		@RequestMapping(value = "/publishpromo", method = {RequestMethod.GET})
    @ResponseBody
    // 这个@RequestParam表示这个参数是必传的,如果没传则会报ServletRequestBindingException错误,然后被GlobalExceptionHandler类所捕获
    public CommonReturnType publishpromo(@RequestParam(name = "promoId") Integer promoId) {
        promoService.publishPromo(promoId);
        return CommonReturnType.create(null);
    }
  • 修改 ItemServiceImpl 中的 decreaseStock 方法:减库存操作应该减掉的是 redis 缓存中的库存
		@Override
    @Transactional // 涉及到库存减的操作,所以要保证事务的一致性
    public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {

//        int affectedRow = itemStockDOMapper.decreaseStock(itemId, amount); // 返回的是受影响的行数
//        if (affectedRow > 0) {
//            // 更新库存成功
//            return true;
//        } else {
//            // 更新库存失败
//            return false;
//        }

        //将stock库存表中对应的商品id的对应库存数量减去购买数量
        //increment(-3)等于减去3,返回的是计算后的值
        long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue() * -1);
        if (result >= 0) {
            // 更新库存成功
            return true;
        } else {
            // 更新库存失败
            return false;
        }

    }
  • 虽然已经做到减库存,减去的是缓存中的库存的操作,但这样还存在数据库记录不一致的问题。
  • 数据库中的库存数据和redis中的库存数据是完全不一致的,数据库中的库存数据是没被减过的。
  • 若redis突然发生宕机,那之前减库存的数据就都没了,然后用户下单了,你实际的库存却没变化。

异步同步数据库(方案二)

采用异步消息队列的方式,将异步扣减的消息同步给消息的 consumer 端,并由消息的consunmer 端完成数据库中库存的扣减操作

  • 活动发布同步库存进缓存
  • 下单交易减缓存库存
  • 异步消息的方式扣减数据库内存,让用户可以通过redis进行快速下单购买的操作,同时redis的缓存数据又可以以异步的方式存储到mysql数据库中,保证数据的一致性。

异步消息队列 rocketmq

采用异步队列可以既能让C端用户完成购买商品的高效体验,又能保证数据库的一致性。

常见的异步消息中间件用到的有 ActiveMQ(实现java的AMS)、Kafka(基于流式处理)、RocketMQ 是阿里巴巴基于 Kafka 改造的一种异步消息队列。

  • 高性能,高并发,分布式消息中间件
  • 典型应用场景:分布式事务,异步解耦

Untitled

RocketMQ 主要有:

  • Producer 端,负责向Broker发送消息;
  • Consumer 端,多个 consumer 组成一个 ConsumerGroup,每个消息会由一个 Group 里的consumer 来消费;
  • Broker 由 topic 和 MessageQueue 组成,消息隶属于某个 topic,一个 topic 可能由一个或多个 topic 管理。

部署模型

Untitled

Untitled

  • 首先 NameServer 相当于一个注册管理器,Broker1 向 NameServer 发出注册请求,NameServer 记录Broker1的ip以及它负责的topicA,topicA负责的queue1和queue2;
  • Producer 连接 NameServer发现 broker1,会向topicA为主题的broker1投递消息,采用负载轮询向queue投递;
  • Consumer 抓取负责的topicA,与 queue 建立长连接,当有消息时,唤醒,拉取对应的message,没有消息就等待,这种方式叫做长轮询。
  • 一个consumer对应一个group,会平均划分,如果出现consumer过多,会有空闲。一个项目中会出现多个不同的Consumer Group,比如订单系统、商品系统等。
  • 若一个queue被多个consumer消费,会存在锁竞争机制,rocketmq采用的策略是以queue为单位平均分配,尽量保证consumer与queue数量相等。

Untitled

  • 多个Broker会有主从复制机制,用于应对 broker1 异常,nameserver 将 broker2 设为主库,通知 producer 以及 consumer 端去接管,Broker1和Broker2的数据可以是同步也可以是异步的。

RocketMQ 安装

rocketmq 官网,下载压缩包 上传到服务器

  • RocketMQ 默认的虚拟机内存较大,启动 Broker 或者 NameServer 可能会因为内存不足而导致失败,所以需要编辑如下两个配置文件,修改 JVM 内存大小
# 编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vi bin/runbroker.sh
	# 参考设置
	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

$ vi bin/runserver.sh
	# 参考设置
	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  • 启动 NameServer
# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动 Broker
# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log

测试 RocketMQ

发送消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

关闭 RocketMQ

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

缓存库存接入异步化

  • 配置 appliaction.yml
mq:
  nameserver:
    addr: 43.138.131.175:9876
  topicname: stock
  • MqProducer.java
@Component
public class MqProducer {

    private DefaultMQProducer producer;

    //声明value注解,引入配置变量
    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @PostConstruct
    public void init() throws MQClientException {
        // 做mq producer的初始化
        producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr(nameAddr);
        producer.start();
    }

    // 同步库存扣减消息
    public boolean asyncReduceStock(Integer itemId, Integer amount) {
        Map<String, Object> bodyMap = new HashMap<>();
        bodyMap.put("itemId", itemId);
        bodyMap.put("amount", amount);
        //投放消息
        Message message = new Message(topicName, "increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
        try {
            producer.send(message);
        } catch (MQClientException e) {
            e.printStackTrace();
            return false;
        } catch (RemotingException e) {
            e.printStackTrace();
            return false;
        } catch (MQBrokerException e) {
            e.printStackTrace();
            return false;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

}
  • ItemServiceImpl 实现缓存减库存以及发送消息减数据库的库存
		@Override
    @Transactional // 涉及到库存减的操作,所以要保证事务的一致性
    public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {

        //将stock库存表中对应的商品id的对应库存数量减去购买数量
        //increment(-3)等于减去3,返回的是计算后的值
        long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue() * -1);
        if (result >= 0) {
            //更新库存成功,发送消息,减数据库的库存
            boolean mqResult = mqProducer.asyncReduceStock(itemId, amount);
            if (!mqResult) {
                //发送消息失败,需要回滚redis库存,将库存重新补回去
                redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue());
                return false;
            }
            return true;
        } else {
            // 更新库存失败,需要回滚redis库存,将库存重新补回去
            redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue());
            return false;
        }

    }
  • MqConsumer.java
@Component
public class MqConsumer {

    private DefaultMQPushConsumer consumer;

    //声明value注解,引入配置变量
    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @Autowired
    private ItemStockDOMapper itemStockDOMapper;

    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("stock_consumer_group");
        consumer.setNamesrvAddr(nameAddr);
        //订阅所有消息
        consumer.subscribe(topicName, "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //实现库存真正到数据库内扣减的逻辑
                Message msg = msgs.get(0);
                String jsonString = new String(msg.getBody());
                Map<String, Object> map = JSON.parseObject(jsonString);
                Integer itemId = (Integer) map.get("itemId");
                Integer amount = (Integer) map.get("amount");

                itemStockDOMapper.decreaseStock(itemId, amount);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}
  • 最终通过 itemStockDOMapper 调用数据库减库存

Untitled