Running HiveMQ MQTT broker on Azure IoT Edge

While working with IoT in general, and Azure IoT Edge more specifically, you will encounter many kinds of communication protocols, both in the cloud and on the edge.

In the past, I have written several blog posts about protocols seen on the edge, such as UDP, Modbus RTU, serial communication, and OPC-UA.

One of the most popular protocols in the world of IoT is MQTT:

MQTT (originally an initialism of MQ Telemetry Transport) is a lightweight, publish-subscribe, machine to machine network protocol

https://en.wikipedia.org/wiki/MQTT

So, it is time to dive into this protocol by running an MQTT broker on the edge and consuming it:

Yes, we are going to deploy a regular MQTT broker as a Docker container within Azure IoT Edge (which uses Moby under the covers). Then, we bridge messages sent to MQTT topics to the cloud using Azure IoT Hub.

For this exercise, I have chosen HiveMQ as a broker.

Let’s see how this works.

Running HiveMQ as a Docker container

There are many MQTT brokers to choose from, both open source and commercially available.

To name a few of them: Mosquitto, Mosca, EMQ, or RabbitMQ.

I also came across HiveMQ while searching for an MQTT broker capable of running as a Docker container.

The HiveMQ website advertises running a single node with this one-liner:

docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4:latest

Note: I am not affiliated with HiveMQ, nor do I receive any compensation if you use HiveMQ. This simple Docker support was my reason to check it out.

The broker itself will run on port 1883 and some ‘control center’ should be exposed on port 8080.

Converting this to an Azure IoT Edge deployment looks like a simple job.

So we use that information to deploy HiveMQ as a module.

Here, I use the Azure portal to demonstrate this. First, we give our module a proper name and a reference to the image URI:

Then, we fill in the port mapping needed, using the Container Create Options:

The full Container Create Options JSON looks like this:

{
    "ExposedPorts": {
        "1883/tcp": {},
        "8080/tcp": {}
    },
    "HostConfig": {
        "PortBindings": {
            "1883/tcp": [
                {
                    "HostPort": "1883"
                }
            ],
            "8080/tcp": [
                {
                    "HostPort": "8080"
                }
            ]
        }
    }
}

Note: This is the minimal viable HiveMQ setup. You are free to add more settings for, e.g., security or persistence.

Once we deploy this on the edge, it runs as expected:

Using ‘sudo docker ps’, we get a good look at the ports made available:

If we try to browse to the ‘control center’ (by combining the edge IP address and port 8080), we need to fill in the credentials:

The default credentials can be found on the same website, so I fill in ‘admin : hivemq’.

Note: you should override these credentials using environment variables. This is recommended in every situation other than a demonstration like this one.

Note: the default control center access is not secure so use this only in a controlled (test) environment.

This control center provides you with easy access to everything that is happening inside your MQTT Broker.

It is also possible to deploy a license to HiveMQ. Here we run the trial license, good for 25 simultaneous connections:

Note: I tried to find out what other limitations this trial license has. I only found the limitation of 25 connections. HiveMQ is also available as open source. Contact HiveMQ for their license propositions. If you know anything about other restrictions, please leave a note in the comments.

We now see some information about the node we run, like CPU usage, message traffic, etc.:

I really like this control center. It gives me a lot of insights into what is happening at the broker backend.

So, we have deployed a working MQTT broker on Azure IoT Edge:

But, we have only deployed that single genuine Docker container on Azure IoT Edge:

Let’s send some MQTT messages by adding custom containers.

Producing MQTT messages and sending them to the broker

Connecting to the MQTT broker can be done with any programming language supporting some MQTT client library.

As long as you know the IP address of the edge device in the local network, you can connect to the HiveMQ broker using the configured host port (I kept it at 1883).

To raise the stakes, I created two Docker containers, acting as an MQTT message producer and an MQTT message listener.

This way, I can deploy them on Docker next to the HiveMQ container.

Note: I am NOT using any internal Docker network to connect the containers. Each module acts as a separate application living on the local network.

For the producer, I used the regular Azure IoT Edge C# container template, although I do not actually use any of the IoT Hub module client logic in this demonstration. This leaves room for future improvements, like using desired properties for parameters instead of hardcoded values.

Note: I used the same MQTTnet logic as seen in my previous blog post.

The producer code looks like this:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
using MQTTnet;
using MQTTnet.Client;

static async Task Init()
{
    MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
    ITransportSettings[] settings = { mqttSetting };

    // Open a connection to the Edge runtime
    ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
    await ioTHubModuleClient.OpenAsync();
    Console.WriteLine("IoT Hub module client initialized.");

    try
    {
        // Connect to the HiveMQ broker on the host port configured earlier
        mqttFactory = new MqttFactory();
        mqttClient = mqttFactory.CreateMqttClient();
        mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("[edge device IP address]", 1883) // placeholder: fill in your own address
            .WithClientId("producer")
            .Build();
        mqttClient.DisconnectedAsync += OnDisconnected;
        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        Console.WriteLine("Client connected");

        // Start the publish loop in the background
        _ = Task.Run(() => ThreadBody());
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

private static IMqttClient mqttClient = null;
private static MqttFactory mqttFactory = null;
private static MqttClientOptions mqttClientOptions = null;
private static string send_message_topic = "producer/telemetry";

// Publishes a message on the telemetry topic every five seconds
private static async Task ThreadBody()
{
    while (true)
    {
        try
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(send_message_topic)
                .WithPayload(Encoding.UTF8.GetBytes("Message sent by producer"))
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            await mqttClient.PublishAsync(message);
            Console.WriteLine($"Message sent at {DateTime.UtcNow:u}");
        }
        catch (Exception ex)
        {
            // Typically seen while the broker is unavailable
            Console.WriteLine($"Client not found. Message skipped. ({ex.Message})");
        }

        // Wait without blocking a thread
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
}

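private static async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
{
    Console.WriteLine("Disconnect detected. Reconnect in 30 seconds...");
    await Task.Delay(TimeSpan.FromSeconds(30));

    try
    {
        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        Console.WriteLine("Reconnected.");
    }
    catch (Exception ex)
    {
        // The broker may still be down; log the failure instead of letting it escape the handler
        Console.WriteLine($"Reconnect failed: {ex.Message}");
    }
}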

Note: The full sample source code of the modules is made available with an MIT license on GitHub.

First, I connect an MQTT client to the MQTT broker using the public IP address of the edge device.

Then, using an infinite while loop, messages are sent every five seconds referencing the specific topic.

Finally, using an event handler registered to any MQTT client disconnect event, I try to reconnect after a short time-out. This way, our logic survives a restart of the MQTT broker module.
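The handler above waits a fixed 30 seconds before a single reconnect attempt. As an alternative, here is a minimal sketch of a retry helper with capped exponential backoff. It is not part of my modules; the names `Reconnect` and `WithBackoffAsync` are made up for this illustration, and the helper only uses the .NET base class library so it does not depend on any MQTT client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Reconnect
{
    // Calls 'connect' until it succeeds or 'maxAttempts' is reached.
    // The wait doubles after each failed attempt, capped at 'maxDelay'.
    // Returns the number of attempts that were needed; the last failure is rethrown.
    public static async Task<int> WithBackoffAsync(
        Func<CancellationToken, Task> connect,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        int maxAttempts,
        CancellationToken cancellationToken = default)
    {
        TimeSpan delay = initialDelay;

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await connect(cancellationToken);
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts && !(ex is OperationCanceledException))
            {
                Console.WriteLine($"Attempt {attempt} failed: {ex.Message}. Retrying in {delay.TotalSeconds} s...");
                await Task.Delay(delay, cancellationToken);

                // Double the delay for the next round, but never exceed the cap
                delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
            }
        }
    }
}
```

In the producer, this could presumably be called with something like `ct => mqttClient.ConnectAsync(mqttClientOptions, ct)` as the `connect` argument. Whether this plays well with the disconnect event of the MQTT client library is something to test first, as that event may fire again after a failed attempt.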

The flow of the producer module looks like this when the MQTT broker is restarted:

In the HiveMQ Control Center, the producer connection is seen (1 connection):

We see the client registered under its client ID:

We can even trace the incoming messages over the ‘producer/telemetry’ topic:

This results in a downloadable file:

The body of each message is not disclosed but we clearly see the incoming messages being sent every five seconds.

At this moment we have the producer module running next to the MQTT broker:

This makes a good simulation when you will eventually consume telemetry from some device in your local network, but that device is not available yet.

In that case, try to get a recording of MQTT messages coming from the device or just generate some dummy values honoring the expected message format. Then, you have created a simulation of that device.
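Generating dummy values could look something like the sketch below. The message format (`deviceId`, `temperature`, `timestamp`) and the class name `DummyTelemetry` are purely hypothetical; replace them with the format of the real device. I use System.Text.Json here, assuming .NET Core 3.0 or later, to keep the sketch free of extra packages.

```csharp
using System;
using System.Text.Json;

public static class DummyTelemetry
{
    // Builds one JSON payload in a hypothetical device format with the
    // fields "deviceId", "temperature" and "timestamp".
    public static string Next(Random random, string deviceId, DateTime utcNow)
    {
        // A value between 20.0 and 25.0, rounded to two decimals
        double temperature = Math.Round(20.0 + random.NextDouble() * 5.0, 2);

        return JsonSerializer.Serialize(new
        {
            deviceId,
            temperature,
            timestamp = utcNow
        });
    }
}
```

The returned string can be used as the payload in the publish loop of the producer instead of the fixed text. Passing in the `Random` instance and the timestamp keeps the generator repeatable, which helps when you want to replay the same simulated values.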

Listening to incoming MQTT messages and sending them to the cloud

What is the purpose of sending messages if nobody is listening to them?

Let’s add a listener module, able to receive MQTT messages. Each message received is then sent to the cloud using regular inter-module routing on Azure IoT Edge.

This brings us to this module architecture on the edge:

The source code looks a lot like that of the producer module:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;

static async Task Init()
{
    MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
    ITransportSettings[] settings = { mqttSetting };

    // Open a connection to the Edge runtime
    ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
    await ioTHubModuleClient.OpenAsync();
    Console.WriteLine("IoT Hub module client initialized.");

    try
    {
        // Connect to the HiveMQ broker on the host port configured earlier
        mqttFactory = new MqttFactory();

        mqttClient = mqttFactory.CreateMqttClient();

        mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("[edge device IP address]", 1883) // placeholder: fill in your own address
            .WithClientId("listener")
            .Build();

        mqttClient.DisconnectedAsync += OnDisconnected;

        // Register the handler before connecting so no message is missed
        mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;

        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

        Console.WriteLine("MQTT Client connected");

        // Subscribe for incoming MQTT messages
        var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic(subscribe_topic_filter); })
            .Build();
        await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

        Console.WriteLine($"Subscribed for topic '{subscribe_topic_filter}'");

        // Keep the module alive without blocking a thread
        await Task.Delay(Timeout.Infinite);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }

    Console.WriteLine("Exiting...");
}

private static ModuleClient ioTHubModuleClient = null;
private static IMqttClient mqttClient = null;
private static MqttFactory mqttFactory = null;
private static MqttClientOptions mqttClientOptions = null;
public static string subscribe_topic_filter => "producer/telemetry/#";

private static async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
    if (args.ApplicationMessage.Topic.StartsWith(subscribe_topic_filter.Replace("/#", "")))
    {
        // Wrap the MQTT payload in a JSON message for the cloud
        var cloudMessage = new CloudMessage(subscribe_topic_filter.Split('/')[0], DateTime.UtcNow, args.ApplicationMessage.ConvertPayloadToString());
        var jsonMessage = JsonConvert.SerializeObject(cloudMessage);
        var messageBytes = Encoding.UTF8.GetBytes(jsonMessage);

        using (var pipeMessage = new Message(messageBytes))
        {
            // Await instead of blocking with Wait() inside the async handler
            await ioTHubModuleClient.SendEventAsync("output1", pipeMessage);

            Console.WriteLine($"Message '{jsonMessage}' sent");
        }
    }
    else
    {
        Console.WriteLine($"Unknown message '{args.ApplicationMessage.ConvertPayloadToString()}' on topic '{args.ApplicationMessage.Topic}'");
    }
}

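private static async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
{
    Console.WriteLine("Disconnect detected. Reconnect in 30 seconds...");
    await Task.Delay(TimeSpan.FromSeconds(30));

    try
    {
        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        Console.WriteLine("Reconnected.");

        // A restarted broker may not remember our subscription, so subscribe again
        var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic(subscribe_topic_filter); })
            .Build();
        await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
    }
    catch (Exception ex)
    {
        // The broker may still be down; log the failure instead of letting it escape the handler
        Console.WriteLine($"Reconnect failed: {ex.Message}");
    }
}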

Again, an MQTT client is connected to the MQTT broker using the public IP address of the edge device.

Also, using an event handler registered to any MQTT client disconnect event, I try to reconnect after a short time-out. This way, our logic survives a restart of the MQTT broker module.

We then subscribe to the same topic the producer uses to send messages.
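The listener subscribes with the filter ‘producer/telemetry/#’ while the producer publishes on ‘producer/telemetry’. This works because, in MQTT, the multi-level wildcard ‘#’ also matches the parent level itself. The sketch below shows how such a filter is matched against a topic. It is a simplified illustration, not code from my modules: the name `MqttTopic.Matches` is made up, and the special rules for topics starting with ‘$’ are left out.

```csharp
using System;

public static class MqttTopic
{
    // Returns true when 'topic' matches 'filter' under the MQTT wildcard rules:
    // '+' matches exactly one level, '#' matches the parent level and any number of child levels.
    public static bool Matches(string filter, string topic)
    {
        string[] filterLevels = filter.Split('/');
        string[] topicLevels = topic.Split('/');

        for (int i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // '#' is only valid as the last level; it matches whatever remains, including nothing
                return i == filterLevels.Length - 1;
            }

            if (i >= topicLevels.Length)
            {
                return false; // the topic has fewer levels than the filter requires
            }

            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                return false; // a literal level differs
            }
        }

        // Without a trailing '#', both must have the same number of levels
        return filterLevels.Length == topicLevels.Length;
    }
}
```

Note the difference with the simple `StartsWith` check in the listener: a topic like ‘producer/telemetry2’ starts with ‘producer/telemetry’ but does not match the filter. In practice this should not matter, because the broker only delivers topics that match the subscription.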

If a message arrives, its topic is compared with the expected topic. Valid messages are then formatted as JSON and sent to the cloud over output ‘output1’.
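The `CloudMessage` class is not shown in the listing. A minimal sketch of what such a class could look like is given below. Only the order of the constructor arguments is taken from the listener code; the property names are my assumption for this illustration, so check the repository for the actual class. To keep the sketch self-contained, it uses System.Text.Json (assuming .NET Core 3.0 or later) where the module itself uses Newtonsoft.Json.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical shape of the message sent to the cloud; property names are assumptions.
public class CloudMessage
{
    public CloudMessage(string source, DateTime timestamp, string payload)
    {
        Source = source;
        Timestamp = timestamp;
        Payload = payload;
    }

    public string Source { get; }       // first level of the topic, e.g. "producer"
    public DateTime Timestamp { get; }  // moment the listener received the MQTT message
    public string Payload { get; }      // MQTT payload converted to a string
}

public static class CloudMessageEncoder
{
    // Serializes the message to JSON and returns the UTF-8 bytes,
    // ready to be wrapped in an IoT Hub message.
    public static byte[] ToUtf8Json(CloudMessage message)
    {
        string json = JsonSerializer.Serialize(message);
        return Encoding.UTF8.GetBytes(json);
    }
}
```

Sending JSON encoded as UTF-8 keeps the telemetry readable in tools like the Azure IoT Explorer.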

So, a routing line is added to the deployment manifest:

"routes": {
  "listener2upstream": {
    "route": "FROM /messages/modules/listener/outputs/output1 into $upstream"
  }
}

Once the module is deployed, MQTT messages are received and sent to the cloud:

Using the Azure IoT Explorer, we see how the producer MQTT messages are arriving in the IoT Hub as telemetry:

Conclusion

This is a working demonstration of hosting a generic MQTT broker on Azure IoT Edge.

Now, you can attach any device on the local network, capable of sending telemetry over MQTT. You only need to adjust the topics (and perhaps do something smart with the incoming messages before or after you send them to the cloud).

The same construction can also be used to communicate back to the MQTT-capable device. Just register a second topic and let the device (or producer module) subscribe to it. Then, send data coming from, e.g., listener module twin desired properties or listener module direct methods over this second topic to the MQTT device or producer module.

Here, both a producer and a listener are also deployed as IoT Edge modules, so this makes a perfect solution for simulating end devices with full zero-touch control from the cloud.

Note: Keep in mind that there is still some work to be done before you even think of putting this into a production environment. Think about, e.g., securing the communication with credentials and TLS, or adding support for desired properties.

The full sample source code of the modules is made available with an MIT license on GitHub.