(16) Spring Cloud Stream: message-driven microservices

Why Stream was introduced

Common MQs (message middleware):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

Is there a technology that lets us stop worrying about the details of any specific MQ, so that we only work with an adapter-style binding and it switches between the various MQs for us automatically? (This is similar to Hibernate.)

What is Cloud Stream? It masks the differences between the underlying message middleware, lowers the cost of switching, and provides a unified messaging programming model.

What Stream is, and an introduction to Binder

What is Spring Cloud Stream?

By its official definition, Spring Cloud Stream is a framework for building message-driven microservices.

An application interacts with the binder object in Spring Cloud Stream through inputs and outputs.

Bindings are set up through our configuration, and the Spring Cloud Stream binder object is responsible for interacting with the message middleware. So we only need to understand how to interact with Spring Cloud Stream in order to use the message-driven approach conveniently. It uses Spring Integration to connect to the message broker and implement message-event-driven processing.
Spring Cloud Stream provides opinionated auto-configuration for several vendors' message middleware products, and introduces three core concepts: publish-subscribe, consumer groups, and partitioning.

At the time of writing, only RabbitMQ and Kafka are supported.

Stream design philosophy

Standard MQ

  • Producers and consumers pass information to each other through a message medium
  • Messages must travel through a specific channel: the message channel, MessageChannel
  • How are messages in the channel consumed, and who is responsible for sending and receiving them? SubscribableChannel, a sub-interface of MessageChannel, is subscribed to by a MessageHandler (message processor).

Why Cloud Stream?

Suppose we use RabbitMQ and Kafka. The two middleware products differ in architecture: for example, RabbitMQ has exchanges, while Kafka has topics and partitions.

These differences cause trouble in real project development. If we use one of the two message queues and a later business requirement calls for migrating to the other, it is undoubtedly a disaster: a great deal has to be torn down and redone, because the middleware is coupled to our system. Spring Cloud Stream gives us a way to decouple from it.

How does Stream unify the underlying differences?

Without the concept of a binder, a Spring Boot application has to interact with the message middleware directly, and because each middleware was built with different goals, the implementation details differ greatly. Defining a binder as a middle layer cleanly isolates the application from those details. By exposing a unified Channel to the application, the application no longer needs to consider the different middleware implementations.

In short: defining the Binder as a middle layer isolates the application from the details of the message middleware.

Binder

  • INPUT corresponds to the consumer
  • OUTPUT corresponds to the producer
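
To make the middle-layer idea concrete, here is a minimal plain-Java sketch. It does not use Spring Cloud Stream at all; ToyBinder, InMemoryBinder, SerialPublisher, and BinderSketch are hypothetical names invented for illustration, and the "rabbit-like" and "kafka-like" labels are placeholders rather than real broker connections. The only point is that application code written against the binder interface stays unchanged when the implementation behind it is swapped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical middle layer: the only messaging type the application sees.
interface ToyBinder {
    void send(String destination, String payload);                 // OUTPUT side (producer)
    void subscribe(String destination, Consumer<String> handler);  // INPUT side (consumer)
}

// Stand-in for a middleware-specific binder; a real one would talk to a broker.
class InMemoryBinder implements ToyBinder {
    final String brokerName; // illustrative label only, e.g. "rabbit-like"
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    InMemoryBinder(String brokerName) { this.brokerName = brokerName; }

    @Override
    public void send(String destination, String payload) {
        List<Consumer<String>> subscribers = handlers.get(destination);
        if (subscribers == null) return; // nobody is listening on this destination
        for (Consumer<String> handler : subscribers) handler.accept(payload);
    }

    @Override
    public void subscribe(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }
}

// Application code: depends on ToyBinder only, never on a concrete broker.
class SerialPublisher {
    private final ToyBinder binder;
    SerialPublisher(ToyBinder binder) { this.binder = binder; }
    void publish(String serial) { binder.send("studyExchange", serial); }
}

class BinderSketch {
    // Wires one consumer and one producer to the binder and returns what was received.
    static List<String> roundTrip(ToyBinder binder, String payload) {
        List<String> received = new ArrayList<>();
        binder.subscribe("studyExchange", received::add);
        new SerialPublisher(binder).publish(payload);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new InMemoryBinder("rabbit-like"), "serial-1"));
        System.out.println(roundTrip(new InMemoryBinder("kafka-like"), "serial-1"));
    }
}
```

SerialPublisher never changes between the two runs; only the binder passed in does, which is the decoupling the binder concept is meant to provide.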

Message communication in Stream follows the publish-subscribe model

Messages are broadcast through a Topic:

  • In RabbitMQ, this is an Exchange
  • In Kafka, this is a Topic

Introduction to common Stream coding annotations

  • Binder - conveniently connects to the middleware and hides the differences between products.

  • Channel - an abstraction of a queue. In a messaging system it is the medium that stores and forwards messages; queues are configured through the Channel.

  • Source and Sink - the frame of reference is Spring Cloud Stream itself: publishing a message from Stream is output, and receiving a message is input.

    [Figure: coding API and common annotations]

Case description

Prepare the RabbitMQ environment.

Create three new submodules in the project:

  • cloud-stream-rabbitmq-provider8801: the producer module, which sends messages
  • cloud-stream-rabbitmq-consumer8802: a message-receiving module
  • cloud-stream-rabbitmq-consumer8803: a message-receiving module

Stream message-driven producer

Create a new module: cloud-stream-rabbitmq-provider8801

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2021</artifactId>
        <groupId>com.ylc.cloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Basic configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

YML

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # Configure the RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message middleware type
          environment: # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Channel bindings for this service
        output: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for text
          binder: defaultRabbit # Binder (defined above) that this channel uses

eureka:
  client: # Eureka client registration settings
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (the default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Lease expires after 5 seconds without a heartbeat (the default is 90 seconds)
    instance-id: send-8801.com  # Host name shown in the instance list
    prefer-ip-address: true     # Show the IP address in the access path

Main startup class

package com.ylc.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

Business class

1. Message-sending interface

package com.ylc.cloud.service;

public interface IMessageProvider {
    public String send();
}

2. Implementation of the message-sending interface

package com.ylc.cloud.service.Imp;

import com.ylc.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;


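// Binds the output channel defined by Source, through which messages are pushed.
// Note: @EnableBinding is deprecated as of Spring Cloud Stream 3.1 in favor of the functional model.
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // Output channel used to send messages

    @Override
    public String send()
    {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return serial; // Return the payload so the HTTP caller can see what was sent
    }
}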

3. Controller

package com.ylc.cloud.controller;

import com.ylc.cloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }

}

Test

Start the module and request http://localhost:8801/sendMessage to send a message.

Stream message-driven consumer

Create a new module: cloud-stream-rabbitmq-consumer8802

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2021</artifactId>
        <groupId>com.ylc.cloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Basic configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # Configure the RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message middleware type
          environment: # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Channel bindings for this service
        input: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON objects here, use "text/plain" for text
          binder: defaultRabbit # Binder (defined above) that this channel uses

eureka:
  client: # Eureka client registration settings
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (the default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Lease expires after 5 seconds without a heartbeat (the default is 90 seconds)
    instance-id: receive-8802.com  # Host name shown in the instance list
    prefer-ip-address: true     # Show the IP address in the access path

Main startup class

package com.ylc.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

Business class

package com.ylc.cloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


@Component
@EnableBinding(Sink.class) // Binds the input channel defined by Sink
public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT) // Invoked for each message arriving on the input channel
    public void input(Message<String> message)
    {
        System.out.println("Consumer 1 -----> received message: "+message.getPayload()+"\t  port: "+serverPort);
    }
}

Test

  • Start EurekaMain7001
  • Start StreamMQMain8801
  • Start StreamMQMain8802
  • 8801 sends a message and 8802 receives it

Duplicate message consumption in Stream

Following 8802, clone a copy to run as 8803: cloud-stream-rabbitmq-consumer8803.

Start

  • RabbitMQ
  • Service registration - 7001
  • Message production - 8801
  • Message consumption - 8802
  • Message consumption - 8803

Two problems appear after running:

  1. Duplicate consumption
  2. Message persistence

Consumption

  • Request http://localhost:8801/sendMessage

  • At present, 8802 and 8803 both receive the message at the same time, so it is consumed twice

  • Solution: the group property, which handles both grouping and persistence (important)

A real production scenario

Suppose the order system is deployed as a cluster and every instance fetches order information from RabbitMQ. If the same order is fetched by two instances at the same time, the data will be wrong, so we must avoid this. We can use message grouping in Stream to solve it.

Note that in Stream, multiple consumers in the same group compete with each other, which guarantees that each message is consumed only once, by one of the applications. Different groups each receive every message (duplicate consumption).

As long as the consumers are in the same group, each message is consumed only once, by one of them.

Using groups in Stream to solve duplicate consumption

Principle

When microservice applications are placed in the same group, each message is guaranteed to be consumed only once, by one of the applications.

Different groups can consume the same message repeatedly; within one group the members compete, and only one of them consumes it.
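
The grouping rule can be modeled in a few lines of plain Java. This is a toy simulation, not Spring Cloud Stream or RabbitMQ code: ToyConsumer, ToyDestination, and GroupSketch are hypothetical names, and the round-robin choice inside a group is an assumption made to keep the sketch deterministic. What it shows is that every group receives each message once, and within a group only one member receives it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy consumer that simply records what it receives.
class ToyConsumer {
    final String name;
    final List<String> received = new ArrayList<>();
    ToyConsumer(String name) { this.name = name; }
}

// Toy destination (think exchange or topic) that knows about consumer groups.
class ToyDestination {
    private final Map<String, List<ToyConsumer>> groups = new LinkedHashMap<>();
    private final Map<String, Integer> delivered = new HashMap<>(); // messages delivered per group

    void subscribe(String group, ToyConsumer consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
    }

    void publish(String payload) {
        // Every group gets the message, but only one member of each group does.
        for (Map.Entry<String, List<ToyConsumer>> entry : groups.entrySet()) {
            List<ToyConsumer> members = entry.getValue();
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).received.add(payload); // assumed round-robin
        }
    }
}

class GroupSketch {
    public static void main(String[] args) {
        // Same group: 8802 and 8803 split the messages between them.
        ToyConsumer c8802 = new ToyConsumer("8802");
        ToyConsumer c8803 = new ToyConsumer("8803");
        ToyDestination sameGroup = new ToyDestination();
        sameGroup.subscribe("A_Group", c8802);
        sameGroup.subscribe("A_Group", c8803);
        sameGroup.publish("m1");
        sameGroup.publish("m2");
        System.out.println("same group:       " + c8802.received + " " + c8803.received);

        // Different groups: both consumers see every message.
        ToyConsumer d8802 = new ToyConsumer("8802");
        ToyConsumer d8803 = new ToyConsumer("8803");
        ToyDestination differentGroups = new ToyDestination();
        differentGroups.subscribe("A_Group", d8802);
        differentGroups.subscribe("B_Group", d8803);
        differentGroups.publish("m1");
        differentGroups.publish("m2");
        System.out.println("different groups: " + d8802.received + " " + d8803.received);
    }
}
```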

If 8802 and 8803 are placed in different groups (two different group values), each of them still receives every message.

Modify the YML files of 8802 and 8803:

Add a group, putting both consumers in the same group.
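
A minimal sketch of that change, assuming the consumer YML shown earlier and the group name A_Group used later in this article. Only the group line is new, and the same value would go into both the 8802 and the 8803 configuration:

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: A_Group # same value in 8802 and 8803 puts them in one consumer group
```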

Conclusion: among multiple microservice instances in the same group, only one receives each message.

Consumers on the same queue compete for its messages, so each message goes to only one of them.

Message persistence in Stream

Stop 8802 and 8803, and remove the group setting (group: A_Group) from 8802. Leave group: A_Group in place on 8803.

8801 first sends 4 messages to RabbitMQ.

Start 8802 first: it has no group property configured, and no messages are printed in its console.

Then start 8803: it has the group property configured, and the messages from MQ are printed in its console. (This demonstrates message persistence.)

The principle: the exchange delivers data to queues. Because 8802 restarted without a group, its queue is created anew and only then starts listening, whereas 8803 still listens on its original queue.
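
The same reasoning can be traced with a small plain-Java model. This is a simplified simulation under stated assumptions, not RabbitMQ or Spring Cloud Stream code: ToyExchange and PersistenceSketch are hypothetical names, a grouped consumer is assumed to reuse one stable queue named after its group, and a consumer without a group is assumed to get a fresh queue on every start that disappears when it stops.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy exchange: copies each published message into every queue that currently exists.
class ToyExchange {
    private final Map<String, Queue<String>> queues = new HashMap<>();
    private int anonymousCounter = 0;

    // Grouped consumer: reuses the queue for its group if it already exists.
    String declareGroupQueue(String group) {
        queues.computeIfAbsent(group, q -> new ArrayDeque<>());
        return group;
    }

    // Consumer without a group: always gets a brand-new, empty queue.
    String declareAnonymousQueue() {
        String name = "anonymous-" + (++anonymousCounter);
        queues.put(name, new ArrayDeque<>());
        return name;
    }

    // Models the anonymous queue vanishing when its consumer stops.
    void delete(String queue) { queues.remove(queue); }

    void publish(String payload) {
        for (Queue<String> queue : queues.values()) queue.add(payload);
    }

    // Returns and clears everything buffered in the given queue.
    List<String> drain(String queue) {
        List<String> messages = new ArrayList<>(queues.get(queue));
        queues.get(queue).clear();
        return messages;
    }
}

class PersistenceSketch {
    public static void main(String[] args) {
        ToyExchange exchange = new ToyExchange();
        exchange.declareGroupQueue("A_Group");                 // 8803 ran with a group, then stopped
        String old8802 = exchange.declareAnonymousQueue();     // 8802 ran without a group
        exchange.delete(old8802);                              // 8802 stopped, its queue is gone

        for (int i = 1; i <= 4; i++) exchange.publish("msg-" + i); // 8801 sends 4 messages

        String new8802 = exchange.declareAnonymousQueue();     // 8802 restarts: new empty queue
        String q8803 = exchange.declareGroupQueue("A_Group");  // 8803 restarts: original queue
        System.out.println("8802 receives " + exchange.drain(new8802));
        System.out.println("8803 receives " + exchange.drain(q8803));
    }
}
```

In this model 8802 receives nothing because its queue did not exist while the messages were sent, while 8803 finds all four messages waiting in the queue that outlived its restart.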

Copyright notice
This article was written by [Fried stewed sugar chestnut]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2021/10/20211002145410581x.html
