1. summary

The old saying is good : Be flexible , Be good at thinking , Sometimes take a little turn , Maybe the problem is solved .

Get down to business , We talked about it before RabbitMQ 3.9.7 Build a mirror mode cluster , Let's talk today RabbitMQ 3.9.7 Mirror mode cluster and Springboot 2.5.5 Integrate .

2. The scene that

The server A IP:

The server B IP:

The server C IP:

These three servers have been set up RabbitMQ Mirror mode cluster , Build a mirror mode cluster , See my last article .

3. And Springboot Integration of

3.1 Introduce dependencies

<relativePath/> <!-- lookup parent from repository -->

3.2 Production service configuration

username: guest
password: guest
virtual-host: /
connection-timeout: 16000 # Enable message confirmation mode
publisher-confirm-type: correlated # Enable return Message schema
publisher-returns: true
mandatory: true

3.3 Production service code

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import java.util.Map; @Component
public class Producer { @Autowired
private RabbitTemplate rabbitTemplate; /**
* Confirm callback
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// correlationData Unique identification
// ack mq Did you receive a message
// cause Reasons for failure
System.out.println("correlationData:" + correlationData.getId());
System.out.println("ack:" + ack);
System.out.println("cause:" + cause); }
}; /**
* Send a message
* @param messageBody Message body
* @param headers Additional attributes
* @throws Exception
public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception { MessageHeaders messageHeaders = new MessageHeaders(headers); Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders); rabbitTemplate.setConfirmCallback(confirmCallback); String exchangeName = "exchange-hello";
String routingKey = "test.123"; CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() { /**
* What to do after sending a message
* @param message
* @return
* @throws AmqpException
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
return message;
}, correlationData);

3.4 Consumer service configuration

username: guest
password: guest
virtual-host: /
connection-timeout: 16000 listener:
# Set to manual ACK
acknowledge-mode: manual
concurrency: 5
prefetch: 1
max-concurrency: 10

3.5 Consumer service code

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; @Component
public class Consumer { @RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-hello", durable = "true"),
exchange = @Exchange(value = "exchange-hello" , durable = "true", type = "topic"),
key = "test.*"
public void onMessage(Message message, Channel channel) throws Exception { System.out.println(" Received a message :" + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);

3.6 Rest Test code

public class RabbitmqController { @Autowired
private Producer producer; @GetMapping("/sendMessage")
public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {
Map<String, Object> headers = new HashMap<>();
producer.sendMessage(messageBody, headers, id);
return "success";

4. review

Today I talked about RabbitMQ 3.9.7 Mirror mode cluster and Springboot 2.5.5 Integrate , I hope it can be helpful to everyone's work .

RabbitMQ 3.9.7 Mirror mode cluster and Springboot 2.5.5 More articles on Integration

