在进入今天的正题之前,先来简单介绍下Zookeeper:
  Zookeeper是一个分布式应用程序协调服务,保证数据的一致性,其提供的功能包括:配置维护、域名维护、分布式同步、组服务等。
  watch监控机制是zookeeper的关键技术之一,本文将通过zk的部分源码来简单了解下watch机制的实现原理。

watch监控机制的实现原理

  当今时代,发布订阅场景到处可见,像微信中的公众号消息订阅,或者网购场景下库存消息的订阅通知等等,这些都是属于发布订阅的场景。
  watch监控机制是zk的一个关键技术,zk通过它来实现发布订阅的功能,通过watch我们可以联想到设计模式中的观察者模式,二者确实有点类似,你可以将其看成是分布式场景下的观察者模式。
原理系列之——zookeeper的watch监控机制

客户端watch的注册和回调

客户端watch注册实现过程:
  发送一个带有watch事件的请求——>DataWatchRegistration保存watch事件——>将请求封装成Packet并放入一个队列等待发送——>调用SendThread中的readResponse——>ZKWatchManager将该watch事件进行存储

//Zookeeper.java
    public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        PathUtils.validatePath(path);
        ZooKeeper.WatchRegistration wcb = null;
        if (watcher != null) {
        	//注册watch
            wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
        }
        String serverPath = this.prependChroot(path);
        RequestHeader h = new RequestHeader();
        h.setType(4);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(Code.get(r.getErr()), path);
        } else {
            if (stat != null) {
                DataTree.copyStat(response.getStat(), stat);
            }
            return response.getData();
        }
    }
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration, watchDeregistration);
        synchronized(packet) {
            while(!packet.finished) {
                packet.wait();
            }
            return r;
        }
    }
    public ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
        ClientCnxn.Packet packet = null;
        packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        synchronized(this.state) {
            if (this.state.isAlive() && !this.closing) {
                if (h.getType() == -11) {
                    this.closing = true;
                }
                this.outgoingQueue.add(packet);
            } else {
                this.conLossPacket(packet);
            }
        }
        this.sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
    class SendThread extends ZooKeeperThread {
        .....
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
                }
            } else if (replyHdr.getXid() == -4) {
                if (replyHdr.getErr() == Code.AUTHFAILED.intValue()) {
                    ClientCnxn.this.state = States.AUTH_FAILED;
                    ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
                }
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                }
            } else if (replyHdr.getXid() == -1) {
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                if (ClientCnxn.this.chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
                        event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
                    } else {
                        ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
                    }
                }
                WatchedEvent we = new WatchedEvent(event);
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
                }
                ClientCnxn.this.eventThread.queueEvent(we);
            } else if (this.tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
            } else {
                ClientCnxn.Packet packet;
                synchronized(ClientCnxn.this.pendingQueue) {
                    if (ClientCnxn.this.pendingQueue.size() == 0) {
                        throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                    }
                    packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
                }
                try {
                    if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                        packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
                        throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
                    }
                    packet.replyHeader.setXid(replyHdr.getXid());
                    packet.replyHeader.setErr(replyHdr.getErr());
                    packet.replyHeader.setZxid(replyHdr.getZxid());
                    if (replyHdr.getZxid() > 0L) {
                        ClientCnxn.this.lastZxid = replyHdr.getZxid();
                    }
                    if (packet.response != null && replyHdr.getErr() == 0) {
                        packet.response.deserialize(bbia, "response");
                    }
                    if (ClientCnxn.LOG.isDebugEnabled()) {
                        ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
                    }
                } finally {
                    ClientCnxn.this.finishPacket(packet);
                }
            }
        }
