ZooKeeper 3: How the Watcher Listener Is Implemented

xiaoxiao  2021-02-28

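Watcher implementation: data structures and data-flow diagram

Note: the diagram is large; right-click it, open it in a new tab, and zoom in to read it.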

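Client

How the client receives watch event notifications

Every Watcher callback ultimately starts from a message received in SendThread, an inner class of ClientCnxn. The notification messages that the server sends to the client are marked by a special xid value in the reply header: -1.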

    class SendThread extends Thread {
        private long lastPingSentNs;
        private final ClientCnxnSocket clientCnxnSocket;
        private Random r = new Random(System.nanoTime());
        private boolean isFirstConnect = true;

        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null));
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // For now we only care about xid == -1, which marks a WatchedEvent notification
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
                }
                // WatcherEvent is used for serialization and deserialization;
                // its fields are in exactly the same order as those of WatchedEvent
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path " + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
                }

                // The event is ultimately executed by the EventThread;
                // here it is only placed on the EventThread's internal queue
                eventThread.queueEvent(we);
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            ....
        }
        ....
    }
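The chroot handling in the notification branch above is easy to miss, so here is a minimal standalone sketch of that one step. ChrootPaths and toClientPath are hypothetical names made up for illustration; they are not part of the ZooKeeper API. Like the original code, the sketch only compares lengths and does not verify that the server path really starts with the chroot prefix.

```java
// Simplified stand-in for the chroot conversion in SendThread.readResponse.
// ChrootPaths and toClientPath are hypothetical names, not ZooKeeper API.
final class ChrootPaths {
    private ChrootPaths() {}

    /**
     * Maps a path as the server reports it to the path the client sees.
     * The path is returned unchanged when no chroot is configured, or when
     * the server path is too short to contain the chroot prefix (the
     * original code only logs a warning in that case).
     */
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;              // no chroot: nothing to strip
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                     // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath;                  // too short for the chroot prefix
    }
}
```

For example, with a chroot of "/app", toClientPath("/app/config", "/app") yields "/config" and toClientPath("/app", "/app") yields "/".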

Client-side entry point for WatchedEvent handling

The run() method of the asynchronous EventThread looks like this:

    class EventThread extends Thread {
        private final LinkedBlockingQueue<Object> waitingEvents =
                new LinkedBlockingQueue<Object>();

        @Override
        public void run() {
            try {
                isRunning = true;
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        processEvent(event);
                    }
                    if (wasKilled)
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                }
            } catch (InterruptedException e) {
                LOG.error("Event thread exiting due to interruption", e);
            }

            LOG.info("EventThread shut down");
        }
        ....
    }
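The loop above is a producer/consumer queue that is shut down with a sentinel object (eventOfDeath): the thread keeps draining the queue after it sees the sentinel and exits only once the queue is empty. The following is a minimal sketch of that pattern, not ZooKeeper code. MiniEventThread, queueEventOfDeath, and demo are hypothetical names, and "processing" an event is reduced to recording it in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the EventThread loop; not ZooKeeper code.
final class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object(); // shutdown sentinel
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final List<Object> processed = new ArrayList<>();

    void queueEvent(Object event) { waitingEvents.add(event); }

    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();   // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;                  // remember, but keep draining
                } else {
                    synchronized (processed) { processed.add(event); }
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;                             // sentinel seen and queue drained
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();        // preserve the interrupt flag
        }
    }

    List<Object> processedEvents() {
        synchronized (processed) { return new ArrayList<>(processed); }
    }

    // Queues two events and the sentinel, runs the thread to completion,
    // and returns what was processed.
    static List<Object> demo() {
        MiniEventThread t = new MiniEventThread();
        t.queueEvent("a");
        t.queueEvent("b");
        t.queueEventOfDeath();
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return t.processedEvents();
    }
}
```

Because a single thread consumes the queue, events are handled one at a time in the order they were queued, which is why a slow callback delays every event behind it.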

Server

On the server side, every create, delete, or update of a znode triggers an event.

Where DataTree triggers Watchers

Take modifying a znode's data as the example:

File: DataTree.java

    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if ((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                    - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

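At the very end, it calls dataWatches.triggerWatch(path, EventType.NodeDataChanged) to fire the Watcher callbacks registered on the node at path. This two-argument call presumably delegates to the three-argument overload of triggerWatch, with no watchers suppressed.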

File: WatchManager.java

    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            // Each Watcher w here is really the server-side proxy for a
            // client's watch object: an instance of NIOServerCnxn
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
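Two details of triggerWatch are worth isolating: the manager keeps two maps (path to watchers, and watcher to paths), and watchTable.remove(path) means a watch fires at most once per registration. The sketch below models just that bookkeeping. MiniWatchManager, Listener, addWatch, and demo are hypothetical names chosen for illustration; the event is reduced to a path and a type string, and the suppress set is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of WatchManager's bookkeeping; not ZooKeeper code.
final class MiniWatchManager {
    interface Listener { void onEvent(String path, String type); }

    private final Map<String, Set<Listener>> watchTable = new HashMap<>();
    private final Map<Listener, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, Listener l) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(l);
        watch2Paths.computeIfAbsent(l, k -> new HashSet<>()).add(path);
    }

    Set<Listener> triggerWatch(String path, String type) {
        Set<Listener> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);        // one-shot: the entry is gone after this
            if (watchers == null || watchers.isEmpty()) {
                return null;
            }
            for (Listener l : watchers) {
                Set<String> paths = watch2Paths.get(l);
                if (paths != null) {
                    paths.remove(path);                // keep the reverse index consistent
                }
            }
        }
        for (Listener l : watchers) {
            l.onEvent(path, type);                     // callbacks run outside the lock
        }
        return watchers;
    }

    // Registers one watch, triggers the path twice, and returns the callback
    // count after the first and after the second trigger.
    static int[] demo() {
        MiniWatchManager m = new MiniWatchManager();
        int[] calls = {0};
        m.addWatch("/a", (p, t) -> calls[0]++);
        m.triggerWatch("/a", "NodeDataChanged");
        int afterFirst = calls[0];
        m.triggerWatch("/a", "NodeDataChanged");       // no watcher left, returns null
        return new int[] {afterFirst, calls[0]};
    }
}
```

The second trigger finds nothing to call, which matches the behavior visible in the original method: a client that wants further notifications has to register the watch again.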

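NIOServerCnxn as the Watcher proxy for network communication

First, a word about NIOServerCnxn: it is the class that wraps the communication between one client and the server. It holds the input and output sides of the socket connection, which are derived from an NIO SelectionKey.

NIOServerCnxn's other important role is that it implements the Watcher interface, which lets it act as the server-side proxy for the client's Watchers.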

File: NIOServerCnxn

    @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);  // note that the xid here is -1
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "Deliver event " + event + " to 0x"
                    + Long.toHexString(this.sessionId)
                    + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");  // serialize the WatcherEvent and send it over the network
    }
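To tie the server and client halves together, here is a rough sketch of what a notification looks like as bytes: a reply header (xid, zxid, err) followed by the event (type, state, path), with the client recognizing the message by xid == -1. This is an assumption-laden stand-in written with java.nio.ByteBuffer, not the real jute serialization code; NotificationCodec and its methods are hypothetical names, the outer length framing of the connection is omitted, and the type and state arguments are plain integer codes supplied by the caller.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a notification's layout; not the jute/ZooKeeper codec.
final class NotificationCodec {
    static final int NOTIFICATION_XID = -1;

    private NotificationCodec() {}

    // Header (xid, zxid, err) followed by the event body (type, state, path).
    static byte[] encode(int type, int state, String path) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + 8 + 4 + 4 + 4 + 4 + pathBytes.length);
        out.putInt(NOTIFICATION_XID);   // xid: -1 marks a notification
        out.putLong(-1L);               // zxid: -1, as in new ReplyHeader(-1, -1L, 0)
        out.putInt(0);                  // err: 0
        out.putInt(type);               // event type code
        out.putInt(state);              // keeper state code
        out.putInt(pathBytes.length);   // assumed here: length-prefixed UTF-8 string
        out.put(pathBytes);
        return out.array();
    }

    // Returns the event path if the message is a notification, otherwise null.
    static String decodeNotificationPath(byte[] message) {
        ByteBuffer in = ByteBuffer.wrap(message);
        int xid = in.getInt();
        in.getLong();                   // zxid, unused in this sketch
        in.getInt();                    // err, unused in this sketch
        if (xid != NOTIFICATION_XID) {
            return null;                // some other kind of reply
        }
        in.getInt();                    // type
        in.getInt();                    // state
        byte[] pathBytes = new byte[in.getInt()];
        in.get(pathBytes);
        return new String(pathBytes, StandardCharsets.UTF_8);
    }
}
```

The header-first layout is what lets the client's readResponse decide how to interpret the rest of the message after deserializing only the header.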