Raw Azure StreamAnalytics output using Azure Storage

StreamAnalytics is a monster. It is capable of making decisions on large amounts of data arriving as a stream. It has input sources and output sinks, and in between a query is running. This query has an SQL-like structure, something like Select * into [output sink] from [input source].

Because I work a lot with the IoT part of Azure (IoT Hubs, EventHubs, etc.), a StreamAnalytics job is a great addition for handling the continuous stream of telemetry coming from multiple devices.

StreamAnalytics is less known for usage outside an Internet of Things environment. Think of handling credit card payments or changes in personal records.

In my opinion, StreamAnalytics has one drawback: it takes some time to start up, and it can only be changed (altering the query or changing sinks or sources) when it is stopped again. My biggest issue is that I cannot directly see what telemetry is passed.

But there is a simple solution. A query can output the same telemetry (to be more specific, the data you want to pass to the output sink) to multiple sinks. So if you are not sure which data is passed, you can output it to a second sink.

In this blog, I show you how to output to Azure storage sinks.

At this moment, StreamAnalytics supports multiple kinds of sinks:

  • SQL Database (SQL Azure)
  • Blob Storage
  • Event Hub
  • Table Storage
  • Service Bus Queue
  • Service Bus Topic
  • DocumentDB
  • Power BI
  • Data Lake Store

I have shown how to handle Power BI and Event Hub in previous blogs. These are essential for normal production cases. But here we want to have a quick look into the stream. So Blob Storage and Table Storage should be sufficient.

Note: Having this second output running in production will eventually be overkill. Just use it during the testing phase, or make it conditional using reference data, as sketched below.
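A minimal sketch of that conditional variant: join the stream against a reference data input and only pass on devices that are flagged for debugging. The reference input debugsettings and its deviceId and debugEnabled columns are assumptions for illustration; outputblob is the Blob Storage sink used later in this post:

-- debugsettings is an assumed reference data input (deviceId, debugEnabled)
SELECT
    i.EventProcessedUtcTime as time,
    i.IoTHub.ConnectionDeviceId as deviceId
INTO
    outputblob
FROM
    inputhub i
JOIN
    debugsettings d
    ON i.IoTHub.ConnectionDeviceId = d.deviceId
WHERE
    d.debugEnabled = 1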

This is how the Query looks if multiple sinks are filled with data:

[Screenshot: the query with multiple output sinks]
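A minimal sketch of such a query: it is simply multiple SELECT ... INTO statements reading from the same input, one for each sink. The aliases outputblob and outputtable stand for the Blob Storage and Table Storage outputs described below:

-- same telemetry to the Blob Storage sink...
SELECT
    EventProcessedUtcTime as time,
    IoTHub.ConnectionDeviceId as deviceId,
    *
INTO
    outputblob
FROM
    inputhub

-- ...and to the Table Storage sink
SELECT
    EventProcessedUtcTime as time,
    IoTHub.ConnectionDeviceId as deviceId,
    *
INTO
    outputtable
FROM
    inputhub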

First, let’s look at a Blob Storage sink.

Blob Storage Sink

Adding a Blob Storage sink is easy. The trick is to bundle telemetry into blobs, following a file path template:

[Screenshot: Blob Storage output configuration with the path pattern]

I used the path “/logs/{date}/{time}”. This clusters all telemetry of the same hour in the same blob, because the {time} part of the path represents the hour. With the default date and time formats, that results in a path like /logs/2017/01/15/10.

So let’s pass all telemetry, the time and the id of the device to the blob:

SELECT
    EventProcessedUtcTime as time,
    IoTHub.ConnectionDeviceId as deviceId,
    *
INTO
    outputblob
FROM
    inputhub


After starting the StreamAnalytics job, you can check the Blob storage in the portal:

[Screenshot: the blob output in the Azure portal]

As you can see, the path ends up in a folder structure and the blobs are available:

[Screenshot: downloading a blob from the container]

It’s even possible to download the blob and open it in a text editor (like Visual Studio Code).

And it’s filled with JSON strings:

[Screenshot: blob content filled with JSON]

And that’s it for blobs.

Note: this output could be used as a test set for the test functionality of Stream Analytics.

Table Storage Sink

The other Azure Storage sink is the Table Storage:

[Screenshot: Table Storage output configuration]

The toughest parts are the partition key and the row key. Both must be present as columns in the query output, and together they must be unique for each row.

The portal describes the partition key as follows:

[Screenshot: partition key description in the output settings]

And the row key as follows:

[Screenshot: row key description in the output settings]

In short: the partition key groups related rows within the table, the row key identifies a row within its partition, and together they form the primary key of the table.

So I use the device id as partition key and the (unique) timestamp as row key. The query for the Table Storage sink (which I call outputtable here) looks almost identical:

-- time and deviceId are mapped to the row key and partition key in the output settings
SELECT
    EventProcessedUtcTime as time,
    IoTHub.ConnectionDeviceId as deviceId,
    *
INTO
    outputtable
FROM
    inputhub

The other telemetry values are not defined separately; the SELECT * simply passes them along.

And after the StreamAnalytics job is started, the table is created. We can check its creation in the portal:

[Screenshot: the created table in the Azure portal]

Unfortunately, it’s not possible to check the rows in the table using the Azure portal.

There are lots of storage explorers on the internet. Most of them only support Blob storage. Luckily, Microsoft also provides a Storage Explorer, and this one does support Table Storage:

[Screenshot: table rows in Microsoft Azure Storage Explorer]

The table contains the partition key and the row key, and the output of the job adds extra columns to the table rows. As you can see, the values used for the partition key and the row key now show up twice.

Conclusion

In just a few clicks, you now have confidence in what is passed to the StreamAnalytics job and what output is produced.
