编程知识 cdmana.com

Rocketmq source code analysis: message sending process

 Insert picture description here

RocketMQ Three ways to send messages

stay RocketMQ There are three ways to send messages in , The synchronous , Asynchronous sending and one-way sending . The top 2 The sending method is reliable , Because there will be a successful response , The one-way transmission only sends regardless of whether the transmission is successful

Synchronous messaging , Used to send important message notifications

public class SyncProducer {
    
	public static void main(String[] args) throws Exception {
    
    	//  Instantiate message producer Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	//  Set up NameServer The address of 
    	producer.setNamesrvAddr("localhost:9876");
    	//  start-up Producer example 
        producer.start();
    	for (int i = 0; i < 100; i++) {
    
    	    //  Create a message , And designate Topic,Tag And the message body 
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	//  Send a message to a Broker
            SendResult sendResult = producer.send(msg);
            //  adopt sendResult Whether the return message was successfully delivered 
            System.out.printf("%s%n", sendResult);
    	}
    	//  If you don't send messages anymore , close Producer example .
    	producer.shutdown();
    }
}

Send message asynchronously , Used for response time sensitive business scenarios , That is, the sender cannot wait for a long time Broker Response

public class AsyncProducer {
    
    public static void main(
        String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
    

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
    
            try {
    
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback Asynchronously receive the sent results 
                producer.send(msg, new SendCallback() {
    
                    @Override
                    public void onSuccess(SendResult sendResult) {
    
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
    
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
    
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

Send messages one way , Used for scenarios that don't care about sending results , For example, log sending

public class OnewayProducer {
    
	public static void main(String[] args) throws Exception{
    
    	//  Instantiate message producer Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	//  Set up NameServer The address of 
        producer.setNamesrvAddr("localhost:9876");
    	//  start-up Producer example 
        producer.start();
    	for (int i = 0; i < 100; i++) {
    
        	//  Create a message , And designate Topic,Tag And the message body 
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	//  Send a one-way message , There are no returned results 
        	producer.sendOneway(msg);

    	}
    	//  If you don't send messages anymore , close Producer example .
    	producer.shutdown();
    }
}

RocketMQ Five message types

 Insert picture description here

Bulk messages

List<Message> messageList = new ArrayList<>();
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id001", "hello world1".getBytes()));
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id002", "hello world2".getBytes()));
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id003", "hello world3".getBytes()));
producer.send(messageList);

Batch message means sending multiple messages at one time , The underlying implementation is to Collection<Message> To MessageBatch Object sent out

Sequential message

SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    
    /** * @param mqs topic Corresponding message queue * @param msg send Method passed in message * @param arg send Method passed in orderId */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    
        //  Select the corresponding queue according to the business object 
        Integer orderId = (Integer) arg;
        int index = orderId % mqs.size();
        return mqs.get(index);
    }
}, orderId);

Transaction message

Delay message

Message message = new Message(TOPIC_NAME, TAG_NAME, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//  Set the message delay level to 2, Time delay 5s about 
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);

We can change the message into a delayed message by setting the delay level of the message , All in all 18 Delay levels , The corresponding delay time is as follows

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

The source code parsing

Producer Start process

Producer Send message flow

Reference blog

版权声明
本文为[Java knowledge hall]所创,转载请带上原文链接,感谢
https://cdmana.com/2021/11/20211109055609489r.html

Scroll to Top