redis队列-生产消费模式-简单实现

xiaoxiao2021-02-28  25

生产消费,不外乎就是生产新的消息插入到队列尾巴,消费者从队列头部取消息。

基于此,简单实现如下:(还有一种稍微复杂的实现,是结合了spring的实现,复杂实现)

往jedis队列尾部塞入消息

/** * 往列表尾部插入数据 * * @param key * @param value */ public static void rpush(String key, String value) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.rpush(key, value); } }

从jedis队列头部取出数据

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; public class RedisConsumer implements Runnable{ private Logger logger = LoggerFactory.getLogger(RedisConsumer.class); private static int MAX_ERR_TIMES = 10; private static int EXCEPTION_SLEEP_SECONDS = 3; private String key; public RedisConsumer(String key){ this.key = key; logger.info("创建消费者"); } @Override public void run() { logger.info("消费者执行,"); try (Jedis jedis = JedisUtil.getJedisPoolUtil().getJedis()) { int errorTimes = 0; boolean runFlag = true; while(runFlag){ try { String msg = jedis.blpop(0,key).get(1); logger.info("消费消息,msg={}",msg); //业务逻辑处理 ... errorTimes = 0; }catch (Exception e){ errorTimes++; if(errorTimes > MAX_ERR_TIMES){ logger.warn("消费者异常次数超过阈值,关闭线程,请查看redis服务是否关闭或异常"); runFlag = false; break; } try { Thread.sleep(EXCEPTION_SLEEP_SECONDS*1000); } catch (InterruptedException e1) { logger.warn("消费者异常,睡眠被打断",e1); } logger.warn("消费者异常",e); } } } } }

jedis工具类

import redis.clients.jedis.Jedis; public class JedisUtil { private static JedisPoolUtil jedisPoolUtil = SpringUtils.getBean("jedisPoolUtil"); /** * 返回连接池 */ public static JedisPoolUtil getJedisPoolUtil() { return jedisPoolUtil; } /** * 设置有效期的字符串缓存 * * @param key * @param value * @param seconds */ public static void set(String key, String value, int seconds) { /** * JDK 7新特性写法,实现了AutoCloseable接口,这个写法会自动调用close方法关闭 */ try (Jedis jedis = jedisPoolUtil.getJedis()) { jedis.setex(key, seconds, value); } } /** * 设置字符串缓存 * * @param key * @param value */ public static void set(String key, String value) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.set(key, value); } } /** * 设置byte[]缓存 * * @param key * @param value */ public static void setByte(byte[] key, byte[] value,int expire) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.set(key, value); expireBytesKey(key, expire); } } /** * 设置有效期的字符串缓存 * * @param key * @param value * @param seconds */ public static void setex(String key, int seconds, String value) { try (Jedis jedis = jedisPoolUtil.getJedis()) { jedis.setex(key, seconds, value); } } /** * 获取字符串数据 * * @param key * @return */ public static String get(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.get(key); } } /** * 获取byte[]数据 * * @param key * @return */ public static byte[] getByte(byte[] key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.get(key); } } /** * 往列表头部插入数据 * * @param key * @param value */ public static void lpush(String key, String value) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.lpush(key, value); } } /** * 往列表尾部插入数据 * * @param key * @param value */ public static void rpush(String key, String value) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.rpush(key, value); } } /** * 从列表头部获取数据 * * @param key * @return */ public static String lpop(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.lpop(key); } } /** * 从列表尾部获取数据 * * @param key * @return */ public static String rpop(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.rpop(key); } } /** * 获取列表长度 * * @param key * @return 列表长度,若返回-1则表示取值发生异常 */ public static long llen(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.llen(key); } } /** * 计数器递增 * @param key * @return 执行 incr 命令之后 key 的值 */ public static long incr(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.incr(key); } } /** * 计数器递减 * @param key * @return 执行 decr 命令之后 key 的值 */ public static long decr(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.decr(key); } } /** * 删除键 * @param key * @return 执行 decr 命令之后 key 的值 */ public static long delByte(byte[] key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.del(key); } } /** * 设置key有效期 * @param key * @param seconds */ public static void expire(String key,int seconds) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.expire(key, seconds); } } public static void expireBytesKey(byte[] key, int expire){ try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.expire(key, expire); } } /** * 判断member是否是集合key的成员 * @param key * @param member * @return */ public static boolean sismember(String key, String member) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.sismember(key, member); } } /** * 往集合SET添加数据 * @param key * @param member */ public static void sadd(String key,String member) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.sadd(key, member); } } /** * 获取键的剩余有效秒数 * 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以毫秒为单位,返回 key 的剩余生存时间 * 注意:在 Redis 2.8 以前,当 key 不存在,或者 key 没有设置剩余生存时间时,命令都返回 -1 * @param key */ public static Long pttl(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.pttl(key); } } /** * 删除键 * @param key */ public static void del(String key) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.del(key); } } /** * 获取List第n个值 * @param key */ public static String lindex(String key,Long index) { try(Jedis jedis = jedisPoolUtil.getJedis()) { return jedis.lindex(key,index); } } }

制造多个消费者

new Thread(new RedisConsumer(msgProcessor, MsgConstant.REDIS_QUEUE_KEY)).start(); logger.info("第一个消费者启动完成"); new Thread(new RedisConsumer(msgProcessor,MsgConstant.REDIS_QUEUE_KEY)).start(); logger.info("第二个消费者启动完成");

注意

blpop:阻塞式从redis消息队列头部取出消息。如果没有取到消息,就会导致链接阻塞,直到有新的消息生产并存进队列。

lpop:非阻塞式从redis消息队列头部取出消息。

转载请注明原文地址: https://www.6miu.com/read-2631642.html

最新回复(0)