
Aggregate Handlers and Event Sourcing

TIP

You can forgo the [AggregateHandler] attribute by instead naming your message handler type with the AggregateHandler suffix if the Wolverine/Marten integration is applied to your application. Do note that you will still have to use the attribute to opt into exclusive write locking.

See the OrderEventSourcingSample project on GitHub for more samples.

The Wolverine + Marten combination is optimized for efficient and productive development using a CQRS architecture style with Marten's event sourcing support. Specifically, let's dive into the responsibilities of a typical command handler in a CQRS architecture with event sourcing:

  1. Fetch any current state of the system that's necessary to evaluate or validate the incoming command
  2. Decide what events should be emitted and captured in response to the incoming command
  3. Manage concurrent access to system state
  4. Safely commit the new events
  5. Selectively publish some of the events based on system needs to other parts of your system or even external systems
  6. Instrument all of the above

And then lastly, you're going to want some resiliency and selective retry capabilities for concurrent access violations or just normal infrastructure hiccups.
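
Managing concurrent access (item 3 above) is commonly done with optimistic concurrency: a writer states which version of the stream it based its decision on, and the append is rejected if the stream has moved on since then. The samples later on this page do this by passing an expected Version into Marten. The following is only a minimal in-memory sketch of the idea. InMemoryStream and VersionMismatchException are hypothetical names invented for this illustration, and Marten enforces the real check in the database rather than with a lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type used only by this sketch
public class VersionMismatchException : Exception
{
    public VersionMismatchException(int expected, int actual)
        : base($"Expected stream version {expected}, but it was {actual}")
    {
    }
}

// Hypothetical in-memory stand-in for a single event stream
public class InMemoryStream
{
    private readonly List<object> _events = new();
    private readonly object _lock = new();

    public int Version
    {
        get { lock (_lock) { return _events.Count; } }
    }

    // Appends only when the caller's expected version matches the
    // current version, so a writer working from stale state is rejected
    public void Append(int expectedVersion, params object[] events)
    {
        lock (_lock)
        {
            if (_events.Count != expectedVersion)
            {
                throw new VersionMismatchException(expectedVersion, _events.Count);
            }

            _events.AddRange(events);
        }
    }
}

public static class ConcurrencyDemo
{
    public static void Main()
    {
        var stream = new InMemoryStream();
        stream.Append(0, "OrderCreated");

        // Two writers both read the stream at version 1
        var seenByFirst = stream.Version;
        var seenBySecond = stream.Version;

        stream.Append(seenByFirst, "ItemReady");

        try
        {
            // The stream has moved on to version 2, so this is rejected
            stream.Append(seenBySecond, "ItemReady");
        }
        catch (VersionMismatchException e)
        {
            Console.WriteLine(e.Message);
            // Expected stream version 1, but it was 2
        }
    }
}
```

A rejected writer would normally reload the current state and try again, which is where the retry capabilities mentioned above come in.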

Let's jump right into an example order management system. I'm going to model the order workflow with this aggregate model:

cs
public class Item
{
    public string Name { get; set; }
    public bool Ready { get; set; }
}

public class Order
{
    public Order(OrderCreated created)
    {
        foreach (var item in created.Items) Items[item.Name] = item;
    }

    // This would be the stream id
    public Guid Id { get; set; }

    // This is important: by Marten convention, this would
    // be the current version of the event stream
    public int Version { get; set; }

    public DateTimeOffset? Shipped { get; private set; }

    public Dictionary<string, Item> Items { get; set; } = new();

    // These methods are used by Marten to update the aggregate
    // from the raw events
    public void Apply(IEvent<OrderShipped> shipped)
    {
        Shipped = shipped.Timestamp;
    }

    public void Apply(ItemReady ready)
    {
        Items[ready.Name].Ready = true;
    }

    public bool IsReadyToShip()
    {
        return Shipped == null && Items.Values.All(x => x.Ready);
    }
}

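
To make the role of the Apply methods more concrete, here is a minimal, Marten-free sketch of replaying a list of events into an aggregate. OrderSketch is a hypothetical, simplified stand-in for the Order class, and the OrderCreated and ItemReady record shapes are assumptions inferred from how the samples on this page use them, not the definitions from the sample project. Marten's real projection machinery does considerably more than this loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Item
{
    public string Name { get; set; } = "";
    public bool Ready { get; set; }
}

// Hypothetical event shapes, inferred from how the samples use them
public record OrderCreated(Item[] Items);
public record ItemReady(string Name);

// Simplified stand-in for the Order aggregate with no Marten types
public class OrderSketch
{
    public int Version { get; private set; }
    public Dictionary<string, Item> Items { get; } = new();

    public void Apply(OrderCreated created)
    {
        // Copy each item so replaying never mutates the event itself
        foreach (var item in created.Items)
        {
            Items[item.Name] = new Item { Name = item.Name, Ready = item.Ready };
        }
    }

    public void Apply(ItemReady ready)
    {
        Items[ready.Name].Ready = true;
    }

    public bool IsReadyToShip()
    {
        return Items.Values.All(x => x.Ready);
    }

    // Rebuild the current state by applying every event in order
    public static OrderSketch Replay(IEnumerable<object> events)
    {
        var order = new OrderSketch();
        foreach (var e in events)
        {
            switch (e)
            {
                case OrderCreated created: order.Apply(created); break;
                case ItemReady ready: order.Apply(ready); break;
            }

            // In this sketch the version is simply the number of events seen
            order.Version++;
        }

        return order;
    }
}

public static class ReplayDemo
{
    public static void Main()
    {
        var events = new List<object>
        {
            new OrderCreated(new[] { new Item { Name = "Socks" }, new Item { Name = "Shoes" } }),
            new ItemReady("Socks")
        };

        var order = OrderSketch.Replay(events);
        Console.WriteLine($"{order.Version} {order.IsReadyToShip()}"); // 2 False

        events.Add(new ItemReady("Shoes"));
        order = OrderSketch.Replay(events);
        Console.WriteLine($"{order.Version} {order.IsReadyToShip()}"); // 3 True
    }
}
```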

At a minimum, we're going to want a command handler for this command message that marks an order item as ready to ship and then evaluates, based on the current state of the Order aggregate, whether or not the logical order is ready to be shipped out:

cs
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);


Before getting into Wolverine middleware strategies, let's first build out an MVC controller method for the command above. In this code, we're also utilizing Wolverine's outbox messaging support to both order and guarantee the delivery of a ShipOrder message when the Marten transaction succeeds:

cs
[HttpPost("/orders/itemready")]
public async Task Post(
    [FromBody] MarkItemReady command,
    [FromServices] IDocumentSession session,
    [FromServices] IMartenOutbox outbox
)
{
    // This is important!
    outbox.Enroll(session);

    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events

        // We're also opting into Marten optimistic concurrency checks here
        .FetchForWriting<Order>(command.OrderId, command.Version);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        item.Ready = true;

        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        // Publish a cascading command to do whatever it takes
        // to actually ship the order
        // Note that because the context here is enrolled in a Wolverine
        // outbox, the message is registered, but not "released" to
        // be sent out until SaveChangesAsync() is called down below
        await outbox.PublishAsync(new ShipOrder(command.OrderId));
        stream.AppendOne(new OrderReady());
    }

    // This will also persist and flush out any outgoing messages
    // registered into the context outbox
    await session.SaveChangesAsync();
}


Hopefully, that code is easy to understand, but it contains some potentially repetitive code (loading aggregates, appending events, committing transactions) that will recur across all your command handlers. Likewise, it would be best to isolate the business logic that decides which new events should be appended from the infrastructure code, so that you can more easily reason about and test that logic. To that end, Wolverine supports the Decider pattern with Marten using the [AggregateHandler] middleware. Using that middleware, we get this slimmer code:

cs
[AggregateHandler]
public static IEnumerable<object> Handle(MarkItemReady command, Order order)
{
    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;

        // Mark that this item is ready
        yield return new ItemReady(command.ItemName);
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        yield return new OrderReady();
    }
}


In the case above, Wolverine is wrapping middleware around our basic command handler to:

  1. Fetch the appropriate Order aggregate matching the command
  2. Append any new events returned from the handle method to the Marten event stream for this Order
  3. Save any outstanding changes and commit the Marten unit of work

To make this more clear, here's the generated code (with some reformatting and extra comments):

cs
public class MarkItemReadyHandler1442193977 : MessageHandler
{
    private readonly OutboxedSessionFactory _outboxedSessionFactory;

    public MarkItemReadyHandler1442193977(OutboxedSessionFactory outboxedSessionFactory)
    {
        _outboxedSessionFactory = outboxedSessionFactory;
    }

    public override async Task HandleAsync(MessageContext context, CancellationToken cancellation)
    {
        var markItemReady = (MarkItemReady)context.Envelope.Message;
        await using var documentSession = _outboxedSessionFactory.OpenSession(context);
        var eventStore = documentSession.Events;
        // Loading Marten aggregate
        var eventStream = await eventStore.FetchForWriting<Order>(markItemReady.OrderId, markItemReady.Version, cancellation).ConfigureAwait(false);

        var outgoing1 = MarkItemReadyHandler.Handle(markItemReady, eventStream.Aggregate);
        if (outgoing1 != null)
        {
            // Capturing any possible events returned from the command handlers
            eventStream.AppendMany(outgoing1);

        }

        await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
    }

}
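
Because the slimmer MarkItemReady handler is a plain static function that takes a command and an aggregate and returns events, its business logic can be exercised without Marten, Wolverine, or a database. The following is a minimal sketch of such a check. The Order, Item, and event types here are simplified, hypothetical stand-ins (the record shapes are inferred from usage on this page), and the [AggregateHandler] attribute is left off so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for the types used on this page
public record MarkItemReady(Guid OrderId, string ItemName, int Version);
public record ItemReady(string Name);
public record OrderReady();

public class Item
{
    public string Name { get; set; } = "";
    public bool Ready { get; set; }
}

public class Order
{
    public Dictionary<string, Item> Items { get; } = new();

    public bool IsReadyToShip()
    {
        return Items.Values.All(x => x.Ready);
    }
}

public static class MarkItemReadyHandler
{
    // Same decision logic as the handler above, minus the attribute
    public static IEnumerable<object> Handle(MarkItemReady command, Order order)
    {
        if (order.Items.TryGetValue(command.ItemName, out var item))
        {
            item.Ready = true;
            yield return new ItemReady(command.ItemName);
        }
        else
        {
            throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
        }

        if (order.IsReadyToShip())
        {
            yield return new OrderReady();
        }
    }
}

public static class HandlerCheck
{
    public static void Main()
    {
        var order = new Order();
        order.Items["Socks"] = new Item { Name = "Socks", Ready = true };
        order.Items["Shoes"] = new Item { Name = "Shoes" };

        var command = new MarkItemReady(Guid.NewGuid(), "Shoes", 1);

        // ToList() forces the iterator to run; nothing happens until then
        var events = MarkItemReadyHandler.Handle(command, order).ToList();

        Console.WriteLine(string.Join(", ", events.Select(e => e.GetType().Name)));
        // ItemReady, OrderReady
    }
}
```

One detail worth knowing when testing a handler written with yield return: C# iterators are lazy, so the validation exception for an unknown item is only thrown once the result is enumerated, for example by ToList() in a test.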

As you probably guessed, there are some naming conventions or other questions you need to be aware of before you use this middleware strategy.

Handler Method Signatures

The Marten workflow command handler method signature needs to follow these rules:

  • Either explicitly use the [AggregateHandler] attribute on the handler method or use the AggregateHandler suffix on the message handler type to tell Wolverine to opt into the aggregate command workflow.
  • The first argument should be the command type, just like any other Wolverine message handler
  • The second argument should be the aggregate -- either the aggregate itself (Order) or wrapped in the Marten IEventStream<T> type (IEventStream<Order>). There is an example of that usage below:

cs
[AggregateHandler]
public static void Handle(OrderEventSourcingSample.MarkItemReady command, IEventStream<Order> stream)
{
    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;

        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }
}


Just as in other Wolverine message handlers, you can use additional method arguments for registered services ("method injection"), the CancellationToken for the message, and the message Envelope if you need access to message metadata.

As for the return values from these handler methods, you can use:

  • No return value at all if you are directly using IEventStream<T> to append events
  • IEnumerable<object> or object[] to denote events to append to the current event stream
  • IAsyncEnumerable<object>, which will also be treated as an enumerable of events to append to the current event stream
  • Wolverine.Events to denote a list of events. You may find this leads to more readable code in some cases
  • OutgoingMessages to refer to additional command messages to be published that should not be captured as events
  • ISideEffect objects
  • Any other type, which is considered to be a separate event type. You may use that for either a single event or a tuple of separate events that will be appended to the event stream

Here's an alternative to the MarkItemReady handler that uses Events:

cs
[AggregateHandler]
public static async Task<(Events, OutgoingMessages)> HandleAsync(MarkItemReady command, Order order, ISomeService service)
{
    // All contrived, let's say we need to call some
    // kind of service to get data so this handler has to be
    // async
    var data = await service.FindDataAsync();

    var messages = new OutgoingMessages();
    var events = new Events();

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Not doing this in a purist way here, but just
        // trying to illustrate the Wolverine mechanics
        item.Ready = true;

        // Mark that this item is ready
        events += new ItemReady(command.ItemName);
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        events += new OrderReady();
        messages.Add(new ShipOrder(order.Id));
    }

    // This results in both new events being captured
    // and potentially the ShipOrder message going out
    return (events, messages);
}


Determining the Aggregate Identity

Wolverine needs to determine which public member on the command type refers to the identity of the aggregate. You have two options. The first is the implied naming convention shown below, where the OrderId property is assumed to be the identity of the Order aggregate because its name is the aggregate type name with "Id" appended (the match is not case sensitive):

cs
// OrderId refers to the identity of the Order aggregate
public record MarkItemReady(Guid OrderId, string ItemName, int Version);


Or if you want to use a different member, bypass the convention, or just don't like conventional magic, you can decorate a public member on the command class with Marten's [Identity] attribute like so:

cs
public class MarkItemReady
{
    // This attribute tells Wolverine that this property will refer to the
    // Order aggregate
    [Identity] public Guid Id { get; init; }

    public string ItemName { get; init; }
}

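
The two lookup rules described in this section can be sketched with plain reflection. This is not Wolverine's actual implementation, only an illustration of the rules as stated above. IdentityAttribute is a local, hypothetical stand-in for Marten's attribute so the snippet compiles on its own, AggregateIdentity.Find and MarkItemReadyExplicit are invented names, and checking the attribute before the convention is an assumption based on the attribute being described as a way to bypass the convention.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for Marten's [Identity] attribute
[AttributeUsage(AttributeTargets.Property)]
public class IdentityAttribute : Attribute
{
}

public class Order
{
}

// Relies on the naming convention: OrderId -> Order
public record MarkItemReady(Guid OrderId, string ItemName, int Version);

// Relies on the explicit attribute instead
public class MarkItemReadyExplicit
{
    [Identity] public Guid Id { get; init; }

    public string ItemName { get; init; } = "";
}

public static class AggregateIdentity
{
    public static PropertyInfo Find(Type commandType, Type aggregateType)
    {
        var properties = commandType.GetProperties(BindingFlags.Public | BindingFlags.Instance);

        // Assumption: an explicitly marked member wins over the convention
        var marked = properties.FirstOrDefault(p => p.GetCustomAttribute<IdentityAttribute>() != null);
        if (marked != null)
        {
            return marked;
        }

        // Convention: aggregate type name + "Id", compared case-insensitively
        var conventionalName = aggregateType.Name + "Id";
        var conventional = properties.FirstOrDefault(p =>
            string.Equals(p.Name, conventionalName, StringComparison.OrdinalIgnoreCase));

        return conventional ?? throw new InvalidOperationException(
            $"No identity member for {aggregateType.Name} found on {commandType.Name}");
    }
}

public static class IdentityDemo
{
    public static void Main()
    {
        Console.WriteLine(AggregateIdentity.Find(typeof(MarkItemReady), typeof(Order)).Name);
        // OrderId

        Console.WriteLine(AggregateIdentity.Find(typeof(MarkItemReadyExplicit), typeof(Order)).Name);
        // Id
    }
}
```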

Forwarding Events

See Event Forwarding for more information.

Released under the MIT License.