
Dubbo-go Source Code Notes (2): The Client Call Process


Author | Li Zhixin

Reading guide: The previous article, "Dubbo-go Source Notes (1): How the Server Starts a Service", laid the groundwork, so the client startup process can be understood by analogy with the server's. The biggest difference is that the server registers its service through zk (ZooKeeper), publishes its own invoker URL (ivkURL), and subscribes to events to start listening, whereas the client uses the zk registry component to obtain the serviceURL it needs to call, updates its invoker, and rewrites the user's RPCService, thereby encapsulating the details of the remote procedure call.

Configuration files and client source code

1. The client configuration file

The helloworld demo provides profiles/client.yaml:

registries:
  "demoZk":
    protocol: "zookeeper"
    timeout: "3s"
    address: "127.0.0.1:2181"
    username: ""
    password: ""
references:
  "UserProvider":
    # Multiple registries can be specified, separated by commas; if none is specified, all registries are used by default
    registry: "demoZk"
    protocol: "dubbo"
    interface: "com.ikurento.user.UserProvider"
    cluster: "failover"
    methods:
    - name: "GetUser"
      retries: 3

As you can see, this configuration file closely resembles the server-side one discussed earlier. The references section configures the services that the current service will call, specifying the call protocol, the registry, the interface ID, the methods to call, the cluster strategy, and so on. These settings are used later when interacting with the registry component, rewriting the invoker (ivk), and making calls.

2. Client code that uses the framework

user.go:

func init() {
  config.SetConsumerService(userProvider)
  hessian.RegisterPOJO(&User{})
}

main.go:

func main() {
  hessian.RegisterPOJO(&User{})
  config.Load()
  time.Sleep(3e9)
  println("\n\n\nstart to test dubbo")
  user := &User{}
  err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
  if err != nil {
      panic(err)
  }
  fmt.Printf("response result: %v\n", user) // the builtin println does not interpret format verbs
  initSignal()
}

As on the server side, the official helloworld demo registers the rpc-service in user.go, along with the User struct that is transmitted over RPC.

The main function likewise calls config.Load(). After that, the corresponding method can be called directly on the implemented rpc-service, userProvider, and that call is an RPC call.

We can guess that somewhere between registering the struct with hessian, calling SetConsumerService, and calling .GetUser(), the corresponding function of the user-defined rpc-service (userProvider) is rewritten, and that the rewritten GetUser function already contains an invoker implementing the remote call logic.

Next, let's read the source code to see how dubbo-go does it.

Implementing the remote procedure call

1. Load the configuration file

// file: config/config_loader.go :Load()

// Load Dubbo Init
func Load() {
  // init router
  initRouter()
  // init the global event dispatcher
  extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
  // start the metadata report if config set
  if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
      logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
      return
  }
  // reference config
  loadConsumerConfig()
  ...
}

The config.Load() function called in main in turn calls loadConsumerConfig, which resembles the server-side configuration loading function discussed in the previous article.

loadConsumerConfig performs three steps:

// config/config_loader.go
func loadConsumerConfig() {
    // 1 init other consumer config
    conConfigType := consumerConfig.ConfigType
    for key, value := range extension.GetDefaultConfigReader() {}
    checkApplicationName(consumerConfig.ApplicationConfig)
    configCenterRefreshConsumer()
    checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
    
    // 2 refer-implement-reference
    for key, ref := range consumerConfig.References {
        if ref.Generic {
            genericService := NewGenericService(key)
            SetConsumerService(genericService)
        }
        rpcService := GetConsumerService(key)
        ref.id = key
        ref.Refer(rpcService)
        ref.Implement(rpcService)
    }
    // 3 wait for invoker is available, if wait over default 3s, then panic
    for {}
}
  1. Check the configuration file and write the configuration into memory
  2. In the for loop, refer to (Refer) and instantiate (Implement) each referenced service in turn
  3. Wait up to three seconds for all invokers to become ready
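
The third step can be pictured as a polling loop with a deadline. The following is a minimal sketch under stated assumptions, not the framework's code: `availabilityChecker`, `lazyInvoker`, and `waitAvailable` are hypothetical names, and the sketch returns an error where the source comment says the framework panics.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// availabilityChecker is a hypothetical stand-in for an invoker whose
// providers may only show up after the registry has answered.
type availabilityChecker interface {
	IsAvailable() bool
}

// lazyInvoker reports itself unavailable until markReady is called.
type lazyInvoker struct{ ready atomic.Bool }

func (l *lazyInvoker) IsAvailable() bool { return l.ready.Load() }
func (l *lazyInvoker) markReady()        { l.ready.Store(true) }

// waitAvailable polls every interval until all invokers are available,
// and gives up with an error once the timeout has elapsed.
func waitAvailable(invokers []availabilityChecker, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		allReady := true
		for _, ivk := range invokers {
			if !ivk.IsAvailable() {
				allReady = false
				break
			}
		}
		if allReady {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("no provider became available before the timeout")
		}
		time.Sleep(interval)
	}
}

func main() {
	ivk := &lazyInvoker{}
	// Simulate the registry delivering a provider a little later.
	go func() {
		time.Sleep(20 * time.Millisecond)
		ivk.markReady()
	}()
	err := waitAvailable([]availabilityChecker{ivk}, 3*time.Second, 5*time.Millisecond)
	fmt.Println("all invokers ready:", err == nil)
}
```

The wait exists because the invokers are filled in asynchronously, as the directory section later in this article explains.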

The important part is the two operations inside the for loop, Refer and Implement, which are discussed next.

At this point, the configuration has been written into the framework.

2. Get the remote service URL and build a callable invoker

The ref.Refer call mentioned above does this work.

2.png Figure (1)

1) Construct the registry URL

As on the server side, there is a registry URL and a service URL; dubbo's convention is to nest the service URL inside the registry URL as its sub URL.

// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {
  // (1) Build the URL from the configuration (serviceUrl); it will become the SubURL
  cfgURL := common.NewURLWithOptions(
    common.WithPath(c.id),
    common.WithProtocol(c.Protocol),
    common.WithParams(c.getUrlMap()),
    common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
  )
  ...
  // (2) The registry address can be given either as a URL or through the configuration
  // The point of this step is to extract information from the configuration and generate URLs
  if c.Url != "" { // URL given by the user; it may be a point-to-point address or a registry address
    // 1. user specified URL, could be peer-to-peer address, or register center's address.
    urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
    for _, urlStr := range urlStrings {
      serviceUrl, err := common.NewURL(urlStr)
      ...
    }
  } else { // read the registry information from the configuration
    // assemble SubURL from register center's configuration mode
    // This is the registry URL: protocol = registry, containing the zk username, password, IP, and so on
    c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
    ...
    // set url to regUrls
    for _, regUrl := range c.urls {
      regUrl.SubURL = cfgURL // the regUrl's SubURL stores the current configuration URL
    }
  }
  // At this point, whichever form was used, all regURLs have been obtained
  // (3) Get the registryProtocol instance and call its Refer method, passing in the newly built regURL
  if len(c.urls) == 1 {
    // This step goes to registry/protocol/protocol.go registryProtocol.Refer
    // The protocol here is registry
    c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
  } else {
    // With multiple registries there are multiple invokers, so a cluster strategy is applied
    invokers := make([]protocol.Invoker, 0, len(c.urls))
    ...
  }
  ...
}

This function handles the conversion from the registry configuration to registry URLs, which is the middle part of Figure (1):

3.png
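
To make the relationship between the two URLs concrete, here is a small sketch under stated assumptions. The `URL` struct is a simplified, hypothetical stand-in for the framework's URL type, and `splitURLs` uses the standard `regexp` package with the same separator pattern that appears in Refer; it is not the gxstrings helper itself.

```go
package main

import (
	"fmt"
	"regexp"
)

// URL is a simplified, hypothetical stand-in for a framework URL.
type URL struct {
	Protocol string
	Location string
	Params   map[string]string
	SubURL   *URL // the service URL nested inside a registry URL
}

// urlSeparator matches one or more semicolons with optional surrounding spaces.
var urlSeparator = regexp.MustCompile(`\s*[;]+\s*`)

// splitURLs splits a user-supplied list such as "a ; b;;c" into its entries.
func splitURLs(s string) []string {
	return urlSeparator.Split(s, -1)
}

// attachService nests the same service URL into every registry URL.
func attachService(registries []*URL, service *URL) {
	for _, reg := range registries {
		reg.SubURL = service
	}
}

func main() {
	service := &URL{
		Protocol: "dubbo",
		Params:   map[string]string{"interface": "com.ikurento.user.UserProvider"},
	}
	registries := []*URL{{
		Protocol: "registry",
		Location: "127.0.0.1:2181",
		Params:   map[string]string{"registry": "zookeeper"},
	}}
	attachService(registries, service)
	fmt.Println(registries[0].Protocol, "carries", registries[0].SubURL.Protocol)
	fmt.Println(splitURLs("dubbo://10.0.0.1:20000 ; dubbo://10.0.0.2:20000"))
}
```

Sharing one service URL pointer across all registry URLs mirrors the loop in Refer that assigns cfgURL to each regUrl.SubURL.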

Next, the resulting URL is passed to RegistryProtocol for a further Refer.

2) registryProtocol gets the zkRegistry instance and refers further

// file: registry/protocol/protocol.go: Refer

// Refer provider service from registry center
// url is the registries URL from the configuration file; from it, an invoker pointing at the target addr is generated for the client to call directly.
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
  var registryUrl = url
  // This is the referenceConfig; serviceUrl contains all the Reference information, including interfaceName, method, and so on
  var serviceUrl = registryUrl.SubURL
  if registryUrl.Protocol == constant.REGISTRY_PROTOCOL { // registryUrl.Protocol == "registry"
    protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")
    registryUrl.Protocol = protocol // replaced with the concrete value, such as "zookeeper"
  }
  // the interface object
  var reg registry.Registry
  // (1) Instantiate the interface object, using a cache
  if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
    // the registry is not in the cache yet, so create a new reg
    reg = getRegistry(&registryUrl)
    // cache it
    proto.registries.Store(registryUrl.Key(), reg)
  } else {
    reg = regI.(registry.Registry)
  }
  // At this point we have the reg instance, the zookeeper registry
  // (2) Create a new directory from the registry instance zkRegistry and the incoming regURL
  // This step involves fairly complex asynchronous logic: it obtains the real addr of the target service
  // from the registry, builds invokers, and puts them into the directory.
  // This step is described in detail below
  // new registry directory for store service url from registry
  directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
  if err != nil {
    logger.Errorf("consumer service %v  create registry directory  error, error message is %s, and will return nil invoker!",
      serviceUrl.String(), err.Error())
    return nil
  }
  // (3) DoRegister: register the current client service on zk
  err = reg.Register(*serviceUrl)
  if err != nil {
    logger.Errorf("consumer service %v register registry %v error, error message is %s",
      serviceUrl.String(), registryUrl.String(), err.Error())
  }
  // (4) new cluster invoker: join the directory into the cluster to get the invoker
  cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
  invoker := cluster.Join(directory)
  // save the invoker
  proto.invokers = append(proto.invokers, invoker)
  return invoker
}

Please read the comments above carefully. This function completes the whole process from URL to invoker:

(1) First, obtain the Registry object. By default this is the zkRegistry instantiated above, much like the way the server obtained its Registry.

(2) Construct a new directory, which asynchronously receives the server-side information previously registered on zk and generates invokers.

(3) Register the current service on zk.

(4) Apply the cluster strategy to obtain the final invoker.
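
Step (1) is a get-or-create cache keyed by the registry URL. The sketch below illustrates that idea with `sync.Map`; `Registry`, `stubRegistry`, and `registryCache` are hypothetical names. As a deliberate variation on the Load-then-Store sequence in the excerpt, it stores with `LoadOrStore`, so that if two goroutines race on the same key, both end up with the same instance.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Registry is a hypothetical minimal registry interface.
type Registry interface{ Key() string }

type stubRegistry struct{ key string }

func (s *stubRegistry) Key() string { return s.key }

// registryCache hands out one Registry per key.
type registryCache struct {
	registries sync.Map     // registry URL key -> Registry
	created    atomic.Int64 // how many registries were constructed (demo only)
}

// get returns the cached registry for key, constructing it on first use.
func (c *registryCache) get(key string) Registry {
	if cached, loaded := c.registries.Load(key); loaded {
		return cached.(Registry)
	}
	c.created.Add(1)
	fresh := &stubRegistry{key: key}
	// LoadOrStore keeps whichever value was stored first if two goroutines race.
	actual, _ := c.registries.LoadOrStore(key, fresh)
	return actual.(Registry)
}

func main() {
	cache := &registryCache{}
	a := cache.get("zookeeper://127.0.0.1:2181")
	b := cache.get("zookeeper://127.0.0.1:2181")
	fmt.Println("same instance:", a == b, "constructed:", cache.created.Load())
}
```

Caching matters here because a registry holds a connection to zk; several references that share one registry should reuse it rather than reconnect.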

This step completes most of the remaining operations in Figure (1). Next, we need to look at the directory construction process in detail.

3) Construct the directory (involves more complex asynchronous operations)

4.png Figure (2)

The extension.GetDefaultRegistryDirectory(&registryUrl, reg) function mentioned above essentially calls the registered NewRegistryDirectory function:

// file: registry/directory/directory.go: NewRegistryDirectory()

// NewRegistryDirectory will create a new RegistryDirectory
// This function is registered in extension as the default
// url is the registry URL; registry is the zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
  if url.SubURL == nil {
    return nil, perrors.Errorf("url is invalid, suburl can not be nil")
  }
  dir := &RegistryDirectory{
    BaseDirectory:    directory.NewBaseDirectory(url),
    cacheInvokers:    []protocol.Invoker{},
    cacheInvokersMap: &sync.Map{},
    serviceType:      url.SubURL.Service(),
    registry:         registry,
  }
  dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
  go dir.subscribe(url.SubURL)
  return dir, nil
}

First, a registry directory is constructed, and a goroutine is started to call its subscribe function, passing in the serviceURL.

At this point the directory contains the corresponding zkRegistry and the incoming URL, while its cacheInvokers is still empty.

Now step into the asynchronous function dir.subscribe(url.SubURL):

// file: registry/directory/directory.go: subscribe()

// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
  // add two listeners
  dir.consumerConfigurationListener.addNotifyListener(dir)
  dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
  // the subscribe call
  dir.registry.Subscribe(url, dir)
}

Here is the key point: it calls zkRegistry's Subscribe method and passes itself in as the ConfigListener.

I think this way of passing in a listener is worth learning from, and it has a strong Java flavor. For an asynchronous operation such as waiting for zk to return subscription information, a Listener must be passed in, and this Listener must implement a Notify method. Once it has been passed in as a parameter, its Notify can be called asynchronously, so that events triggered internally are "passed out" for further processing.

The layered Listener event chain sends the incoming serviceURL to the zk service through zkConn and obtains the binary information corresponding to the URL registered by the server. The Notify callback chain then parses and processes that byte[] step by step; by the time the event finally lands on the directory, it has already taken the shape of newInvokers. The details are not shown here as source code; refer to the figure above while reading the source.
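
The listener pattern described above can be reduced to a few lines. This is a minimal sketch under stated assumptions: `ServiceEvent`, `NotifyListener`, `directory`, and `chanRegistry` are hypothetical simplifications, a Go channel stands in for zk, and an "invoker" is represented only by its provider URL string.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ServiceEvent is a hypothetical provider change pushed by the registry.
type ServiceEvent struct {
	Action string // "add" or "delete"
	URL    string
}

// NotifyListener is what a subscriber must implement to receive events.
type NotifyListener interface {
	Notify(event ServiceEvent)
}

// directory keeps one cached entry per provider URL.
type directory struct {
	mu       sync.RWMutex
	invokers map[string]struct{}
}

func newDirectory() *directory {
	return &directory{invokers: make(map[string]struct{})}
}

// Notify updates the cache; it is called from the registry's goroutine.
func (d *directory) Notify(event ServiceEvent) {
	d.mu.Lock()
	defer d.mu.Unlock()
	switch event.Action {
	case "add":
		d.invokers[event.URL] = struct{}{}
	case "delete":
		delete(d.invokers, event.URL)
	}
}

// List returns the cached provider URLs in sorted order.
func (d *directory) List() []string {
	d.mu.RLock()
	defer d.mu.RUnlock()
	urls := make([]string, 0, len(d.invokers))
	for u := range d.invokers {
		urls = append(urls, u)
	}
	sort.Strings(urls)
	return urls
}

// chanRegistry forwards every event on its channel to the listener.
type chanRegistry struct{ events chan ServiceEvent }

// Subscribe blocks until the events channel is closed.
func (r *chanRegistry) Subscribe(listener NotifyListener) {
	for event := range r.events {
		listener.Notify(event)
	}
}

func main() {
	reg := &chanRegistry{events: make(chan ServiceEvent)}
	dir := newDirectory()
	done := make(chan struct{})
	go func() { // like `go dir.subscribe(...)`: the directory passes itself in
		reg.Subscribe(dir)
		close(done)
	}()
	reg.events <- ServiceEvent{Action: "add", URL: "dubbo://10.0.0.1:20000"}
	reg.events <- ServiceEvent{Action: "add", URL: "dubbo://10.0.0.2:20000"}
	reg.events <- ServiceEvent{Action: "delete", URL: "dubbo://10.0.0.1:20000"}
	close(reg.events)
	<-done
	fmt.Println(dir.List())
}
```

The subscriber never polls: the registry side decides when Notify runs, which is why the cache needs its own lock.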

At this point, the real invokers registered by the server have been obtained.

This completes the following part of Figure (1):

5.png

4) Construct the clusterInvoker with a cluster strategy

After the operations above, the server-side invokers have been obtained and placed in the directory's cacheInvokers array.

The following operation is the last step of the URL-to-invoker process described in this article: generating, from the directory, an invoker that carries the cluster strategy.

  // (4) new cluster invoker: join the directory into the cluster to get the invoker
  cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
  invoker := cluster.Join(directory)

The Join function is implemented through the following constructor:

// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()

func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
  return &failoverClusterInvoker{
    baseClusterInvoker: newBaseClusterInvoker(directory),
  }
}

By default, the dubbo-go framework selects the failover strategy. Now that an invoker has been returned, let's look at failoverClusterInvoker's Invoke method to see how it encapsulates the cluster strategy inside the Invoke function:

// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoke()

// Invoke function
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
  ...
  // Call List to get all the invokers cached in the directory
  invokers := invoker.directory.List(invocation)
  if err := invoker.checkInvokers(invokers, invocation); err != nil { // check whether the call can be made
    return &protocol.RPCResult{Err: err}
  }
  // Get the information passed in from the user side
  methodName := invocation.MethodName()
  retries := getRetries(invokers, methodName)
  loadBalance := getLoadBalance(invokers[0], invocation)
  for i := 0; i <= retries; i++ {
    // Important! This is where the cluster strategy shows: retry after a failure
    // Reselect before retry to avoid a change of candidate `invokers`.
    // NOTE: if `invokers` changed, then `invoked` also lose accuracy.
    if i > 0 {
      if err := invoker.checkWhetherDestroyed(); err != nil {
        return &protocol.RPCResult{Err: err}
      }
      invokers = invoker.directory.List(invocation)
      if err := invoker.checkInvokers(invokers, invocation); err != nil {
        return &protocol.RPCResult{Err: err}
      }
    }
    // This is where the load balancing strategy shows: select a specific ivk to call
    ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
    if ivk == nil {
      continue
    }
    invoked = append(invoked, ivk)
    // DO INVOKE
    result = ivk.Invoke(ctx, invocation)
    if result.Error() != nil {
      providers = append(providers, ivk.GetUrl().Key())
      continue
    }
    return result
  }
  ...
}

Having read many Invoke implementations, I noticed that every Invoke-like function faces two directions: the user-facing invocation, and the underlying invokers on the function side. The cluster strategy's Invoke function acts as the intermediary: it takes the invocation and, according to the call requirements and the cluster strategy, selects a specific invoker to execute it. The proxy function works the same way: on the user side it receives in []reflect.Value, and on the function side it holds an invoker. The proxy function converts those input values into an invocation and calls the Invoke function of the corresponding invoker, connecting the two sides. Thanks to this design, as Invokers are wrapped layer by layer, each Invoker only cares about its own part of the work, and the whole call stack is decoupled. Wonderful!

At this point, we understand how failoverClusterInvoker's Invoke function is implemented. It is this Invoker, carrying the cluster strategy, that is returned to accept calls from the upper layer.
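
The retry loop can be isolated from the framework types to show its shape. The sketch below is a simplified illustration, assuming a hypothetical `Invoker` interface with a string result and a trivial `selectNext` function standing in for the load balancer; it is not the dubbo-go implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Invoker is a hypothetical, simplified invoker.
type Invoker interface {
	Invoke(method string) (string, error)
}

// stubInvoker either always fails or always succeeds, and counts its calls.
type stubInvoker struct {
	name  string
	fail  bool
	calls int
}

func (s *stubInvoker) Invoke(method string) (string, error) {
	s.calls++
	if s.fail {
		return "", errors.New(s.name + " unavailable")
	}
	return s.name + " handled " + method, nil
}

// selectNext stands in for load balancing: it prefers an invoker that has
// not been tried yet and falls back to the first one otherwise.
func selectNext(invokers []Invoker, invoked map[Invoker]bool) Invoker {
	for _, ivk := range invokers {
		if !invoked[ivk] {
			return ivk
		}
	}
	if len(invokers) > 0 {
		return invokers[0]
	}
	return nil
}

// failoverInvoke makes one initial attempt plus up to `retries` retries,
// re-listing the candidates before each attempt.
func failoverInvoke(list func() []Invoker, method string, retries int) (string, error) {
	if retries < 0 {
		retries = 0
	}
	invoked := make(map[Invoker]bool)
	var lastErr error
	for i := 0; i <= retries; i++ {
		ivk := selectNext(list(), invoked)
		if ivk == nil {
			lastErr = errors.New("no invoker available")
			continue
		}
		invoked[ivk] = true
		result, err := ivk.Invoke(method)
		if err == nil {
			return result, nil
		}
		lastErr = err
	}
	return "", fmt.Errorf("failed after %d attempts: %w", retries+1, lastErr)
}

func main() {
	bad := &stubInvoker{name: "provider-1", fail: true}
	good := &stubInvoker{name: "provider-2"}
	list := func() []Invoker { return []Invoker{bad, good} }
	result, err := failoverInvoke(list, "GetUser", 3)
	fmt.Println(result, err)
}
```

Note that `retries: 3` in the configuration therefore means up to four attempts in total, since the loop runs while `i <= retries`.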

This completes the following part of Figure (1):

6.png

5) Register the current client on zookeeper

After obtaining the invokers, we can return to this function:

  // file: config/reference_config.go: Refer()

  if len(c.urls) == 1 {
    // This step goes to registry/protocol/protocol.go registryProtocol.Refer
    c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
    // (1) the real invokers have been obtained
  } else {
    // With multiple registries there are multiple invokers, so a cluster strategy is applied
    invokers := make([]protocol.Invoker, 0, len(c.urls))
    ...
    cluster := extension.GetCluster(hitClu)
    // If 'zone-aware' policy select, the invoker wrap sequence would be:
    // ZoneAwareClusterInvoker(StaticDirectory) ->
    // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
    c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
  }
  // (2) create proxy: set up the proxy for the functions
  if c.Async {
    callback := GetCallback(c.id)
    c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
  } else {
    // at this point c.invoker already points at the target addr
    c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
  }

We now have invokers that can reach the remote end, but they cannot be called directly yet: an invoker takes an invocation as its input parameter, whereas the calling function uses a concrete parameter list. A proxy layer is needed to standardize the input and output parameters.
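
The mismatch between the two calling conventions is easiest to see when the adapter is written by hand for a single method. In this sketch, `Invocation`, `Invoker`, `echoInvoker`, `newGetUser`, and `callGetUser` are all hypothetical, and `echoInvoker` fakes the remote end locally.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Invocation is a hypothetical, simplified description of one call.
type Invocation struct {
	MethodName string
	Arguments  []interface{}
	Reply      interface{}
}

// Invoker only understands invocations, not concrete parameter lists.
type Invoker interface {
	Invoke(ctx context.Context, inv *Invocation) error
}

type User struct{ ID, Name string }

// echoInvoker pretends to be the remote end and fills in the reply.
type echoInvoker struct{}

func (echoInvoker) Invoke(_ context.Context, inv *Invocation) error {
	user, ok := inv.Reply.(*User)
	if !ok || len(inv.Arguments) != 1 {
		return errors.New("unexpected invocation shape")
	}
	id, ok := inv.Arguments[0].(string)
	if !ok {
		return errors.New("id must be a string")
	}
	user.ID, user.Name = id, "user-"+id
	return nil
}

// newGetUser is a hand-written proxy: it has the signature the user calls
// and packs its concrete parameters into an Invocation for the invoker.
func newGetUser(ivk Invoker) func(ctx context.Context, args []interface{}, rsp *User) error {
	return func(ctx context.Context, args []interface{}, rsp *User) error {
		return ivk.Invoke(ctx, &Invocation{MethodName: "GetUser", Arguments: args, Reply: rsp})
	}
}

// callGetUser is a small convenience wrapper used by the demo.
func callGetUser(ivk Invoker, id string) (User, error) {
	var user User
	err := newGetUser(ivk)(context.Background(), []interface{}{id}, &user)
	return user, err
}

func main() {
	user, err := callGetUser(echoInvoker{}, "A001")
	fmt.Println(user, err)
}
```

Writing such an adapter for every method by hand does not scale, which is why the framework generates the equivalent through reflection.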

Next, a default proxy is created and stored in c.pxy for later use.

This completes the last operation in Figure (1):

7.png

3. Write the call logic into the rpc-service as proxy functions

With the ref.Refer operation done, we go back to:

config/config_loader.go: loadConsumerConfig()

8.png

The next important function is Implement. Its job is relatively simple: it uses the c.pxy proxy generated above to connect the user-defined rpcService to the clusterInvoker.

The function is long, so only the important parts are shown:

// file: common/proxy/proxy.go: Implement()

// Implement
// proxy implement
// In consumer, RPCService like:
//    type XxxProvider struct {
//    Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
//    }
// Implement works as follows: for each function name and its return values, the proxy constructs
// a proxy function that carries the remote call logic by calling the invoker
// All callable functions of the current rpc are registered into proxy.rpc
func (p *Proxy) Implement(v common.RPCService) {
  // makeDubboCallProxy constructs a proxy function; it returns a function of the form
  // func(in []reflect.Value) []reflect.Value
  // The returned function carries the request: it initiates the call and obtains the result
  makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
    return func(in []reflect.Value) []reflect.Value {
      // Based on methodName and the types in outs, construct a function that converts
      // the input values in into output values
      // The function is implemented as follows:
      ...
      // At this point we have methodName, the interface and value of every input argument, and the reply parameter
      // (1) Generate an RPC invocation from them
      inv = invocation_impl.NewRPCInvocationWithOptions(
        invocation_impl.WithMethodName(methodName),
        invocation_impl.WithArguments(inIArr),
        invocation_impl.WithReply(reply.Interface()),
        invocation_impl.WithCallBack(p.callBack),
        invocation_impl.WithParameterValues(inVArr))
      for k, value := range p.attachments {
        inv.SetAttachments(k, value)
      }
      // add user setAttachment
      atm := invCtx.Value(constant.AttachmentKey) // if the incoming ctx carries attachments, write them into inv as well
      if m, ok := atm.(map[string]string); ok {
        for k, value := range m {
          inv.SetAttachments(k, value)
        }
      }
      // At this point inv has been fully constructed
      // (2) Trigger the Invoker: the cluster_invoker was placed in the proxy earlier;
      // calling its Invoke method performs the remote procedure call through getty
      result := p.invoke.Invoke(invCtx, inv)
      // if the result carries attachments, add them to the context
      if len(result.Attachments()) > 0 {
        invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
      }
      ...
    }
  }
  numField := valueOfElem.NumField()
  for i := 0; i < numField; i++ {
    t := typeOf.Field(i)
    methodName := t.Tag.Get("dubbo")
    if methodName == "" {
      methodName = t.Name
    }
    f := valueOfElem.Field(i)
    if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // for each function field
      outNum := t.Type.NumOut()
      // a function may only have one or two return values
      if outNum != 1 && outNum != 2 {
        logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
          t.Name, t.Type.String(), outNum)
        continue
      }
      // The latest return type of the method must be error.
      if returnType := t.Type.Out(outNum - 1); returnType != typError {
        logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
        continue
      }
      // collect all return value types into a slice
      var funcOuts = make([]reflect.Type, outNum)
      for i := 0; i < outNum; i++ {
        funcOuts[i] = t.Type.Out(i)
      }
      // do method proxy here:
      // (3) Call the make function with the function name and return types to obtain a proxy
      // that performs the remote call, and put this proxy in place of the original function
      f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
      logger.Debugf("set method [%s]", methodName)
    }
  }
  ...
}

As mentioned earlier, the proxy's job is to translate the user-defined function's parameter list into an abstract invocation and pass it to the Invoker to make the call.

Three important places are marked:

  1. The logic in the proxy function that generates the Invocation from the parameter list
  2. The logic in the proxy function that calls the Invoker
  3. Replacing the corresponding function of the original rpc-service with the proxy function
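
The three marked places can be reproduced in a small, self-contained program built on `reflect.MakeFunc`. This is a sketch under stated assumptions rather than the framework's Implement: `Invocation`, `Invoker`, `recordingInvoker`, and `implement` are hypothetical, the context parameter is dropped for brevity, and only fields of the shape `func(args []interface{}, rsp *T) error` are handled.

```go
package main

import (
	"fmt"
	"reflect"
)

// Invocation and Invoker are hypothetical, simplified framework types.
type Invocation struct {
	MethodName string
	Arguments  []interface{}
	Reply      interface{}
}

type Invoker interface {
	Invoke(inv *Invocation) error
}

type User struct{ Name string }

// UserProvider mirrors the consumer-side shape: a struct of func fields.
type UserProvider struct {
	GetUser func(args []interface{}, rsp *User) error `dubbo:"getUser"`
}

var errorType = reflect.TypeOf((*error)(nil)).Elem()

// implement replaces every matching func field of the struct that service
// points to. service must be a pointer to a struct.
func implement(service interface{}, ivk Invoker) {
	elem := reflect.ValueOf(service).Elem()
	for i := 0; i < elem.NumField(); i++ {
		field := elem.Type().Field(i)
		f, ft := elem.Field(i), field.Type
		if f.Kind() != reflect.Func || !f.CanSet() ||
			ft.NumIn() != 2 || ft.NumOut() != 1 || ft.Out(0) != errorType {
			continue
		}
		name := field.Tag.Get("dubbo")
		if name == "" {
			name = field.Name
		}
		f.Set(reflect.MakeFunc(ft, func(in []reflect.Value) []reflect.Value {
			// (1) build the invocation from the concrete parameter list
			args, _ := in[0].Interface().([]interface{})
			inv := &Invocation{MethodName: name, Arguments: args, Reply: in[1].Interface()}
			// (2) hand it to the invoker
			err := ivk.Invoke(inv)
			// Wrap err as a Value whose static type is the error interface.
			return []reflect.Value{reflect.ValueOf(&err).Elem()}
		})) // (3) the field now holds the proxy function
	}
}

// recordingInvoker remembers the last invocation and fakes a reply.
type recordingInvoker struct{ last *Invocation }

func (r *recordingInvoker) Invoke(inv *Invocation) error {
	r.last = inv
	if u, ok := inv.Reply.(*User); ok {
		u.Name = "Alex"
	}
	return nil
}

func main() {
	provider := &UserProvider{}
	rec := &recordingInvoker{}
	implement(provider, rec)
	user := &User{}
	err := provider.GetUser([]interface{}{"A001"}, user)
	fmt.Println(rec.last.MethodName, user.Name, err)
}
```

Because the RPCService methods are declared as struct fields of function type rather than as ordinary methods, they are settable through reflection, and this is what makes the replacement possible.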

This answers the question raised at the beginning:

  // file: client.go: main()
  
  config.Load()
  user := &User{}
  err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)

The GetUser function of the user-defined rpcService actually invokes the rewritten proxy function here, which is how the remote call is made.

The invoker nesting chain from client to server: summary

While reading the dubbo-go source code, a clear chain of nested invokers and proxies emerges. The figure below is an attempt to show it:

9.png
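
The nesting can also be expressed in code as a chain of wrappers, each of which does its own part and delegates to the next. The sketch below is purely illustrative: `Invoker`, `layer`, and `chain` are hypothetical, and the layer names in main are labels chosen for this example, not an exact list of the types involved.

```go
package main

import (
	"fmt"
	"strings"
)

// Invoker is a hypothetical, simplified invoker that records the path taken.
type Invoker interface {
	Invoke(method string, trace *[]string) string
}

// layer is one link in the chain; a nil next marks the innermost invoker.
type layer struct {
	name string
	next Invoker
}

func (l layer) Invoke(method string, trace *[]string) string {
	*trace = append(*trace, l.name)
	if l.next == nil {
		return "result of " + method
	}
	return l.next.Invoke(method, trace)
}

// chain nests the named layers from outermost to innermost.
func chain(names ...string) Invoker {
	var next Invoker
	for i := len(names) - 1; i >= 0; i-- {
		next = layer{name: names[i], next: next}
	}
	return next
}

func main() {
	var trace []string
	outer := chain("proxy", "failoverClusterInvoker", "protocolInvoker")
	result := outer.Invoke("GetUser", &trace)
	fmt.Println(strings.Join(trace, " -> "), "=>", result)
}
```

Each layer sees only the Invoker interface of the layer below it, which is the decoupling the article praises.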

If you have any questions, you are welcome to join the DingTalk discussion group (group no. 23331795).

About the author

Li Zhixin (GitHub ID: LaurenceLiZhixin) is a software engineering student at Sun Yat-sen University, proficient in Java and Go, with a focus on cloud native, microservices, and related technologies.

"Alibaba Cloud Native focuses on microservices, Serverless, containers, Service Mesh, and other technical fields, follows popular cloud native technology trends and large-scale cloud native practice, and aims to be the official account that best understands cloud native developers."

Copyright notice
This article was created by [Alibaba cloud native]. Please include a link to the original when reposting. Thank you.
