案例
package com.secbro.learn.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * Created by zhuzs on 2017/4/15. */ public class CuratorNodeCacheTest { public static void main(String[] args) throws Exception { CuratorFramework client = getClient(); String path = "/p1"; final NodeCache nodeCache = new NodeCache(client,path); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("监听事件触发"); System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData())); } }); client.setData().forPath(path,"456".getBytes()); client.setData().forPath(path,"789".getBytes()); client.setData().forPath(path,"123".getBytes()); client.setData().forPath(path,"222".getBytes()); client.setData().forPath(path,"333".getBytes()); client.setData().forPath(path,"444".getBytes()); Thread.sleep(15000); } private static CuratorFramework getClient(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .namespace("demo") .build(); client.start(); return client; } }执行结果:
监听事件触发 重新获得节点内容为:123 监听事件触发 重新获得节点内容为:333 监听事件触发 重新获得节点内容为:444重复执行多次打印结果不同,但最后一个都会打印出“444”这个内容。
前面的章节已经提到zookeeper原生API注册Watcher需要反复注册,即Watcher触发之后就需要重新进行注册。另外,客户端断开之后重新连接到服务器也是需要一段时间。这就导致了zookeeper客户端不能够接收全部的zookeeper事件。zookeeper保证的是数据的最终一致性。因此,对于此问题需要特别注意,在不要对zookeeper事件进行强依赖。
zookeeper的getData(),getChildren()和exists()方法都可以注册watcher监听。而监听有以下几个特性:
一次性触发(one-time trigger) 当数据改变的时候,那么一个Watch事件会产生并且被发送到客户端中。但是客户端只会收到一次这样的通知,如果以后这个数据再次发生改变的时候,之前设置Watch的客户端将不会再次收到改变的通知,因为Watch机制规定了它是一个一次性的触发器。
当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了 getData(“/znode1”, true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。
发送给客户端(Sent to the client) 这个表明了Watch的通知事件是从服务器发送给客户端的,是异步的,这就表明不同的客户端收到的Watch的时间可能不同,但是ZooKeeper有保证:当一个客户端在看到Watch事件之前是不会看到结点数据的变化的。例如:A=3,此时在上面设置了一次Watch,如果A突然变成4了,那么客户端会先收到Watch事件的通知,然后才会看到A=4。
Zookeeper 客户端和服务端是通过 Socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。
被设置了watch的数据(The data for which the watch was set) 这是指节点发生变动的不同方式。你可以认为ZooKeeper维护了两个watch列表:data watch和child watch。getData()和exists()设置data watch,而getChildren()设置child watch。或者,可以认为watch是根据返回值设置的。getData()和exists()返回节点本身的信息,而getChildren()返回 子节点的列表。因此,setData()会触发znode上设置的data watch(如果set成功的话)。一个成功的 create() 操作会触发被创建的znode上的数据watch,以及其父节点上的child watch。而一个成功的 delete()操作将会同时触发一个znode的data watch和child watch(因为这样就没有子节点了),同时也会触发其父节点的child watch。Watch由client连接上的ZooKeeper服务器在本地维护。这样可以减小设置、维护和分发watch的开销。当一个客户端连接到一个新的服务器上时,watch将会被以任意会话事件触发。当与一个服务器失去连接的时候,是无法接收到watch的。而当client重新连接时,如果需要的话,所有先前注册过的watch,都会被重新注册。通常这是完全透明的。只有在一个特殊情况下,watch可能会丢失:对于一个未创建的znode的exist watch,如果在客户端断开连接期间被创建了,并且随后在客户端连接上之前又删除了,这种情况下,这个watch事件可能会被丢失。
对于watch,ZooKeeper提供了这些保障:
Watch与其他事件、其他watch以及异步回复都是有序的。 ZooKeeper客户端库保证所有事件都会按顺序分发。客户端会保障它在看到相应的znode的新数据之前接收到watch事件。//这保证了在process()再次利用zk client访问时数据是存在的从ZooKeeper接收到的watch事件顺序一定和ZooKeeper服务所看到的事件顺序是一致的。经过上面的描述,对于上一篇博客中连续修改节点内容部分监听事件丢失的原因也就变得显而易见了。虽然curator帮开发人员封装了重复注册监听的过程,但是内部依旧需要重复进行注册,而在第一个watcher触发第二个watcher还未注册成功的间隙,进行节点数据的修改,显然无法收到watcher事件。