Batch Message Processing 3.0
Sometimes you might want to process a stream of incoming messages in batches rather than one at a time. This might be for performance reasons, or maybe there's some kind of business logic that makes more sense to calculate for batches, or maybe you want a logical "debounce" in how your system responds to the incoming messages.
INFO
Batching is supported both for messages published in process to local queues and for messages arriving from external transports.
Regardless, Wolverine has a mechanism to locally batch incoming messages and forward them to a batch handler. First, let's say that you have a message type called Item:
public record Item(string Name);
And for whatever reason, we need to process these messages in batches. To do that, we first need to have a message handler for an array of Item like so:
public static class ItemHandler
{
    public static void Handle(Item[] items)
    {
        // Handle this just like a normal message handler,
        // just that the message type is Item[]
    }
}
WARNING
At this point, Wolverine only supports an array of the message type for the batched handler.
TIP
Batch message handlers are just like any other message handler and have no special rules about their capabilities.
With that in our system, now we need to tell Wolverine to group Item messages, and we do that with the following syntax:
theHost = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.BatchMessagesOf<Item>(batching =>
            {
                // Really the maximum batch size
                batching.BatchSize = 500;
                // You can alternatively override the local queue
                // for the batch publishing.
                batching.LocalExecutionQueueName = "items";
                // We can tell Wolverine to wait longer for incoming
                // messages before kicking out a batch if there
                // are fewer waiting messages than the maximum
                // batch size
                batching.TriggerTime = 1.Seconds();
            })
            // The object returned here is the local queue configuration that
            // will handle the batched messages. This may be useful for fine
            // tuning the behavior of the batch processing
            .Sequential();
    }).StartAsync();
And that's that! Just to bring this a little more into focus, here's an end to end test from the Wolverine codebase:
[Fact]
public async Task send_end_to_end_with_batch()
{
    // Items to publish
    var item1 = new Item("one");
    var item2 = new Item("two");
    var item3 = new Item("three");
    var item4 = new Item("four");
    Func<IMessageContext, Task> publish = async c =>
    {
        // I'm publishing the 4 items in sequence
        await c.PublishAsync(item1);
        await c.PublishAsync(item2);
        await c.PublishAsync(item3);
        await c.PublishAsync(item4);
    };
    // This is the "act" part of the test
    var session = await theHost.TrackActivity()
        // Wolverine testing helper to "wait" until
        // the tracking receives a message of Item[]
        .WaitForMessageToBeReceivedAt<Item[]>(theHost)
        .ExecuteAndWaitAsync(publish);
    // The four Item messages should be processed as a single
    // batch message
    var items = session.Executed.SingleMessage<Item[]>();
    items.Length.ShouldBe(4);
    items.ShouldContain(item1);
    items.ShouldContain(item2);
    items.ShouldContain(item3);
    items.ShouldContain(item4);
}
Alright, with all that being said, here are a few more facts about the batch messaging support:
- There is absolutely no need to create a specific message handler for the Item message, and in fact, you should not do so -- unless you are running in MultipleHandlerBehavior.Separated mode and deliberately want both a per-message handler and a batched handler (see Combining a direct handler with batching below)
- The message batching is able to group the message batches by tenant id if your Wolverine system uses multi-tenancy
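To picture what "group by tenant id" combined with a maximum batch size amounts to, here is a small self-contained sketch in plain C#. It does not use any Wolverine types and is not Wolverine's actual implementation. The tuple shape, the ToBatches helper, and the sample tenant ids are hypothetical names chosen for illustration, and time-based triggering (TriggerTime) is deliberately left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for queued messages: (tenant id, item name).
var pending = new List<(string TenantId, string Name)>
{
    ("tenant-a", "one"),
    ("tenant-b", "two"),
    ("tenant-a", "three"),
    ("tenant-a", "four"),
    ("tenant-b", "five"),
};

// Illustration only: group by tenant, then cap each batch at maxBatchSize.
static List<(string TenantId, string[] Names)> ToBatches(
    IEnumerable<(string TenantId, string Name)> messages, int maxBatchSize)
{
    return messages
        .GroupBy(m => m.TenantId)
        .SelectMany(g => g
            .Select(m => m.Name)
            .Chunk(maxBatchSize) // Enumerable.Chunk requires .NET 6 or later
            .Select(names => (TenantId: g.Key, Names: names)))
        .ToList();
}

var batches = ToBatches(pending, maxBatchSize: 2);

foreach (var (tenantId, names) in batches)
{
    Console.WriteLine($"{tenantId}: [{string.Join(", ", names)}]");
}
// Prints:
// tenant-a: [one, three]
// tenant-a: [four]
// tenant-b: [two, five]
```

No batch ever mixes tenants, and a tenant with more waiting messages than the maximum simply produces more than one batch.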
Combining a direct handler with batching
By default Wolverine assumes the batch handler is the only consumer of the element type, so an incoming Item is always routed straight to the batch. If you also declare a direct Handle(Item) handler alongside BatchMessagesOf<Item>(), the direct handler wins and the batch is silently shadowed -- the batched handler never runs.
The one exception is MultipleHandlerBehavior.Separated. Under that mode Wolverine treats the per-message handler and the batched handler as two independent consumers of Item, so both run for every Item:
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<Item>();
// Direct, per-message handler
public static class ItemAuditHandler
{
public static void Handle(Item item) { /* runs once per message */ }
}
// Batched handler
public static class ItemHandler
{
public static void Handle(Item[] items) { /* runs once per assembled batch */ }
}
To make this work, Wolverine moves the batch onto its own dedicated local queue (the element type's queue name with a -batch suffix) so it no longer collides with the direct handler's queue, and fans every Item out to both queues. This applies to messages published in-process and to Item messages arriving from an external transport listener.
Multiple batched handlers
MultipleHandlerBehavior.Separated also lets you register more than one batched handler for the same element type -- for example one handler that publishes an integration event for the batch and another that archives it:
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<Item>();
public static class ItemPublisher
{
public static void Handle(Item[] items) { /* publish an integration event */ }
}
public static class ItemArchiver
{
public static void Handle(Item[] items) { /* archive the batch */ }
}
Under Separated mode each Handle(Item[]) handler is given its own sticky queue, so Wolverine fans the assembled batch out to every one of them and each runs independently. (Under the default Classic behavior the multiple Handle(Item[]) handlers are instead combined into a single logical handler that invokes each one in turn.)
What about durable messaging ("inbox")?
The durable inbox behaves just a little bit differently for message batching. Wolverine will technically "handle" the individual messages, but does not mark them as handled in the message store until a batch message that refers to the original message is completely processed.
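As a purely conceptual model of that ordering, the sketch below tracks a "marked as handled" flag per original message in an in-memory dictionary. Everything here is a hypothetical stand-in (the markedHandled dictionary, the HandleBatch function, the use of Guid ids). It is not Wolverine's message store API, only an illustration of "mark the originals handled after the batch completes".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the message store:
// envelope id -> whether it has been marked as handled.
var markedHandled = new Dictionary<Guid, bool>();

// Three individual messages arrive and are recorded as not yet handled.
var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
foreach (var id in ids) markedHandled[id] = false;

// The batch remembers which original envelopes it contains.
// Nothing is marked as handled at this point.
var batch = ids.ToArray();
bool anyHandledBeforeBatchRuns = markedHandled.Values.Any(handled => handled);

// Stand-in for the batch handler. If this threw, the loop below would
// not run and the originals would stay unhandled in the store.
void HandleBatch(Guid[] envelopeIds) =>
    Console.WriteLine($"Processing a batch of {envelopeIds.Length} messages");

HandleBatch(batch);

// Only after the batch handler completes are the originals marked handled.
foreach (var id in batch) markedHandled[id] = true;

Console.WriteLine($"Handled before batch ran: {anyHandledBeforeBatchRuns}");
Console.WriteLine($"Handled after batch ran: {markedHandled.Values.All(handled => handled)}");
// Prints:
// Processing a batch of 3 messages
// Handled before batch ran: False
// Handled after batch ran: True
```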
Custom Batching Strategies
INFO
This feature was originally added for a JasperFx Software customer who needed to batch messages by a logical saga id.
By default, Wolverine is simply batching messages of type Item into a message of type Item[]. But what if you need to do something a little more custom? Like batching messages by a logical saga id or some kind of entity identity?
As an example, let's say that you are building some kind of task tracking system where you might easily have dozens of sub tasks for each parent task that could be getting marked complete in rapid succession. That's a good example of where batching would be handy. Let's say that we have two message types, one for the individual item message and a custom one for the batched message, like so:
// Messages at the granular level that might be streaming in
// very quickly
public record SubTaskCompleted(string TaskId, string SubTaskId);
// A custom message type for processing a batch of sub task
// completed messages. Note that it's batched by the TaskId
public record SubTaskCompletedBatch(string TaskId, string[] SubTaskIdList);
To teach Wolverine how to batch up our SubTaskCompleted messages into our custom batch message, we need to supply our own implementation of Wolverine's built in Wolverine.Runtime.Batching.IMessageBatcher type:
/// <summary>
/// Plugin strategy for creating custom grouping of messages
/// </summary>
public interface IMessageBatcher
{
    /// <summary>
    /// Main method that batches items
    /// </summary>
    /// <param name="envelopes"></param>
    /// <returns></returns>
    IEnumerable<Envelope> Group(IReadOnlyList<Envelope> envelopes);
    /// <summary>
    /// The actual message type being built that is assumed to contain
    /// all the batched items
    /// </summary>
    Type BatchMessageType { get; }
}
A custom implementation of that interface in this case would look like this:
public class SubTaskCompletedBatcher : IMessageBatcher
{
    public IEnumerable<Envelope> Group(IReadOnlyList<Envelope> envelopes)
    {
        var groups = envelopes
            // You can trust that the message will be a non-null SubTaskCompleted here
            .GroupBy(x => x.Message!.As<SubTaskCompleted>().TaskId)
            .ToArray();
        foreach (var group in groups)
        {
            var subTaskIdList = group
                .Select(x => x.Message)
                .OfType<SubTaskCompleted>()
                .Select(x => x.SubTaskId)
                .ToArray();
            var message = new SubTaskCompletedBatch(group.Key,
                subTaskIdList);
            // It's important here to pass along the group of envelopes that make up
            // this batched message for Wolverine's transactional inbox/outbox
            // tracking
            yield return new Envelope(message, group);
        }
    }
    public Type BatchMessageType => typeof(SubTaskCompletedBatch);
}
And of course, this doesn't work without a matching message handler for our custom message type:
public static class SubTaskCompletedBatchHandler
{
    public static Task<TrackedTask> LoadAsync(SubTaskCompletedBatch batch, ITrackedTaskRepository repository)
    {
        return repository.LoadAsync(batch.TaskId);
    }
    public static Task Handle(SubTaskCompletedBatch batch)
    {
        // actually do something here....
        return Task.CompletedTask;
    }
}
And finally, we need to tell Wolverine about the batching and the strategy for batching the SubTaskCompleted message type:
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.BatchMessagesOf<SubTaskCompleted>(x =>
        {
            // We just have to let Wolverine know about our custom
            // message batcher
            x.Batcher = new SubTaskCompletedBatcher();
        });
    }).StartAsync();
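To make the effect of the SubTaskCompletedBatcher grouping concrete, here is a Wolverine-free sketch of the same GroupBy logic applied to a handful of sample messages. Named tuples stand in for the SubTaskCompleted and SubTaskCompletedBatch records so the snippet runs on its own, and the task and sub task ids are made up for illustration.

```csharp
using System;
using System.Linq;

// Tuples standing in for SubTaskCompleted(TaskId, SubTaskId) messages.
var completed = new[]
{
    (TaskId: "task-1", SubTaskId: "a"),
    (TaskId: "task-2", SubTaskId: "x"),
    (TaskId: "task-1", SubTaskId: "b"),
    (TaskId: "task-1", SubTaskId: "c"),
};

// Same shape as SubTaskCompletedBatch(TaskId, SubTaskIdList):
// one entry per TaskId carrying all of its sub task ids.
var grouped = completed
    .GroupBy(m => m.TaskId)
    .Select(g => (TaskId: g.Key, SubTaskIdList: g.Select(m => m.SubTaskId).ToArray()))
    .ToArray();

foreach (var batch in grouped)
{
    Console.WriteLine($"{batch.TaskId}: {string.Join(", ", batch.SubTaskIdList)}");
}
// Prints:
// task-1: a, b, c
// task-2: x
```

In the real batcher each group is additionally wrapped in an Envelope together with its source envelopes, which this sketch omits.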
