RabbitMQ

Detailed documentation on the RabbitMQ pubsub component

Component format

To set up RabbitMQ pub/sub, create a component of type pubsub.rabbitmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: rabbitmq-pubsub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: connectionString
        value: "amqp://localhost:5672"
      # protocol, hostname, username, and password are mutually exclusive with connectionString
      - name: protocol
        value: amqp
      - name: hostname
        value: localhost
      - name: username
        value: username
      - name: password
        value: password
      - name: consumerID
        value: channel1
      - name: durable
        value: false
      - name: deletedWhenUnused
        value: false
      - name: autoAck
        value: false
      - name: deliveryMode
        value: 0
      - name: requeueInFailure
        value: false
      - name: prefetchCount
        value: 0
      - name: reconnectWait
        value: 0
      - name: concurrencyMode
        value: parallel
      - name: publisherConfirm
        value: false
      - name: enableDeadLetter # Optional: enable dead letter or not
        value: true
      - name: maxLen # Optional: max message count in a queue
        value: 3000
      - name: maxLenBytes # Optional: maximum length in bytes of a queue
        value: 10485760
      - name: exchangeKind
        value: fanout
      - name: saslExternal
        value: false
      - name: ttlInSeconds
        value: 60
      - name: clientName
        value: "{podName}"
      - name: heartBeat
        value: 10s

Warning

The example above uses secrets as plain strings. It is recommended to use a secret store for the secrets instead.

Spec metadata fields

| Field | Required | Details | Example |
|-------|----------|---------|---------|
| connectionString | Y | The RabbitMQ connection string. Mutually exclusive with the protocol, hostname, username, and password fields | amqp://user:pass@localhost:5672 |
| protocol | N | The RabbitMQ protocol. Mutually exclusive with the connectionString field | amqp |
| hostname | N | The RabbitMQ hostname. Mutually exclusive with the connectionString field | localhost |
| username | N | The RabbitMQ username. Mutually exclusive with the connectionString field | username |
| password | N | The RabbitMQ password. Mutually exclusive with the connectionString field | password |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all of the template tags you can use in your component metadata. |
| durable | N | Whether or not to use durable queues. Defaults to "false" | "true", "false" |
| deletedWhenUnused | N | Whether or not the queue should be configured to auto-delete. Defaults to "true" | "true", "false" |
| autoAck | N | Whether or not the queue consumer should auto-ack messages. Defaults to "false" | "true", "false" |
| deliveryMode | N | Persistence mode when publishing messages. Defaults to "0". RabbitMQ treats "2" as persistent and all other numbers as non-persistent | "0", "2" |
| requeueInFailure | N | Whether or not to requeue when sending a negative acknowledgement in case of a failure. Defaults to "false" | "true", "false" |
| prefetchCount | N | Number of messages to prefetch. Consider changing this to a non-zero value for production environments. Defaults to "0", which means that all available messages are prefetched. | "2" |
| publisherConfirm | N | If enabled, the client waits for publisher confirms after publishing a message. Defaults to "false" | "true", "false" |
| reconnectWait | N | How long to wait (in seconds) before reconnecting if a connection failure occurs | "0" |
| concurrencyMode | N | parallel is the default, and allows processing multiple messages in parallel (limited by the app-max-concurrency annotation, if configured). Set to single to disable parallel processing. In most situations there's no reason to change this. | parallel, single |
| enableDeadLetter | N | Enable forwarding messages that cannot be handled to a dead-letter topic. Defaults to "false" | "true", "false" |
| maxLen | N | The maximum number of messages in a queue and its dead letter queue (if dead letter is enabled). If both maxLen and maxLenBytes are set, both apply; whichever limit is hit first is enforced. Defaults to no limit. | "1000" |
| maxLenBytes | N | Maximum length in bytes of a queue and its dead letter queue (if dead letter is enabled). If both maxLen and maxLenBytes are set, both apply; whichever limit is hit first is enforced. Defaults to no limit. | "1048576" |
| exchangeKind | N | Exchange kind of the RabbitMQ exchange. Defaults to "fanout". | "fanout", "topic" |
| saslExternal | N | With TLS, should the username be taken from an additional field (for example, CN). See RabbitMQ Authentication Mechanisms. Defaults to "false". | "true", "false" |
| ttlInSeconds | N | Set message TTL at the component level, which can be overwritten by message-level TTL per request. | "60" |
| caCert | Required for using TLS | Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | Required for using TLS | TLS client certificate in PEM format. Must be used with clientKey. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | Required for using TLS | TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
| clientName | N | This RabbitMQ client-provided connection name is a custom identifier. If set, the identifier is mentioned in RabbitMQ server log entries and the management UI. Can be set to {uuid}, {podName}, or {appID}, which the Dapr runtime replaces with the real value. | "app1", {uuid}, {podName}, {appID} |
| heartBeat | N | Defines the heartbeat interval with the server, detecting the aliveness of the peer TCP connection with the RabbitMQ server. Defaults to 10s. | "10s" |
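
The mutual-exclusivity rule in the table can be checked before a component is applied. The following is a minimal sketch and not part of Dapr: `validate_connection_metadata` is a hypothetical helper, and it assumes the component metadata has already been loaded into a plain dictionary mapping field names to values.

```python
# Hypothetical pre-flight check for the connection fields described in the table.
DISCRETE_FIELDS = ("protocol", "hostname", "username", "password")


def validate_connection_metadata(metadata: dict) -> None:
    """Raise ValueError if connectionString is combined with the discrete fields."""
    has_connection_string = bool(metadata.get("connectionString"))
    discrete = [name for name in DISCRETE_FIELDS if metadata.get(name)]

    if has_connection_string and discrete:
        raise ValueError(
            "connectionString is mutually exclusive with: " + ", ".join(discrete)
        )


# Passes silently: only connectionString is set.
validate_connection_metadata({"connectionString": "amqp://user:pass@localhost:5672"})
```

A check like this could run in CI against component files, so that a conflicting combination is caught before deployment.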

Communication using TLS

To configure communication using TLS, ensure that the RabbitMQ nodes have TLS enabled and provide the caCert, clientCert, and clientKey metadata in the component configuration. For example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: rabbitmq-pubsub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: host
        value: "amqps://localhost:5671"
      - name: consumerID
        value: myapp
      - name: durable
        value: false
      - name: deletedWhenUnused
        value: false
      - name: autoAck
        value: false
      - name: deliveryMode
        value: 0
      - name: requeueInFailure
        value: false
      - name: prefetchCount
        value: 0
      - name: reconnectWait
        value: 0
      - name: concurrencyMode
        value: parallel
      - name: publisherConfirm
        value: false
      - name: enableDeadLetter # Optional: enable dead letter or not
        value: true
      - name: maxLen # Optional: max message count in a queue
        value: 3000
      - name: maxLenBytes # Optional: maximum length in bytes of a queue
        value: 10485760
      - name: exchangeKind
        value: fanout
      - name: saslExternal
        value: false
      - name: caCert
        value: ${{ myLoadedCACert }}
      - name: clientCert
        value: ${{ myLoadedClientCert }}
      - name: clientKey
        secretKeyRef:
          name: myRabbitMQClientKey
          key: myRabbitMQClientKey

Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.

Enabling message delivery retries

The RabbitMQ pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. When the service returns a result, the message is marked as consumed regardless of whether it was processed correctly. This is common among all Dapr pub/sub components, not just RabbitMQ. Dapr can try redelivering a message a second time when autoAck is set to false and requeueInFailure is set to true.

To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the RabbitMQ pub/sub component.

There is a crucial difference between the two ways to retry messages:

  1. When using autoAck = false and requeueInFailure = true, RabbitMQ is the one responsible for re-delivering messages and any subscriber can get the redelivered message. If you have more than one instance of your consumer, then it’s possible that another consumer will get it. This is usually the better approach because if there’s a transient failure, it’s more likely that a different worker will be in a better position to successfully process the message.
  2. Using Resiliency makes the same Dapr sidecar retry redelivering the messages. So it will be the same Dapr sidecar and the same app receiving the same message.
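
Which outcome triggers either retry path depends on what the subscriber reports back. As a sketch, assuming an HTTP subscriber that answers Dapr with a JSON body whose `status` is `SUCCESS`, `RETRY`, or `DROP`: the handler below shows only the decision logic, without any web framework wiring. `process_order`, `TransientError`, and the `downstreamUnavailable` flag are hypothetical names used for illustration.

```python
class TransientError(Exception):
    """Illustrative marker for failures worth retrying (for example, a timeout)."""


def process_order(order: dict) -> None:
    # Hypothetical business logic: reject malformed orders and fail transiently
    # when a downstream dependency is flagged as unavailable.
    if "orderId" not in order:
        raise ValueError("missing orderId")
    if order.get("downstreamUnavailable"):
        raise TransientError("inventory service timed out")


def handle_event(order: dict) -> dict:
    """Map the outcome of processing to a subscriber response body."""
    try:
        process_order(order)
    except TransientError:
        return {"status": "RETRY"}  # ask for the message to be delivered again
    except ValueError:
        return {"status": "DROP"}  # malformed input: retrying would not help
    return {"status": "SUCCESS"}
```

With autoAck set to false and requeueInFailure set to true, a `RETRY` result is expected to lead to the message being requeued in RabbitMQ, where any consumer instance may pick it up. With a resiliency policy, the same sidecar redelivers it to the same app.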

Create a RabbitMQ server

You can run a RabbitMQ server locally using Docker:

    docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3

You can then interact with the server using the client port: localhost:5672.

The easiest way to install RabbitMQ on Kubernetes is by using the Helm chart:

    helm install rabbitmq stable/rabbitmq

Look at the chart output and get the username and password.

This will install RabbitMQ into the default namespace. To interact with RabbitMQ, find the service with: kubectl get svc rabbitmq.

For example, if installing using the example above, the RabbitMQ server client address would be:

rabbitmq.default.svc.cluster.local:5672
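
The address above can be combined with the credentials from the chart output into a connectionString value. The following is a small sketch, assuming the usual AMQP URI layout `amqp://user:password@host:port`. `build_connection_string` is a hypothetical helper and the credentials are placeholders. Percent-encoding the credentials keeps characters such as `@` or `/` in a password from breaking the URI.

```python
from urllib.parse import quote


def build_connection_string(username: str, password: str, host: str,
                            port: int = 5672, protocol: str = "amqp") -> str:
    """Assemble an AMQP URI, percent-encoding the credentials."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"{protocol}://{user}:{secret}@{host}:{port}"


# Placeholder credentials; use the values reported by the chart output.
print(build_connection_string("user", "p@ss/word",
                              "rabbitmq.default.svc.cluster.local"))
```

In a real deployment the resulting string would normally be stored in a secret store rather than written into the component file.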

Use topic exchange to route messages

Setting exchangeKind to "topic" uses the topic exchanges, which are commonly used for the multicast routing of messages. In order to route messages using topic exchange, you must set the following metadata:

  • routingKey:
    Messages with a routing key are routed to one or many queues based on the routing key defined in the metadata when subscribing.

  • queueName:
    If you don’t set the queueName, only one queue is created, and all routing keys will route to that queue. This means all subscribers will bind to that queue, which won’t give the desired results.

For example, if an app is configured with a routing key keyA and queueName of queue-A:

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: orderspubsub
    spec:
      topic: B
      routes:
        default: /B
      pubsubname: pubsub
      metadata:
        routingKey: keyA
        queueName: queue-A

It will receive messages with routing key keyA, and messages with other routing keys are not received.

    // publish messages with routing key `keyA`, and these will be received by the above example.
    client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
    // publish messages with routing key `keyB`, and these will not be received by the above example.
    client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))
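
The same routing key can be attached when publishing through the Dapr HTTP API, where publish metadata travels as `metadata.<key>` query parameters. The sketch below only builds the request URL. `publish_url` is a hypothetical helper, and the sidecar port 3500 is an assumption; substitute your sidecar's HTTP port.

```python
from urllib.parse import quote, urlencode


def publish_url(pubsub_name: str, topic: str, metadata: dict,
                dapr_http_port: int = 3500) -> str:
    """Build a publish URL with metadata as `metadata.<key>` query parameters."""
    base = (f"http://localhost:{dapr_http_port}/v1.0/publish/"
            f"{quote(pubsub_name, safe='')}/{quote(topic, safe='')}")
    query = urlencode({f"metadata.{key}": value for key, value in metadata.items()})
    return f"{base}?{query}" if query else base


# URL for publishing to topic "B" with routing key keyA.
print(publish_url("pubsub", "B", {"routingKey": "keyA"}))
```

The message body would then be sent as the payload of a POST request to that URL.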

Bind multiple routingKey

Multiple routing keys can be separated by commas.
The example below binds three routing keys: keyA, keyB, and "" (the empty key). Note the trailing comma, which is how the empty key is bound.

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: orderspubsub
    spec:
      topic: B
      routes:
        default: /B
      pubsubname: pubsub
      metadata:
        routingKey: keyA,keyB,

For more information, see RabbitMQ exchanges.
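
To see why the trailing comma in `keyA,keyB,` yields an empty routing key, consider a plain comma split. This is an illustration that assumes the value is split on commas without trimming or dropping empty entries, which is how the example reads. It is not Dapr's actual parsing code, and `split_routing_keys` is a hypothetical name.

```python
def split_routing_keys(value: str) -> list[str]:
    """Split a comma-separated routingKey value, keeping empty keys."""
    return value.split(",")


print(split_routing_keys("keyA,keyB,"))  # ['keyA', 'keyB', '']
```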

Use priority queues

Dapr supports RabbitMQ priority queues. To set a priority for a queue, use the maxPriority topic subscription metadata.

Declarative priority queue example

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: pubsub
    spec:
      topic: checkout
      routes:
        default: /orders
      pubsubname: order-pub-sub
      metadata:
        maxPriority: 3

Programmatic priority queue example

Python

    from flask import Flask, jsonify

    app = Flask(__name__)

    @app.route('/dapr/subscribe', methods=['GET'])
    def subscribe():
        subscriptions = [
            {
                'pubsubname': 'pubsub',
                'topic': 'checkout',
                'routes': {
                    'default': '/orders'
                },
                'metadata': {'maxPriority': '3'}
            }
        ]
        return jsonify(subscriptions)

JavaScript

    const express = require('express')
    const bodyParser = require('body-parser')
    const app = express()
    app.use(bodyParser.json({ type: 'application/*+json' }));
    const port = 3000

    app.get('/dapr/subscribe', (req, res) => {
      res.json([
        {
          pubsubname: "pubsub",
          topic: "checkout",
          routes: {
            default: '/orders'
          },
          metadata: {
            maxPriority: '3'
          }
        }
      ]);
    })

    app.listen(port, () => console.log(`Listening on port ${port}`))

Go

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"
    )

    const appPort = 3000

    type subscription struct {
        PubsubName string            `json:"pubsubname"`
        Topic      string            `json:"topic"`
        Metadata   map[string]string `json:"metadata,omitempty"`
        Routes     routes            `json:"routes"`
    }

    type routes struct {
        Rules   []rule `json:"rules,omitempty"`
        Default string `json:"default,omitempty"`
    }

    // rule is a single routing rule; unused here, but required by the routes type.
    type rule struct {
        Match string `json:"match"`
        Path  string `json:"path"`
    }

    // This handles /dapr/subscribe
    func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
        t := []subscription{
            {
                PubsubName: "pubsub",
                Topic:      "checkout",
                Routes: routes{
                    Default: "/orders",
                },
                Metadata: map[string]string{
                    "maxPriority": "3",
                },
            },
        }
        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(t)
    }

    func main() {
        http.HandleFunc("/dapr/subscribe", configureSubscribeHandler)
        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), nil))
    }

Setting a priority when publishing a message

To set a priority on a message, add the publish metadata key priority to the publish endpoint or SDK method.

HTTP API (Bash)

    curl -X POST "http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.priority=3" -H "Content-Type: application/json" -d '{"orderId": "100"}'

Python

    import json
    from dapr.clients import DaprClient

    # PUBSUB_NAME, TOPIC_NAME, and orderId are defined elsewhere in the application.
    with DaprClient() as client:
        result = client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=json.dumps(orderId),
            data_content_type='application/json',
            publish_metadata={'priority': '3'})

JavaScript

    await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId, { 'priority': '3' });

Go

    client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)), dapr.PublishEventWithMetadata(map[string]string{"priority": "3"}))
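
To build intuition for how a message priority interacts with the queue's maxPriority, here is a toy in-memory model. It is a sketch of the documented RabbitMQ behavior under simplifying assumptions, not the broker's implementation: higher numbers are delivered first, messages published without a priority count as 0, and priorities above the queue maximum are treated as the maximum. `PriorityQueueSketch` is a hypothetical class.

```python
import heapq
from itertools import count


class PriorityQueueSketch:
    """Toy model of a priority queue with a maximum priority."""

    def __init__(self, max_priority: int):
        self.max_priority = max_priority
        self._heap = []
        self._order = count()  # preserves publish order within one priority

    def publish(self, body: str, priority: int = 0) -> None:
        # Cap the priority at the queue maximum; negative values count as 0.
        capped = min(max(priority, 0), self.max_priority)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-capped, next(self._order), body))

    def consume(self) -> str:
        return heapq.heappop(self._heap)[2]


queue = PriorityQueueSketch(max_priority=3)
queue.publish("low", priority=1)
queue.publish("high", priority=3)
print(queue.consume())  # high
```

In a real broker, priorities only change the delivery order when messages are waiting in the queue; a message delivered immediately to an idle consumer is not reordered.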

Use quorum queues

By default, Dapr creates classic queues. To create quorum queues, add the following metadata to your pub/sub subscription:

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: pubsub
    spec:
      topic: checkout
      routes:
        default: /orders
      pubsubname: order-pub-sub
      metadata:
        queueType: quorum

Time-to-live

You can set a time-to-live (TTL) value at either the message or component level. Set default component-level TTL using the component spec ttlInSeconds field in your component.

Note

If you set both component-level and message-level TTL, the default component-level TTL is ignored in favor of the message-level TTL.
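
The precedence rule in the note can be written as a small function. This is a sketch of the rule as stated, not Dapr's code. `effective_ttl_seconds` is a hypothetical name, and it assumes the message-level TTL arrives as a `ttlInSeconds` publish metadata value that has already been parsed to an integer.

```python
from typing import Optional


def effective_ttl_seconds(component_ttl: Optional[int],
                          message_ttl: Optional[int]) -> Optional[int]:
    """Return the TTL that applies: message-level wins over component-level."""
    if message_ttl is not None:
        return message_ttl
    return component_ttl  # may be None, meaning no TTL is set at either level


# Component default of 60 seconds, overridden by a 120-second message TTL.
print(effective_ttl_seconds(60, 120))  # 120
```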

Single Active Consumer

The RabbitMQ Single Active Consumer setup ensures that only one consumer at a time processes messages from a queue and switches to another registered consumer if the active one is canceled or fails. This approach might be required when it is crucial for messages to be consumed in the exact order they arrive in the queue and if distributed processing with multiple instances is not supported. When this option is enabled on a queue by Dapr, an instance of the Dapr runtime will be the single active consumer. To allow another application instance to take over in case of failure, Dapr runtime must probe the application’s health and unsubscribe from the pub/sub component.

Note

This pattern prevents the application from scaling, as only one instance can process the load. While it might be useful for integrating Dapr with legacy or sensitive applications, consider a design that allows distributed processing if you need scalability.

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: pubsub
    spec:
      topic: orders
      routes:
        default: /orders
      pubsubname: order-pub-sub
      metadata:
        singleActiveConsumer: "true"

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)