Azure Functions as watchdog for missing telemetry or devices

Once you are working with the Azure IoT platform, there comes a moment when you want to add notifications.

In a previous blog, we looked at how to send notifications when telemetry values did not match certain ranges. Although this is great, there are even more cases where we want notifications.

Here I am referring to watchdog functionality. When telemetry stops arriving or devices are disconnected, the IoT platform as a whole is no longer working as designed, so we want to be informed when something fails.

In this blog, we will build a simple watchdog using Azure Functions.

Not Stream Analytics?

In the past, I tried to build this watchdog functionality using Stream Analytics. At first glance, this looks like a good choice: before IoT Hub routing arrived, we had no other way to get telemetry out of an IoT Hub, and a Stream Analytics job knows which device sent which messages at which time.

We could construct a query which compares, per device, the number of messages sent in the last X minutes to the number sent in the last Y hours. When the recent count drops to zero, we could send a message to an Event Hub.

But what if all telemetry fails? If not a single message arrives at the Stream Analytics job, the query has nothing to react to, so not a single output message will be sent either!

Routing to Azure Functions

Enter routing to Azure Functions, as seen in my last blog.

We can construct two tests:

  1. We check for devices which are disconnected
  2. We check for devices which have not sent any telemetry in the last five minutes

Azure Functions are stateless, so we can only react to device connections, device disconnections and arriving telemetry. It is clear we need three functions and some storage:

  1. When a device connects or disconnects, we store this connection state
  2. When a device sends telemetry, we store the moment the device is last seen
  3. Every five minutes, we check every device to see if we have to send a notification

Bonus: When a device is out of order for a long, long time, we limit the number of notifications. So we need some kind of notification counter for each device.

Azure Storage table

I have chosen Azure Table Storage as persistent storage. It is cheap, it is supported by Azure Functions as an input or output binding, and I expected the code to be simpler than using Entity Framework with, for example, Azure SQL Database.

At first, Azure Table Storage seemed hard to use, though. I wanted to combine merging records (not just inserting them) with querying the storage, and that seemed impossible because the bindings expect different types for those different usages. But in the end I got it working, and the actual code is quite simple and elegant.
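The record we maintain per device ends up looking roughly like this (a sketch only; each function below declares just the properties it actually touches):

using System;
using Microsoft.WindowsAzure.Storage.Table;

// Sketch of the combined device record; PartitionKey = "Devices", RowKey = device id.
public class TableDevice : TableEntity
{
  public DateTime LastSeen {get; set;}   // written by function 1 on every telemetry message
  public bool Connected {get; set;}      // written by function 2 on (dis)connect events
  public bool Enabled {get; set;}        // cleared by function 3 after too many warnings
  public int WarningCount {get; set;}    // incremented by function 3 for every warning sent
}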

Function 1: Persisting date and time of last seen telemetry

We just have to create a new Azure Function which waits for IoT Hub telemetry. You can add an extra Azure Table Storage input next to the trigger:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "myEventHubMessage",
      "direction": "in",
      "path": "samples-workitems",
      "connection": "[someeventhub-ih_events_IOTHUB]",
      "consumerGroup": "azurefunction",
      "cardinality": "many"
    },
    {
      "type": "table",
      "name": "inputTable",
      "tableName": "deviceTable",
      "take": 50,
      "connection": "[somestorage_STORAGE]",
      "direction": "in"
    }
  ],
  "disabled": false
}

Just fill in the right connections and you have yourself an extra input.

Note: this input (inputTable) is not automatically added to the function parameters. We will add it ourselves in the code below.

Next, we have to reference some NuGet packages. Add a file named ‘project.json’ to the Azure Function:

{
  "frameworks": {
    "net46": {
      "dependencies": {
        "WindowsAzure.Storage": "7.2.1",
        "Newtonsoft.Json": "10.0.3"
      }
    }
  }
}

Finally, change the function code. Note the extra input parameter named ‘inputTable’:

#r "Microsoft.WindowsAzure.Storage"

using System;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

public static void Run(string myEventHubMessage, CloudTable inputTable, TraceWriter log)
{
  log.Info($"C# Event Hub trigger function processed a message: {myEventHubMessage}");

  var messageDevice = JsonConvert.DeserializeObject<MessageDevice>(myEventHubMessage);

  var tableDevice = new TableDevice
  {
    Partition​Key = "Devices",
    RowKey = messageDevice.deviceId,
    LastSeen = DateTime.Now,
    Enabled = true,
    WarningCount = 0,
  };

  var operation = TableOperation.InsertOrMerge(tableDevice);
  var result = inputTable.Execute(operation);

  log.Info($"Device {messageDevice.deviceId} is updated");
}

public class MessageDevice
{
  public string deviceId {get; set;}
}

public class TableDevice : TableEntity
{
  public DateTime LastSeen {get;set;}
  public bool Enabled {get; set;}
  public int WarningCount {get; set;}
}

When telemetry arrives, we are only interested in the device name, so we deserialize the message into a class which only holds the device id. Then we construct a record (an Azure Table Storage entity) and, among other columns, fill the LastSeen column with the current (UTC) date and time.

Every Azure Table Storage table has two key columns by default: PartitionKey and RowKey. We use this pair to identify our device in the table. If the (unique) combination already exists, a merge is executed; otherwise, a new record is added for this new device.

We also reset the warning counter and enable the device again (in case warnings were already recorded for this device; see the last function).
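As a side note, the same key pair also allows a fast point lookup. A hypothetical snippet (not used in the watchdog itself, and the device name is just an example) which reads one device back from the table:

// Hypothetical point query: fetch one device row by its PartitionKey/RowKey pair.
var retrieve = TableOperation.Retrieve<TableDevice>("Devices", "TempTest02");
var found = (TableDevice)inputTable.Execute(retrieve).Result;

if (found != null)
{
  log.Info($"Last seen: {found.LastSeen}, warnings: {found.WarningCount}");
}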

Now compile and run the function. Once compilation completes, watch how the telemetry is processed:

2017-06-26T17:54:25.149 Function started (Id=9ab2ee10-2b6b-41b3-af6f-a15003512588)
2017-06-26T17:54:25.149 C# Event Hub trigger function processed a message: {"deviceId":"TempTest02","temperature":29,"humidity":42,"timeStamp":42}
2017-06-26T17:54:25.164 Device TempTest02 is updated
2017-06-26T17:54:25.164 Function completed (Success, Id=9ab2ee10-2b6b-41b3-af6f-a15003512588, Duration=24ms)
2017-06-26T17:54:35.237 Function started (Id=368975c3-7b5c-4f2a-81fd-16a49d3c58dc)
2017-06-26T17:54:35.237 C# Event Hub trigger function processed a message: {"deviceId":"TempTest02","temperature":29,"humidity":42,"timeStamp":42}
2017-06-26T17:54:35.252 Device TempTest02 is updated
2017-06-26T17:54:35.252 Function completed (Success, Id=368975c3-7b5c-4f2a-81fd-16a49d3c58dc, Duration=18ms)

Now check the Azure Table Storage table using the free Microsoft Azure Storage Explorer:

Great! The record is there. That was not so hard. From now on, this single record is updated continuously for this device.

Function 2: Persisting connection and disconnection

Now repeat the same procedure for the next Azure Function. This Azure Function is connected to the Operations Monitoring of the IoT Hub. We are (only) interested in devices connecting and disconnecting.

So add the same input table and the same ‘project.json’ file for this second Azure Function.

And we change the code to:

#r "Microsoft.WindowsAzure.Storage"

using System;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

public static void Run(string myEventHubMessage, CloudTable inputTable, TraceWriter log)
{
log.Info($"IoTHub Operations Monitoring trigger function processed a message: {myEventHubMessage}");

var messageOperation = JsonConvert.DeserializeObject<Operation>(myEventHubMessage);

var connected = messageOperation.operationName == "deviceConnect";

var tableDevice = new TableDevice
{
Partition​Key = "Devices",
RowKey = messageOperation.deviceId,
Connected = connected,
};

var operation = TableOperation.InsertOrMerge(tableDevice);
var result = inputTable.Execute(operation);

log.Info($"{messageOperation.deviceId} is updated for {messageOperation.operationName}");
}

public class Operation
{
public string protocol {get; set;}
public string authType {get; set;}
public string time {get; set;}
public string operationName {get; set;}
public string category {get; set;}
public string level {get; set;}
public int statusCode {get; set;}
public int statusType {get; set;}
public string statusDescription {get; set;}
public string deviceId {get; set;}
public string ipAddress {get; set;}
}

public class TableDevice : TableEntity
{
public bool Connected {get;set;}
}

Again, we take the device name from the connect or disconnect event, and we merge (or insert a new record) with the connection state of the device. Because this entity only carries the Connected property, the merge leaves the columns written by the other functions (such as LastSeen) untouched.

I can demonstrate this by resetting my connected NodeMcu:

...
2017-06-26T18:18:48 No new trace in the past 5 min(s).
2017-06-26T18:18:49.218 Function started (Id=e46431dd-36fb-46d7-81a7-bb2f6cbcd388)
2017-06-26T18:18:49.515 IoTHub Operations Monitoring trigger function processed a message: {"protocol":"Mqtt","authType":"{ \"scope\": \"device\", \"type\": \"sas\", \"issuer\": \"iothub\" }","time":"2017-06-26T18:18:43.47276Z","operationName":"deviceDisconnect","category":"Connections","level":"Information","deviceId":"TempTest02","ipAddress":"aa.bb.cc.XXX"}
2017-06-26T18:18:49.546 TempTest02 is updated for deviceDisconnect
2017-06-26T18:18:49.546 Function completed (Success, Id=e46431dd-36fb-46d7-81a7-bb2f6cbcd388, Duration=331ms)
2017-06-26T18:18:49.609 Script for function 'IoTHubOperationsMonitoringTrigger' changed. Reloading.
2017-06-26T18:18:49.671 Compilation succeeded.
2017-06-26T18:18:49.702 Function started (Id=e5c66a50-fe3f-4114-93fa-7f200ac2e6e2)
2017-06-26T18:18:49.827 IoTHub Operations Monitoring trigger function processed a message: {"protocol":"Mqtt","authType":"{ \"scope\": \"device\", \"type\": \"sas\", \"issuer\": \"iothub\" }","time":"2017-06-26T18:18:43.5834739Z","operationName":"deviceConnect","category":"Connections","level":"Information","deviceId":"TempTest02","ipAddress":"aa.bb.cc.XXX"}
2017-06-26T18:18:49.859 TempTest02 is updated for deviceConnect
2017-06-26T18:18:49.859 Function completed (Success, Id=e5c66a50-fe3f-4114-93fa-7f200ac2e6e2, Duration=154ms)

For a brief moment, we can see that the device is disconnected:

The device will be reconnected a few seconds later. But it proves my point.

Function 3: Bringing the pieces together; the watchdog clock

Finally, we add one more Azure Function. This last function is scheduled to run every five minutes (using the CRON expression “0 */5 * * * *”).

And again, add the same input table and the same ‘project.json’ file for this last Azure Function.
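For reference, the bindings of this timer-triggered function then look roughly like this (a sketch; the trigger name is just an example and the storage connection is the same placeholder as before):

{
  "bindings": [
    {
      "type": "timerTrigger",
      "name": "myTimer",
      "schedule": "0 */5 * * * *",
      "direction": "in"
    },
    {
      "type": "table",
      "name": "inputTable",
      "tableName": "deviceTable",
      "take": 50,
      "connection": "[somestorage_STORAGE]",
      "direction": "in"
    }
  ],
  "disabled": false
}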

We change the code to this new method (the actual notification, using e.g. Microsoft Flow or Azure Logic Apps, is left out for simplicity; a sketch follows further down):

#r "Microsoft.WindowsAzure.Storage"

using System;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public static void Run(TimerInfo myTimer, CloudTable inputTable, TraceWriter log)
{
  log.Info($"IoTHub Timer trigger function executed at: {DateTime.Now}");

  var rangeQuery = inputTable.CreateQuery<TableDevice>()
                             .Where(p => p.PartitionKey == "Devices"
                                             && p.Enabled == true
                                             && (p.LastSeen < DateTime.Now.AddMinutes(-5)
                                                 ||
                                                 p.Connected == false));
  var devices = rangeQuery.ToList();
  log.Info($"Found Devices: {devices.Count}");

  foreach(var device in devices)
  {
    // DIY: Send notification message

    log.Info($"Warning sent for {device.RowKey} at {DateTime.Now}");

    device.WarningCount++;

    if(device.WarningCount > 3)
    {
      device.Enabled = false;
    }

    var operation = TableOperation.InsertOrMerge(device);
    var result = inputTable.Execute(operation);
  }
}

public class TableDevice : TableEntity
{
  public DateTime LastSeen {get;set;}
  public bool Connected {get;set;}
  public bool Enabled {get; set;}
  public int WarningCount {get; set;}
}

So we list all (still enabled) devices which have not sent telemetry in the last five minutes or are not currently connected.

Then, for each of these devices, we send a notification and decide whether we should keep doing so (we simply stop after a few warnings, so as not to annoy our customer).
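To give you an idea of what could go behind the ‘// DIY: Send notification message’ comment, here is a rough sketch (just one option, not the actual implementation): post the device name to an HTTP-triggered Azure Logic App or Microsoft Flow. The URL is a placeholder for your own HTTP trigger:

// Sketch only: notify by posting to an HTTP-triggered Logic App or Flow.
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

private static readonly HttpClient httpClient = new HttpClient();

private static async Task SendNotificationAsync(string deviceId)
{
  var json = JsonConvert.SerializeObject(new { deviceId = deviceId, message = "No telemetry or device disconnected" });
  var content = new StringContent(json, Encoding.UTF8, "application/json");

  // Placeholder URL: paste the request URL of your own Logic App / Flow HTTP trigger here.
  await httpClient.PostAsync("https://<your-logic-app-http-trigger-url>", content);
}

Inside the foreach loop this could then be called with ‘SendNotificationAsync(device.RowKey).Wait();’ (or make the Run method async and await it).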

The output generated looks like this:

2017-06-26T18:29:17 Welcome, you are now connected to log-streaming service.
2017-06-26T18:30:00.282 Function started (Id=dcb41b01-7a66-4d8c-ace7-91df252147f8)
2017-06-26T18:30:00.282 IoTHub Timer trigger function executed at: 6/26/2017 6:30:00 PM
2017-06-26T18:30:00.360 Found Devices: 1
2017-06-26T18:30:00.360 Warning sent for TempTest02 at 6/26/2017 6:30:00 PM
2017-06-26T18:30:00.391 Function completed (Success, Id=dcb41b01-7a66-4d8c-ace7-91df252147f8, Duration=100ms)
2017-06-26T18:31:17 No new trace in the past 1 min(s).
2017-06-26T18:32:17 No new trace in the past 2 min(s).
2017-06-26T18:33:17 No new trace in the past 3 min(s).
2017-06-26T18:34:17 No new trace in the past 4 min(s).
2017-06-26T18:35:00.165 Function started (Id=24d58736-06b5-4f84-877f-28c0dd0de164)
2017-06-26T18:35:00.165 IoTHub Timer trigger function executed at: 6/26/2017 6:35:00 PM
2017-06-26T18:35:11.629 Found Devices: 1
2017-06-26T18:35:11.629 Warning sent for TempTest02 at 6/26/2017 6:35:11 PM
2017-06-26T18:35:19.281 Function completed (Success, Id=24d58736-06b5-4f84-877f-28c0dd0de164, Duration=19127ms)
2017-06-26T18:37:17 No new trace in the past 1 min(s).

Now let’s check the Azure Table Storage. The counter is already incremented:

And after a few more notifications the device is disabled:

Is this the last we will hear about this device? No, the device will be enabled again as soon as new telemetry arrives (see Function 1)!

“I get this exception when I try to use a more recent Azure Storage NuGet package”

You are not alone. I got the exception “Can’t bind Table to type ‘Microsoft.WindowsAzure.Storage.Table.CloudTable’.” when I tried to upgrade to an Azure Storage NuGet package of version 8 or higher.

The exact reason is unknown; it has been suggested that the Functions runtime itself expects version 7.2.1.

Either way, for me it’s fine to select this older NuGet version. The functions behave as expected.

Conclusion

With just a few Azure Functions, we have built a pretty solid watchdog for an Azure IoT Hub. There is always room for improvement. It could be made even smarter if it ‘barked’ once in a while, so we know the watchdog itself is still alive too! That is a good way to check the availability of all the moving parts.

 
