【RabbitMQ】在 Spring Boot 實作 Fanout 與 Direct 模式


https://unsplash.com/photos/brown-rabbit-on-green-grass-field-during-daytime-cbjQByo9TEM

在上一篇文章中,實作了簡單的 Simple 與 Worker 模式。而接下來將加入「交換機」(exchange)這項元素,讓 producer 在發送訊息到多個 queue 時更方便。本文會介紹 exchange,並進行 queue 的綁定,再實作出「Fanout」與「Direct」模式。

此篇轉載自本人的 iThome 鐵人賽文章

一、什麼是交換機(exchange)

(一)定義

回顧一下上一篇介紹的 Simple 模式。情境是把留言後所發送的「站內通知」與「Email 通知」之功能,切分為一個新服務。其 producer 的程式片段如下。

我們使用「RabbitTemplate」,將訊息(CommentMsg)直接送至特定名稱的 queue。

假設今天產品功能變多了,並非每個使用情境,都要發送站內通知與 Email 通知這兩種。且團隊因此決定將這兩種通知的功能,細分為兩臺 server(一臺送站內,一臺送 Email)。那就代表這兩個服務各自的 queue,都要接收 CommentMsg 的訊息。

以直覺來想,producer 的程式碼會該寫成如下。

如此一來,我們可察覺一個問題。每個服務都有自己的 queue,如果有多個服務想接收同樣的訊息,則 producer 就要為了它們,撰寫出數行「RabbitTemplate.convertAndSend」的程式,以傳送訊息到不同 queue。

(二)交換機的概念

交換機能夠消除 producer 與 queue 之間的依賴。將 exchange 加入 MQ 的架構後,producer 只要傳送訊息到 exchange,它就會幫忙轉發到 queue。而 queue 必須事先與 exchange 綁定好。官方教學的架構圖如下。

以生活情境來比喻。學校有重要事項傳遞給同學們,若召集所有相關的同學到現場,會很麻煩。於是校方選擇將公告送至各科系的辦公室。而系辦再依據公告內容的適用對象,放到「班級櫃」。班級幹部有空會來領取公告,並協助執行。

從這個情境來看,校方是 producer,而「召集相關的同學」相當於寫出好幾行發送訊息到 queue 的程式。至於系辦則是 exchange,會負責轉發訊息,看是要給特定班級、年級,還是整個科系都要通知。而存放公告的班級櫃是 queue;拿取公告的班級幹部是 consumer。

二、實作前準備

本文範例程式的情境是,系統在發送「文章 / 商品有新留言」的通知給使用者時,可分為「站內通知」與「Email 通知」。且這兩項功能被切分為兩個服務,運行在不同 server 上。

當業務邏輯要發送通知時,會將需要的資料包裝成訊息(message),讓 producer 送往 MQ,由 consumer 接收後處理,達到非同步工作的效果。

(一)配置 Queue

為避免打錯字,以下先準備一些常數,用來代表 Spring bean、queue 與 exchange 的名稱。

以下是在程式中進行 queue 的配置。

(二)訊息 Model

由於本文加入了「exchange」與「routingKey」的概念,因此請先在訊息的 model 類別添加對應的欄位。還可攜帶其他想傳給 consumer 的資料。

其中「mode」欄位代表 producer 要以何種模式執行(Fanout 或 Direct),用來判斷要將訊息送給哪一個 exchange。而「rountingKey」欄位則是讓 exchange 判斷要送到哪些 queue。

(三)準備 Consumer

關於 consumer 的邏輯,筆者只取用 CommentMsg 訊息的其中一個欄位,印出簡單的 log。

(四)準備 Producer

在 producer 呼叫「RabbitTemplate.convertAndSend」方法時,請選用正確的 overloading 方法。第一個參數是指定 exchange 名稱;第二個參數叫做「routingKey」,在 Direct 模式中會用來指定 queue,而 Fanout 模式不會用到。

(五)準備 Controller

以上就是練習本文兩個模式的前置作業了。

三、實作 Fanout 模式

(一)定義

了解 exchange 的概念後,讓我們繼續以「發送通知」的情境為例子,認識「Fanout」模式。該模式是指將訊息發送給所有與該 exchange 綁定的 queue,也就是「廣播」。

Fanout 這個單字,有「發散」的意思。

(二)配置 Exchange 並綁定 Queue

請建立 exchange,只要在「FanoutExchange」的建構子傳入名稱即可。此名稱將會在 producer 傳送訊息時被指定。

以下是配置程式,綁定時需要建立「Binding」元件。

過程中注入了剛剛建立的 queue 與 exchange 元件,並透過「BindingBuilder」分別完成兩個 queue 的綁定。

這些「元件」並不是要拿來注入到其他地方的,而是會被 RabbitMQ 的 library 讀取,所以在 RabbitMQ 的服務中才能完成 queue 和 exchange 的建立與綁定。

(三)測試

可發送請求到第二節準備的 API。

POST http://localhost:8080/comment
{
    "content": "這是 Fanout 模式,訊息會送給所有 queue!",
    "mode": "fanout",
    "routingKey": null
}

以上面的請求為例,每個綁定在 exchange 的 queue,其對應的 consumer 都能收到訊息,這就是 Fanout 模式。


四、實作 Direct 模式

(一)定義

前面的 Fanout 模式會將訊息傳送給所有綁定的 queue。而本節的「Direct」模式則是選擇其中一個 queue。

在 Direct 模式,producer 會在發送訊息給 exchange 時,指定一個代表 queue 的 key。該 key 會在 queue 一開始綁定到 exchange 時設定好。

以生活情境來比喻,就像向郵局申請的「郵政信箱」。如果自己或家人有服兵役,可能會有點印象,寄信到所屬單位時,收件地址會寫「XX郵政幾號」,例如「桃園龍潭郵政91004號」。而這個郵政信箱的代號,就相當於 queue 的 key。

這麼做的好處是,當 queue 的名稱改變了,producer 的程式並不需要做調整,因為它是對著 key 發送訊息。

(二)配置 Exchange 並綁定 Queue

請建立 exchange,只要在「DirectExchange」的建構子傳入名稱即可。以下是配置程式。

由於使用的是 DirectExchange,因此 BindingBuilder 會要求我們提供叫做「routingKey」的參數。於是這裡將 queue 的 key 給傳入。

(三)測試

可發送請求到第二節準備的 API。

POST http://localhost:8080/comment
{
    "content": "這是 Direct 模式,只會發送站內通知!",
    "mode": "direct",
    "routingKey": "Internal"
}

以上面的請求為例,會發送訊息給 key 為「Internal」的 queue。也就是只有「Comment Internal Notification Queue」這個 queue 會收到訊息,並由「InternalNotificationConsumer」印出 log。

本文的完成專案:
https://github.com/ntub46010/SpringBootUsingRabbitMQ/tree/fanout-and-direct-pattern

上一篇:【RabbitMQ】在 Spring Boot 實作 Simple 與 Worker 模式

下一篇:【RabbitMQ】在 Spring Boot 實作 Routing 與 Topic 模式

參考資料:

留言