New reading experience :http://www.zhouhong.icu/post/157

One 、 Business needs

   You need to implement a business that notifies users to do something 20 minutes in advance , The easiest way to get this business is to use Redis monitor Key value : When scheduling, calculate the difference between the current time and 20 minutes in advance , Then use a unique business Key Push the Redis And set the expiration time , Then just let Redis Monitor this Key value , When this Key You can get this directly after it expires Key And then realize business such as sending messages .

   About Redis I have taken a note before realizing the specific implementation of the business , If you are interested, you can go and have a look , But now I feel there are many shortcomings .

       Redis Realize timing : http://www.zhouhong.icu/post/144

Two 、Redis Deficiencies in realizing functions such as timing push

   because Redis You're not the only one using , Other businesses will also use Redis, Then one of the easiest shortcomings to think of is :1、 If there are a lot of other businesses at the moment of reminder Key It's overdue , Then you won't be able to turn this for a long time Key, There will be disadvantages such as message push delay ;2、 Another disadvantage is that Alibaba cloud's Redis There is no support for Redis Of Key Worth monitoring ( I also use Alibaba cloud Redis No way Key Monitoring was used before Redis Monitoring is transferred to use RocketMQ Delayed message push ...)

3、 ... and 、 Alibaba cloud RocketMQ timing / Delayed message queuing implementation

   In fact, the implementation is very simple

