编程知识 cdmana.com

Java進階專題(二十) 訊息中介軟體架構體系(2)-- RabbitMQ研究

# 前言接上文,這個繼續介紹RabbitMQ,並理解其底層原理。# 介紹RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協議)協議實現的訊息佇列。為什麼使用RabbitMQ呢?1、使得簡單,功能強大。2、基於AMQP協議。3、社群活躍,文件完善。4、高併發效能好,這主要得益於Erlang語言。5、Spring Boot預設已整合RabbitMQ# AMQP協議##AMQP基本介紹AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體同產品、不同的開發語言等條件的限制。AMQP的實現有:RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、ØMQ、Zyre等。目前Rabbitmq最新的版本預設支援的是AMQP 0-9-1,該協議總共包含了3部分:**Module Layer**: 位於協議的最高層,主要定義了一些供客戶端呼叫的命令,客戶端可以利用這些命令實現自定義的業務邏輯。例如,客戶端可以是使用Queue.Declare命令宣告一個佇列或者使用Basic.Consume訂閱消費一個佇列中的訊息。**Session Layer**: 位於中間層,主要負責將客戶端的命令傳送給服務端,在將服務端的應答返回給客戶端,主要為客戶端與伺服器之間的通訊提供可靠性的同步機制和錯誤處理。**Transport Layer**: 位於最底層,主要傳輸二進位制資料流,提供幀的處理、通道的複用、錯誤檢查和資料表示等。## AMQP生產者流轉過程當客戶端與Broker建立連線的時候,客戶端會向Broker傳送一個Protocol Header 0-9-1的報文頭,以此通知Broker本次互動才採用的是AMQP 0-9-1協議。緊接著Broker返回Connection.Start來建立連線,在連線的過程中涉及Connection.Start/.Start-OK、Connection. Tune/. Tune-OK、Connection.Open/.Open-OK這6個命令的互動。連線建立以後需要建立通道,會使用到Channel.Open , Channel.Open-OK命令,在進行交換機宣告的時候需要使用到Exchange.Declare以及Exchange.Declare-OK的命令。以此類推,在宣告佇列以及完成佇列和交換機的繫結的時候都會使用到指定的命令來完成。在進行訊息傳送的時候會使用到Basic.Publish命令完成,這個命令還包含了Conetent-Header和Content-Body。Content Header裡面包含的是訊息體的屬性,Content-Body包含了訊息體本身。## AMQP消費者流轉過程消費者消費訊息的時候,所涉及到的命令和生成者大部分都是相同的。在原有的基礎之上,多個幾個命令:Basic.Qos/.Qos-OK以及Basic.Consume和Basic.Consume-OK。其中Basic.Qos/.Qos-OK這兩個命令主要用來確認消費者最大能保持的未確認的訊息數時使用。Basic.Consume和Basic.Consume-OK這兩個命令主要用來進行訊息消費確認。#RabbitMQ的特性RabbitMQ使用Erlang語言編寫,使用Mnesia資料庫儲存訊息。(1)可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。(2)靈活的路由(Flexible Routing) 在訊息進入佇列之前,通過Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的Exchange 來實現。針對更復雜的路由功能,可以將多個Exchange 繫結在一起,也通過外掛機制實現自己的Exchange 。(3)訊息叢集(Clustering) 多個RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯Broker 。(4)高可用(Highly Available Queues) 佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。(5)多種協議(Multi-protocol) RabbitMQ 支援多種訊息佇列協議,比如AMQP、STOMP、MQTT 等等。(6)多語言客戶端(Many Clients) RabbitMQ 幾乎支援所有常用語言,比如Java、.NET、Ruby、PHP、C#、JavaScript 等等。(7)管理介面(Management UI) RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息、叢集中的節點。(8)外掛機制(Plugin System)RabbitMQ提供了許多外掛,以實現從多方面擴充套件,當然也可以編寫自己的外掛。# 工作模型![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224173948156-417219023.png)**名詞解釋****Broker** :即RabbitMQ的實體伺服器。提供一種傳輸服務,維護一條從生產者到消費者的傳輸線路,保證訊息資料能按照指定的方式傳輸。**Exchange :**訊息交換機。指定訊息按照什麼規則路由到哪個佇列Queue。**Queue :**訊息佇列。訊息的載體,每條訊息都會被投送到一個或多個佇列中。**Binding :**繫結。作用就是將Exchange和Queue按照某種路由規則繫結起來。**Routing Key:**路由關鍵字。Exchange根據Routing Key進行訊息投遞。定義繫結時指定的關鍵字稱為Binding Key。**Vhost:**虛擬主機。一個Broker可以有多個虛擬主機,用作不同使用者的許可權分離。一個虛擬主機持有一組Exchange、Queue和Binding。**Producer:**訊息生產者。主要將訊息投遞到對應的Exchange上面。一般是獨立的程式。**Consumer:**訊息消費者。訊息的接收者,一般是獨立的程式。**Connection:**Producer 和Consumer 與Broker之間的TCP長連線。**Channel:**訊息通道,也稱通道。在客戶端的每個連線裡可以建立多個Channel,每個Channel代表一個會話任務。在RabbitMQ Java Client API中,channel上定義了大量的程式設計介面。## 交換機型別**Direct Exchange 直連交換機**定義:直連型別的交換機與一個佇列繫結時,需要指定一個明確的binding key。路由規則:傳送訊息到直連型別的交換機時,只有routing key跟binding key完全匹配時,繫結的佇列才能收到訊息。![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224173959435-528722340.png)**Topic Exchange 主題交換機**定義:主題型別的交換機與一個佇列繫結時,可以指定按模式匹配的routing key。萬用字元有兩個,*代表匹配一個單詞。#代表匹配零個或者多個單詞。單詞與單詞之間用 . 隔開。路由規則:傳送訊息到主題型別的交換機時,routing key符合binding key的模式時,繫結的佇列才能收到訊息。```java// 只有佇列1能收到訊息channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes()); // 佇列2和佇列3能收到訊息channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes()); // 只有佇列4能收到訊息channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes()); ```![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174011404-960730822.png)**Fanout Exchange 廣播交換機**定義:廣播型別的交換機與一個佇列繫結時,不需要指定binding key。路由規則:當訊息傳送到廣播型別的交換機時,不需要指定routing key,所有與之繫結的佇列都能收到訊息。![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174022306-239176582.png)#RabbitMq安裝下載映象```shelldocker pull rabbitmq```建立並啟動容器```shelldocker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq```- -d 後臺執行容器;- --name 指定容器名;- -p 指定服務執行的埠(5672:應用訪問埠;15672:控制檯Web埠號);- -v 對映目錄或檔案;- --hostname 主機名(RabbitMQ的一個重要注意事項是它根據所謂的 “節點名稱” 儲存資料,預設為主機名);- -e 指定環境變數;(RABBITMQ_DEFAULT_VHOST:預設虛擬機器名;RABBITMQ_DEFAULT_USER:預設的使用者名稱;RABBITMQ_DEFAULT_PASS:預設使用者名稱的密碼)啟動rabbitmq後臺管理服務```shelldocker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management```訪問後臺頁面:```shellhttp://127.0.0.1:15672 初始密碼: admin admin```![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174033954-241083311.png)# RabbitMQ快速入門maven依賴```xml org.springframework.boot spring-boot-starter 2.3.0.RELEASE org.springframework.boot spring-boot-starter-web 2.3.0.RELEASE org.springframework.boot spring-boot-starter-amqp 2.3.0.RELEASE ```rabbitmq配置類```java/** * @author 原 * @date 2020/12/22 * @since 1.0 **/@Configurationpublic class RabbitConfig { public static final String EXCHANGE_TOPICS_INFORM = "exchange_topic_inform"; public static final String QUEUE_SMS = "queue_sms"; public static final String QUEUE_EMAIL = "queue_email"; @Bean public Exchange getExchange() { //durable(true)持久化,訊息佇列重啟後交換機仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } @Bean("queue_sms") public Queue getQueueSms(){ return new Queue(QUEUE_SMS); } @Bean("queue_email") public Queue getQueueEmail(){ return new Queue(QUEUE_EMAIL); } @Bean public Binding bindingSms(@Qualifier("queue_sms") Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("demo.#.sms").noargs(); } @Bean public Binding bindingEmail(@Qualifier("queue_email") Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("demo.#.email").noargs(); }}```生產者```java@Servicepublic class RabbitmqProviderService { @Autowired RabbitTemplate rabbitTemplate; public void sendMessageSms(String message) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",message); } public void sendMessageEmail(String message) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.email",message); }}```消費者```java@Componentpublic class RabbitMqConsumer { @RabbitListener(queues = {RabbitConfig.QUEUE_EMAIL}) public void listenerEmail(String message, Message msg , Channel channel) { System.out.println("EMAIL:"+message); System.out.println(msg); System.out.println(channel); } @RabbitListener(queues = {RabbitConfig.QUEUE_SMS}) public void listenerSms(String message) { System.out.println("SMS:"+message); }}```啟動類```java/** * @author 原 * @date 2020/12/22 * @since 1.0 **/@SpringBootApplication@EnableRabbitpublic class RabbitMqApplicaiton { public static void main(String[] args) { ResourceLoader resourceLoader = new DefaultResourceLoader(RabbitMqApplicaiton.class.getClassLoader()); try { String path = resourceLoader.getResource("classpath:").getURL().getPath(); System.out.println(path); } catch (IOException e) { e.printStackTrace(); } SpringApplication.run(RabbitMqApplicaiton.class, args); }}```web```java@RestControllerpublic class DemoController { @Autowired RabbitmqProviderService rabbitmqProviderService; @RequestMapping("/sms") public void sendMsgSms(String msg) { rabbitmqProviderService.sendMessageSms(msg); } @RequestMapping("/eamil") public void sendMsgEmail(String msg) { rabbitmqProviderService.sendMessageEmail(msg); }}```通過頁面傳送訊息:http://localhost:44000/sms?msg=1111![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174046074-1785884115.png)http://localhost:44000/email?msg=1111![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174057138-36286941.png)# RabbitMQ進階用法## TTL#### 訊息設定過期時間```javaMessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("30000");Message msg = new Message("訊息內容".getBytes(),messageProperties);//如果訊息沒有及時消費,那麼經過30秒以後,訊息變成死信,Rabbitmq會將這個訊息直接丟棄。rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",msg);```####佇列設定過期時間```javaQueue queue = QueueBuilder.durable(QUEUE_SMS).ttl(30000).build();```### 死信佇列當一個訊息變成死信了以後,預設情況下這個訊息會被mq刪除。如果我們給佇列指定了"死信交換機"(DLX:Dead-Letter-Exchange),那麼此時這個訊息就會轉發到死信交換機,進而被與死信交換機繫結的佇列(死信佇列)進行消費。從而實現了延遲訊息傳送的效果。有三種情況訊息會進入DLX(Dead Letter Exchange)死信交換機。1、(NACK || Reject ) && requeue == false2、訊息過期3、佇列達到最大長度,可以通過x-max-length引數來指定佇列的長度,如果不指定,可以認為是無限長(先入隊的訊息會被髮送到DLX)![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174117752-1950820659.png)1、宣告死信交換機、死信佇列、死信交換機和死信佇列的繫結```java// 宣告死信交換機@Bean(name = "dlx.exchange")public Exchange dlxExchange() {returnExchangeBuilder.directExchange("dlx.exchange").durable(true).build() ;}// 宣告死信佇列@Bean(name = "dlx.queue")public Queue dlxQueue() { return QueueBuilder.durable("dlx.queue").build() ;}// 完成死信佇列和死信交換機的繫結@Beanpublic Binding dlxQueueBindToDlxExchange(@Qualifier(value ="dlx.exchange") Exchange exchange, @Qualifier(value = "dlx.queue")Queue queue) { returnBindingBuilder.bind(queue).to(exchange).with("delete").noargs() ;}```2、將死信佇列作為普通佇列的屬性設定過去```java// 宣告佇列@Bean(name = "direct.queue_02")public Queue commonQueue02() { QueueBuilder queueBuilder =QueueBuilder.durable("direct.queue_02"); queueBuilder.deadLetterExchange("dlx.exchange") ; // 將死信交換機作為普通佇列的屬性設定過去 queueBuilder.deadLetterRoutingKey("delete") ; // 設定訊息的routingKey // queueBuilder.ttl(30000) ; // 設定佇列訊息的過期時間,為30秒 // queueBuilder.maxLength(2) ; // 設定佇列的最大長度 return queueBuilder.build() ;}```3、消費端進行同樣的設定,並且指定消費死信佇列```java@Componentpublic class RabbitmqDlxQueueConsumer{ // 建立日誌記錄器 private static final Logger LOGGER =LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class) ; @RabbitListener(queues = "dlx.queue") public void dlxQueueConsumer(String msg) { LOGGER.info("dlx queue msg is : {} ", msg); }}```### 優先佇列優先順序高的訊息可以優先被消費,但是:只有訊息堆積(訊息的傳送速度大於消費者的消費速度)的情況下優先順序才有意義。```javaMap argss = new HashMap ();argss.put("x-max-priority",10); // 佇列最大優先順序channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);```### 延遲佇列RabbitMQ本身不支援延遲佇列。可以使用TTL結合DLX的方式來實現訊息的延遲投遞,即把DLX跟某個佇列繫結,到了指定時間,訊息過期後,就會從DLX路由到這個佇列,消費者可以從這個佇列取走訊息。另一種方式是使用rabbitmq-delayed-message-exchange外掛。當然,將需要傳送的資訊儲存在資料庫,使用任務排程系統掃描然後傳送也是可以實現的。###服務端限流在AutoACK為false的情況下,如果一定數目的訊息(通過基於consumer或者channel設定Qos的值)未被確認前,不進行消費新的訊息。```javachannel.basicQos(2); // 如果超過2條訊息沒有傳送ACK,當前消費者不再接受佇列訊息channel.basicConsume(QUEUE_NAME, false, consumer);```# RabbitMQ如何保證可靠性首先需要明確,效率與可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對訊息的收發效率造成影響。如果是一些業務實時一致性要求不是特別高的場合,可以犧牲一些可靠性來換取效率。![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174131405-1016863424.png)① 代表訊息從生產者傳送到Exchange;② 代表訊息從Exchange路由到Queue;③ 代表訊息在Queue中儲存;④ 代表消費者訂閱Queue並消費訊息。###1、確保訊息傳送到RabbitMQ伺服器可能因為網路或者Broker的問題導致①失敗,而生產者是無法知道訊息是否正確傳送到Broker的。有兩種解決方案,第一種是Transaction(事務)模式,第二種Confirm(確認)模式。在通過channel. txSelect方法開啟事務之後,我們便可以釋出訊息給RabbitMQ了,如果事務提交成功,則訊息定到達了RabbitMQ中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因丟擲異常,這個時候我們便可以將其捕獲,進而通過執行channel. txRollback方法來實現事務回滾。使用事務機制的話會“吸乾”RabbitMQ效能,一般不建議使用。生產者通過呼叫channel. confirmSelect方法(即Confirm. Select命令)將通道設定為confirm模式。一旦訊息投遞到所有匹配的佇列之後,RabbitMQ就會發送一個確認(Basic. Ack)給生產者(包含訊息的唯一ID),這就使得生產者知曉訊息已經正確到達了目的地了。###2、確保訊息路由到正確的佇列可能因為路由關鍵字錯誤,或者佇列不存在,或者佇列名稱錯誤導致②失敗。使用mandatory引數和ReturnListener,可以實現訊息無法路由的時候返回給生產者。另一種方式就是使用備份交換機(alternate-exchange),無法路由的訊息會發送到這個交換機上。###3、確保訊息在佇列正確地儲存可能因為系統宕機、重啟、關閉等等情況導致儲存在佇列的訊息丟失,即③出現問題。1、做佇列、交換機、訊息的持久化。2、做叢集,映象佇列。如果想更改這個預設的配置,我們可以在/etc/rabbitmq/目錄下建立一個rabbitmq.config檔案,配置資訊可以按照指定的json規則進行指定。如下所示:```shell[{ rabbit, [ { queue_index_embed_msgs_below, 4097 } ]}].```然後重啟rabbitmq服務(rabbitmqctl stop----> rabbitmq-server -detached)。那麼我們是不是把queue_index_embed_msgs_below引數的值調節的越大越好呢?肯定不是的rabbit_queue_index中以順序(檔名從0開始累加)的段檔案來進行儲存,字尾為".idx",每個段檔案中包含固定的SEGMENT_ENTRY_COUNT條記錄,SEGMENT_ENTRY_COUNT預設值為16384。每個rabbit_queue_index從磁碟中讀取訊息的時候至少在記憶體中維護一個段檔案,所以設定queue_index_embed_msgs_below值的時候需要格外謹慎,一點點增大也可能會引起記憶體爆炸式增長。**相關知識:訊息儲存機制**不管是持久化的訊息還是非持久化的訊息都可以被寫入到磁碟。1、持久化的訊息在到達佇列時就被寫入到磁碟,並且如果可以,持久化的訊息也會在記憶體中儲存一份備份,這樣可以提高一定的效能,當記憶體吃緊的時候會從記憶體中清除。2、非持久化的訊息一般只儲存在記憶體中,在記憶體吃緊的時候會被寫入到磁碟中,以節省記憶體空間。這兩種型別的訊息的落盤處理都在RabbitmqMQ的"持久層"中完成。持久層的組成如下所示:![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174143884-1808270239.png)rabbit_queue_index:負責維護佇列中的落盤訊息的資訊,包括訊息的儲存地點、是否已被交付給消費者、是否已被消費者ack。每一個佇列都有與之對應的一個rabbitmq_queue_index。rabbit_msg_store: 負責訊息的儲存,它被所有的佇列共享,在每個節點中有且只有一個。rabbit_msg_store可以在進行細分:![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174154413-1340198602.png) msg_store_persisent :負責持久化訊息的持久化,重啟不會丟失 msg_store_transient :負責非持久化訊息的持久化,重啟會丟失訊息可以儲存在rabbit_queue_index中也可以儲存在rabbit_msg_store中。最佳的配置是較小的訊息儲存在rabbit_queue_index中而較大的訊息儲存在rabbit_msg_store中。這個訊息的界定可以通過queue_index_embed_msgs_below來配置,預設大小為4096,單位為B。注意這裡的訊息大小是指訊息體、屬性以及headers整體的大小。當一個訊息小於設定的大小閾值時就可以儲存在rabbit_queue_index中,這樣可以得到效能上的優化。這種儲存機制是在Rabbitmq3.5 版本以後引入的,該優化提高了系統性能10%左右。**相關知識: 佇列的結構**Rabbitmq中佇列的是由兩部分組成:rabbit_amqpqueue_process和backing_queue組成:![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174206456-1443459475.png)**rabbit_amqpqueue_process:** 負責協議相關的訊息處理,即接收生產者釋出的訊息、向消費者交付訊息、處理訊息的確認(包括生產端的confirm和消費端的ack)等。**backing_queue:** 是訊息儲存的具體形式和引擎,並向rabbit_amqpqueue_process提供相關的介面以供呼叫。如果訊息傳送的佇列是空的且佇列有消費者,該訊息不會經過該佇列而是直接發往消費者,如果無法直接被消費,則需要將訊息暫存入佇列,以便重新投遞。訊息在存入佇列後,主要有以下幾種狀態:**alpha:**訊息內容(包括訊息體、屬性和headers)和訊息索引都存在記憶體中(消耗記憶體最多,CPU消耗最少)**beta:**訊息內容儲存在磁碟中,訊息索引都存在記憶體中(只需要一次IO操作就可以讀取到訊息)**gamma:**訊息內容儲存在磁碟中,訊息索引在磁碟和記憶體中都存在(只需要一次IO操作就可以讀取到訊息)**delta:**訊息內容和訊息索引都在磁碟中(消耗記憶體最小,但是會消耗更多的CPU和磁碟的IO操作)持久化的訊息,訊息內容和訊息索引必須先儲存在磁碟中,才會處於上面狀態中的一種,gamma狀態只有持久化的訊息才有這種狀態。Rabbitmq在執行時會根據統計的訊息傳送速度定期計算一個當前記憶體中能夠儲存的最大訊息數量(target_ram_count), 如果alpha狀態的訊息數量大於此值時,就會引起訊息的狀態轉換,多餘的訊息可能會轉換到beta狀態、gamma狀態或者delta狀態。區分這4種狀態的主要作用是滿足不同的記憶體和CPU 的需求。對於普通佇列而言,**backing_queue內部的實現是通過5個子佇列來體現訊息的狀態的**:Q1:只包含alpha狀態的訊息Q2:包含beta和gamma的訊息Delta:包含delta的訊息Q3:包含beta和gamma的訊息Q4:只包含alpha狀態的訊息一般情況下,訊息按照Q1->Q2->Delta->Q3->Q4這樣的順序進行流動,但並不是每一條訊息都會經歷所有狀態,這取決於當前系統的負載情況(比如非持久化的訊息在記憶體負載不高時,就不會經歷delta)。如此設計的好處:可以在佇列負載很高的情況下,能夠通過將一部分訊息由磁碟儲存來節省記憶體空間,而在負載降低的時候,這部分訊息又漸漸回到記憶體被消費者獲取,使得整個佇列具有良好的彈性。**相關知識:** **消費訊息時的狀態轉換**消費者消費訊息也會引起訊息狀態的轉換,狀態轉換的過程如下所示:1. 消費者消費時先從Q4獲取訊息,如果獲取成功則返回。2. 如果Q4為空,則從Q3中獲取訊息,首先判斷Q3是否為空,如果為空返回佇列為空,即此時佇列中無消 息3. 如果Q3不為空,取出Q3的訊息,然後判斷Q3和Delta中的長度,如果都為空,那麼Q2、Delta、Q3、 Q4都為空,直接將Q1中的訊息轉移至Q4,下次直接從Q4中讀取訊息4. 如果Q3為空,Delta不為空,則將Delta中的訊息轉移至Q3中,下次直接從Q3中讀取。5. 在將訊息從Delta轉移至Q3的過程中,是按照索引分段讀取,首先讀取某一段,然後判斷讀取的訊息個數和Delta訊息的個數,如果相等,判定Delta已無訊息,直接將讀取Q2和讀取到訊息一併放入Q3,如果不相等,僅將此次讀取的訊息轉移到Q3。 **通常在負載正常時**,如果訊息被消費的速度不小於接收新訊息的速度,對於不需要保證可靠性的訊息來說,極有可能只會處於alpha狀態。對於durable屬性設定為true的訊息,它一定會進入gamma狀態,並且在開啟publisher confirm機制時,只有到了gamma狀態時才會確認該訊息己被接收,若訊息消費速度足夠快、記憶體也充足,這些訊息也不會繼續走到下一個狀態。 **在系統負載較高中**,已經收到的訊息若不能很快被消費掉,就是這些訊息就是在佇列中"堆積", 那麼此時Rabbitmq就需要花更多的時間和資源處理"堆積"的訊息,如此用來處理新流入的訊息的能力就會降低,使得流入的訊息又被"堆積"繼續增大處理每個訊息的平均開銷,繼而情況變得越來越惡化,使得系統的處理能力大大降低。 減少訊息堆積的常見解決方案: 1、增加prefetch_count的值,設定消費者儲存未確認的訊息的最大值 2、消費者進行multiple ack,降低ack帶來的開銷 **相關知識: 惰性佇列** 預設情況下,當生產者將訊息傳送到Rabbitmq的時候,佇列中的訊息會盡可能地儲存在記憶體中,這樣可以更快地將訊息傳送給消費者。即使是持久化的訊息,在被寫入磁碟的同時也會在記憶體中駐留一份備份。這樣的機制無形會佔用更多系統資源,畢竟記憶體應該留給更多有需要的地方。如果傳送端過快或消費端宕機,導致訊息大量積壓,此時訊息還是在記憶體和磁碟各儲存一份,在訊息大爆發的時候,MQ伺服器會撐不住,影響其他佇列的訊息收發,能不能有效的處理這種情況呢。答案 惰性佇列。 RabbitMQ從3.6.0版本開始引入了惰性佇列(Lazy Queue)的概念。惰性佇列會將接收到的訊息直接存入檔案系統中,而不管是持久化的或者是非持久化的,這樣可以減少了記憶體的消耗,但是會增加I/0的使用,如果訊息是持久化的,那麼這樣的I/0操作不可避免,惰性佇列和持久化的訊息可謂是"最佳拍檔"。注意如果惰性佇列中儲存的是非持久化的訊息,記憶體的使用率會一直很穩定,但是重啟之後訊息一樣會丟失。 把一個佇列設定成惰性佇列的方式: ```java // 宣告佇列 @Bean(name = "direct.queue_03") public Queue commonQueue03() { QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_03"); queueBuilder.lazy(); // 把佇列設定成惰性佇列 return queueBuilder.build(); } ``` ###4、確保訊息從佇列正確地投遞到消費者如果消費者收到訊息後未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。為了保證訊息從佇列可靠地達到消費者,RabbitMQ提供了訊息確認機制(message acknowledgement)。消費者在訂閱佇列時,可以指定autoAck引數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從佇列中移去訊息。如果訊息消費失敗,也可以呼叫Basic. Reject或者Basic. Nack來拒絕當前訊息而不是確認。如果r equeue引數設定為true,可以把這條訊息重新存入佇列,以便發給下一個消費者(當然,只有一個消費者的時候,這種方式可能會出現無限迴圈重複消費的情況,可以投遞到新的佇列中,或者只打印異常日誌)。### 5、補償機制對於一定時間沒有得到響應的訊息,可以設定一個定時重發的機制,但要控制次數,比如最多重發3次,否則會造成訊息堆積。### 6、訊息冪等性服務端是沒有這種控制的,只能在消費端控制。如何避免訊息的重複消費?訊息重複可能會有兩個原因:1、生產者的問題,環節①重複傳送訊息,比如在開啟了Confirm模式但未收到確認。2、環節④出了問題,由於消費者未傳送ACK或者其他原因,訊息重複投遞。對於重複傳送的訊息,可以對每一條訊息生成一個唯一的業務ID,通過日誌或者建表來做重複控制。### 7、訊息的順序性訊息的順序性指的是消費者消費的順序跟生產者產生訊息的順序是一致的。在RabbitMQ中,一個佇列有多個消費者時,由於不同的消費者消費訊息的速度是不一樣的,順序無法保證。# RabbitMQ如何保證高可用![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174221000-1005882058.png)## RabbittMQ叢集叢集主要用於實現高可用與負載均衡。RabbitMQ通過/var/lib/r abbitmq/. erlang. cookie來驗證身份,需要在所有節點上保持一致。叢集有兩種節點型別,一種是磁碟節點,一種是記憶體節點。叢集中至少需要一個磁碟節點以實現元資料的持久化,未指定型別的情況下,預設為磁碟節點。叢集通過25672埠兩兩通訊,需要開放防火牆的埠。需要注意的是,RabbitMQ叢集無法搭建在廣域網上,除非使用feder ation或者shovel等外掛。叢集的配置步驟:1、配置hosts2、同步erlang. cookie3、加入叢集### 叢集搭建```shelldocker pull rabbitmq:3.6.10-management``````shelldocker run -di --network=docker-network --ip=172.19.0.50 --hostname=rabbitmq-node01 --name=rabbitmq_01 -p 15673:15672 -p 5673:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bashdocker run -di --network=docker-network --ip=172.19.0.51 --hostname=rabbitmq-node02 --name=rabbitmq_02 -p 15674:15672 -p 5674:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bashdocker run -di --network=docker-network --ip=172.19.0.52 --hostname=rabbitmq-node03 --name=rabbitmq_03 -p 15675:15672 -p 5675:5672--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'rabbitmq:3.6.10-management /bin/bash```引數說明:Erlang Cookie值必須相同,也就是RABBITMQ_ERLANG_COOKIE引數的值必須相同。因為RabbitMQ是用Erlang實現的,Erlang Cookie相當於不同節點之間相互通訊的祕鑰,Erlang節點通過交換Erlang Cookie獲得認證。```shelldocker exec -itrabbitmq_01 /bin/bash```配置hosts檔案,讓各個節點都能互相識別對方的存在。在系統中編輯/etc/hosts檔案,新增ip地址和節點名稱的對映資訊(apt-get update , apt-get install vim):```shell172.19.0.50 rabbitmq-node01172.19.0.51 rabbitmq-node02172.19.0.52 rabbitmq-node03```啟動rabbitmq,並且檢視狀態```shell [email protected]:/# rabbitmq-server -detached # 啟動rabbitmq服務,該命令可以啟動erlang虛擬機器和rabbitmq服務 [email protected]:/# rabbitmqctl status # 檢視節點資訊Status of [email protected][{pid,270},{running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},............. [email protected]:/# rabbitmqctl cluster_status # 檢視叢集節點狀態Cluster status of [email protected][{nodes,[{disc,[ [email protected]]}]},{running_nodes,[ [email protected]]}, # 正在執行的只有一個節點{cluster_name,<<" [email protected]">>},{partitions,[]},{alarms,[{ [email protected],[]}]}]```注意:此時我們可以通過瀏覽器訪問rabbitmq的後端管理系統,但是rabbitmq預設提供的guest使用者不支援遠端訪問。因此我們需要建立使用者,並且對其進行授權```shell [email protected]:/# rabbitmqctl add_user admin admin # 新增使用者,使用者名稱為admin,密碼為admin [email protected]:/# rabbitmqctl list_users # 檢視rabbitmq的使用者列表Listing usersadmin [] # admin使用者已經添加成功,但是沒有角色guest [administrator] [email protected]:/# rabbitmqctl set_user_tags admin administrator #給admin使用者設定管理員許可權# rabbitmqctl delete_user admin # 刪除admin使用者# rabbitmqctl stop_app # 停止rabbitmq服務# rabbitmqctl stop # 會將rabbitmq的服務和erlang虛擬機器一同關閉```再次使用admin使用者就可以登入web管理系統了。在其他的rabbitmq中也建立使用者,以便後期可以訪問後端管理系統。### 配置叢集1、同步cookie叢集中的Rabbitmq節點需要通過交換金鑰令牌以獲得相互認證,如果節點的金鑰令牌不一致,那麼在配置節點時就會報錯。獲取某一個節點上的/var/lib/rabbitmq/.erlang.cookie檔案,然後將其複製到其他的節點上。我們以node01節點為基準,進行此操作。```shelldocker cprabbitmq_01:/var/lib/rabbitmq/.erlang.cookie .docker cp.erlang.cookie rabbitmq_02:/var/lib/rabbitmqdocker cp.erlang.cookie rabbitmq_03:/var/lib/rabbitmq```2、建立叢集關係目前3個節點都是獨立的執行,之間並沒有任何的關聯關係。接下來我們就來建立3者之間的關聯關係,我們以rabbitmq-node01為基準,將其他的兩個節點加入進來。把rabbitmq-node02加入到節點1中```shell# 進入到rabbitmq-node02中rabbitmqctl stop_app # 關閉rabbitmq服務rabbitmqctl reset # 進行重置rabbitmqctl join_cluster [email protected] # rabbitmq-node01為節點1的主機名稱rabbitmqctl start_app # 啟動rabbitmq節點```把rabbitmq-node03加入到節點1中```shell# 進入到rabbitmq-node03中rabbitmqctl stop_app # 關閉rabbitmq服務rabbitmqctl reset # 清空節點的狀態,並將其恢復都空白狀態,當設定的節點時叢集中的一部分,該命令也會和叢集中的磁碟節點進行通訊,告訴他們該節點正在離開叢集。不然叢集會認為該節點處理故障,並期望其最終能夠恢復過來rabbitmqctl join_cluster [email protected] # rabbitmq-node01為節點1的主機名稱rabbitmqctl start_app # 啟動rabbitmq節點```進入後臺管理系統檢視叢集概述。### 節點型別**節點型別介紹**在使用rabbitmqctl cluster_status命令來檢視叢集狀態時會有[{nodes,[{disc[' [email protected]',' [email protected]',' [email protected]']}這一項資訊,其中的disc標註了Rabbitmq節點型別。Rabbitmq中的每一個節點,不管是單一節點系統或者是叢集中的一部分要麼是記憶體節點,要麼是磁碟節點。記憶體節點將所有的佇列,交換機,繫結關係、使用者、許可權和vhost的元資料定義都儲存在記憶體中,而磁碟節點則將這些資訊儲存到磁碟中。單節點的叢集中必然只有磁碟型別的節點,否則當重啟Rabbitmq之後,所有關於系統配置資訊都會丟失。不過在叢集中,可以選擇配置部分節點為記憶體節點,這樣可以獲得更高的效能。**節點型別變更**如果我們沒有指定節點型別,那麼預設就是磁碟節點。我們在新增節點的時候,可以使用如下的命令來指定節點的型別為記憶體節點:```shellrabbitmqctl join_cluster [email protected] --ram```我們也可以使用如下的命令將某一個磁碟節點設定為記憶體節點:```shellrabbitmqctl change_cluster_node_type {disc , ram}```如下所示```shell [email protected]:/# rabbitmqctl stop_app # 關閉rabbitmq服務Stopping rabbit application on node ' [email protected]' [email protected]:/# rabbitmqctl change_cluster_node_type ram # 將 [email protected]節點型別切換為記憶體節點Turning ' [email protected]'into a ram node [email protected]:/# rabbitmqctl start_app # 啟動rabbitmq服務Starting node ' [email protected]' [email protected]:/# rabbitmqctl cluster_status # 檢視叢集狀態Cluster status of node ' [email protected]'[{nodes,[{disc,[' [email protected]',' [email protected]']}, {ram,[' [email protected]']}]},{running_nodes,[' [email protected]',' [email protected]', ' [email protected]']},{cluster_name,<<" [email protected]">>},{partitions,[]},{alarms,[{' [email protected]',[]}, {' [email protected]',[]}, {' [email protected]',[]}]}] [email protected]:/#```**節點選擇**Rabbitmq只要求在叢集中至少有一個磁碟節點,其他所有的節點可以是記憶體節點。當節點加入或者離開叢集時,它們必須將變更通知到至少一個磁碟節點。如果只有一個磁碟節點,而且不湊巧它剛好崩潰了,那麼叢集可以繼續接收和傳送訊息。但是不能執行建立佇列,交換機,繫結關係、使用者已經更改許可權、新增和刪除叢集節點操作了。也就是說、如果叢集中唯一的磁碟節點崩潰了,叢集仍然可以保持執行,但是知道將該節點恢復到叢集前,你無法更改任何東西,所以在建立叢集的時候應該保證至少有兩個或者多個磁碟節點。當記憶體節點重啟後,它會連線到預先配置的磁碟節點,下載當前叢集元資料的副本。當在叢集中新增記憶體節點的時候,確保告知所有的磁碟節點(記憶體節點唯一儲存到磁碟中的元資料資訊是磁碟節點的地址)。只要記憶體節點可以找到叢集中至少一個磁碟節點,那麼它就能在重啟後重新加入叢集中。## 叢集優化:HAproxy負載+Keepalived高可用之前搭建的叢集存在的問題:不具有負載均衡能力本次我們所選擇的負載均衡層的軟體是HAProxy。為了保證負載均衡層的高可用,我們需要使用使用到keepalived軟體,使用vrrp協議產生虛擬ip實現動態的ip飄逸。![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174232981-1618895931.png)```markdownkeepalived是以VRRP協議為實現基礎的,VRRP全稱Virtual Router Redundancy Protocol,即虛擬路由冗餘協議。虛擬路由冗餘協議,可以認為是實現路由器高可用的協議,即將N臺提供相同功能的路由器組成一個路由器組,這個組裡面有一個master和多個backup,master上面有一個對外提供服務的vip(該路由器所在區域網內其他機器的預設路由為該vip),master會定義向backup傳送vrrp協議資料包,當backup收不到vrrp包時就認為master宕掉了,這時就需要根據VRRP的優先順序來選舉一個backup當master。這樣的話就可以保證路由器的高可用了。```###優化實現在兩個記憶體節點上安裝HAProxy```shellyum install haproxy```編輯配置檔案```shellvim /etc/haproxy/haproxy.cfgglobal log 127.0.0.1 local2 chroot /var/lib/haproxy pidfile /var/run/haproxy.pid maxconn 4000 user haproxy group haproxy daemon stats socket /var/lib/haproxy/statsdefaults log global option dontlognull option redispatch retries 3 timeout connect 10s timeout client 1m timeout server 1m maxconn 3000listen http_front mode http bind 0.0.0.0:1080 #監聽埠 stats refresh 30s #統計頁面自動重新整理時間 stats uri /haproxy?stats #統計頁面url stats realm Haproxy Manager #統計頁面密碼框上提示文字 stats auth admin:123456 #統計頁面使用者名稱和密碼設定listen rabbitmq_admin bind 0.0.0.0:15673 server node1 192.168.8.40:15672 server node2 192.168.8.45:15672listen rabbitmq_cluster 0.0.0.0:5673 mode tcp balance roundrobin timeout client 3h timeout server 3h timeout connect 3h server node1 192.168.8.40:5672 check inter 5s rise 2 fall 3 server node2 192.168.8.45:5672 check inter 5s rise 2 fall 3```啟動HAproxy```shellhaproxy -f /etc/haproxy/haproxy.cfg```安裝keepalived```shellyum -y install keepalived```修改配置檔案```shellvim /etc/keepalived/keepalived.confglobal_defs { notification_email { [email protected] [email protected] [email protected] } notification_email_from [email protected] smtp_server 192.168.200.1 smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict # 註釋掉,不然訪問不到VIP vrrp_garp_interval 0 vrrp_gna_interval 0}global_defs { notification_email { [email protected] [email protected] [email protected] } notification_email_from [email protected] smtp_server 192.168.200.1 smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict # 註釋掉,不然訪問不到VIP vrrp_garp_interval 0 vrrp_gna_interval 0}# 檢測任務vrrp_script check_haproxy { # 檢測HAProxy監本 script "/etc/keepalived/script/check_haproxy.sh" # 每隔兩秒檢測 interval 2 # 權重 weight 2}# 虛擬組vrrp_instance haproxy { state MASTER # 此處為`主`,備機是 `BACKUP` interface ens33 # 物理網絡卡,根據情況而定 mcast_src_ip 192.168.8.40 # 當前主機ip virtual_router_id 51 # 虛擬路由id,同一個組內需要相同 priority 100 # 主機的優先權要比備機高 advert_int 1 # 心跳檢查頻率,單位:秒 authentication { # 認證,組內的要相同 auth_type PASS auth_pass 1111 } # 呼叫指令碼 track_script { check_haproxy } # 虛擬ip,多個換行 virtual_ipaddress { 192.168.8.201 }}```啟動```shellkeepalived -D```##RabbitMQ映象佇列1、為什麼要存在映象佇列為了保證佇列和訊息的高可用2、什麼是映象佇列,映象佇列是如何進行選取主節點的?引入映象佇列的機制,可以將佇列映象到叢集中的其他的Broker節點之上,如果叢集中的一個節點失效了,佇列能自動的切換到映象中的另一個節點之上以保證服務的可用性。在通常的用法中,針對每一個配置映象的佇列(一下稱之為映象佇列)都包含一個主節點(master)和若干個從節點(slave),如下圖所示:叢集方式下,佇列和訊息是無法在節點之間同步的,因此需要使用RabbitMQ的映象佇列機制進行同步。![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174244706-400264268.png)深入瞭解參考文章:https://blog.csdn.net/u013256816/article/details/71097186# RabbitMQ的應用筆者就職於電商公司,就以電商秒殺場景為背景,闡述下RabbitMQ的實踐。## 場景:訂單未支付庫存回退當用戶秒殺成功以後,就需要引導使用者去訂單頁面進行支付。如果使用者在規定的時間之內(30分鐘),沒有完成訂單的支付,此時我們就需要進行庫存的回退操作。### 架構圖![](https://img2020.cnblogs.com/blog/874710/202012/874710-20201224174256618-355184951.png)具體實現就是使用的死信佇列,可以參考上面的程式碼。##場景:RabbitMQ秒殺公平性保證訊息的可靠性傳輸可以保證秒殺業務的公平性。關於秒殺業務的公平性,我們還需要考慮一點:訊息的順序性(先進入佇列的訊息先進行處理)**RabbitMQ訊息順序性說明**順序性:訊息的順序性是指消費者消費到的訊息和傳送者釋出的訊息的順序是一致的。舉個例子,不考慮訊息重複的情況,如果生產者釋出的訊息分別為msgl、msg2、msg3,那麼消費者必然也是按照msgl、msg2、msg3的順序進行消費的。目前很多資料顯示RabbitMQ的訊息能夠保障順序性,這是不正確的,或者說這個觀點有很大的侷限性。在不使用任何RabbitMQ的高階特性,也沒有訊息丟失、網路故障之類異常的情況發生,並且只有一個消費者的情況下,也只有一個生產者的情況下可以保證訊息的順序性。如果有多個生產者同時傳送訊息,無法確定訊息到達Broker 的前後順序,也就無法驗證訊息的順序性,因為每一次訊息的傳送都是在各自的執行緒中進行的。**RabbitMQ訊息順序錯亂演示**生產者傳送訊息:1、不使用生產者確認機制,單生產者單消費者可以保證訊息的順序性2、使用了生產者確認機制,那麼就無法保證訊息到達Broker的前後順序,因為訊息的傳送是非同步傳送的,每一個執行緒的執行時間不同3、生產端使用事務機制,保證訊息的順序性消費端消費訊息:1、單消費者可以保證訊息的順序性2、多消費者不能保證訊息的順序,因為每一個訊息的消費都是在各自的執行緒中進行,每一個執行緒的執行時間不同**RabbitMQ訊息順序性保障**生產端啟用事務機制,單生產者單消費者。如果我們不考慮訊息到達MQ的順序,只是考慮對已經到達到MQ中的訊息順序消費,那麼需要保證消費者是單消費者即可。##場景:RabbitMQ秒殺不超賣保證要保證秒殺不超賣,我們需要在很多的環節進行考慮。比如:在進行預扣庫存的時候,我們需要考慮不能超賣,在進行資料庫真實庫存扣減的時候也需要考慮不能超賣。而對我們的mq這個環節而言,要保證不超賣我們只需要保證訊息不被重複消費。首先我們可以確認的是,觸發訊息重複執行的條件會是很苛刻的! 也就說 在大多數場景下不會觸發該條件!!! 一般出在任務超時,或者沒有及時返回狀態,引起任務重新入佇列,由於服務端沒有收到消費端的ack應答,因此該條訊息還會重新進行投遞。### 冪等性保障方案重複消費不可怕,可怕的是你沒考慮到重複消費之後,怎麼保證冪等性。所謂冪等性,就是對介面的多次呼叫所產生的結果和呼叫一次是一致的。通俗點說,就一個數據,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。舉個例子:假設你有個系統,消費一條訊息就往資料庫裡插入一條資料,要是你一個訊息重複消費兩次,你不就插入了兩條,這資料不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條資料,從而保證了資料的正確性。一條資料重複消費兩次,資料庫裡就只有一條資料,這就保證了系統的冪等性。怎麼保證訊息佇列消費的冪等性?這一點需要結合的實際的業務來進行處理:1、比如我們消費的資料需要寫資料庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,執行以下update操作2、比如我們消費的資料需要寫Redis,那沒問題了,反正每次都是set,天然冪等性。3、比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者傳送每條資料的時候,裡面加一個全域性唯一的id,類似訂單id 之類的東西,然後你這裡消費到了之後,先根據這個id 去比如Redis 裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個id 寫Redis。如果消費過了,那你就別處理了,保證別重複處理相同的訊息即可。4、比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條。因為有唯一鍵約束了,重複資料插入只會報錯,不會導致資料庫中出現髒資料。# 面試題1、訊息佇列的作用與使用場景?2、建立佇列和交換機的方法?3、多個消費者監聽一個生產者時,訊息如何分發?4、無法被路由的訊息,去了哪裡?5、訊息在什麼時候會變成Dead Letter(死信)?6、RabbitMQ如何實現延遲佇列?7、如何保證訊息的可靠性投遞?8、如何在服務端和消費端做限流?9、如何保證訊息的順序性?10、RabbitMQ的節點

版权声明
本文为[itread01]所创,转载请带上原文链接,感谢
https://www.itread01.com/content/1608819188.html

Scroll to Top