redis订阅发布

xiaoxiao2021-02-28  93

1.创建监听

public class SubListener extends JedisPubSub {     public static Logger LOG = LoggerFactory.getLogger(SubListener.class);     @Override     public void onMessage(String channel, String message) {         LOG.info("onMessage C1 "+channel+" M1 "+message);     }     @Override     public void onPMessage(String pattern, String channel, String message) {         LOG.info("onPMessage C "+channel+" M "+message);     }     @Override     public void onSubscribe(String channel, int subscribedChannels) {         LOG.info("onSubscribe C "+channel+" S "+subscribedChannels);     }     @Override     public void onUnsubscribe(String channel, int subscribedChannels) {         LOG.info("onUnsubscribe C "+channel+" S "+subscribedChannels);     }     @Override     public void onPUnsubscribe(String pattern, int subscribedChannels) {         LOG.info("onPUnsubscribe P "+pattern+" S "+subscribedChannels);     }     @Override     public void onPSubscribe(String pattern, int subscribedChannels) {         LOG.info("onPSubscribe P "+pattern+" S "+subscribedChannels);     }

2.发布与订阅工具类

rediscluster.properties为redis的配置文件

public class RedisPubSubProxy {     public static int REDIS_TIMEOUT =20000;     public static final Log LOG = LogFactory.getLog(RedisPubSubProxy.class);     public HashMap<String,JedisPool> poolmap=new HashMap<String, JedisPool>();     public JedisPool getJedis(int clustorid){         JedisPool obj =poolmap.get(clustorid+"");         if(obj==null){             Properties properties = new Properties();             InputStream is = RedisPubSubProxy.class.getResourceAsStream("/rediscluster.properties" );             JedisPoolConfig jedispool_config = new JedisPoolConfig();             try {                 properties.load(is);                // jedispool_config.setMaxActive(Integer.parseInt(properties.getProperty("redis."+clustorid+".maxActive")));                // jedispool_config.setMaxIdle(Integer.parseInt(properties.getProperty("redis." + clustorid + ".maxIdle")));                // jedispool_config.setMaxWait(Long.parseLong(properties.getProperty("redis." + clustorid + ".maxWait")));                 jedispool_config.setTestOnBorrow(Boolean.getBoolean(properties.getProperty("redis." + clustorid + ".testOnBorrow")));                 String sharded = properties.getProperty("redis."+clustorid+".sharded1");                 String ip = sharded.split(":")[0];                 int port =Integer.parseInt(sharded.split(":")[1]);                 JedisPool pool =new JedisPool(jedispool_config,ip,port,REDIS_TIMEOUT);                 poolmap.put(clustorid+"",pool);                 obj=pool;                 LOG.info( "RedisPubSubProxy{" +                         ", ip "+ip+" port "+port+                         '}');             } catch (IOException e) {                 e.printStackTrace();             }         }         return obj;    }     /**      * 订阅      * @param clustor      * @param subListener   消息通过onMessage接收      * @param patterns   单独列表      */     public void subscribe(final int clustor, SubListener subListener,final String... patterns){        final SubListener fsubListener =subListener;         new Thread(new Runnable() {             @Override             public void run() {                 try {                     JedisPool pool =getJedis(clustor);                     Jedis jedis =pool.getResource();                     if(jedis!=null) {                         for(String t:patterns) {                             LOG.info("subscribe " + clustor + " " + t);                         }                         jedis.subscribe(fsubListener, patterns);                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             }         },"Thread-Subscribe-"+clustor).start();     }

public long publish(int clustor,String channel,String message){         long result =0;         try {             JedisPool pool =getJedis(clustor);             Jedis jedis=pool.getResource();             if(jedis!=null){                 LOG.info("publish "+clustor+" "+channel+" "+message);                result= jedis.publish(channel,message);             }         } catch (Exception e) {             e.printStackTrace();         }         return result;     }

}

3.发布

RedisPubSubProxy redisPubSubProxy = new RedisPubSubProxy(); redisPubSubProxy.publish(1,"List",“msg”); //redis id,通道名称,消息

4.订阅

SubListener subListener = new SubListener(); RedisPubSubProxy redisPubSubProxy = new RedisPubSubProxy(); redisPubSubProxy.subscribe(1,subListener,"List");

数据由监听里的onMessage方法接受

转载请注明原文地址: https://www.6miu.com/read-71866.html

最新回复(0)