Aggregate Handlers and Event Sourcing
TIP
Only use the "aggregate handler workflow" if you may need to write new events to an existing event stream. If all you need in a message handler or HTTP endpoint is a read-only copy of an event sourced aggregate from Marten, use the [ReadAggregate] attribute instead, which has a lighter weight runtime within Marten.
INFO
If your message handler or HTTP endpoint uses more than one declarative attribute for retrieving Marten data, Wolverine 5.0+ is able to utilize Marten's Batch Querying capability for more efficient interaction with the database.
This batching behavior is also supported for all the declarative attributes and for the "aggregate handler workflow" in general, as described on this page.
See the OrderEventSourcingSample project on GitHub for more samples.
The Wolverine + Marten combination is optimized for efficient and productive development using a CQRS architecture style with Marten's event sourcing support. Specifically, let's dive into the responsibilities of a typical command handler in a CQRS with event sourcing architecture:
- Fetch any current state of the system that's necessary to evaluate or validate the incoming command
- Decide what events should be emitted and captured in response to the incoming command
- Manage concurrent access to system state
- Safely commit the new events
- Selectively publish some of the events based on system needs to other parts of your system or even external systems
- Instrument all of the above
And then lastly, you're going to want some resiliency and selective retry capabilities for concurrent access violations or just normal infrastructure hiccups.
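As a concrete sketch of that resiliency, Wolverine's error handling policies can retry a message when Marten detects a concurrent write. This is only a sketch, assuming the standard Wolverine + Marten integration; the exact namespace of Marten's ConcurrencyException can vary by version:

```csharp
using Marten.Exceptions;
using Microsoft.Extensions.Hosting;
using Wolverine;

var builder = Host.CreateApplicationBuilder(args);
builder.UseWolverine(opts =>
{
    // Retry the message a few times when Marten detects a
    // concurrent write to the same event stream, then let the
    // normal failure rules (dead letter queue, etc.) take over
    opts.OnException<ConcurrencyException>().RetryTimes(3);
});
```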
Let's jump right into an example order management system. I'm going to model the order workflow with this aggregate model:
public class Item
{
public string Name { get; set; }
public bool Ready { get; set; }
}
public class Order
{
public Order(OrderCreated created)
{
foreach (var item in created.Items) Items[item.Name] = item;
}
// This would be the stream id
public Guid Id { get; set; }
// This is important! By Marten convention, this would
// be the version of the aggregate within its event stream
public int Version { get; set; }
public DateTimeOffset? Shipped { get; private set; }
public Dictionary<string, Item> Items { get; set; } = new();
// These methods are used by Marten to update the aggregate
// from the raw events
public void Apply(IEvent<OrderShipped> shipped)
{
Shipped = shipped.Timestamp;
}
public void Apply(ItemReady ready)
{
Items[ready.Name].Ready = true;
}
public bool IsReadyToShip()
{
return Shipped == null && Items.Values.All(x => x.Ready);
}
}
At a minimum, we're going to want a command handler for this command message that marks an order item as ready to ship and then evaluates, based on the current state of the Order aggregate, whether the logical order is ready to be shipped out:
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);In the code above, we're also utilizing Wolverine's outbox messaging support to both order and guarantee the delivery of a ShipOrder message when the Marten transaction
Before getting into Wolverine middleware strategies, let's first build out an MVC controller method for the command above:
[HttpPost("/orders/itemready")]
public async Task Post(
[FromBody] MarkItemReady command,
[FromServices] IDocumentSession session,
[FromServices] IMartenOutbox outbox
)
{
// This is important!
outbox.Enroll(session);
// Fetch the current value of the Order aggregate
var stream = await session
.Events
// We're also opting into Marten optimistic concurrency checks here
.FetchForWriting<Order>(command.OrderId, command.Version);
var order = stream.Aggregate;
if (order.Items.TryGetValue(command.ItemName, out var item))
{
item.Ready = true;
// Mark that this item is ready
stream.AppendOne(new ItemReady(command.ItemName));
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
// Publish a cascading command to do whatever it takes
// to actually ship the order
// Note that because the context here is enrolled in a Wolverine
// outbox, the message is registered, but not "released" to
// be sent out until SaveChangesAsync() is called down below
await outbox.PublishAsync(new ShipOrder(command.OrderId));
stream.AppendOne(new OrderReady());
}
// This will also persist and flush out any outgoing messages
// registered into the context outbox
await session.SaveChangesAsync();
}
Hopefully, that code is easy to understand, but there's some potentially repetitive code (loading aggregates, appending events, committing transactions) that will recur across all your command handlers. Likewise, it would be best to completely isolate the business logic that decides what new events should be appended from the infrastructure code, so that you can more easily reason about and test that business logic. To that end, Wolverine supports the Decider pattern with Marten using the [AggregateHandler] middleware. Using that middleware, we get this slimmer code:
[AggregateHandler]
public static IEnumerable<object> Handle(MarkItemReady command, Order order)
{
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
yield return new ItemReady(command.ItemName);
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
yield return new OrderReady();
}
}
In the case above, Wolverine is wrapping middleware around our basic command handler to:
- Fetch the appropriate Order aggregate matching the command
- Append any new events returned from the handle method to the Marten event stream for this Order
- Save any outstanding changes and commit the Marten unit of work
To make this more clear, here's the generated code (with some reformatting and extra comments):
public class MarkItemReadyHandler1442193977 : MessageHandler
{
private readonly OutboxedSessionFactory _outboxedSessionFactory;
public MarkItemReadyHandler1442193977(OutboxedSessionFactory outboxedSessionFactory)
{
_outboxedSessionFactory = outboxedSessionFactory;
}
public override async Task HandleAsync(MessageContext context, CancellationToken cancellation)
{
var markItemReady = (MarkItemReady)context.Envelope.Message;
await using var documentSession = _outboxedSessionFactory.OpenSession(context);
var eventStore = documentSession.Events;
// Loading Marten aggregate
var eventStream = await eventStore.FetchForWriting<Order>(markItemReady.OrderId, markItemReady.Version, cancellation).ConfigureAwait(false);
var outgoing1 = MarkItemReadyHandler.Handle(markItemReady, eventStream.Aggregate);
if (outgoing1 != null)
{
// Capturing any possible events returned from the command handlers
eventStream.AppendMany(outgoing1);
}
await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
}
}
As you probably guessed, there are some naming conventions and other details you need to be aware of before you use this middleware strategy.
WARNING
There are some open, let's call them imperfections with Wolverine's code generation against the [WriteAggregate] and [ReadAggregate] usage. For best results, only use these attributes on a parameter within the main HTTP endpoint method and not in Validate/Before/Load methods.
INFO
The [Aggregate] and [WriteAggregate] attributes require the requested stream and aggregate to be found by default, meaning that the handler or HTTP endpoint will be stopped if the requested data is not found. You can explicitly mark individual attributes as Required=false.
Alternatively, there is also the newer [WriteAggregate] usage, with this example being a functionally equivalent markup:
public static IEnumerable<object> Handle(
// The command
MarkItemReady command,
// This time we'll mark the parameter as the "aggregate"
[WriteAggregate] Order order)
{
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
yield return new ItemReady(command.ItemName);
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
yield return new OrderReady();
}
}
The [WriteAggregate] attribute also opts into the "aggregate handler workflow", but is placed on a parameter instead of the handler method. This was added to extend the "aggregate handler workflow" to operations that involve multiple event streams in one transaction.
TIP
[WriteAggregate] works equally on message handlers as it does on HTTP endpoints. In fact, the older [Aggregate] attribute in Wolverine.Http.Marten is now just a subclass of [WriteAggregate].
Validation on Stream Existence 4.8
By default, the "aggregate handler workflow" does no validation on whether or not the identified event stream actually exists at runtime, and it's possible to receive a null for the aggregate in this example if the aggregate does not exist:
public static IEnumerable<object> Handle(
// The command
MarkItemReady command,
// This time we'll mark the parameter as the "aggregate"
[WriteAggregate] Order order)
{
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
yield return new ItemReady(command.ItemName);
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
yield return new OrderReady();
}
}
As long as you handle the case where the requested aggregate is null, you can even effectively start a new stream by emitting events from your handler or HTTP endpoint.
If you do want to protect message handlers or HTTP endpoints from acting on missing streams because of bad user input (or who knows what, it's a chaotic world and you should never trust that your system is receiving valid input), you now have some options to mark the aggregate itself as required and even control how Wolverine deals with the aggregate being missing, as shown in these sample signatures below:
public static class ValidatedMarkItemReadyHandler
{
public static IEnumerable<object> Handle(
// The command
MarkItemReady command,
// In HTTP this will return a 404 status code and stop
// the request if the Order is not found
// In message handlers, this will log that the Order was not found,
// then stop processing. The message would be effectively
// discarded
[WriteAggregate(Required = true)] Order order) => [];
[WolverineHandler]
public static IEnumerable<object> Handle2(
// The command
MarkItemReady command,
// In HTTP this will return a 400 status code and
// write out a ProblemDetails response with a default message explaining
// the data that could not be found
[WriteAggregate(Required = true, OnMissing = OnMissing.ProblemDetailsWith400)] Order order) => [];
[WolverineHandler]
public static IEnumerable<object> Handle3(
// The command
MarkItemReady command,
// In HTTP this will return a 404 status code and
// write out a ProblemDetails response with a default message explaining
// the data that could not be found
[WriteAggregate(Required = true, OnMissing = OnMissing.ProblemDetailsWith404)] Order order) => [];
[WolverineHandler]
public static IEnumerable<object> Handle4(
// The command
MarkItemReady command,
// In HTTP this will return a 400 status code and
// write out a ProblemDetails response with a custom message.
// Wolverine will substitute in the order identity into the message for "{0}"
// In message handlers, Wolverine will log using your custom message then discard the message
[WriteAggregate(Required = true, OnMissing = OnMissing.ProblemDetailsWith404, MissingMessage = "Cannot find Order {0}")] Order order) => [];
}
The Required, OnMissing, and MissingMessage properties behave consistently on all Wolverine attributes like [Entity], [WriteAggregate], or [ReadAggregate].
Handler Method Signatures
The Marten workflow command handler method signature needs to follow these rules:
- Either explicitly use the [AggregateHandler] attribute on the handler method or use the AggregateHandler suffix on the message handler type to tell Wolverine to opt into the aggregate command workflow
- The first argument should be the command type, just like any other Wolverine message handler
- The second argument should be the aggregate -- either the aggregate itself (Order) or wrapped in the Marten IEventStream<T> type (IEventStream<Order>). There is an example of that usage below:
[AggregateHandler]
public static void Handle(OrderEventSourcingSample.MarkItemReady command, IEventStream<Order> stream)
{
var order = stream.Aggregate;
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
stream.AppendOne(new ItemReady(command.ItemName));
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
stream.AppendOne(new OrderReady());
}
}
Just as in other Wolverine message handlers, you can use additional method arguments for registered services ("method injection"), the CancellationToken for the message, and the message Envelope if you need access to message metadata.
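As a sketch of that flexibility, the MarkItemReady handler could accept extra arguments like this; the ILogger usage is purely illustrative:

```csharp
[AggregateHandler]
public static IEnumerable<object> Handle(
    MarkItemReady command,
    Order order,
    // "Method injection" of any service registered in the container
    ILogger<Order> logger,
    // Access to the metadata for the current message
    Envelope envelope,
    // The cancellation token for the current message
    CancellationToken cancellation)
{
    logger.LogDebug("Handling MarkItemReady for envelope {Id}", envelope.Id);

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        item.Ready = true;
        yield return new ItemReady(command.ItemName);
    }
}
```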
As for the return values from these handler methods, you can use:
- It's legal to have no return values if you are directly using IEventStream<T> to append events
- IEnumerable<object> or object[] to denote that the value is events to append to the current event stream
- IAsyncEnumerable<object> will also be treated as an enumerable of events to append to the current event stream
- Wolverine.Marten.Events to denote a list of events. You may find this leads to more readable code in some cases
- OutgoingMessages to refer to additional command messages to be published that should not be captured as events
- ISideEffect objects
- Any other type would be considered to be a separate event type, and you may happily use that for either a single event or a tuple of separate events that will be appended to the event stream
Here's an alternative to the MarkItemReady handler that uses Events:
[AggregateHandler]
public static async Task<(Events, OutgoingMessages)> HandleAsync(MarkItemReady command, Order order, ISomeService service)
{
// All contrived, let's say we need to call some
// kind of service to get data so this handler has to be
// async
var data = await service.FindDataAsync();
var messages = new OutgoingMessages();
var events = new Events();
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
events += new ItemReady(command.ItemName);
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
events += new OrderReady();
messages.Add(new ShipOrder(order.Id));
}
// This results in both new events being captured
// and potentially the ShipOrder message going out
return (events, messages);
}
Determining the Aggregate Identity
Wolverine needs to determine which public member on the command type refers to the identity of the aggregate type. You've got two options. Either use the implied naming convention below, where the OrderId property is assumed to be the identity of the Order aggregate by appending "Id" to the aggregate type name (it's not case sensitive, if you were wondering):
// OrderId refers to the identity of the Order aggregate
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);
Or if you want to use a different member, bypass the convention, or just don't like conventional magic, you can decorate a public member on the command class with Marten's [Identity] attribute like so:
public class MarkItemReady
{
// This attribute tells Wolverine that this property will refer to the
// Order aggregate
[Identity] public Guid Id { get; init; }
public string ItemName { get; init; }
}
Validation 4.8
Every possible attribute for triggering the "aggregate handler workflow" includes support for data requirements as shown below with [ReadAggregate]:
// Straight up 404 on missing
[WolverineGet("/letters1/{id}")]
public static LetterAggregate GetLetter1([ReadAggregate] LetterAggregate letters) => letters;
// Not required
[WolverineGet("/letters2/{id}")]
public static string GetLetter2([ReadAggregate(Required = false)] LetterAggregate letters)
{
return letters == null ? "No Letters" : "Got Letters";
}
// Straight up 404 & problem details on missing
[WolverineGet("/letters3/{id}")]
public static LetterAggregate GetLetter3([ReadAggregate(OnMissing = OnMissing.ProblemDetailsWith404)] LetterAggregate letters) => letters;
Forwarding Events
See Event Forwarding for more information.
Returning the Updated Aggregate 3.5
A common use case for the "aggregate handler workflow" has been to respond with the now updated state of the projected aggregate that has just been updated by appending new events. Until now, that's effectively meant making a completely separate call to the database through Marten to retrieve the latest updates.
INFO
To understand more about the inner workings of the next section, see the Marten documentation on its FetchLatest API.
As a quick tip for performance, assuming that you are not mutating the projected documents within your command handlers, you can opt for this significant Marten optimization to eliminate extra database round trips while using the aggregate handler workflow:
builder.Services.AddMarten(opts =>
{
// Other Marten configuration
// Use this setting to get the very best performance out
// of the UpdatedAggregate workflow and aggregate handler
// workflow over all
opts.Events.UseIdentityMapForAggregates = true;
}).IntegrateWithWolverine();
INFO
The setting above cannot be a default in Marten because it can break some existing code with a very different workflow than what the Critter Stack team recommends for the aggregate handler workflow.
Wolverine.Marten has a special response type for message handlers or HTTP endpoints we can use as a directive to tell Wolverine to respond with the latest state of a projected aggregate as part of the command execution. Let's make this concrete by taking the MarkItemReady command handler we've used earlier in this guide and building a slightly new version that produces a response of the latest aggregate:
[AggregateHandler]
public static (
// Just tells Wolverine to use Marten's FetchLatest API to respond with
// the updated version of Order that reflects whatever events were appended
// in this command
UpdatedAggregate,
// The events that should be appended to the event stream for this order
Events) Handle(OrderEventSourcingSample.MarkItemReady command, Order order)
{
var events = new Events();
if (order.Items.TryGetValue(command.ItemName, out var item))
{
// Not doing this in a purist way here, but just
// trying to illustrate the Wolverine mechanics
item.Ready = true;
// Mark that this item is ready
events.Add(new ItemReady(command.ItemName));
}
else
{
// Some crude validation
throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
}
// If the order is ready to ship, also emit an OrderReady event
if (order.IsReadyToShip())
{
events.Add(new OrderReady());
}
return (new UpdatedAggregate(), events);
}
Note the usage of the Wolverine.Marten.UpdatedAggregate response in the handler. That type by itself is just a directive to Wolverine to generate the necessary code to call FetchLatest and respond with that value. The command handler above allows us to use the command in a mediator usage like so:
public static Task<Order> update_and_get_latest(IMessageBus bus, MarkItemReady command)
{
// This will return the updated version of the Order
// aggregate that incorporates whatever events were appended
// in the course of processing the command
return bus.InvokeAsync<Order>(command);
}
Likewise, you can use UpdatedAggregate as the response body of an HTTP endpoint with Wolverine.HTTP.
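A hedged sketch of that HTTP usage, assuming Wolverine.HTTP and a hypothetical route; the attribute placement mirrors the aggregate handler samples above:

```csharp
public static class MarkItemReadyEndpoint
{
    // The HTTP response body will be the updated Order,
    // fetched through Marten's FetchLatest API after the
    // new events are appended
    [AggregateHandler]
    [WolverinePost("/orders/item-ready")]
    public static (UpdatedAggregate, Events) Post(MarkItemReady command, Order order)
    {
        var events = new Events();
        if (order.Items.TryGetValue(command.ItemName, out var item))
        {
            events.Add(new ItemReady(command.ItemName));
        }

        return (new UpdatedAggregate(), events);
    }
}
```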
INFO
This feature had been requested several times, but was finally brought about by the need to consume Wolverine + Marten commands within Hot Chocolate mutations and always return the current state of the projected aggregate being updated to the user interface.
Passing the Aggregate to Before/Validate/Load Methods
The "compound handler" feature is a valuable way to organize your handler code in Wolverine, and it is fully supported within the aggregate handler workflow as well. If you have a command handler method marked with [AggregateHandler], or the [Aggregate] attribute in HTTP usage, you can also pass the aggregate type as an argument to any Before / LoadAsync / Validate method on that handler to do validation before the main handler method. Here's a sample from the tests of doing just that:
public record RaiseIfValidated(Guid LetterAggregateId);
public static class RaiseIfValidatedHandler
{
public static HandlerContinuation Validate(LetterAggregate aggregate) =>
aggregate.ACount == 0 ? HandlerContinuation.Continue : HandlerContinuation.Stop;
[AggregateHandler]
public static IEnumerable<object> Handle(RaiseIfValidated command, LetterAggregate aggregate)
{
yield return new BEvent();
}
}
Archiving Streams
To mark a Marten event stream as archived from a Wolverine aggregate handler, just append the special Marten Archived event to the stream like you would any other event.
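A minimal sketch, assuming a hypothetical ArchiveOrder command; Marten's Archived event lives in the Marten.Events namespace and carries a reason string:

```csharp
public record ArchiveOrder(Guid OrderId);

public static class ArchiveOrderHandler
{
    [AggregateHandler]
    public static IEnumerable<object> Handle(ArchiveOrder command, Order order)
    {
        // Appending Marten's special Archived event marks the
        // whole stream as archived when the unit of work commits
        yield return new Archived("Order closed out");
    }
}
```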
Reading the Latest Version of an Aggregate
INFO
This is using Marten's FetchLatest API and is limited to single stream projections.
If you want to inject the current state of an event sourced aggregate as a parameter into a message handler method strictly for information and don't need the heavier "aggregate handler workflow," use the [ReadAggregate] attribute like this:
public record FindAggregate(Guid Id);
public static class FindLettersHandler
{
// This is admittedly just some weak sauce testing support code
public static LetterAggregateEnvelope Handle(FindAggregate command, [ReadAggregate] LetterAggregate aggregate)
{
return new LetterAggregateEnvelope(aggregate);
}
/* ALTERNATIVE VERSION
[WolverineHandler]
public static LetterAggregateEnvelope Handle2(
FindAggregate command,
// Just showing you that you can disable the validation
[ReadAggregate(Required = false)] LetterAggregate aggregate)
{
return aggregate == null ? null : new LetterAggregateEnvelope(aggregate);
}
*/
}
If the aggregate doesn't exist, the HTTP request will stop with a 404 status code. The aggregate/stream identity is found with the same rules as the [Entity] or [Aggregate] attributes:
- You can specify a particular request body property name or route argument
- Look for a request body property or route argument named after the entity type plus "Id" (e.g., "OrderId" for the Order aggregate)
- Look for a request body property or route argument named "Id" or "id"
You can override the validation rules for how Wolverine handles an aggregate / event stream not being found by setting these properties on [ReadAggregate] (which is much more useful for HTTP endpoints):
// Straight up 404 on missing
[WolverineGet("/letters1/{id}")]
public static LetterAggregate GetLetter1([ReadAggregate] LetterAggregate letters) => letters;
// Not required
[WolverineGet("/letters2/{id}")]
public static string GetLetter2([ReadAggregate(Required = false)] LetterAggregate letters)
{
return letters == null ? "No Letters" : "Got Letters";
}
// Straight up 404 & problem details on missing
[WolverineGet("/letters3/{id}")]
public static LetterAggregate GetLetter3([ReadAggregate(OnMissing = OnMissing.ProblemDetailsWith404)] LetterAggregate letters) => letters;
There is also an option with OnMissing to throw a RequiredDataMissingException if a required data element is missing. This option is probably most useful with message handlers, where you may want to key off the exception with custom error handling rules.
Targeting Multiple Streams at Once 4.9.0
It's now possible to use the "aggregate handler workflow" while needing to append events to more than one event stream at a time.
TIP
You can use read only views of event streams through [ReadAggregate] at will, and that will use Marten's FetchLatest() API underneath. For appending to multiple streams though, for now you will have to directly target IEventStream<T> to help Marten know which stream you're appending events to.
Consider the canonical example of moving money from one account to another, where both changes need to be persisted in one atomic transaction. Let's start with a simplified domain model of events and a "self-aggregating" Account type like this:
public record AccountCreated(double InitialAmount);
public record Debited(double Amount);
public record Withdrawn(double Amount);
public class Account
{
public Guid Id { get; set; }
public double Amount { get; set; }
public static Account Create(IEvent<AccountCreated> e)
=> new Account { Id = e.StreamId, Amount = e.Data.InitialAmount};
public void Apply(Debited e) => Amount += e.Amount;
public void Apply(Withdrawn e) => Amount -= e.Amount;
}And you need to handle a command like this:
public record TransferMoney(Guid FromId, Guid ToId, double Amount);
Using the [WriteAggregate] attribute to denote the event streams we need to work with, we could write this message handler + HTTP endpoint:
public static class TransferMoneyHandler
{
[WolverinePost("/accounts/transfer")]
public static void Handle(
TransferMoney command,
[WriteAggregate(nameof(TransferMoney.FromId))] IEventStream<Account> fromAccount,
[WriteAggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
{
// Would already 404 if either referenced account does not exist
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}
The IEventStream<T> abstraction comes from Marten's FetchForWriting() API, which is our recommended way to interact with Marten streams in typical command handlers. This API is used underneath Wolverine's "aggregate handler workflow", but is normally hidden from user written code if you're only working with one stream at a time. In this case though, we'll need to work with the raw IEventStream<T> objects that both wrap the projected aggregation of each Account and provide a point where we can explicitly append events separately to each event stream. FetchForWriting() guarantees that you get the most up to date information for the Account view of each event stream regardless of how you have configured Marten's ProjectionLifecycle for Account (kind of an important detail here!).
The typical Marten transactional middleware within Wolverine calls SaveChangesAsync() for us on the Marten unit of work (IDocumentSession) for the command. If there are enough funds in the "From" account, this command will append a Withdrawn event to the "From" account and a Debited event to the "To" account. If either account has been written to between fetching the original information and committing, Marten will reject the changes and throw its ConcurrencyException as an optimistic concurrency check.
We could write a unit test for the "happy path" where there are enough funds to cover the transfer like this:
public class when_transfering_money
{
[Fact]
public void happy_path_have_enough_funds()
{
// StubEventStream<T> is a type that was recently added to Marten
// specifically to facilitate testing logic like this
var fromAccount = new StubEventStream<Account>(new Account { Amount = 1000 })
{
Id = Guid.NewGuid()
};
var toAccount = new StubEventStream<Account>(new Account { Amount = 100})
{
Id = Guid.NewGuid()
};
TransferMoneyHandler.Handle(new TransferMoney(fromAccount.Id, toAccount.Id, 100), fromAccount, toAccount);
// Now check the events we expected to be appended
fromAccount.Events.Single().Data.ShouldBeOfType<Withdrawn>().Amount.ShouldBe(100);
toAccount.Events.Single().Data.ShouldBeOfType<Debited>().Amount.ShouldBe(100);
}
}
Finer-Grained Optimistic Concurrency in Multi-Stream Operations 5.17
When a handler uses multiple [WriteAggregate] parameters, Wolverine automatically applies version discovery only to the first aggregate parameter. Secondary aggregate parameters will not automatically look for a Version variable, preventing them from accidentally sharing the same version source.
To opt a secondary stream into optimistic concurrency checking, use the VersionSource property to explicitly point it at a different member:
public record TransferMoney(Guid FromId, Guid ToId, decimal Amount,
long FromVersion, long ToVersion);
public static class TransferMoneyHandler
{
public static void Handle(
TransferMoney command,
// First parameter: discovers "Version" automatically, or use
// VersionSource for an explicit member name
[WriteAggregate(nameof(TransferMoney.FromId),
VersionSource = nameof(TransferMoney.FromVersion))]
IEventStream<Account> fromAccount,
// Secondary parameter: only gets version checking if VersionSource is set
[WriteAggregate(nameof(TransferMoney.ToId),
VersionSource = nameof(TransferMoney.ToVersion))]
IEventStream<Account> toAccount)
{
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}
You can also use AlwaysEnforceConsistency on individual streams within a multi-stream operation to ensure a concurrency check even when no events are appended to that stream:
public static void Handle(
TransferMoney command,
[WriteAggregate(nameof(TransferMoney.FromId),
AlwaysEnforceConsistency = true)]
IEventStream<Account> fromAccount,
[WriteAggregate(nameof(TransferMoney.ToId))]
IEventStream<Account> toAccount)
{
// Even if insufficient funds cause no events to be appended
// to fromAccount, Marten will still verify its version hasn't changed
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
Enforcing Consistency Without New Events 5.17
In some cases, your command handler may decide not to emit any new events after evaluating the current aggregate state. By default, Marten will silently succeed in this case without checking whether the stream has been modified since it was fetched. This can be problematic for cross-stream operations where you need to guarantee that all referenced aggregates are still in the state you expect at commit time.
The AlwaysEnforceConsistency option tells Marten to perform an optimistic concurrency check on the stream even if no events are appended. If another session has written to the stream between your FetchForWriting() and SaveChangesAsync(), Marten will throw a ConcurrencyException.
Using the property on [AggregateHandler]
You can set AlwaysEnforceConsistency = true on the [AggregateHandler] attribute:
[AggregateHandler(AlwaysEnforceConsistency = true)]
public static class MyAggregateHandler
{
public static void Handle(DoSomething command, IEventStream<MyAggregate> stream)
{
// Even if no events are appended, Marten will verify
// the stream version hasn't changed since it was fetched
}
}
Using [ConsistentAggregateHandler]
For convenience, there is a [ConsistentAggregateHandler] attribute that automatically sets AlwaysEnforceConsistency = true:
[ConsistentAggregateHandler]
public static class MyAggregateHandler
{
public static void Handle(DoSomething command, IEventStream<MyAggregate> stream)
{
// AlwaysEnforceConsistency is automatically true
}
}
Parameter-level usage with [ConsistentAggregate]
When using parameter-level attributes (the [Aggregate] pattern), you can use [ConsistentAggregate] instead:
public static class MyHandler
{
public static void Handle(DoSomething command,
[ConsistentAggregate] IEventStream<MyAggregate> stream)
{
// AlwaysEnforceConsistency is automatically true
}
}
Or set the property directly on [Aggregate]:
public static class MyHandler
{
public static void Handle(DoSomething command,
[Aggregate(AlwaysEnforceConsistency = true)] IEventStream<MyAggregate> stream)
{
// Explicitly opt into consistency enforcement
}
}
Overriding Version Discovery 5.17
By default, Wolverine discovers a version member on your command type by looking for a property or field named Version of type int or long. In multi-stream operations where each stream needs its own version source, this convention breaks down because you can't have multiple properties all named "Version".
The VersionSource property lets you explicitly specify which member supplies the expected stream version for optimistic concurrency checks.
On [AggregateHandler]
public record TransferMoney(Guid FromId, Guid ToId, decimal Amount, long FromVersion);
[AggregateHandler(VersionSource = nameof(TransferMoney.FromVersion))]
public static class TransferMoneyHandler
{
public static IEnumerable<object> Handle(TransferMoney command, Account account)
{
// FromVersion will be checked against the "from" account's stream version
yield return new Withdrawn(command.Amount);
}
}
On [WriteAggregate] / [Aggregate]
This is particularly useful for multi-stream operations where each stream needs independent version tracking:
public record TransferMoney(Guid FromId, Guid ToId, decimal Amount,
long FromVersion, long ToVersion);
public static class TransferMoneyHandler
{
public static void Handle(
TransferMoney command,
[WriteAggregate(nameof(TransferMoney.FromId),
VersionSource = nameof(TransferMoney.FromVersion))]
IEventStream<Account> fromAccount,
[WriteAggregate(nameof(TransferMoney.ToId),
VersionSource = nameof(TransferMoney.ToVersion))]
IEventStream<Account> toAccount)
{
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}
For HTTP endpoints, VersionSource can resolve from route arguments, query string parameters, or request body members:
[WolverinePost("/orders/{orderId}/ship/{expectedVersion}")]
[EmptyResponse]
public static OrderShipped Ship(
ShipOrder command,
[Aggregate(VersionSource = "expectedVersion")] Order order)
{
return new OrderShipped();
}
Strong Typed Identifiers 5.0
If you're so inclined, you can use strongly typed identifiers from tools like Vogen and StronglyTypedId within the "Aggregate Handler Workflow." You can also use hand rolled value types that wrap either Guid or string, depending on your Marten event store configuration (StreamIdentity), as long as they conform to Marten's own rules about value type identifiers.
For a message handler, let's start with this example identifier type and aggregate from the Wolverine tests:
[StronglyTypedId(Template.Guid)]
public readonly partial struct LetterId;
public class StrongLetterAggregate
{
public StrongLetterAggregate()
{
}
public LetterId Id { get; set; }
public int ACount { get; set; }
public int BCount { get; set; }
public int CCount { get; set; }
public int DCount { get; set; }
public void Apply(AEvent _) => ACount++;
public void Apply(BEvent _) => BCount++;
public void Apply(CEvent _) => CCount++;
public void Apply(DEvent _) => DCount++;
}
And now let's use that identifier type in message handlers:
public record IncrementStrongA(LetterId Id);
public record AddFrom(LetterId Id1, LetterId Id2);
public record IncrementBOnBoth(LetterId Id1, LetterId Id2);
public record FetchCounts(LetterId Id);
public static class StrongLetterHandler
{
public static StrongLetterAggregate Handle(FetchCounts counts,
[ReadAggregate] StrongLetterAggregate aggregate) => aggregate;
public static AEvent Handle(IncrementStrongA command, [WriteAggregate] StrongLetterAggregate aggregate)
{
return new();
}
public static void Handle(
IncrementBOnBoth command,
[WriteAggregate(nameof(IncrementBOnBoth.Id1))] IEventStream<StrongLetterAggregate> stream1,
[WriteAggregate(nameof(IncrementBOnBoth.Id2))] IEventStream<StrongLetterAggregate> stream2
)
{
stream1.AppendOne(new BEvent());
stream2.AppendOne(new BEvent());
}
public static IEnumerable<object> Handle(
AddFrom command,
[WriteAggregate(nameof(AddFrom.Id1))] StrongLetterAggregate _,
[ReadAggregate(nameof(AddFrom.Id2))] StrongLetterAggregate readOnly)
{
for (int i = 0; i < readOnly.ACount; i++)
{
yield return new AEvent();
}
for (int i = 0; i < readOnly.BCount; i++)
{
yield return new BEvent();
}
for (int i = 0; i < readOnly.CCount; i++)
{
yield return new CEvent();
}
for (int i = 0; i < readOnly.DCount; i++)
{
yield return new DEvent();
}
}
}
And also in some of the equivalent Wolverine.HTTP endpoints:
[WolverineGet("/sti/aggregate/longhand/{id}")]
public static ValueTask<StrongLetterAggregate> Handle2(LetterId id, IDocumentSession session) =>
session.Events.FetchLatest<StrongLetterAggregate>(id.Value);
// This is an equivalent to the endpoint above
[WolverineGet("/sti/aggregate/{id}")]
public static StrongLetterAggregate Handle(
[ReadAggregate] StrongLetterAggregate aggregate) => aggregate;
Value types generated by these tools are legal route argument variables for Wolverine.HTTP now.
Natural Keys
Marten supports natural keys on aggregates, allowing you to look up event streams by a domain-meaningful identifier (like an order number) instead of the internal stream id. Wolverine's aggregate handler workflow fully supports natural keys, letting you route commands to the correct aggregate using a business identifier.
Defining the Aggregate with a Natural Key
First, define your aggregate with a [NaturalKey] property and mark the methods that set the key with [NaturalKeySource]:
public record NkHandlerOrderNumber(string Value);
public class NkOrderAggregate
{
public Guid Id { get; set; }
[NaturalKey]
public NkHandlerOrderNumber OrderNum { get; set; } = null!;
public decimal TotalAmount { get; set; }
public string CustomerName { get; set; } = string.Empty;
public bool IsComplete { get; set; }
[NaturalKeySource]
public void Apply(NkHandlerOrderCreated e)
{
OrderNum = e.OrderNumber;
CustomerName = e.CustomerName;
}
public void Apply(NkHandlerItemAdded e)
{
TotalAmount += e.Price;
}
public void Apply(NkHandlerOrderCompleted e)
{
IsComplete = true;
}
}
public record NkHandlerOrderCreated(NkHandlerOrderNumber OrderNumber, string CustomerName);
public record NkHandlerItemAdded(string ItemName, decimal Price);
public record NkHandlerOrderCompleted;
Using Natural Keys in Command Handlers
When your command carries the natural key value instead of a stream id, Wolverine can resolve it automatically. The command property should match the aggregate's natural key type:
public record AddNkOrderItem(NkHandlerOrderNumber OrderNum, string ItemName, decimal Price);
public record AddNkOrderItems(NkHandlerOrderNumber OrderNum, (string Name, decimal Price)[] Items);
public record CompleteNkOrder(NkHandlerOrderNumber OrderNum);
Wolverine uses the natural key type on the command property to call FetchForWriting<TAggregate, TNaturalKey>() under the covers, resolving the stream by the natural key in a single database round-trip.
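As a rough mental model, the behavior is equivalent to this hand-written version. This is a sketch only, using the FetchForWriting<TAggregate, TNaturalKey>() call named above; the actual generated code also participates in batching and transactional middleware:

```csharp
using System.Threading.Tasks;
using Marten;

public static class LonghandNkOrderHandler
{
    // Hedged sketch of what Wolverine generates for AddNkOrderItem
    public static async Task Handle(AddNkOrderItem command, IDocumentSession session)
    {
        // Resolve the stream by its natural key in one round-trip
        var stream = await session.Events
            .FetchForWriting<NkOrderAggregate, NkHandlerOrderNumber>(command.OrderNum);

        stream.AppendOne(new NkHandlerItemAdded(command.ItemName, command.Price));

        await session.SaveChangesAsync();
    }
}
```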
Handler Examples
Here are the handlers that process those commands, using [WriteAggregate] and IEventStream<T>:
public static class NkOrderHandler
{
public static NkHandlerItemAdded Handle(AddNkOrderItem command,
[WriteAggregate] NkOrderAggregate aggregate)
{
return new NkHandlerItemAdded(command.ItemName, command.Price);
}
public static IEnumerable<object> Handle(AddNkOrderItems command,
[WriteAggregate] NkOrderAggregate aggregate)
{
foreach (var (name, price) in command.Items)
{
yield return new NkHandlerItemAdded(name, price);
}
}
public static void Handle(CompleteNkOrder command,
[WriteAggregate] IEventStream<NkOrderAggregate> stream)
{
stream.AppendOne(new NkHandlerOrderCompleted());
}
}
For more details on how natural keys work at the Marten level, see the Marten natural keys documentation.
Dynamic Consistency Boundary (DCB)
TIP
The Dynamic Consistency Boundary pattern enables event-sourced handlers to work across multiple event streams simultaneously within a single consistency boundary. This is essential for domain logic that naturally spans multiple entities.
Traditional aggregate handlers work with a single event stream at a time. But some business decisions require state from multiple streams — for example, subscribing a student to a course requires checking both the student's enrollment history and the course's capacity. The DCB pattern solves this by loading events from multiple streams based on event tags, projecting them into a single aggregate state, and appending new events atomically.
How It Works
- A Load() or Before() method returns an EventTagQuery that specifies which tagged events to load
- Marten loads all matching events and projects them into your aggregate type using the standard Apply() methods
- Your handler receives the projected state and makes decisions
- Returned events are appended atomically through the IEventBoundary<T> interface
Example: University Course Subscription
This example is ported from the AxonIQ DCB demo. A student subscribing to a course must enforce rules spanning both the student and course boundaries:
- Student must be enrolled in faculty
- Student can't subscribe to more than 3 courses
- Course must exist and have vacant spots
- Student not already subscribed
First, define your domain events and strong-typed IDs:
namespace MartenTests.Dcb.University;
/// <summary>
/// Strong-typed ID for a course. Uses string value with "Course:" prefix.
/// </summary>
public record CourseId(string Value)
{
public static CourseId Random() => new($"Course:{Guid.NewGuid()}");
public static CourseId Of(string raw) => new(raw.StartsWith("Course:") ? raw : $"Course:{raw}");
public override string ToString() => Value;
}
/// <summary>
/// Strong-typed ID for a student. Uses string value with "Student:" prefix.
/// </summary>
public record StudentId(string Value)
{
public static StudentId Random() => new($"Student:{Guid.NewGuid()}");
public static StudentId Of(string raw) => new(raw.StartsWith("Student:") ? raw : $"Student:{raw}");
public override string ToString() => Value;
}
/// <summary>
/// Strong-typed ID for the faculty. Single-instance in this demo.
/// </summary>
public record FacultyId(string Value)
{
public static readonly FacultyId Default = new("Faculty:ONLY_FACULTY_ID");
public static FacultyId Of(string raw) => new(raw.StartsWith("Faculty:") ? raw : $"Faculty:{raw}");
public override string ToString() => Value;
}
/// <summary>
/// Composite ID for a student-course subscription.
/// </summary>
public record SubscriptionId(CourseId CourseId, StudentId StudentId);
namespace MartenTests.Dcb.University;
public record CourseCreated(FacultyId FacultyId, CourseId CourseId, string Name, int Capacity);
public record CourseRenamed(FacultyId FacultyId, CourseId CourseId, string Name);
public record CourseCapacityChanged(FacultyId FacultyId, CourseId CourseId, int Capacity);
public record StudentEnrolledInFaculty(FacultyId FacultyId, StudentId StudentId, string FirstName, string LastName);
public record StudentSubscribedToCourse(FacultyId FacultyId, StudentId StudentId, CourseId CourseId);
public record StudentUnsubscribedFromCourse(FacultyId FacultyId, StudentId StudentId, CourseId CourseId);
public record AllCoursesFullyBookedNotificationSent(FacultyId FacultyId);
Next, define the aggregate state that spans both boundaries. This single type projects events tagged with either a CourseId or StudentId:
namespace MartenTests.Dcb.University;
/// <summary>
/// Built from events tagged with BOTH CourseId and StudentId.
/// This is the core DCB pattern — the consistency boundary spans multiple streams.
///
/// Ported from the Axon SubscribeStudentToCourseCommandHandler.State which uses
/// EventCriteria.either() to load events matching CourseId OR StudentId.
/// </summary>
public class SubscriptionState
{
public CourseId? CourseId { get; private set; }
public int CourseCapacity { get; private set; }
public int StudentsSubscribedToCourse { get; private set; }
public StudentId? StudentId { get; private set; }
public int CoursesStudentSubscribed { get; private set; }
public bool AlreadySubscribed { get; private set; }
public void Apply(CourseCreated e)
{
CourseId = e.CourseId;
CourseCapacity = e.Capacity;
}
public void Apply(CourseCapacityChanged e)
{
CourseCapacity = e.Capacity;
}
public void Apply(StudentEnrolledInFaculty e)
{
StudentId = e.StudentId;
}
public void Apply(StudentSubscribedToCourse e)
{
if (e.CourseId == CourseId)
StudentsSubscribedToCourse++;
if (e.StudentId == StudentId)
CoursesStudentSubscribed++;
if (e.StudentId == StudentId && e.CourseId == CourseId)
AlreadySubscribed = true;
}
public void Apply(StudentUnsubscribedFromCourse e)
{
if (e.CourseId == CourseId)
StudentsSubscribedToCourse--;
if (e.StudentId == StudentId)
CoursesStudentSubscribed--;
if (e.StudentId == StudentId && e.CourseId == CourseId)
AlreadySubscribed = false;
}
}
Using the [BoundaryModel] Attribute
The [BoundaryModel] attribute on a handler parameter triggers the DCB workflow. Your handler class must include a Load() (or LoadAsync(), Before(), BeforeAsync()) method that returns an EventTagQuery:
public static class BoundaryModelSubscribeStudentHandler
{
public const int MaxCoursesPerStudent = 3;
public static EventTagQuery Load(BoundaryModelSubscribeStudentToCourse command)
=> EventTagQuery
.For(command.CourseId)
.AndEventsOfType<CourseCreated, CourseCapacityChanged, StudentSubscribedToCourse, StudentUnsubscribedFromCourse>()
.Or(command.StudentId)
.AndEventsOfType<StudentEnrolledInFaculty, StudentSubscribedToCourse, StudentUnsubscribedFromCourse>();
public static StudentSubscribedToCourse Handle(
BoundaryModelSubscribeStudentToCourse command,
[BoundaryModel]
SubscriptionState state)
{
if (state.StudentId == null)
throw new InvalidOperationException("Student with given id never enrolled the faculty");
if (state.CoursesStudentSubscribed >= MaxCoursesPerStudent)
throw new InvalidOperationException("Student subscribed to too many courses");
if (state.CourseId == null)
throw new InvalidOperationException("Course with given id does not exist");
if (state.StudentsSubscribedToCourse >= state.CourseCapacity)
throw new InvalidOperationException("Course is fully booked");
if (state.AlreadySubscribed)
throw new InvalidOperationException("Student already subscribed to this course");
return new StudentSubscribedToCourse(FacultyId.Default, command.StudentId, command.CourseId);
}
}
The EventTagQuery uses a fluent API to define which events to load:
- EventTagQuery.For(tag) — start with a tag value (e.g., a CourseId)
- .AndEventsOfType<T1, T2, ...>() — filter to specific event types for that tag
- .Or(tag) — add another tag to query (e.g., a StudentId)
Marten loads all events matching any of the tag criteria, projects them into your aggregate using the standard Apply() methods, and provides the result to your handler.
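To see the projection step in isolation, here is a self-contained plain C# sketch (no Marten involved) that folds a mixed list of events from both boundaries into one state object. The types are trimmed-down, hypothetical copies of the University sample, with an explicit switch standing in for Marten's convention-based Apply() routing:

```csharp
using System;
using System.Collections.Generic;

public record CourseId(string Value);
public record StudentId(string Value);
public record CourseCreated(CourseId CourseId, int Capacity);
public record StudentEnrolledInFaculty(StudentId StudentId);
public record StudentSubscribedToCourse(StudentId StudentId, CourseId CourseId);

public class SubscriptionState
{
    public CourseId CourseId { get; private set; }
    public int CourseCapacity { get; private set; }
    public int StudentsSubscribedToCourse { get; private set; }
    public StudentId StudentId { get; private set; }
    public int CoursesStudentSubscribed { get; private set; }
    public bool AlreadySubscribed { get; private set; }

    // Explicit dispatch stands in for Marten's Apply() convention
    public void Apply(object e)
    {
        switch (e)
        {
            case CourseCreated c:
                CourseId = c.CourseId;
                CourseCapacity = c.Capacity;
                break;
            case StudentEnrolledInFaculty s:
                StudentId = s.StudentId;
                break;
            case StudentSubscribedToCourse sub:
                if (sub.CourseId == CourseId) StudentsSubscribedToCourse++;
                if (sub.StudentId == StudentId) CoursesStudentSubscribed++;
                if (sub.StudentId == StudentId && sub.CourseId == CourseId) AlreadySubscribed = true;
                break;
        }
    }
}

public static class SubscriptionProjection
{
    // Fold all tag-matched events, in order, into one SubscriptionState
    public static SubscriptionState BuildState(IEnumerable<object> taggedEvents)
    {
        var state = new SubscriptionState();
        foreach (var e in taggedEvents) state.Apply(e);
        return state;
    }
}
```

Folding a CourseCreated, a StudentEnrolledInFaculty, and a matching StudentSubscribedToCourse event produces a state with AlreadySubscribed set to true, just as the real SubscriptionState above would.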
Using IEventBoundary<T> Directly
For more control over event appending, you can accept IEventBoundary<T> as a parameter instead of the aggregate type:
public static void Handle(
SubscribeStudentToCourse command,
[BoundaryModel] IEventBoundary<SubscriptionState> boundary)
{
var state = boundary.Aggregate;
// validation logic...
boundary.AppendOne(new StudentSubscribedToCourse(
FacultyId.Default, command.StudentId, command.CourseId));
}
Return Value Handling
The DCB workflow supports the same return value patterns as the standard aggregate handler workflow:
- Single event objects are appended via boundary.AppendOne()
- IEnumerable<object> or Events collections are appended via boundary.AppendMany()
- IAsyncEnumerable<object> events are appended one at a time
- OutgoingMessages and ISideEffect are handled as cascading messages, not events
Validation on Boundary Existence
Use the Required property to enforce that the projected aggregate state is not null:
public static StudentSubscribedToCourse Handle(
SubscribeStudentToCourse command,
[BoundaryModel(Required = true)] SubscriptionState state)
{
// state is guaranteed to be non-null
// ...
}
