先介绍一下项目背景。
我们开发的一个社群项目,就是给拥有共同兴趣的小伙伴提供一个共同的社群-用来讨论.记录生活的一款软件。
开发这个项目是由ruby和java共同完成的,刚开始是由ruby独立开发,在项目不断壮大之后,感觉单服务部署架构有点太冗杂,所以开始考虑分布式架构,然后就是java和ruby同时开发服务端,之间的通信采用的是restful api 进行交互。
接下来介绍一下需求:java端开发了一个打卡活动的服务。然后有一个点赞的功能是在ruby端完成的。ruby完成点赞计数功能,java端负责计算所对应的分数,进行加分和减分操作。
接下来介绍一下出现的问题:当时java端书写api的时候,全部是在一个方法中同步完成的操作,由于积分的业务逻辑有点复杂,导致api的响应时间稍微有点长,ruby端说不行,于是就改成异步的。由于打卡项目是一个比较小的项目,并没有集成消息队列,所以我就采用多线程的方式来做异步。
实现方式:
@Configuration @EnableAsync public class BeanConfig { @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(30); // 设置最大线程数 executor.setMaxPoolSize(60); // 设置队列容量 executor.setQueueCapacity(120); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("daka-"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }在调用service方法的时候在方法名上添加一个@Async注解,这个注解表明运行该方法会开启一个线程来运行,如果数量不够,则会进入等待。
当时在测试服务器上测试,代码运行ok,响应时间ok。 (注:测试服务器是单节点部署的-一个docker容器。)
之后部署到正式环境。两台服务器-6个docker容器。
运行一段时间之后出现重复的打卡积分数据,导致程序抛出异常。刚开始并没有注意到是咋回事,后来查看日志发现抛出异常的次数越来越多,这时候开始决定排查问题,
首先分析了一下数据库中存放的数据,发现重复的数据有一个特点就是存入数据库的时间是相同的。
这时候怀疑是请求出现了问题,然后联系ruby端的同学帮忙开始连调接口,询问他们是否一秒之内重复请求我的api了,ruby同学回答是没有,这时候就很怀疑了,照常理来说在客户端点击点赞按钮,一秒钟是不可能操作两次的,因为客户端操作会有一些提示,总之一个点赞流程下来,怎么也得2秒。这时候怀疑是被用程序点赞了。
然后找到问题所在,出现并发问题怎么办? 当然是加锁了。mysql-乐观锁不适合,mysql悲观锁-效率低,redis锁 刚刚好!
既然找到解决方案,那就开干。
String redisLockKey = RedisConstans.getScoreLockKey(noticeCode,dakaNoteId,scoreUserId,scoreOperatorId,objId); boolean scoreLock = clusterLock.getScoreLock(redisLockKey); if(!scoreLock){ return ResultContent.resultPassOk(); } String redisKey = RedisConstans.getScoreKey(noticeCode,dakaNoteId,scoreUserId,scoreOperatorId,objId); Boolean isOperateScore = true; if(noticeCode.equals(EnumEvent.LIKE.key) || noticeCode.equals(EnumEvent.REPLY.key) || noticeCode.equals(EnumEvent.CAREFULLY_CHOSEN.key)){ Boolean flag = redisData.hasKey(redisKey); if(flag){ clusterLock.releaseScoreLock(redisLockKey); return ResultContent.resultPassOk(); }else{ redisData.set(redisKey,String.valueOf(System.currentTimeMillis())); } }else{ isOperateScore = false; }查看代码。
1,首先根据特定的id声明一个全局的redis key。
2,获取锁的代码逻辑,就是利用redis的setIfAbsent方法往redis中塞入一个key,如果塞入成功返回true如果失败返回false
public boolean getScoreLock(String key) { Date now = new Date(); return valueOperations.setIfAbsent(key, String.valueOf(now.getTime())); }这样也就解决了并发问题,但是还有一个问题就是之前的代码查询数据是否存在的时候是查询的mysql数据库,如果两个连接同时访问mysql,返回的结果是相同的,所以查询数据库来判断数据是否存在在高并发情况下是不起作用的,所以继续使用redis,往redis中塞入一个key,查询的时候查询redis来判断数据是否存在。
3,释放锁-也就是删除redis中的key
在return 前面 加上删除锁的代码就ok了。
搞定!上线测试没问题,未发现重复数据。
由于之前采用多线程来处理数据库插入的问题,在修改的代码的时候总是感觉不太好,就像找个东西来替换,activeMq,rabbitMq 现如今市场上有很多处理异步任务的消息队列,由于项目小,并发不是很高,考虑成本没必要,所以采用的是redis做的消息队列。
redis做消息队列链接
redis做消息队列链接