Using ADX table update policies projecting raw messages to target tables

I have blogged about Azure Data Explorer (ADX) integration already a couple of times.

The new ADX database data connection for IoT Hub is a nice addition but appears to be quite static because you can only fill in one (single) table mapping.

ADX also supports dynamic routing (using that same data connection) where the message body is accompanied by application/user properties which tell ADX the table and mapping to use during ingest.

The ultimate goal is to have a solution where one stream of data, with one or more types of messages, is split into multiple tables:

The combination of IoT Hub routing and multiple endpoints should help with that but due to message enrichment limitations, we need to find another way.

A viable solution is the ADX table update policy.

Let’s check it out.

This is the third part in a series about integrating Azure Data Explorer with Azure IoT:

  • Part one: Azure Data Explorer connector for IoT Hub with real-time ingest
  • Part two: Dynamic routing of IoT Hub telemetry to Azure Data Explorer

IoT Hub message enrichment limitations

Before we jump into the solution, we look into this IoT Hub limitation first.

In a perfect world, we could have multiple endpoints defined in our IoT Hub, each used to output different data to ADX (probably all pointing to the same Event Hub).

Incoming messages are just routed/split to the separate endpoints using some logic (e.g. user properties).

So, for each endpoint, we should enrich those messages with the appropriate user properties to tell ADX in which table the messages should end up.

Unfortunately, message enrichment is limited regarding the ‘keys’.

Look at this example:

We see two separate enrichments each leading to another endpoint, one for ‘hot data’ and one for ‘warm data’.

Both endpoints need their own table name value based on the same key ‘Table’.

But keys need to be unique according to the IoT Hub!

This is also enforced for keys pointing to different endpoints!

In the previous blog post, we already discussed the limitations of dynamic routing in the current IoT Hub solution: adding properties at the device level is a bad design, adding properties at the IoT Hub level seems impossible, and adding properties using Azure Stream Analytics or Azure Functions is expensive.

Back to ADX then?

Kusto update policy

During the recent Azure IoT Architecture Summit, another way of splitting messages into separate tables was introduced, the ADX table update policy:

The idea is to ingest a stream of different messages into a single source table. Using update policies on other (target) tables, Kusto functions are triggered to take a copy of every incoming message/row and store it in the target table if it matches the right conditions.

For example, the source table is filled every second with regular telemetry messages and once in a while a heartbeat message.

Here, two Kusto functions (one for the telemetry target table and one for the heartbeat target table) are triggered based on the update policies and take either a telemetry message or a heartbeat message and put them in the related tables:

So, all you need is:

  • A device connected to an IoT Hub, sending two different, distinctive messages
  • An ADX connection to the IoT Hub ingesting all telemetry
  • A source table with a specific mapping able to ingest all kinds of messages
  • Two target tables
  • Two ADX functions triggered by table update policies

Let’s see this in action.

I expect there is already an IoT Hub, a device registration, an ADX cluster (with streaming ingestion enabled), and a database inside it.

By the way, ADX table update policies are a common tool for data scientists and data engineers. Here, we just make clever use of them for data ingest.

Creating a device, sending telemetry

We start with a simple device simulation, capable of sending two kinds of messages:

using System;
using System.Text;
using System.Threading;
using Microsoft.Azure.Devices.Client;

internal class Program
{
    static void Main(string[] args)
    {
        var connectionString = "[device connection string]";

        using var deviceClient = DeviceClient.CreateFromConnectionString(connectionString);

        var count = 0;

        while(true)
        {
            // send regular message

            var temp = count % 100;
            var hum = (count + 50) % 100;

            string telemetryJsonData = "{ \"temp\":" + temp + ", \"hum\":" + hum + " , \"version\":\"telemetry;1\" , \"ts\":\"" + DateTime.UtcNow.ToString("O") + "\" }";
            using var telemetryMessage = new Message(Encoding.UTF8.GetBytes(telemetryJsonData));
            deviceClient.SendEventAsync(telemetryMessage).Wait();

            Console.WriteLine($"Regular message {count} Sent: {telemetryJsonData}");

            Thread.Sleep(1000);

            count++;

            if (count % 60 == 0)
            {
                // send heartbeat

                string heartbeatJsonData = "{ \"counter\":" + count/60 + " , \"version\":\"heartbeat;1\" , \"ts\":\"" + DateTime.UtcNow.ToString("O") + "\" }";
                using var heartbeatMessage = new Message(Encoding.UTF8.GetBytes(heartbeatJsonData));
                deviceClient.SendEventAsync(heartbeatMessage).Wait();

                Console.WriteLine($"Heartbeat message {count/60} Sent: {heartbeatJsonData}");

            }
        }
    }
}

Note: We are referencing the ‘Microsoft.Azure.Devices.Client’ NuGet package, the C# Device SDK. This could have been written in other languages like Java, JavaScript, Python, or C too.

This device sends a telemetry message every single second. It also sends a heartbeat message every single minute:

The two formats are quite distinctive thanks to this ‘version’ field inside the body.

Note: We need to add this in the body because ADX does not ingest application properties (aka user properties) at this moment.

We see the messages being ingested into the IoT Hub (using the IoT Explorer):

So device messages are arriving in the cloud. Just keep this device simulation running.

We now move over to Azure Data Explorer. Select a database and open the query pane.

Setting up the source table in ADX

One source table is needed to ingest all kinds of messages.

So we need a generic column for the data.

Because the message body does not show the deviceId, we need to create a column for that too.

We do not want to create separate detail columns upfront for all possible incoming data message values. We do not know the format yet…

So, we make use of only one ‘dynamic’ column that is capable of ingesting complete messages!

In ADX, create this source table and related mapping:

.create table MySourceTable (rawpayload:dynamic, IotHubDeviceId: string)

.create table ['MySourceTable'] 
ingestion json 
mapping 'rawingestionmapping' 
'[{"column": "rawpayload", "path":"$", "datatype":"dynamic"}, {"column": "IotHubDeviceId", "path":"$.iothub-connection-device-id", "datatype":"string" }]'

.alter table MySourceTable policy streamingingestion enable

As you can see, the table contains two columns: one for the raw payload and one for the device id.

The raw payload column is just mapped to the complete incoming message (the ‘$’ path) and the device id column value must be provided by mapping a system property.

I also enabled streaming ingestion (if available on the cluster level).
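
If you want to double-check what was created, the ingestion mapping and the streaming ingestion policy can be inspected with standard management commands:

.show table MySourceTable ingestion json mappings

.show table MySourceTable policy streamingingestion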

Let’s create an IoT Hub data connection so we see messages arrive in the source table.

ADX database IoT Hub data connection

As seen in the first blog post of this series, ADX is capable of ingesting data from an IoT Hub using the ‘Event Hub compatible endpoint’. Make sure you use a separate consumer group:

The data routing settings of the data connection target the source table, using the right mapping name and format.

Once this is set up, while the device is sending messages, you will see raw messages being ingested together with the device id:

Note: we will look into strategies to remove these raw messages from ADX later on.

If you count the number of rows in the table a few times, you will see the value increase every second. This is the result of the streaming ingestion being enabled.
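
For example, these two queries against the source table created above show the current row count and a few raw rows:

MySourceTable | count

MySourceTable | take 5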

Now it’s time to start implementing all update policies along with the new target tables.

ADX (target) table update policies

Update policies are actually ADX functions written in KQL, triggered when new rows arrive in a source table. Target tables are thus ‘listening’ for these rows and are then updated when there is a match.

It’s even possible to have other target tables ‘listening’ to the first target table being updated. Circular references are not possible though.

First, we create two target tables able to store the specific telemetry and heartbeat messages:

.create table TelemetryTable (hum : double, temp : double, ts : datetime, IotHubDeviceId : string) 

.create table HeartbeatTable (counter:int, ts:datetime, IotHubDeviceId: string) 

Notice we do not need to enable streaming ingestion for these two tables too. The updates/inserts will occur immediately, so we will already see live updates.

Regarding that update policy, we do this in two steps for both tables:

  1. Add a function that projects a single MySourceTable row to the target table columns with a ‘where clause’ as filter
  2. Add an update policy to that target table pointing to the right Kusto function

Here are the commands to execute for the telemetry table:

.create function
with (docstring = 'Ingest raw telemetry data and project to telemetry table', folder='ingestprojection')
ParseTelemetry ()
{
  MySourceTable
    | where tostring(rawpayload.version) startswith "telemetry"
    | project 
        hum = todouble(rawpayload.hum),
        temp = todouble(rawpayload.temp),
        ts = todatetime(rawpayload.ts),
        IotHubDeviceId = tostring(IotHubDeviceId)
}

.alter table 
TelemetryTable 
policy update @'[{"Source": "MySourceTable", "Query": "ParseTelemetry", "IsEnabled" : true, "IsTransactional": true }]'

Here are the commands to execute for the heartbeat table:

.create function 
with (docstring = 'Ingest raw heartbeat data and project to heartbeat table', folder='ingestprojection') 
ParseHeartbeat ()
{
  MySourceTable
    | where tostring(rawpayload.version) startswith "heartbeat"
    | project 
      counter = toint(rawpayload.counter),
      ts = todatetime(rawpayload.ts),
      IotHubDeviceId = tostring(IotHubDeviceId)
}

.alter table 
HeartbeatTable 
policy update @'[{"Source": "MySourceTable", "Query": "ParseHeartbeat", "IsEnabled" : true, "IsTransactional": true }]'

Note: the update policies are transactional. This is part of a row deletion strategy as seen below.

Notice the order of projection columns must match the order of target table columns exactly! If not, a warning is given.
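
To verify both policies are registered as expected, you can show them per target table:

.show table TelemetryTable policy update

.show table HeartbeatTable policy update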

Once these policies are in place, we will see separate messages arrive in the target tables once records arrive in the source table.
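
For example, a quick look at the most recent rows in each target table (using the column names defined above):

TelemetryTable
| order by ts desc
| take 10

HeartbeatTable
| order by ts desc
| take 10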

Here is an impression of the telemetry table:

This table is updated every second due to the streaming data behavior of the source table.

The heartbeat table also shows records:

Notice the update policy is only triggered when new rows are added to the source table, and only after the update policy is established. Older rows are ignored but still available in the source table.

So, at this point, we have a working mapping towards both tables!

And, it’s all done inside ADX. So this is a good example of separation of concerns.

But what about the raw messages?

At this moment, that source table just keeps growing while new rows are already being projected into the right target tables.

How can we remove redundant rows in the source table?

Removing raw messages

Now, I want to remove the ‘unneeded’ raw messages afterward, once the policies have been triggered.

The documentation gave me a hint regarding the soft delete. According to that same documentation, the policies need to be transactional too (which is already done in the two policies seen above).

So I executed:

// delete raw message after function is ready with transaction
// this will take a few seconds before it is active
.alter-merge table MySourceTable policy retention softdelete = 0s
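
You can check that the retention policy is in place with:

.show table MySourceTable policy retention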

After a while, the source table was not growing anymore. No new messages were seen in that source table…

I checked the telemetry target table and there, new messages were still arriving.

So this is indeed a clever way to keep the size of your cluster as low as possible. According to the documentation, this also puts less pressure on the ADX background processes responsible for persisting incoming messages.

The downside is that you have no control over the incoming messages.

It seems that messages not picked up by any function (so these are not projected into any target table) are also removed from the source table. It is as if these un-projected messages never arrived…

Check out the documentation for other ways to delete data in Azure Data Explorer.

Personally, I like this table retention policy:

.alter-merge table <TableName> policy retention softdelete = 60d

If this is not what you want, check out other ways to delete old raw messages.

Conclusion

We have seen how Azure Data Explorer offers a new way to put specific messages into separate tables using update policies.

I’m very excited about this solution because it is all managed inside ADX, so there is a separation of concerns. This way, ADX can be instrumented beforehand to cope with new types and versions of messages.

The main concern is losing data that is NOT mapped. Using a (slow) retention policy with additional logic to test for non-projected messages can be of help here. Or, just use the instant soft delete and be happy with a lean and mean solution.
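
As a sketch of such additional logic (assuming the same ‘version’ values used above), a query like this could surface source table rows that were never projected into any target table:

MySourceTable
| extend version = tostring(rawpayload.version)
| where version !startswith "telemetry" and version !startswith "heartbeat"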

The Azure IoT Architecture Summit recordings are still available after free registration.
