亚洲综合图片区自拍_思思91精品国产综合在线观看_一区二区三区欧美_欧美黑人又粗又大_亚洲人成精品久久久久桥本

天天熱消息:AMQP (RabbitMQ) 支持

2022-12-08 12:04:46 來源:51CTO博客

Spring 集成提供了通道適配器,用于使用高級消息隊列協(xié)議 (AMQP) 接收和發(fā)送消息。

您需要將此依賴項包含在項目中:


(資料圖片僅供參考)

    org.springframework.integration    spring-integration-amqp    6.0.0

以下適配器可用:

入站通道適配器入站網(wǎng)關(guān)出站通道適配器出站網(wǎng)關(guān)異步出站網(wǎng)關(guān)RabbitMQ 流隊列入站通道適配器RabbitMQ 流隊列出站通道適配器

Spring 集成還提供了點對點消息通道和由 AMQP 交換和隊列支持的發(fā)布-訂閱消息通道。

為了提供AMQP支持,Spring Integration依賴于(Spring AMQP),它將Spring的核心概念應(yīng)用于基于AMQP的消息傳遞解決方案的開發(fā)。 Spring AMQP提供了與Spring JMS類似的語義。

雖然提供的AMQP通道適配器僅用于單向消息傳遞(發(fā)送或接收),但Spring Integration還提供了用于請求-回復(fù)操作的入站和出站AMQP網(wǎng)關(guān)。

提示: 您應(yīng)該熟悉Spring AMQP項目的參考文檔。 它提供了有關(guān)Spring與AMQP集成的更深入的信息,特別是RabbitMQ。

入站通道適配器

以下清單顯示了 AMQP 入站通道適配器的可能配置選項:

@Beanpublic IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))            .handle(m -> System.out.println(m.getPayload()))            .get();}

容器

請注意,在使用XML配置外部容器時,不能使用Spring AMQP命名空間來定義容器。 這是因為命名空間至少需要一個元素。 在此環(huán)境中,偵聽器位于適配器內(nèi)部。 因此,您必須使用常規(guī) Spring 定義來定義容器,如以下示例所示:????????

            

盡管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差異。 JMS 入站通道適配器正在使用底層,并且需要配置的輪詢器。 AMQP 入站通道適配器使用 和 是消息驅(qū)動的。 在這方面,它更類似于 JMS 消息驅(qū)動的通道適配器。??JmsDestinationPollingSource????AbstractMessageListenerContainer??

從版本 5.5 開始,可以使用在內(nèi)部調(diào)用重試操作時使用的策略進(jìn)行配置。 有關(guān)更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter????org.springframework.amqp.rabbit.retry.MessageRecoverer????RecoveryCallback????setMessageRecoverer()??

批處理消息

有關(guān)批處理消息的更多信息,請參閱Spring AMQP 文檔。

要使用 Spring 集成生成批處理消息,只需使用 .??BatchingRabbitTemplate??

接收批處理消息時,默認(rèn)情況下,偵聽器容器提取每個片段消息,適配器將為每個片段生成 。 從版本 5.2 開始,如果容器的屬性設(shè)置為 ,則由適配器執(zhí)行去批處理,并生成一個有效負(fù)載為片段有效負(fù)載列表(如果適用,轉(zhuǎn)換后)。??Message????deBatchingEnabled????false????Message>??

默認(rèn)值為 ,但可以在適配器上覆蓋。??BatchingStrategy????SimpleBatchingStrategy??

當(dāng)重試操作需要恢復(fù)時,必須與批處理一起使用。??org.springframework.amqp.rabbit.retry.MessageBatchRecoverer??

輪詢?nèi)胝就ǖ肋m配器

概述

版本 5.0.1 引入了輪詢通道適配器,允許您按需獲取單個消息 — 例如,使用 或 輪詢器。 有關(guān)詳細(xì)信息,請參閱延遲確認(rèn)可輪詢消息源。??MessageSourcePollingTemplate??

它當(dāng)前不支持 XML 配置。

以下示例顯示如何配置:??AmqpMessageSource??

@Beanpublic IntegrationFlow flow() {    return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))            .handle(p -> {                ...            })            .get();}

有關(guān)配置屬性,請參閱Javadoc。

批處理消息

請參閱批處理消息。

對于輪詢適配器,沒有偵聽器容器,批處理消息始終是反批處理的(如果支持這樣做)。??BatchingStrategy??

入站網(wǎng)關(guān)

入站網(wǎng)關(guān)支持入站通道適配器上的所有屬性(除了“通道”被“請求通道”替換),以及一些其他屬性。 以下清單顯示了可用的屬性:

@Bean // return the upper cased payloadpublic IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {    return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))            .transform(String.class, String::toUpperCase)            .get();}

此適配器的唯一 ID。 自選。

將轉(zhuǎn)換后的消息發(fā)送到的消息通道。 必填。

對接收 AMQP 消息時要使用的 的引用。 自選。 默認(rèn)情況下,只有標(biāo)準(zhǔn)的 AMQP 屬性(例如 )被復(fù)制到 Spring Integration 和從 Spring Integration 復(fù)制。 默認(rèn)情況下,不會將 AMQP 中的任何用戶定義的標(biāo)頭復(fù)制到 AMQP 消息或從 AMQP 消息復(fù)制。 如果提供了“請求標(biāo)頭名稱”或“回復(fù)標(biāo)頭名稱”,則不允許使用。??AmqpHeaderMapper????contentType????MessageHeaders????MessageProperties????DefaultAmqpHeaderMapper??

要從 AMQP 請求映射到 的 AMQP 標(biāo)頭名稱的逗號分隔列表。 僅當(dāng)未提供“標(biāo)頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是與標(biāo)頭名稱匹配的簡單模式(例如 或或)。??MessageHeaders????"*"????"thing1*, thing2"????"*thing1"??

要映射到 AMQP 回復(fù)消息的 AMQP 消息屬性中的名稱的逗號分隔列表。 所有標(biāo)準(zhǔn)標(biāo)頭(例如)都映射到 AMQP 消息屬性,而用戶定義的標(biāo)頭映射到“標(biāo)頭”屬性。 僅當(dāng)未提供“標(biāo)頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是要與標(biāo)頭名稱(例如,或或)匹配的簡單模式。??MessageHeaders????contentType????"*"????"foo*, bar"????"*foo"??

消息通道,其中應(yīng)回復(fù)消息。 自選。

設(shè)置用于從回復(fù)通道接收消息的基礎(chǔ)。 如果未指定,此屬性默認(rèn)為 (1 秒)。 僅當(dāng)容器線程在發(fā)送回復(fù)之前移交給另一個線程時,才適用。??receiveTimeout????o.s.i.core.MessagingTemplate????1000??

