Using MQTT

WARNING

Wolverine requires version 5 of the MQTT protocol for its broker support.

The Wolverine 1.9 release added a new transport option for MQTT, a messaging standard common in IoT.

Installing

To use MQTT as a transport with Wolverine, first install the WolverineFx.Mqtt package from NuGet into your project. Behind the scenes, this package uses the MQTTnet managed library for accessing MQTT brokers and also for its own testing.

bash
dotnet add package WolverineFx.Mqtt

In its simplest usage, you enable the MQTT transport by calling the WolverineOptions.UseMqtt() extension method and defining which MQTT topics you want to publish or subscribe to with the normal subscriber rules, as shown in this sample:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // Connect to the MQTT broker
        opts.UseMqtt(builder =>
        {
            var mqttServer = context.Configuration["mqtt_server"];

            builder
                .WithMaxPendingMessages(3)
                .WithClientOptions(client =>
                {
                    client.WithTcpServer(mqttServer);
                });
        });

        // Listen to an MQTT topic, and this could also be a wildcard
        // pattern
        opts.ListenToMqttTopic("app/incoming")
            // In the case of receiving JSON data, but
            // not identifying metadata, tell Wolverine
            // to assume the incoming message is this type
            .DefaultIncomingMessage<Message1>()

            // The default is AtLeastOnce
            .QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);

        // Publish messages to an outbound topic
        opts.PublishAllMessages()
            .ToMqttTopic("app/outgoing");
    })
    .StartAsync();


INFO

The MQTT transport at this time only supports endpoints that are either Buffered or Durable.

WARNING

The MQTT transport has no real support for the "Requeue" error handling policy in Wolverine. With MQTT, "Requeue" effectively becomes an inline "Retry".

Broadcast to User Defined Topics

As long as the MQTT transport is enabled in your application, you can explicitly publish messages to any named topic through this usage:

cs
public static async Task broadcast(IMessageBus bus)
{
    var paymentMade = new PaymentMade(200, "EUR");
    await bus.BroadcastToTopicAsync("region/europe/incoming", paymentMade);
}


Publishing to Derived Topic Names

INFO

The Wolverine team is open to extending the options for determining the topic name from the message type, but is waiting for feedback from the community before building anything else around this.

As a way of routing messages to MQTT topics, you also have this option:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // Connect to the MQTT broker
        opts.UseMqtt(builder =>
        {
            var mqttServer = context.Configuration["mqtt_server"];

            builder
                .WithMaxPendingMessages(3)
                .WithClientOptions(client =>
                {
                    client.WithTcpServer(mqttServer);
                });
        });

        // Publish messages to MQTT topics based on
        // the message type
        opts.PublishAllMessages()
            .ToMqttTopics()
            .QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
    })
    .StartAsync();


In this approach, all messages are routed to MQTT topics. The topic name for each message type is derived either from Wolverine's message type name rules or from the [Topic("topic name")] attribute, as shown below:

cs
[Topic("one")]
public class TopicMessage1
{
}


Publishing by Topic Rules

You can publish messages to MQTT topics based on user-defined logic that determines the actual topic name.

As an example, say you have a marker interface for your messages like this:

cs
public interface ITenantMessage
{
    string TenantId { get; }
}


To publish any message implementing that interface to an MQTT topic, you could specify the topic name logic like this:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // Connect to the MQTT broker
        opts.UseMqtt(builder =>
        {
            var mqttServer = context.Configuration["mqtt_server"];

            builder
                .WithMaxPendingMessages(3)
                .WithClientOptions(client =>
                {
                    client.WithTcpServer(mqttServer);
                });
        });

        // Publish any message that implements ITenantMessage to 
        // MQTT with a topic derived from the message
        opts.PublishMessagesToMqttTopic<ITenantMessage>(m => $"{m.GetType().Name.ToLower()}/{m.TenantId}")
            
            // Specify or configure sending through Wolverine for all
            // MQTT topic broadcasting
            .QualityOfService(MqttQualityOfServiceLevel.ExactlyOnce)
            .BufferedInMemory();
    })
    .StartAsync();

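To make the resulting topic names concrete, here is a minimal, self-contained sketch that applies the same lambda to a sample message. The `OrderPlaced` record and the `TenantTopicSketch` class are hypothetical names introduced only for this illustration, and `ITenantMessage` is repeated so the snippet compiles on its own.

```cs
using System;

public interface ITenantMessage
{
    string TenantId { get; }
}

// Hypothetical message type, used only for this illustration
public record OrderPlaced(string TenantId, string OrderId) : ITenantMessage;

public static class TenantTopicSketch
{
    // The same expression that is passed to PublishMessagesToMqttTopic<ITenantMessage>()
    public static readonly Func<ITenantMessage, string> TopicFor =
        m => $"{m.GetType().Name.ToLower()}/{m.TenantId}";
}
```

Under these assumptions, `TenantTopicSketch.TopicFor(new OrderPlaced("tenant1", "o-1"))` evaluates to `orderplaced/tenant1`, so each tenant gets its own topic level beneath the lower-cased message type name.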

Listening by Topic Filter

Wolverine supports topic filters for listening. The syntax is still just the same ListenToMqttTopic(filter) as shown in this snippet from the Wolverine.MQTT test suite:

cs
_receiver = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseMqttWithLocalBroker(port);
        opts.ListenToMqttTopic("incoming/#").RetainMessages();
    }).StartAsync();


Any incoming message whose topic matches the filter, according to the MQTT topic filter rules, is handled by the listening endpoint defined for that filter.
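As a rough illustration of those topic filter rules, the sketch below matches a topic against a filter using the two MQTT wildcards: `+` for exactly one topic level and `#` for all remaining levels. `TopicFilterSketch` is a hypothetical helper written for this page, not part of Wolverine or MQTTnet, and it deliberately ignores edge cases such as topics beginning with `$`. In practice the broker performs this matching.

```cs
using System;

public static class TopicFilterSketch
{
    // Returns true when the topic matches the filter under the basic MQTT wildcard rules
    public static bool Matches(string filter, string topic)
    {
        var filterLevels = filter.Split('/');
        var topicLevels = topic.Split('/');

        for (var i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // Multi-level wildcard: only valid as the last level of the filter,
                // and it matches the parent level plus everything beneath it
                return i == filterLevels.Length - 1;
            }

            if (i >= topicLevels.Length)
            {
                // The filter has more levels than the topic
                return false;
            }

            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                // Neither a single-level wildcard nor a literal match
                return false;
            }
        }

        // Without a trailing '#', the level counts must be identical
        return filterLevels.Length == topicLevels.Length;
    }
}
```

With this sketch, the filter `incoming/#` matches `incoming`, `incoming/one`, and `incoming/one/two`, while `incoming/+` matches `incoming/one` but not `incoming/one/two`.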

Integrating with Non-Wolverine Systems

When using Wolverine with an MQTT broker, you will quite likely be communicating with non-Wolverine systems or devices on the other end, so you can't depend on the Wolverine metadata being sent in MQTT UserProperties data. Not to worry, you've got options.

If the external system sends you JSON and nothing else, and you can design the system so that only one type of message arrives on a given MQTT topic, you can tell Wolverine to listen to that topic and which message type to expect. Wolverine can then deserialize the message and relay it to the correct message handler, like so:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // Connect to the MQTT broker
        opts.UseMqtt(builder =>
        {
            var mqttServer = context.Configuration["mqtt_server"];

            builder
                .WithMaxPendingMessages(3)
                .WithClientOptions(client =>
                {
                    client.WithTcpServer(mqttServer);
                });
        });

        // Listen to an MQTT topic, and this could also be a wildcard
        // pattern
        opts.ListenToMqttTopic("app/payments/made")
            // In the case of receiving JSON data, but
            // not identifying metadata, tell Wolverine
            // to assume the incoming message is this type
            .DefaultIncomingMessage<PaymentMade>();
    })
    .StartAsync();

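To show what the external system would need to send, here is a small sketch of a matching payload and handler. The shape of `PaymentMade` (an `Amount` and a `Currency`) is an assumption inferred from the `new PaymentMade(200, "EUR")` call earlier on this page, and the sketch assumes System.Text.Json with default options. `InteropSketch` is a hypothetical helper that only mimics the deserialization step; it is not how Wolverine does it internally.

```cs
using System;
using System.Text.Json;

// Assumed shape; the real PaymentMade type is not shown on this page
public record PaymentMade(int Amount, string Currency);

public static class PaymentMadeHandler
{
    // A conventional Wolverine handler method for the deserialized message
    public static void Handle(PaymentMade message)
    {
        Console.WriteLine($"Received payment of {message.Amount} {message.Currency}");
    }
}

public static class InteropSketch
{
    // Turns the raw UTF-8 JSON bytes of an MQTT payload into a PaymentMade
    public static PaymentMade Deserialize(byte[] payload)
    {
        return JsonSerializer.Deserialize<PaymentMade>(payload)
               ?? throw new InvalidOperationException("Payload was JSON null");
    }
}
```

Under these assumptions, a device publishing `{"Amount":200,"Currency":"EUR"}` to `app/payments/made` would produce a `PaymentMade(200, "EUR")` message for the handler.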

For more complex interoperability, you can implement the IMqttEnvelopeMapper interface in Wolverine to map between incoming and outgoing MQTT messages and the Wolverine Envelope structure. Here's an example:

cs
public class MyMqttEnvelopeMapper : IMqttEnvelopeMapper
{
    public void MapEnvelopeToOutgoing(Envelope envelope, MqttApplicationMessage outgoing)
    {
        // This is the only absolutely mandatory item
        outgoing.PayloadSegment = envelope.Data;
        
        // Maybe enrich this more?
        outgoing.ContentType = envelope.ContentType;
    }

    public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage incoming)
    {
        // These are the absolute minimums necessary for Wolverine to function
        envelope.MessageType = typeof(PaymentMade).ToMessageTypeName();
        // Copy only the payload bytes; .Array would expose the entire backing buffer
        envelope.Data = incoming.PayloadSegment.ToArray();
        
        // Optional items
        // Throw away the message if it is not successfully processed within 5 seconds
        envelope.DeliverWithin = 5.Seconds();
    }

    public IEnumerable<string> AllHeaders()
    {
        yield break;
    }
}


And apply that to an MQTT topic like so:

cs
using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // Connect to the MQTT broker
        opts.UseMqtt(builder =>
        {
            var mqttServer = context.Configuration["mqtt_server"];

            builder
                .WithMaxPendingMessages(3)
                .WithClientOptions(client =>
                {
                    client.WithTcpServer(mqttServer);
                });
        });

        // Publish messages to MQTT topics based on
        // the message type
        opts.PublishAllMessages()
            .ToMqttTopics()
            
            // Tell Wolverine to map envelopes to MQTT messages
            // with our custom strategy
            .UseInterop(new MyMqttEnvelopeMapper())
            
            .QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
    })
    .StartAsync();


Clearing Out Retained Messages

MQTT brokers allow you to publish retained messages to a topic, meaning that the last message will always be retained by the broker and sent to any new subscribers. That's a little bit problematic if your Wolverine application happens to be restarted, because that last retained message may easily be resent to your Wolverine application when you restart.

Not to fear, the MQTT protocol allows you to "clear" out a topic by sending it a zero-byte message, and Wolverine has a couple of shortcuts for doing just that: return a cascading message to "zero out" either the topic a message was received on or a named topic, like this:

cs
public static AckMqttTopic Handle(ZeroMessage message)
{
    // "Zero out" the topic that the original message was received from
    return new AckMqttTopic();
}

public static ClearMqttTopic Handle(TriggerZero message)
{
    // "Zero out" the designated topic
    return new ClearMqttTopic("red");
}

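For context, "clearing" a topic at the protocol level means publishing a retained message with an empty payload to it. The sketch below builds such a message directly with MQTTnet's `MqttApplicationMessageBuilder`. It is only an illustration of what ends up on the wire, not a description of how Wolverine implements `AckMqttTopic` or `ClearMqttTopic`, and `RetainedMessageSketch` is a hypothetical name.

```cs
using System;
using MQTTnet;

public static class RetainedMessageSketch
{
    // Builds a retained, zero-byte message; a broker receiving it
    // discards whatever it had retained for that topic
    public static MqttApplicationMessage BuildClearMessage(string topic)
    {
        return new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(Array.Empty<byte>())
            .WithRetainFlag()
            .Build();
    }
}
```

Inside a Wolverine application you would normally return `ClearMqttTopic` or `AckMqttTopic` from a handler as shown above rather than building this message by hand.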

Released under the MIT License.