1、 First, go to Alibaba cloud console to create the required message queue resources , Including message queuing RocketMQ Example 、Topic、Group ID (GID), And what authentication needs AccessKey(AK), Generally, companies have ready-made products that can be used directly .
2、 stay springboot project pom.xml Add required dependencies .
<!-- Alibaba cloud MQ TCP-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.1.Final</version>
</dependency>
3、 In the corresponding environment application.properties File configuration parameters
console:
rocketmq:
tcp:
accessKey: XXXXXXXX Use your own
secretKey: XXXXXXXXXXXXX Use your own
nameSrvAddr: XXXXXXXXXXXXXXXX Use your own
topic: XXXXXXX Use your own
groupId: XXXXXXX Use your own
tag: XXXXXXXXX Use your own
4、 encapsulation MQ Configuration class
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import java.util.Properties;
/**
* @Description: MQ Configuration class
* @Author: zhouhong
* @Date: 2021/8/4
*/
@Configuration
@EnableConfigurationProperties({PatrolMqConfig.class})
@ConfigurationProperties(prefix = "console.rocketmq.tcp")
@Primary
public class PatrolMqConfig { private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String topic;
private String groupId;
private String tag;
private String orderTopic;
private String orderGroupId;
private String orderTag; public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getOrderTopic() {
return orderTopic;
}
public void setOrderTopic(String orderTopic) {
this.orderTopic = orderTopic;
}
public String getOrderGroupId() {
return orderGroupId;
}
public void setOrderGroupId(String orderGroupId) {
this.orderGroupId = orderGroupId;
}
public String getOrderTag() {
return orderTag;
}
public void setOrderTag(String orderTag) {
this.orderTag = orderTag;
}
}
5、 Configure producers
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class PatrolProducerClient { @Autowired
private PatrolMqConfig mqConfig;
@Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
6、 Consumer subscription
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; // Add... To the project @Configuration annotation , So when the service starts consumer Also launched
@Configuration
@Slf4j
public class PatrolConsumerClient { @Autowired
private PatrolMqConfig mqConfig; @Autowired
private MqTimeMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
// The configuration file
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
// Fix the number of consumer threads to 20 individual 20 As the default value
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
consumerBean.setProperties(properties);
// Subscribe to the relationship between
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.set);
subscriptionTable.put(subscription, messageListener);
// Subscribe to multiple topic As set above
consumerBean.setSubscriptionTable(subscriptionTable);
System.err.println(" Subscription succeeded !");
return consumerBean;
}
}
7、 Timing delay MQ Message monitoring consumption
/**
* @Description: timing / Time delay MQ Message monitoring consumption
* @Author: zhouhong
* @Create: 2021-08-03 09:16
**/
@Component
public class MqTimeMessageListener implements MessageListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public Action consume(Message message, ConsumeContext context) {
System.err.println(" Got the news !!");
logger.info(" Received MQ news -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));
try {
String msgTag = message.getTag(); // Message type
String msgKey = message.getKey(); // The business is unique id
switch (msgTag) {
case "XXXX":
// TODO Specific business implementation , Such as sending messages and other operations
System.err.println(" Push successful !!!!");
break;
}
return Action.CommitMessage;
} catch (Exception e) {
logger.error(" consumption MQ The news failed ! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
// Consumption failure , Tell the server to post the message later , Continue to consume other news
return Action.ReconsumeLater;
}
}
}
8、 Encapsulate a delay / Tool class of timing message
/**
* @Description: MQ Send message assistant
* @Author: zhouhong
* @Create: 2021-08-03 09:06
**/
@Component
public class ProducerUtil {
private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
@Autowired
private PatrolMqConfig config;
@Resource(name = "ConsoleProducer")
ProducerBean producerBean;
public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
msg.setStartDeliverTime(delayTime);
return this.send(msg,Boolean.FALSE);
}
/**
* Send and distribute ordinary messages
* @param msg news
* @param isOneWay One way delivery or not
*/
private SendResult send(Message msg,Boolean isOneWay) {
try {
if(isOneWay) {
// Because in oneway No request response processing when sending message by , In case of message sending failure , It will cause data loss due to no retry .
// If the data cannot be lost , Synchronous or asynchronous transmission mode is recommended .
producerBean.sendOneway(msg);
success(msg, " One-way message MsgId No return ");
return null;
}else {
// Reliable synchronous transmission
SendResult sendResult = producerBean.send(msg);
// Get send results , Send successfully without throwing exception
if (sendResult != null) {
success(msg, sendResult.getMessageId());
return sendResult;
}else {
error(msg,null);
return null;
}
}
} catch (Exception e) {
error(msg,e);
return null;
}
}
private ExecutorService threads = Executors.newFixedThreadPool(3);
private void error(Message msg,Exception e) {
logger.error(" send out MQ The news failed -- Topic:{}, Key:{}, tag:{}, body:{}"
,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
logger.error("errorMsg --- {}",e.getMessage());
}
private void success(Message msg,String messageId) {
logger.info(" send out MQ The news is successful -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
}
}
9、 The interface test (10000 Express delay 10 second , You can calculate according to your business )
//  test MQ Time delay 
@Autowired
ProducerUtil producerUtil;
@PostMapping("/patrolTaskTemp/mqtest")
public void mqTime(){
producerUtil.sendTimeMsg(
"SMARTPATROL",
" Hello duck !!!".getBytes(),
" Flaming and trance !!",
System.currentTimeMillis() + 10000
);
}
10、 result
2021-08-04 22:07:12.677  INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil     :  send out MQ The news is successful  -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key: Flaming and trance !!, tag:SMARTPATROL, body: Hello duck !!!
Got the news !!
Push successful !!!!
2021-08-04 22:07:22.179 INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener : Received MQ news -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key: Flaming and trance !!, body: Hello duck !!!

Alibaba cloud RocketMQ timing / More articles on the implementation of delayed message queuing

  1. [ original ] Alibaba cloud RocketMQ Which pits have you stepped on

    Due to the company's recent use RocketMQ To do payment business processing ,  It starts learning from Alibaba cloud RocketMQ The way of learning and practice ,  There are a lot of pits ,  Most of them stepped into the pit because they didn't carefully check Alibaba cloud's technical documents .  But there is a very big ...

  2. Alibaba cloud RocketMQ A simple implementation of

    // MQ There are application scenarios, such as The order change message can be sent through the place where this event occurs ( For example, the front end calls the back-end interface post One order , So it's here mapping Be a producer in the method [ But it's best to pass aop To achieve , Otherwise n Multiple interfaces are required ...

  3. be based on redis Design of delayed message queue based on

    Demand background After the user successfully placed the order 20 Minutes to send home service notification SMS to users One hour after the order is completed, inform the user to evaluate the door-to-door service After the failure of business execution 10 Try again in minutes There are many similar scenes The simple way to do it is to use timed tasks ...

  4. be based on redis Design of delayed message queue based on ( turn )

    Demand background After the user successfully placed the order 20 Minutes to send home service notification SMS to users One hour after the order is completed, inform the user to evaluate the door-to-door service After the failure of business execution 10 Try again in minutes There are many similar scenes The simple way to do it is to use timed tasks ...

  5. Delayer be based on Redis Delay Message Queuing Middleware for

    Delayer be based on Redis Delay Message Queuing Middleware for , use Golang Development , Support PHP.Golang And so on . Reference resources There is a delay queue design Part of the design , After optimization . Project links :http ...

  6. Didi's travel is based on RocketMQ The practice of building enterprise message queue service

    Summary : 1. https://mp.weixin.qq.com/s/v6NM3UgX-qTI7yO1QPCJrw Didi's travel is based on RocketMQ The practice of building enterprise message queue service original :  Jiang Haiting   Alibaba middle ...

  7. rabbitmq Delayed message queuing implementation of

    The first part : The implementation principle and knowledge of delay message Use RabbitMQ To implement a delayed task, you must first understand RabbitMQ Two concepts of : News TTL And dead letter Exchange, Through the combination of the two to achieve the above requirements . News TTL(Tim ...

  8. Spring boot The actual project integrates Alibaba cloud RocketMQ ( Non open source ) Message queuing is used to send ordinary messages , Delay message -- The attached code

    One . Why choose RocketMQ Message queue ? First RocketMQ It was developed by Alibaba , It's also open source . Its performance and stability from double 11 You can see that , Borrow an official introduction from Ali : Calendar double 11 Shopping Carnival 0 million level TPS. One trillion ...

  9. Alibaba cloud RocketMQ It's easy for consumers to realize

    Business scenarios and so on, please see another producer implementation : package com.ttt.eee; import com.aliyun.openservices.ons.api.Action; import com.a ...

  10. Tencent cloud distributed high reliable message queue service CMQ framework

    In today's world of Distributed Computing , We're inside the system . Message middleware is widely used for data exchange and decoupling between platforms .CMQ Tencent cloud internal research is based on high reliability . Strong consistency . Scalable distributed message queuing , In Tencent, including wechat mobile phones QQ Business red envelope . Tencent charges top up ...

Random recommendation

  1. Linux Next double network card binding bond0

    One : principle : linux There are seven modes of double network card binding under the operating system . Now the general enterprise will use the dual network card access , This can add network bandwidth , At the same time, it can do the corresponding redundancy , It can be said that there are many advantages . And the general enterprise will use linux Operating system with the network card binding ...

  2. Python [Leetcode 121]Best Time to Buy and Sell Stock

    Title Description : Say you have an array for which the ith element is the price of a given stock on day i. If you ...

  3. AngularJs( 8、 ... and ) filter filter establish

    The outline Example Use of filters Create filters demo This is the whole example demo 1.filter.js file angular.module("exampleApp", []) .constan ...

  4. MTK How to burn IMEI code ( Commonly known as serial number )

    Let's first introduce the use environment master control :MT6582VX android edition :4.4.2 operating system :windows XP SN Burning tools :SN_Write_tool_exe_v2.1420.00 Let's first introduce IMEI ...

  5. The finger of the sword offer Finding the minimum number of rotation array

    One . subject Move the first elements of an array to the end of the array , We call it rotation of arrays . Enter a rotation of a non decrementing array , Output the smallest element of the rotation array . For example, an array of {3,4,5,1,2} by {1,2,3,4,5} One of the ...

  6. linux Timer principle

    Kernel timer :    unsigned long timeout = jiffies + (x * HZ);    while(1) {        // Check the condition.   ...

  7. Activate pycharm

    1. modify hosts file : Add the following line to hosts file , The purpose is to shield Pycharm Verification of activation code **0.0.0.0 account.jetbrains.com ** windwos System hosts file ...

  8. beego Learning notes (3)

    A relatively complex example : package main import "github.com/astaxie/beego" type MainController struct{ beeg ...

  9. 【298】 IDL System process &amp; function

    Reference resources : Programming and Control Serial number Class name   Functional specifications   grammar & give an example 01 DEFSYSV   ====<<<< Descriptio ...

  10. python A thousand times the speed of blasting

    After reading the article written by the glacier boss, I was particularly impressed :https://bbs.ichunqiu.com/thread-16952-1-1.html Briefly describe : Using the traditional single data submission mode . For example, the following sentence : & ...