自定義的 Bean 引用(以便更好地控制要發(fā)送的回復(fù)消息)。 您可以提供 的替代實現(xiàn)。??AmqpTemplate????RabbitTemplate??

當(dāng) 沒有屬性時要使用的。 如果未指定此選項,則提供 no,請求消息中不存在任何屬性,并且 拋出 AN 是因為無法路由回復(fù)。 如果未指定此選項并提供外部選項,則不會引發(fā)異常。 您必須指定此選項或配置默認(rèn)值,并在該模板上, 如果您預(yù)計請求消息中不存在任何屬性的情況。??replyTo????o.s.amqp.core.Address????requestMessage????replyTo????amqp-template????replyTo????IllegalStateException????amqp-template????exchange????routingKey????replyTo??

請參閱入站通道適配器中有關(guān)配置屬性的說明。??listener-container??

從版本 5.5 開始,可以使用在內(nèi)部調(diào)用重試操作時使用的策略進(jìn)行配置。 有關(guān)更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter????org.springframework.amqp.rabbit.retry.MessageRecoverer????RecoveryCallback????setMessageRecoverer()??

批處理消息

請參閱批處理消息。

入站終端節(jié)點確認(rèn)模式

默認(rèn)情況下,入站終端節(jié)點使用應(yīng)答方式,這意味著容器會在下游集成流完成(或使用 或 將消息傳遞給另一個線程)時自動確認(rèn)消息。 將模式設(shè)置為配置使用者,以便根本不使用確認(rèn)(代理在發(fā)送消息后立即自動確認(rèn)消息)。 設(shè)置模式以允許用戶代碼在處理過程中的某個其他點確認(rèn)消息。 為了支持此功能,在此模式下,終結(jié)點分別在 和 中提供 和 標(biāo)頭。??AUTO????QueueChannel????ExecutorChannel????NONE????MANUAL????Channel????deliveryTag????amqp_channel????amqp_deliveryTag??

您可以對 執(zhí)行任何有效的 Rabbit 命令,但通常只使用 and(或)。 為了不干擾容器的操作,不應(yīng)保留對通道的引用,而應(yīng)僅在當(dāng)前消息的上下文中使用它。??Channel????basicAck????basicNack????basicReject??

由于 是對“活動”對象的引用,因此它不能序列化,如果持久保存消息,則會丟失。??Channel??

以下示例演示如何使用確認(rèn):??MANUAL??

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {    // Do some processing    if (allOK) {        channel.basicAck(deliveryTag, false);        // perhaps do some more processing    }    else {        channel.basicNack(deliveryTag, false, true);    }    return someResultForDownStreamProcessing;}

出站終結(jié)點

以下出站終結(jié)點具有許多類似的配置選項。 從版本 5.2 開始,已添加。 通常,當(dāng)啟用發(fā)布者確認(rèn)時,代理將快速返回一個 ack(或 nack),該 ack(或 nack)將被發(fā)送到相應(yīng)的通道。 如果在收到確認(rèn)之前關(guān)閉了通道,Spring AMQP 框架將合成一個 nack。 “丟失”確認(rèn)不應(yīng)發(fā)生,但如果設(shè)置此屬性,則終結(jié)點將定期檢查它們,并在時間過去而未收到確認(rèn)時合成 nack。??confirm-timeout??

出站通道適配器

以下示例顯示了 AMQP 出站通道適配器的可用屬性:

@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,        MessageChannel amqpOutboundChannel) {    return IntegrationFlow.from(amqpOutboundChannel)            .handle(Amqp.outboundAdapter(amqpTemplate)                        .routingKey("queue1")) // default exchange - route to queue "queue1"            .get();}

此適配器的唯一 ID。 自選。

消息通道,消息應(yīng)發(fā)送到該通道,以便將其轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。

對已配置的 AMQP 模板的 Bean 引用。 可選(默認(rèn)為 )。??amqpTemplate??

向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交換名稱表達(dá)”相互排斥。 自選。

一個 SpEL 表達(dá)式,計算該表達(dá)式以確定消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。

注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。??Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]??

發(fā)送消息時使用的固定路由密鑰。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。??String??

一個 SpEL 表達(dá)式,計算該表達(dá)式以確定發(fā)送消息時要使用的路由密鑰,消息作為根對象(例如,“payload.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。??String??

郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭,則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果根本不自定義,則默認(rèn)值為 。 自選。??PERSISTENT????NON_PERSISTENT????header-mapper????amqp_deliveryMode????DefaultHeaderMapper????MessagePropertiesConverter????RabbitTemplate????PERSISTENT??

定義相關(guān)性數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用和屬性設(shè)置為 . 收到發(fā)布者確認(rèn)并提供相關(guān)數(shù)據(jù)時,將根據(jù)確認(rèn)類型將其寫入 或 。 確認(rèn)的有效負(fù)載是相關(guān)數(shù)據(jù),由此表達(dá)式定義。 郵件的“amqp_publishConfirm”標(biāo)頭設(shè)置為 () 或 ()。 示例:和 。 版本 4.1 引入了消息標(biāo)頭。 它包含用于發(fā)布者確認(rèn)的“nack”。 從版本 4.2 開始,如果表達(dá)式解析為實例(例如 ),則在 / 通道上發(fā)出的消息基于該消息,并添加其他標(biāo)頭。 以前,無論類型如何,都會使用相關(guān)數(shù)據(jù)作為其有效負(fù)載創(chuàng)建新消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。??RabbitTemplate????CachingConnectionFactory????publisherConfirms????true????confirm-ack-channel????confirm-nack-channel????true????ack????false????nack????headers["myCorrelationData"]????payload????amqp_publishConfirmNackCause????cause????Message????#this????ack????nack??

正 () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??ack????confirm-correlation-expression????#root????#this????amqp_publishConfirm????true????nullChannel??

將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)(如果未配置)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??nack????confirm-correlation-expression????ErrorMessageStrategy????#root????#this????amqp_publishConfirm????false????ErrorMessageStrategy????ErrorMessage????NackedAmqpMessageException????nullChannel??

設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),適配器將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制。 默認(rèn) none (不會生成 nacks)。

設(shè)置為 true 時,調(diào)用線程將阻塞,等待發(fā)布者確認(rèn)。 這需要配置確認(rèn)以及 . 線程將阻塞長達(dá) (或默認(rèn)為 5 秒)。 如果發(fā)生超時,將拋出 。 如果啟用了返回并返回了消息,或者在等待確認(rèn)時發(fā)生任何其他異常,則將拋出 a 并顯示相應(yīng)的消息。??RabbitTemplate????confirm-correlation-expression????confirm-timeout????MessageTimeoutException????MessageHandlingException??

