Spring 集成提供了通道適配器,用于使用高級消息隊列協(xié)議 (AMQP) 接收和發(fā)送消息。
您需要將此依賴項包含在項目中:
(資料圖片僅供參考)
org.springframework.integration spring-integration-amqp 6.0.0
以下適配器可用:
入站通道適配器入站網(wǎng)關(guān)出站通道適配器出站網(wǎng)關(guān)異步出站網(wǎng)關(guān)RabbitMQ 流隊列入站通道適配器RabbitMQ 流隊列出站通道適配器Spring 集成還提供了點對點消息通道和由 AMQP 交換和隊列支持的發(fā)布-訂閱消息通道。
為了提供AMQP支持,Spring Integration依賴于(Spring AMQP),它將Spring的核心概念應(yīng)用于基于AMQP的消息傳遞解決方案的開發(fā)。 Spring AMQP提供了與Spring JMS類似的語義。
雖然提供的AMQP通道適配器僅用于單向消息傳遞(發(fā)送或接收),但Spring Integration還提供了用于請求-回復(fù)操作的入站和出站AMQP網(wǎng)關(guān)。
提示: 您應(yīng)該熟悉Spring AMQP項目的參考文檔。 它提供了有關(guān)Spring與AMQP集成的更深入的信息,特別是RabbitMQ。
以下清單顯示了 AMQP 入站通道適配器的可能配置選項:
@Beanpublic IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) { return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName")) .handle(m -> System.out.println(m.getPayload())) .get();}
容器 請注意,在使用XML配置外部容器時,不能使用Spring AMQP命名空間來定義容器。 這是因為命名空間至少需要一個元素。 在此環(huán)境中,偵聽器位于適配器內(nèi)部。 因此,您必須使用常規(guī) Spring 定義來定義容器,如以下示例所示:? |
盡管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差異。 JMS 入站通道適配器正在使用底層,并且需要配置的輪詢器。 AMQP 入站通道適配器使用 和 是消息驅(qū)動的。 在這方面,它更類似于 JMS 消息驅(qū)動的通道適配器。? |
從版本 5.5 開始,可以使用在內(nèi)部調(diào)用重試操作時使用的策略進(jìn)行配置。 有關(guān)更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter?
???org.springframework.amqp.rabbit.retry.MessageRecoverer?
???RecoveryCallback?
???setMessageRecoverer()?
?
有關(guān)批處理消息的更多信息,請參閱Spring AMQP 文檔。
要使用 Spring 集成生成批處理消息,只需使用 .??BatchingRabbitTemplate?
?
接收批處理消息時,默認(rèn)情況下,偵聽器容器提取每個片段消息,適配器將為每個片段生成 。 從版本 5.2 開始,如果容器的屬性設(shè)置為 ,則由適配器執(zhí)行去批處理,并生成一個有效負(fù)載為片段有效負(fù)載列表(如果適用,轉(zhuǎn)換后)。??Message>?
???deBatchingEnabled?
???false?
???Message
?>?
默認(rèn)值為 ,但可以在適配器上覆蓋。??BatchingStrategy?
???SimpleBatchingStrategy?
?
當(dāng)重試操作需要恢復(fù)時,必須與批處理一起使用。? |
版本 5.0.1 引入了輪詢通道適配器,允許您按需獲取單個消息 — 例如,使用 或 輪詢器。 有關(guān)詳細(xì)信息,請參閱延遲確認(rèn)可輪詢消息源。??MessageSourcePollingTemplate?
?
它當(dāng)前不支持 XML 配置。
以下示例顯示如何配置:??AmqpMessageSource?
?
@Beanpublic IntegrationFlow flow() { return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE), e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false)) .handle(p -> { ... }) .get();}
有關(guān)配置屬性,請參閱Javadoc。
請參閱批處理消息。
對于輪詢適配器,沒有偵聽器容器,批處理消息始終是反批處理的(如果支持這樣做)。??BatchingStrategy?
?
入站網(wǎng)關(guān)支持入站通道適配器上的所有屬性(除了“通道”被“請求通道”替換),以及一些其他屬性。 以下清單顯示了可用的屬性:
@Bean // return the upper cased payloadpublic IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) { return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo")) .transform(String.class, String::toUpperCase) .get();}
此適配器的唯一 ID。 自選。 |
將轉(zhuǎn)換后的消息發(fā)送到的消息通道。 必填。 |
對接收 AMQP 消息時要使用的 的引用。 自選。 默認(rèn)情況下,只有標(biāo)準(zhǔn)的 AMQP 屬性(例如 )被復(fù)制到 Spring Integration 和從 Spring Integration 復(fù)制。 默認(rèn)情況下,不會將 AMQP 中的任何用戶定義的標(biāo)頭復(fù)制到 AMQP 消息或從 AMQP 消息復(fù)制。 如果提供了“請求標(biāo)頭名稱”或“回復(fù)標(biāo)頭名稱”,則不允許使用。? |
要從 AMQP 請求映射到 的 AMQP 標(biāo)頭名稱的逗號分隔列表。 僅當(dāng)未提供“標(biāo)頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是與標(biāo)頭名稱匹配的簡單模式(例如 或或)。? |
要映射到 AMQP 回復(fù)消息的 AMQP 消息屬性中的名稱的逗號分隔列表。 所有標(biāo)準(zhǔn)標(biāo)頭(例如)都映射到 AMQP 消息屬性,而用戶定義的標(biāo)頭映射到“標(biāo)頭”屬性。 僅當(dāng)未提供“標(biāo)頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是要與標(biāo)頭名稱(例如,或或)匹配的簡單模式。? |
消息通道,其中應(yīng)回復(fù)消息。 自選。 |
設(shè)置用于從回復(fù)通道接收消息的基礎(chǔ)。 如果未指定,此屬性默認(rèn)為 (1 秒)。 僅當(dāng)容器線程在發(fā)送回復(fù)之前移交給另一個線程時,才適用。? |
自定義的 Bean 引用(以便更好地控制要發(fā)送的回復(fù)消息)。 您可以提供 的替代實現(xiàn)。? |
當(dāng) 沒有屬性時要使用的。 如果未指定此選項,則提供 no,請求消息中不存在任何屬性,并且 拋出 AN 是因為無法路由回復(fù)。 如果未指定此選項并提供外部選項,則不會引發(fā)異常。 您必須指定此選項或配置默認(rèn)值,并在該模板上, 如果您預(yù)計請求消息中不存在任何屬性的情況。? |
請參閱入站通道適配器中有關(guān)配置屬性的說明。??listener-container?
?
從版本 5.5 開始,可以使用在內(nèi)部調(diào)用重試操作時使用的策略進(jìn)行配置。 有關(guān)更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter?
???org.springframework.amqp.rabbit.retry.MessageRecoverer?
???RecoveryCallback?
???setMessageRecoverer()?
?
請參閱批處理消息。
默認(rèn)情況下,入站終端節(jié)點使用應(yīng)答方式,這意味著容器會在下游集成流完成(或使用 或 將消息傳遞給另一個線程)時自動確認(rèn)消息。 將模式設(shè)置為配置使用者,以便根本不使用確認(rèn)(代理在發(fā)送消息后立即自動確認(rèn)消息)。 設(shè)置模式以允許用戶代碼在處理過程中的某個其他點確認(rèn)消息。 為了支持此功能,在此模式下,終結(jié)點分別在 和 中提供 和 標(biāo)頭。??AUTO?
???QueueChannel?
???ExecutorChannel?
???NONE?
???MANUAL?
???Channel?
???deliveryTag?
???amqp_channel?
???amqp_deliveryTag?
?
您可以對 執(zhí)行任何有效的 Rabbit 命令,但通常只使用 and(或)。 為了不干擾容器的操作,不應(yīng)保留對通道的引用,而應(yīng)僅在當(dāng)前消息的上下文中使用它。??Channel?
???basicAck?
???basicNack?
???basicReject?
?
由于 是對“活動”對象的引用,因此它不能序列化,如果持久保存消息,則會丟失。? |
以下示例演示如何使用確認(rèn):??MANUAL?
?
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception { // Do some processing if (allOK) { channel.basicAck(deliveryTag, false); // perhaps do some more processing } else { channel.basicNack(deliveryTag, false, true); } return someResultForDownStreamProcessing;}
以下出站終結(jié)點具有許多類似的配置選項。 從版本 5.2 開始,已添加。 通常,當(dāng)啟用發(fā)布者確認(rèn)時,代理將快速返回一個 ack(或 nack),該 ack(或 nack)將被發(fā)送到相應(yīng)的通道。 如果在收到確認(rèn)之前關(guān)閉了通道,Spring AMQP 框架將合成一個 nack。 “丟失”確認(rèn)不應(yīng)發(fā)生,但如果設(shè)置此屬性,則終結(jié)點將定期檢查它們,并在時間過去而未收到確認(rèn)時合成 nack。??confirm-timeout?
?
以下示例顯示了 AMQP 出站通道適配器的可用屬性:
@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, MessageChannel amqpOutboundChannel) { return IntegrationFlow.from(amqpOutboundChannel) .handle(Amqp.outboundAdapter(amqpTemplate) .routingKey("queue1")) // default exchange - route to queue "queue1" .get();}
此適配器的唯一 ID。 自選。 |
消息通道,消息應(yīng)發(fā)送到該通道,以便將其轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。 |
對已配置的 AMQP 模板的 Bean 引用。 可選(默認(rèn)為 )。? |
向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交換名稱表達(dá)”相互排斥。 自選。 |
一個 SpEL 表達(dá)式,計算該表達(dá)式以確定消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。? |
發(fā)送消息時使用的固定路由密鑰。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。? |
一個 SpEL 表達(dá)式,計算該表達(dá)式以確定發(fā)送消息時要使用的路由密鑰,消息作為根對象(例如,“payload.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭,則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果根本不自定義,則默認(rèn)值為 。 自選。? |
定義相關(guān)性數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用和屬性設(shè)置為 . 收到發(fā)布者確認(rèn)并提供相關(guān)數(shù)據(jù)時,將根據(jù)確認(rèn)類型將其寫入 或 。 確認(rèn)的有效負(fù)載是相關(guān)數(shù)據(jù),由此表達(dá)式定義。 郵件的“amqp_publishConfirm”標(biāo)頭設(shè)置為 () 或 ()。 示例:和 。 版本 4.1 引入了消息標(biāo)頭。 它包含用于發(fā)布者確認(rèn)的“nack”。 從版本 4.2 開始,如果表達(dá)式解析為實例(例如 ),則在 / 通道上發(fā)出的消息基于該消息,并添加其他標(biāo)頭。 以前,無論類型如何,都會使用相關(guān)數(shù)據(jù)作為其有效負(fù)載創(chuàng)建新消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。? |
正 () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)(如果未配置)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),適配器將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制。 默認(rèn) none (不會生成 nacks)。 |
設(shè)置為 true 時,調(diào)用線程將阻塞,等待發(fā)布者確認(rèn)。 這需要配置確認(rèn)以及 . 線程將阻塞長達(dá) (或默認(rèn)為 5 秒)。 如果發(fā)生超時,將拋出 。 如果啟用了返回并返回了消息,或者在等待確認(rèn)時發(fā)生任何其他異常,則將拋出 a 并顯示相應(yīng)的消息。? |
返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為向適配器返回?zé)o法傳遞的消息。 如果未進(jìn)行配置,則從從 AMQP 接收的數(shù)據(jù)構(gòu)造消息,并具有以下附加標(biāo)頭:、、、。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。? |
對用于在發(fā)送返回或否定確認(rèn)的消息時生成實例的實現(xiàn)的引用。? |
對發(fā)送 AMQP 消息時要使用的 的引用。 默認(rèn)情況下,只有標(biāo)準(zhǔn)的 AMQP 屬性(例如 )被復(fù)制到 Spring 集成 中。 任何用戶定義的標(biāo)頭都不會通過默認(rèn)的“DefaultAmqpHeaderMapper”復(fù)制到消息中。 如果提供了“請求標(biāo)頭名稱”,則不允許。 自選。? |
要從 映射到 AMQP 消息的 AMQP 標(biāo)頭名稱的逗號分隔列表。 如果提供了“標(biāo)頭映射器”引用,則不允許。 此列表中的值也可以是與標(biāo)頭名稱匹配的簡單模式(例如 或或)。? |
設(shè)置為 時,端點將在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這允許對錯誤配置進(jìn)行“快速故障”檢測,但如果代理關(guān)閉,也會導(dǎo)致初始化失敗。 當(dāng)(默認(rèn)值)時,當(dāng)發(fā)送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。? |
設(shè)置為 時,類型的有效負(fù)載將作為離散消息在單個調(diào)用范圍內(nèi)在同一通道上發(fā)送。 需要 . when 為 true,在發(fā)送消息后調(diào)用。 使用事務(wù)模板,發(fā)送將在新事務(wù)或已啟動事務(wù)(如果存在)中執(zhí)行。? |
返回通道 使用 a 需要將屬性設(shè)置為 的 和將屬性設(shè)置為 的 。 將多個出站終結(jié)點與返回符一起使用時,每個終結(jié)點都需要一個單獨的終結(jié)點。? |
以下清單顯示了 AMQP 出站網(wǎng)關(guān)的可能屬性:
@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) { return f -> f.handle(Amqp.outboundGateway(amqpTemplate) .routingKey("foo")) // default exchange - route to queue "foo" .get();}@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")public interface MyGateway { String sendToRabbit(String data);}
此適配器的唯一 ID。 自選。 |
消息發(fā)送到的消息通道,消息被轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。 |
對已配置的 AMQP 模板的 Bean 引用。 可選(默認(rèn)為 )。? |
應(yīng)向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱 cxchange。 與“交換名稱表達(dá)”相互排斥。 自選。 |
一個 SpEL 表達(dá)式,計算該表達(dá)式以確定應(yīng)將消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。? |
從 AMQP 隊列接收并轉(zhuǎn)換回復(fù)后應(yīng)發(fā)送到的消息通道。 自選。 |
網(wǎng)關(guān)在向 發(fā)送回復(fù)消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當(dāng)前已滿的 。 默認(rèn)為無窮大。? |
當(dāng) 時,如果屬性中未收到回復(fù)消息,網(wǎng)關(guān)將引發(fā)異常。 默認(rèn)值為 。? |
發(fā)送消息時使用的。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。? |
一個 SpEL 表達(dá)式,經(jīng)過計算以確定發(fā)送消息時使用的表達(dá)式,將消息作為根對象(例如,“payload.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭,則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果根本不自定義,則默認(rèn)值為 。 自選。? |
從版本 4.2 開始。 定義相關(guān)數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用和屬性設(shè)置為 . 收到發(fā)布者確認(rèn)并提供關(guān)聯(lián)數(shù)據(jù)時,將根據(jù)確認(rèn)類型將其寫入 或 。 確認(rèn)的有效負(fù)載是相關(guān)數(shù)據(jù),由此表達(dá)式定義。 郵件的標(biāo)頭“amqp_publishConfirm”設(shè)置為 () 或 ()。 為了確認(rèn),Spring 集成提供了一個額外的標(biāo)頭。 示例:和 。 如果表達(dá)式解析為實例(例如 ),則消息 在 / 通道上發(fā)出的基于該消息,并添加了其他標(biāo)頭。 以前,無論類型如何,都會使用相關(guān)數(shù)據(jù)作為其有效負(fù)載創(chuàng)建新消息。 另請參閱發(fā)布者確認(rèn)和返回的代機(jī)制?。 自選。? |
向其發(fā)送正 () 發(fā)布者確認(rèn)的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由定義的關(guān)聯(lián)數(shù)據(jù)(如果未配置)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),網(wǎng)關(guān)將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 默認(rèn) none (不會生成 nacks)。 |
返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為向適配器返回?zé)o法傳遞的消息。 如果未進(jìn)行配置,則根據(jù)從 AMQP 接收的數(shù)據(jù)構(gòu)造消息,并具有以下附加標(biāo)頭:、、 和 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。? |
對用于在發(fā)送返回或否定確認(rèn)的消息時生成實例的實現(xiàn)的引用。? |
設(shè)置為 時,端點將在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這允許在代理關(guān)閉時通過記錄錯誤消息來“快速失敗”檢測錯誤配置。 當(dāng)(默認(rèn)值)時,當(dāng)發(fā)送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。? |
返回通道 使用 a 需要將屬性設(shè)置為 的 和將屬性設(shè)置為 的 。 將多個出站終結(jié)點與返回符一起使用時,每個終結(jié)點都需要一個單獨的終結(jié)點。? |
基礎(chǔ)的默認(rèn)值為 5 秒。 如果需要更長的超時,則必須在 上配置它。? |
請注意,出站適配器和出站網(wǎng)關(guān)配置之間的唯一區(qū)別是屬性的設(shè)置。??expectReply?
?
上一節(jié)中討論的網(wǎng)關(guān)是同步的,因為發(fā)送線程掛起,直到 收到回復(fù)(或發(fā)生超時)。 Spring Integration 版本 4.3 添加了一個異步網(wǎng)關(guān),該網(wǎng)關(guān)使用 from Spring AMQP。 發(fā)送消息時,線程會在發(fā)送操作完成后立即返回,收到消息后,將在模板的偵聽器容器線程上發(fā)送回復(fù)。 在輪詢器線程上調(diào)用網(wǎng)關(guān)時,這可能很有用。 線程已釋放,可用于框架中的其他任務(wù)。??AsyncRabbitTemplate?
?
以下清單顯示了 AMQP 異步出站網(wǎng)關(guān)的可能配置選項:
@Configurationpublic class AmqpAsyncApplication { @Bean public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) { return f -> f .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate) .routingKey("queue1")); // default exchange - route to queue "queue1" } @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input") public interface MyGateway { String sendToRabbit(String data); }}
此適配器的唯一 ID。 自選。 |
消息通道,消息應(yīng)發(fā)送到該通道,以便將其轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。 |
對已配置的 Bean 引用。 可選(默認(rèn)為 )。? |
應(yīng)向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交換名稱表達(dá)”相互排斥。 自選。 |
一個 SpEL 表達(dá)式,計算該表達(dá)式以確定消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。? |
從 AMQP 隊列接收并轉(zhuǎn)換回復(fù)后應(yīng)發(fā)送到的消息通道。 自選。 |
網(wǎng)關(guān)在向 發(fā)送回復(fù)消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當(dāng)前已滿的 。 默認(rèn)值為無窮大。? |
如果在屬性中未收到回復(fù)消息,并且此設(shè)置為 ,網(wǎng)關(guān)將向入站消息的標(biāo)頭發(fā)送錯誤消息。 如果在屬性中未收到回復(fù)消息,并且此設(shè)置為 ,則網(wǎng)關(guān)會將錯誤消息發(fā)送到默認(rèn)值(如果可用)。 默認(rèn)為 .? |
發(fā)送消息時要使用的路由密鑰。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。? |
一個 SpEL 表達(dá)式,經(jīng)過計算以確定發(fā)送消息時要使用的路由密鑰, 將消息作為根對象(例如,“有效負(fù)載.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭 (),則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果未自定義,則缺省值為 。 自選。? |
定義相關(guān)性數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用 和 ,其屬性設(shè)置為 。 收到發(fā)布者確認(rèn)并提供相關(guān)數(shù)據(jù)時,確認(rèn)將寫入 或 ,具體取決于確認(rèn)類型。 確認(rèn)的有效負(fù)載是此表達(dá)式定義的相關(guān)數(shù)據(jù),消息的“amqp_publishConfirm”標(biāo)頭設(shè)置為 () 或 ()。 例如,提供了一個額外的標(biāo)頭 ()。 例子:。 如果表達(dá)式解析為實例(例如“#this”),則在 / 通道上發(fā)出的消息將基于該消息,并添加其他標(biāo)頭。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。? |
向其發(fā)送正 () 發(fā)布者確認(rèn)的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
從版本 4.2 開始。 將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。? |
設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),網(wǎng)關(guān)將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制。 默認(rèn) none (不會生成 nacks)。 |
返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為將無法傳遞的消息返回到網(wǎng)關(guān)。 該消息是根據(jù)從 AMQP 接收的數(shù)據(jù)構(gòu)造的,具有以下附加標(biāo)頭:、、 和 。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。? |
設(shè)置為 時,端點在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這樣做允許“快速失敗”檢測錯誤配置,方法是在代理關(guān)閉時記錄錯誤消息。 當(dāng)(默認(rèn)值)建立連接時(如果由于建立了其他組件而不存在 it) 發(fā)送第一條消息時。? |
另請參閱異步服務(wù)激活器以獲取詳細(xì)信息。
兔子模板 當(dāng)您使用確認(rèn)和退貨時,我們建議將有線成專用的。 否則,可能會遇到意想不到的副作用。? |
將連接工廠配置為發(fā)布者確認(rèn)并返回時,上述部分將討論消息通道的配置,以便異步接收確認(rèn)和返回。 從版本 5.4 開始,還有一個通常更易于使用的附加機(jī)制。
在這種情況下,請勿配置 或 確認(rèn)和返回通道。 相反,在標(biāo)頭中添加一個實例;然后,您可以通過檢查已發(fā)送消息的實例中的未來狀態(tài)來等待稍后的結(jié)果。 在將來完成之前,將始終填充該字段(如果返回消息)。??confirm-correlation-expression?
???CorrelationData?
???AmqpHeaders.PUBLISH_CONFIRM_CORRELATION?
???CorrelationData?
???returnedMessage?
?
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returnssomeFlow.getInputChannel().send(MessageBuilder.withPayload("test") .setHeader("rk", "someKeyThatWontRoute") .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr) .build());...try { Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS); Message returned = corr.getReturnedMessage(); if (returned !- null) { // message could not be routed }}catch { ... }
為了提高性能,您可能希望發(fā)送多條消息并稍后等待確認(rèn),而不是一次發(fā)送一條。 返回的消息是轉(zhuǎn)換后的原始消息;您可以使用所需的任何其他數(shù)據(jù)對 A 進(jìn)行子類。??CorrelationData?
?
到達(dá)通道適配器或網(wǎng)關(guān)的入站消息將使用消息轉(zhuǎn)換器轉(zhuǎn)換為有效負(fù)載。 默認(rèn)情況下,使用 a,用于處理 java 序列化和文本。 默認(rèn)情況下,標(biāo)頭使用 映射。 如果發(fā)生轉(zhuǎn)換錯誤,并且未定義錯誤通道,則會將異常引發(fā)到容器,并由偵聽器容器的錯誤處理程序處理。 默認(rèn)錯誤處理程序?qū)⑥D(zhuǎn)換錯誤視為致命錯誤,消息將被拒絕(如果隊列已如此配置,則路由到死信交換)。 如果定義了錯誤通道,則有效負(fù)載為 具有屬性(無法轉(zhuǎn)換的 Spring AMQP 消息)和 . 如果容器是(默認(rèn)值),并且錯誤流使用錯誤而不引發(fā)異常,則將確認(rèn)原始消息。 如果錯誤流引發(fā)異常,則異常類型與容器的錯誤處理程序一起確定消息是否重新排隊。 如果容器配置了 ,則有效負(fù)載是具有附加屬性和 的 。 這使錯誤流能夠調(diào)用 或(或)消息,以控制其處置。??spring-messaging?
???Message>?
???SimpleMessageConverter?
???DefaultHeaderMapper.inboundMapper()?
???ErrorMessage?
???ListenerExecutionFailedException?
???failedMessage?
???cause?
???AcknowledgeMode?
???AUTO?
???AcknowledgeMode.MANUAL?
???ManualAckListenerExecutionFailedException?
???channel?
???deliveryTag?
???basicAck?
???basicNack?
???basicReject?
?
Spring AMQP 1.4 引入了 ,其中實際轉(zhuǎn)換器的選擇基于 在傳入內(nèi)容類型消息屬性上。 這可由入站終端節(jié)點使用。??ContentTypeDelegatingMessageConverter?
?
從 Spring 集成版本 4.3 開始,您也可以在出站端點上使用 ,標(biāo)頭指定使用哪個轉(zhuǎn)換器。??ContentTypeDelegatingMessageConverter?
???contentType?
?
以下示例配置了一個 ,默認(rèn)轉(zhuǎn)換器為 (處理 Java 序列化和純文本),以及一個 JSON 轉(zhuǎn)換器:??ContentTypeDelegatingMessageConverter?
???SimpleMessageConverter?
?
將標(biāo)頭設(shè)置為 to 的消息發(fā)送到會導(dǎo)致選擇 JSON 轉(zhuǎn)換器。??ctRequestChannel?
???contentType?
???application/json?
?
這適用于出站通道適配器和網(wǎng)關(guān)。
從版本 5.0 開始,添加到出站郵件的標(biāo)頭永遠(yuǎn)不會被映射標(biāo)頭覆蓋(默認(rèn)情況下)。 以前,只有當(dāng)消息轉(zhuǎn)換器是 (在這種情況下,首先映射標(biāo)頭以便可以選擇正確的轉(zhuǎn)換器)時才會出現(xiàn)這種情況。 對于其他轉(zhuǎn)換器,例如 ,映射標(biāo)頭將覆蓋轉(zhuǎn)換器添加的任何標(biāo)頭。 當(dāng)出站郵件具有一些剩余的標(biāo)頭(可能來自入站通道適配器)并且正確的出站被錯誤地覆蓋時,這會導(dǎo)致問題。 解決方法是在將消息發(fā)送到出站終結(jié)點之前使用標(biāo)頭篩選器刪除標(biāo)頭。? 但是,在某些情況下,需要以前的行為 - 例如,當(dāng)包含 JSON 的有效負(fù)載不知道內(nèi)容并將消息屬性設(shè)置為,但應(yīng)用程序希望通過設(shè)置發(fā)送到出站終結(jié)點的消息標(biāo)頭來覆蓋該行為。 正是這樣做的(默認(rèn)情況下)。? 現(xiàn)在,在出站通道適配器和網(wǎng)關(guān)(以及 AMQP 支持的通道)上調(diào)用了一個屬性。 設(shè)置此選項可還原覆蓋轉(zhuǎn)換器添加的屬性的行為。? 從版本 5.1.9 開始,當(dāng)我們生成回復(fù)并希望覆蓋轉(zhuǎn)換器填充的標(biāo)頭時,提供了類似的情況。 有關(guān)更多信息,請參閱其 JavaDocs。? |
Spring AMQP 版本 1.6 引入了一種機(jī)制,允許為出站消息指定默認(rèn)用戶 ID。 始終可以設(shè)置標(biāo)頭,該標(biāo)頭現(xiàn)在優(yōu)先于默認(rèn)值。 這可能對郵件收件人有用。 對于入站郵件,如果郵件發(fā)布者設(shè)置了該屬性,則該屬性將在標(biāo)頭中可用。 請注意,RabbitMQ會驗證用戶 ID 是連接的實際用戶 ID,還是連接允許模擬。??AmqpHeaders.USER_ID?
???AmqpHeaders.RECEIVED_USER_ID?
?
要為出站消息配置缺省用戶標(biāo)識,請在 上配置該標(biāo)識,并將出站適配器或網(wǎng)關(guān)配置為使用該模板。 同樣,要在回復(fù)上設(shè)置用戶 ID 屬性,請將適當(dāng)配置的模板注入入站網(wǎng)關(guān)。 有關(guān)更多信息,請參閱Spring AMQP 文檔。??RabbitTemplate?
?
Spring AMQP 支持RabbitMQ 延遲消息交換插件。 對于入站郵件,標(biāo)頭映射到標(biāo)頭。 設(shè)置標(biāo)頭會導(dǎo)致在出站郵件中設(shè)置相應(yīng)的標(biāo)頭。 還可以在出站終結(jié)點上指定 and 屬性(使用 XML 配置時)。 這些屬性優(yōu)先于標(biāo)頭。??x-delay?
???AmqpHeaders.RECEIVED_DELAY?
???AMQPHeaders.DELAY?
???x-delay?
???delay?
???delayExpression?
???delay-expression?
???AmqpHeaders.DELAY?
?
有兩種消息通道實現(xiàn)可用。 一個是點對點,另一個是發(fā)布-訂閱。 這兩個通道都為基礎(chǔ)和(如本章前面的通道適配器和網(wǎng)關(guān)所示)提供了廣泛的配置屬性。 但是,我們在此處顯示的示例具有最小配置。 瀏覽 XML 架構(gòu)以查看可用屬性。??AmqpTemplate?
???SimpleMessageListenerContainer?
?
點對點通道可能類似于以下示例:
在幕后,前面的示例導(dǎo)致聲明一個命名,并且此通道發(fā)送到該通道(從技術(shù)上講,通過使用與此名稱匹配的路由密鑰發(fā)送到無名稱的直接交換)。 此通道還在此 上注冊消費者。 如果希望通道是“可輪詢的”而不是消息驅(qū)動的,請為標(biāo)志提供值 ,如以下示例所示:??Queue?
???si.p2pChannel?
???Queue?
???Queue?
???Queue?
???message-driven?
???false?
?
發(fā)布-訂閱通道可能如下所示:
在后臺,前面的示例會導(dǎo)致聲明名為的扇出交換,并且此通道發(fā)送到該扇出交換。 此通道還聲明一個以服務(wù)器命名的獨占、自動刪除、非持久,并將其綁定到扇出交換,同時注冊使用者以接收消息。 發(fā)布-訂閱-通道沒有“可輪詢”選項。 它必須是消息驅(qū)動的。??si.fanout.pubSubChannel?
???Queue?
???Queue?
?
從版本 4.1 開始,AMQP 支持的消息通道(與 一起)支持單獨配置 和 對于 . 請注意,以前是默認(rèn)的。 現(xiàn)在,默認(rèn)情況下,它適用于 .??channel-transacted?
???template-channel-transacted?
???transactional?
???AbstractMessageListenerContainer?
???RabbitTemplate?
???channel-transacted?
???true?
???false?
???AbstractMessageListenerContainer?
?
在版本 4.3 之前,AMQP 支持的通道僅支持具有有效負(fù)載和標(biāo)頭的消息。 整個消息被轉(zhuǎn)換(序列化)并發(fā)送到 RabbitMQ。 現(xiàn)在,您可以將屬性(或使用 Java 配置時)設(shè)置為 。 當(dāng)此標(biāo)志為 時,將轉(zhuǎn)換消息有效負(fù)載并映射標(biāo)頭,其方式類似于使用通道適配器時的方式。 這種安排允許 AMQP 支持的通道與不可序列化的有效負(fù)載一起使用(可能與其他消息轉(zhuǎn)換器一起使用,例如 )。 有關(guān)默認(rèn)映射標(biāo)頭的更多信息,請參閱AMQP 消息標(biāo)頭。 您可以通過提供使用 和 屬性的自定義映射器來修改映射。 現(xiàn)在,您還可以指定 ,用于在沒有標(biāo)頭時設(shè)置傳遞模式。 默認(rèn)情況下,Spring AMQP 使用交付模式。??Serializable?
???extract-payload?
???setExtractPayload()?
???true?
???true?
???Jackson2JsonMessageConverter?
???outbound-header-mapper?
???inbound-header-mapper?
???default-delivery-mode?
???amqp_deliveryMode?
???MessageProperties?
???PERSISTENT?
?
與其他支持持久性的通道一樣,支持 AMQP 的通道旨在提供消息持久性以避免消息丟失。 它們不用于將工作分發(fā)給其他對等應(yīng)用程序。 為此,請改用通道適配器。 |
從版本 5.0 開始,可輪詢通道現(xiàn)在會阻止指定的輪詢器線程(默認(rèn)值為 1 秒)。 以前,與其他實現(xiàn)不同,如果沒有可用的消息,線程會立即返回到調(diào)度程序,而不管接收超時如何。 阻止比使用 a 檢索消息(沒有超時)要昂貴一些,因為必須創(chuàng)建一個使用者來接收每條消息。 若要恢復(fù)以前的行為,請將輪詢器的 0 設(shè)置為 0。? |
以下示例顯示如何使用 Java 配置配置通道:
@Beanpublic AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("foo"); factoryBean.setPubSub(false); return factoryBean;}@Beanpublic AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("bar"); factoryBean.setPubSub(false); return factoryBean;}@Beanpublic AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("baz"); factoryBean.setPubSub(false); return factoryBean;}
以下示例顯示如何使用 Java DSL 配置通道:
@Beanpublic IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.pollableChannel(connectionFactory) .queueName("foo")) ... .get();}@Beanpublic IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.channel(connectionFactory) .queueName("bar")) ... .get();}@Beanpublic IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.publishSubscribeChannel(connectionFactory) .queueName("baz")) ... .get();}
Spring 集成 AMQP 適配器會自動映射所有 AMQP 屬性和標(biāo)頭。 (這是對 4.3 的更改 - 以前,僅映射標(biāo)準(zhǔn)標(biāo)頭)。 默認(rèn)情況下,這些屬性通過使用DefaultAmqpHeaderMapper復(fù)制到 Spring Integration 中。MessageHeaders
您可以傳入自己的特定于 AMQP 的標(biāo)頭映射程序的實現(xiàn),因為適配器具有支持這樣做的屬性。
AMQP 消息屬性中的任何用戶定義的標(biāo)頭都將復(fù)制到 AMQP 消息或從 AMQP 消息復(fù)制,除非 的 或?qū)傩燥@式否定。 默認(rèn)情況下,對于出站映射器,不映射任何標(biāo)頭。 請參閱本節(jié)后面出現(xiàn)的警告,了解原因。??requestHeaderNames?
???replyHeaderNames?
???DefaultAmqpHeaderMapper?
???x-*?
?
要覆蓋默認(rèn)值并恢復(fù)到 4.3 之前的行為,請在屬性中使用 和。??STANDARD_REQUEST_HEADERS?
???STANDARD_REPLY_HEADERS?
?
映射用戶定義的標(biāo)頭時,這些值還可以包含要匹配的簡單通配符模式(例如 或)。 匹配所有標(biāo)頭。? |
從版本 4.1 開始,(超類)允許為 和 屬性(除了現(xiàn)有的 和 )配置令牌,以映射所有用戶定義的標(biāo)頭。??AbstractHeaderMapper?
???DefaultAmqpHeaderMapper?
???NON_STANDARD_HEADERS?
???requestHeaderNames?
???replyHeaderNames?
???STANDARD_REQUEST_HEADERS?
???STANDARD_REPLY_HEADERS?
?
該類標(biāo)識由 :??org.springframework.amqp.support.AmqpHeaders?
???DefaultAmqpHeaderMapper?
?
?amqp_appId?
???amqp_clusterId?
???amqp_contentEncoding?
???amqp_contentLength?
???content-type?
?(請參閱內(nèi)容類型標(biāo)頭)??amqp_correlationId?
???amqp_delay?
???amqp_deliveryMode?
???amqp_deliveryTag?
???amqp_expiration?
???amqp_messageCount?
???amqp_messageId?
???amqp_receivedDelay?
???amqp_receivedDeliveryMode?
???amqp_receivedExchange?
???amqp_receivedRoutingKey?
???amqp_redelivered?
???amqp_replyTo?
???amqp_timestamp?
???amqp_type?
???amqp_userId?
???amqp_publishConfirm?
???amqp_publishConfirmNackCause?
???amqp_returnReplyCode?
???amqp_returnReplyText?
???amqp_returnExchange?
???amqp_returnRoutingKey?
???amqp_channel?
???amqp_consumerTag?
???amqp_consumerQueue?
?如本節(jié)前面所述,使用 標(biāo)頭映射模式 是復(fù)制所有標(biāo)頭的常用方法。 但是,這可能會產(chǎn)生一些意想不到的副作用,因為某些 RabbitMQ 專有屬性/標(biāo)頭也會被復(fù)制。 例如,使用聯(lián)合身份驗證?時,收到的消息可能具有一個名為 的屬性,該屬性包含發(fā)送消息的節(jié)點。 如果對入站網(wǎng)關(guān)上的請求和回復(fù)標(biāo)頭映射使用通配符,則會復(fù)制此標(biāo)頭,這可能會導(dǎo)致聯(lián)合身份驗證出現(xiàn)一些問題。 此回復(fù)消息可能會聯(lián)合回發(fā)送代理,發(fā)送代理可能會認(rèn)為消息正在循環(huán),因此以靜默方式丟棄它。 如果您希望使用通配符標(biāo)頭映射的便利性,則可能需要篩選出下游流中的一些標(biāo)頭。 例如,為了避免將標(biāo)頭復(fù)制回回復(fù),您可以在將回復(fù)發(fā)送到 AMQP 入站網(wǎng)關(guān)之前使用。 或者,可以顯式列出實際要映射的屬性,而不是使用通配符。 出于這些原因,對于入站消息,映射器(默認(rèn)情況下)不映射任何標(biāo)頭。 它也不會將 映射到標(biāo)頭,以避免該標(biāo)頭從入站消息傳播到出站消息。 相反,此標(biāo)頭映射到 ,不會映射到輸出。? |
從版本 4.3 開始,標(biāo)頭映射中的模式可以通過在模式前面加上 來否定。 否定模式獲得優(yōu)先級,因此諸如 不映射(也不是 )之類的列表。 標(biāo)準(zhǔn)標(biāo)頭加上 和 被映射。 否定技術(shù)可能很有用,例如,當(dāng) JSON 反序列化邏輯以不同的方式在接收方下游完成時,不映射傳入消息的 JSON 類型標(biāo)頭。 為此,應(yīng)為入站通道適配器/網(wǎng)關(guān)的標(biāo)頭映射器配置模式。??!?
???STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1?
???thing1?
???thing2?
???thing3?
???bad?
???qux?
???!json_*?
?
如果您有一個用戶定義的標(biāo)頭,該標(biāo)頭以您希望映射的標(biāo)頭開頭,則需要對其進(jìn)行轉(zhuǎn)義,如下所示:。 現(xiàn)在已映射名為標(biāo)題的標(biāo)頭。? |
從版本 5.1 開始,如果出站消息中不存在相應(yīng)的 or 標(biāo)頭,則將分別回退到映射和 to。 入站屬性將像以前一樣映射到標(biāo)頭。 當(dāng)消息使用者使用有狀態(tài)重試時填充屬性很有用。? |
?contentType?
?與其他標(biāo)頭不同,不以 ;這允許跨不同技術(shù)透明地傳遞 contentType 標(biāo)頭。 例如,發(fā)送到 RabbitMQ 隊列的入站 HTTP 消息。??AmqpHeaders.CONTENT_TYPE?
???amqp_?
?
標(biāo)頭映射到Spring AMQP的屬性,隨后映射到RabbitMQ的屬性。??contentType?
???MessageProperties.contentType?
???content_type?
?
在版本 5.1 之前,此標(biāo)頭也映射為映射中的條目;這是不正確的,此外,該值可能是錯誤的,因為基礎(chǔ) Spring AMQP 消息轉(zhuǎn)換器可能已更改內(nèi)容類型。 這樣的更改將反映在 first-class 屬性中,但不反映在 RabbitMQ 標(biāo)頭映射中。 入站映射忽略標(biāo)頭映射值。 不再映射到標(biāo)頭映射中的條目。??MessageProperties.headers?
???content_type?
???contentType?
?
本節(jié)介紹入站和出站消息的消息排序。
如果需要對入站消息進(jìn)行嚴(yán)格排序,則必須將入站偵聽器容器的屬性配置為 。 這是因為,如果消息失敗并重新傳遞,它將在現(xiàn)有的預(yù)取消息之后到達(dá)。 從 Spring AMQP 2.0 版本開始,默認(rèn)為提高性能。 嚴(yán)格的訂購要求是以性能下降為代價的。??prefetchCount?
???1?
???prefetchCount?
???250?
?
請考慮以下集成流程:
@Beanpublic IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) .split(s -> s.delimiters(",")) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get();}
假設(shè)我們發(fā)送消息 ,并發(fā)送到網(wǎng)關(guān)。 雖然消息 、 很可能是按順序發(fā)送的,但不能保證。 這是因為模板為每個發(fā)送操作從緩存中“借用”一個通道,并且不能保證對每條消息使用相同的通道。 一種解決方案是在拆分器之前啟動事務(wù),但是在 RabbitMQ 中事務(wù)成本高昂,并且會使性能降低數(shù)百倍。??A?
???B?
???C?
???A?
???B?
???C?
?
為了以更有效的方式解決這個問題,從版本 5.1 開始,Spring 集成提供了 這是一個 . 請參閱處理消息建議。 在拆分器之前應(yīng)用時,它可確保在同一通道上執(zhí)行所有下游操作,并且可以選擇等到收到所有已發(fā)送消息的發(fā)布者確認(rèn)(如果連接工廠配置為確認(rèn))。 以下示例演示如何使用:??BoundRabbitChannelAdvice?
???HandleMessageAdvice?
???BoundRabbitChannelAdvice?
?
@Beanpublic IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) .split(s -> s.delimiters(",") .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10)))) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get();}
請注意,在建議和出站適配器中使用了相同的(實現(xiàn))。 該建議在模板的方法中運行下游流,以便所有操作都在同一通道上運行。 如果提供了可選的超時,則當(dāng)流完成時,建議將調(diào)用該方法,如果在指定時間內(nèi)未收到確認(rèn),則會引發(fā)異常。??RabbitTemplate?
???RabbitOperations?
???invoke?
???waitForConfirmsOrDie?
?
下游流(、 和其他流)中不得有線程切換。? |
要試驗 AMQP 適配器,請查看 Spring 集成示例 git 存儲庫中提供的示例,網(wǎng)址為https://github.com/SpringSource/spring-integration-samples
目前,一個示例通過使用出站通道適配器和入站通道適配器演示了 Spring 集成 AMQP 適配器的基本功能。 由于示例中的 AMQP 代理實現(xiàn)使用RabbitMQ。
為了運行該示例,您需要一個正在運行的 RabbitMQ 實例。 僅具有基本默認(rèn)值的本地安裝就足夠了。 有關(guān)詳細(xì)的 RabbitMQ 安裝過程,請參閱https://www.rabbitmq.com/install.html |
啟動示例應(yīng)用程序后,在命令提示符下輸入一些文本,包含該輸入文本的消息將調(diào)度到 AMQP 隊列。 作為回報,Spring Integration 檢索該消息并將其打印到控制臺。
下圖說明了此示例中使用的一組基本 Spring 集成組件。
AMQP 示例圖像的彈簧集成圖::images/spring-integration-amqp-sample-graph.png[]
版本 6.0 引入了對 RabbitMQ 流隊列的支持。
這些終結(jié)點的 DSL 工廠類是 。??Rabbit?
?
@BeanIntegrationFlow flow(Environment env) { @Bean IntegrationFlow simpleStream(Environment env) { return IntegrationFlow.from(RabbitStream.inboundAdapter(env) .configureContainer(container -> container.queueName("my.stream"))) // ... .get(); } @Bean IntegrationFlow superStream(Environment env) { return IntegrationFlow.from(RabbitStream.inboundAdapter(env) .configureContainer(container -> container.superStream("my.stream", "my.consumer"))) // ... .get(); }}
@BeanIntegrationFlow outbound(RabbitStreamTemplate template) { return f -> f // ... .handle(RabbitStream.outboundStreamAdapter(template));}