Compare previous and current message in Stream Analytics

Last week I was testing the temporary storage in IoT Edge. I was interested in its reliability, so I wanted to know whether messages went missing or even arrived twice.

I have a Heartbeat module that produces an incrementing counter, so the messages it generates form a sequence that can be checked for gaps and duplicates.

One way to check this is with your own eyes 🙂

But this is really a more generic problem: comparing each message with the one before it. So I thought of Azure Stream Analytics, which should be the perfect tool for this job.

Let’s check out how we can compare subsequent messages using Stream Analytics.

Choose the right operator

I first checked the obvious Last operator. This operator gives me the last message that matches a certain condition. That is great if you have a condition that looks for a specific value or range, but there is no way to compare an earlier message with the current value.

Then I remembered the Last operator is a special version of the Lag operator.

So I tried something like this in a Stream Analytics job:

WITH DiffTest AS
(
    SELECT
        deviceid,
        counter,
        timestamp,
        -- counter of the previous event from the same device, looking back at most 3 hours
        LAG(counter) OVER (PARTITION BY deviceid LIMIT DURATION(hour, 3)) AS lag
    FROM
        iothubinput
)

SELECT
    *
INTO
    functionoutput
FROM
    DiffTest
WHERE
    lag != counter - 1

Every device running my Heartbeat module sends a counter. Partitioning by deviceid keeps the sequence of each device separate, and because I expect a device outage to last at most three hours, LIMIT DURATION(hour, 3) only looks back three hours. For each event, LAG returns the counter of the previous event from the same device (not the current counter). This previous counter is named lag.

Then I compare lag with the current counter. If lag is not exactly counter - 1 (because I received the same value twice, missed a message, or the counter started over), the event passes the filter. These events are sent to an Azure Function.
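To reason about what the query reports without deploying a job, here is a minimal local simulation in Python. It is a sketch under stated assumptions, not how Stream Analytics works internally: events are dicts with the same deviceid, counter and timestamp fields as the query, they are processed in the order given (assumed to be arrival order), an event with no earlier event from the same device within three hours gets no lag and is not reported (assumed to mirror a NULL lag failing the WHERE comparison), and the three-hour boundary is treated as inclusive. The name find_sequence_breaks is hypothetical.

```python
from datetime import datetime, timedelta

# Mirrors LIMIT DURATION(hour, 3); treating the boundary as inclusive is an assumption.
LIMIT_DURATION = timedelta(hours=3)


def find_sequence_breaks(events, limit=LIMIT_DURATION):
    """Hypothetical local stand-in for the DiffTest query plus its WHERE clause.

    Each event is a dict with "deviceid", "counter" and "timestamp" (a datetime).
    Returns the events whose lag differs from counter - 1, with "lag" added.
    """
    previous_by_device = {}  # deviceid -> (timestamp, counter) of the last event seen
    breaks = []
    for event in events:  # processed in the order given (assumed arrival order)
        previous = previous_by_device.get(event["deviceid"])
        lag = None  # no earlier event from this device inside the window
        if previous is not None and event["timestamp"] - previous[0] <= limit:
            lag = previous[1]
        # A missing lag is not reported, assumed to match NULL != x failing the WHERE.
        if lag is not None and lag != event["counter"] - 1:
            breaks.append({**event, "lag": lag})
        previous_by_device[event["deviceid"]] = (event["timestamp"], event["counter"])
    return breaks


if __name__ == "__main__":
    start = datetime(2020, 1, 1, 12, 0)
    counters = [1, 2, 3, 3, 5]  # a duplicate 3 and a missing 4
    sample = [
        {"deviceid": "edge1", "counter": c, "timestamp": start + timedelta(minutes=i)}
        for i, c in enumerate(counters)
    ]
    for b in find_sequence_breaks(sample):
        print(b["counter"], b["lag"])  # prints "3 3", then "5 3"
```

The per-device dictionary plays the role of PARTITION BY deviceid, and the time check plays the role of LIMIT DURATION; a device that stays silent for longer than three hours therefore starts a fresh sequence without raising an alert.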

Azure Function

To make the output of the query visible, I send it to an HTTP-triggered function in Azure Functions:

#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;

public static async Task<IActionResult> Run(HttpRequest req, ILogger log)
{
    // Stream Analytics posts one or more output rows as a JSON array
    string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

    dynamic data = JsonConvert.DeserializeObject(requestBody);

    // Log the current counter next to the previous one (lag) for every row
    foreach (var item in data)
    {
        log.LogInformation($"request {item.counter} - {item.lag}");
    }

    return (ActionResult)new OkObjectResult($"Executed {data.Count}");
}

The function receives one or more messages per request from the Stream Analytics job. Each message is logged as information and shows both the counter and the lag.

Testing the solution

Let’s test it out. We simply kill the Heartbeat module. IoT Edge acts like a service, so it restarts the module automatically, and the Heartbeat module starts counting from 1 again.

Here is the output of multiple restarts:

We see that message 6 is followed by message 1, and the same happens after message 4.

This is also the output in the Azure Function:

We see that the second counter 1 has a preceding lag of 6 and the third has a preceding lag of 4.
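The restart results can also be replayed without Azure. The Python sketch below is a simplified single-device check with no time window (an assumption that fits this test, where only one module restarts): it pairs every counter with its predecessor, keeps the pairs that break the lag = counter - 1 rule, and labels the reason. The helper names are illustrative, and the run lengths are hypothetical apart from the restarts after 6 and 4 seen in the test.

```python
def counter_stream(run_lengths):
    """Counters from a module that restarts after each run and counts from 1 again."""
    stream = []
    for length in run_lengths:
        stream.extend(range(1, length + 1))
    return stream


def classify(lag, counter):
    """Explain how a (lag, counter) pair relates to the expected lag == counter - 1."""
    if lag == counter - 1:
        return "in sequence"
    if lag == counter:
        return "duplicate"
    if lag < counter - 1:
        return f"gap ({counter - lag - 1} missing)"
    return "counter went backwards (restart or out-of-order delivery)"


def sequence_breaks(counters):
    """Pair each counter with its predecessor and keep the pairs that break the sequence."""
    return [
        (lag, counter, classify(lag, counter))
        for lag, counter in zip(counters, counters[1:])
        if lag != counter - 1
    ]


if __name__ == "__main__":
    # Runs of 6 and 4 as in the test; the final run length of 3 is made up.
    for lag, counter, reason in sequence_breaks(counter_stream([6, 4, 3])):
        print(f"counter {counter} - lag {lag}: {reason}")
```

Both reported pairs, (6, 1) and (4, 1), fall into the "counter went backwards" case, which matches the test: the alerts come from module restarts rather than lost or duplicated messages.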

Conclusion

This is just one use of the Lag operator in Azure Stream Analytics: a simple solution to a simple problem that can have very big consequences.