Decompressing Azure IoT messages using Azure Stream Analytics

Azure IoT Edge messages are limited to a maximum size of 256KB. Each message sent is divided into chunks of 4KB, and the metering of an IoT Hub is based on these chunks.

So, a message of 10KB is counted as three separate messages.

Note: the chunk size of the IoT Hub free tier is smaller, just 0.5KB.

In a recent project, we were sending messages of more than 70KB. That means almost twenty chunks per message, or even more in some cases.
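To make the chunk arithmetic concrete, here is a minimal sketch that rounds a message size up to whole chunks. The class and method names are made up for this example, and it assumes 1KB equals 1024 bytes.

```csharp
using System;

public static class ChunkMath
{
    // Returns how many metered chunks a message of the given size takes.
    // Assumption: 1KB = 1024 bytes. The default chunk size is 4KB;
    // pass 512 for the free tier mentioned above.
    public static int CountChunks(int messageSizeBytes, int chunkSizeBytes = 4 * 1024)
    {
        if (messageSizeBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(messageSizeBytes));
        if (chunkSizeBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(chunkSizeBytes));

        // Integer ceiling division: a partly filled chunk counts as a full one.
        return (messageSizeBytes + chunkSizeBytes - 1) / chunkSizeBytes;
    }
}
```

With this, `ChunkMath.CountChunks(10 * 1024)` gives 3 and `ChunkMath.CountChunks(70 * 1024)` gives 18, while the same 10KB message on the free tier, `ChunkMath.CountChunks(10 * 1024, 512)`, gives 20.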

This is technically fine, but we were not comfortable with it:

  • This means going rapidly through the IoTHub quota
  • Messages were sent over a metered network

So this could become quite an investment in traffic and chunks.

I checked whether it is possible to limit the message size. There are several solutions.

But first, let’s see how to start compressing messages.

Sending an Azure IoT Edge message from an IoT Edge module looks like this (in C#):

var jsonMessage = JsonConvert.SerializeObject(messageBodyObject);

using (var message = new Message(Encoding.UTF8.GetBytes(jsonMessage)))
{ 
	message.ContentEncoding = "utf-8";
	message.ContentType = "application/json";

	await _ioTHubModuleClient.SendEventAsync(Name, message);
}

An object is first serialized into a JSON string. This string is then converted into a byte array, so the message body can carry any kind of content.

The ContentEncoding and ContentType tell the logic receiving the message how to deserialize the inner content. In this case, the byte array represents a JSON object.

So compression could be done at two levels: compressing that string or compressing that byte array.

Finding the right kind of compression

In a previous blog post, I already checked out zipping a string into another (compressed) string. That logic was based on a mechanism used by zip files.

Although this looked fine, it gave me a challenge, which becomes clear once we have seen how to test it.

This is the ideal test I came up with:

The steps I want to perform are:

  1. Injecting a JSON message on the edge into the local routing mechanism using a direct method on a separate custom module, so I’m in control of the JSON message ready to be compressed (on the Inject Message Module)
  2. The actual compression logic test transforms the incoming JSON message into some compressed message (on the Zip Test Module)
  3. I test the compressed message locally using the Echo module which is upgraded so it can decompress routed messages (both GZip and Deflate) and tell something about the message size before and after the decompression (in the Echo Module)
  4. The compressed message arrives at the IoTHub
  5. The compressed message is picked up by the Azure Stream Analytics job using the IoT Hub input source
  6. The message should be decompressed and available in the query
  7. The EventHub shows output based on the decompressed messages

Other services picking up the compressed events could be Azure Functions, Storage Accounts, TimeSeries Insights, etc.

We are quite lucky with Azure Stream Analytics, which supports decompression out-of-the-box.

Check out the IoTHub input:

The compression algorithms are based on GZip and Deflate.

This is not the algorithm I had already figured out!

This original compression mechanism is not sufficient.

So I updated the ZipHelperLib Nuget package I wrote back in 2017 (for that first compression solution).

Now, it also supports compression and decompression using GZip and Deflate.

I based it on compressing the byte array. This takes away the burden of actually encoding the text.
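The internals of ZipHelperLib are not shown in this post, so the following is only a minimal sketch of compressing and decompressing a byte array with the GZipStream and DeflateStream classes from System.IO.Compression. The class and method names are made up for this sketch and are not the library's API.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper, not the ZipHelperLib implementation.
public static class CompressionSketch
{
    public static byte[] GZipCompress(byte[] input) =>
        Compress(input, s => new GZipStream(s, CompressionMode.Compress));

    public static byte[] GZipDecompress(byte[] input) =>
        Decompress(input, s => new GZipStream(s, CompressionMode.Decompress));

    public static byte[] DeflateCompress(byte[] input) =>
        Compress(input, s => new DeflateStream(s, CompressionMode.Compress));

    public static byte[] DeflateDecompress(byte[] input) =>
        Decompress(input, s => new DeflateStream(s, CompressionMode.Decompress));

    private static byte[] Compress(byte[] input, Func<Stream, Stream> wrap)
    {
        using (var output = new MemoryStream())
        {
            // Dispose the compressor first so all buffered data is flushed.
            using (var compressor = wrap(output))
            {
                compressor.Write(input, 0, input.Length);
            }

            // ToArray still works after the MemoryStream has been closed.
            return output.ToArray();
        }
    }

    private static byte[] Decompress(byte[] input, Func<Stream, Stream> wrap)
    {
        using (var source = new MemoryStream(input))
        using (var decompressor = wrap(source))
        using (var output = new MemoryStream())
        {
            decompressor.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

Because everything works on byte arrays, the text encoding of the JSON never has to be touched: the bytes that go in are the bytes that come out.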

Let’s see if this works well together with the Stream Analytics job.

Compressing the message

For this blog post, I wrote two new modules: one for injecting a message and one for compressing a message.

The code is available on GitHub.

Injection

I insert a JSON message using a Direct Method and send it to an output. Check out the code for more details.

Compression

Compressing an incoming, locally routed JSON message is pretty straightforward:

byte[] messageBytes = message.GetBytes(); // incoming routed message
string messageString = Encoding.UTF8.GetString(messageBytes); // JSON 

var zippedMessageBytes = UseGZip 
                            ? GZipHelper.Zip(messageBytes)
                            : DeflateHelper.Zip(messageBytes);                

using (var pipeMessage = new Message(zippedMessageBytes))
{
    if (UseGZip)
    {
        pipeMessage.ContentEncoding = "gzip";
    }
    else
    {
        pipeMessage.ContentEncoding = "deflate";
    }

    pipeMessage.ContentType = "application/zip";

    await moduleClient.SendEventAsync("output1", pipeMessage);
}

The incoming message arrives as a byte array containing the UTF-8 encoded JSON.

That byte array is then compressed using either GZip or Deflate.

This new, compressed, byte array is then sent to the module output.

Notice that the new message has a different ContentType, and a ContentEncoding that depends on the compression type used. I follow the header guidelines. So the receiver should know the byte array represents a compressed message. What that decompressed message means is up to the user…

Adding the encoding and type makes it possible to select the right decompression later on. I use this in the Echo module.
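As an illustration of how a receiver such as the Echo module could pick the right decompression from the ContentEncoding, here is a minimal sketch. It is not the actual Echo module code: the class name is made up, and it takes the body bytes and the encoding string as plain parameters so it runs without the IoT SDK.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical sketch, not the actual Echo module code.
public static class EchoDecoderSketch
{
    // Returns the message text, decompressing first when the
    // content encoding says the body is compressed.
    public static string DecodeBody(byte[] body, string contentEncoding)
    {
        byte[] raw;

        switch (contentEncoding)
        {
            case "gzip":
                raw = Inflate(body, s => new GZipStream(s, CompressionMode.Decompress));
                break;
            case "deflate":
                raw = Inflate(body, s => new DeflateStream(s, CompressionMode.Decompress));
                break;
            default:
                // Anything else (for example "utf-8") is treated as uncompressed.
                raw = body;
                break;
        }

        return Encoding.UTF8.GetString(raw);
    }

    private static byte[] Inflate(byte[] input, Func<Stream, Stream> wrap)
    {
        using (var source = new MemoryStream(input))
        using (var decompressor = wrap(source))
        using (var output = new MemoryStream())
        {
            decompressor.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

In a module, the two arguments would come from the routed message itself, for example `message.GetBytes()` and `message.ContentEncoding`. Comparing the length of the incoming bytes with the length of the decoded result gives the before and after sizes.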

Testing compression with the Echo module

I tested the solution with some test messages of considerable sizes.

The test JSON message I use is taken from this Open data source.

I have my modules and routes deployed on an edge device and the compression is set to ‘deflate’:

Injecting that message is easy using the InjectMessage module:

I am notified the message is sent once the JSON message is accepted:

So, the message should be picked up and compressed by the second module.

That module shows how the message is picked up and compressed:

The message size is reduced to only a fifth of the original size.

I added the third module, the Echo module, which is capable of decompressing if needed.

This echo module shows how the incoming compressed message is decompressed:

This proves we are able to reduce the size of the messages without losing any content!

How about the decompression in Stream Analytics?

The same compressed message coming from the ZipTest module is also sent upstream, to the IoTHub:

The IoTHub routes these incoming compressed messages to Azure Stream Analytics using the built-in endpoint:

We are using the deflate event decompression type (which matches the module settings):

Note: The other input source, for Event Hubs, supports the same decompression!

Now, check out the raw sampling of the incoming message:

Yes, that same message is completely decompressed, ingested, and ready for you to query in the Stream Analytics job.

Notice that we are forced to set the right event decompression type ourselves. The Stream Analytics input is not able to detect which type is used for a message; it seems to ignore the ContentEncoding and ContentType.

Azure Stream Analytics job output

The Stream Analytics job outputs incoming (decompressed) messages to this EventHub in this example.

I just send the incoming messages as-is to the output.

There, in the EventHub, the right (decompressed) message is shown:

This proves Azure Stream Analytics is a convenient way to decompress incoming messages.

Side effects

Once you start sending compressed messages to the IoTHub, all services consuming these messages should be able to cope with decompression at some point.

Here are some examples:

  1. In a Blob Storage account (mainly used for cold storage), you have to decompress it yourself at some point in time.
  2. In an Azure Function, you have to write the decompression yourself (using the helper classes).

The EventData class has no knowledge of the ContentType and ContentEncoding used.

If you add an application property, you are back in control to choose between the different compression techniques:

  3. In TimeSeries Insights, the IoTHub event source lacks support for decompression:

So, there is an impact on other resources consuming the same messages.
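For the Azure Function case, a minimal sketch of choosing the decompression from an application property could look like this. The property name "compression" and the class name are assumptions made for this example. The properties are modeled as a plain dictionary so the sketch runs on its own; in a real function they would come from the incoming event (for example its Properties collection).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical sketch of the decompression step inside an Azure Function.
public static class FunctionDecoderSketch
{
    // Assumed application property name; use whatever the device sets.
    public const string CompressionProperty = "compression";

    public static string DecodeBody(byte[] body, IDictionary<string, object> properties)
    {
        // No property means the body is treated as plain UTF-8 text.
        if (!properties.TryGetValue(CompressionProperty, out object value) || value == null)
        {
            return Encoding.UTF8.GetString(body);
        }

        switch (value.ToString().ToLowerInvariant())
        {
            case "gzip":
                return Inflate(body, s => new GZipStream(s, CompressionMode.Decompress));
            case "deflate":
                return Inflate(body, s => new DeflateStream(s, CompressionMode.Decompress));
            default:
                // Fail loudly instead of handing compressed bytes to a JSON parser.
                throw new NotSupportedException("Unknown compression: " + value);
        }
    }

    private static string Inflate(byte[] input, Func<Stream, Stream> wrap)
    {
        using (var source = new MemoryStream(input))
        using (var decompressor = wrap(source))
        using (var reader = new StreamReader(decompressor, Encoding.UTF8))
        {
            return reader.ReadToEnd();
        }
    }
}
```

On the device side, the matching property would have to be added to the outgoing message before sending, for example with `pipeMessage.Properties.Add("compression", "gzip")`.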

The simplest way to work around this is to let the Azure Stream Analytics job do the heavy (decompression) lifting and use the decompressed output in the other resources.

Or you add an additional route for compressed messages on the IoT Hub itself.

Conclusion

So yes, we are able to compress and decompress messages in an efficient way and consume fewer chunks.

There is no automatic decompression available inside the IoTHub but both Azure Stream Analytics and Azure Function custom code can be used to do the job.

Remember to keep the compression type used on the device and the event decompression type set on the Azure Stream Analytics input in sync. Using an Azure Function, you need to add a separate application property to identify compressed messages.
