hbase 源代码解析(2)HAdmin 的表创建过程

xiaoxiao2021-02-28  74

这个源代码将写一个系列。先走流程一样。过一遍,然后重点分析里面的功能。

HConection 里的一个重要方法.getAdmin。获得HBaseAdmin。

*HBaseAdmin提供了建表、创建列簇、检查表是否存在、修改表结构和列簇结构、删除表等功能。*

这章主要将HBaseAdmin的createTable客户端的代码。

hbaseAdmin对象拿到connetion。保存conf,rpc等一些东西。

HBaseAdmin(ClusterConnection connection) { this.conf = connection.getConfiguration(); this.connection = connection; this.rpcCallerFactory = connection.getRpcRetryingCallerFactory(); this.rpcControllerFactory = connection.getRpcControllerFactory(); this.ng = this.connection.getNonceGenerator(); }

以crateTable。不阻塞,可以等。在future里的等就像。所以是同步方法。

/** * Creates a new table but does not block and wait for it to come online. * You can use Future.get(long, TimeUnit) to wait on the operation to complete.

Future future = createTableAsyncV2(desc, splitKeys); 应该尽量去配置splitKeys,可以为null 但是不能为”“.getBytes().

然后调用executeCallable

CreateTableResponse response = executeCallable( new MasterCallable<CreateTableResponse>(getConnection()) { @Override public CreateTableResponse call(int callTimeout) throws ServiceException { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(desc.getTableName()); CreateTableRequest request = RequestConverter.buildCreateTableRequest( desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); return master.createTable(controller, request); } });

这个最主要的是master.createTable(controller, request);这个后面点讲 new MasterCallable 实现RetryingCallable 在这里主要是

interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface this.master = this.connection.getKeepAliveMasterService(); MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();

这里master 就是一个MasterProtos。用于通信 这个会调用一个callWithRetries的方法。重复尝试,

private static <V> V executeCallable(MasterCallable<V> callable, RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) throws IOException { RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable, operationTimeout); } finally { callable.close(); } }

上面掉了一个东西。我们知道,client首先访问的应该是zk,并且重zk里拿到HMaster的地址。下面将介绍怎么连接zk的。 在callWithRetries里有这么一段代码。

// bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block callable.prepare(tries != 0); // if called with false, check table status on ZK interceptor.intercept(context.prepare(callable, tries));

这里的prepare就是去获取zk

@Override public void prepare(boolean reload) throws IOException { this.master = this.connection.getKeepAliveMasterService(); }

connetion里的方法。首先判断Master是否在运行。第一次的时候因为mss.getStub()是空的返回false 所以 newMasterServiceStubMaker

@Override public MasterKeepAliveConnection getKeepAliveMasterService() throws MasterNotRunningException { synchronized (masterAndZKLock) { if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) { MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); try { this.masterServiceState.stub = stubMaker.makeStub();

这个是makestub的主要方法。这里都很重要,

private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); } try { checkIfBaseNodeAvailable(zkw); ServerName sn = MasterAddressTracker.getMasterAddress(zkw); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); throw new MasterNotRunningException(msg); } if (isDeadServer(sn)) { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort(), hostnamesCanChange); connectionLock.putIfAbsent(key, key); Object stub = null; synchronized (connectionLock.get(key)) { stub = stubs.get(key); if (stub == null) { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); isMasterRunning(); stubs.put(key, stub); } } return stub; } finally { zkw.close(); } }

1)首先是getKeepAliveZooKeeperWatcher 这里就获取到zookeper的连接的。this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);

2)然后就是从zk里的默认路径/hbase/master拿到ServerName名字 ServerName sn = MasterAddressTracker.getMasterAddress(zkw);

3)最重要的一步建立stub BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); 这个stub保存在mater的rpc连接。然后通过protos就能访问MasterService。

回到master 这个下一张节。。

未完待续….

转载请注明原文地址: https://www.6miu.com/read-46971.html

最新回复(0)