redis+lua脚本实现分布式流量控制器

xiaoxiao2021-02-28  44

背景:公司消息系统出口流量有限,需要做到分布式流量控制机制,这里使用redis 的队列+redis lua脚本实现了一个分布式流量控制器

通常的流量控制,采取一段时间内的发送数量与阀值对比,这样会造成 A 时间段不超过阀值,B时间段也不超过阀值,但A 和B之间的时间段超过阀值。也就是说不是那么的精确。

故事:“那我们为什么要在一段时间内比较数量,而不是在一个数量值上比较时间呢”,大脑里灵光一闪,出现了这句话,让我做出了这个流量控制器,这里采用的算法的思想概括为一句话就是,相同数量比较时间,具体算法见下图。

 

左侧为流程图,右侧为redis 中用来记录历史发送时间的队列

 

 

 

 

 

lua脚本如下:

 

local function addToQueue(x,time) local count=0 for i=1,x,1 do redis.call('lpush',KEYS[1],time) count=count+1 end return count end local result=0 local timeBase = redis.call('lindex',KEYS[1], tonumber(ARGV[2])-tonumber(ARGV[1])) if (timeBase == false) or (tonumber(ARGV[4]) - tonumber(timeBase)>tonumber(ARGV[3])) then result=result+addToQueue(tonumber(ARGV[1]),tonumber(ARGV[4])) end if (timeBase~=false) then redis.call('ltrim',KEYS[1],0,tonumber(ARGV[2])) end return result

 

 

java:

 

/** * Bestpay.com.cn Inc. * Copyright (c) 2011-2017 All Rights Reserved. */ package com.bestpay.messagecenter.product.core.redis.impl; import com.bestpay.messagecenter.product.common.constant.RedisProductKeys; import com.bestpay.messagecenter.product.common.util.StreamUtil; import com.bestpay.messagecenter.product.core.redis.ConfigRedisService; import com.bestpay.messagecenter.product.core.redis.QosRedisService; import com.google.common.base.Throwables; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import redis.clients.jedis.Jedis; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * redis list 滑动窗口限流服务 * @author lxn * @version Id: QosRedisServiceImpl.java, v 0.1 2017/6/30 17:36 lxn Exp $$ */ @Slf4j @Service public class QosRedisServiceImpl implements QosRedisService { /** * Jedis连接操作 */ @Resource private JedisConnectionFactory jedisConnectionFactory; /** * 脚本的sha1 */ private String scriptShal; @Autowired private ConfigRedisService configRedisService; /** * 启动载入lua脚本到redis */ @PostConstruct public void loadScript() { JedisConnection connection = jedisConnectionFactory.getConnection(); Jedis jedis = connection.getNativeConnection(); String script = StreamUtil.convertStreamToString(FreqRedisServiceImpl.class.getClassLoader(). getResourceAsStream("qosScript.lua")); this.scriptShal = jedis.scriptLoad(script); log.info("滑动窗口流控脚本载入成功,sha1:{}", this.scriptShal); connection.close(); } /** * 非阻塞请求 * @param count 申请的数量 * @param rateCount 限流数量 * @param rateTime 限流时间 毫秒 * @return */ @Override public long acquirePromise(String redisKey,long count, long rateCount, long rateTime) { Assert.hasText(redisKey,"限流key不能为空"); Assert.isTrue(count>0,"申请的数量不能小于0"); Assert.isTrue(rateCount>0,"限流数量不能小于0"); Assert.isTrue(rateTime>0,"限流时间不能小于0"); List<String> keys = new ArrayList<>(); keys.add(redisKey);//队列名 List<String> values = new ArrayList<>(); values.add(String.valueOf(count)); //申请发送的数量 1 values.add(String.valueOf(rateCount));//阀值数量 2 values.add(String.valueOf(rateTime));//阀值时间(毫秒)3 values.add(String.valueOf(System.currentTimeMillis()));//申请的时间4 JedisConnection connection=null; try { connection = jedisConnectionFactory.getConnection(); Jedis jedis = connection.getNativeConnection(); Object evalResult = jedis.evalsha(scriptShal, keys, values); return Long.parseLong(evalResult.toString()); }finally { if(connection!=null) { connection.close(); } } } /** * 阻塞请求 * @param count * @param rateCount * @param rateTime */ @Override public void acquirePromiseBlock(String redisKey,long count, long rateCount, long rateTime) { while (acquirePromise(redisKey,count, rateCount, rateTime)<=0){ int sleepTime = configRedisService.getConfigInt(RedisProductKeys.getCfgQosLimitThreadSleepTime()); try { Thread.sleep(sleepTime); }catch (InterruptedException e){ log.error("流量控制线程睡眠失败{}", Throwables.getStackTraceAsString(e)); // 恢复中断状态 Thread.currentThread().interrupt(); } } } }

 

 

 

 

 

 

 

 

 

 

转载请注明原文地址: https://www.6miu.com/read-75745.html

最新回复(0)