/**
 * Demo ZooKeeper client that also acts as its own {@link Watcher}.
 *
 * <p>It opens a session, performs basic znode operations (create / read /
 * write / delete / exists / list children) and prints every watch
 * notification it receives. ZooKeeper watches are one-shot, so the event
 * handler re-registers a watch each time one fires.
 *
 * <p>All failures are reported with {@code printStackTrace()} and turned into
 * a "failed" return value ({@code false}, {@code null} or {@code ""}); no
 * operation of this class propagates a ZooKeeper exception to the caller.
 *
 * date: 2017-08-31 09:44:31
 * @author yanlong
 */
public class ZookeeperWatcher implements Watcher {
    /** Sequence number used to tag the log lines of each watch notification. */
    final AtomicInteger seq = new AtomicInteger();
    /** Session timeout passed to the ZooKeeper constructor, in milliseconds. */
    private static final int SESSION_TIMEOUT = 1000;
    /** ZooKeeper server address (host:port) used by the demo in {@link #main}. */
    private static final String CONNECTION_ADDR = "112.124.121.34:2181";
    /** Parent test znode. */
    private static final String PARENT_PATH = "/p";
    /** Child test znode. */
    private static final String CHILDREN_PATH = "/p/c";
    /** Log prefix for messages printed from the calling (main) thread. */
    private static final String LOG_PREFIX_OF_MAIN = "【main】";
    /**
     * Charset used for znode payloads. Fully qualified on purpose so that the
     * file's import block does not need to change.
     */
    private static final java.nio.charset.Charset DATA_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;
    /** The ZooKeeper client handle; {@code null} until a connection is created. */
    private ZooKeeper zk = null;
    /**
     * Latch that blocks {@link #createConnection} until the watcher sees the
     * session reach the connected state. Volatile because it is replaced on
     * every (re)connect and read from the ZooKeeper event thread.
     */
    private volatile CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * Opens a ZooKeeper session and blocks until it is connected.
     *
     * <p>Any previously opened session is closed first.
     *
     * date: 2017-08-31 10:09:16
     * @author yanlong
     * @param connectAddr ZooKeeper server address list
     * @param sessionTimeout session timeout in milliseconds
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        // A CountDownLatch is one-shot: re-arm it so that a second call really
        // waits for the new session instead of returning immediately.
        connectedSemaphore = new CountDownLatch(1);
        try {
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接zk服务器");
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the current ZooKeeper session, if one exists.
     *
     * date: 2017-08-31 10:10:37
     * @author yanlong
     */
    public void releaseConnection() {
        if (this.zk == null) {
            return;
        }
        try {
            this.zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Creates a persistent, world-accessible znode.
     *
     * date: 2017-08-31 10:14:58
     * @author yanlong
     * @param path znode path
     * @param data znode content (stored as UTF-8 bytes)
     * @param needWatch whether to register this watcher on {@code path} before
     *        creating it, so that the creation itself triggers a notification
     * @return {@code true} if the znode was created, {@code false} on any failure
     */
    public boolean createPath(String path, String data, boolean needWatch) {
        try {
            // Watches are one-shot, so one has to be set before every operation
            // that should be observed.
            this.zk.exists(path, needWatch);
            String createdPath = this.zk.create(
                    path,
                    data.getBytes(DATA_CHARSET),
                    Ids.OPEN_ACL_UNSAFE,      // readable/writable by everyone
                    CreateMode.PERSISTENT);   // survives the session
            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功,Path:"
                    + createdPath + ",content:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Reads the content of a znode.
     *
     * date: 2017-08-31 10:32:09
     * @author yanlong
     * @param path znode path
     * @param needWatch whether to register this watcher on the znode's data
     * @return the content decoded as UTF-8, or {@code ""} if the znode holds no
     *         data or the read failed
     */
    public String readData(String path, boolean needWatch) {
        try {
            System.out.println("读取操作...");
            byte[] raw = this.zk.getData(path, needWatch, null);
            // A znode may have been created without data.
            if (raw == null) {
                return "";
            }
            return new String(raw, DATA_CHARSET);
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * Overwrites the content of a znode regardless of its current version.
     *
     * date: 2017-08-31 10:34:55
     * @author yanlong
     * @param path znode path
     * @param data new content (stored as UTF-8 bytes)
     * @return {@code true} if the update succeeded, {@code false} otherwise
     */
    public boolean writeData(String path, String data) {
        try {
            // Version -1 matches any version of the znode.
            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ",stat:"
                    + this.zk.setData(path, data.getBytes(DATA_CHARSET), -1));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a single znode regardless of its current version.
     *
     * <p>The znode must not have children; failures are only logged.
     *
     * date: 2017-08-31 10:38:29
     * @author yanlong
     * @param path znode path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Checks whether a znode exists.
     *
     * date: 2017-08-31 10:40:57
     * @author yanlong
     * @param path znode path
     * @param needWatch whether to register this watcher on {@code path}
     * @return the znode's {@code Stat}, or {@code null} if it does not exist or
     *         the check failed
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Lists the children of a znode.
     *
     * date: 2017-08-31 10:44:27
     * @author yanlong
     * @param path znode path
     * @param needWatch whether to register this watcher on the znode's children
     * @return the child names, or {@code null} if the lookup failed
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Removes the test znodes ({@code /p/c} and {@code /p}) together with
     * everything below them.
     *
     * <p>ZooKeeper refuses to delete a znode that still has children, so each
     * subtree is removed bottom-up.
     *
     * date: 2017-08-31 10:50:58
     * @author yanlong
     * @param needWatch whether the existence checks should register this watcher
     */
    public void deleteAllTestPath(boolean needWatch) {
        if (this.exists(CHILDREN_PATH, needWatch) != null) {
            this.deleteSubtree(CHILDREN_PATH);
        }
        if (this.exists(PARENT_PATH, needWatch) != null) {
            this.deleteSubtree(PARENT_PATH);
        }
    }

    /** Deletes {@code path} after recursively deleting all of its descendants. */
    private void deleteSubtree(String path) {
        List<String> children = this.getChildren(path, false);
        if (children != null) {
            for (String child : children) {
                this.deleteSubtree(path + "/" + child);
            }
        }
        this.deleteNode(path);
    }

    /** Sleeps for {@code millis} ms; on interruption restores the interrupt flag. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Handles a watch notification: logs it, releases the connection latch when
     * the session becomes connected, and re-registers a watch on the affected
     * path for node events (watches fire only once).
     *
     * @param event the notification delivered by the ZooKeeper client
     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入 process ...... event = " + event);
        pause(200);
        if (event == null) {
            return;
        }
        // Connection state carried by the event.
        KeeperState keeperState = event.getState();
        // Kind of change that triggered the event.
        EventType eventType = event.getType();
        // Path of the affected znode (null for pure connection events).
        String path = event.getPath();
        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "连接状态 " + keeperState.toString());
        System.out.println(logPrefix + "事件类型 " + eventType.toString());
        if (KeeperState.SyncConnected == keeperState) {
            this.handleConnectedEvent(eventType, path, logPrefix);
        } else if (KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "与zk服务器断开连接");
        } else if (KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "权限检查失败");
        } else if (KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "会话失效");
        }
        System.out.println("---------------------");
    }

    /** Handles an event received while the session is in the SyncConnected state. */
    private void handleConnectedEvent(EventType eventType, String path, String logPrefix) {
        if (EventType.None == eventType) {
            // Session established: unblock createConnection().
            System.out.println(logPrefix + "成功连接上zk服务器");
            connectedSemaphore.countDown();
        } else if (EventType.NodeCreated == eventType) {
            System.out.println(logPrefix + "节点创建");
            pause(100);
            // Re-arm the watch on the node that was just created.
            this.exists(path, true);
        } else if (EventType.NodeDataChanged == eventType) {
            System.out.println(logPrefix + "节点数据更新");
            pause(100);
            // Read (and re-watch) the node the event is about, not a fixed path.
            System.out.println(logPrefix + "数据内容:" + this.readData(path, true));
        } else if (EventType.NodeChildrenChanged == eventType) {
            System.out.println(logPrefix + "子节点变更");
            pause(3000);
            // List (and re-watch) the children of the node the event is about.
            System.out.println(logPrefix + "子节点列表" + this.getChildren(path, true));
        } else if (EventType.NodeDeleted == eventType) {
            System.out.println(logPrefix + "节点" + path + " 被删除");
        }
    }

    /**
     * Demo entry point exercising the watcher: connects, cleans up old test
     * znodes, creates / reads / updates the parent and child znodes, waits for
     * notifications, cleans up again and closes the session.
     *
     * date: 2017-08-31 11:20:43
     * @author yanlong
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        ZookeeperWatcher zkWatcher = new ZookeeperWatcher();
        zkWatcher.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        try {
            Thread.sleep(1000);
            // Remove leftovers from a previous run.
            zkWatcher.deleteAllTestPath(false);
            // The last argument asks for a watch to be set before creation.
            if (zkWatcher.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {
                Thread.sleep(1000);
                // Read the parent's data and set a data watch on it.
                System.out.println("--------read parent-----------");
                zkWatcher.readData(PARENT_PATH, true);
                // List the parent's children without setting a child watch.
                System.out.println("--------read children path-----------");
                zkWatcher.getChildren(PARENT_PATH, false);
                // Update the parent's data.
                zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "");
                Thread.sleep(1000);
                // Create a three-level chain of child znodes.
                zkWatcher.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);
                zkWatcher.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
                zkWatcher.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);
                Thread.sleep(1000);
                zkWatcher.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
            }
            // Leave time for notifications to arrive.
            Thread.sleep(50000);
            // Clean up the test znodes.
            zkWatcher.deleteAllTestPath(false);
            Thread.sleep(1000);
        } finally {
            // Always close the session, even if a step above failed.
            zkWatcher.releaseConnection();
        }
    }
}