编程知识 cdmana.com

Java advanced topic (20) information broker software architecture (2) -- Research on rabbitmq

# The foreword follows , This goes on RabbitMQ, And understand the underlying principles .# Introduce RabbitMQ By erlang Language development , Based on AMQP(Advanced Message Queue High level message queuing protocol ) Message queuing for protocol implementation . Why use RabbitMQ Well ?1、 Make it simple , Powerful .2、 Based on AMQP Agreement .3、 Community is active , Document improvement .4、 High concurrency, good performance , This is mainly due to Erlang Language .5、Spring Boot Presupposition is integrated RabbitMQ# AMQP Agreement ##AMQP Basic introduction AMQP, namely Advanced Message Queuing Protocol, An application layer standard high level message queuing protocol for unified messaging service , It is an open standard of application layer protocol , Design for message oriented mediation software . Based on this protocol, the client and the message broker can deliver messages , Not subject to client / Intermediary software is the same as product 、 Different development languages, etc .AMQP There are :RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、ØMQ、Zyre etc. . at present Rabbitmq The latest version is designed to support AMQP 0-9-1, In total, the agreement contains 3 part :**Module Layer**: At the top of the deal , It mainly defines some commands for clients to call , The client can use these commands to implement custom business logic . for example , The client can use Queue.Declare The command declares a queue or uses Basic.Consume Subscriptions consume messages in a queue .**Session Layer**: In the middle layer , It is mainly responsible for transmitting the command from the client to the server , After returning the server's response to the client , It mainly provides reliable synchronization mechanism and error handling for communication between client and server .**Transport Layer**: At the bottom , It mainly transmits binary data stream , Provide frame processing 、 Channel multiplexing 、 Error checking and data representation, etc .## AMQP Producer flow process when client and Broker When you set up a connection , The client will send Broker Send a Protocol Header 0-9-1 The header of the message , To inform Broker This interaction uses AMQP 0-9-1 Agreement . And then Broker return Connection.Start To establish a connection , In the process of connecting, it involves Connection.Start/.Start-OK、Connection. Tune/. Tune-OK、Connection.Open/.Open-OK This 6 A command interaction . After the connection is established, a channel needs to be established , Will use Channel.Open , Channel.Open-OK command , When making switch announcement, you need to use Exchange.Declare as well as Exchange.Declare-OK The order of . And so on , The specified command is used to announce the queue and complete the binding between the queue and the switch . When sending messages, you will use Basic.Publish Command complete , This command also contains Conetent-Header and Content-Body.Content Header It contains the properties of the message body ,Content-Body Contains the message body itself .## AMQP The process of consumer circulation, when consumers consume information , Most of the commands and generators involved are the same . On the basis of the original , Several orders :Basic.Qos/.Qos-OK as well as Basic.Consume and Basic.Consume-OK. among Basic.Qos/.Qos-OK These two commands are mainly used to confirm the maximum number of unacknowledged messages that consumers can hold .Basic.Consume and Basic.Consume-OK These two commands are mainly used to confirm message consumption .#RabbitMQ Characteristics of RabbitMQ Use Erlang Language writing , Use Mnesia The database stores information .(1) reliability (Reliability) RabbitMQ Use mechanisms to ensure reliability , Such as persistence 、 Transmission confirmation 、 Release confirmation .(2) Flexible routing (Flexible Routing) Before the message enters the queue , Through Exchange To route messages . For typical routing functions ,RabbitMQ Some built-in Exchange To achieve . For more complex routing functions , You can put more than one Exchange Tie together , Also through the plug-in mechanism to achieve their own Exchange .(3) Information gathering (Clustering) Multiple RabbitMQ Servers can form a cluster , Form a logic Broker .(4) High availability (Highly Available Queues) Queues can be mapped on machines in clusters , So that the queue is still available even if some nodes have problems .(5) Multiple protocols (Multi-protocol) RabbitMQ Support multiple message queuing protocols , such as AMQP、STOMP、MQTT wait .(6) Multilingual client (Many Clients) RabbitMQ Almost all common languages are supported , such as Java、.NET、Ruby、PHP、C#、JavaScript wait .(7) Management interface (Management UI) RabbitMQ Provides an easy-to-use user interface , Enables users to monitor and manage messages 、 Nodes in a cluster .(8) External mechanism (Plugin System)RabbitMQ A lot of plug-ins are provided , To expand the suite in many ways , Of course, you can also write your own plug-in .# A working model ![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224173948156-417219023.png)** Explanation of terms ****Broker** : namely RabbitMQ Physical servers for . Providing a transport service , Maintain a transmission line from producer to consumer , Ensure that the message data can be transmitted in the specified way .**Exchange :** Message exchange . Specifies the rules by which messages are routed to which queue Queue.**Queue :** Message queuing . The carrier of information , Each message is posted to one or more queues .**Binding :** Knot . The effect is to make Exchange and Queue Tie them up according to some kind of routing rule .**Routing Key:** Routing keywords .Exchange According to Routing Key To deliver messages . The keyword specified when defining a binding is called Binding Key.**Vhost:** Virtual host . One Broker There can be multiple virtual hosts , Separation of permissions for different users . A virtual host holds a set of Exchange、Queue and Binding.**Producer:** Information producers . The main purpose is to deliver the message to the corresponding Exchange above . It's usually a separate program .**Consumer:** Message consumer . The receiver of the message , It's usually a separate program .**Connection:**Producer and Consumer And Broker Between TCP Long line .**Channel:** Message channel , It's also called a channel . More than one can be created in each connection of the client Channel, Every Channel Represents a conversation task . stay RabbitMQ Java Client API in ,channel A large number of programming interfaces are defined on .## Switch type **Direct Exchange Direct connect switch ** Define : When a direct connected switch is tied to a queue , You need to specify a specific binding key. Routing rules : When sending a message to a direct connected switch , Only routing key Follow binding key When it's a perfect match , Only the bound queue can receive the message .![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224173959435-528722340.png)**Topic Exchange Theme switch ** Define : When the subject type switch is bound to a queue , You can specify the routing key. There are two universal characters ,* For matching a word .# Represents matching zero or more words . Use... Between words . Separate . Routing rules : When sending messages to the switch of the topic type ,routing key accord with binding key In the same mode , Only the bound queue can receive the message .```java// Only queues 1 Can receive messages channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes()); // Waiting for 2 And queue 3 Can receive messages channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes()); // Only queues 4 Can receive messages channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes()); ```![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174011404-960730822.png)**Fanout Exchange Broadcast switch ** Define : When a broadcast type switch is tied to a queue , You don't have to specify binding key. Routing rules : When a message is sent to a broadcast type switch , You don't have to specify routing key, All queues associated with it can receive messages .![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174022306-239176582.png)#RabbitMq Install Download Image ```shelldocker pull rabbitmq``` Create and start the container ```shelldocker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq```- -d Background execution container ;- --name Specify container name ;- -p Specifies the port where the service is executed (5672: Application access port ;15672: Console Web Port number );- -v A catalog or file ;- --hostname Host name (RabbitMQ One of the important considerations is that it is based on the so-called “ Node name ” Store data , Default to host name );- -e Specify the environment variable ;(RABBITMQ_DEFAULT_VHOST: Default virtual machine name ;RABBITMQ_DEFAULT_USER: Default user name ;RABBITMQ_DEFAULT_PASS: Password for default user name ) Start rabbitmq Back office management service ```shelldocker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management``` Visit the background page :```shellhttp://127.0.0.1:15672 Initial password : admin admin```![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174033954-241083311.png)# RabbitMQ Quick start maven Rely on ```xml org.springframework.boot spring-boot-starter 2.3.0.RELEASE org.springframework.boot spring-boot-starter-web 2.3.0.RELEASE org.springframework.boot spring-boot-starter-amqp 2.3.0.RELEASE ```rabbitmq Configuration class ```java/** * @author primary * @date 2020/12/22 * @since 1.0 **/@Configurationpublic class RabbitConfig { public static final String EXCHANGE_TOPICS_INFORM = "exchange_topic_inform"; public static final String QUEUE_SMS = "queue_sms"; public static final String QUEUE_EMAIL = "queue_email"; @Bean public Exchange getExchange() { //durable(true) Persistence , The switch still exists after message queuing is restarted return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } @Bean("queue_sms") public Queue getQueueSms(){ return new Queue(QUEUE_SMS); } @Bean("queue_email") public Queue getQueueEmail(){ return new Queue(QUEUE_EMAIL); } @Bean public Binding bindingSms(@Qualifier("queue_sms") Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("demo.#.sms").noargs(); } @Bean public Binding bindingEmail(@Qualifier("queue_email") Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("demo.#.email").noargs(); }}``` Producers ```java@Servicepublic class RabbitmqProviderService { @Autowired RabbitTemplate rabbitTemplate; public void sendMessageSms(String message) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",message); } public void sendMessageEmail(String message) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.email",message); }}``` Consumers ```java@Componentpublic class RabbitMqConsumer { @RabbitListener(queues = {RabbitConfig.QUEUE_EMAIL}) public void listenerEmail(String message, Message msg , Channel channel) { System.out.println("EMAIL:"+message); System.out.println(msg); System.out.println(channel); } @RabbitListener(queues = {RabbitConfig.QUEUE_SMS}) public void listenerSms(String message) { System.out.println("SMS:"+message); }}``` Start class ```java/** * @author primary * @date 2020/12/22 * @since 1.0 **/@SpringBootApplication@EnableRabbitpublic class RabbitMqApplicaiton { public static void main(String[] args) { ResourceLoader resourceLoader = new DefaultResourceLoader(RabbitMqApplicaiton.class.getClassLoader()); try { String path = resourceLoader.getResource("classpath:").getURL().getPath(); System.out.println(path); } catch (IOException e) { e.printStackTrace(); } SpringApplication.run(RabbitMqApplicaiton.class, args); }}```web```java@RestControllerpublic class DemoController { @Autowired RabbitmqProviderService rabbitmqProviderService; @RequestMapping("/sms") public void sendMsgSms(String msg) { rabbitmqProviderService.sendMessageSms(msg); } @RequestMapping("/eamil") public void sendMsgEmail(String msg) { rabbitmqProviderService.sendMessageEmail(msg); }}``` Send a message through a page :http://localhost:44000/sms?msg=1111![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174046074-1785884115.png)http://localhost:44000/email?msg=1111![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174057138-36286941.png)# RabbitMQ Advanced usage ## TTL#### Message set expiration time ```javaMessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("30000");Message msg = new Message(" Message content ".getBytes(),messageProperties);// If the message is not consumed in time , Then go through 30 Seconds later , The message becomes dead letter ,Rabbitmq This message will be discarded directly .rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",msg);```#### Queue set expiration time ```javaQueue queue = QueueBuilder.durable(QUEUE_SMS).ttl(30000).build();```### Dead letter queue when a message becomes a dead letter , By default, this message will be mq Delete . If we specify the queue " Dead letter switch "(DLX:Dead-Letter-Exchange), Then the message will be forwarded to the dead letter switch , The queues that are then bound to the dead letter switch ( Dead letter queue ) Consume . So as to realize the effect of delayed message transmission . There are three situations in which messages enter DLX(Dead Letter Exchange) Dead letter switch .1、(NACK || Reject ) && requeue == false2、 Message expired 3、 Maximum queue length reached , It can be done by x-max-length The length of the queue , If you don't specify , It can be said to be infinitely long ( The first in line message will be sent to DLX)![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174117752-1950820659.png)1、 Announce dead letter switch 、 Dead letter queue 、 The binding between dead letter switch and dead letter queue ```java// Announce dead letter switch @Bean(name = "dlx.exchange")public Exchange dlxExchange() {returnExchangeBuilder.directExchange("dlx.exchange").durable(true).build() ;}// Announce dead letter queue @Bean(name = "dlx.queue")public Queue dlxQueue() { return QueueBuilder.durable("dlx.queue").build() ;}// Complete the binding of dead letter queue and dead letter switch @Beanpublic Binding dlxQueueBindToDlxExchange(@Qualifier(value ="dlx.exchange") Exchange exchange, @Qualifier(value = "dlx.queue")Queue queue) { returnBindingBuilder.bind(queue).to(exchange).with("delete").noargs() ;}```2、 Set dead letter queue as a property of normal queue ```java// Announcement queue @Bean(name = "direct.queue_02")public Queue commonQueue02() { QueueBuilder queueBuilder =QueueBuilder.durable("direct.queue_02"); queueBuilder.deadLetterExchange("dlx.exchange") ; // Set the dead letter switch as the property of normal queue queueBuilder.deadLetterRoutingKey("delete") ; // Set message routingKey // queueBuilder.ttl(30000) ; // Set the expiration time of queued messages , For 30 second // queueBuilder.maxLength(2) ; // Set the maximum length of the queue return queueBuilder.build() ;}```3、 The consumer side makes the same settings , And specify the consumer dead letter queue ```java@Componentpublic class RabbitmqDlxQueueConsumer{ // Set up a logger private static final Logger LOGGER =LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class) ; @RabbitListener(queues = "dlx.queue") public void dlxQueueConsumer(String msg) { LOGGER.info("dlx queue msg is : {} ", msg); }}```### Priority queue messages with high priority can be consumed first , however : It's just message stacking ( The speed of message transmission is faster than that of consumers ) In this case, priority makes sense .```javaMap argss = new HashMap ();argss.put("x-max-priority",10); // Queue maximum priority channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);```### Delay queue RabbitMQ Delay queues are not supported by itself . have access to TTL Combine DLX To achieve the delayed delivery of messages , Namely the DLX Bind to a queue , At the appointed time , After the message has expired , It will come from DLX Route to this queue , Consumers can take messages from this queue . Another way is to use rabbitmq-delayed-message-exchange Plug in . Of course , Store the information to be transmitted in the database , It is also possible to scan and then transfer using the task scheduling system .### The current limit of the server is in AutoACK For false In the case of , If a certain number of messages ( The passage is based on consumer perhaps channel Set Qos Value ) Before being confirmed , Don't consume new information .```javachannel.basicQos(2); // If more than 2 Message not sent ACK, The current consumer no longer accepts queued messages channel.basicConsume(QUEUE_NAME, false, consumer);```# RabbitMQ How to guarantee the reliability first needs to be clear , Efficiency and reliability cannot be achieved at the same time , If you want to make sure that every link is successful , It is bound to affect the efficiency of sending and receiving messages . If the real-time consistency requirements of some services are not particularly high , You can sacrifice some reliability for efficiency .![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174131405-1016863424.png)① Represents a message sent from the producer to Exchange;② Represents the message from Exchange Route to Queue;③ On behalf of the message in Queue Medium storage ;④ Subscribe on behalf of consumers Queue And consume information .###1、 Make sure the message is sent to RabbitMQ The server may be due to the network or Broker It's a problem that leads to ① Failure , The producer can't know if the message is sent to Broker Of . There are two solutions , The first is Transaction( Business ) Pattern , The second kind Confirm( Confirm ) Pattern . Through channel. txSelect Method to open the transaction , Then we can send a message to RabbitMQ 了 , If the transaction is committed successfully , Then the message must arrive RabbitMQ in , If before a transaction is committed for execution due to RabbitMQ Abnormal crash or other reasons throw abnormal , Then we can capture it , And then through execution channel. txRollback Method to implement transaction rollback . Using transaction mechanism will “ Suck it dry ”RabbitMQ Efficiency of the , It is generally not recommended to use . Producers call channel. confirmSelect Method ( namely Confirm. Select command ) Set the channel to confirm Pattern . Once the message has been delivered to all matching queues ,RabbitMQ A confirmation will be sent (Basic. Ack) To the producer ( The only one that contains the message ID), This makes the producer know that the message has arrived at the destination correctly .###2、 Ensure that messages are routed to the correct queue, possibly due to the wrong routing key , Or the queue does not exist , Or the wrong queue name causes ② Failure . Use mandatory Sum of arguments ReturnListener, The message can be returned to the producer when it cannot be routed . Another way is to use a backup switch (alternate-exchange), Messages that cannot be routed are sent to this switch .###3、 Ensure that messages are correctly stored in the queue, possibly due to system downtime 、 Restart 、 The message stored in the queue is lost due to shutdown, etc , namely ③ There's a problem .1、 Queue up 、 Switch 、 Persistence of information .2、 Make a cluster , Image queue . If you want to change the default configuration , We can do it in /etc/rabbitmq/ Create a rabbitmq.config Archives , The configuration information can be as specified json Rules to specify . As shown below :```shell[{ rabbit, [ { queue_index_embed_msgs_below, 4097 } ]}].``` Then restart rabbitmq Service (rabbitmqctl stop----> rabbitmq-server -detached). So we don't put queue_index_embed_msgs_below The larger the value of the argument, the better ? Definitely not rabbit_queue_index In order ( File name from 0 Start to accumulate ) To save the file , It ends with ".idx", Each segment file contains fixed SEGMENT_ENTRY_COUNT Records ,SEGMENT_ENTRY_COUNT The default value is 16384. Every rabbit_queue_index Maintain at least one segment file in memory when reading from disk , So set queue_index_embed_msgs_below You need to be very careful when it comes to value , A little bit of growth can also lead to an explosion in memory .** Relevant knowledge : Message storage mechanism ** Both persistent and non persistent messages can be written to disk .1、 Persistent messages are written to disk when they arrive in the queue , And if you can , Persistent messages also store a backup in memory , This can improve a certain efficiency , When memory is tight, it will be cleared from memory .2、 Non persistent messages are generally stored only in memory , When the memory is tight, it will be written to the disk , To save memory space . These two types of messages are both processed in RabbitmqMQ Of " Persistence layer " Finish in . The composition of the persistence layer is as follows :![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174143884-1808270239.png)rabbit_queue_index: Responsible for maintaining the information of the drop disk message in the queue , Including where messages are stored 、 Whether it has been delivered to the consumer 、 Whether it has been accepted by consumers ack. Each queue has a corresponding one rabbitmq_queue_index.rabbit_msg_store: Responsible for the storage of information , It is shared by all queues , There is and only one... In each node .rabbit_msg_store It can be subdivided :![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174154413-1340198602.png) msg_store_persisent : Responsible for the persistence of persistent messages , Restart won't be lost msg_store_transient : Responsible for persistence of non persistent messages , Restart will lose messages that can be stored in rabbit_queue_index Can also be stored in rabbit_msg_store in . The best configuration is to store smaller messages in rabbit_queue_index Medium and large messages are stored in rabbit_msg_store in . This message can be defined by queue_index_embed_msgs_below To configure , The default size is 4096, The unit is B. Note that the message size here refers to the message body 、 Properties and headers The size of the whole . When a message is less than the set size threshold, it can be stored in rabbit_queue_index in , In this way, we can get performance optimization . This storage mechanism is in Rabbitmq3.5 After the release of , The optimization improves the system performance 10% about .** Relevant knowledge : The structure of the queue **Rabbitmq The queue in is made up of two parts :rabbit_amqpqueue_process and backing_queue Make up :![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174206456-1443459475.png)**rabbit_amqpqueue_process:** Responsible for protocol related information processing , That is to receive the information released by the producer 、 Delivering information to consumers 、 Process the confirmation of the message ( Including the production side of confirm And consumer side ack) etc. .**backing_queue:** It is the concrete form and engine of information storage , And to rabbit_amqpqueue_process Provide relevant interfaces for calling . If the message transmission queue is empty and the queue has consumers , The message will not go through the queue but will be sent directly to the consumer , If it can't be consumed directly , The message needs to be temporarily queued , In order to resend . After the message is queued , There are mainly the following states :**alpha:** Message content ( Including the body of information 、 Properties and headers) And the message index are in memory ( Consumes the most memory ,CPU The consumption is the least )**beta:** The message content is stored on disk , The message index is stored in memory ( Just once IO Operation can read the message )**gamma:** The message content is stored on disk , The message index exists on both disk and memory ( Just once IO Operation can read the message )**delta:** The message content and the message index are on disk ( Minimum memory consumption , But it will consume more CPU And disk IO operation ) Persistent messages , Message content and message index must be stored on disk first , Will be in one of the above states ,gamma Only this state of information is persistent .Rabbitmq During execution, the maximum number of messages that can be stored in the current memory will be calculated periodically according to the statistical message transmission speed (target_ram_count), If alpha When the number of messages in the state is greater than this value , It will cause a state transition of the message , Extra information may be converted to beta Status 、gamma State or delta Status . Distinguish between this 4 The main function of each state is to satisfy different memory and CPU The needs of . For normal queues ,**backing_queue The internal implementation is through 5 Sub queue to reflect the state of the message **:Q1: Contains only alpha Status messages Q2: contain beta and gamma Message for Delta: contain delta Message for Q3: contain beta and gamma Message for Q4: Contains only alpha The state message is usually , The message follows Q1->Q2->Delta->Q3->Q4 Flow in this order , But not every message goes through all States , It depends on the current system load ( For example, non persistent messages when the memory load is not high , You don't go through delta). The benefits of this design : When the queue load is high , It can save memory space by storing some information on disk , And when the load drops , This part of the information is gradually returned to memory, and consumers get it , Make the whole queue have good flexibility .** Relevant knowledge :** ** State transition when consuming information ** Consumer information will also change , The process of state transition is as follows :1. Consumers should start with Q4 Get information , If successful, return .2. If Q4 It's empty , Then from Q3 Get information from , First of all, judge Q3 Is it empty , If it is empty, the return queue is empty , That is, there is no cancellation in the queue Rest 3. If Q3 Not empty , Take out Q3 Message for , Then judge Q3 and Delta The length in , If it's all empty , So Q2、Delta、Q3、 Q4 All empty , Directly Q1 Transfer the message in to Q4, Next time directly from Q4 Read the message in 4. If Q3 It's empty ,Delta Not empty , Will Delta Transfer the message in to Q3 in , Next time directly from Q3 Read .5. In sending messages from Delta Transfer to Q3 In the process of , Is to read by index segmentation , First read a paragraph , Then judge the number of messages read and Delta The number of messages , If equal , determine Delta No more information , Read directly Q2 And read the message and put it in Q3, If it's not equal , Only transfer this read message to Q3. ** Usually when the load is normal **, If a message is consumed at a rate not less than that of receiving a new message , For messages that don't need to be reliable , It's very likely that alpha Status . For durable Property is set to true Message for , It's bound to get into gamma Status , And it's opening publisher confirm Mechanism , Only then. gamma The message has been received only when the status is set , If information is consumed fast enough 、 There's plenty of memory , These messages will not go on to the next state . ** High load in the system **, If the received message cannot be consumed quickly , These messages are in the queue " Pile up ", Then at this time Rabbitmq It takes more time and resources to deal with " Pile up " Message for , This reduces the ability to process new incoming information , So that the incoming information is " Pile up " Continue to increase the average cost of processing each message , And then it gets worse and worse , The processing power of the system is greatly reduced . Common solutions to reduce the accumulation of information : 1、 increase prefetch_count Value , Set the maximum value for the consumer to store unacknowledged messages 2、 Consumers do multiple ack, Reduce ack The cost of it ** Relevant knowledge : Lazy queues ** By default , When a producer sends a message to Rabbitmq When , Messages in the queue are stored in memory as much as possible , It's faster to send messages to consumers . Even persistent messages , When it is written to disk, a backup will also reside in memory . Such a mechanism will occupy more system resources , After all, memory should be left more room for need . If the transmitter is too fast or the consumer is down , This leads to a huge backlog of information , At this time, the message is still stored in memory and disk , At the time of the information explosion ,MQ The server won't hold up , Affect the sending and receiving of messages in other queues , Can we deal with this situation effectively . answer Lazy queues . RabbitMQ From 3.6.0 The version begins to introduce lazy queues (Lazy Queue) The concept of . Lazy queues store received messages directly into the file system , Whether it's persistent or non persistent , This reduces memory consumption , But it will increase I/0 Use , If the message is persistent , So this is I/0 The operation is inevitable , Lazy queues and persistent messages are " Best partner ". Note that if the lazy queue holds non persistent messages , Memory usage will always be stable , But after the restart, the message will also be lost . The way to set a queue as an inert queue : ```java // Announcement queue @Bean(name = "direct.queue_03") public Queue commonQueue03() { QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_03"); queueBuilder.lazy(); // Make the queue lazy return queueBuilder.build(); } ``` ###4、 Ensure that the message is correctly delivered from the queue to the consumer. If the consumer receives the message and does not have time to process it, an exception occurs , Or an exception occurred during processing , Will lead to ④ Failure . To ensure that messages from the queue reach the consumer reliably ,RabbitMQ Provides a message confirmation mechanism (message acknowledgement). When the consumer subscribes to the queue , You can specify autoAck Arguments , When autoAck Equal to false When ,RabbitMQ It will wait for the consumer to explicitly reply the acknowledgement signal before removing the message from the queue . If information consumption fails , You can also call Basic. Reject perhaps Basic. Nack To reject the current message rather than confirm . If r equeue The argument is set to true, This message can be re queued , So that it can be sent to the next consumer ( Of course , When there's only one consumer , In this way, there may be infinite circle repeated consumption , Can be posted to a new queue , Or just print the exception log ).### 5、 Compensation mechanism for messages that have not been responded to for a certain period of time , You can set a mechanism for timing retransmission , But control the number of times , For example, resend at most 3 Time , Otherwise, it will cause information accumulation .### 6、 Message idempotency is not controlled by the server , It can only be controlled on the consumer side . How to avoid the repeated consumption of information ? There may be two reasons for the repetition of messages :1、 The producer's problem , Link ① Repeat the message , For example, when it's on Confirm Mode but no confirmation received .2、 Link ④ Something's wrong , As the consumer did not transmit ACK Or for other reasons , The message is delivered repeatedly . For repeated messages , A unique service can be generated for each message ID, To create a log or duplicate control tables .### 7、 The order of information means that the order of consumers' consumption is consistent with that of producers . stay RabbitMQ in , When a queue has multiple consumers , Because different consumers consume information at different speeds , The order cannot be guaranteed .# RabbitMQ How to ensure high availability ![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174221000-1005882058.png)## RabbittMQ Cluster cluster is mainly used to achieve high availability and load balancing .RabbitMQ Through /var/lib/r abbitmq/. erlang. cookie To verify your identity , Need to be consistent on all nodes . Clusters have two node types , One is the disk node , One is memory nodes . At least one disk node is required in the cluster for metadata persistence , If the type is not specified , Default to disk node . To pass through 25672 Port two to two communication , Ports that need to open firewalls . It should be noted that ,RabbitMQ Clustering can't be built on wan , Unless used feder ation perhaps shovel Wait for the plug-in . Cluster configuration steps :1、 To configure hosts2、 Sync erlang. cookie3、 Join the group ### Build in clusters ```shelldocker pull rabbitmq:3.6.10-management``````shelldocker run -di --network=docker-network --ip=172.19.0.50 --hostname=rabbitmq-node01 --name=rabbitmq_01 -p 15673:15672 -p 5673:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bashdocker run -di --network=docker-network --ip=172.19.0.51 --hostname=rabbitmq-node02 --name=rabbitmq_02 -p 15674:15672 -p 5674:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bashdocker run -di --network=docker-network --ip=172.19.0.52 --hostname=rabbitmq-node03 --name=rabbitmq_03 -p 15675:15672 -p 5675:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bash``` The argument says :Erlang Cookie The values must be the same , That is to say RABBITMQ_ERLANG_COOKIE Arguments must have the same value . Because RabbitMQ Yes, it is Erlang Realized ,Erlang Cookie It is equivalent to the secret key of communication between different nodes ,Erlang Nodes are switched Erlang Cookie Get certified .```shelldocker exec -itrabbitmq_01 /bin/bash``` To configure hosts Archives , Let each node recognize each other's existence . Edit in the system /etc/hosts Archives , newly added ip Address and node name mapping information (apt-get update , apt-get install vim):```shell172.19.0.50 rabbitmq-node01172.19.0.51 rabbitmq-node02172.19.0.52 rabbitmq-node03``` Start rabbitmq, And look at the State ```shell [email protected]:/# rabbitmq-server -detached # Start rabbitmq Service , This command can start erlang Virtual machines and rabbitmq Service [email protected]:/# rabbitmqctl status # View node information Status of [email protected][{pid,270},{running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},............. [email protected]:/# rabbitmqctl cluster_status # View cluster node status Cluster status of [email protected][{nodes,[{disc,[ [email protected]]}]},{running_nodes,[ [email protected]]}, # There is only one node in progress {cluster_name,<<" [email protected]">>},{partitions,[]},{alarms,[{ [email protected],[]}]}]``` Be careful : At this point, we can access rabbitmq Back end management system of , however rabbitmq The preset provides guest The user does not support remote access . So we need to create users , And authorize it ```shell [email protected]:/# rabbitmqctl add_user admin admin # New users , The user name is admin, Password is admin [email protected]:/# rabbitmqctl list_users # Look at rabbitmq A list of users of Listing usersadmin [] # admin The user has successfully added , But there's no role guest [administrator] [email protected]:/# rabbitmqctl set_user_tags admin administrator # to admin Users set administrator permissions # rabbitmqctl delete_user admin # Delete admin Users # rabbitmqctl stop_app # stop it rabbitmq Service # rabbitmqctl stop # Will rabbitmq Service and erlang Virtual machines shut down together ``` Again using admin Users can log in to web Management system . In other rabbitmq Also create users in , So that the back-end management system can be accessed later .### Configure clusters 1、 Sync cookie In a cluster Rabbitmq Nodes need to exchange key tokens for mutual authentication , If the node's key token is inconsistent , Then an error will be reported when configuring the node . Get... On a node /var/lib/rabbitmq/.erlang.cookie Archives , It is then copied to other nodes . We take the node01 Node as the benchmark , Do this .```shelldocker cprabbitmq_01:/var/lib/rabbitmq/.erlang.cookie .docker cp.erlang.cookie rabbitmq_02:/var/lib/rabbitmqdocker cp.erlang.cookie rabbitmq_03:/var/lib/rabbitmq```2、 Build a cluster relationship right now 3 Each node is executed independently , There's no connection between them . And then we're going to build 3 The relationship between them , We take the rabbitmq-node01 As a benchmark , Add the other two nodes . hold rabbitmq-node02 Join the node 1 in ```shell# Enter to rabbitmq-node02 in rabbitmqctl stop_app # Shut down rabbitmq Service rabbitmqctl reset # To reset rabbitmqctl join_cluster [email protected] # rabbitmq-node01 For nodes 1 Host name of rabbitmqctl start_app # Start rabbitmq Node ``` hold rabbitmq-node03 Join the node 1 in ```shell# Enter to rabbitmq-node03 in rabbitmqctl stop_app # Shut down rabbitmq Service rabbitmqctl reset # Clear the state of the node , And return it to a blank state , When the set node is part of the cluster , This command also communicates with the disk nodes in the cluster , Tell them that the node is leaving the cluster . Otherwise, the cluster will think that the node is dealing with a failure , And expect it to recover eventually rabbitmqctl join_cluster [email protected] # rabbitmq-node01 For nodes 1 Host name of rabbitmqctl start_app # Start rabbitmq Node ``` Enter the background management system to view the cluster Overview .### Node type ** Node type introduction ** In the use of rabbitmqctl cluster_status Command to view the cluster state [{nodes,[{disc[' [email protected]',' [email protected]',' [email protected]']} This information , Among them disc Marked with Rabbitmq Node type .Rabbitmq Every node in , Whether it's a single node system or part of a cluster or a memory node , Or disk node . Memory nodes will all queue up , Switch , Tie the knot 、 Users 、 Licensing and vhost All metadata definitions are stored in memory , The disk node stores the information on the disk . A single node cluster must have only disk type nodes , Otherwise, when it restarts Rabbitmq After that , All configuration information about the system will be lost . But in the cluster , You can choose to configure some nodes as memory nodes , This allows for higher efficiency .** Node type change ** If we don't specify the node type , Then the default is the disk node . When we add nodes , You can use the following command to specify the type of node as memory node :```shellrabbitmqctl join_cluster [email protected] --ram``` We can also use the following command to set a disk node as a memory node :```shellrabbitmqctl change_cluster_node_type {disc , ram}``` As shown below ```shell [email protected]:/# rabbitmqctl stop_app # Shut down rabbitmq Service Stopping rabbit application on node ' [email protected]' [email protected]:/# rabbitmqctl change_cluster_node_type ram # Will [email protected] Node type switches to memory node Turning ' [email protected]'into a ram node [email protected]:/# rabbitmqctl start_app # Start rabbitmq Service Starting node ' [email protected]' [email protected]:/# rabbitmqctl cluster_status # View cluster status Cluster status of node ' [email protected]'[{nodes,[{disc,[' [email protected]',' [email protected]']}, {ram,[' [email protected]']}]},{running_nodes,[' [email protected]',' [email protected]', ' [email protected]']},{cluster_name,<<" [email protected]">>},{partitions,[]},{alarms,[{' [email protected]',[]}, {' [email protected]',[]}, {' [email protected]',[]}]}] [email protected]:/#```** Node selection **Rabbitmq Only requires at least one disk node in the cluster , All other nodes can be memory nodes . When a node joins or leaves the cluster , They must notify at least one disk node of the change . If there is only one disk node , And unfortunately, it just crashed , So clusters can continue to receive and send messages . But queue creation cannot be performed , Switch , Tie the knot 、 User has changed permissions 、 Add and delete cluster nodes . That is to say 、 If the only disk node in the cluster crashes , Clustering can still keep execution , But know to restore the node to before clustering , You can't change anything , Therefore, when establishing a cluster, you should ensure that there are at least two or more disk nodes . When the memory node restarts , It will connect to the preconfigured disk node , Download a copy of the current cluster metadata . When a new memory node is added to the cluster , Be sure to tell all disk nodes ( The only metadata information that a memory node stores on disk is the address of the disk node ). As long as the memory node can find at least one disk node in the plex , Then it can rejoin the cluster after a reboot .## Clustering optimization :HAproxy Load +Keepalived Problems with clusters built before high availability : It does not have the ability of load balancing. The software of load balancing layer we selected this time is HAProxy. In order to ensure the high availability of load balancing layer , We need to use to keepalived Software , Use vrrp Protocols generate virtual ip Achieve dynamic ip Elegant .![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174232981-1618895931.png)```markdownkeepalived In order to VRRP Protocol based implementation of ,VRRP Full name Virtual Router Redundancy Protocol, Virtual routing is redundant protocol . Virtual routing redundancy protocol , It can be considered as a protocol to achieve high availability of routers , About to N Routers that provide the same function form a router group , There's one in this group master And multiple backup,master There's a service provider on it vip( The default route of other machines in the LAN where the router is located is vip),master Will define to backup Transmit vrrp Agreement package , When backup Don't get vrrp When you pack, you think master It's gone down , At this point, we need to base on VRRP There's an election priority backup When master. In this way, the router can be guaranteed to be highly available .```### The optimized implementation is installed on two memory nodes HAProxy```shellyum install haproxy``` Edit configuration profile ```shellvim /etc/haproxy/haproxy.cfgglobal log 127.0.0.1 local2 chroot /var/lib/haproxy pidfile /var/run/haproxy.pid maxconn 4000 user haproxy group haproxy daemon stats socket /var/lib/haproxy/statsdefaults log global option dontlognull option redispatch retries 3 timeout connect 10s timeout client 1m timeout server 1m maxconn 3000listen http_front mode http bind 0.0.0.0:1080 # Monitor port stats refresh 30s # Statistics page auto refresh time stats uri /haproxy?stats # Statistics page url stats realm Haproxy Manager # Statistics page password box prompt text stats auth admin:123456 # Statistics page user name and password settings listen rabbitmq_admin bind 0.0.0.0:15673 server node1 192.168.8.40:15672 server node2 192.168.8.45:15672listen rabbitmq_cluster 0.0.0.0:5673 mode tcp balance roundrobin timeout client 3h timeout server 3h timeout connect 3h server node1 192.168.8.40:5672 check inter 5s rise 2 fall 3 server node2 192.168.8.45:5672 check inter 5s rise 2 fall 3``` Start HAproxy```shellhaproxy -f /etc/haproxy/haproxy.cfg``` Install keepalived```shellyum -y install keepalived``` Modify configuration file ```shellvim /etc/keepalived/keepalived.confglobal_defs { notification_email { [email protected] [email protected] [email protected] } notification_email_from [email protected] smtp_server 192.168.200.1 smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict # Comment out , Otherwise I can't visit VIP vrrp_garp_interval 0 vrrp_gna_interval 0}global_defs { notification_email { [email protected] [email protected] [email protected] } notification_email_from [email protected] smtp_server 192.168.200.1 smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict # Comment out , Otherwise I can't visit VIP vrrp_garp_interval 0 vrrp_gna_interval 0}# Detection task vrrp_script check_haproxy { # Testing HAProxy Jianben script "/etc/keepalived/script/check_haproxy.sh" # Every two seconds interval 2 # Weight weight 2}# Virtual groups vrrp_instance haproxy { state MASTER # Here is ` Lord `, The standby machine is `BACKUP` interface ens33 # Physical network card , It depends on the situation mcast_src_ip 192.168.8.40 # Current host ip virtual_router_id 51 # Virtual routing id, You need the same... In the same group priority 100 # The priority of the host computer is higher than that of the standby machine advert_int 1 # Heart rate , Unit : second authentication { # Certification , The same in the group auth_type PASS auth_pass 1111 } # Call code track_script { check_haproxy } # Virtual ip, Multiple line breaks virtual_ipaddress { 192.168.8.201 }}``` Start ```shellkeepalived -D```##RabbitMQ Image queue 1、 Why do image queues exist? To ensure high availability of queues and messages 2、 What is image queue , How the image queue selects the primary node ? The mechanism of introducing image queuing , Queues can be mapped to other Broker Above the nodes , If a node in the cluster fails , The queue can automatically switch to another node in the image to ensure the availability of services . In common usage , Queue for each configuration image ( It is called image queue ) All contain a master node (master) And several slave nodes (slave), As shown in the figure below : In cluster mode , Queues and messages cannot be synchronized between nodes , So you need to use RabbitMQ Image queue mechanism for synchronization .![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174244706-400264268.png) Learn more about reference articles :https://blog.csdn.net/u013256816/article/details/71097186# RabbitMQ I work in e-commerce company , In the background of the e-commerce seckilling scene , Explain RabbitMQ The practice of .## Scene : When the order is not paid, the inventory will be returned when the user's second kill is successful , You need to guide users to the order page to pay . If the user is within the specified time (30 Minutes ), The payment of the order has not been completed , At this point, we need to carry out the inventory rollback operation .### Architecture diagram ![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174256618-355184951.png) The specific implementation is to use the dead letter queue , You can refer to the code above .## Scene :RabbitMQ Second kill fairness ensures the reliability of information transmission can ensure the fairness of the second kill service . About the fairness of second kill business , One more thing we need to think about : Sequence of messages ( First queued messages are processed first )**RabbitMQ The sequential description of the message ** Sequence : The ordering of messages means that the order of messages consumed by consumers is consistent with that of messages released by senders . For example , It doesn't take into account the repetition of messages , If the message released by the producer is msgl、msg2、msg3, Then consumers must also follow msgl、msg2、msg3 The order of consumption . Now a lot of information shows that RabbitMQ The message can guarantee sequence , This is not true , Or this view has great limitations . Without using any RabbitMQ The higher-order characteristics of , And there's no loss of information 、 Abnormal conditions such as network failure occur , And with only one consumer , In the case of only one producer, the order of messages can be guaranteed . If there are multiple producers sending messages at the same time , Unable to confirm that the message arrived Broker The order before and after , It's impossible to verify the order of the message , Because every message is sent in its own thread .**RabbitMQ The message is out of order ** Producers send messages :1、 No producer confirmation mechanism is used , Single producer and single consumer can guarantee the order of information 2、 The producer confirmation mechanism is used , Then there is no guarantee that the message will arrive Broker The order before and after , Because messages are sent asynchronously , Each thread runs at a different time 3、 The production side uses transaction mechanism , To ensure the sequence of information, the consumer side consumption information :1、 A single consumer can guarantee the sequence of messages 2、 Multiple consumers can't guarantee the order of messages , Because each message is consumed in its own thread , Each thread runs at a different time **RabbitMQ Information sequence assurance ** The transaction mechanism is enabled on the production side , Single producer, single consumer . If we don't think about message arrival MQ The order of , It's just about thinking about having reached MQ Order of information consumption in , Then we need to ensure that consumers are single consumers .## Scene :RabbitMQ No oversold guarantee to ensure that the second is not oversold , We need to think about a lot of things . such as : When withholding inventory , We need to think about not overselling , We also need to consider not oversold when we reduce the real inventory of the database . And for us mq In terms of this link , To ensure that we don't oversold, we just need to ensure that the information is not consumed repeatedly . First of all, we can confirm that , The conditions for triggering repeated execution of messages can be very harsh ! That is to say This condition is not triggered in most scenarios !!! It usually happens when the task is over time , Or not back in time , Causes the task to re queue , Because the server did not receive from the consumer ack Response , So the message will be re delivered .### Idempotent guarantee scheme, repeated consumption is not terrible , The terrible thing is that you don't take into account the cost of repeat consumption , How to guarantee idempotency . So called idempotence , That is, the result of multiple calls to the interface is consistent with that of one call . Popular point , Just one piece of data , Or a request , I'll repeat it for you many times , You have to make sure that the corresponding data is not changed , Don't make mistakes . For example : Suppose you have a system , Consume a message and insert a piece of data into the database , If you repeat a message twice , You just inserted two , This information is wrong ? But if you spend the second time , Judge for yourself whether it has been consumed , If you throw it directly , In this way, a piece of information is kept , So as to ensure the correctness of the data . A piece of data is consumed twice , There's only one piece of data in the database , This guarantees the idempotency of the system . How to ensure the idempotency of message queuing consumption ? This needs to be handled in combination with the actual business :1、 For example, the data we consume need to be written into a database , You can check it according to the primary key , If all this information is available , You don't have to plug in , Perform the following update operation 2、 For example, we need to write about consumption information Redis, That's OK , Anyway, every time set, Natural idempotency .3、 For example, you are not in the above two scenes , It's a little more complicated , When you need the producer to send each piece of data , Add a globally unique id, Like an order id Things like that , And then when you get there , Based on this id Go for example Redis Look it up in the library , Have you consumed it before ? If you haven't consumed , You deal with it , And then this id Write Redis. If you spend too much , Then you don't have to deal with it , Make sure you don't repeat the same message .4、 For example, the unique key of the database is used to ensure that duplicate data will not be inserted repeatedly . Because there's only one key constraint , Duplicate data insertion will only report an error , It will not cause dirty data in the database .# Interview questions 1、 The role and usage scenarios of message queuing ?2、 How to set up queues and switches ?3、 When multiple consumers monitor a producer , How messages are distributed ?4、 Messages that cannot be routed , Where did you go ?5、 When will the message become Dead Letter( Dead letter )?6、RabbitMQ How to implement delay queue ?7、 How to ensure the reliable delivery of information ?8、 How to limit the current between the server and the consumer ?9、 How to ensure the sequence of messages ?10、RabbitMQ The node of

版权声明
本文为[itread01]所创,转载请带上原文链接,感谢
https://cdmana.com/2020/12/20201225002517349t.html

Scroll to Top