Pub/sub

Creating a pub/sub component requires just a few basic steps.

Add pub/sub namespaces

Add using statements for the pub/sub related namespaces.

  using Dapr.PluggableComponents.Components;
  using Dapr.PluggableComponents.Components.PubSub;

Implement IPubSub

Create a class that implements the IPubSub interface.

  internal sealed class MyPubSub : IPubSub
  {
      public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
      {
          // Called to initialize the component with its configured metadata...
      }

      public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
      {
          // Send the message to the "topic"...
      }

      public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
      {
          // Until canceled, check the topic for messages and deliver them to the Dapr runtime...
      }
  }
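
The "configured metadata" passed to InitAsync() is where a setting such as a polling interval would typically come from. The following is a minimal sketch of parsing such a setting from a string dictionary. The PollSettings class, the pollIntervalMs key, and the one-second default are all hypothetical names chosen for illustration, and the sketch assumes the metadata is available as key/value string pairs (for example, through a Properties dictionary on the request).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

internal static class PollSettings
{
    // Hypothetical metadata key; it is not defined by Dapr or the SDK.
    public const string PollIntervalKey = "pollIntervalMs";

    // Hypothetical default used when the key is missing or invalid.
    public static readonly TimeSpan DefaultPollInterval = TimeSpan.FromSeconds(1);

    // Reads the interval (in milliseconds) from the component metadata,
    // falling back to the default when the value is absent, malformed, or not positive.
    public static TimeSpan GetPollInterval(IReadOnlyDictionary<string, string> properties)
    {
        if (properties.TryGetValue(PollIntervalKey, out var raw)
            && int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var milliseconds)
            && milliseconds > 0)
        {
            return TimeSpan.FromMilliseconds(milliseconds);
        }

        return DefaultPollInterval;
    }
}
```

InitAsync() could store the result in a field so that PullMessagesAsync() can use it later. Falling back to a default keeps the component usable when the setting is omitted from its configuration.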

Calls to the PullMessagesAsync() method are “long-lived”: the method is not expected to return until canceled (for example, via the cancellationToken). The “topic” from which messages should be pulled is passed via the topic argument, and each message is delivered to the Dapr runtime via the deliveryHandler callback. When delivering a message, the component can also supply a callback that is invoked if and when the application (served by the Dapr runtime) acknowledges processing of the message.

  public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
  {
      TimeSpan pollInterval = // Polling interval (e.g. from initialization metadata)...

      // Poll the topic until canceled...
      while (!cancellationToken.IsCancellationRequested)
      {
          var messages = // Poll topic for messages...

          foreach (var message in messages)
          {
              // Deliver the message to the Dapr runtime...
              await deliveryHandler(
                  new PubSubPullMessagesResponse(topic.Name)
                  {
                      // Set the message content...
                  },
                  // Callback invoked when application acknowledges the message...
                  async errorMessage =>
                  {
                      // An empty message indicates the application successfully processed the message...
                      if (String.IsNullOrEmpty(errorMessage))
                      {
                          // Delete the message from the topic...
                      }
                  });
          }

          // Wait for the next poll (or cancellation)...
          await Task.Delay(pollInterval, cancellationToken);
      }
  }
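
The poll, deliver, and acknowledge cycle described above can be tried out without the Dapr runtime. The sketch below is a simplified, self-contained simulation and not the SDK's API: the DeliverAsync delegate and the InMemoryTopic class are hypothetical stand-ins for the delivery handler and for a real message broker. It shows two details that are easy to miss: a message is removed only after a successful acknowledgment, and Task.Delay() throws OperationCanceledException when the token is canceled during the wait.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the SDK's delivery handler:
// receives a message plus a callback to invoke on acknowledgment.
internal delegate Task DeliverAsync(string message, Func<string?, Task> onAcknowledged);

// Hypothetical in-memory "topic" used only for illustration.
internal sealed class InMemoryTopic
{
    private readonly ConcurrentDictionary<Guid, string> pending = new();

    public int Count => pending.Count;

    public void Add(string message) => pending[Guid.NewGuid()] = message;

    public async Task PullAsync(DeliverAsync deliver, TimeSpan pollInterval, CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Enumerating a ConcurrentDictionary is safe while entries are removed.
                foreach (var (id, message) in pending)
                {
                    await deliver(message, errorMessage =>
                    {
                        // An empty error means the application processed the message.
                        if (string.IsNullOrEmpty(errorMessage))
                        {
                            pending.TryRemove(id, out _);
                        }

                        return Task.CompletedTask;
                    });
                }

                // Throws OperationCanceledException if canceled while waiting.
                await Task.Delay(pollInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way for a pull to end.
        }
    }
}
```

In this simulation, a message that is not acknowledged successfully stays in the topic and is delivered again on the next poll. Whether a real component should redeliver, delay, or dead-letter such messages depends on the underlying broker.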

Register pub/sub component

In the main program file (for example, Program.cs), register the pub/sub component with an application service.

  using Dapr.PluggableComponents;

  var app = DaprPluggableComponentsApplication.Create();

  app.RegisterService(
      "<socket name>",
      serviceBuilder =>
      {
          serviceBuilder.RegisterPubSub<MyPubSub>();
      });

  app.Run();