编程知识 cdmana.com

Dubbo-go Client端调用服务过程

{"type":"doc","content":[{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"导读:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"有了上一篇文章"},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/1eaa3531f5e37cc35e6eabcbchttps://blog.csdn.net/forevermoonlight/article/details/108962115","title":""},"content":[{"type":"text","text":"《Dubbo-go Server 端开启服务过程》"}]},{"type":"text","text":"的铺垫,可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,发布自己的ivkURL并订阅事件开启监听;而客户应该是通过zk注册组件,拿到需要调用的serviceURL,更新invoker并重写用户的RPCService,从而实现对远程过程调用细节的封装。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"1. 配置文件和客户端源码"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.1 client配置文件"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"helloworld提供的demo:profiles/client.yaml"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"registries :\n \"demoZk\":\n protocol: \"zookeeper\"\n timeout : \"3s\"\n address: \"127.0.0.1:2181\"\n username: \"\"\n password: \"\"\nreferences:\n \"UserProvider\":\n # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册\n registry: \"demoZk\"\n protocol : \"dubbo\"\n interface : \"com.ikurento.user.UserProvider\"\n cluster: \"failover\"\n methods :\n - name: \"GetUser\"\n retries: 3"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可看到配置文件与之前讨论过的server端非常类似,其refrences部分字段就是对当前服务要主调的服务的配置,其中详细说明了调用协议、注册协议、接口id、调用方法、集群策略等,这些配置都会在之后与注册组件交互,重写ivk、调用的过程中使用到。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.2 客户端使用框架源码"}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: user.go\nfunc init() {\n config.SetConsumerService(userProvider)\n hessian.RegisterPOJO(&User{})\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: main.go\nfunc main() {\n hessian.RegisterPOJO(&User{})\n config.Load()\n time.Sleep(3e9)\n println(\"\\n\\n\\nstart to test dubbo\")\n user := &User{}\n err := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)\n if err != nil {\n panic(err)\n }\n println(\"response result: %v\\n\", user)\n initSignal()\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"官网提供的helloworld demo的源码。可看到与服务端类似,在user.go内注册了rpc-service,以及需要rpc传输的结构体user。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在main函数中,同样调用了config.Load()函数,之后就可以直接通过实现好的rpc-service:userProvider 直接调用对应的功能函数,即可实现rpc调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可以猜到,从hessian注册结构、SetConsumerService,到调用函数.GetUser()期间,用户定义的rpc-service也就是userProvider对应的函数被重写,重写后的GetUser函数已经包含了实现了远程调用逻辑的invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来,就要通过阅读源码,看看dubbo-go是如何做到的。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"2. 实现远程过程调用"}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.1 加载配置文件"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file:config/config_loader.go :Load()\n\n// Load Dubbo Init\nfunc Load() {\n // init router\n initRouter()\n // init the global event dispatcher\n extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)\n // start the metadata report if config set\n if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {\n logger.Errorf(\"Provider starts metadata report error, and the error is {%#v}\", err)\n return\n }\n // reference config\n loadConsumerConfig()"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在main函数中调用的config.Load()函数,进而调用了loadConsumerConfig,类似于之前讲到的server端配置读入函数。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在loadConsumerConfig函数中,进行了三步操作:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/config_loader.go\n\nfunc loadConsumerConfig() {\n // 1 init other consumer config\n conConfigType := consumerConfig.ConfigType\n for key, value := range extension.GetDefaultConfigReader() {}\n checkApplicationName(consumerConfig.ApplicationConfig)\n configCenterRefreshConsumer()\n checkRegistries(consumerConfig.Registries, consumerConfig.Registry)\n \n // 2 refer-implement-reference\n for key, ref := range consumerConfig.References {\n if ref.Generic {\n genericService := NewGenericService(key)\n SetConsumerService(genericService)\n }\n rpcService := GetConsumerService(key)\n ref.id = key\n ref.Refer(rpcService)\n ref.Implement(rpcService)\n }\n\n // 3 wait for invoker is available, if wait over default 3s, then panic\n for {}\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"1. 检查配置文件并将配置写入内存"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2. "},{"type":"text","marks":[{"type":"strong"}],"text":"在for循环内部"},{"type":"text","text":",依次引用(refer)并且实例化(implement)每个被调reference。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"3. 等待三秒钟所有invoker就绪"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"其中重要的就是for循环里面的引用和实例化,两步操作,会在接下来展开讨论。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,配置已经被写入了框架。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.2 获取远程Service URL,实现可供调用的invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上述的ref.Refer完成的就是这部分的操作。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/a7/a710e0d54df9ba92d24228b021237fca.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"图(一)"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.1 构造注册url"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"和server端类似,存在注册url和服务url,dubbo习惯将服务url作为注册url的sub。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/reference_config.go: Refer()\n\nfunc (c *ReferenceConfig) Refer(_ interface{}) {\n //(一)配置url参数(serviceUrl),将会作为sub\n cfgURL := common.NewURLWithOptions(\n common.WithPath(c.id),\n common.WithProtocol(c.Protocol),\n common.WithParams(c.getUrlMap()),\n common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),\n )\n ...\n // (二)注册地址可以通过url格式给定,也可以通过配置格式给定\n // 这一步的意义就是配置->提取信息生成URL\n if c.Url != \"\" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址\n // 1. user specified URL, could be peer-to-peer address, or register center's address.\n urlStrings := gxstrings.RegSplit(c.Url, \"\\\\s*[;]+\\\\s*\")\n for _, urlStr := range urlStrings {\n serviceUrl, err := common.NewURL(urlStr)\n ...\n }\n } else {// 配置读入注册中心的信息\n // assemble SubURL from register center's configuration mode\n // 这是注册url,protocol = registry,包含了zk的用户名、密码、ip等等\n c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)\n ...\n // set url to regUrls\n for _, regUrl := range c.urls {\n regUrl.SubURL = cfgURL// regUrl的subURl存当前配置url\n }\n }\n //至此,无论通过什么形式,已经拿到了全部的regURL\n // (三)获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL\n if len(c.urls) == 1 {\n // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer\n // 这里是registry\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n } else {\n // 如果有多个注册中心,即有多个invoker,则采取集群策略\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这个函数中,已经处理完从Register配置到RegisterURL的转换,即图(一)中部分:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b8/b893dd65aa320c2901a222660d6fa904.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来,已经拿到的url将被传递给RegistryProtocol,进一步refer。"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.2 registryProtocol获取到zkRegistry实例,进一步Refer"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/protocol/protocol.go: Refer\n\n// Refer provider service from registry center\n// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。\nfunc (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {\n var registryUrl = url\n // 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等\n var serviceUrl = registryUrl.SubURL\n if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = \"registry\"\n protocol := registryUrl.GetParam(constant.REGISTRY_KEY, \"\")\n registryUrl.Protocol = protocol//替换成了具体的值,比如\"zookeeper\"\n }\n // 接口对象\n var reg registry.Registry\n // (一)实例化接口对象,缓存策略\n if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {\n // 缓存中不存在当前registry,新建一个reg\n reg = getRegistry(®istryUrl)\n // 缓存起来\n proto.registries.Store(registryUrl.Key(), reg)\n } else {\n reg = regI.(registry.Registry)\n }\n // 到这里,获取到了reg实例 zookeeper的registry\n //(二)根据Register的实例zkRegistry和传入的regURL新建一个directory\n // 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,\n // 这一步将在下面详细给出步骤\n // new registry directory for store service url from registry\n directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg)\n if err != nil {\n logger.Errorf(\"consumer service %v create registry directory error, error message is %s, and will return nil invoker!\",\n serviceUrl.String(), err.Error())\n return nil\n }\n // (三)DoRegister 在zk上注册当前client service\n err = reg.Register(*serviceUrl)\n if err != nil {\n logger.Errorf(\"consumer service %v register registry %v error, error message is %s\",\n serviceUrl.String(), registryUrl.String(), err.Error())\n }\n // (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)\n // invoker保存\n proto.invokers = append(proto.invokers, invoker)\n return invoker\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可详细阅读上述注释,这个函数完成了从url到invoker的全部过程"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(一)首先获得Registry对象,默认是之前实例化的zkRegistry,和之前server获取Registry的处理很类似。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(二)通过构造一个新的directory,异步拿到之前在zk上注册的server端信息,生成invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(三)在zk上注册当前service"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(四)集群策略,获得最终invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这一步完成了图(一)中所有余下的绝大多数操作,接下来就需要详细的查看directory的构造过程:"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.3 构造directory(包含较复杂的异步操作)"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/44/443a79e991df95bb099da308faa04c29.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"图(二)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上述的 "},{"type":"codeinline","content":[{"type":"text","text":"extension.GetDefaultRegistryDirectory(®istryUrl, reg)"}]},{"type":"text","text":"函数,本质上调用了已经注册好的"},{"type":"codeinline","content":[{"type":"text","text":"NewRegistryDirectory"}]},{"type":"text","text":"函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: NewRegistryDirectory()\n\n// NewRegistryDirectory will create a new RegistryDirectory\n// 这个函数作为default注册在extension上面\n// url为注册url,reg为zookeeper registry\nfunc NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {\n if url.SubURL == nil {\n return nil, perrors.Errorf(\"url is invalid, suburl can not be nil\")\n }\n dir := &RegistryDirectory{\n BaseDirectory: directory.NewBaseDirectory(url),\n cacheInvokers: []protocol.Invoker{},\n cacheInvokersMap: &sync.Map{},\n serviceType: url.SubURL.Service(),\n registry: registry,\n }\n dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)\n go dir.subscribe(url.SubURL)\n return dir, nil\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"首先构造了一个注册directory,开启协程调用其subscribe函数,传入serviceURL。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这个directory目前包含了对应的zkRegistry,以及传入的URL,他cacheInvokers的部分是空的。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"进入dir.subscribe(url.SubURL)这个异步函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: subscribe()\n\n// subscribe from registry\nfunc (dir *RegistryDirectory) subscribe(url *common.URL) {\n // 增加两个监听,\n dir.consumerConfigurationListener.addNotifyListener(dir)\n dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)\n // subscribe调用\n dir.registry.Subscribe(url, dir)\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"重点来了,他调用了zkRegistry的Subscribe方法,与此同时将自己作为ConfigListener传入"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"我认为这种传入listener的设计模式非常值得学习,而且很有java的味道。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"针对等待zk返回订阅信息这样的异步操作,需要传入一个Listener,这个Listener需要实现Notify方法,进而在作为参数传入内部之后,可以被异步地调用Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"层层的Listener事件链,能将传入的原始serviceURL通过zkConn发送给zk服务,获取到服务端注册好的url对应的二进制信息。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而Notify回调链,则将这串byte[]一步一步解析、加工;以事件的形式向外传递,最终落到directory上的时候,已经是成型的newInvokers了。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"具体细节不再以源码形式展示,可参照上图查阅源码。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此已经拿到了server端注册好的真实invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"完成了图(一)中的部分:"}]}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/21/21ef1360c04a762b87c4729c675aa3ee.png","alt":null,"title":"","style":[{"key":"width","value":"75%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.4 构造带有集群策略的clusterinvoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"经过上述操作,已经拿到了server端Invokers,放入了directory的cacheinvokers数组里面缓存。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"后续的操作对应本文2.2.2的第四步,由directory生成带有特性集群策略的invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)\n123"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Join函数的实现就是如下函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/clusterimpl/failovercluster_invokers.go: newFailoverClusterInvoker()\n\nfunc newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {\n return &failoverClusterInvoker{\n baseClusterInvoker: newBaseClusterInvoker(directory),\n }\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"dubbo-go框架默认选择failover策略,既然返回了一个invoker,我们查看一下failoverClusterInvoker的Invoker方法,看他是如何将集群策略封装到Invoker函数内部的:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()\n\n// Invoker 函数\nfunc (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {\n ...\n //调用List方法拿到directory缓存的所有invokers\n invokers := invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil {// 检查是否可以实现调用\n return &protocol.RPCResult{Err: err}\n }\n // 获取来自用户方向传入的\n methodName := invocation.MethodName()\n retries := getRetries(invokers, methodName)\n loadBalance := getLoadBalance(invokers[0], invocation)\n for i := 0; i <= retries; i++ {\n // 重要!这里是集群策略的体现,失败后重试!\n //Reselect before retry to avoid a change of candidate `invokers`.\n //NOTE: if `invokers` changed, then `invoked` also lose accuracy.\n if i > 0 {\n if err := invoker.checkWhetherDestroyed(); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n invokers = invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n }\n // 这里是负载均衡策略的体现!选择特定ivk进行调用。\n ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)\n if ivk == nil {\n continue\n }\n invoked = append(invoked, ivk)\n //DO INVOKE\n result = ivk.Invoke(ctx, invocation)\n if result.Error() != nil {\n providers = append(providers, ivk.GetUrl().Key())\n continue\n }\n return result\n }\n ...\n}"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"看了很多Invoke函数的实现,所有类似的Invoker函数都包含两个方向,一个是用户方向的invcation,一个是函数方向的底层invokers。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而集群策略的invoke函数本身作为接线员,把invocation一步步解析,根据调用需求和集群策略,选择特定的invoker来执行"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"proxy函数也是这样,一个是用户方向的ins[] reflect.Type, 一个是函数方向的invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"proxy函数负责将ins转换为invocation,调用对应invoker的invoker函数,实现连通。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而出于这样的设计,可以在一步步Invoker封装的过程中,每个Invoker只关心自己负责操作的部分,从而使整个调用栈解耦。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"秒啊!!!"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,我们理解了failoverClusterInvoker 的Invoke函数实现,也正是和这个集群策略Invoker被返回,接受来自上方的调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"已完成图(一)中的:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b5/b59ea1bddfdd392fb303c99f85fd4ab7.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2.2.5 在zookeeper上注册当前client"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"拿到invokers后,可以回到这个函数了:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/refrence_config.go: Refer()\n\nif len(c.urls) == 1 {\n // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n // (一)拿到了真实的invokers\n } else {\n // 如果有多个注册中心,即有多个invoker,则采取集群策略\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n cluster := extension.GetCluster(hitClu)\n // If 'zone-aware' policy select, the invoker wrap sequence would be:\n // ZoneAwareClusterInvoker(StaticDirectory) ->\n // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker\n c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))\n }\n // (二)create proxy,为函数配置代理\n if c.Async {\n callback := GetCallback(c.id)\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)\n } else {\n // 这里c.invoker已经是目的addr了\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"我们有了可以打通的invokers,但还不能直接调用,因为invoker的入参是invocation,而调用函数使用的是具体的参数列表。需要通过一层proxy来规范入参和出参。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来新建一个默认proxy,放置在c.proxy内,以供后续使用"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,完成了图(一)中最后的操作"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/28/2818614794c118b396ab004f8412ff6f.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.3 将调用逻辑以代理函数的形式写入rpc-service"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上面完成了config.Refer操作"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"回到"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"config/config_loader.go: loadConsumerConfig()"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/88/881256613ed8dbe6e37f46188d34e15b.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"下一个重要的函数是Implement,他完的操作较为简单:旨在使用上面生成的c.proxy代理,链接用户自己定义的rpcService到clusterInvoker的信息传输。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"函数较长,只选取了重要的部分:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: common/proxy/proxy.go: Implement()\n\n// Implement\n// proxy implement\n// In consumer, RPCService like:\n// type XxxProvider struct {\n// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error\n// }\n// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数\n// 将当前rpc所有可供调用的函数注册到proxy.rpc内\nfunc (p *Proxy) Implement(v common.RPCService) {\n // makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数\n // 这个被返回的函数是请求实现的载体,由他来发起调用获取结果\n makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {\n return func(in []reflect.Value) []reflect.Value {\n // 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value\n // 这个函数具体的实现如下:\n ...\n // 目前拿到了 methodName、所有入参的interface和value,出参数reply\n // (一)根据这些生成一个 rpcinvocation\n inv = invocation_impl.NewRPCInvocationWithOptions(\n invocation_impl.WithMethodName(methodName),\n invocation_impl.WithArguments(inIArr),\n invocation_impl.WithReply(reply.Interface()),\n invocation_impl.WithCallBack(p.callBack),\n invocation_impl.WithParameterValues(inVArr))\n for k, value := range p.attachments {\n inv.SetAttachments(k, value)\n }\n // add user setAttachment\n atm := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment,也要写入inv\n if m, ok := atm.(map[string]string); ok {\n for k, value := range m {\n inv.SetAttachments(k, value)\n }\n }\n // 至此构造inv完毕\n // (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用\n result := p.invoke.Invoke(invCtx, inv)\n // 如果有attachment,则加入\n if len(result.Attachments()) > 0 {\n invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())\n }\n ...\n }\n }\n numField := valueOfElem.NumField()\n for i := 0; i < numField; i++ {\n t := typeOf.Field(i)\n methodName := t.Tag.Get(\"dubbo\")\n if methodName == \"\" {\n methodName = t.Name\n }\n f := valueOfElem.Field(i)\n if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数\n outNum := t.Type.NumOut()\n // 规定函数输出只能有1/2个\n if outNum != 1 && outNum != 2 {\n logger.Warnf(\"method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2\",\n t.Name, t.Type.String(), outNum)\n continue\n }\n // The latest return type of the method must be error.\n // 规定最后一个返回值一定是error\n if returnType := t.Type.Out(outNum - 1); returnType != typError {\n logger.Warnf(\"the latest return type %s of method %q is not error\", returnType, t.Name)\n continue\n }\n // 获取到所有的出参类型,放到数组里\n var funcOuts = make([]reflect.Type, outNum)\n for i := 0; i < outNum; i++ {\n funcOuts[i] = t.Type.Out(i)\n }\n // do method proxy here:\n // (三)调用make函数,传入函数名和返回值,获得能调用远程的proxy,将这个proxy替换掉原来的函数位置\n f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))\n logger.Debugf(\"set method [%s]\", methodName)\n }\n }\n ...\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"正如之前所说,proxy的作用是将用户定义的函数参数列表,转化为抽象的invocation传入Invoker,进行调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"其中已标明有三处较为重要的地方:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"numberedlist","attrs":{"start":"","normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"在代理函数中实现由参数列表生成Invocation的逻辑"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"在代理函数实现调用Invoker的逻辑"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"将代理函数替换为原始rpc-service对应函数"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,也就解决了一开始的问题:"}]},{"type":"paragraph","attrs":{"indent":1,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: client.go: main()\n\nconfig.Load()\nuser := &User{}\nerr := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这里直接调用用户定义的rpcService的函数GetUser,这里实际调用的是经过重写入的函数代理,所以就能实现远程调用了。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"3. 从client到server的invoker嵌套链 - 小结"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在阅读dubbo-go源码的过程中,我能发现一条清晰的invoker-proxy嵌套链,我希望通过图的形式来展现:"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/5f/5ff59ddeb3dc646ee3de5bbd22e91dc0.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/c1cfd7aa3ac77380a7a6f635f?utm_source=rss&utm_medium=article

Scroll to Top