//如果是在jvm中引⽤,⽣成对应的url,并由refprotocol⽣成Invoker,refprotocol是⼀个动态代理类,能通过不同的url来实现不同的逻辑 if (isJvmRefer) { URL url = new URL(_PROTOCOL, OST, 0, e()).addParameters(map); invoker = (interfaceClass, url); if (Enabled()) { ("Using injvm service " + e()); } } else { //如果⽤户指定了URL if (url != null && () > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = LON_SPLIT_(url); if (us != null && > 0) { for (String u : us) { URL url = f(u); if (h() == null || h().length() == 0) { url = h(interfaceName); } if (RY_(tocol())) { (ameterAndEncoded(_KEY, yString(map))); } else { (rl(url, map)); } } } //如果没有指定 URL,就去加载注册中⼼的URL } else { // assemble URL from register center's configuration List us = loadRegistries(false); if (us != null && !y()) { if (us != null && !y()) { //加载注册中⼼的监控 for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { (R_KEY, (String())); } (ameterAndEncoded(_KEY, yString(map))); } } if (y()) { } } //如果注册中⼼只有⼀个,refprotocol直接⽣成Invoker if (() == 1) { invoker = (interfaceClass, (0)); } else { //如果注册中⼼有多个,则调⽤cluster的join⽅法来⽣成Invoker List> invokers = new ArrayList>(); URL registryURL = null; for (URL url : urls) { ((interfaceClass, url)); if (RY_(tocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available URL u = ameter(R_KEY, ); invoker = (new StaticDirectory(u, invokers)); } else { // not a registry url invoker = (new StaticDirectory(invokers)); } } }
//检查 Boolean c = check; if (c == null && consumer != null) { c = k(); } if (c == null) { c = true; // default true } //是否可⽤ if (c && !lable()) { } if (Enabled()) { ("Refer dubbo service " + e() + " from url " + ()); } // create service proxy //最终调⽤proxyFactory的getProxy⽅法⽣成最终的代理类 return (T) xy(invoker);} throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + alHost() + " use dubbo ve throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" :
createProxy的⽅法稍微复杂⼀点,⾸先判断该引⽤是不是jvm中的,如果是就直接创建Invoker,其次判断⽤户是否指定的URL,如果未指定,就加载注册中⼼的URL,这⼀阶段的主要⽬的是拿到URL,然后再判断URL是不是有多个,如果只有⼀个,就调⽤refprotocol创建Invoker,如果有多个,就调⽤cluster的join⽅法合并多个Invoker为⼀个Invoker,然后检查,最终调⽤proxyFactory的getProxy⽅法⽣成ref,可以明显看出,⼤部分逻辑都是如何⽣成Invoker,这⾥⾯有⼏个细节需要继续深⼊,⼀个是refprotocol的ref⽅法,另⼀个是cluster的join⽅法,最后⼀个是proxyFactory的getProxy⽅法。流程图如下:服务发现流程图1. 深⼊refprotocol#ref由于ReferenceBean中引⼊的refprotocol是由SPI机制⽣成的动态代理类,想要看到具体的代码,有两种办法,第⼀种是断点ExtensionLoader的createAdaptiveExtensionClassCode⽅法,查看⽣成的code,第⼆种是⽤Arthas反编译,这⾥选⽤第⼆种。启动Arthas,先⽤sc搜索对应的class:红框中的类就是想要反编译的,然后再⽤命令jad ol$Adaptive进⾏反编译,得到的代码如下:public class Protocol$Adaptiveimplements Protocol { @Override public void destroy() { } @Override public int getDefaultPort() { } public Exporter export(Invoker invoker) throws RpcException { String string; if (invoker == null) { throw new IllegalArgumentException("r argument == null"); } if (() == null) { throw new IllegalArgumentException("r argument getUrl() == null"); } URL uRL = (); String string2 = string = tocol() == null ? "dubbo" : tocol(); if (string == null) { } Protocol protocol = ensionLoader().getExtension(string); return (invoker); } public Invoker refer(Class class_, URL uRL) throws RpcException { String string; if (uRL == null) { throw new IllegalArgumentException("url == null"); } URL uRL2 = uRL; //获取协议,如果为空,则默认为dubbo String string2 = string = tocol() == null ? "dubbo" : tocol(); if (string == null) { } Protocol protocol = ensionLoader().getExtension(string); return (class_, uRL); }} throw new UnsupportedOperationException("method public abstract void y() of interface throw new UnsupportedOperationException("method public abstract int aultPort() of interface . throw new IllegalStateException(new StringBuffer().append("Fail to get extension(ol) name from url(").append(ng( throw new IllegalStateException(new StringBuffer().append("Fail to get extension(ol) name from url(").append(ng重点关注refer⽅法,这个⽅法从url中取协议,如果协议为空,则默认为dubbo,然后⽤ExtensionLoader加载指定的拓展,然后再次debug这个getExtension⽅法,发现得到的Protocol是⼀个套娃结构,以zookeeper为注册中⼼时的registry为例:最外层是ProtocolFilterWrapper,然后是QosProtocolWrapper,然后是ProtocolListennerWrapper,最后才是RegistryProtocol。ProtocolFilterWrapper的作⽤是啥呢?我们在服务调动的时候可以有Filter对服务进⾏拦截处理,例如sofa-tracer对服务调⽤时,⽣产者和消费者打印⽇志中的trace_id⼀致,就是⽤的拦截器对消费者和⽣成都做了拦截,下⾯是ProtocolFilterWrapper的源码:private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) { Invoker last = invoker; List filters = ensionLoader().getActivateExtension((), key, group); if (!y()) { for (int i = () - 1; i >= 0; i--) { final Filter filter = (i); final Invoker next = last; last = new Invoker() { @Override public Class getInterface() { return erface(); } @Override public URL getUrl() { return (); } @Override public boolean isAvailable() { return lable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return (next, invocation); } @Override public void destroy() { y(); } @Override public String toString() { return ng(); } }; } } return last; }QosProtocolWrapper是什么呢,QosProtocolWrapper是⽤来启动QosServer的,Qos全程Quality of Service,Qos详细的资料可以看这⾥,QosProtocolWrapper源码如下:private void startQosServer(URL url) { if (!eAndSet(false, true)) { return; } try { boolean qosEnable = oolean(ameter(QOS_ENABLE,"true")); if (!qosEnable) { return; } int port = nt(ameter(QOS_PORT,"22222")); boolean acceptForeignIp = oolean(ameter(ACCEPT_FOREIGN_IP,"true")); Server server = tance(); t(port); eptForeignIp(acceptForeignIp); (); } catch (Throwable throwable) { //throw new RpcException("fail to start qos server", throwable); } }ProtocolListennerWrapper⼜是⼲嘛的呢?在服务运⾏的过程中,有时候需要监听某个服务,在它暴露或发现它做⼀些事情,ProtocolListennerWrapper就是⽤来实现这个功能的,源码如下,当服务被暴露或发现服务时,会通知所有的listeners: @Override public Invoker refer(Class type, URL url) throws RpcException { if (RY_(tocol())) { return (type, url); } return new ListenerInvokerWrapper((type, url), fiableList( ensionLoader() .getActivateExtension(url, R_LISTENER_KEY))); } public ListenerInvokerWrapper(Invoker invoker, List listeners) { if (invoker == null) { throw new IllegalArgumentException("invoker == null"); } r = invoker; ers = listeners; if (listeners != null && !y()) { for (InvokerListener listener : listeners) { if (listener != null) { try { ed(invoker); } catch (Throwable t) { (sage(), t); } } } } }最后终于终于到了RegistryProtocol,程序运⾏到RegistryProtocol的refer⽅法,获取对应的注册中⼼,然后调⽤⽅法doRefer,在doRefer⽅法中new了⼀个RegistryDirectory对象,然后使⽤cluster#join⽅法⽣成Invoker,还记得有多个注册中⼼URL的时候怎么处理的吗,都是调⽤的这个⽅法,在下⾯我们深⼊cluster的join⽅法,下⾯是doRefer源码:private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { //实例化⼀个RegistryDirectory对象 RegistryDirectory directory = new RegistryDirectory(type, url); //分别设置注册中⼼和协议 istry(registry); tocol(protocol); // all attributes of REFER_KEY Map parameters = new HashMap(().getParameters()); URL subscribeUrl = new URL(ER_PROTOCOL, (ER_IP_KEY), 0, e(), parameters); if (!_(viceInterface()) && ameter(ER_KEY, true)) { //注册中⼼注册消费者 er(ameters(RY_KEY, ERS_CATEGORY, _KEY, f(false))); } //订阅服务 ibe(ameter(RY_KEY, ERS_CATEGORY + "," + URATORS_CATEGORY + "," + S_CATEGORY)); //⽣成Invoker Invoker invoker = (directory); //放⼊缓存 erConsumer(invoker, url, subscribeUrl, directory); return invoker;}2. 深⼊cluster#joincluster也是⼀个Adaptive的动态代理类,再次通过Arthas反编译获取起代码如下:/* * Decompiled with CFR. */package r;import ;import ionLoader;import r;import eption;import r;import ory;public class Cluster$Adaptiveimplements Cluster { public Invoker join(Directory directory) throws RpcException { if (directory == null) { throw new IllegalArgumentException("ory argument == null"); } if (() == null) { throw new IllegalArgumentException("ory argument getUrl() == null"); } URL uRL = (); //获取cluster参数,默认为failover String string = ameter("cluster", "failover"); if (string == null) { } Cluster cluster = ensionLoader().getExtension(string); return (directory); }} throw new IllegalStateException(new StringBuffer().append("Fail to get extension(r) name from url(").append(可以得知该动态代理类先是获取url中cluster的值,如果为空,则默认为failover,然后再通过getExtension获取具体的Cluster对象,在此通过断点获知Cluster也是个套娃:外层是⼀个MockClusterWrapper,然后再是FailoverCluster。MockClusterWrapper中将其包装成MockClusterInvoker:public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { r = cluster; } @Override public Invoker join(Directory directory) throws RpcException { return new MockClusterInvoker(directory, (directory)); }}FailoverCluster中将其包装成FailoverClusterInvokerpublic class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public Invoker join(Directory directory) throws RpcException { return new FailoverClusterInvoker(directory); }}也就是说最终得到⼀个这样的数据结构,MockClusterWrapper对象中有directory和FailoverClusterInvoker对象两个成员变量,⽽FailoverClusterInvoker中有成员变量directory。3. 深⼊proxyFactory#getProxy再次通过Arthas反编译proxyFactory#getProxy得到源码如下:public Object getProxy(Invoker invoker) throws RpcException { if (invoker == null) { throw new IllegalArgumentException("r argument == null"); } if (() == null) { throw new IllegalArgumentException("r argument getUrl() == null"); } URL uRL = (); String string = ameter("proxy", "javassist"); if (string == null) { } ProxyFactory proxyFactory = ensionLoader().getExtension(string); return xy(invoker); } throw new IllegalStateException(new StringBuffer().append("Fail to get extension(actory) name from url(").append(再次断点getExtension,⼜得到⼀个套娃结构:进⼊StubProxyFactoryWrapper,getProxy⽅法如下:@Override@SuppressWarnings({"unchecked", "rawtypes"})public T getProxy(Invoker invoker) throws RpcException { //通过JavassistProxyFactory获得动态代理类 T proxy = xy(invoker); if ( != erface()) { //获取存根参数 String stub = ().getParameter(_KEY, ().getParameter(_KEY)); //如果存根参数不为空 if (mpty(stub)) { Class> serviceType = erface(); if (ult(stub)) { if (().hasParameter(_KEY)) { stub = e() + "Stub"; } else { stub = e() + "Local"; } } try { //获取存根class Class> stubClass = e(stub); if (!gnableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + e() + " not implement interface " + e()); } try { Constructor> constructor = nstructor(stubClass, serviceType); //实例化class proxy = (T) tance(new Object[]{proxy}); //export stub service //导出存根服务 URL url = (); if (ameter(_EVENT_KEY, T_STUB_EVENT)) { url = ameter(_SERVER_KEY, ng()); try { export(proxy, (Class) erface(), url); } catch (Exception e) { ("export a stub service error.", e); } } } catch (NoSuchMethodException e) { } } catch (Throwable t) { // ignore } } } return proxy;} url = ameter(_EVENT_METHODS_KEY, (pper(ss()).getDeclaredMethodNam throw new IllegalStateException("No such constructor "public " + pleName() + "(" + e() + ")" in stub implementa ("Failed to create stub implementation class " + stub + " in consumer " + alHost() + " use dubbo version " + 这段代码的逻辑是使⽤JavassistProxyFactory⽣成代理类,然后检查URL中有没有“stub”参数。如果存在,就导出存根服务。JavassistProxyFactory创建代理的⽅法如下: @Override @SuppressWarnings("unchecked") public T getProxy(Invoker invoker, Class>[] interfaces) { return (T) xy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }可知,创建爱你代理时⼜包了⼀层InvokerInvocationHandler,InvokerInvocationHandler源码如下:public class InvokerInvocationHandler implements InvocationHandler { private final Invoker> invoker; public InvokerInvocationHandler(Invoker> handler) { r = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = e(); Class>[] parameterTypes = ameterTypes(); if (laringClass() == ) { return (invoker, args); } if ("toString".equals(methodName) && == 0) { return ng(); } if ("hashCode".equals(methodName) && == 0) { return de(); } if ("equals".equals(methodName) && == 1) { return (args[0]); } //封装成RpcInvocation return (new RpcInvocation(method, args)).recreate(); }}注意invoke⽅法,被动态代理的类的⽅法都会交给invoke⽅法来处理,InvokerInvocationHandler这个类将除Object外的⽅法都封装成RpcInvocation交给invoker来调⽤。现在屡⼀下思路,⾸先服务暴露时,先⽣成URL,再根据URL来⽣成Directory,再根据cluster的injoin⽅法⽣成Invoker,然后再根据proxyFactory⽣成适配的代理类,⽽当我们调⽤dubbo服务的⽅法时,真正⼲活的是Invoker,现在反推Invoker⼀共封了多少层:1.⾸先是创建动态代理类时封的⼀层InvokerInvocationHandler2. 然后是cluster的injoin封的MockClusterInvoker和FailoverClusterInvokerinvoker的初步封装⼀共三层三.总结服务发现的过程就是通过各种途径创建Directory,然后通过cluster创建成Invoker,然后⽣成动态代理类,当调⽤⼀个dubbo服务时,实际上就是层层调⽤Invoker的⽅法,下⼀篇深⼊Invoker远程调⽤的过程。
评论列表(0条)