hbase 源代码解析(1) Connection

xiaoxiao2021-02-28  111

最近打算对源代码学习。作为自己学习备忘。今天开始写hbase。对hbase的接触两年多。如果不对源码分析总感觉确点什么。写的肯定比不上大牛。但是学习过程的分享,我能懂,大学能学点什么就够了。

已经写了第一个roaringbitmap的一个系列了。写完之后,感觉自己什么都懂了。 http://blog.csdn.net/chenfenggang/article/details/74611144 http://blog.csdn.net/chenfenggang/article/details/74781791 http://blog.csdn.net/chenfenggang/article/details/74788617 http://blog.csdn.net/chenfenggang/article/details/74826685

**Client: 使用HBase RPC机制与HMaster和HRegionServer进行通信 Client与HMaster进行通信进行管理类操作 Client与HRegionServer进行数据读写类操作**

客户端首先建立建立连接:

try (Connection connection = ConnectionFactory.createConnection(config); 主要是采用反射机制生成了一个 String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL, ConnectionManager.HConnectionImplementation.class.getName());

这个里面实现内主要做了两件事,一个rpcCLient。还有一个是

AsyncProcess this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

Rpc生成也是用反射。hbase 里面特别喜欢用这种机制,用这种机制应该是方便扩展吧。

public static RpcClient createClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) { String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class.getName()); return ReflectionUtils.instantiateWithCustomCtor( rpcClientClass, new Class[] { Configuration.class, String.class, SocketAddress.class, MetricsConnection.class }, new Object[] { conf, clusterId, localAddr, metrics } ); }

使用这个获得socket factory,拿到这个基本就可以建立连接了。 NetUtils.getDefaultSocketFactory(conf)

另外一个控制类

if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { synchronized (nonceGeneratorCreateLock) { if (ConnectionManager.nonceGenerator == null) { ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator(); } this.nonceGenerator = ConnectionManager.nonceGenerator; } } else { this.nonceGenerator = new NoNonceGenerator(); } stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);

**ConnectionManager.nonceGenerator //每个客户端随机的NonceGEnerator,主要是为了生成clientid。 stats ;创建跟踪该connection所相关的region 信息监控实例 this.asyncProcess 创建一个异步进程实例,该进程主要负责持续的请求流 this.interceptor //远程服务器出现故障时,进行处理的机制 this.rpcCallerFactory //RpcRetryingCaller创建工厂 this.rpcClient //负责IPC调用相关**

总而言之,拿到socket factory 基本就已经结束。 rpcClient 和asyncProcess 一人一个。

未完待续。。。。

转载请注明原文地址: https://www.6miu.com/read-44878.html

最新回复(0)