
Spring Cloud Stream Event Routing - spring.io

Event routing in Spring Cloud Stream (SCSt) is the ability to either a) route events to a particular event subscriber, or b) route events produced by an event subscriber to a particular destination.

Let's take a quick look at how this works with the annotation-based programming model. In this article, we refer to these as routing "TO" and routing "FROM".

To route to an event subscriber, we use the condition attribute of the StreamListener annotation, as shown below:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

More details about this approach are available in the Spring Cloud Stream documentation.
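To make the idea behind the condition attribute concrete, here is a minimal, framework-free sketch of condition-based dispatch. It is not how Spring Cloud Stream implements it: the class ConditionalDispatchSketch and its register and dispatch methods are hypothetical names invented for this illustration, and the SpEL condition is replaced by a plain Java Predicate over the message headers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of condition-based dispatch; not part of any Spring API.
final class ConditionalDispatchSketch {

    // One registered listener: a condition over the headers plus a payload handler.
    private static final class Listener {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> handler;

        Listener(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
        listeners.add(new Listener(condition, handler));
    }

    // Delivers the payload to every listener whose condition matches the headers
    // and returns how many listeners were invoked.
    int dispatch(Map<String, Object> headers, String payload) {
        int matched = 0;
        for (Listener listener : listeners) {
            if (listener.condition.test(headers)) {
                listener.handler.accept(payload);
                matched++;
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        ConditionalDispatchSketch dispatcher = new ConditionalDispatchSketch();
        // Plays the role of condition = "headers['type']=='order'"
        dispatcher.register(h -> "order".equals(h.get("type")),
                payload -> System.out.println("order: " + payload));
        dispatcher.register(h -> "invoice".equals(h.get("type")),
                payload -> System.out.println("invoice: " + payload));
        dispatcher.dispatch(Map.of("type", "order"), "42 widgets");
    }
}
```

In this model, the subscriber code never inspects headers itself. The decision about who receives an event lives entirely in the registered conditions.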

To route from an event subscriber, we use dynamically bound destinations, a feature that lets the framework bind to a destination based on an instruction provided within an individual event.

Event Routing with Functions

With the functional approach, we can do all of the above in a cleaner and more concise way, with a few additional features.

Routing "TO"

You can route "TO" a function by relying on the routing function feature available in Spring Cloud Function (SCF). You can enable routing explicitly by setting the spring.cloud.stream.function.routing.enabled property, or implicitly by setting the spring.cloud.function.routing-expression property and providing the routing instruction in Spring Expression Language (SpEL). The routing instruction should evaluate to the definition of the function to route "TO".

For binding purposes, the name of the routing destination is functionRouter-in-0 (see RoutingFunction.FUNCTION_NAME and the description of the binding naming convention).

When a message is sent to this destination, the routing function tries to determine which actual function needs to process the event. It first tries to access the spring.cloud.function.routing-expression message header and, if it is provided, uses it to determine the name of the actual function to invoke. This is the most dynamic approach. The second most dynamic approach is to provide a spring.cloud.function.definition header, which should contain the definition of the function to route "TO". Both approaches require the routing function to be enabled explicitly by setting the spring.cloud.stream.function.routing.enabled property.
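The header-driven lookup can be pictured with a small, framework-free sketch. It models only the simpler of the two cases, where a spring.cloud.function.definition header names the target function directly, and it assumes a plain Map as the function registry. The class HeaderRoutingSketch and its methods are hypothetical names for this illustration and are not the Spring Cloud Function RoutingFunction implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of header-driven function routing; not the real RoutingFunction.
final class HeaderRoutingSketch {

    // Header name taken from the article; the registry below is an assumption.
    static final String DEFINITION_HEADER = "spring.cloud.function.definition";

    private final Map<String, Function<String, String>> functions = new HashMap<>();

    void register(String name, Function<String, String> function) {
        functions.put(name, function);
    }

    // Reads the function name from the message headers, looks the function up,
    // and applies it to the payload.
    String route(Map<String, Object> headers, String payload) {
        Object name = headers.get(DEFINITION_HEADER);
        if (!(name instanceof String)) {
            throw new IllegalStateException("No routing instruction found in the headers");
        }
        Function<String, String> target = functions.get(name);
        if (target == null) {
            throw new IllegalStateException("No function registered under '" + name + "'");
        }
        return target.apply(payload);
    }

    public static void main(String[] args) {
        HeaderRoutingSketch router = new HeaderRoutingSketch();
        router.register("uppercase", String::toUpperCase);
        router.register("reverse", s -> new StringBuilder(s).reverse().toString());

        String result = router.route(Map.of(DEFINITION_HEADER, "uppercase"), "hello");
        System.out.println(result);
    }
}
```

Because the sender chooses the target function per message, this style of routing needs no redeployment when the routing decision changes.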

As for the additional features that were not available in previous versions, spring.cloud.function.routing-expression can also be used as an application property. For example, consider the case where the expression is the same regardless of the incoming event, as in the annotation-based example shown earlier in this article (for example, spring.cloud.function.routing-expression=headers['type']=='order'). With this approach, you do not need to enable routing explicitly, because setting spring.cloud.function.routing-expression as an application property has the same effect.

While trivial, the following is a complete example of one of the approaches described above:

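import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
    // The SpEL expression evaluates to the name of the function bean to invoke
    SpringApplication.run(RoutingStreamApplication.class,
        "--spring.cloud.function.routing-expression="
        + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }

  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}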

When you send a message to the functionRouter-in-0 destination, which is exposed by the rabbit or kafka binder, the message is routed to the matching 'even' or 'odd' Consumer bean, depending on the value that the system's nanoTime() method returns when the message is processed.

Routing "FROM"

As before, routing "FROM" relies on the dynamically bound destinations feature of SCSt. However, as with routing "TO", a number of additional features are available.

The following example shows the basics :

@Autowired
private BinderAwareChannelResolver resolver;

public void send(Message<?> message) {
     // Use the value of the "type" header as the destination name
     String destinationName = (String) message.getHeaders().get("type");
     MessageChannel destination = resolver.resolveDestination(destinationName);
     Message<?> outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

All you need is a reference to BinderAwareChannelResolver (autowired in the preceding example). You can then use some logic to determine the destination name (in this case, we use the value of the "type" header). Once the destination name is determined, you can obtain a reference to the destination by calling BinderAwareChannelResolver.resolveDestination(..) and send a message to it. That is all it takes.

The drawback of this approach is that some framework-specific abstractions leak into your code: you need to be aware of both BinderAwareChannelResolver and MessageChannel. In fact, most of the code in the preceding example is boilerplate.

A more dynamic and less leaky approach is to rely on the spring.cloud.stream.sendto.destination header, which effectively does all of the above, but behind the scenes. The following example shows how to use this approach:
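For readers who want a mental model of what resolving a destination by name involves, the following framework-free sketch may help. It assumes that a destination is created the first time its name is requested and reused afterwards. The class DynamicDestinationSketch and its nested Channel type are hypothetical stand-ins invented for this illustration, not the BinderAwareChannelResolver or MessageChannel types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of resolving destinations by name on demand.
final class DynamicDestinationSketch {

    // Stand-in for a message channel: it only records what was sent to it.
    static final class Channel {
        private final List<String> sent = new ArrayList<>();

        void send(String payload) {
            sent.add(payload);
        }

        List<String> sent() {
            return List.copyOf(sent);
        }
    }

    private final Map<String, Channel> channels = new HashMap<>();

    // Returns the channel bound to the given name, creating it on first use.
    Channel resolveDestination(String name) {
        return channels.computeIfAbsent(name, ignored -> new Channel());
    }

    int boundDestinations() {
        return channels.size();
    }

    public static void main(String[] args) {
        DynamicDestinationSketch resolver = new DynamicDestinationSketch();
        // The destination name would normally come from the event, e.g. a "type" header.
        resolver.resolveDestination("order").send("first order");
        resolver.resolveDestination("order").send("second order");
        resolver.resolveDestination("invoice").send("first invoice");
        System.out.println("destinations created: " + resolver.boundDestinations());
    }
}
```

Even in this reduced form, the calling code has to know about the resolver and the channel type, which is the kind of leakage discussed above.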

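import java.util.function.Function;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
        .withPayload("Hello")
        // the framework resolves the output destination from this header
        .setHeader("spring.cloud.stream.sendto.destination", "even")
        .build();
      return outgoingMessage;
    };
  }
}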

We no longer need to inject BinderAwareChannelResolver or resolve a MessageChannel ourselves. We simply create a new Message and set a header on it, which the framework uses to resolve the destination dynamically.

Routing Sources

Last but not least, let's look at another popular use case for routing "FROM", where the data originates outside the context of SCSt but still needs to be routed to the appropriate destination:
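The division of labor can be sketched without any framework. In the model below, the function only returns a message carrying a header, and a small dispatcher (standing in for the framework) reads that header and delivers the payload, falling back to a default destination when the header is absent. SendToHeaderSketch, Msg, and the default destination name are hypothetical and chosen for illustration; only the header name comes from the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of header-driven output routing; not Spring Cloud Stream code.
final class SendToHeaderSketch {

    static final String SEND_TO = "spring.cloud.stream.sendto.destination";

    // Minimal message: a payload plus string headers.
    static final class Msg {
        final String payload;
        final Map<String, String> headers;

        Msg(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    private final Map<String, List<String>> destinations = new HashMap<>();
    private final String defaultDestination;

    SendToHeaderSketch(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // Invokes the user function, then routes its result by the SEND_TO header.
    void process(Function<String, Msg> function, String input) {
        Msg out = function.apply(input);
        String target = out.headers.getOrDefault(SEND_TO, defaultDestination);
        destinations.computeIfAbsent(target, ignored -> new ArrayList<>()).add(out.payload);
    }

    List<String> received(String destination) {
        return destinations.getOrDefault(destination, List.of());
    }

    public static void main(String[] args) {
        SendToHeaderSketch dispatcher = new SendToHeaderSketch("default-out");
        // The function knows nothing about channels; it only sets a header.
        dispatcher.process(in -> new Msg("Hello", Map.of(SEND_TO, "even")), "ignored input");
        System.out.println(dispatcher.received("even"));
    }
}
```

The user function here depends on nothing but the message type, which is the point of the header-based approach.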

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    // Typed as Message<?> so that processor.onNext(message) compiles
    private final EmitterProcessor<Message<?>> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = RequestMethod.POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

We can then see the result by running the following curl command:

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080

Here, we use a Supplier<Flux<?>> bean, mixing the functional approach with the reactive paradigm. We have a simple MVC controller, and we want to route each request downstream based on the value of the 'id' attribute of its content. While the details of EmitterProcessor and its usage are the subject of another article, the important point is that this demonstrates a fully functional application in which HTTP requests are dynamically routed to destinations managed by the target binder.

Check out Spring Cloud Stream on GitHub.

Copyright notice
This article was created by [Jiedao jdon]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2021/04/20210408115700027j.html
