Combining Azure StreamAnalytics with Azure Function Sink

There are many ways to collect data from an IoTHub and pass it through Azure Functions:

  • Trigger an Azure Function directly from the IoTHub events or operations monitoring
  • Trigger an Azure Function using an EventHub or message bus as endpoints of IoTHub routing
  • Trigger an Azure Function using an EventHub as output sink of a Stream Analytics job

All these solutions serve their own purpose.

But the last one, using an EventHub, can be pretty annoying. Yes, it is a great approach if you actually need the security policies and/or consumer groups of the EventHub. But otherwise, it means a lot of administration.

Let's check out how life becomes much easier with a new Azure feature, still in public preview, called the Azure Function sink.

The old way

Previously, when you wanted to combine an Azure Function with Stream Analytics, you first had to create both an EventHub namespace (recently renamed to just 'EventHub') as the 'container' and the EventHub inside it:

 This is a lot of administration, just to access an Azure Function.

But recently, Microsoft added a new output sink in Stream Analytics: Azure Function (in public preview).

This makes it all a bit simpler:

An example

Documentation is poor, but from the picture on this blog, it seems to be done with an HttpTrigger. So let's first create this HttpTrigger in Azure Functions:

You only have to provide a unique name and the security level. We will see that security is handled automatically.

Once it’s created, we have to fix the automatically generated code:

using System.Net;
public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
  // Simply log the raw request body sent by the caller and always return 200 OK
  string content = await req.Content.ReadAsStringAsync();
  log.Info("C# HTTP trigger function processed a request: " + content);
  return req.CreateResponse(HttpStatusCode.OK, "Executed");
}

The standard template code expects a 'name' value in the query string or request body and returns a 400 Bad Request when it is missing, so the call will fail when the Azure Function is tested as output sink by the Stream Analytics job. Do not forget to save the changes.
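From memory, the generated HttpTrigger template looked roughly like this at the time of writing (it may differ slightly between runtime versions), which explains the failing test:

using System.Net;

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
  log.Info("C# HTTP trigger function processed a request.");

  // Look for a 'name' value in the query string
  string name = req.GetQueryNameValuePairs()
    .FirstOrDefault(q => string.Compare(q.Key, "name", true) == 0)
    .Value;

  // Fall back to a 'name' property in the request body
  dynamic data = await req.Content.ReadAsAsync<object>();
  name = name ?? data?.name;

  // Stream Analytics does not send a 'name', so this branch returns a 400 and the sink test fails
  return name == null
    ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a name on the query string or in the request body")
    : req.CreateResponse(HttpStatusCode.OK, "Hello " + name);
}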

Now, switch over to the StreamAnalytics job.

Add a new output sink like this:

Just give it a name and then reference your newly created Azure Function.

Write a StreamAnalytics query which consumes an IoTHub:

SELECT
  *
INTO
  functionsink
FROM
  hubinput

And you are now ready to start the StreamAnalytics query and ingest data from the IoTHub.

Getting information

While the Stream Analytics job is starting up, check out the logging of your Azure Function:

2017-10-19T07:22:36.661 Function started (Id=7070e681-3cb1-4a81-b34e-515b61efdc00)
2017-10-19T07:22:36.895 C# HTTP trigger function processed a request: []
2017-10-19T07:22:36.895 Function completed (Success, Id=7070e681-3cb1-4a81-b34e-515b61efdc00, Duration=236ms)

The newly created sink on the Stream Analytics job is tested by sending an empty array. Keep in mind that Stream Analytics batches records, so the function will normally receive an array containing multiple records.

Once the StreamAnalytics job is started and the IoTHub receives data, we will see the data arrive at the Azure Function. This is a single line:

[{"temperature":19.0,"EventProcessedUtcTime":"2017-10-18T22:40:43.4113529Z","PartitionId":3,"EventEnqueuedUtcTime":"2017-10-18T22:40:42.599Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"Sensor01","ConnectionDeviceGenerationId":"636432344079829030","EnqueuedTime":"2017-10-18T22:40:42.219Z","StreamId":null}}]

And this is great! We get both the data (the temperature I sent) and the context of time and device.
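As a side note: if you only need a few of these fields, you could also let the Stream Analytics query project them before they reach the function. A minimal sketch, assuming the nested IoTHub fields can be addressed with dot notation (the aliases are my own):

SELECT
  temperature,
  EventEnqueuedUtcTime AS measuredAt,
  IoTHub.ConnectionDeviceId AS device
INTO
  functionsink
FROM
  hubinput

In this post, though, I keep the SELECT * query and pick the values I need inside the function, as shown below.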

The data was sent by a simulated device built with the IoT Edge SDK:

Receiving events...
19-10-2017 13:34:50> Device: [Sensor01], Data:[{"temperature": 20.00}]Properties:
'source': 'mapping'

So it's good to see that the context (device name and date/time) is preserved.
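For completeness, sending such a telemetry message from a device takes only a few lines with the Azure IoT device SDK (Microsoft.Azure.Devices.Client). This is a minimal sketch, not the actual IoT Edge simulator code; the connection string and payload are placeholders:

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;

public static async Task SendTemperatureAsync()
{
  // Placeholder connection string of the 'Sensor01' device registered in the IoTHub
  var deviceClient = DeviceClient.CreateFromConnectionString(
    "HostName=<your-iothub>.azure-devices.net;DeviceId=Sensor01;SharedAccessKey=<key>");

  // Serialize the telemetry and send it as a device-to-cloud message
  var payload = JsonConvert.SerializeObject(new { temperature = 20.00 });
  using (var message = new Message(Encoding.UTF8.GetBytes(payload)))
  {
    await deviceClient.SendEventAsync(message);
  }
}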

How to process this JSON message?

Ok, this message seems a bit much, I agree. But it's actually just the raw message from the device, combined with information added by the IoTHub. Let's pick some values from it. After all, I am only interested in the device, the time and the data for now.

For that, I just want to convert the message into a dynamic object, so I do not have to create a class which represents the message.

Keep in mind, we get an array of JSON items (hence the '[' and ']').

So first, add a new file to the Azure Function called 'project.json' which references the Newtonsoft.Json NuGet package:

{
 "frameworks": {
   "net46": {
     "dependencies": {
       "Newtonsoft.Json": "9.0.1"
     }
   }
 }
}

Now we have access to tooling to handle JSON.

Finally, we alter the same function:

using System.Net;

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
  // Read the raw JSON array posted by the Stream Analytics Azure Function sink
  string content = await req.Content.ReadAsStringAsync();

  log.Info("C# HTTP trigger function processed a request: " + content);

  // Deserialize into a dynamic object, so no class representing the message is needed
  dynamic array = Newtonsoft.Json.JsonConvert.DeserializeObject(content);

  // Stream Analytics posts an array of records (it can be empty during the connection test)
  foreach (var json in array)
  {
    log.Info($"Body: temperature={json.temperature}; datetime={json.EventProcessedUtcTime}; device={json.IoTHub.ConnectionDeviceId}");
  }

  return req.CreateResponse(HttpStatusCode.OK, "Executed");
}

I convert the content to a dynamic object and, because it's an array, I loop through its children.

So, after this code is made active, we get updated logging:

2017-10-19T11:40:43.958 Function started (Id=8e2720e4-c42c-406e-89c4-d2885edf8c51)
2017-10-19T11:40:43.958 C# HTTP trigger function processed a request: [{"temperature":27.0,"EventProcessedUtcTime":"2017-10-19T11:40:35.719549Z","PartitionId":1,"EventEnqueuedUtcTime":"2017-10-19T11:40:35.778Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"Sensor01","ConnectionDeviceGenerationId":"636439942101824621","EnqueuedTime":"2017-10-19T11:40:35.226Z","StreamId":null}}]
2017-10-19T11:40:43.958 Body: temperature=27; datetime=10/19/2017 11:40:35 AM; device=Sensor01
2017-10-19T11:40:43.958 Function completed (Success, Id=8e2720e4-c42c-406e-89c4-d2885edf8c51, Duration=1ms)

So here we see the data we want to see from the triggered function: the actual data, the timestamp and the device.
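As a side note: if you prefer strong typing over dynamic, you could also deserialize the array into small classes matching the payload shown earlier. A minimal sketch (the class and property names are my own and cover only the fields I use; Json.NET matches the JSON property names case-insensitively):

using System.Net;
using Newtonsoft.Json;

public class IoTHubContext
{
  public string ConnectionDeviceId { get; set; }
  public DateTime EnqueuedTime { get; set; }
}

public class TelemetryRecord
{
  public double Temperature { get; set; }
  public DateTime EventProcessedUtcTime { get; set; }
  public IoTHubContext IoTHub { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
  string content = await req.Content.ReadAsStringAsync();

  // Deserialize the array into typed records instead of a dynamic object
  var records = JsonConvert.DeserializeObject<List<TelemetryRecord>>(content);

  foreach (var record in records)
  {
    log.Info($"Body: temperature={record.Temperature}; datetime={record.EventProcessedUtcTime}; device={record.IoTHub.ConnectionDeviceId}");
  }

  return req.CreateResponse(HttpStatusCode.OK, "Executed");
}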

Conclusion

Yes, keep in mind this is just the public preview of the new Azure Function sink for Stream Analytics.

But this is very promising: it takes away a lot of 'pain' regarding the EventHub administration. And it's good to see the IoTHub information is passed along too.

It's not clear to me why an HttpTrigger is used. Maybe other kinds of triggers will work too, but I am hoping for a dedicated Stream Analytics trigger.