返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為向適配器返回?zé)o法傳遞的消息。 如果未進(jìn)行配置,則從從 AMQP 接收的數(shù)據(jù)構(gòu)造消息,并具有以下附加標(biāo)頭:、、、。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。??ErrorMessageStrategy????amqp_returnReplyCode????amqp_returnReplyText????amqp_returnExchange????amqp_returnRoutingKey????ErrorMessageStrategy????ErrorMessage????ReturnedAmqpMessageException??

對用于在發(fā)送返回或否定確認(rèn)的消息時生成實例的實現(xiàn)的引用。??ErrorMessageStrategy????ErrorMessage??

對發(fā)送 AMQP 消息時要使用的 的引用。 默認(rèn)情況下,只有標(biāo)準(zhǔn)的 AMQP 屬性(例如 )被復(fù)制到 Spring 集成 中。 任何用戶定義的標(biāo)頭都不會通過默認(rèn)的“DefaultAmqpHeaderMapper”復(fù)制到消息中。 如果提供了“請求標(biāo)頭名稱”,則不允許。 自選。??AmqpHeaderMapper????contentType????MessageHeaders??

要從 映射到 AMQP 消息的 AMQP 標(biāo)頭名稱的逗號分隔列表。 如果提供了“標(biāo)頭映射器”引用,則不允許。 此列表中的值也可以是與標(biāo)頭名稱匹配的簡單模式(例如 或或)。??MessageHeaders????"*"????"thing1*, thing2"????"*thing1"??

設(shè)置為 時,端點將在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這允許對錯誤配置進(jìn)行“快速故障”檢測,但如果代理關(guān)閉,也會導(dǎo)致初始化失敗。 當(dāng)(默認(rèn)值)時,當(dāng)發(fā)送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。??false????true??

設(shè)置為 時,類型的有效負(fù)載將作為離散消息在單個調(diào)用范圍內(nèi)在同一通道上發(fā)送。 需要 . when 為 true,在發(fā)送消息后調(diào)用。 使用事務(wù)模板,發(fā)送將在新事務(wù)或已啟動事務(wù)(如果存在)中執(zhí)行。??true????Iterable>????RabbitTemplate????RabbitTemplate????wait-for-confirms????RabbitTemplate.waitForConfirmsOrDie()??

返回通道

使用 a 需要將屬性設(shè)置為 的 和將屬性設(shè)置為 的 。 將多個出站終結(jié)點與返回符一起使用時,每個終結(jié)點都需要一個單獨的終結(jié)點。??return-channel????RabbitTemplate????mandatory????true????CachingConnectionFactory????publisherReturns????true????RabbitTemplate??

出站網(wǎng)關(guān)

以下清單顯示了 AMQP 出站網(wǎng)關(guān)的可能屬性:

@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)                    .routingKey("foo")) // default exchange - route to queue "foo"            .get();}@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")public interface MyGateway {    String sendToRabbit(String data);}

此適配器的唯一 ID。 自選。

消息發(fā)送到的消息通道,消息被轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。

對已配置的 AMQP 模板的 Bean 引用。 可選(默認(rèn)為 )。??amqpTemplate??

應(yīng)向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱 cxchange。 與“交換名稱表達(dá)”相互排斥。 自選。

一個 SpEL 表達(dá)式,計算該表達(dá)式以確定應(yīng)將消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。

注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。??Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]??

從 AMQP 隊列接收并轉(zhuǎn)換回復(fù)后應(yīng)發(fā)送到的消息通道。 自選。

網(wǎng)關(guān)在向 發(fā)送回復(fù)消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當(dāng)前已滿的 。 默認(rèn)為無窮大。??reply-channel????reply-channel????QueueChannel??

