Dynamic routing of IoT Hub telemetry to Azure Data Explorer

Azure Data Explorer (ADX) is a great data exploration tool for IoT developers building a full IoT solution. This could be a perfect target for the cold path.

As seen in my previous blog post, ADX even offers a native connector for the IoT Hub. It is based on the ‘default Event Hub-compatible endpoint’ offered by this cloud IoT gateway (either via the built-in Events endpoint in the routing section of the IoT Hub or via the fallback mechanism).

Most of the documentation regarding this ADX connector is following this ‘happy flow’ where one connector stores incoming IoT telemetry in one ADX table using static routing.

This is a serious limitation, because most IoT Hubs ingest multiple types of messages. These will not all fit into that single table.

Luckily, the connector also offers the option to allow routing to other databases:

Here, we will check out this dynamic routing option and see how this provides much more flexibility.

Why do we need dynamic routing?

Actually, if you only have one type of telemetry message, as seen in the previous post, static mapping is good enough for you. Just keep using the happy flow.

Though, in real-world IoT solutions, we see a variety of different kinds of messages.

Think of messages transporting measurement data, messages with alerts and events, and messages with control logic like heartbeats.

Do not cross the streams (source: Ghostbusters)

We know we must not cross telemetry streams. This is both related to data coming from different devices and data with a different purpose. Each type of message, having a unique format, must get its own table in some database.

You can compare the data later on using joins and unions. Because every message has its own timestamp, building time series is the right way to ‘join’ data from multiple tables.

Note: the connector for Event Hub also supports dynamic mapping including decompression.

Other databases, other tables

We will see how to work with multiple tables in a moment.

The same connector also mentions a ‘Multi database data connection’.

The hint (shown in the connector tab) explains the purpose of the switch: it allows routing to other databases…

Databases? This feels a bit awkward. The hierarchy of ADX is Clusters -> Databases -> Tables.

Databases are used to bring different data, generated in the same context, together for easy querying and comparison. So, most solutions just use multiple tables within the same database.

The switch is there for those who use multiple databases. It becomes important because ADX has a separate security context per database; the switch supports secure connections to other databases if dynamic mapping needs them. You can even dynamically map to databases in other clusters this way.

Nonetheless, we ignore that ‘Database’ text here and concentrate on tables within the same database.

Ingestion properties

How does dynamic routing work?

Incoming messages follow the format of the EventData class. This object has three members to store data in:

  • The message body (some byte array representing some (optionally compressed) JSON, *SV, etc. message)
  • System properties (property bag filled in by Azure)
  • Application/User properties (property bag filled in by you)
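For reference, a minimal sketch (not taken from the post, and assuming the Azure.Messaging.EventHubs SDK on the consuming side) of where these three members live on the EventData class:

// Sketch only: the three EventData members, as seen from consuming code.
using Azure.Messaging.EventHubs;

void Inspect(EventData eventData)
{
    var body = eventData.EventBody;               // the message body (optionally compressed JSON, *SV, etc.)
    var systemProps = eventData.SystemProperties; // property bag filled in by Azure
    var userProps = eventData.Properties;         // application/user properties filled in by you
}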

Normally, ADX only takes the body. Optionally, system properties can also be read.

The secret to dynamic routing lies in filling in EventData Application properties.

The ADX team has defined specific application properties describing the targeted ADX database and table, the mapping, and the message format. These are the properties to add: Database, Table, Format, Encoding, and IngestionMappingReference.

These follow the input fields of the original connector, as seen below.

Only this time, just leave the fields in the target table area empty:

If you pass these values (now omitted in the UI) as user properties along with the original message body, ADX can still figure out where to drop the messages.

Note: The documentation is not clear about case sensitivity, so I treated these property names as case-sensitive.

Let’s see how this works in real life.

Telemetry message and heartbeat message

To test this, we first need two different tables to put specific messages in.

First, I created a specific table able to ingest simulated telemetry data, together with the mapping for incoming JSON messages:

.create table Telemetry (
    messageId: int,
    deviceId: string,
    temperature: decimal,
    humidity: decimal,
    IotHubDeviceId: string,
    IotHubEnqueuedTime: datetime
)

.create table Telemetry ingestion json mapping "JsonTelemetryMapping"
    '['
        '{"Column": "messageId", "Properties": {"Path": "$.messageId"}},'
        '{"Column": "deviceId", "Properties": {"Path": "$.deviceId"}},'
        '{"Column": "temperature", "Properties": {"Path": "$.temperature"}},'
        '{"Column": "humidity", "Properties": {"Path": "$.humidity"}},'
        '{"Column": "IotHubDeviceId", "Properties": {"Path": "$.iothub-connection-device-id"}},'
        '{"Column": "IotHubEnqueuedTime", "Properties": {"Path": "$.iothub-enqueuedtime"}}'
    ']'

.alter table Telemetry policy streamingingestion enable

Notice I create the table with two extra columns that will be filled with values provided by the (IoT Hub) system properties. I also add the table mapping for the JSON-formatted messages I use in this demonstration, and I enable support for streaming ingestion.

Next to that table, a second table is created, able to ingest heartbeat messages:

.create table Heartbeat (
    deviceId: string,
    messageId: int,
    timeStamp: datetime,
    IotHubDeviceId: string,
    IotHubEnqueuedTime: datetime
)

.create table Heartbeat ingestion json mapping "JsonHeartbeatMapping"
    '['
        '{"Column": "deviceId", "Properties": {"Path": "$.deviceId"}},'
        '{"Column": "messageId", "Properties": {"Path": "$.messageId"}},'
        '{"Column": "timeStamp", "Properties": {"Path": "$.timeStamp"}},'
        '{"Column": "IotHubDeviceId", "Properties": {"Path": "$.iothub-connection-device-id"}},'
        '{"Column": "IotHubEnqueuedTime", "Properties": {"Path": "$.iothub-enqueuedtime"}}'
    ']'

.alter table Heartbeat policy streamingingestion enable

Again, another table is created, a mapping for JSON messages is added, and streaming ingestion is enabled.

In my previous blog post, I used static mapping. Here, I reuse that same connection but now I switch over to dynamic mapping.

As seen below, the target table fields are empty now:

This way, the data routing is kept agnostic about the incoming data and possible target tables.

So, the incoming message itself (read: the device composing and sending the message) is in control of the target location.

Now, let’s start sending telemetry data for both message types to the IoT Hub and see how dynamic routing picks up the messages.

Dynamic routing in action

For this experiment, I created an Azure IoT device based on the C# SDK. If you want to try this yourself, you can use any programming language due to the wide SDK support (Python, NodeJS, C, C#, Java).

First, I compose and send the Telemetry message:
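The original code is shown as a screenshot; here is a minimal sketch of what such a message could look like, using the C# device SDK (the connection string and measured values are placeholders):

// Sketch only, not the exact code from the post: compose and send a Telemetry
// message carrying the ADX predefined user properties.
using System.Text;
using Microsoft.Azure.Devices.Client;

var deviceClient = DeviceClient.CreateFromConnectionString(
    "<device connection string>", TransportType.Mqtt);

var body = "{\"messageId\": 1, \"deviceId\": \"testdevice\", \"temperature\": 21.5, \"humidity\": 68.3}";

using var telemetryMessage = new Message(Encoding.UTF8.GetBytes(body));

// ADX predefined user properties, treated as case-sensitive:
telemetryMessage.Properties.Add("Database", "<your ADX database>");  // placeholder database name
telemetryMessage.Properties.Add("Table", "Telemetry");
telemetryMessage.Properties.Add("Format", "json");
telemetryMessage.Properties.Add("IngestionMappingReference", "JsonTelemetryMapping");
telemetryMessage.Properties.Add("Encoding", "UTF8");                 // assumption: UTF-8 is also the default

await deviceClient.SendEventAsync(telemetryMessage);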

As you can see, I added all the ADX predefined user properties: Database, Table, Format, Encoding, and IngestionMappingReference.

Once messages are sent to the IoT Hub, we see the arrival of streamed messages in the right database, the right table, using the right mapping and even enriched with the IoT Hub system properties as defined in the ADX connection:

So, based on dynamic routing, the ADX connection can indeed drop off incoming telemetry messages in the right table.

We can even have some fun with the Telemetry data, showing it in a chart using KQL:
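The actual chart query is shown as a screenshot; a minimal KQL sketch that would produce such a chart could look like this:

// Sketch of a simple chart over the ingested telemetry
Telemetry
| project IotHubEnqueuedTime, temperature, humidity
| render timechart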

But how about the Heartbeat message? Will this work too?

As seen below, we also start composing and sending Heartbeat messages to the IoT Hub:

This time, I omitted all predefined user properties except for the Table property!
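Again, the exact code is a screenshot; a hedged sketch, reusing the deviceClient from the previous snippet, could look like this:

// Sketch only: a Heartbeat message where only the 'Table' user property is passed.
var heartbeatBody = $"{{\"deviceId\": \"testdevice\", \"messageId\": 2, \"timeStamp\": \"{DateTime.UtcNow:o}\"}}";

using var heartbeatMessage = new Message(Encoding.UTF8.GetBytes(heartbeatBody));
heartbeatMessage.Properties.Add("Table", "Heartbeat");

await deviceClient.SendEventAsync(heartbeatMessage);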

Let’s see if this works too…

Both messages are sent directly after each other to the IoT Hub:

Once these Heartbeats are sent, we see them arrive as a stream in the correct Heartbeat table in ADX:

Yes, passing on only a table name works too.

The connection is related to a single database. In that database there is a Heartbeat table (the only part we describe), and that table has only one mapping for JSON. So it seems ADX can make an educated guess regarding the other dynamic routing properties.

But when I checked the actual content of the Heartbeat table, while running the simulation app with and without the IngestionMappingReference property, the IoT Hub-related values were sometimes missing:

This is in line with the documentation:

If you selected Event system properties in the Data Source section of the table, you must include the properties in the table schema and mapping.

Finally, we have two different tables filled with (dummy) data coming from a single device in a single IoT Hub.

Although the following KQL query has no actual purpose (my example Telemetry table does not pass a timestamp and the (somewhat unique) IoT Hub ingestion time will probably never match), it shows we can join table data like this:

Telemetry 
| join Heartbeat on $left.IotHubEnqueuedTime == $right.IotHubEnqueuedTime
| project IotHubEnqueuedTime, temperature, humidity, timeStamp

A more meaningful query could even work with time windows. Check out this article in the Microsoft documentation about that.
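For example, here is a hedged sketch of such a time-window join, bucketing both tables on the IoT Hub enqueued time (the one-minute window is an arbitrary choice):

// Sketch: bucket both tables into one-minute windows before joining
Telemetry
| extend window = bin(IotHubEnqueuedTime, 1m)
| join kind=inner (
    Heartbeat
    | extend window = bin(IotHubEnqueuedTime, 1m)
) on window
| project window, temperature, humidity, timeStamp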

Alternative message context enrichment

So, the solution to work with multiple kinds of messages is using message enrichment with one or more ADX predefined user properties. This can be done the way we have seen above.

There is only one thing I have real issues with: what about single responsibility?

What we see here is a device that normally has only one purpose: sending telemetry. Now it also needs to take care of the name of the target table in ADX, using one or more ADX predefined user properties. On top of that, the device needs to know the name of the mapping.

This does not feel good. Nope.

So, if you have the luxury of adding an Azure Stream Analytics job or an Azure Function in the cloud, between IoT Hub and ADX, please use that to enrich the message with the user properties you need. ASA users can also use the ADX output, which is currently in public preview.

Yes, it’s a common pattern that a message arrives in the IoT Hub with a self-described classification (e.g. custom user properties or a generic part of the message body) describing both the type and version of the payload it carries. That is a good example of device responsibilities. It’s better to use that classification context as input for the actual mapping towards ADX dynamic routing.
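As an illustration of that idea, here is a rough sketch (not from the post) where cloud-side code, for example inside an Azure Function, takes the device payload, derives the ADX routing properties from the message’s own classification, and forwards the event to an Event Hub that an ADX data connection listens on. The classification value, names, and connection details are all assumptions:

// Sketch only: cloud-side enrichment that keeps the device unaware of ADX details.
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Placeholder inputs: the original device payload and its self-described classification.
var originalBody = new BinaryData("<original device payload>");
var messageType = "heartbeat";

await using var producer = new EventHubProducerClient(
    "<event hub connection string>", "<event hub name>");

var enriched = new EventData(originalBody);
enriched.Properties["Table"] = messageType == "heartbeat" ? "Heartbeat" : "Telemetry";
enriched.Properties["Format"] = "json";
enriched.Properties["IngestionMappingReference"] =
    messageType == "heartbeat" ? "JsonHeartbeatMapping" : "JsonTelemetryMapping";

await producer.SendAsync(new[] { enriched });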

Note: Unfortunately, IoT Hub message enrichment (in the routing section) cannot help us here due to a limitation in the key registration: you can define a ‘Table’ key only once, even if two different ‘Table’ keys would point to two different endpoints. On top of that, only ten enrichments can be added in total, so working with multiple message types at scale will never work…

Conclusion

This is the second post about the ADX IoT Hub integration. The previous post was all about static routing.

We have seen how ADX offers dynamic routing to support more real-life data ingestion based on multiple kinds of messages.

The predefined user properties help us distribute these messages into the right tables.

Providing only the Table property could already work for you. But do not forget to add the IngestionMappingReference property if you want to make use of system properties.

Use extra logic like Azure Functions or Stream Analytics to separate the concerns between the classification of an incoming message and the specific table the message should be sent to.
