Watcher的demo

xiaoxiao2021-02-28  84

/**  * ClassName: ZookeeperWatcher   * 本类就是个watcher类  * date: 2017年8月31日 上午9:44:31   * @author yanlong  */ public class ZookeeperWatcher implements Watcher{ //定义原子变量 AtomicInteger seq = new AtomicInteger(); //定义Session失效时间 private static final int SESSION_TIMEOUT = 1000; //zookeeper服务器地址 private static final String CONNECTION_ADDR = "112.124.121.34:2181"; //zk父路径设置 private static final String PARENT_PATH = "/p"; //zk子路径设置 private static final String CHILDREN_PATH = "/p/c"; //进入标识 private static final String LOG_PREFIX_OF_MAIN = "【main】"; //zk变量 private ZooKeeper zk = null; //信号量设置 用于等待zookeeper连接后 通知阻塞程序继续往下走 private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * createConnection:创建zk连接 * @author yanlong * date: 2017年8月31日 上午10:09:16  * @param connectAddr ZK服务器地址列表 * @param sessionTimeout Session超时时间 */ public void createConnection(String connectAddr,int sessionTimeout){ this.releaseConnection(); try { zk = new ZooKeeper(connectAddr, sessionTimeout, this); System.out.println(LOG_PREFIX_OF_MAIN+"开始连接zk服务器"); connectedSemaphore.await(); } catch (Exception e) { e.printStackTrace(); } } /** * 关闭连接 * releaseConnection: * @author yanlong * date: 2017年8月31日 上午10:10:37 */ public void releaseConnection(){ if(this.zk != null){ try{ this.zk.close(); }catch(InterruptedException e){ e.printStackTrace(); } } } /** * createPath:创建节点 * @author yanlong * date: 2017年8月31日 上午10:14:58  * @param path 节点路径 * @param data 节点内容 * @return */ public boolean createPath(String path,String data,boolean needWatch){ //设置监控(由于zookeeper监控都是一次性的所以必须每次设置监控) try { this.zk.exists(path, needWatch);//创建节点之前 提前监控 System.out.println(LOG_PREFIX_OF_MAIN+"节点创建成功,Path:"+ this.zk.create( /**路径**/ path,  /**数据**/ data.getBytes(),  /**所有可见**/ Ids.OPEN_ACL_UNSAFE,  /**永久存储**/ CreateMode.PERSISTENT)+",content:"+data); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } return true; } /** * readData:读取指定节点数据内容 * @author yanlong * date: 2017年8月31日 上午10:32:09  * @param path 节点路径 * @param needWatch * @return */ public String readData(String path,boolean needWatch){ try { System.out.println("读取操作..."); return new String(this.zk.getData(path, needWatch, null)); } catch (Exception e) { e.printStackTrace(); return ""; } } /** * writeData:更新指定节点的数据内容 * @author yanlong * date: 2017年8月31日 上午10:34:55  * @param path 节点路径 * @param data 数据内容 * @return */ public boolean writeData(String path,String data){ try { System.out.println(LOG_PREFIX_OF_MAIN+"更新数据成功,path:"+path+",stat:"+ this.zk.setData(path, data.getBytes(), -1)); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 删除指定节点 * deleteNode: * @author yanlong * date: 2017年8月31日 上午10:38:29  * @param path 节点路径 */ public void deleteNode(String path){ try { this.zk.delete(path, -1); System.out.println(LOG_PREFIX_OF_MAIN+"删除节点成功,path:"+path); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * exists:判断节点是否存在 * @author yanlong * date: 2017年8月31日 上午10:40:57  * @param path 节点路径 * @param needWatch * @return */ public Stat exists(String path,boolean needWatch){ try { return this.zk.exists(path, needWatch); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } /** * getChildren:获取子节点 * @author yanlong * date: 2017年8月31日 上午10:44:27  * @param path * @param needWatch * @return */ private List<String> getChildren(String path,boolean needWatch){ try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } /** * deleteAllTestPath:删除所有节点 * @author yanlong * date: 2017年8月31日 上午10:50:58 */ public void deleteAllTestPath(boolean needWatch){ if(this.exists(CHILDREN_PATH, needWatch) != null){ this.deleteNode(CHILDREN_PATH); } if(this.exists(PARENT_PATH, needWatch) != null){ this.deleteNode(PARENT_PATH); } } /** * TODO 删除所有节点 * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent) */ @Override public void process(WatchedEvent event) { System.out.println("进入 process ...... event = "+event); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } if(event == null){ return; } //连接状态 KeeperState keeperState = event.getState(); //事件类型 EventType eventType = event.getType(); //受影响的path String path = event.getPath(); String logPrefix = "【Watcher-"+this.seq.incrementAndGet()+"】"; System.out.println(logPrefix+"收到Watcher通知"); System.out.println(logPrefix+"连接状态 "+keeperState.toString()); System.out.println(logPrefix+"事件类型 "+eventType.toString()); if(KeeperState.SyncConnected == keeperState){ //成功连接上ZK服务器 if(EventType.None == eventType){ System.out.println(logPrefix+"成功连接上zk服务器"); connectedSemaphore.countDown(); } //创建节点 else if(EventType.NodeCreated == eventType){ System.out.println(logPrefix+"节点创建"); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } this.exists(path, true); } //更新节点 else if(EventType.NodeDataChanged == eventType){ System.out.println(logPrefix+"节点数据更新"); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(logPrefix+"数据内容:"+this.readData(PARENT_PATH, true)); } //更新子节点 else if(EventType.NodeChildrenChanged == eventType){ System.out.println(logPrefix+"子节点变更"); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(logPrefix+"子节点列表"+this.getChildren(PARENT_PATH, true)); } //删除节点 else if(EventType.NodeDeleted == eventType){ System.out.println(logPrefix+"节点"+path+" 被删除"); } }else if(KeeperState.Disconnected == keeperState){ System.out.println(logPrefix+"与zk服务器断开连接"); }else if(KeeperState.AuthFailed == keeperState){ System.out.println(logPrefix+"权限检查失败"); }else if(KeeperState.Expired == keeperState){ System.out.println(logPrefix+"会话失效"); } System.out.println("---------------------"); } /** * 测试zookeeper监控 * 测试watcher功能 * main: * @author yanlong * date: 2017年8月31日 上午11:20:43  * @param args * @throws Exception  */ public static void main(String[] args) throws Exception { //建立watcher ZookeeperWatcher zkWatcher = new ZookeeperWatcher(); //创建连接 zkWatcher.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT); Thread.sleep(1000); //清理节点 zkWatcher.deleteAllTestPath(false); if(zkWatcher.createPath(PARENT_PATH, System.currentTimeMillis()+"",true)){//设置是否需要监听 Thread.sleep(1000); //读取数据 // System.out.println("--------read parent-----------"); zkWatcher.readData(PARENT_PATH, true); //读取子节点 // System.out.println("--------read children path-----------"); zkWatcher.getChildren(PARENT_PATH, false); //更新数据 zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis()+""); // Thread.sleep(1000); //创建子节点 zkWatcher.createPath(CHILDREN_PATH, System.currentTimeMillis()+"",true); zkWatcher.createPath(CHILDREN_PATH+"/c1", System.currentTimeMillis()+"",true); zkWatcher.createPath(CHILDREN_PATH+"/c1/c2", System.currentTimeMillis()+"",true); Thread.sleep(1000); // // zkWatcher.writeData(CHILDREN_PATH, System.currentTimeMillis()+""); } // Thread.sleep(50000); //清理节点 // zkWatcher.deleteAllTestPath(); // Thread.sleep(1000); zkWatcher.releaseConnection(); } }
转载请注明原文地址: https://www.6miu.com/read-65276.html

最新回复(0)