
Dubbo-go: how the client side calls a service

{"type":"doc","content":[{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"Reading guide"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Building on the previous article, "},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/1eaa3531f5e37cc35e6eabcbchttps://blog.csdn.net/forevermoonlight/article/details/108962115","title":""},"content":[{"type":"text","text":"Dubbo-go: how the server starts its service"}]},{"type":"text","text":", the client startup process can be understood by analogy with the server's. The biggest difference is that the server registers its service through zk, publishes its own ivkURL, and subscribes to events to start listening, whereas the client uses the zk registry component to obtain the serviceURL it needs to call, updates its invoker, and rewrites the user's RPCService, thereby encapsulating the details of the remote procedure call."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"1. 
Configuration file and client source code"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.1 Client configuration file"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"From the helloworld demo: profiles/client.yaml"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"registries:\n  \"demoZk\":\n    protocol: \"zookeeper\"\n    timeout: \"3s\"\n    address: \"127.0.0.1:2181\"\n    username: \"\"\n    password: \"\"\nreferences:\n  \"UserProvider\":\n    # Multiple registries can be listed, separated by commas; if none is specified, all registries are used by default\n    registry: \"demoZk\"\n    protocol: \"dubbo\"\n    interface: \"com.ikurento.user.UserProvider\"\n    cluster: \"failover\"\n    methods:\n      - name: \"GetUser\"\n        retries: 3"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This configuration file is very similar to the server-side one discussed earlier. Its references section configures the services that the current service will call, specifying the call protocol, the registry, the interface id, the methods to call, the cluster strategy, and so on. These settings are used later when interacting with the registry component, rewriting the ivk, and making calls."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.2 Client source code using the framework"}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: user.go\nfunc init() {\n 
config.SetConsumerService(userProvider)\n hessian.RegisterPOJO(&User{})\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: main.go\nfunc main() {\n hessian.RegisterPOJO(&User{})\n config.Load()\n time.Sleep(3e9)\n println(\"\\n\\n\\nstart to test dubbo\")\n user := &User{}\n err := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)\n if err != nil {\n panic(err)\n }\n println(\"response result: %v\\n\", user)\n initSignal()\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This is the helloworld demo source provided on the official site. As on the server side, user.go registers the rpc-service together with the User struct that has to be transmitted over rpc."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The main function likewise calls config.Load(). After that, the corresponding function can be called directly on the implemented rpc-service, userProvider, and that call is the rpc call."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"One can guess that
, somewhere between registering the struct with hessian, calling SetConsumerService, and invoking .GetUser(), the corresponding function of the user-defined rpc-service (that is, userProvider) is rewritten, and the rewritten GetUser function already contains the invoker that implements the remote call logic."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Next, let's read the source code to see how dubbo-go does this."}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"2. Implementing the remote procedure call"}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.1 Loading the configuration file"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/config_loader.go: Load()\n\n// Load Dubbo Init\nfunc Load() {\n // init router\n initRouter()\n // init the global event dispatcher\n extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)\n // start the metadata report if config set\n if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {\n logger.Errorf(\"Provider starts metadata report error, and the error is {%#v}\", err)\n return\n }\n // reference config\n loadConsumerConfig()\n ...\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The main function calls config.Load(), which in turn calls loadConsumerConfig, similar to the server-side configuration loading function described earlier
."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The loadConsumerConfig function performs three steps:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/config_loader.go\n\nfunc loadConsumerConfig() {\n // 1 init other consumer config\n conConfigType := consumerConfig.ConfigType\n for key, value := range extension.GetDefaultConfigReader() {}\n checkApplicationName(consumerConfig.ApplicationConfig)\n configCenterRefreshConsumer()\n checkRegistries(consumerConfig.Registries, consumerConfig.Registry)\n \n // 2 refer-implement-reference\n for key, ref := range consumerConfig.References {\n if ref.Generic {\n genericService := NewGenericService(key)\n SetConsumerService(genericService)\n }\n rpcService := GetConsumerService(key)\n ref.id = key\n ref.Refer(rpcService)\n ref.Implement(rpcService)\n }\n\n // 3 wait for invoker is available, if wait over default 3s, then panic\n for {}\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"1. Check the configuration file and load the configuration into memory."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2. "},{"type":"text","marks":[{"type":"strong"}],"text":"Inside the for loop"},{"type":"text","text":", refer and then implement each reference to be called, in turn."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"3. 
Wait for all invokers to become ready (by default, up to three seconds)."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The important part is the two operations inside the for loop, refer and implement, which are discussed next."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"At this point, the configuration has been loaded into the framework."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.2 Get the remote service URL and build a callable invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The ref.Refer call mentioned above performs this part."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/a7/a710e0d54df9ba92d24228b021237fca.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Figure 1"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.1 Constructing the registry url"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As on the server side, there is a registry url and a service url; dubbo conventionally stores the service url as the registry url's 
sub."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/reference_config.go: Refer()\n\nfunc (c *ReferenceConfig) Refer(_ interface{}) {\n // (1) Build the url from the configuration (the serviceUrl); it will become the SubURL\n cfgURL := common.NewURLWithOptions(\n common.WithPath(c.id),\n common.WithProtocol(c.Protocol),\n common.WithParams(c.getUrlMap()),\n common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),\n )\n ...\n // (2) The registry address can be given either as a url or through the configuration\n // This step extracts the information from the configuration and generates URLs\n if c.Url != \"\" { // user-specified url: either a point-to-point address or a registry address\n // 1. user specified URL, could be peer-to-peer address, or register center's address.\n urlStrings := gxstrings.RegSplit(c.Url, \"\\\\s*[;]+\\\\s*\")\n for _, urlStr := range urlStrings {\n serviceUrl, err := common.NewURL(urlStr)\n ...\n }\n } else { // read the registry information from the configuration\n // assemble SubURL from register center's configuration mode\n // This is the registry url, protocol = registry; it carries the zk username, password, ip, etc.\n c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)\n ...\n // set url to regUrls\n for _, regUrl := range c.urls {\n regUrl.SubURL = cfgURL // the regUrl's SubURL stores the current configuration url\n }\n }\n // At this point, whichever form was used, all regURLs have been obtained\n // (3) Get the registryProtocol instance and call its Refer method, passing in the newly built regURL\n if len(c.urls) == 1 {\n // This step goes to registry/protocol/protocol.go registryProtocol.Refer\n // Here the protocol is registry\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n } else {\n // With multiple registries there are multiple invokers, so a cluster strategy is applied\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This function handles the transformation from the registry configuration to the registry URL, which is this part of Figure 1:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b8/b893dd65aa320c2901a222660d6fa904.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Next, the resulting url is passed to the registryProtocol for a further Refer."}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.2 registryProtocol obtains the zkRegistry instance and Refers further"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/protocol/protocol.go: Refer\n\n// Refer provider service from registry center\n// url is the registry url from the registries section of the configuration file; from it an invoker pointing at the target addr is generated, which the client can call directly.\nfunc (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {\n var registryUrl = url\n // This comes from the referenceConfig: serviceUrl carries all the Reference information, including interfaceName, methods, etc.\n var serviceUrl = registryUrl.SubURL\n if 
registryUrl.Protocol == constant.REGISTRY_PROTOCOL { // registryUrl.Proto = \"registry\"\n protocol := registryUrl.GetParam(constant.REGISTRY_KEY, \"\")\n registryUrl.Protocol = protocol // replaced with the concrete value, such as \"zookeeper\"\n }\n // the registry interface object\n var reg registry.Registry\n // (1) Instantiate the interface object, using a cache\n if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {\n // The registry is not in the cache yet, so create a new reg\n reg = getRegistry(&registryUrl)\n // cache it\n proto.registries.Store(registryUrl.Key(), reg)\n } else {\n reg = regI.(registry.Registry)\n }\n // At this point we have the reg instance, the zookeeper registry\n // (2) Create a new directory from the zkRegistry instance and the regURL passed in\n // This step involves fairly complex asynchronous logic: it fetches the real addr of the target service from the registry, builds the invokers, and puts them into the directory\n // It is described in detail below\n // new registry directory for store service url from registry\n directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)\n if err != nil {\n logger.Errorf(\"consumer service %v create registry directory error, error message is %s, and will return nil invoker!\",\n serviceUrl.String(), err.Error())\n return nil\n }\n // (3) DoRegister: register the current client service on zk\n err = reg.Register(*serviceUrl)\n if err != nil {\n logger.Errorf(\"consumer service %v register registry %v error, error message is %s\",\n serviceUrl.String(), registryUrl.String(), err.Error())\n }\n // (4) new cluster invoker: put the directory into the cluster and get the invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)\n // save the invoker\n proto.invokers = append(proto.invokers, invoker)\n return invoker\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Please read the comments above carefully. This function covers the whole process from url to invoker:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(1) First obtain the Registry object, which by default is the zkRegistry instantiated earlier. This is very similar to how the server obtains its Registry."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(2) Construct a new directory, which asynchronously receives the server information previously registered on zk and generates the invokers."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(3) Register the current service on zk."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(4) Apply the cluster strategy and obtain the final invoker."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This step completes most of the remaining operations in Figure 1. Next, we look at how the directory is constructed in detail:"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.3 Constructing the directory (involves fairly complex asynchronous operations)"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/44/443a79e991df95bb099da308faa04c29.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Figure 2"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The "},{"type":"codeinline","content":[{"type":"text","text":"extension.GetDefaultRegistryDirectory(&registryUrl, reg)"}]},{"type":"text","text":" function mentioned above essentially calls the registered "},{"type":"codeinline","content":[{"type":"text","text":"NewRegistryDirectory"}]},{"type":"text","text":" function:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: 
NewRegistryDirectory()\n\n// NewRegistryDirectory will create a new RegistryDirectory\n// This function is registered in extension as the default\n// url is the registry url; registry is the zookeeper registry\nfunc NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {\n if url.SubURL == nil {\n return nil, perrors.Errorf(\"url is invalid, suburl can not be nil\")\n }\n dir := &RegistryDirectory{\n BaseDirectory: directory.NewBaseDirectory(url),\n cacheInvokers: []protocol.Invoker{},\n cacheInvokersMap: &sync.Map{},\n serviceType: url.SubURL.Service(),\n registry: registry,\n }\n dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)\n go dir.subscribe(url.SubURL)\n return dir, nil\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"First a registry directory is constructed, then a goroutine is started to call its subscribe function with the serviceURL."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"At this point the directory holds the corresponding zkRegistry and the URL passed in, and its cacheInvokers is empty."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Now step into the asynchronous function dir.subscribe(url.SubURL):"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: subscribe()\n\n// subscribe from registry\nfunc (dir *RegistryDirectory) subscribe(url *common.URL) {\n // add two listeners\n dir.consumerConfigurationListener.addNotifyListener(dir)\n dir.referenceConfigurationListener = 
newReferenceConfigurationListener(dir, url)\n // call Subscribe\n dir.registry.Subscribe(url, dir)\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Here is the key point: it calls zkRegistry's Subscribe method and passes itself in as the ConfigListener."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"I think this way of passing in a listener is worth learning, and it has a strong Java flavor."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"For an asynchronous operation such as waiting for zk to return subscription information, a Listener must be passed in. The Listener has to implement a Notify method; once it has been passed in as a parameter, its Notify can be called asynchronously, so that events triggered internally are passed outward for further processing."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Through layers of chained Listeners, the original serviceURL is sent to the zk service via zkConn, and the binary data corresponding to the url registered by the server is obtained."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The Notify callback chain then parses and processes this byte[] step by step; by the time it finally arrives at the directory as an event, it has already taken the form of newInvokers
."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The details are not shown as source code here; refer to the figure above when reading the source."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"At this point, the real invokers registered by the server have been obtained."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This completes the following part of Figure 1:"}]}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/21/21ef1360c04a762b87c4729c675aa3ee.png","alt":null,"title":"","style":[{"key":"width","value":"75%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.4 Constructing a clusterInvoker with a cluster strategy"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"After the steps above, the server-side invokers have been obtained and placed in the directory's cacheInvokers array."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The following corresponds to step (4) of section 2.2.2: generating an invoker with cluster-strategy behavior from the directory."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// (4) new cluster invoker: put the directory into the cluster and get the invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The Join function is implemented as follows:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()\n\nfunc newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {\n return &failoverClusterInvoker{\n baseClusterInvoker: newBaseClusterInvoker(directory),\n }\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The dubbo-go framework selects the failover strategy by default. Since an invoker has been returned, let's look at failoverClusterInvoker 
and its Invoke method, to see how it encapsulates the cluster strategy inside the Invoke function:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoke()\n\n// Invoke function\nfunc (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {\n ...\n // call List to get all invokers cached in the directory\n invokers := invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil { // check whether the call can be made\n return &protocol.RPCResult{Err: err}\n }\n // taken from the user-facing invocation\n methodName := invocation.MethodName()\n retries := getRetries(invokers, methodName)\n loadBalance := getLoadBalance(invokers[0], invocation)\n for i := 0; i <= retries; i++ {\n // Important: this is where the cluster strategy shows up, retrying after a failure\n //Reselect before retry to avoid a change of candidate `invokers`.\n //NOTE: if `invokers` changed, then `invoked` also lose accuracy.\n if i > 0 {\n if err := invoker.checkWhetherDestroyed(); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n invokers = invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n }\n // This is where the load balancing strategy shows up: 
choose a specific ivk to call.\n ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)\n if ivk == nil {\n continue\n }\n invoked = append(invoked, ivk)\n //DO INVOKE\n result = ivk.Invoke(ctx, invocation)\n if result.Error() != nil {\n providers = append(providers, ivk.GetUrl().Key())\n continue\n }\n return result\n }\n ...\n}"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"I have read many Invoke implementations, and every such Invoke function faces two directions: the user-facing invocation on one side, and the underlying invokers on the other."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The cluster strategy's Invoke function itself acts as the operator: it takes the invocation and, according to the call requirements and the cluster strategy, selects a specific invoker to execute it."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The proxy function is the same: on one side the user-facing ins[] reflect.Type, on the other the invoker."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The proxy function is responsible for converting ins into an invocation and calling the Invoke function of the corresponding invoker, connecting the two sides."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"With this design, in the step-by-step wrapping of Invokers, each Invoker 
only cares about its own part of the operation, so the whole call stack is decoupled."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Elegant!"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"At this point we understand how failoverClusterInvoker's Invoke function is implemented. It is this Invoker, carrying the cluster strategy, that gets returned and accepts calls from the upper layer."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This completes the following part of Figure 1:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b5/b59ea1bddfdd392fb303c99f85fd4ab7.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2.2.5 Registering the current client on zookeeper"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"With the invokers in hand, we can return to this function:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/reference_config.go: Refer()\n\nif len(c.urls) == 1 {\n // This step goes to 
registry/protocol/protocol.go registryProtocol.Refer\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n // (1) the real invokers have been obtained\n } else {\n // With multiple registries there are multiple invokers, so a cluster strategy is applied\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n cluster := extension.GetCluster(hitClu)\n // If 'zone-aware' policy select, the invoker wrap sequence would be:\n // ZoneAwareClusterInvoker(StaticDirectory) ->\n // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker\n c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))\n }\n // (2) create proxy: set up the proxy for the functions\n if c.Async {\n callback := GetCallback(c.id)\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)\n } else {\n // by now c.invoker already points at the target addr\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"We now have invokers that can reach the server, but they cannot be called directly, because an invoker takes an invocation as its input, whereas the calling function uses a concrete parameter list. 
A proxy layer is therefore needed to normalize the input and output parameters."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Next, a new default proxy is created and stored in c.pxy for later use."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This completes the last operation in figure (1)."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/28/2818614794c118b396ab004f8412ff6f.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.3 Write the calling logic into rpc-service as proxy functions"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The config.Refer operation is now complete."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Going back to"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"config/config_loader.go: loadConsumerConfig()"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/88/881256613ed8dbe6e37f46188d34e15b.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The next important function is Implement, and what it does is relatively simple: 
its purpose is to use the c.pxy proxy generated above to link the user-defined rpcService to the clusterInvoker, so that calls are passed through."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The function is fairly long, so only the important parts are shown:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: common/proxy/proxy.go: Implement()\n\n// Implement\n// proxy implement\n// In consumer, RPCService like:\n//     type XxxProvider struct {\n//         Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error\n//     }\n// Implement works as follows: based on each function's name and return values, the proxy builds a proxy function that carries the remote-call logic by calling the invoker,\n// and registers all available functions of the current rpc in proxy.rpc\nfunc (p *Proxy) Implement(v common.RPCService) {\n    // makeDubboCallProxy builds a proxy function; its return value is a function of the form func(in []reflect.Value) []reflect.Value\n    // The returned function carries the request: it is the one that initiates the call and obtains the result\n    makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {\n        return func(in []reflect.Value) []reflect.Value {\n            // Based on methodName and the types in outs, construct a function that converts the input values in into output values\n            // The function is implemented as follows:\n            ...\n            // At this point we have methodName, the interface and value of every input parameter, and the output parameter reply\n            // (1) Generate an RPCInvocation from them\n            inv = 
invocation_impl.NewRPCInvocationWithOptions(\n                invocation_impl.WithMethodName(methodName),\n                invocation_impl.WithArguments(inIArr),\n                invocation_impl.WithReply(reply.Interface()),\n                invocation_impl.WithCallBack(p.callBack),\n                invocation_impl.WithParameterValues(inVArr))\n            for k, value := range p.attachments {\n                inv.SetAttachments(k, value)\n            }\n            // add user setAttachment\n            atm := invCtx.Value(constant.AttachmentKey) // If the incoming ctx carries attachments, write them into inv as well\n            if m, ok := atm.(map[string]string); ok {\n                for k, value := range m {\n                    inv.SetAttachments(k, value)\n                }\n            }\n            // inv is now fully constructed\n            // (2) Trigger the Invoker: the cluster_invoker was placed in the proxy earlier; calling its Invoke method performs the remote procedure call through getty\n            result := p.invoke.Invoke(invCtx, inv)\n            // If the result has attachments, add them to the context\n            if len(result.Attachments()) > 0 {\n                invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())\n            }\n            ...\n        }\n    }\n    numField := valueOfElem.NumField()\n    for i := 0; i < numField; i++ {\n        t := typeOf.Field(i)\n        methodName := t.Tag.Get(\"dubbo\")\n        if methodName == \"\" {\n            methodName = t.Name\n        }\n        f := valueOfElem.Field(i)\n        if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // For each function\n            outNum := t.Type.NumOut()\n            // A function may only have 1 or 2 return values\n            if outNum != 1 && outNum != 2 {\n                logger.Warnf(\"method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2\",\n                    t.Name, t.Type.String(), outNum)\n                continue\n            }\n            // The latest return type of the method must be error.\n            // That is, the last return value is required to be error\n            if returnType := t.Type.Out(outNum - 1); returnType != typError {\n                logger.Warnf(\"the latest return type %s of method %q is not error\", returnType, t.Name)\n                continue\n            }\n            // Collect all return types into a slice\n            var funcOuts = make([]reflect.Type, outNum)\n            for i := 0; i < outNum; i++ {\n                funcOuts[i] = 
t.Type.Out(i)\n            }\n            // do method proxy here:\n            // (3) Call the make function, passing in the function name and return types, to get a proxy that performs the remote call, and put this proxy in place of the original function\n            f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))\n            logger.Debugf(\"set method [%s]\", methodName)\n        }\n    }\n    ...\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As mentioned before, the role of the proxy is to translate the user-defined function's parameter list into an abstract invocation and pass it to the Invoker to make the call."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Three important places are marked:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"numberedlist","attrs":{"start":"","normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"The logic in the proxy function that generates the Invocation from the parameter list"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"The logic in the proxy function that calls the Invoker"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"Replacing the corresponding function of the original rpc-service with the proxy function"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"With this, 
the question raised at the beginning is answered:"}]},{"type":"paragraph","attrs":{"indent":1,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: client.go: main()\n\nconfig.Load()\nuser := &User{}\nerr := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"For the GetUser function of the user-defined rpcService, what is actually called here is the rewritten proxy function, which is how the remote call is made."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"3. The invoker nesting chain from client to server - Summary"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"While reading the dubbo-go source code, a clear invoker-proxy nesting chain emerges, which is shown here as a diagram:"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/5f/5ff59ddeb3dc646ee3de5bbd22e91dc0.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}
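
The function-replacement step described in section 2.3 can be tried out in isolation. The following is only a minimal sketch of the technique, not dubbo-go's implementation: Invocation, Invoker, echoInvoker, implement and makeCallProxy are hypothetical, simplified stand-ins invented for this example, and the fake invoker just echoes the call instead of going through getty. What it does share with Implement is the core idea: walk the struct's func fields, read the `dubbo` tag, and use reflect.MakeFunc to install a function that turns the argument list into an invocation and hands it to an invoker.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Invocation is a hypothetical, simplified stand-in for an RPC invocation.
type Invocation struct {
	MethodName string
	Arguments  []interface{}
}

// Invoker is a hypothetical, simplified stand-in for an invoker interface.
type Invoker interface {
	Invoke(inv Invocation) (interface{}, error)
}

// echoInvoker fakes the remote side: it returns a string describing the call.
type echoInvoker struct{}

func (echoInvoker) Invoke(inv Invocation) (interface{}, error) {
	if len(inv.Arguments) == 0 {
		return nil, errors.New("no arguments")
	}
	return fmt.Sprintf("%s(%v)", inv.MethodName, inv.Arguments[0]), nil
}

// UserProvider has the shape of an rpc-service: exported func fields that
// are nil until a proxy function is installed.
type UserProvider struct {
	GetUser func(id string) (string, error) `dubbo:"getUser"`
}

var errType = reflect.TypeOf((*error)(nil)).Elem()

// implement installs a proxy function into every settable func field of the
// struct that svc points to. This sketch only accepts the (T, error) shape.
func implement(svc interface{}, ivk Invoker) {
	elem := reflect.ValueOf(svc).Elem()
	typ := elem.Type()
	for i := 0; i < elem.NumField(); i++ {
		field := typ.Field(i)
		f := elem.Field(i)
		if f.Kind() != reflect.Func || !f.CanSet() {
			continue
		}
		ft := field.Type
		if ft.NumOut() != 2 || ft.Out(1) != errType {
			continue // the last return value must be error
		}
		name := field.Tag.Get("dubbo")
		if name == "" {
			name = field.Name
		}
		f.Set(reflect.MakeFunc(ft, makeCallProxy(name, ft, ivk)))
	}
}

// makeCallProxy builds the function body used by reflect.MakeFunc: it packs
// the arguments into an Invocation, calls the invoker, and unpacks the result.
func makeCallProxy(name string, ft reflect.Type, ivk Invoker) func([]reflect.Value) []reflect.Value {
	return func(in []reflect.Value) []reflect.Value {
		args := make([]interface{}, len(in))
		for i, v := range in {
			args[i] = v.Interface()
		}
		res, err := ivk.Invoke(Invocation{MethodName: name, Arguments: args})
		out := reflect.Zero(ft.Out(0))
		if rv := reflect.ValueOf(res); err == nil && rv.IsValid() && rv.Type().AssignableTo(ft.Out(0)) {
			out = rv
		}
		errVal := reflect.Zero(errType)
		if err != nil {
			errVal = reflect.ValueOf(&err).Elem() // keeps the static type error
		}
		return []reflect.Value{out, errVal}
	}
}

func main() {
	p := &UserProvider{}
	implement(p, echoInvoker{})
	got, err := p.GetUser("A001")
	fmt.Println(got, err) // prints "getUser(A001) <nil>"
}
```

Before implement runs, p.GetUser is nil and calling it would panic; afterwards the field holds the generated function, which is the same reason the user's GetUser call only works after config.Load() has run. The sketch leaves out the context, reply pointer, attachments and callback handling that the real proxy carries.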

Copyright notice
This article was created by [InfoQ]. Please include a link to the original when reposting. Thank you.
