在选课中有个中心环节需要格外的注意,那就对每一门课程数量增减。假如一门《走进英国》的选修课,因资源有限只有100个名额,在redis中以Key/Value形式存储–(CousreID,Capacity);客户端每次选课退课都只是对Capacity进行增减,它的业务实现逻辑是这样的:
1.判断redis中是否存在该门课程的缓存;获得数据或者重新加载; 2.判断redis中该门课程的容量是否可减(是否可选); 3.容量减一 4.更新容量 5.发送消息给MQ
虽然redis是单线程,但是考虑到在更新容量(Capacity)时可能已经有其他线程已经进行了超出预料的减操作(当当前为容量为1时,假如在进行更新为0操作之前,已经有其他线程进行截胡,这样会导致容量为-1,这是不对的)。
针对这个问题,解决的方案的第一反应是加锁,将这个共享资源(Capacity)加上锁,或者将整个涉及业务逻辑增减的模块加上锁。初步采用关键字 synchronized给容量加上锁。第一版的部分代码如下:
//选课操作业务逻辑总览: //1.判断redis中是否存在该门课程的缓存;获得数据或者重新加载; //2.判断redis中该门课程的容量是否可减(是否可选); //3. 容量减一 // 更新容量 // 发送消息给MQ //1.定义redisID,用来从redis中拿出课程容量数据 String courseid="AA3044980558B660DB40DA";//课程:功能性食品 redisId ="tm-choosecourseimpl:" + courseid;//redis中的命名规则 //2.判断redis中是否存在该门课程的数据(已经提前将该课程存入redis中) // 存在则更新redis;向mq发送数据 boolean flagCourse = JedisCacheUtil.exists(redisId); synchronized(this) {//给容量加上锁 if (flagCourse) { //存在 //获取redis容量的物理值 int capaction = Integer.valueOf(JedisCacheUtil.get(redisId)); //给容量加上锁 if (capaction > 0) { //容量足够,进行容量减操作 JedisCacheUtil.decr(redisId); //2.运用注入的RabbitMQ模板类进行消息的发送 try { //要添加到mq中的数据 String msg =teachClassId+";"+studentId+";"+true; //经过springAOP 跳转到mqConsumer类中的listen方法中 template.convertAndSend(msg); } catch (Exception ex) { //返回状态码,提示信息:mq连接失败 // return false; } } } } 经过测试这块代码在单机部署的情况下是可以满足容量原子性操作的,但是在微服务的架构下却显得有很大缺陷,主要原因还在于synchronized本身,它是属于JVM范围内的锁机制,但是当多个机器部署多份时,弱点就显示出来的。synchronized的锁机制在多个节点环境会失效。于是又出现了个分布式锁的概念。经过协商决定从Redis事务方面找突破口,最后采用Redis本身的watch机制来实现这块的业务。Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:
事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。 一个事务从开始到执行会经历以下三个阶段: 开始事务。MULTI 命令入队。 执行事务。EXEC Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
修改后的代码如下:
//选课操作业务逻辑总览: //1.判断redis中是否存在该门课程的缓存;获得数据或者重新加载; //2.判断redis中该门课程的容量是否可减(是否可选); //3. 容量减一 // 更新容量 // 发送消息给MQ //1.定义redisID,用来从redis中拿出课程容量数据 String courseid="AA3044980558B660DB40DA";//课程:功能性食品 redisId ="tm-choosecourseimpl:" + courseid;//redis中的命名规则 //2.判断redis中是否存在该门课程的数据(已经提前将该课程存入redis中) // 存在则更新redis;向mq发送数据 //JedisCacheUtil 封装的redisCluster boolean flagCourse = JedisCacheUtil.exists(redisId); if (flagCourse) { //加锁:redis 分布式事务锁 JedisCacheUtil.watch(redisId); Transaction transaction = JedisCacheUtil.multi(); //存在 //获取redis容量的物理值 int capaction = Integer.valueOf(JedisCacheUtil.get(redisId)); //给容量加上锁 if (capaction > 0) { //容量足够,进行容量减操作 transaction.set(redisId,String.valueOf(capaction - 1) ); List<Object> result = transaction.exec(); if (result == null || result.isEmpty()) { System.out.println("Transaction error...");// 可能是watch-key被外部修改,或者是数据操作被驳回 } //2.运用注入的RabbitMQ模板类进行消息的发送 try { //要添加到mq中的数据 String msg =teachClassId+";"+studentId+";"+true; //经过springAOP 跳转到mqConsumer类中的listen方法中 template.convertAndSend(msg); } catch (Exception ex) { //返回状态码,提示信息:mq连接失败 // return false; } } }