# spring-cloud-stream-binder-rabbit
**Repository Path**: easyang/spring-cloud-stream-binder-rabbit
## Basic Information
- **Project Name**: spring-cloud-stream-binder-rabbit
- **Description**: No description available
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-10-22
- **Last Updated**: 2021-10-22
## Categories & Tags
**Categories**: Uncategorized
**Tags**: SpringCloud
## README
////
DO NOT EDIT THIS FILE. IT WAS GENERATED.
Manual changes to this file will be lost when it is generated again.
Edit the files in the src/main/asciidoc/ directory instead.
////
:jdkversion: 1.8
:github-tag: master
:github-repo: spring-cloud/spring-cloud-stream-binder-rabbit
:github-raw: https://raw.githubusercontent.com/{github-repo}/{github-tag}
:github-code: https://github.com/{github-repo}/tree/{github-tag}
image::https://circleci.com/gh/spring-cloud/spring-cloud-stream-binder-rabbit.svg?style=svg["CircleCI", link="https://circleci.com/gh/spring-cloud/spring-cloud-stream-binder-rabbit"]
image::https://codecov.io/gh/spring-cloud/spring-cloud-stream-binder-rabbit/branch/{github-tag}/graph/badge.svg["codecov", link="https://codecov.io/gh/spring-cloud/spring-cloud-stream-binder-rabbit"]
image::https://badges.gitter.im/spring-cloud/spring-cloud-stream-binder-rabbit.svg[Gitter, link="https://gitter.im/spring-cloud/spring-cloud-stream-binder-rabbit?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge"]
// ======================================================================================
//= Overview
[partintro]
--
This guide describes the RabbitMQ implementation of the Spring Cloud Stream Binder.
It contains information about its design, usage and configuration options, as well as information on how the Stream Cloud Stream concepts map into RabbitMQ specific constructs.
--
== Usage
To use the RabbitMQ binder, you can add it to your Spring Cloud Stream application, by using the following Maven coordinates:
[source,xml]
----
org.springframework.cloud
spring-cloud-stream-binder-rabbit
----
Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter, as follows:
[source,xml]
----
org.springframework.cloud
spring-cloud-starter-stream-rabbit
----
== RabbitMQ Binder Overview
The following simplified diagram shows how the RabbitMQ binder operates:
.RabbitMQ Binder
image::{github-raw}/docs/src/main/asciidoc/images/rabbit-binder.png[width=300,scaledwidth="50%"]
By default, the RabbitMQ Binder implementation maps each destination to a `TopicExchange`.
For each consumer group, a `Queue` is bound to that `TopicExchange`.
Each consumer instance has a corresponding RabbitMQ `Consumer` instance for its group's `Queue`.
For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
For anonymous consumers (those with no `group` property), an auto-delete queue (with a randomized unique name) is used.
By using the optional `autoBindDlq` option, you can configure the binder to create and configure dead-letter queues (DLQs) (and a dead-letter exchange `DLX`, as well as routing infrastructure).
By default, the dead letter queue has the name of the destination, appended with `.dlq`.
If retry is enabled (`maxAttempts > 1`), failed messages are delivered to the DLQ after retries are exhausted.
If retry is disabled (`maxAttempts = 1`), you should set `requeueRejected` to `false` (the default) so that failed messages are routed to the DLQ, instead of being re-queued.
In addition, `republishToDlq` causes the binder to publish a failed message to the DLQ (instead of rejecting it).
This feature lets additional information (such as the stack trace in the `x-exception-stacktrace` header) be added to the message in headers.
See the <> for information about truncated stack traces.
This option does not need retry enabled.
You can republish a failed message after just one attempt.
Starting with version 1.2, you can configure the delivery mode of republished messages.
See the <>.
If the stream listener throws an `ImmediateAcknowledgeAmqpException`, the DLQ is bypassed and the message simply discarded.
Starting with version 2.1, this is true regardless of the setting of `republishToDlq`; previously it was only the case when `republishToDlq` was `false`.
IMPORTANT: Setting `requeueRejected` to `true` (with `republishToDlq=false` ) causes the message to be re-queued and redelivered continually, which is likely not what you want unless the reason for the failure is transient.
In general, you should enable retry within the binder by setting `maxAttempts` to greater than one or by setting `republishToDlq` to `true`.
Starting with version 3.1.2, if the consumer is marked as `transacted`, publishing to the DLQ will participate in the transaction.
This allows the transaction to roll back if the publishing fails for some reason (for example, if the user is not authorized to publish to the dead letter exchange).
In addition, if the connection factory is configured for publisher confirms or returns, the publication to the DLQ will wait for the confirmation and check for a returned message.
If a negative acknowledgment or returned message is received, the binder will throw an `AmqpRejectAndDontRequeueException`, allowing the broker to take care of publishing to the DLQ as if the `republishToDlq` property is `false`.
See <> for more information about these properties.
The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue).
Some options are described in <>.
NOTE: When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable 'RabbitAutoConfiguration' to avoid the same configuration from `RabbitAutoConfiguration` being applied to the two binders.
You can exclude the class by using the `@SpringBootApplication` annotation.
Starting with version 2.0, the `RabbitMessageChannelBinder` sets the `RabbitTemplate.userPublisherConnection` property to `true` so that the non-transactional producers avoid deadlocks on consumers, which can happen if cached connections are blocked because of a https://www.rabbitmq.com/memory.html[memory alarm] on the broker.
NOTE: Currently, a `multiplex` consumer (a single consumer listening to multiple queues) is only supported for message-driven consumers; polled consumers can only retrieve messages from a single queue.
== Configuration Options
This section contains settings specific to the RabbitMQ Binder and bound channels.
For general binding configuration options and properties, see the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_configuration_options[Spring Cloud Stream core documentation].
[[rabbit-binder-properties]]
=== RabbitMQ Binder Properties
By default, the RabbitMQ binder uses Spring Boot's `ConnectionFactory`.
Conseuqently, it supports all Spring Boot configuration options for RabbitMQ.
(For reference, see the https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#common-application-properties[Spring Boot documentation]).
RabbitMQ configuration options use the `spring.rabbitmq` prefix.
In addition to Spring Boot options, the RabbitMQ binder supports the following properties:
spring.cloud.stream.rabbit.binder.adminAddresses::
A comma-separated list of RabbitMQ management plugin URLs.
Only used when `nodes` contains more than one entry.
Each entry in this list must have a corresponding entry in `spring.rabbitmq.addresses`.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See https://docs.spring.io/spring-amqp/reference/html/#queue-affinity[Queue Affinity and the LocalizedQueueConnectionFactory] for more information.
+
Default: empty.
spring.cloud.stream.rabbit.binder.nodes::
A comma-separated list of RabbitMQ node names.
When more than one entry, used to locate the server address where a queue is located.
Each entry in this list must have a corresponding entry in `spring.rabbitmq.addresses`.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See https://docs.spring.io/spring-amqp/reference/html/_reference.html#queue-affinity[Queue Affinity and the LocalizedQueueConnectionFactory] for more information.
+
Default: empty.
spring.cloud.stream.rabbit.binder.compressionLevel::
The compression level for compressed bindings.
See `java.util.zip.Deflater`.
+
Default: `1` (BEST_LEVEL).
spring.cloud.stream.binder.connection-name-prefix::
A connection name prefix used to name the connection(s) created by this binder.
The name is this prefix followed by `#n`, where `n` increments each time a new connection is opened.
+
Default: none (Spring AMQP default).
=== RabbitMQ Consumer Properties
The following properties are available for Rabbit consumers only and must be prefixed with `spring.cloud.stream.rabbit.bindings..consumer.`.
However if the same set of properties needs to be applied to most bindings, to
avoid repetition, Spring Cloud Stream supports setting values for all channels,
in the format of `spring.cloud.stream.rabbit.default.=`.
Also, keep in mind that binding specific property will override its equivalent in the default.
acknowledgeMode::
The acknowledge mode.
+
Default: `AUTO`.
anonymousGroupPrefix::
When the binding has no `group` property, an anonymous, auto-delete queue is bound to the destination exchange.
The default naming stragegy for such queues results in a queue named `anonymous.`.
Set this property to change the prefix to something other than the default.
+
Default: `anonymous.`.
autoBindDlq::
Whether to automatically declare the DLQ and bind it to the binder DLX.
+
Default: `false`.
bindingRoutingKey::
The routing key with which to bind the queue to the exchange (if `bindQueue` is `true`).
Can be multiple keys - see `bindingRoutingKeyDelimiter`.
For partitioned destinations, `-` is appended to each key.
+
Default: `#`.
bindingRoutingKeyDelimiter::
When this is not null, 'bindingRoutingKey' is considered to be a list of keys delimited by this value; often a comma is used.
+
Default: `null`.
bindQueue::
Whether to declare the queue and bind it to the destination exchange.
Set it to `false` if you have set up your own infrastructure and have previously created and bound the queue.
+
Default: `true`.
consumerTagPrefix::
Used to create the consumer tag(s); will be appended by `#n` where `n` increments for each consumer created.
Example: `${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}`.
+
Default: none - the broker will generate random consumer tags.
containerType::
Select the type of listener container to be used.
See https://docs.spring.io/spring-amqp/reference/html/_reference.html#choose-container[Choosing a Container] in the Spring AMQP documentation for more information.
+
Default: `simple`
deadLetterQueueName::
The name of the DLQ
+
Default: `prefix+destination.dlq`
deadLetterExchange::
A DLX to assign to the queue.
Relevant only if `autoBindDlq` is `true`.
+
Default: 'prefix+DLX'
deadLetterExchangeType::
The type of the DLX to assign to the queue.
Relevant only if `autoBindDlq` is `true`.
+
Default: 'direct'
deadLetterRoutingKey::
A dead letter routing key to assign to the queue.
Relevant only if `autoBindDlq` is `true`.
+
Default: `destination`
declareDlx::
Whether to declare the dead letter exchange for the destination.
Relevant only if `autoBindDlq` is `true`.
Set to `false` if you have a pre-configured DLX.
+
Default: `true`.
declareExchange::
Whether to declare the exchange for the destination.
+
Default: `true`.
delayedExchange::
Whether to declare the exchange as a `Delayed Message Exchange`.
Requires the delayed message exchange plugin on the broker.
The `x-delayed-type` argument is set to the `exchangeType`.
+
Default: `false`.
dlqBindingArguments::
Arguments applied when binding the dlq to the dead letter exchange; used with `headers` `deadLetterExchangeType` to specify headers to match on.
For example `...dlqBindingArguments.x-match=any`, `...dlqBindingArguments.someHeader=someValue`.
+
Default: empty
dlqDeadLetterExchange::
If a DLQ is declared, a DLX to assign to that queue.
+
Default: `none`
dlqDeadLetterRoutingKey::
If a DLQ is declared, a dead letter routing key to assign to that queue.
+
Default: `none`
dlqExpires::
How long before an unused dead letter queue is deleted (in milliseconds).
+
Default: `no expiration`
dlqLazy::
Declare the dead letter queue with the `x-queue-mode=lazy` argument.
See https://www.rabbitmq.com/lazy-queues.html["`Lazy Queues`"].
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
+
Default: `false`.
dlqMaxLength::
Maximum number of messages in the dead letter queue.
+
Default: `no limit`
dlqMaxLengthBytes::
Maximum number of total bytes in the dead letter queue from all messages.
+
Default: `no limit`
dlqMaxPriority::
Maximum priority of messages in the dead letter queue (0-255).
+
Default: `none`
dlqOverflowBehavior::
Action to take when `dlqMaxLength` or `dlqMaxLengthBytes` is exceeded; currently `drop-head` or `reject-publish` but refer to the RabbitMQ documentation.
+
Default: `none`
dlqQuorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
+
Default: none - broker default will apply.
dlqQuorum.enabled::
When true, create a quorum dead letter queue instead of a classic queue.
+
Default: false
dlqQuorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
+
Default: none - broker default will apply.
dlqSingleActiveConsumer::
Set to true to set the `x-single-active-consumer` queue property to true.
+
Default: `false`
dlqTtl::
Default time to live to apply to the dead letter queue when declared (in milliseconds).
+
Default: `no limit`
durableSubscription::
Whether the subscription should be durable.
Only effective if `group` is also set.
+
Default: `true`.
exchangeAutoDelete::
If `declareExchange` is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).
+
Default: `true`.
exchangeDurable::
If `declareExchange` is true, whether the exchange should be durable (that is, it survives broker restart).
+
Default: `true`.
exchangeType::
The exchange type: `direct`, `fanout`, `headers` or `topic` for non-partitioned destinations and `direct`, headers or `topic` for partitioned destinations.
+
Default: `topic`.
exclusive::
Whether to create an exclusive consumer.
Concurrency should be 1 when this is `true`.
Often used when strict ordering is required but enabling a hot standby instance to take over after a failure.
See `recoveryInterval`, which controls how often a standby instance attempts to consume.
Consider using `singleActiveConsumer` instead when using RabbitMQ 3.8 or later.
+
Default: `false`.
expires::
How long before an unused queue is deleted (in milliseconds).
+
Default: `no expiration`
failedDeclarationRetryInterval::
The interval (in milliseconds) between attempts to consume from a queue if it is missing.
+
Default: 5000
[[spring-cloud-stream-rabbit-frame-max-headroom]]
frameMaxHeadroom::
The number of bytes to reserve for other headers when adding the stack trace to a DLQ message header.
All headers must fit within the `frame_max` size configured on the broker.
Stack traces can be large; if the size plus this property exceeds `frame_max` then the stack trace will be truncated.
A WARN log will be written; consider increasing the `frame_max` or reducing the stack trace by catching the exception and throwing one with a smaller stack trace.
+
Default: 20000
headerPatterns::
Patterns for headers to be mapped from inbound messages.
+
Default: `['*']` (all headers).
lazy::
Declare the queue with the `x-queue-mode=lazy` argument.
See https://www.rabbitmq.com/lazy-queues.html["`Lazy Queues`"].
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
+
Default: `false`.
maxConcurrency::
The maximum number of consumers.
Not supported when the `containerType` is `direct`.
+
Default: `1`.
maxLength::
The maximum number of messages in the queue.
+
Default: `no limit`
maxLengthBytes::
The maximum number of total bytes in the queue from all messages.
+
Default: `no limit`
maxPriority::
The maximum priority of messages in the queue (0-255).
+
Default: `none`
missingQueuesFatal::
When the queue cannot be found, whether to treat the condition as fatal and stop the listener container.
Defaults to `false` so that the container keeps trying to consume from the queue -- for example, when using a cluster and the node hosting a non-HA queue is down.
+
Default: `false`
overflowBehavior::
Action to take when `maxLength` or `maxLengthBytes` is exceeded; currently `drop-head` or `reject-publish` but refer to the RabbitMQ documentation.
+
Default: `none`
prefetch::
Prefetch count.
+
Default: `1`.
prefix::
A prefix to be added to the name of the `destination` and queues.
+
Default: "".
queueBindingArguments::
Arguments applied when binding the queue to the exchange; used with `headers` `exchangeType` to specify headers to match on.
For example `...queueBindingArguments.x-match=any`, `...queueBindingArguments.someHeader=someValue`.
+
Default: empty
queueDeclarationRetries::
The number of times to retry consuming from a queue if it is missing.
Relevant only when `missingQueuesFatal` is `true`.
Otherwise, the container keeps retrying indefinitely.
Not supported when the `containerType` is `direct`.
+
Default: `3`
queueNameGroupOnly::
When true, consume from a queue with a name equal to the `group`.
Otherwise the queue name is `destination.group`.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
+
Default: false.
quorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
+
Default: none - broker default will apply.
quorum.enabled::
When true, create a quorum queue instead of a classic queue.
+
Default: false
quorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
+
Default: none - broker default will apply.
recoveryInterval::
The interval between connection recovery attempts, in milliseconds.
+
Default: `5000`.
requeueRejected::
Whether delivery failures should be re-queued when retry is disabled or `republishToDlq` is `false`.
+
Default: `false`.
[[spring-cloud-stream-rabbit-republish-delivery-mode]]
republishDeliveryMode::
When `republishToDlq` is `true`, specifies the delivery mode of the republished message.
+
Default: `DeliveryMode.PERSISTENT`
republishToDlq::
By default, messages that fail after retries are exhausted are rejected.
If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ.
If set to `true`, the binder republishs failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.
Also see the <>.
+
Default: `true`
singleActiveConsumer::
Set to true to set the `x-single-active-consumer` queue property to true.
+
Default: `false`
transacted::
Whether to use transacted channels.
+
Default: `false`.
ttl::
Default time to live to apply to the queue when declared (in milliseconds).
+
Default: `no limit`
txSize::
The number of deliveries between acks.
Not supported when the `containerType` is `direct`.
+
Default: `1`.
=== Advanced Listener Container Configuration
To set listener container properties that are not exposed as binder or binding properties, add a single bean of type `ListenerContainerCustomizer` to the application context.
The binder and binding properties will be set and then the customizer will be called.
The customizer (`configure()` method) is provided with the queue name as well as the consumer group as arguments.
=== Advanced Queue/Exchange/Binding Configuration
From time to time, the RabbitMQ team add new features that are enabled by setting some argument when declaring, for example, a queue.
Generally, such features are enabled in the binder by adding appropriate properties, but this may not be immediately available in a current version.
Starting with version 3.0.1, you can now add `DeclarableCustomizer` bean(s) to the application context to modify a `Declarable` (`Queue`, `Exchange` or `Binding`) just before the declaration is performed.
This allows you to add arguments that are not currently directly supported by the binder.
[[rabbit-receiving-batch]]
=== Receiving Batched Messages
With the RabbitMQ binder, there are two types of batches handled by consumer bindings:
==== Batches Created by Producers
Normally, if a producer binding has `batch-enabled=true` (see <>), or a message is created by a `BatchingRabbitTemplate`, elements of the batch are returned as individual calls to the listener method.
Starting with version 3.0, any such batch can be presented as a `List>` to the listener method if `spring.cloud.stream.bindings..consumer.batch-mode` is set to `true`.
==== Consumer-side Batching
Starting with version 3.1, the consumer can be configured to assemble multiple inbound messages into a batch which is presented to the application as a `List>` of converted payloads.
The following simple application demonstrates how to use this technique:
====
[source, properties]
----
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
----
====
====
[source, java]
----
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
----
====
====
[source]
----
Received 2
Thing [field=value1]
Thing [field=value2]
----
====
The number of messages in a batch is specified by the `batch-size` and `receive-timeout` properties; if the `receive-timeout` elapses with no new messages, a "short" batch is delivered.
IMPORTANT: Consumer-side batching is only supported with `container-type=simple` (the default).
If you wish to examine headers of consumer-side batched messages, you should consume `Message>`; the headers are a `List