private void finishPacket(ClientCnxn.Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        if (p.watchDeregistration != null) {
            Map materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                Iterator i$ = materializedWatchers.entrySet().iterator();
                while(i$.hasNext()) {
                    Entry<EventType, Set<Watcher>> entry = (Entry)i$.next();
                    Set<Watcher> watchers = (Set)entry.getValue();
                    if (watchers.size() > 0) {
                        this.queueEvent(p.watchDeregistration.getClientPath(), err, watchers, (EventType)entry.getKey());
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (NoWatcherException var9) {
                LOG.error("Failed to find watcher!", var9);
                p.replyHeader.setErr(var9.code().intValue());
            } catch (KeeperException var10) {
                LOG.error("Exception when removing watcher", var10);
                p.replyHeader.setErr(var10.code().intValue());
            }
        }
        if (p.cb == null) {
            synchronized(p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            this.eventThread.queuePacket(p);
        }
    }

客户端回调处理过程:
  在SendThread.readResponse()中的xid=-1来进行处理——>调用 eventThread.queueEvent()进行处理

    class SendThread extends ZooKeeperThread {
        .....
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
  				....
            } else if (replyHdr.getXid() == -1) {
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                if (ClientCnxn.this.chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
                        event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
                    } else {
                        ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
                    }
                }
                WatchedEvent we = new WatchedEvent(event);
                if (ClientCnxn.LOG.isDebugEnabled()) {
                    ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
                }
                ClientCnxn.this.eventThread.queueEvent(we);
            }
            ....
        }

服务端watch的注册和触发

服务端watch注册实现过程:
  判断请求是否带有watch注册事件——>通过FinalRequestProcessor中的processRequest解析请求——>若有watch,则调用zks.getZKDatabase().getData将事件注册到WatchManager

//FinalRequestProcessor.java
    public void processRequest(Request request) {
		....
        ProcessTxnResult rc = null;
        synchronized(this.zks.outstandingChanges) {
            rc = this.zks.processTxn(request);
            if (request.getHdr() != null) {
                TxnHeader hdr = request.getHdr();
                Record txn = request.getTxn();
                long zxid = hdr.getZxid();
                while(!this.zks.outstandingChanges.isEmpty() && ((ChangeRecord)this.zks.outstandingChanges.get(0)).zxid <= zxid) {
                    ChangeRecord cr = (ChangeRecord)this.zks.outstandingChanges.remove(0);
                    if (cr.zxid < zxid) {
                        LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
                    }
                    if (this.zks.outstandingChangesForPath.get(cr.path) == cr) {
                        this.zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
            }
            if (request.isQuorum()) {
                this.zks.getZKDatabase().addCommittedProposal(request);
            }
        }
        if (request.type != -11 || !this.connClosedByClient(request) || !this.closeSession(this.zks.serverCnxnFactory, request.sessionId) && !this.closeSession(this.zks.secureServerCnxnFactory, request.sessionId)) {
            if (request.cnxn != null) {
                ServerCnxn cnxn = request.cnxn;
                String lastOp = "NA";
                this.zks.decInProcess();
                Code err = Code.OK;
                Object rsp = null;
                try {
                    if (request.getHdr() != null && request.getHdr().getType() == -1) {
                        if (request.getException() != null) {
                            throw request.getException();
                        }
                        throw KeeperException.create(Code.get(((ErrorTxn)request.getTxn()).getErr()));
                    }
                    KeeperException ke = request.getException();
                    if (ke != null && request.type != 14) {
                        throw ke;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}", request);
                    }
                    boolean removed;
                    String msg;
                    WatcherType type;
                    Stat stat;
                    DataNode n;
                    List children;
                    Stat stat;
                    label170:
                    switch(request.type) {
                    ...
                    case 4:
                        lastOp = "GETD";
                        GetDataRequest getDataRequest = new GetDataRequest();
                        ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
                        n = this.zks.getZKDatabase().getNode(getDataRequest.getPath());
                        if (n == null) {
                            throw new NoNodeException();
                        }
                        PrepRequestProcessor.checkACL(this.zks, this.zks.getZKDatabase().aclForNode(n), 1, request.authInfo);
                        stat = new Stat();
                        byte[] b = this.zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
                        rsp = new GetDataResponse(b, stat);
                        break;
                    ....
                } catch (SessionMovedException var15) {
                    ....
                }
                long lastZxid = this.zks.getZKDatabase().getDataTreeLastProcessedZxid();
                ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
                this.zks.serverStats().updateLatency(request.createTime);
                cnxn.updateStatsForResponse((long)request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
                try {
                    cnxn.sendResponse(hdr, (Record)rsp, "response");
                    if (request.type == -11) {
                        cnxn.sendCloseSession();
                    }
                } catch (IOException var14) {
                    LOG.error("FIXMSG", var14);
                }
            }
        }
    }

服务端watch事件的触发过程:
  setData()对节点数据变更——>调用 WatchManager.triggerWatch 触发事件——>触发之后删除(客户端watch机制是一次性的)

//DataTree.java
    public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
        Stat s = new Stat();
        DataNode n = (DataNode)this.nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        } else {
            byte[] lastdata = null;
            byte[] lastdata;
            synchronized(n) {
                lastdata = n.data;
                n.data = data;
                n.stat.setMtime(time);
                n.stat.setMzxid(zxid);
                n.stat.setVersion(version);
                n.copyStat(s);
            }
            String lastPrefix = this.getMaxPrefixWithQuota(path);
            if (lastPrefix != null) {
                this.updateBytes(lastPrefix, (long)((data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)));
            }
            this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
            return s;
        }
    }
//WatchManager.java
    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet watchers;
        synchronized(this) {
            watchers = (HashSet)this.watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
                }
                return null;
            }
            Iterator i$ = watchers.iterator();
            while(i$.hasNext()) {
                Watcher w = (Watcher)i$.next();
                HashSet<String> paths = (HashSet)this.watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        Iterator i$ = watchers.iterator();
        while(true) {
            Watcher w;
            do {
                if (!i$.hasNext()) {
                    return watchers;
                }
                w = (Watcher)i$.next();
            } while(supress != null && supress.contains(w));
            w.process(e);
        }
    }
//DataTree.java
    public void deleteNode(String path, long zxid) throws NoNodeException {
        int lastSlash = path.lastIndexOf(47);
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = (DataNode)this.nodes.get(path);
        if (node == null) {
            throw new NoNodeException();
        } else {
            this.nodes.remove(path);
            synchronized(node) {
                this.aclCache.removeUsage(node.acl);
            }
            DataNode parent = (DataNode)this.nodes.get(parentName);
            if (parent == null) {
                throw new NoNodeException();
            } else {
                synchronized(parent) {
                    parent.removeChild(childName);
                    parent.stat.setPzxid(zxid);
                    long eowner = node.stat.getEphemeralOwner();
                    EphemeralType ephemeralType = EphemeralType.get(eowner);
                    if (ephemeralType == EphemeralType.CONTAINER) {
                        this.containers.remove(path);
                    } else if (ephemeralType == EphemeralType.TTL) {
                        this.ttls.remove(path);
                    } else if (eowner != 0L) {
                        HashSet<String> nodes = (HashSet)this.ephemerals.get(eowner);
                        if (nodes != null) {
                            synchronized(nodes) {
                                nodes.remove(path);
                            }
                        }
                    }
                }
                if (parentName.startsWith("/zookeeper") && "zookeeper_limits".equals(childName)) {
                    this.pTrie.deletePath(parentName.substring("/zookeeper/quota".length()));
                }
                String lastPrefix = this.getMaxPrefixWithQuota(path);
                if (lastPrefix != null) {
                    this.updateCount(lastPrefix, -1);
                    int bytes = false;
                    int bytes;
                    synchronized(node) {
                        bytes = node.data == null ? 0 : -node.data.length;
                    }
                    this.updateBytes(lastPrefix, (long)bytes);
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG, 64L, "dataWatches.triggerWatch " + path);
                    ZooTrace.logTraceMessage(LOG, 64L, "childWatches.triggerWatch " + parentName);
                }
                Set<Watcher> processed = this.dataWatches.triggerWatch(path, EventType.NodeDeleted);
                this.childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
                this.childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
            }
        }
    }

总结

  客户端和服务端各自处理watch事件,并将所需信息分别在两端,这样一来,能够减少彼此之间的通信内容和频率,大大提升了服务的处理性能。