ZookeeperServer详解

xiaoxiao2021-02-28  26

首先zookeeper server的位置在代码中org/apache/zookeeper/server这个package下面。之前有说过,zookeeper的启动类是ZookeeperServerMain.java这个类,在这个类中使用了

final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

对ZookeeperServer进行实例化,其中txnLog是数据存储封装类,日志也是一种数据存储咯。然后在有

cnxnFactory = ServerCnxnFactory.createFactory(); //上面的具体点就是 ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance(); // 具体点就是NIOServerCnxnFactory //在这个类中 cnxnFactory.startup(zkServer); //具体的操作就是 start(); setZooKeeperServer(zks); if (startServer) { zks.startdata(); zks.startup(); }

其中zks.startup()具体操作就是

//这个从名字上应该可以猜到是干啥的,session跟踪器,管理session的 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //这个就很重要了 setupRequestProcessors(); //jmx的管理 registerJMX(); //设置状态 setState(State.RUNNING); notifyAll();

其中setupRequestProcessors()的具体内容为

RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start();

很明显是一种责任链的操作模式。具体每种processor是做什么的请看代码。在PrepRequestProcessor中,有LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue();可以看到,其实处理器共享了请求队列,每次取出一个请求对其依次处理。那么这个submittedRequests从哪里来的呢? 继续看代码。首先,在下面的函数(同样在PrepRequestProcessor类中)中,有下面代码

public void processRequest(Request request) { submittedRequests.add(request); }

在ZooKeeperServer类中,有

public void submitRequest(Request si) { if (firstProcessor == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }

同样在ZookeeperServer类中的

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {} long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {} private void close(long sessionId) {}

函数中,存在着submitRequest(si);命令。

我们以processPacket为例,继续跟进。 在NIOServerCnxn中,

private void readRequest() throws IOException { zkServer.processPacket(this, incomingBuffer); }

一直找下去可以找到 workerPool = new WorkerService(“NIOWorker”, numWorkerThreads, false);这个类在NIOServerCnxnFactory的内部私有类AcceptThread中被实例化,伴随着一起实例化的是

for(SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // ensure thread is started once and only once if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); }

再粘帖下AcceptThread的描述,该类之前的博文里面有介绍一些内容。

/** * There is a single AcceptThread which accepts new connections and assigns * them to a SelectorThread using a simple round-robin scheme to spread * them across the SelectorThreads. It enforces maximum number of * connections per IP and attempts to cope with running out of file * descriptors by briefly sleeping before retrying. */

以上是ZookeeperServer作为一个服务的核心处理环节。ZookeeperServer一共有以下的内部变量

protected static final Logger LOG; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); Environment.logEnv("Server environment:", LOG); } protected ZooKeeperServerBean jmxServerBean; protected DataTreeBean jmxDataTreeBean; public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; /** value of -1 indicates unset, use default */ protected int minSessionTimeout = -1; /** value of -1 indicates unset, use default */ protected int maxSessionTimeout = -1; protected SessionTracker sessionTracker; private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; private final AtomicLong hzxid = new AtomicLong(0); public final static Exception ok = new Exception("No prob"); protected RequestProcessor firstProcessor; protected volatile State state = State.INITIAL; protected enum State { INITIAL, RUNNING, SHUTDOWN, ERROR } /** * This is the secret that we use to generate passwords, for the moment it * is more of a sanity check. */ static final private long superSecret = 0XB3415C00L; private final AtomicInteger requestsInProcess = new AtomicInteger(0); final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>(); // this data structure must be accessed under the outstandingChanges lock final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; private final ServerStats serverStats; private final ZooKeeperServerListener listener; private ZooKeeperServerShutdownHandler zkShutdownHandler;

首先是管理扩展的几个Bean,接着是服务的参数 txnLogFactory是用来读写日志的 zkDb则管理着zookeeper的文件树结构 firstProcessor是处理器的入口 state标志着服务状态,用volatile保证了它的可见性 serverCnxnFactory和secureServerCnxnFactory是管理服务连接与处理流程的工厂,说工厂也不太合适,ServerCnxnFactory主要工作还是服务的管理和连接的控制,基本上核心流程都会与这个有关。 serverStats管理着服务的当前各类数据 zkShutdownHandler是处理服务关闭的钩子

ZookeeperServer相当于一个管理的大手,指挥着服务所有过程。核心代码。

zookeeper代码看了有一段时间了,用自己的话来说,zookeeper是一个分布式的存储服务组件,该组件具有较高的一致性和安全性,可以用作各种配置管理类的服务。其代码由几大重要部分组成,一是连接管理和控制,二是小数据存储服务,三是不同机器服务协调机制,四是jmx管理扩展模块。

这里也推荐其它的一些zookeeper博文,可以做一个对照来看,即使本系列文章有错误,不够精彩或者没讲到的地方,也可以通过下面的文章进行完善。 1.http://shift-alt-ctrl.iteye.com/blog/1845568 2.https://www.cnblogs.com/gpcuster/archive/2010/12/29/1921213.html 3.http://cailin.iteye.com/blog/2014486/ 4.http://blog.csdn.net/dengsilinming/article/details/18224925

接下来转战hadoop ^v^

转载请注明原文地址: https://www.6miu.com/read-2100047.html

最新回复(0)