
Alibaba Cloud RocketMQ scheduled / delayed message queue implementation

For a better reading experience: http://www.zhouhong.icu/post/157

One. Business requirements

   Suppose you need a feature that notifies users 20 minutes before a scheduled event. The easiest way to build it is with Redis key expiration listening: at scheduling time, compute the interval between the current time and the moment 20 minutes before the event, push a unique business key into Redis with that interval as its expiration time, and let the application listen for the key. When the key expires, the listener receives it and can then run the business logic, such as sending the message.
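
The expiration time described above is plain time arithmetic. Here is a minimal sketch of it using only the JDK; the class and method names are hypothetical and not taken from the Redis note, and pushing the key into Redis is left out.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: how long a Redis reminder key should live. */
final class ReminderTtl {
    private ReminderTtl() {}

    /**
     * Returns the time from now until leadTime before the event,
     * or zero if that moment has already passed.
     */
    static Duration ttlUntilReminder(Instant now, Instant eventTime, Duration leadTime) {
        Instant remindAt = eventTime.minus(leadTime);
        Duration ttl = Duration.between(now, remindAt);
        return ttl.isNegative() ? Duration.ZERO : ttl;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-08-04T10:00:00Z");
        Instant event = Instant.parse("2021-08-04T11:00:00Z");
        // Event at 11:00, reminder 20 minutes earlier at 10:40, now 10:00
        Duration ttl = ttlUntilReminder(now, event, Duration.ofMinutes(20));
        System.out.println(ttl.getSeconds()); // prints 2400
    }
}
```

Clamping to zero is a design choice in this sketch: if the reminder moment is already in the past, the caller can decide whether to notify right away or skip the reminder.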

   I wrote up the Redis-based implementation in an earlier note; have a look if you are interested. Looking back, though, I think that approach has quite a few shortcomings.

       Scheduling with Redis: http://www.zhouhong.icu/post/144

Two. Shortcomings of implementing scheduled push with Redis

   Because your feature is not the only thing using Redis, two shortcomings come to mind right away: 1. If a large number of keys from other businesses expire at the moment the reminder is due, it may take a long time before your key's expiration is handled, so the push is delayed. 2. Alibaba Cloud's Redis does not support listening for key expiration (I was also on Alibaba Cloud Redis, found that key listening was not available, and switched from Redis listening to RocketMQ delayed messages...).

Three. Alibaba Cloud RocketMQ scheduled / delayed message queue implementation

   The implementation is actually quite simple.

1. First, create the required message queue resources in the Alibaba Cloud console: a Message Queue for RocketMQ instance, a Topic, a Group ID (GID), and the AccessKey (AK) needed for authentication. Most companies already have these set up, so you can use them directly.
2. Add the required dependency to the Spring Boot project's pom.xml.
<!-- Alibaba cloud MQ TCP-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.7.1.Final</version>
</dependency>
3. Set the parameters in the configuration file for the corresponding environment (the snippet is in YAML format, e.g. application.yml)
console:
  rocketmq:
    tcp:
      accessKey: XXXXXXXX              # use your own
      secretKey: XXXXXXXXXXXXX         # use your own
      nameSrvAddr: XXXXXXXXXXXXXXXX    # use your own
      topic: XXXXXXX                   # use your own
      groupId: XXXXXXX                 # use your own
      tag: XXXXXXXXX                   # use your own
4. Wrap the MQ configuration in a class
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.Properties;
/**
* @Description: MQ Configuration class 
* @Author: zhouhong
* @Date: 2021/8/4
*/
@Configuration
@EnableConfigurationProperties({PatrolMqConfig.class})
@ConfigurationProperties(prefix = "console.rocketmq.tcp")
@Primary
public class PatrolMqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
    public String getAccessKey() {
        return accessKey;
    }
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }
    public String getSecretKey() {
        return secretKey;
    }
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }
    public String getNameSrvAddr() {
        return nameSrvAddr;
    }
    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }
    public String getTopic() {
        return topic;
    }
    public void setTopic(String topic) {
        this.topic = topic;
    }
    public String getGroupId() {
        return groupId;
    }
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
    public String getTag() {
        return tag;
    }
    public void setTag(String tag) {
        this.tag = tag;
    }
    public String getOrderTopic() {
        return orderTopic;
    }
    public void setOrderTopic(String orderTopic) {
        this.orderTopic = orderTopic;
    }
    public String getOrderGroupId() {
        return orderGroupId;
    }
    public void setOrderGroupId(String orderGroupId) {
        this.orderGroupId = orderGroupId;
    }
    public String getOrderTag() {
        return orderTag;
    }
    public void setOrderTag(String orderTag) {
        this.orderTag = orderTag;
    }
}
5. Configure the producer
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PatrolProducerClient {

    @Autowired
    private PatrolMqConfig mqConfig;
    @Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}
6. Configure the consumer subscription
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Annotate the class with @Configuration so that the consumer is started when the service starts
@Configuration
@Slf4j
public class PatrolConsumerClient {

    @Autowired
    private PatrolMqConfig mqConfig;

    @Autowired
    private MqTimeMessageListener messageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // Base connection properties from the configuration class
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        // Fix the number of consumer threads at 20 (20 is also the default)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        // Subscription relationships
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        // The original line was truncated to "subscription.set);"; filtering by the configured tag is assumed here
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, messageListener);
        // To subscribe to more topics, add further entries in the same way as above
        consumerBean.setSubscriptionTable(subscriptionTable);
        System.err.println(" Subscription succeeded !");
        return consumerBean;
    }
}
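7. Listener that consumes scheduled / delayed MQ messages
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @Description: Listener that consumes scheduled / delayed MQ messages
 * @Author: zhouhong
 * @Create: 2021-08-03 09:16
 **/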
@Component
public class MqTimeMessageListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.err.println(" Got the news !!");
        logger.info(" Received MQ news  -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));
        try {
            String msgTag = message.getTag(); // Message type
            String msgKey = message.getKey(); // Unique business id
            switch (msgTag) {
                case "XXXX":
                    // TODO: the actual business logic, such as sending the notification
                    System.err.println(" Push successful !!!!");
                    break;
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error(" consumption MQ The news failed ! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
            // Consumption failed: ask the server to redeliver this message later and carry on with other messages
            return Action.ReconsumeLater;
        }
    }
}
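
One thing to watch in the listener above: a Java `switch` on a `String` throws a NullPointerException when the value is null, so a message sent without a tag would end up in the catch block and be redelivered. A possible alternative, sketched here in plain Java, is to look the handler up in a map. `TagDispatcher` and its methods are hypothetical names for illustration and are not part of the ONS SDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical tag-to-handler registry; not part of the ONS SDK. */
final class TagDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler to run for messages carrying the given tag. */
    void register(String tag, Consumer<String> handler) {
        handlers.put(tag, handler);
    }

    /**
     * Runs the handler registered for the tag, passing it the business key.
     * Returns false for a null or unknown tag instead of throwing.
     */
    boolean dispatch(String tag, String businessKey) {
        Consumer<String> handler = (tag == null) ? null : handlers.get(tag);
        if (handler == null) {
            return false;
        }
        handler.accept(businessKey);
        return true;
    }

    public static void main(String[] args) {
        TagDispatcher dispatcher = new TagDispatcher();
        dispatcher.register("SMARTPATROL", key -> System.out.println("push reminder for " + key));
        System.out.println(dispatcher.dispatch("SMARTPATROL", "task-1"));
        System.out.println(dispatcher.dispatch(null, "task-2"));
    }
}
```

Inside `consume`, the boolean result could be used to log unknown tags and still commit the message, so that a message nobody handles is not retried over and over. Whether that is the right behavior depends on your business.
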
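8. Wrap the sending of delayed / scheduled messages in a utility class
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description: Helper for sending MQ messages
 * @Author: zhouhong
 * @Create: 2021-08-03 09:06
 **/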
@Component
public class ProducerUtil {
    private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
    @Autowired
    private PatrolMqConfig config;
    @Resource(name = "ConsoleProducer")
    ProducerBean producerBean;
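    /**
     * Send a scheduled / delayed message.
     * @param delayTime absolute delivery time as an epoch timestamp in milliseconds
     *                  (e.g. System.currentTimeMillis() + 10000 for a 10-second delay), not a relative delay
     */
    public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {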
        Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
        msg.setStartDeliverTime(delayTime);
        return this.send(msg,Boolean.FALSE);
    }
    /**
     * Send a message
     * @param msg  the message
     * @param isOneWay  whether to send one-way
     */
    private SendResult send(Message msg,Boolean isOneWay) {
        try {
            if(isOneWay) {
                // A one-way send gets no response, so a failed send is not retried and the message is lost.
                // If data must not be lost, use synchronous or asynchronous sending instead.
                producerBean.sendOneway(msg);
                success(msg, "one-way message, no MsgId returned");
                return null;
            }else {
                // Reliable synchronous send
                SendResult sendResult = producerBean.send(msg);
                // If no exception was thrown the send succeeded; check the returned result
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                }else {
                    error(msg,null);
                    return null;
                }
            }
        } catch (Exception e) {
            error(msg,e);
            return null;
        }
    }
    private ExecutorService threads = Executors.newFixedThreadPool(3);
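    private void error(Message msg,Exception e) {
        logger.error(" send out MQ The news failed -- Topic:{}, Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
        // e is null when send() returned no result, so guard against a NullPointerException here
        logger.error("errorMsg --- {}", e == null ? "no SendResult returned" : e.getMessage());
    }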
    private void success(Message msg,String messageId) {
        logger.info(" send out MQ The news is successful  -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
    }
}
9. Interface test (10000 means a 10-second delay; compute the value according to your own business)
// Test MQ delayed delivery
    @Autowired
    ProducerUtil producerUtil;
    @PostMapping("/patrolTaskTemp/mqtest")
    public void mqTime(){
        producerUtil.sendTimeMsg(
                "SMARTPATROL",
                " Hello duck !!!".getBytes(),
                " Flaming and trance !!",
                System.currentTimeMillis() + 10000
        );
    }
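
Because `sendTimeMsg` passes its last argument straight to `setStartDeliverTime`, the value has to be an absolute timestamp in milliseconds, not a relative delay. A minimal sketch of a helper that does the conversion and rejects values that are obviously wrong is shown here. The class name is hypothetical, and the 40-day upper bound used in `main` is an assumption about the service limit that you should check against the documentation for your RocketMQ edition.

```java
import java.time.Duration;

/** Hypothetical helper for computing the value passed to setStartDeliverTime. */
final class DeliverTimes {
    private DeliverTimes() {}

    /**
     * Converts a relative delay into an absolute delivery timestamp in milliseconds.
     * Rejects negative delays and delays longer than maxDelay.
     */
    static long startDeliverTime(long nowMillis, Duration delay, Duration maxDelay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        if (delay.compareTo(maxDelay) > 0) {
            throw new IllegalArgumentException("delay " + delay + " exceeds limit " + maxDelay);
        }
        return nowMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        // Assumed limit, passed in as a parameter so it is easy to change
        Duration maxDelay = Duration.ofDays(40);
        long deliverAt = startDeliverTime(System.currentTimeMillis(), Duration.ofSeconds(10), maxDelay);
        System.out.println(deliverAt);
    }
}
```

For the 20-minute reminder from section One, the delay would be the time remaining until 20 minutes before the event, and the result would be passed as the last argument of `sendTimeMsg`.
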
10. Result
2021-08-04 22:07:12.677  INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil     :  send out MQ The news is successful  -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key: Flaming and trance !!, tag:SMARTPATROL, body: Hello duck !!!
 Got the news !!
 Push successful !!!!
2021-08-04 22:07:22.179  INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener  :  Received MQ news  -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key: Flaming and trance !!, body: Hello duck !!!
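
To check how long the message was actually held back, you can subtract the timestamps of the two log lines. A small sketch using only the JDK; the class name is hypothetical, and the timestamp pattern is taken from the log output above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: elapsed time between two log timestamps. */
final class LogDelay {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private LogDelay() {}

    /** Returns the milliseconds elapsed from the "sent" line to the "received" line. */
    static long millisBetween(String sentAt, String receivedAt) {
        LocalDateTime sent = LocalDateTime.parse(sentAt, FORMAT);
        LocalDateTime received = LocalDateTime.parse(receivedAt, FORMAT);
        return Duration.between(sent, received).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps copied from the two log lines in the result
        System.out.println(millisBetween("2021-08-04 22:07:12.677", "2021-08-04 22:07:22.179")); // prints 9502
    }
}
```

The gap is about 9.5 seconds, close to the requested 10. The send-success line is logged only after the send call returns, while the delivery time was computed before the send, which is one plausible reason for the gap being slightly under 10 seconds.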

Copyright notice
This article was written by [Tom-shushu]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2021/08/20210804222733082f.html
