Dubbo Provider启动流程源码分析

xiaoxiao2021-02-28  18

简单的官方demo:

provider的java代码:

public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); context.start(); System.in.read(); // 按任意键退出 }

provider的spring配置:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> <!-- 提供方应用信息,用于计算依赖关系 --> <dubbo:application name="demo-provider"/> <!-- 使用multicast广播注册中心暴露服务地址 --> <dubbo:registry address="multicast://224.5.6.7:1234"/> <!-- 用dubbo协议在20880端口暴露服务 --> <dubbo:protocol name="dubbo" port="20880"/> <!-- 和本地bean一样实现服务 --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- 声明需要暴露的服务接口 --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> </beans>

通过配置文件,以及spring的mvc初始化流程,我们可以假设下服务启动流程: 1. spring配置文件的加载以及初始化 2. bean的单例生成 3. dubbo管理所有服务实现单例对象 4. 向配置中心注册服务信息

spring自定义标签 dubbo标签初始化实现了Spring提供的NamespaceHandler接口,所以下面先看看DubboNamespaceHandler类:

public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { // DubboBeanDefinitionParser定义了如何解析dubbo节点信息 // DubboBeanDefinitionParser的第一个参数是beanclass // 应用相关配置 registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // 模块相关配置 registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); // 注册中心相关配置 registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); // registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); // 服务提供者配置 registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); // 服务消费者配置 registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); // 网络协议相关配置 registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); // 服务bean配置 registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); // registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); // registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }

在DubboBeanDefinitionParser中的parse中,解析设置了大部分配置信息以及服务信息。 我们可以关注下其中beanclass的源码,因为这章主要分析的是provider,这里从provider进行分析: 首先是beanName叫做ServiceBean的bean实例。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware { ... // 初始化bean的时候执行 public void afterPropertiesSet() throws Exception { ...// 初始化各种配置 // 发布服务 if (!isDelay()) { export(); } } }

发布代码:

public synchronized void export() { ... // 延迟导出 if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }

导出代码:

protected synchronized void doExport() { ... doExportUrls(); } private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } // method => invoker => exporter private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ... // 导出服务 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // 注册中心可以是zk,consul等 // 注册中心host String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 注册中心port Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // 获取暴露范围配置 String scope = url.getParameter(Constants.SCOPE_KEY); if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 如果暴露本地服务 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 如果暴露远端服务,走服务发现流程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); // 注册地址 URL monitorUrl = loadMonitor(registryURL); // 动态代理,将class+method包装位invoker,ref是服务的具体实例对象obj,invoker是个可执行对象 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // invoker=>exporter,最终在服务端保存下来的是exporter,对服务的暴露和引用都是通过这个对象实现的,而这个对象的实现由协议决定 Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); }

invoker 生成,动态代理的过程

定义代码:

ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); @SPI("javassist") //使用javassist字节码技术生成对象 public interface ProxyFactory { /** * create proxy. * 定义生成代理对象的方法 * * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; /** * create invoker. * getProxy调用的参数生成 * * @param <T> * @param proxy * @param type * @param url * @return invoker */ @Adaptive({Constants.PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException; } // jdk 动态代理 public class JdkProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); } public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } }; } } // javassist动态代理 public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } } public abstract class AbstractProxyInvoker<T> implements Invoker<T> { private final T proxy; //在proxyFactory.getInvoker的时候被设置,即 private final Class<T> type; private final URL url; public Result invoke(Invocation invocation) throws RpcException { try { return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } // 调用的是jdkproxyfactory和javassistproxyfactory定义的方法 protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable; }

实际函数调用:

// 泛型调用 public class ExtensionLoader<T> { // 缓存 private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>(); private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>(); // class->value private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>(); // 获取loader,优先缓存 public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; } // 每个class绑定了一个value,获取这个value,优先缓存 public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if (createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } } } } return (T) instance; } }

exporter 的生成

抽象类

public abstract class AbstractProtocol implements Protocol { // 一个协议对应着多个exporter protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>(); // 对应着一堆invoker protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>(); }

以thrift作为交换数据协议为例

public class ThriftProtocol extends AbstractProtocol { // thrift port public static final int DEFAULT_PORT = 40880; // 对应的数据交换方 // ip:port -> ExchangeServer private final ConcurrentMap<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // 服务发布的函数调用 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 只能使用 thrift codec URL url = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME); // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer && !serverMap.containsKey(key)) { serverMap.put(key, getServer(url)); } // export service. key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 缓存起来 exporterMap.put(key, exporter); return exporter; } public void destroy() { // 销毁invoker super.destroy(); // 移除消费ip,关闭server for (String key : new ArrayList<String>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); if (server != null) { server.close(getServerShutdownTimeout()); } // ~ end of if ( server != null ) } // ~ end of loop serverMap } // ~ end of method destroy public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { ThriftInvoker<T> invoker = new ThriftInvoker<T>(type, url, getClients(url), invokers); invokers.add(invoker); return invoker; } }
转载请注明原文地址: https://www.6miu.com/read-2399955.html

最新回复(0)