Using Redis 5.0
Installing
To use Redis Streams as a messaging transport for Wolverine, first install the WolverineFx.Redis
Nuget package to your application. Behind the scenes, the Wolverine.Redis
library is using the StackExchange.Redis library.
dotnet add WolverineFx.Redis
Using as Message Transport
To connect to Redis and configure listeners and senders, use this syntax:
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379")
// Auto-create streams and consumer groups
.AutoProvision()
// Configure default consumer name selector for all Redis listeners
.ConfigureDefaultConsumerName((runtime, endpoint) =>
$"{runtime.Options.ServiceName}-{runtime.DurabilitySettings.AssignedNodeNumber}")
// Useful for testing - auto purge queues on startup
.AutoPurgeOnStartup();
// Just publish all messages to Redis streams (uses database 0 by default)
opts.PublishAllMessages().ToRedisStream("wolverine-messages");
// Or explicitly configure message routing with database ID
opts.PublishMessage<ColorMessage>()
.ToRedisStream("colors", databaseId: 1)
// Configure specific settings for this stream
.BatchSize(50)
.SendInline();
// Listen to Redis streams with consumer groups (uses database 0 by default)
opts.ListenToRedisStream("red", "color-processors")
.ProcessInline()
// Configure consumer settings
.ConsumerName("red-consumer-1")
.BatchSize(10)
.BlockTimeout(TimeSpan.FromSeconds(5))
// Start from beginning to consume existing messages (like Kafka's AutoOffsetReset.Earliest)
.StartFromBeginning();
// Listen to Redis streams with database ID specified
opts.ListenToRedisStream("green", "color-processors", databaseId: 2)
.BufferedInMemory()
.BatchSize(25)
.StartFromNewMessages(); // Default: only new messages (like Kafka's AutoOffsetReset.Latest)
opts.ListenToRedisStream("blue", "color-processors", databaseId: 3)
.UseDurableInbox()
.ConsumerName("blue-consumer")
.StartFromBeginning(); // Process existing messages too
// Alternative: use StartFrom parameter directly
opts.ListenToRedisStream("purple", "color-processors", StartFrom.Beginning)
.BufferedInMemory();
// This will direct Wolverine to try to ensure that all
// referenced Redis streams and consumer groups exist at
// application start up time
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
If you need to control the database id within Redis, you have these options:
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379");
// Configure streams on different databases
opts.PublishMessage<OrderCreated>()
.ToRedisStream("orders", databaseId: 1);
opts.PublishMessage<PaymentProcessed>()
.ToRedisStream("payments", databaseId: 2);
// Listen on different databases
opts.ListenToRedisStream("orders", "order-processors", databaseId: 1);
opts.ListenToRedisStream("payments", "payment-processors", databaseId: 2);
// Advanced configuration with database ID
opts.ListenToRedisStream("notifications", "notification-processors", databaseId: 3)
.ConsumerName("notification-consumer-1")
.BatchSize(100)
.BlockTimeout(10.Seconds())
.UseDurableInbox();
}).StartAsync();
To work with multiple databases in one application, see this sample:
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRedisTransport("localhost:6379").AutoProvision();
// Different message types on different databases for isolation
// Database 0: Default messages
opts.PublishMessage<SystemEvent>().ToRedisStream("system-events");
opts.ListenToRedisStream("system-events", "system-processors");
// Database 1: Order processing
opts.PublishMessage<OrderCreated>().ToRedisStream("orders", 1);
opts.ListenToRedisStream("orders", "order-processors", 1);
// Database 2: Payment processing
opts.PublishMessage<PaymentProcessed>().ToRedisStream("payments", 2);
opts.ListenToRedisStream("payments", "payment-processors", 2);
// Database 3: Analytics and reporting
opts.PublishMessage<AnalyticsEvent>().ToRedisStream("analytics", 3);
opts.ListenToRedisStream("analytics", "analytics-processors", 3);
}).StartAsync();
Interoperability
First, see the tutorial on interoperability with Wolverine for general guidance.
Next, the Redis transport supports interoperability through the IRedisEnvelopeMapper
interface. If necessary, you can build your own version of this mapper interface like the following:
// Simplistic envelope mapper that expects every message to be of
// type "T" and serialized as JSON that works perfectly well w/ our
// application's default JSON serialization
public class OurRedisJsonMapper<TMessage> : EnvelopeMapper<StreamEntry, List<NameValueEntry>>, IRedisEnvelopeMapper
{
// Wolverine needs to know the message type name
private readonly string _messageTypeName = typeof(TMessage).ToMessageTypeName();
public OurRedisJsonMapper(Endpoint endpoint) : base(endpoint)
{
// Map the data property
MapProperty(x => x.Data!,
(e, m) => e.Data = m.Values.FirstOrDefault(x => x.Name == "data").Value,
(e, m) => m.Add(new NameValueEntry("data", e.Data)));
// Set up the message type
MapProperty(x => x.MessageType!,
(e, m) => e.MessageType = _messageTypeName,
(e, m) => m.Add(new NameValueEntry("message-type", _messageTypeName)));
// Set up content type
MapProperty(x => x.ContentType!,
(e, m) => e.ContentType = "application/json",
(e, m) => m.Add(new NameValueEntry("content-type", "application/json")));
}
protected override void writeOutgoingHeader(List<NameValueEntry> outgoing, string key, string value)
{
outgoing.Add(new NameValueEntry($"header-{key}", value));
}
protected override bool tryReadIncomingHeader(StreamEntry incoming, string key, out string? value)
{
var target = $"header-{key}";
foreach (var nv in incoming.Values)
{
if (nv.Name.Equals(target))
{
value = nv.Value.ToString();
return true;
}
}
value = null;
return false;
}
protected override void writeIncomingHeaders(StreamEntry incoming, Envelope envelope)
{
var headers = incoming.Values.Where(k => k.Name.StartsWith("header-"));
foreach (var nv in headers)
{
envelope.Headers[nv.Name.ToString()[7..]] = nv.Value.ToString(); // Remove "header-" prefix
}
// Capture the Redis stream message id
envelope.Headers["redis-entry-id"] = incoming.Id.ToString();
}
}