Azure Stream Analytics anomaly detection on the edge

Back in 2018, the Azure Stream Analytics team announced anomaly detection for Azure Stream Analytics. It was also supported on Azure IoT Edge. In November 2018, I was allowed to demonstrate this at the SPS IPC Drives in Nuremberg:

Since then, anomaly detection has become a first-class citizen of Azure Stream Analytics.

Azure Stream Analytics anomaly detection can ‘automatically’ detect spikes, dips, and trends in a stream of values. This is based on math: all you need to specify is how many values of history to take into account and how sensitive the detection must be. In the end, it is in fact machine learning, so each outcome is a prediction with a certain level of confidence…
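To get a feeling for what "history" and "sensitivity" mean, here is a minimal sketch in Python. Note that it uses a plain z-score test over a sliding window, which is a stand-in I picked for illustration only; it is not the model Azure Stream Analytics uses internally. The function name and the `min_history` parameter are hypothetical; the defaults of 95 and 120 mirror the values used in the query later in this post.

```python
from collections import deque
from statistics import NormalDist, mean, stdev


def detect_spikes_and_dips(values, confidence=95.0, history_size=120, min_history=5):
    """Flag values that deviate strongly from the recent history.

    NOTE: a plain z-score test, for illustration only. This is NOT the
    model that Azure Stream Analytics uses internally.
    """
    # Two-sided threshold: 95% confidence is roughly 1.96 standard deviations.
    threshold = NormalDist().inv_cdf(0.5 + confidence / 200.0)
    history = deque(maxlen=history_size)  # sliding window of recent values
    results = []
    for value in values:
        is_anomaly = False
        if len(history) >= min_history:
            mu, sigma = mean(history), stdev(history)
            if sigma > 0:
                is_anomaly = abs(value - mu) / sigma > threshold
        results.append((value, is_anomaly))
        history.append(value)
    return results


readings = [21.0, 21.2, 20.9, 21.1, 20.8, 21.0, 35.0, 21.1, 20.9, 5.0]
flagged = [value for value, is_anomaly in detect_spikes_and_dips(readings) if is_anomaly]
print(flagged)
```

A higher confidence raises the threshold, so fewer values are flagged; a longer history makes the baseline react more slowly to change.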

And it’s just part of the Stream Analytics query language. So on the edge, we can deploy a Stream Analytics module (the engine of stream analytics). This is a fixed module from Microsoft.

All you have to do is feed it a query, inputs, outputs, and user-defined functions if available:

We can define all this in the cloud, inside a Stream Analytics job, and we can even test it there. Those parts are then packed as a blob and put in blob storage. The Stream Analytics module on the edge can then download it and run it.

The inputs and outputs of the Stream Analytics job can be attached to the normal Azure IoT Edge routing mechanism.

Let’s see how this is set up:

We start with setting up an Azure IoT Edge device and we add the Microsoft Temperature Simulation module to it. Register an IoT Edge device in the IoT Hub and add this module using the Set Modules wizard in the Azure portal:

It is simple to add it. Just select it for free from the Azure IoT Edge Module marketplace (search for Temperature).

Do not forget to add this environment variable:

The message count is set to infinity so we are not limited in time during the tests.

We also add this Echo module for testing purposes. Then we are able to explore the produced messages on the edge and see how these are flowing through the IoT Edge routing mechanism:

Now we adjust the routing so the simulated temperature messages are sent both to our echo module and to the IoT Hub in the cloud (using the EdgeHub module):
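As a rough sketch, the two routes could look like the output of the Python snippet below. The route syntax (`FROM ... INTO BrokeredEndpoint(...)` and `INTO $upstream`) is the standard IoT Edge routing syntax. The module names, the output name, the input name, and the route names are assumptions on my side; replace them with the names used in your own deployment.

```python
import json

# Assumed names; check your own deployment manifest for the real ones.
SENSOR_MODULE = "SimulatedTemperatureSensor"
SENSOR_OUTPUT = "temperatureOutput"
ECHO_MODULE = "echo"
ECHO_INPUT = "input1"

sensor_source = f"FROM /messages/modules/{SENSOR_MODULE}/outputs/{SENSOR_OUTPUT}"

routes = {
    # Copy every simulated temperature message to the echo module.
    "sensorToEcho": f'{sensor_source} INTO BrokeredEndpoint("/modules/{ECHO_MODULE}/inputs/{ECHO_INPUT}")',
    # Send the same messages to the IoT Hub in the cloud.
    "sensorToCloud": f"{sensor_source} INTO $upstream",
}

print(json.dumps(routes, indent=2))
```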

Deploy this as part of the deployment manifest.

The EdgeAgent log can show us the progress of the deployment:

iotedge logs -f edgeAgent

We see both modules being installed, alongside the EdgeHub:

We can see the simulated temperature values being produced:

iotedge logs -f echo

Here are two example messages:

Using the cross-platform Azure IoT Explorer we see the arrival of the messages in the cloud:

Our edge device is producing simulated temperature values. Let’s do some anomaly detection on those values.

Add a Stream Analytics job for the edge to your subscription

First, we add a storage account to the same resource group where our IoT Hub lives:

Give the storage account a name like ‘edgeasajobstorage’ and put it in the same region as all other resources:

This account will be used by our Stream Analytics job to store the job information like inputs, outputs, and the query.

Create the storage resource.

Now, add a Stream Analytics Job:

Give it a proper name like ‘AnomalyDetectionJob’:

Notice that I had to mark the Hosting environment as “Edge”.

I also tried out the “Secure all private data assets” option for this demonstration. What does this mean?

Selecting this option will ensure that your private data assets needed by this job are always stored in your Azure Storage account (v1 or v2). This includes job configuration, checkpoints, and other necessary data required for the job to function correctly.

Select the storage account you just created if you want to. It’s up to you if you need that extra security. During this demonstration, I deselected this option.

Create the stream analytics resource.

Before we look at the query, we first have to define inputs and outputs. These will be connected to the Azure IoT Edge routing mechanism. This means we can pick up temperature value messages and expose the anomaly detection results to the cloud.

Add an Edge Hub input. The Edge Hub module is responsible for inter-module communication and for communication with downstream devices; it is, in fact, the routing mechanism:

Just give it a name like ‘ambientinput’:

Note: the names of inputs and outputs will be used when specifying the routes, later on.

We want to output generated anomaly detection outcomes. But we also want to know if our job is actually ingesting data. So, add two Edge Hub outputs.

Select Edge Hub output:

Note: other output types, like Power BI, are not available here. Those are only offered in a regular Stream Analytics job designed for running in the cloud.

This is the output that will output any anomalies:

I added a second output sink named “rawoutput” just to be able to test the query input. All incoming temperature values are sent directly to this output.

Remember these input and output names; we need them for the additional routes on the edge.

Once the inputs and outputs are created, we can build our query:

We take the ambient temperature and pass it on to the Spike and Dip detection. This is one of the anomaly detections provided:

WITH AnomalyDetectionStep AS
(
    SELECT
        timeCreated AS time,
        CAST(ambient.temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(ambient.temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM ambientinput
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO anomalyoutput
FROM AnomalyDetectionStep

SELECT
    *
INTO
    rawoutput
FROM
    ambientinput


This query is based on this example.

Save the query.

Testing the job

We can actually test the query before we deploy it to the edge.

In the portal, there is an option to upload a sample file:

I just took some messages from the echo log and built this message:

You can build your own file and upload it. Then test the query:

As you can see, there are spikes and dips detected in this sample file:

This proves the query is ok. Of course, this is just a demonstration.
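If you do not want to copy messages from the echo log by hand, a sample file can also be generated. The sketch below is only an assumption about what such a file could look like: it contains just the two fields the query reads (`ambient.temperature` and `timeCreated`), the spike and dip positions are arbitrary, and the function names and the file name are hypothetical.

```python
import json
import random
from datetime import datetime, timedelta, timezone


def build_sample_messages(count=60, interval_seconds=5, seed=42):
    """Build test messages with one injected spike and one injected dip."""
    rng = random.Random(seed)  # fixed seed, so the file is reproducible
    start = datetime(2021, 1, 1, tzinfo=timezone.utc)
    messages = []
    for i in range(count):
        temperature = 21.0 + rng.uniform(-0.5, 0.5)
        if i == 40:
            temperature += 15.0  # injected spike
        if i == 50:
            temperature -= 15.0  # injected dip
        messages.append({
            "ambient": {"temperature": round(temperature, 2)},
            "timeCreated": (start + timedelta(seconds=i * interval_seconds)).isoformat(),
        })
    return messages


def save_sample(messages, path):
    """Write the messages as a JSON array to the given file."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(messages, handle, indent=2)


sample = build_sample_messages()
print(json.dumps(sample[0]))
```

Calling `save_sample(sample, "sample-input.json")` writes the file, which can then be uploaded as test input in the portal.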

We have to publish this query so it can be picked up by the edge.

For this to happen, we have to add a storage account to the job:

Save these storage account settings.

We are now ready to deploy the query as part of the job, to the storage account. We publish it:

You will see some messages about creating a job URL, SAS tokens, etc.:

This can take some time…

Once published, we see the SAS URL. We can ignore it because the tooling picks it up for us:

Everything is put in place now to start the deployment.

Deploying the job

The Azure portal provides tooling to add the job to the edge device deployment manifest.

It starts with the Set Modules wizard. Add a module. This time we go for that Azure Stream Analytics Module option:

This opens a dialog where we have to select the right job:

Once selected, check the Module Twin settings:

There you see the SAS URI.

After that, we add some extra routes so we can check the flow of the messages once the query is deployed:
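The extra routes could be composed as in the Python sketch below. The input and output names (`ambientinput`, `anomalyoutput`, `rawoutput`) and the module name `AnomalyDetectionJob` are the ones used in this post. The sensor and echo names, the route names, and the two helper functions are assumptions for illustration.

```python
import json

ASA_MODULE = "AnomalyDetectionJob"            # module name used in this post
SENSOR_MODULE = "SimulatedTemperatureSensor"  # assumption
SENSOR_OUTPUT = "temperatureOutput"           # assumption
ECHO_MODULE = "echo"                          # assumption
ECHO_INPUT = "input1"                         # assumption


def module_output(module, output):
    """Source path of a module output, for the FROM clause."""
    return f"/messages/modules/{module}/outputs/{output}"


def module_input(module, input_name):
    """Sink of a module input, for the INTO clause."""
    return f'BrokeredEndpoint("/modules/{module}/inputs/{input_name}")'


extra_routes = {
    # Feed the simulated temperature messages into the job.
    "sensorToAsa": f"FROM {module_output(SENSOR_MODULE, SENSOR_OUTPUT)} INTO {module_input(ASA_MODULE, 'ambientinput')}",
    # Expose the anomaly detection results to the cloud.
    "anomalyToCloud": f"FROM {module_output(ASA_MODULE, 'anomalyoutput')} INTO $upstream",
    # Make both job outputs visible in the echo module for testing.
    "anomalyToEcho": f"FROM {module_output(ASA_MODULE, 'anomalyoutput')} INTO {module_input(ECHO_MODULE, ECHO_INPUT)}",
    "rawToEcho": f"FROM {module_output(ASA_MODULE, 'rawoutput')} INTO {module_input(ECHO_MODULE, ECHO_INPUT)}",
}

print(json.dumps(extra_routes, indent=2))
```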

Submit the deployment manifest.

Once submitted, check if the job is rolled out to the edge using the log of the edgeAgent:

As you can see, the module is created.

We can also see if the job is running well:

iotedge logs -f AnomalyDetectionJob

Every minute, a log is provided:

We see a positive count for both incoming and outgoing messages.

The job log does not offer actual insight into the messages produced. To overcome that, we just route those messages to the Echo module:

Here, we see both the ‘raw’ messages and the anomaly detection. The echo module gets both kinds of messages.
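Because both kinds of messages end up in the same place, it can help to filter out the actual anomalies. The Python sketch below assumes one JSON message per line and relies on the `IsSpikeAndDipAnomaly` field defined in the query. The example lines, including the score values, are made up for illustration and are not real output; the function name is hypothetical.

```python
import json


def select_anomalies(lines):
    """Return the parsed messages flagged as anomalies; other lines are skipped."""
    anomalies = []
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # not a JSON message, for example a plain log line
        if isinstance(message, dict) and message.get("IsSpikeAndDipAnomaly") == 1:
            anomalies.append(message)
    return anomalies


# Made-up example lines: two anomaly detection results, one raw message, one log line.
example_lines = [
    '{"time": "2021-01-01T00:00:00Z", "temp": 21.1, "SpikeAndDipScore": 0.42, "IsSpikeAndDipAnomaly": 0}',
    '{"time": "2021-01-01T00:00:05Z", "temp": 36.0, "SpikeAndDipScore": 0.0001, "IsSpikeAndDipAnomaly": 1}',
    '{"ambient": {"temperature": 21.1}, "timeCreated": "2021-01-01T00:00:00Z"}',
    "some non-JSON log line",
]

anomalies = select_anomalies(example_lines)
for anomaly in anomalies:
    print(anomaly["time"], anomaly["temp"])
```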

Finally, the IoT Explorer is showing incoming anomaly detection messages in the IoT Hub:

Conclusion

This post shows how to get started with Anomaly Detection using Azure Stream Analytics on the Edge.

Here, spike and dip detection is shown. Solutions using changepoint detection look much the same.

Keep in mind that this is based on machine learning so there is a possibility for false positives and false negatives. It is key to put some effort into testing your query to find the right combination of settings.

Do not act on the outcome of anomaly detection only. Use other sources of information too.

Over time, keep an eye on your anomaly detection. If the message stream slowly changes (for example, more or fewer incoming messages), the query may have to be updated to match the new situation.
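One simple check is to compare the message rate with the settings in the query. The query in this post asks for a history of 120 events and limits the window to 120 seconds. The back-of-the-envelope Python sketch below shows how many events fit in that window at a steady rate; the intervals are hypothetical, and the helper function is only there for illustration.

```python
def events_in_window(window_seconds, message_interval_seconds):
    """Number of events a sliding window holds at a steady message rate."""
    return int(window_seconds // message_interval_seconds)


WINDOW_SECONDS = 120  # LIMIT DURATION(second, 120) in the query
HISTORY_SIZE = 120    # third argument of AnomalyDetection_SpikeAndDip in the query

for interval in (1, 5, 10):  # hypothetical seconds between messages
    available = events_in_window(WINDOW_SECONDS, interval)
    status = "fills" if available >= HISTORY_SIZE else "does not fill"
    print(f"interval {interval}s: {available} events, {status} the history of {HISTORY_SIZE}")
```

If the window holds far fewer events than the requested history, it may be worth revisiting the window duration or the history size when testing the query.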