當(dāng) 時,如果屬性中未收到回復(fù)消息,網(wǎng)關(guān)將引發(fā)異常。 默認(rèn)值為 。??true????AmqpTemplate’s `replyTimeout????true??

發(fā)送消息時使用的。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。??routing-key????String??

一個 SpEL 表達(dá)式,經(jīng)過計算以確定發(fā)送消息時使用的表達(dá)式,將消息作為根對象(例如,“payload.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。??routing-key????String??

郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭,則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果根本不自定義,則默認(rèn)值為 。 自選。??PERSISTENT????NON_PERSISTENT????header-mapper????amqp_deliveryMode????DefaultHeaderMapper????MessagePropertiesConverter????RabbitTemplate????PERSISTENT??

從版本 4.2 開始。 定義相關(guān)數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用和屬性設(shè)置為 . 收到發(fā)布者確認(rèn)并提供關(guān)聯(lián)數(shù)據(jù)時,將根據(jù)確認(rèn)類型將其寫入 或 。 確認(rèn)的有效負(fù)載是相關(guān)數(shù)據(jù),由此表達(dá)式定義。 郵件的標(biāo)頭“amqp_publishConfirm”設(shè)置為 () 或 ()。 為了確認(rèn),Spring 集成提供了一個額外的標(biāo)頭。 示例:和 。 如果表達(dá)式解析為實例(例如 ),則消息 在 / 通道上發(fā)出的基于該消息,并添加了其他標(biāo)頭。 以前,無論類型如何,都會使用相關(guān)數(shù)據(jù)作為其有效負(fù)載創(chuàng)建新消息。 另請參閱發(fā)布者確認(rèn)和返回的代機(jī)制?。 自選。??RabbitTemplate????CachingConnectionFactory????publisherConfirms????true????confirm-ack-channel????confirm-nack-channel????true????ack????false????nack????nack????amqp_publishConfirmNackCause????headers["myCorrelationData"]????payload????Message????#this????ack????nack??

向其發(fā)送正 () 發(fā)布者確認(rèn)的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??ack????confirm-correlation-expression????#root????#this????amqp_publishConfirm????true????nullChannel??

將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由定義的關(guān)聯(lián)數(shù)據(jù)(如果未配置)。 如果表達(dá)式為 或 ,則消息是從原始消息構(gòu)建的,標(biāo)頭設(shè)置為 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??nack????confirm-correlation-expression????ErrorMessageStrategy????#root????#this????amqp_publishConfirm????false????ErrorMessageStrategy????ErrorMessage????NackedAmqpMessageException????nullChannel??

設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),網(wǎng)關(guān)將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 默認(rèn) none (不會生成 nacks)。

返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為向適配器返回?zé)o法傳遞的消息。 如果未進(jìn)行配置,則根據(jù)從 AMQP 接收的數(shù)據(jù)構(gòu)造消息,并具有以下附加標(biāo)頭:、、 和 。 當(dāng)存在 時,消息是帶有有效負(fù)載的消息。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。??ErrorMessageStrategy????amqp_returnReplyCode????amqp_returnReplyText????amqp_returnExchange????amqp_returnRoutingKey????ErrorMessageStrategy????ErrorMessage????ReturnedAmqpMessageException??

對用于在發(fā)送返回或否定確認(rèn)的消息時生成實例的實現(xiàn)的引用。??ErrorMessageStrategy????ErrorMessage??

設(shè)置為 時,端點將在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這允許在代理關(guān)閉時通過記錄錯誤消息來“快速失敗”檢測錯誤配置。 當(dāng)(默認(rèn)值)時,當(dāng)發(fā)送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。??false????true??

返回通道

使用 a 需要將屬性設(shè)置為 的 和將屬性設(shè)置為 的 。 將多個出站終結(jié)點與返回符一起使用時,每個終結(jié)點都需要一個單獨的終結(jié)點。??return-channel????RabbitTemplate????mandatory????true????CachingConnectionFactory????publisherReturns????true????RabbitTemplate??

基礎(chǔ)的默認(rèn)值為 5 秒。 如果需要更長的超時,則必須在 上配置它。??AmqpTemplate????replyTimeout????template??

請注意,出站適配器和出站網(wǎng)關(guān)配置之間的唯一區(qū)別是屬性的設(shè)置。??expectReply??

異步出站網(wǎng)關(guān)

上一節(jié)中討論的網(wǎng)關(guān)是同步的,因為發(fā)送線程掛起,直到 收到回復(fù)(或發(fā)生超時)。 Spring Integration 版本 4.3 添加了一個異步網(wǎng)關(guān),該網(wǎng)關(guān)使用 from Spring AMQP。 發(fā)送消息時,線程會在發(fā)送操作完成后立即返回,收到消息后,將在模板的偵聽器容器線程上發(fā)送回復(fù)。 在輪詢器線程上調(diào)用網(wǎng)關(guān)時,這可能很有用。 線程已釋放,可用于框架中的其他任務(wù)。??AsyncRabbitTemplate??

以下清單顯示了 AMQP 異步出站網(wǎng)關(guān)的可能配置選項:

@Configurationpublic class AmqpAsyncApplication {    @Bean    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {        return f -> f                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)                        .routingKey("queue1")); // default exchange - route to queue "queue1"    }    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")    public interface MyGateway {        String sendToRabbit(String data);    }}

此適配器的唯一 ID。 自選。

消息通道,消息應(yīng)發(fā)送到該通道,以便將其轉(zhuǎn)換并發(fā)布到 AMQP 交換。 必填。

對已配置的 Bean 引用。 可選(默認(rèn)為 )。??AsyncRabbitTemplate????asyncRabbitTemplate??

應(yīng)向其發(fā)送消息的 AMQP 交換的名稱。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交換名稱表達(dá)”相互排斥。 自選。

一個 SpEL 表達(dá)式,計算該表達(dá)式以確定消息發(fā)送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發(fā)送到默認(rèn)的無名稱交換。 與“交易所名稱”相互排斥。 自選。

注冊多個使用者時此使用者的順序,從而啟用負(fù)載平衡和故障轉(zhuǎn)移。 可選(默認(rèn)為 )。??Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]??

從 AMQP 隊列接收并轉(zhuǎn)換回復(fù)后應(yīng)發(fā)送到的消息通道。 自選。

網(wǎng)關(guān)在向 發(fā)送回復(fù)消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當(dāng)前已滿的 。 默認(rèn)值為無窮大。??reply-channel????reply-channel????QueueChannel??

如果在屬性中未收到回復(fù)消息,并且此設(shè)置為 ,網(wǎng)關(guān)將向入站消息的標(biāo)頭發(fā)送錯誤消息。 如果在屬性中未收到回復(fù)消息,并且此設(shè)置為 ,則網(wǎng)關(guān)會將錯誤消息發(fā)送到默認(rèn)值(如果可用)。 默認(rèn)為 .??AsyncRabbitTemplate’s `receiveTimeout????true????errorChannel????AsyncRabbitTemplate’s `receiveTimeout????false????errorChannel????true??

發(fā)送消息時要使用的路由密鑰。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰表達(dá)式”互斥。 自選。??String??

一個 SpEL 表達(dá)式,經(jīng)過計算以確定發(fā)送消息時要使用的路由密鑰, 將消息作為根對象(例如,“有效負(fù)載.key”)。 默認(rèn)情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。??String??

郵件的默認(rèn)傳遞方式:或 。 如果設(shè)置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標(biāo)頭 (),則設(shè)置該值。 如果未提供此屬性,并且標(biāo)頭映射器未設(shè)置此屬性,則默認(rèn)值取決于 使用的基礎(chǔ) Spring AMQP。 如果未自定義,則缺省值為 。 自選。??PERSISTENT????NON_PERSISTENT????header-mapper????amqp_deliveryMode????DefaultHeaderMapper????MessagePropertiesConverter????RabbitTemplate????PERSISTENT??

定義相關(guān)性數(shù)據(jù)的表達(dá)式。 如果提供,這會將基礎(chǔ) AMQP 模板配置為接收發(fā)布者確認(rèn)。 需要專用 和 ,其屬性設(shè)置為 。 收到發(fā)布者確認(rèn)并提供相關(guān)數(shù)據(jù)時,確認(rèn)將寫入 或 ,具體取決于確認(rèn)類型。 確認(rèn)的有效負(fù)載是此表達(dá)式定義的相關(guān)數(shù)據(jù),消息的“amqp_publishConfirm”標(biāo)頭設(shè)置為 () 或 ()。 例如,提供了一個額外的標(biāo)頭 ()。 例子:。 如果表達(dá)式解析為實例(例如“#this”),則在 / 通道上發(fā)出的消息將基于該消息,并添加其他標(biāo)頭。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。??RabbitTemplate????CachingConnectionFactory????publisherConfirms????true????confirm-ack-channel????confirm-nack-channel????true????ack????false????nack????nack????amqp_publishConfirmNackCause????headers["myCorrelationData"]????payload????Message????ack????nack??

向其發(fā)送正 () 發(fā)布者確認(rèn)的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??ack????confirm-correlation-expression????AsyncRabbitTemplate????enableConfirms????true????nullChannel??

從版本 4.2 開始。 將負(fù) () 發(fā)布者確認(rèn)發(fā)送到的通道。 有效負(fù)載是由 定義的關(guān)聯(lián)數(shù)據(jù)。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 可選(默認(rèn)值為 )。??nack????confirm-correlation-expression????AsyncRabbitTemplate????enableConfirms????true????nullChannel??

設(shè)置后,如果在此時間內(nèi)未收到發(fā)布者確認(rèn)(以毫秒為單位),網(wǎng)關(guān)將合成否定確認(rèn) (nack)。 每 50% 檢查一次掛起的確認(rèn),因此發(fā)送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制。 默認(rèn) none (不會生成 nacks)。

返回的消息發(fā)送到的通道。 提供后,基礎(chǔ) AMQP 模板配置為將無法傳遞的消息返回到網(wǎng)關(guān)。 該消息是根據(jù)從 AMQP 接收的數(shù)據(jù)構(gòu)造的,具有以下附加標(biāo)頭:、、 和 。 要求基礎(chǔ)數(shù)據(jù)庫將其屬性設(shè)置為 。 另請參閱發(fā)布者確認(rèn)和返回的替代機(jī)制?。 自選。??amqp_returnReplyCode????amqp_returnReplyText????amqp_returnExchange????amqp_returnRoutingKey????AsyncRabbitTemplate????mandatory????true??

設(shè)置為 時,端點在應(yīng)用程序上下文初始化期間嘗試連接到代理。 這樣做允許“快速失敗”檢測錯誤配置,方法是在代理關(guān)閉時記錄錯誤消息。 當(dāng)(默認(rèn)值)建立連接時(如果由于建立了其他組件而不存在 it) 發(fā)送第一條消息時。??false????true??

另請參閱異步服務(wù)激活器以獲取詳細(xì)信息。

兔子模板

當(dāng)您使用確認(rèn)和退貨時,我們建議將有線成專用的。 否則,可能會遇到意想不到的副作用。??RabbitTemplate????AsyncRabbitTemplate??

發(fā)布商確認(rèn)和返回的替代機(jī)制

將連接工廠配置為發(fā)布者確認(rèn)并返回時,上述部分將討論消息通道的配置,以便異步接收確認(rèn)和返回。 從版本 5.4 開始,還有一個通常更易于使用的附加機(jī)制。

在這種情況下,請勿配置 或 確認(rèn)和返回通道。 相反,在標(biāo)頭中添加一個實例;然后,您可以通過檢查已發(fā)送消息的實例中的未來狀態(tài)來等待稍后的結(jié)果。 在將來完成之前,將始終填充該字段(如果返回消息)。??confirm-correlation-expression????CorrelationData????AmqpHeaders.PUBLISH_CONFIRM_CORRELATION????CorrelationData????returnedMessage??

CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returnssomeFlow.getInputChannel().send(MessageBuilder.withPayload("test")        .setHeader("rk", "someKeyThatWontRoute")        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)        .build());...try {    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);    Message returned = corr.getReturnedMessage();    if (returned !- null) {        // message could not be routed    }}catch { ... }

為了提高性能,您可能希望發(fā)送多條消息并稍后等待確認(rèn),而不是一次發(fā)送一條。 返回的消息是轉(zhuǎn)換后的原始消息;您可以使用所需的任何其他數(shù)據(jù)對 A 進(jìn)行子類。??CorrelationData??

入站消息轉(zhuǎn)換

到達(dá)通道適配器或網(wǎng)關(guān)的入站消息將使用消息轉(zhuǎn)換器轉(zhuǎn)換為有效負(fù)載。 默認(rèn)情況下,使用 a,用于處理 java 序列化和文本。 默認(rèn)情況下,標(biāo)頭使用 映射。 如果發(fā)生轉(zhuǎn)換錯誤,并且未定義錯誤通道,則會將異常引發(fā)到容器,并由偵聽器容器的錯誤處理程序處理。 默認(rèn)錯誤處理程序?qū)⑥D(zhuǎn)換錯誤視為致命錯誤,消息將被拒絕(如果隊列已如此配置,則路由到死信交換)。 如果定義了錯誤通道,則有效負(fù)載為 具有屬性(無法轉(zhuǎn)換的 Spring AMQP 消息)和 . 如果容器是(默認(rèn)值),并且錯誤流使用錯誤而不引發(fā)異常,則將確認(rèn)原始消息。 如果錯誤流引發(fā)異常,則異常類型與容器的錯誤處理程序一起確定消息是否重新排隊。 如果容器配置了 ,則有效負(fù)載是具有附加屬性和 的 。 這使錯誤流能夠調(diào)用 或(或)消息,以控制其處置。??spring-messaging????Message????SimpleMessageConverter????DefaultHeaderMapper.inboundMapper()????ErrorMessage????ListenerExecutionFailedException????failedMessage????cause????AcknowledgeMode????AUTO????AcknowledgeMode.MANUAL????ManualAckListenerExecutionFailedException????channel????deliveryTag????basicAck????basicNack????basicReject??

出站消息轉(zhuǎn)換

Spring AMQP 1.4 引入了 ,其中實際轉(zhuǎn)換器的選擇基于 在傳入內(nèi)容類型消息屬性上。 這可由入站終端節(jié)點使用。??ContentTypeDelegatingMessageConverter??

從 Spring 集成版本 4.3 開始,您也可以在出站端點上使用 ,標(biāo)頭指定使用哪個轉(zhuǎn)換器。??ContentTypeDelegatingMessageConverter????contentType??

以下示例配置了一個 ,默認(rèn)轉(zhuǎn)換器為 (處理 Java 序列化和純文本),以及一個 JSON 轉(zhuǎn)換器:??ContentTypeDelegatingMessageConverter????SimpleMessageConverter??

                                                                

將標(biāo)頭設(shè)置為 to 的消息發(fā)送到會導(dǎo)致選擇 JSON 轉(zhuǎn)換器。??ctRequestChannel????contentType????application/json??

這適用于出站通道適配器和網(wǎng)關(guān)。

從版本 5.0 開始,添加到出站郵件的標(biāo)頭永遠(yuǎn)不會被映射標(biāo)頭覆蓋(默認(rèn)情況下)。 以前,只有當(dāng)消息轉(zhuǎn)換器是 (在這種情況下,首先映射標(biāo)頭以便可以選擇正確的轉(zhuǎn)換器)時才會出現(xiàn)這種情況。 對于其他轉(zhuǎn)換器,例如 ,映射標(biāo)頭將覆蓋轉(zhuǎn)換器添加的任何標(biāo)頭。 當(dāng)出站郵件具有一些剩余的標(biāo)頭(可能來自入站通道適配器)并且正確的出站被錯誤地覆蓋時,這會導(dǎo)致問題。 解決方法是在將消息發(fā)送到出站終結(jié)點之前使用標(biāo)頭篩選器刪除標(biāo)頭。??MessageProperties????ContentTypeDelegatingMessageConverter????SimpleMessageConverter????contentType????contentType??

但是,在某些情況下,需要以前的行為 - 例如,當(dāng)包含 JSON 的有效負(fù)載不知道內(nèi)容并將消息屬性設(shè)置為,但應(yīng)用程序希望通過設(shè)置發(fā)送到出站終結(jié)點的消息標(biāo)頭來覆蓋該行為。 正是這樣做的(默認(rèn)情況下)。??String????SimpleMessageConverter????contentType????text/plain????application/json????contentType????ObjectToJsonTransformer??

現(xiàn)在,在出站通道適配器和網(wǎng)關(guān)(以及 AMQP 支持的通道)上調(diào)用了一個屬性。 設(shè)置此選項可還原覆蓋轉(zhuǎn)換器添加的屬性的行為。??headersMappedLast????true??

從版本 5.1.9 開始,當(dāng)我們生成回復(fù)并希望覆蓋轉(zhuǎn)換器填充的標(biāo)頭時,提供了類似的情況。 有關(guān)更多信息,請參閱其 JavaDocs。??replyHeadersMappedLast????AmqpInboundGateway??

出站用戶標(biāo)識

Spring AMQP 版本 1.6 引入了一種機(jī)制,允許為出站消息指定默認(rèn)用戶 ID。 始終可以設(shè)置標(biāo)頭,該標(biāo)頭現(xiàn)在優(yōu)先于默認(rèn)值。 這可能對郵件收件人有用。 對于入站郵件,如果郵件發(fā)布者設(shè)置了該屬性,則該屬性將在標(biāo)頭中可用。 請注意,RabbitMQ會驗證用戶 ID 是連接的實際用戶 ID,還是連接允許模擬。??AmqpHeaders.USER_ID????AmqpHeaders.RECEIVED_USER_ID??

要為出站消息配置缺省用戶標(biāo)識,請在 上配置該標(biāo)識,并將出站適配器或網(wǎng)關(guān)配置為使用該模板。 同樣,要在回復(fù)上設(shè)置用戶 ID 屬性,請將適當(dāng)配置的模板注入入站網(wǎng)關(guān)。 有關(guān)更多信息,請參閱Spring AMQP 文檔。??RabbitTemplate??

延遲消息交換

Spring AMQP 支持RabbitMQ 延遲消息交換插件。 對于入站郵件,標(biāo)頭映射到標(biāo)頭。 設(shè)置標(biāo)頭會導(dǎo)致在出站郵件中設(shè)置相應(yīng)的標(biāo)頭。 還可以在出站終結(jié)點上指定 and 屬性(使用 XML 配置時)。 這些屬性優(yōu)先于標(biāo)頭。??x-delay????AmqpHeaders.RECEIVED_DELAY????AMQPHeaders.DELAY????x-delay????delay????delayExpression????delay-expression????AmqpHeaders.DELAY??

AMQP 支持的消息通道

有兩種消息通道實現(xiàn)可用。 一個是點對點,另一個是發(fā)布-訂閱。 這兩個通道都為基礎(chǔ)和(如本章前面的通道適配器和網(wǎng)關(guān)所示)提供了廣泛的配置屬性。 但是,我們在此處顯示的示例具有最小配置。 瀏覽 XML 架構(gòu)以查看可用屬性。??AmqpTemplate????SimpleMessageListenerContainer??

點對點通道可能類似于以下示例:

在幕后,前面的示例導(dǎo)致聲明一個命名,并且此通道發(fā)送到該通道(從技術(shù)上講,通過使用與此名稱匹配的路由密鑰發(fā)送到無名稱的直接交換)。 此通道還在此 上注冊消費者。 如果希望通道是“可輪詢的”而不是消息驅(qū)動的,請為標(biāo)志提供值 ,如以下示例所示:??Queue????si.p2pChannel????Queue????Queue????Queue????message-driven????false??

發(fā)布-訂閱通道可能如下所示:

在后臺,前面的示例會導(dǎo)致聲明名為的扇出交換,并且此通道發(fā)送到該扇出交換。 此通道還聲明一個以服務(wù)器命名的獨占、自動刪除、非持久,并將其綁定到扇出交換,同時注冊使用者以接收消息。 發(fā)布-訂閱-通道沒有“可輪詢”選項。 它必須是消息驅(qū)動的。??si.fanout.pubSubChannel????Queue????Queue??

從版本 4.1 開始,AMQP 支持的消息通道(與 一起)支持單獨配置 和 對于 . 請注意,以前是默認(rèn)的。 現(xiàn)在,默認(rèn)情況下,它適用于 .??channel-transacted????template-channel-transacted????transactional????AbstractMessageListenerContainer????RabbitTemplate????channel-transacted????true????false????AbstractMessageListenerContainer??

在版本 4.3 之前,AMQP 支持的通道僅支持具有有效負(fù)載和標(biāo)頭的消息。 整個消息被轉(zhuǎn)換(序列化)并發(fā)送到 RabbitMQ。 現(xiàn)在,您可以將屬性(或使用 Java 配置時)設(shè)置為 。 當(dāng)此標(biāo)志為 時,將轉(zhuǎn)換消息有效負(fù)載并映射標(biāo)頭,其方式類似于使用通道適配器時的方式。 這種安排允許 AMQP 支持的通道與不可序列化的有效負(fù)載一起使用(可能與其他消息轉(zhuǎn)換器一起使用,例如 )。 有關(guān)默認(rèn)映射標(biāo)頭的更多信息,請參閱AMQP 消息標(biāo)頭。 您可以通過提供使用 和 屬性的自定義映射器來修改映射。 現(xiàn)在,您還可以指定 ,用于在沒有標(biāo)頭時設(shè)置傳遞模式。 默認(rèn)情況下,Spring AMQP 使用交付模式。??Serializable????extract-payload????setExtractPayload()????true????true????Jackson2JsonMessageConverter????outbound-header-mapper????inbound-header-mapper????default-delivery-mode????amqp_deliveryMode????MessageProperties????PERSISTENT??

與其他支持持久性的通道一樣,支持 AMQP 的通道旨在提供消息持久性以避免消息丟失。 它們不用于將工作分發(fā)給其他對等應(yīng)用程序。 為此,請改用通道適配器。

從版本 5.0 開始,可輪詢通道現(xiàn)在會阻止指定的輪詢器線程(默認(rèn)值為 1 秒)。 以前,與其他實現(xiàn)不同,如果沒有可用的消息,線程會立即返回到調(diào)度程序,而不管接收超時如何。 阻止比使用 a 檢索消息(沒有超時)要昂貴一些,因為必須創(chuàng)建一個使用者來接收每條消息。 若要恢復(fù)以前的行為,請將輪詢器的 0 設(shè)置為 0。??receiveTimeout????PollableChannel????basicGet()????receiveTimeout??

使用 Java 配置進(jìn)行配置

以下示例顯示如何使用 Java 配置配置通道:

@Beanpublic AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();    factoryBean.setConnectionFactory(connectionFactory);    factoryBean.setQueueName("foo");    factoryBean.setPubSub(false);    return factoryBean;}@Beanpublic AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);    factoryBean.setConnectionFactory(connectionFactory);    factoryBean.setQueueName("bar");    factoryBean.setPubSub(false);    return factoryBean;}@Beanpublic AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);    factoryBean.setConnectionFactory(connectionFactory);    factoryBean.setQueueName("baz");    factoryBean.setPubSub(false);    return factoryBean;}

使用 Java DSL 進(jìn)行配置

以下示例顯示如何使用 Java DSL 配置通道:

@Beanpublic IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {    return IntegrationFlow.from(...)            ...            .channel(Amqp.pollableChannel(connectionFactory)                    .queueName("foo"))            ...            .get();}@Beanpublic IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {    return IntegrationFlow.from(...)            ...            .channel(Amqp.channel(connectionFactory)                    .queueName("bar"))            ...            .get();}@Beanpublic IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {    return IntegrationFlow.from(...)            ...            .channel(Amqp.publishSubscribeChannel(connectionFactory)                    .queueName("baz"))            ...            .get();}

AMQP 消息標(biāo)頭

概述

Spring 集成 AMQP 適配器會自動映射所有 AMQP 屬性和標(biāo)頭。 (這是對 4.3 的更改 - 以前,僅映射標(biāo)準(zhǔn)標(biāo)頭)。 默認(rèn)情況下,這些屬性通過使用DefaultAmqpHeaderMapper復(fù)制到 Spring Integration 中。MessageHeaders

您可以傳入自己的特定于 AMQP 的標(biāo)頭映射程序的實現(xiàn),因為適配器具有支持這樣做的屬性。

AMQP 消息屬性中的任何用戶定義的標(biāo)頭都將復(fù)制到 AMQP 消息或從 AMQP 消息復(fù)制,除非 的 或?qū)傩燥@式否定。 默認(rèn)情況下,對于出站映射器,不映射任何標(biāo)頭。 請參閱本節(jié)后面出現(xiàn)的警告,了解原因。??requestHeaderNames????replyHeaderNames????DefaultAmqpHeaderMapper????x-*??

要覆蓋默認(rèn)值并恢復(fù)到 4.3 之前的行為,請在屬性中使用 和。??STANDARD_REQUEST_HEADERS????STANDARD_REPLY_HEADERS??

映射用戶定義的標(biāo)頭時,這些值還可以包含要匹配的簡單通配符模式(例如 或)。 匹配所有標(biāo)頭。??thing*????*thing????*??

從版本 4.1 開始,(超類)允許為 和 屬性(除了現(xiàn)有的 和 )配置令牌,以映射所有用戶定義的標(biāo)頭。??AbstractHeaderMapper????DefaultAmqpHeaderMapper????NON_STANDARD_HEADERS????requestHeaderNames????replyHeaderNames????STANDARD_REQUEST_HEADERS????STANDARD_REPLY_HEADERS??

該類標(biāo)識由 :??org.springframework.amqp.support.AmqpHeaders????DefaultAmqpHeaderMapper??

??amqp_appId????amqp_clusterId????amqp_contentEncoding????amqp_contentLength????content-type??(請參閱內(nèi)容類型標(biāo)頭)??amqp_correlationId????amqp_delay????amqp_deliveryMode????amqp_deliveryTag????amqp_expiration????amqp_messageCount????amqp_messageId????amqp_receivedDelay????amqp_receivedDeliveryMode????amqp_receivedExchange????amqp_receivedRoutingKey????amqp_redelivered????amqp_replyTo????amqp_timestamp????amqp_type????amqp_userId????amqp_publishConfirm????amqp_publishConfirmNackCause????amqp_returnReplyCode????amqp_returnReplyText????amqp_returnExchange????amqp_returnRoutingKey????amqp_channel????amqp_consumerTag????amqp_consumerQueue??

如本節(jié)前面所述,使用 標(biāo)頭映射模式 是復(fù)制所有標(biāo)頭的常用方法。 但是,這可能會產(chǎn)生一些意想不到的副作用,因為某些 RabbitMQ 專有屬性/標(biāo)頭也會被復(fù)制。 例如,使用聯(lián)合身份驗證?時,收到的消息可能具有一個名為 的屬性,該屬性包含發(fā)送消息的節(jié)點。 如果對入站網(wǎng)關(guān)上的請求和回復(fù)標(biāo)頭映射使用通配符,則會復(fù)制此標(biāo)頭,這可能會導(dǎo)致聯(lián)合身份驗證出現(xiàn)一些問題。 此回復(fù)消息可能會聯(lián)合回發(fā)送代理,發(fā)送代理可能會認(rèn)為消息正在循環(huán),因此以靜默方式丟棄它。 如果您希望使用通配符標(biāo)頭映射的便利性,則可能需要篩選出下游流中的一些標(biāo)頭。 例如,為了避免將標(biāo)頭復(fù)制回回復(fù),您可以在將回復(fù)發(fā)送到 AMQP 入站網(wǎng)關(guān)之前使用。 或者,可以顯式列出實際要映射的屬性,而不是使用通配符。 出于這些原因,對于入站消息,映射器(默認(rèn)情況下)不映射任何標(biāo)頭。 它也不會將 映射到標(biāo)頭,以避免該標(biāo)頭從入站消息傳播到出站消息。 相反,此標(biāo)頭映射到 ,不會映射到輸出。??*????x-received-from????*????x-received-from????????x-*????deliveryMode????amqp_deliveryMode????amqp_receivedDeliveryMode??

從版本 4.3 開始,標(biāo)頭映射中的模式可以通過在模式前面加上 來否定。 否定模式獲得優(yōu)先級,因此諸如 不映射(也不是 )之類的列表。 標(biāo)準(zhǔn)標(biāo)頭加上 和 被映射。 否定技術(shù)可能很有用,例如,當(dāng) JSON 反序列化邏輯以不同的方式在接收方下游完成時,不映射傳入消息的 JSON 類型標(biāo)頭。 為此,應(yīng)為入站通道適配器/網(wǎng)關(guān)的標(biāo)頭映射器配置模式。??!????STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1????thing1????thing2????thing3????bad????qux????!json_*??

如果您有一個用戶定義的標(biāo)頭,該標(biāo)頭以您希望映射的標(biāo)頭開頭,則需要對其進(jìn)行轉(zhuǎn)義,如下所示:。 現(xiàn)在已映射名為標(biāo)題的標(biāo)頭。??!????\????STANDARD_REQUEST_HEADERS,\!myBangHeader????!myBangHeader??

從版本 5.1 開始,如果出站消息中不存在相應(yīng)的 or 標(biāo)頭,則將分別回退到映射和 to。 入站屬性將像以前一樣映射到標(biāo)頭。 當(dāng)消息使用者使用有狀態(tài)重試時填充屬性很有用。??DefaultAmqpHeaderMapper????MessageHeaders.ID????MessageHeaders.TIMESTAMP????MessageProperties.messageId????MessageProperties.timestamp????amqp_messageId????amqp_timestamp????amqp_*????messageId??

頁眉??contentType??

與其他標(biāo)頭不同,不以 ;這允許跨不同技術(shù)透明地傳遞 contentType 標(biāo)頭。 例如,發(fā)送到 RabbitMQ 隊列的入站 HTTP 消息。??AmqpHeaders.CONTENT_TYPE????amqp_??

標(biāo)頭映射到Spring AMQP的屬性,隨后映射到RabbitMQ的屬性。??contentType????MessageProperties.contentType????content_type??

在版本 5.1 之前,此標(biāo)頭也映射為映射中的條目;這是不正確的,此外,該值可能是錯誤的,因為基礎(chǔ) Spring AMQP 消息轉(zhuǎn)換器可能已更改內(nèi)容類型。 這樣的更改將反映在 first-class 屬性中,但不反映在 RabbitMQ 標(biāo)頭映射中。 入站映射忽略標(biāo)頭映射值。 不再映射到標(biāo)頭映射中的條目。??MessageProperties.headers????content_type????contentType??

嚴(yán)格的消息排序

本節(jié)介紹入站和出站消息的消息排序。

入境

如果需要對入站消息進(jìn)行嚴(yán)格排序,則必須將入站偵聽器容器的屬性配置為 。 這是因為,如果消息失敗并重新傳遞,它將在現(xiàn)有的預(yù)取消息之后到達(dá)。 從 Spring AMQP 2.0 版本開始,默認(rèn)為提高性能。 嚴(yán)格的訂購要求是以性能下降為代價的。??prefetchCount????1????prefetchCount????250??

出境

請考慮以下集成流程:

@Beanpublic IntegrationFlow flow(RabbitTemplate template) {    return IntegrationFlow.from(Gateway.class)            .split(s -> s.delimiters(","))            .transform(String::toUpperCase)            .handle(Amqp.outboundAdapter(template).routingKey("rk"))            .get();}

假設(shè)我們發(fā)送消息 ,并發(fā)送到網(wǎng)關(guān)。 雖然消息 、 很可能是按順序發(fā)送的,但不能保證。 這是因為模板為每個發(fā)送操作從緩存中“借用”一個通道,并且不能保證對每條消息使用相同的通道。 一種解決方案是在拆分器之前啟動事務(wù),但是在 RabbitMQ 中事務(wù)成本高昂,并且會使性能降低數(shù)百倍。??A????B????C????A????B????C??

為了以更有效的方式解決這個問題,從版本 5.1 開始,Spring 集成提供了 這是一個 . 請參閱處理消息建議。 在拆分器之前應(yīng)用時,它可確保在同一通道上執(zhí)行所有下游操作,并且可以選擇等到收到所有已發(fā)送消息的發(fā)布者確認(rèn)(如果連接工廠配置為確認(rèn))。 以下示例演示如何使用:??BoundRabbitChannelAdvice????HandleMessageAdvice????BoundRabbitChannelAdvice??

@Beanpublic IntegrationFlow flow(RabbitTemplate template) {    return IntegrationFlow.from(Gateway.class)            .split(s -> s.delimiters(",")                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))            .transform(String::toUpperCase)            .handle(Amqp.outboundAdapter(template).routingKey("rk"))            .get();}

請注意,在建議和出站適配器中使用了相同的(實現(xiàn))。 該建議在模板的方法中運行下游流,以便所有操作都在同一通道上運行。 如果提供了可選的超時,則當(dāng)流完成時,建議將調(diào)用該方法,如果在指定時間內(nèi)未收到確認(rèn),則會引發(fā)異常。??RabbitTemplate????RabbitOperations????invoke????waitForConfirmsOrDie??

下游流(、 和其他流)中不得有線程切換。??QueueChannel????ExecutorChannel??

AMQP 樣品

要試驗 AMQP 適配器,請查看 Spring 集成示例 git 存儲庫中提供的示例,網(wǎng)址為https://github.com/SpringSource/spring-integration-samples

目前,一個示例通過使用出站通道適配器和入站通道適配器演示了 Spring 集成 AMQP 適配器的基本功能。 由于示例中的 AMQP 代理實現(xiàn)使用RabbitMQ。

為了運行該示例,您需要一個正在運行的 RabbitMQ 實例。 僅具有基本默認(rèn)值的本地安裝就足夠了。 有關(guān)詳細(xì)的 RabbitMQ 安裝過程,請參閱https://www.rabbitmq.com/install.html

啟動示例應(yīng)用程序后,在命令提示符下輸入一些文本,包含該輸入文本的消息將調(diào)度到 AMQP 隊列。 作為回報,Spring Integration 檢索該消息并將其打印到控制臺。

下圖說明了此示例中使用的一組基本 Spring 集成組件。

RabbitMQ 流隊列支持

AMQP 示例圖像的彈簧集成圖::images/spring-integration-amqp-sample-graph.png[]

版本 6.0 引入了對 RabbitMQ 流隊列的支持。

這些終結(jié)點的 DSL 工廠類是 。??Rabbit??

RabbitMQ 流入站通道適配器

@BeanIntegrationFlow flow(Environment env) {    @Bean    IntegrationFlow simpleStream(Environment env) {        return IntegrationFlow.from(RabbitStream.inboundAdapter(env)                        .configureContainer(container -> container.queueName("my.stream")))                // ...                .get();    }    @Bean    IntegrationFlow superStream(Environment env) {        return IntegrationFlow.from(RabbitStream.inboundAdapter(env)                        .configureContainer(container -> container.superStream("my.stream", "my.consumer")))                // ...                .get();    }}

RabbitMQ 流出站通道適配器

@BeanIntegrationFlow outbound(RabbitStreamTemplate template) {    return f -> f            // ...            .handle(RabbitStream.outboundStreamAdapter(template));}

標(biāo)簽: 通道適配器 有效負(fù)載 發(fā)送消息

上一篇:環(huán)球今日報丨【Dubbo3入門到精通】總體技術(shù)體系介紹及技術(shù)指南(目錄)
下一篇:焦點熱訊:百度 Android 直播秒開體驗優(yōu)化