1.创建监听
public class SubListener extends JedisPubSub { public static Logger LOG = LoggerFactory.getLogger(SubListener.class); @Override public void onMessage(String channel, String message) { LOG.info("onMessage C1 "+channel+" M1 "+message); } @Override public void onPMessage(String pattern, String channel, String message) { LOG.info("onPMessage C "+channel+" M "+message); } @Override public void onSubscribe(String channel, int subscribedChannels) { LOG.info("onSubscribe C "+channel+" S "+subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { LOG.info("onUnsubscribe C "+channel+" S "+subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { LOG.info("onPUnsubscribe P "+pattern+" S "+subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { LOG.info("onPSubscribe P "+pattern+" S "+subscribedChannels); }
2.发布与订阅工具类
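rediscluster.properties 为 Redis 的配置文件,需放在 classpath 根目录下。每个 Redis id 需要配置 redis.<id>.sharded1=ip:port 和 redis.<id>.testOnBorrow=true|false,例如:redis.1.sharded1=127.0.0.1:6379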
public class RedisPubSubProxy { public static int REDIS_TIMEOUT =20000; public static final Log LOG = LogFactory.getLog(RedisPubSubProxy.class); public HashMap<String,JedisPool> poolmap=new HashMap<String, JedisPool>(); public JedisPool getJedis(int clustorid){ JedisPool obj =poolmap.get(clustorid+""); if(obj==null){ Properties properties = new Properties(); InputStream is = RedisPubSubProxy.class.getResourceAsStream("/rediscluster.properties" ); JedisPoolConfig jedispool_config = new JedisPoolConfig(); try { properties.load(is); // jedispool_config.setMaxActive(Integer.parseInt(properties.getProperty("redis."+clustorid+".maxActive"))); // jedispool_config.setMaxIdle(Integer.parseInt(properties.getProperty("redis." + clustorid + ".maxIdle"))); // jedispool_config.setMaxWait(Long.parseLong(properties.getProperty("redis." + clustorid + ".maxWait"))); jedispool_config.setTestOnBorrow(Boolean.getBoolean(properties.getProperty("redis." + clustorid + ".testOnBorrow"))); String sharded = properties.getProperty("redis."+clustorid+".sharded1"); String ip = sharded.split(":")[0]; int port =Integer.parseInt(sharded.split(":")[1]); JedisPool pool =new JedisPool(jedispool_config,ip,port,REDIS_TIMEOUT); poolmap.put(clustorid+"",pool); obj=pool; LOG.info( "RedisPubSubProxy{" + ", ip "+ip+" port "+port+ '}'); } catch (IOException e) { e.printStackTrace(); } } return obj; } /** * 订阅 * @param clustor * @param subListener 消息通过onMessage接收 * @param patterns 单独列表 */ public void subscribe(final int clustor, SubListener subListener,final String... patterns){ final SubListener fsubListener =subListener; new Thread(new Runnable() { @Override public void run() { try { JedisPool pool =getJedis(clustor); Jedis jedis =pool.getResource(); if(jedis!=null) { for(String t:patterns) { LOG.info("subscribe " + clustor + " " + t); } jedis.subscribe(fsubListener, patterns); } } catch (Exception e) { e.printStackTrace(); } } },"Thread-Subscribe-"+clustor).start(); }
public long publish(int clustor,String channel,String message){ long result =0; try { JedisPool pool =getJedis(clustor); Jedis jedis=pool.getResource(); if(jedis!=null){ LOG.info("publish "+clustor+" "+channel+" "+message); result= jedis.publish(channel,message); } } catch (Exception e) { e.printStackTrace(); } return result; }
}
3.发布
RedisPubSubProxy redisPubSubProxy = new RedisPubSubProxy();
// Arguments: Redis id, channel name, message.
redisPubSubProxy.publish(1, "List", "msg");
4.订阅
// Subscribe to channel "List" on Redis id 1 using a logging listener.
RedisPubSubProxy redisPubSubProxy = new RedisPubSubProxy();
SubListener subListener = new SubListener();
redisPubSubProxy.subscribe(1, subListener, "List");
数据由监听(SubListener)里的 onMessage 方法接收。
