Aggregate Handlers and Event Sourcing
TIP
You can forgo the [AggregateHandler] attribute by instead naming your message handler type with the AggregateHandler suffix if the Wolverine/Marten integration is applied to your application. Do note that you will still have to use the attribute to opt into exclusive write locking.
See the OrderEventSourcingSample project on GitHub for more samples.
The Wolverine + Marten combination is optimized for efficient and productive development using a CQRS architecture style with Marten's event sourcing support. Specifically, let's dive into the responsibilities of a typical command handler in a CQRS architecture with event sourcing:
- Fetch any current state of the system that's necessary to evaluate or validate the incoming command
- Decide what events should be emitted and captured in response to an incoming command
- Manage concurrent access to system state
- Safely commit the new events
- Selectively publish some of the events based on system needs to other parts of your system or even external systems
- Instrument all of the above
And then lastly, you're going to want some resiliency and selective retry capabilities for concurrent access violations or just normal infrastructure hiccups.
Let's jump right into an example order management system. I'm going to model the order workflow with this aggregate model:
public class Item
{
    public string Name { get; set; }
    public bool Ready { get; set; }
}
public class Order
{
    public Order(OrderCreated created)
    {
        foreach (var item in created.Items) Items[item.Name] = item;
    }
    // This would be the stream id
    public Guid Id { get; set; }
    // This is important, by Marten convention this would
    // be the version of the event stream
    public int Version { get; set; }
    public DateTimeOffset? Shipped { get; private set; }
    public Dictionary<string, Item> Items { get; set; } = new();
    // These methods are used by Marten to update the aggregate
    // from the raw events
    public void Apply(IEvent<OrderShipped> shipped)
    {
        Shipped = shipped.Timestamp;
    }
    public void Apply(ItemReady ready)
    {
        Items[ready.Name].Ready = true;
    }
    public bool IsReadyToShip()
    {
        return Shipped == null && Items.Values.All(x => x.Ready);
    }
}
At a minimum, we're going to want a command handler for this command message that marks an order item as ready to ship and then evaluates, based on the current state of the Order aggregate, whether or not the logical order is ready to be shipped out:
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);
Before getting into Wolverine middleware strategies, let's first build out an MVC controller method for the command above. In this code, we're also utilizing Wolverine's outbox messaging support to both order and guarantee the delivery of a ShipOrder message, so that it is only sent once the Marten transaction is committed:
[HttpPost("/orders/itemready")]
public async Task Post(
    [FromBody] MarkItemReady command,
    [FromServices] IDocumentSession session,
    [FromServices] IMartenOutbox outbox
)
{
    // This is important!
    outbox.Enroll(session);
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events
        // We're also opting into Marten optimistic concurrency checks here
        .FetchForWriting<Order>(command.OrderId, command.Version);
    var order = stream.Aggregate;
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        item.Ready = true;
        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }
    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        // Publish a cascading command to do whatever it takes
        // to actually ship the order
        // Note that because the context here is enrolled in a Wolverine
        // outbox, the message is registered, but not "released" to
        // be sent out until SaveChangesAsync() is called down below
        await outbox.PublishAsync(new ShipOrder(command.OrderId));
        stream.AppendOne(new OrderReady());
    }
    // This will also persist and flush out any outgoing messages
    // registered into the context outbox
    await session.SaveChangesAsync();
}
Hopefully, that code is easy to understand, but there's some potentially repetitive code (loading aggregates, appending events, committing transactions) that will recur across all your command handlers. Likewise, it would be best to isolate the business logic that decides what new events should be appended from the infrastructure code, so that you can more easily reason about that logic and test it. To that end, Wolverine supports the Decider pattern with Marten using the [AggregateHandler] middleware. Using that middleware, we get this slimmer code:
[AggregateHandler]
public static IEnumerable<object> Handle(MarkItemReady command, Order order)
{
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;
        // Mark that this item is ready
        yield return new ItemReady(command.ItemName);
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }
    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        yield return new OrderReady();
    }
}
In the case above, Wolverine is wrapping middleware around our basic command handler to:
- Fetch the appropriate Order aggregate matching the command
- Append any new events returned from the handle method to the Marten event stream for this Order
- Save any outstanding changes and commit the Marten unit of work
To make this more clear, here's the generated code (with some reformatting and extra comments):
public class MarkItemReadyHandler1442193977 : MessageHandler
{
    private readonly OutboxedSessionFactory _outboxedSessionFactory;
    public MarkItemReadyHandler1442193977(OutboxedSessionFactory outboxedSessionFactory)
    {
        _outboxedSessionFactory = outboxedSessionFactory;
    }
    public override async Task HandleAsync(MessageContext context, CancellationToken cancellation)
    {
        var markItemReady = (MarkItemReady)context.Envelope.Message;
        await using var documentSession = _outboxedSessionFactory.OpenSession(context);
        var eventStore = documentSession.Events;
        // Loading Marten aggregate
        var eventStream = await eventStore
            .FetchForWriting<Order>(markItemReady.OrderId, markItemReady.Version, cancellation)
            .ConfigureAwait(false);
        var outgoing1 = MarkItemReadyHandler.Handle(markItemReady, eventStream.Aggregate);
        if (outgoing1 != null)
        {
            // Capturing any possible events returned from the command handlers
            eventStream.AppendMany(outgoing1);
        }
        await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
    }
}
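Because the handler method is just a function from a command and the current aggregate state to new events, its decision logic can be exercised without a database or message bus. The following is a minimal sketch of that idea, not code from the sample project: the types are trimmed stand-ins so the snippet compiles without Marten or Wolverine, the shape of OrderCreated is an assumption, and MarkItemReadyHandlerChecks.Decide is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in types so the sketch compiles without Marten or Wolverine.
// The shape of OrderCreated is an assumption; the rest mirror the samples above.
public record OrderCreated(Item[] Items);
public record ItemReady(string Name);
public record OrderReady();
public record MarkItemReady(Guid OrderId, string ItemName, int Version);

public class Item
{
    public string Name { get; set; } = "";
    public bool Ready { get; set; }
}

// Trimmed copy of the Order aggregate (no Marten Apply methods)
public class Order
{
    public Order(OrderCreated created)
    {
        foreach (var item in created.Items) Items[item.Name] = item;
    }

    public Guid Id { get; set; }
    public DateTimeOffset? Shipped { get; private set; }
    public Dictionary<string, Item> Items { get; set; } = new();

    public bool IsReadyToShip() => Shipped == null && Items.Values.All(x => x.Ready);
}

public static class MarkItemReadyHandler
{
    // Same decision logic as the [AggregateHandler] method above,
    // minus the attribute so that no Wolverine reference is needed
    public static IEnumerable<object> Handle(MarkItemReady command, Order order)
    {
        if (!order.Items.TryGetValue(command.ItemName, out var item))
        {
            throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
        }

        item.Ready = true;
        yield return new ItemReady(command.ItemName);

        if (order.IsReadyToShip())
        {
            yield return new OrderReady();
        }
    }
}

public static class MarkItemReadyHandlerChecks
{
    // Hypothetical helper: builds an Order from the given items and
    // returns the events the handler decides on for a single command
    public static object[] Decide(string itemName, params Item[] items)
    {
        var order = new Order(new OrderCreated(items));
        var command = new MarkItemReady(Guid.NewGuid(), itemName, 1);
        return MarkItemReadyHandler.Handle(command, order).ToArray();
    }
}
```

Calling Decide for an order that still has another unfinished item yields only an ItemReady event, while marking the last unfinished item also yields OrderReady. Note that the handler is an iterator, so the validation exception is only thrown once the result is enumerated (here by ToArray).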
As you probably guessed, there are some naming conventions or other questions you need to be aware of before you use this middleware strategy.
Handler Method Signatures
The Marten workflow command handler method signature needs to follow these rules:
- Either explicitly use the [AggregateHandler] attribute on the handler method or use the AggregateHandler suffix on the message handler type to tell Wolverine to opt into the aggregate command workflow.
- The first argument should be the command type, just like any other Wolverine message handler
- The 2nd argument should be the aggregate -- either the aggregate itself (Order) or wrapped in the Marten IEventStream<T> type (IEventStream<Order>). There is an example of that usage below:
[AggregateHandler]
public static void Handle(OrderEventSourcingSample.MarkItemReady command, IEventStream<Order> stream)
{
    var order = stream.Aggregate;
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;
        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }
    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }
}
Just as in other Wolverine message handlers, you can use additional method arguments for registered services ("method injection"), the CancellationToken for the message, and the message Envelope if you need access to message metadata.
As for the return values from these handler methods, you can use:
- No return value at all, which is legal if you are directly using IEventStream<T> to append events
- IEnumerable<object> or object[] to denote that a value is a set of events to append to the current event stream
- IAsyncEnumerable<object>, which will also be treated as a variable enumerable of events to append to the current event stream
- Wolverine.Events to denote a list of events. You may find this to lead to more readable code in some cases
- OutgoingMessages to refer to additional command messages to be published that should not be captured as events
- ISideEffect objects
- Any other type would be considered to be a separate event type, and you may happily use that for either a single event or a tuple of separate events that will be appended to the event stream
Here's an alternative to the MarkItemReady handler that uses Events:
[AggregateHandler]
public static async Task<(Events, OutgoingMessages)> HandleAsync(MarkItemReady command, Order order, ISomeService service)
{
    // All contrived, let's say we need to call some
    // kind of service to get data so this handler has to be
    // async
    var data = await service.FindDataAsync();
    var messages = new OutgoingMessages();
    var events = new Events();
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;
        // Mark that this item is ready
        events += new ItemReady(command.ItemName);
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }
    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        events += new OrderReady();
        messages.Add(new ShipOrder(order.Id));
    }
    // This results in both new events being captured
    // and potentially the ShipOrder message going out
    return (events, messages);
}
Determining the Aggregate Identity
Wolverine needs to determine which public member on the command type refers to the identity of the aggregate type. You've got two options. Either use the implied naming convention below, where the OrderId property is assumed to be the identity of the Order aggregate by appending "Id" to the aggregate type name (it's not case sensitive if you were wondering):
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);
Or if you want to use a different member, bypass the convention, or just don't like conventional magic, you can decorate a public member on the command class with Marten's [Identity] attribute like so:
public class MarkItemReady
{
    // This attribute tells Wolverine that this property will refer to the
    // Order aggregate
    [Identity] public Guid Id { get; init; }
    public string ItemName { get; init; }
}
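To make the two options concrete, here is a small reflection-based sketch of how such a lookup could work. It is only an illustration and not Wolverine's actual implementation: IdentityAttribute is a local stand-in for Marten's attribute, AggregateIdentity.Find is a hypothetical helper, and checking the attribute before the naming convention is an assumption made for this sketch.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Local stand-in for Marten's [Identity] attribute so the sketch is self-contained
[AttributeUsage(AttributeTargets.Property)]
public sealed class IdentityAttribute : Attribute { }

public class Order { }

// Relies on the "{AggregateTypeName}Id" naming convention
public record MarkItemReady(Guid OrderId, string ItemName, int Version);

// Relies on an explicit attribute instead of the convention
public class MarkItemReadyById
{
    [Identity] public Guid Id { get; init; }
    public string ItemName { get; init; } = "";
}

public static class AggregateIdentity
{
    // Hypothetical helper: returns the command property that identifies the
    // aggregate, or null if neither the attribute nor the convention matches
    public static PropertyInfo Find(Type commandType, Type aggregateType)
    {
        var properties = commandType.GetProperties(BindingFlags.Public | BindingFlags.Instance);

        // Assumption for this sketch: an explicitly decorated property wins
        var marked = properties.FirstOrDefault(p => p.IsDefined(typeof(IdentityAttribute), true));
        if (marked != null) return marked;

        // Otherwise fall back to "{AggregateTypeName}Id", ignoring case
        var conventionalName = aggregateType.Name + "Id";
        return properties.FirstOrDefault(p =>
            string.Equals(p.Name, conventionalName, StringComparison.OrdinalIgnoreCase));
    }
}
```

With these types, AggregateIdentity.Find(typeof(MarkItemReady), typeof(Order)) resolves to the OrderId property through the convention, and AggregateIdentity.Find(typeof(MarkItemReadyById), typeof(Order)) resolves to the decorated Id property.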
Forwarding Events
See Event Forwarding for more information.
Returning the Updated Aggregate 3.5
A common use case for the "aggregate handler workflow" has been to respond with the now updated state of the projected aggregate that has just been updated by appending new events. Until now, that's effectively meant making a completely separate call to the database through Marten to retrieve the latest updates.
INFO
To understand more about the inner workings of the next section, see the Marten documentation on its FetchLatest API.
As a quick tip for performance, assuming that you are not mutating the projected documents within your command handlers, you can opt for this significant Marten optimization to eliminate extra database round trips while using the aggregate handler workflow:
builder.Services.AddMarten(opts =>
{
    // Other Marten configuration
    // Use this setting to get the very best performance out
    // of the UpdatedAggregate workflow and aggregate handler
    // workflow overall
    opts.Events.UseIdentityMapForAggregates = true;
}).IntegrateWithWolverine();
INFO
The setting above cannot be a default in Marten because it can break some existing code with a very different workflow than what the Critter Stack team recommends for the aggregate handler workflow.
Wolverine.Marten has a special response type for message handlers or HTTP endpoints that we can use as a directive to tell Wolverine to respond with the latest state of a projected aggregate as part of the command execution. Let's make this concrete by taking the MarkItemReady command handler we've used earlier in this guide and building a slightly different version that produces a response of the latest aggregate:
[AggregateHandler]
public static (
    // Just tells Wolverine to use Marten's FetchLatest API to respond with
    // the updated version of Order that reflects whatever events were appended
    // in this command
    UpdatedAggregate,
    // The events that should be appended to the event stream for this order
    Events) Handle(OrderEventSourcingSample.MarkItemReady command, Order order)
{
    var events = new Events();
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;
        // Mark that this item is ready
        events.Add(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }
    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        events.Add(new OrderReady());
    }
    return (new UpdatedAggregate(), events);
}
Note the usage of the Wolverine.Marten.UpdatedAggregate response in the handler. That type by itself is just a directive to Wolverine to generate the necessary code to call FetchLatest and respond with that. The command handler above allows us to use the command in a mediator usage like so:
public static Task<Order> update_and_get_latest(IMessageBus bus, MarkItemReady command)
{
    // This will return the updated version of the Order
    // aggregate that incorporates whatever events were appended
    // in the course of processing the command
    return bus.InvokeAsync<Order>(command);
}
Likewise, you can use UpdatedAggregate as the response body of an HTTP endpoint with Wolverine.HTTP.
INFO
This feature has been more or less requested several times, but was finally brought about because of the need to consume Wolverine + Marten commands within Hot Chocolate mutations and always return the current state of the projected aggregate being updated to the user interface.