Azure Data Explorer connector for Blob storage (IoT Hub) files

Over the last couple of months, I have written several blog posts regarding Azure Data Explorer ingestion connectors.

I checked out the IoT Hub connector, dynamic mapping, and table update policies.

Until now, this was all based on either the IoT Hub connector or the Event Hub connector:

Next to those two, Azure Data Explorer also supports ingesting complete files using a third connector: Blob storage ingestion in combination with Event Grid events:

Let’s check out what this connector offers and how to set it up.

As we will find out, it even has a surprise for those of you ingesting IoT Hub messages.

Prerequisites

We are going to ingest JSON messages by reading Blob storage files.

Note: ADX is capable of ingesting many more data formats. We concentrate on JSON because it is the most common format in IoT (Hub) scenarios.

We need a source ingestion table. A single raw table like this works great as the starting point for a table update policy construction.

So, add the following table, capable of ingesting complete JSON messages as one dynamic field (just one column), together with its JSON ingestion mapping:

.create table FileIngest (rawpayload:dynamic)

.create table ['FileIngest'] 
ingestion json 
mapping 'FileIngestMapping'
'[{"column": "rawpayload", "path":"$", "datatype":"dynamic"}]'

.alter table FileIngest policy streamingingestion enable

Complete JSON messages are then mapped to that single column.

Note: although not shown here, table update policies and ADX functions can pick up the ingested rows later on.
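As a rough sketch of what such a construction could look like, the commands below copy rows from FileIngest into a second table. The Telemetry table, the ParseFileIngest function, and the deviceId and temperature properties are hypothetical names chosen for illustration; they are not part of the messages used in this post.

```kusto
// Hypothetical target table; the column names are assumptions.
.create table Telemetry (deviceId:string, temperature:real)

// Function that reshapes the raw rows; the JSON property names are assumed.
.create-or-alter function ParseFileIngest() {
    FileIngest
    | project
        deviceId = tostring(rawpayload.deviceId),
        temperature = toreal(rawpayload.temperature)
}

// Attach the function as an update policy on the target table,
// so it runs each time new rows land in FileIngest.
.alter table Telemetry policy update
@'[{"IsEnabled": true, "Source": "FileIngest", "Query": "ParseFileIngest()", "IsTransactional": false, "PropagateIngestionProperties": false}]'
```

Run each command separately. Keeping the query in a function means the transformation can be changed later without touching the policy itself.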

Notice also that streaming ingestion is enabled. The JSON messages arrive in batches (from the ingested files), so normally there is an ingestion delay, and we do not want that in this demonstration.

With this streaming ingestion policy set, the ingest is semi-live. It does take a small amount of compute resources from the cluster, though.
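To check which policies are actually in effect on the table, the two show commands below can be run one at a time. This is just a quick verification sketch; if no batching policy was ever set on the table, the second command returns an empty policy and the cluster defaults apply.

```kusto
// Shows whether streaming ingestion is enabled on the table.
.show table FileIngest policy streamingingestion

// Shows the batching policy that governs queued (non-streaming) ingestion.
.show table FileIngest policy ingestionbatching
```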

Next to that, we create a Blob storage container where our test files will be dropped:

Here, I created this ‘fileingest’ container in an ‘adxweustor’ Storage account.

Keep these (resource) names nearby. You need to reference them during the connector registration.

Setting up the Event Grid (Blob storage) data connection

To set up this ADX data connection, simply start the wizard:

A couple of things are worth mentioning:

  • I connect this data connection to the storage account
  • I listen to ‘blob creation’ events
  • Created files are filtered using the ‘/blobServices/default/containers/fileingest’ prefix filter, which points to the ‘fileingest’ container
  • We need a specific identity to be authorized for accessing the files. Here, that authorization was already available due to earlier test attempts from my side (where the identity was generated automatically)
  • I leave other filter options blank

Next, we need to add the data routing settings, as seen in my previous blogs already:

Just fill in the table name and mapping name of the already created table ‘FileIngest’.

Notice that there is just one JSON option as data format:

This feels weird. ADX understands multiple kinds of JSON formats:

We leave this as-is. Below, we will see how the data connector copes with a variety of JSON file formats.

Finally, save the connector by hitting ‘Create’ and see how the resources are created:

Interestingly, the connector is using an extra Event Hub to listen to Event Grid messages.

This is because ADX is not capable of handling Event Grid messages itself.

So, next to an Event Grid System topic, an automatically created Event Hub is added too:

Note: When creating this connection using the Azure portal, we are not in control of selecting the Event Hub namespace. So, you will likely see a new Event Hub namespace appear next to one you had already created for something else, instead of the existing one being reused.

Once this connector is created, we can start ingesting files in ADX. The metrics are still showing no activity:

The identities are set and no file is picked up because the container is still empty.

Let’s add some files.

Ingesting a single-line JSON file

We start with this single-line JSON message inside this file:

//// one-message-one-line.json

{"one-messsage-one-line":1}

This file is then uploaded using the Azure portal wizard for uploading a file into a blob container:

Once the file is uploaded, within a few seconds, the JSON message is seen in the ADX query editor:
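A query along these lines can be used to look at the most recent rows. It is a minimal sketch that assumes the default ingestion time policy is still enabled on the table, so that ingestion_time() returns a value.

```kusto
// Show the ten most recently ingested raw messages.
FileIngest
| extend ingestedAt = ingestion_time()
| order by ingestedAt desc
| take 10
```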

From there, ADX table update policies can be used to copy the incoming messages from this source table to the right target tables. This is out of the scope of this blog post.

Note: If you do not see the rows appear right away, it could be that the streaming ingestion policy is not yet active on the table. Just wait approximately five minutes.
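If the rows still do not show up, the commands below may help to narrow things down. This is a sketch and not part of the original setup: the first command lists recent ingestion failures for the table, and the second shortens the batching window for queued ingestion. The values in the batching policy are illustrative assumptions, not recommendations.

```kusto
// List ingestion failures of the last hour for the source table.
.show ingestion failures
| where Table == "FileIngest" and FailedOn > ago(1h)
| project FailedOn, IngestionSourcePath, Details

// Optionally seal batches sooner than the default (illustrative values).
.alter table FileIngest policy ingestionbatching
@'{"MaximumBatchingTimeSpan": "00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'
```

A shorter batching window lowers latency at the cost of creating more, smaller extents, so it is worth reverting after testing.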

What about files with multiple JSON messages?

Ingesting multi-line JSON files

Remember, as data format, we only selected ‘JSON’.

Although individual JSON messages themselves are well-defined, there are several ways to bundle them.

Here, you see how these messages are bundled by separating them with either a comma or a newline (\r\n or \n, depending on the operating system):

//// comma-separated-messages.json

{"comma-separated-messsage":1},
{"comma-separated-messsage":2},
{"comma-separated-messsage":3}

//// newline-separated-messages.json

{"newline-separated-messsages":1}
{"newline-separated-messsages":2}
{"newline-separated-messsages":3}

Both files are uploaded into the container:

And both files are then picked up by the ADX connector:

Now, we see seven lines in total (1 + 3 + 3 = 7).
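To see how those rows are spread over the three test files, the rows can be grouped by the single property name each message carries. This sketch relies on the test messages above having exactly one property each.

```kusto
// Count rows per message kind, using the first (and only) property name as key.
FileIngest
| extend messageKind = tostring(bag_keys(rawpayload)[0])
| summarize rowCount = count() by messageKind
```

With only the three files above ingested, the counts should add up to the seven rows mentioned.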

It’s interesting to see how the connector is smart enough to understand multiple ways of formatting JSON messages.

Actually, it even understands compressed files:

Notice that only ZIP and GZIP compression are supported. Another popular compression technique, Deflate (as seen in e.g. Azure Stream Analytics and the ADX Event Hub connector), is not supported.

I tested this with this new file:

//// gzip-newline-separated-messsages.json

{"gzip-newline-separated-messsages":1}
{"gzip-newline-separated-messsages":2}
{"gzip-newline-separated-messsages":3}

This JSON file was first compressed before being uploaded to the blob storage:

using System;
using System.IO;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using ZipHelperLib;

internal class Program
{
    private static CloudBlobContainer _container;

    static void Main(string[] args)
    {
        var account = CloudStorageAccount.Parse("[connection string]");

        var client = account.CreateCloudBlobClient();

        _container = client.GetContainerReference("fileingest");

        _container.CreateIfNotExistsAsync().Wait();

        var filename = "gzip-newline-separated-messsages.json";

        var jsonStringList = File.ReadAllLines(filename);

        var jsonText = string.Join('\n', jsonStringList);

        filename = filename + DateTime.UtcNow.ToString("HH.mm") + ".zip";

        var blob = _container.GetBlockBlobReference(filename);

        // Do we already have a blob with the same name?
        var exists = blob.ExistsAsync().Result;

        if (!exists)
        {
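            // Encode the joined JSON lines as UTF-8 bytes.
            var textBufferUncompressed = Encoding.UTF8.GetBytes(jsonText);

            // Compress the bytes using the helper from the ZipHelperLib package.
            var textBufferCompressed = DeflateHelper.Zip(textBufferUncompressed);

            // Upload the compressed bytes; Wait() blocks until the upload has completed.
            blob.UploadFromByteArrayAsync(textBufferCompressed, 0, textBufferCompressed.Length).Wait();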

            Console.WriteLine($"{DateTime.UtcNow} - File '{filename}' with {jsonStringList.Length} lines uploaded (Compressed: {textBufferCompressed.Length}; uncompressed {textBufferUncompressed.Length})");
        }
    }
}

This C# console app is referencing multiple NuGet libraries:

<PackageReference Include="Azure.Storage.Blobs" Version="12.13.1" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
<PackageReference Include="ZipHelperLib" Version="1.0.6" />	

Once this application is run, a new ZIP file is uploaded into the storage container:

This file is also picked up by the ADX connector:
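A filter on the property name used in the compressed file is one way to confirm that its three messages arrived. The bracket notation is needed because the property name contains hyphens.

```kusto
// Select only the rows that came from the compressed test file.
FileIngest
| where isnotnull(rawpayload["gzip-newline-separated-messsages"])
| project messageNumber = toint(rawpayload["gzip-newline-separated-messsages"])
| order by messageNumber asc
```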

This is an interesting solution for uploading raw IoT data files to a storage account, as seen in many ‘cold path’ IoT scenarios.

Actually, what about files coming directly from an IoT Hub route to a blob storage endpoint?

Ingesting IoT Hub generated JSON files with IoT messages

Let’s set up an IoT Hub routing endpoint:

Here, an IoT Hub routing endpoint is pointing to the same blob storage container. Routed IoT Hub messages are written to this container in batches, based on the configured batch frequency and chunk size window.

A route is added to the IoT Hub and all incoming messages (the query is set to ‘true’) are routed to that endpoint:

I used this Raspberry Pi simulator to generate IoT messages:

We see the messages being routed and arriving in the blob storage. Here, two files are created already:

If we check what is inside these files, we see the IoT messages from the device, complete with body, user properties, and system properties:

These IoT messages are all picked up by ADX automatically:

And the beauty is that the user properties are also ingested without extra conversion logic…
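The query below sketches how the different parts of such a routed message could be pulled apart. It assumes the usual layout of JSON files written by IoT Hub routing, with top-level EnqueuedTimeUtc, Properties, SystemProperties, and Body fields; the output column names are my own choice.

```kusto
// Split routed IoT Hub messages into device id, user properties, and body.
FileIngest
| where isnotnull(rawpayload.SystemProperties)
| project
    enqueuedTime = todatetime(rawpayload.EnqueuedTimeUtc),
    deviceId = tostring(rawpayload.SystemProperties.connectionDeviceId),
    userProperties = rawpayload.Properties,
    body = rawpayload.Body
```

Depending on the content type and encoding the device sets on its messages, the Body may arrive as a base64-encoded string instead of a JSON object. In that case, base64_decode_tostring() followed by parse_json() can be used to unpack it.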

That is a neat trick!

The metrics of the data connector show when files were ingested (you see several attempts I made while working on this blog):

The only consideration to make is the cost of the Azure resources needed for this connector:

  • Event Grid message events
  • The extra Event Hub
  • Temporary blob files
  • Some mechanism to delete files that are already ingested

Ingested files are not removed automatically. Automatic removal would be a great feature, also because any file left behind would then reveal that it was missed by the connector.

An alternative solution is to remove older files (e.g. files older than a few days) yourself.

My friend Wilko has shown how Azure Runbooks can be a great help in automating Azure maintenance tasks just like this one.

Conclusion

In this post, we have seen how the Azure Data Explorer connector for storage files (using Event Grid events) is a great solution for ingesting all kinds of message files.

Here, we only looked at JSON because this is a very common format in the Azure IoT world.

It does not matter whether a JSON file holds a single message or multiple messages separated by commas or newlines. Even compressed files are picked up.

This works great in combination with the table update policies so all kinds of messages can be distributed to other tables in Azure Data Explorer.

This even works for files written by the blob storage endpoints that are part of IoT Hub routing. In that case, the user properties are ingested too.

Check out the full list of data formats supported by ADX here.