The output sink format of a Stream Analytics job matters!

Most examples of Stream Analytics queries are pretty straightforward: they use simple queries that return a single line of output.

For example, a query like:

SELECT
  *
INTO
  huboutput
FROM
  hubinput timestamp by EventProcessedUtcTime

… will return a line like:

{"cycle":4, "errorcode":1, "deviceid":"MachineCyclesDemo"}

In an Azure Function, this will arrive as:

2016-12-19T15:15:07.045 Function started (Id=e58ea9ec-fd8d-469e-bd9c-ea027ce2dbb4)
2016-12-19T15:15:07.690 Messages arrived: {"cycle":4, "errorcode":1, "deviceid":"MachineCyclesDemo"}
2016-12-19T15:15:07.690 Function completed (Success, Id=e58ea9ec-fd8d-469e-bd9c-ea027ce2dbb4)

But when the query is a bit more complicated, like grouping with a time interval:

SELECT 
  Count(errorCode),
  IoTHub.ConnectionDeviceId as deviceId
INTO
  arrayoutput
FROM
  hubinput timestamp by EventProcessedUtcTime
WHERE
  errorCode <> 0
GROUP BY IoTHub.ConnectionDeviceId, TumblingWindow(Duration(minute, 1))
HAVING Count(errorCode) > 1 

Then the format of the messages returned can be unexpected.

For example, when the query above writes to an Event Hub that triggers an Azure Function, deserializing the JSON output into a .NET class can throw an exception.

Take this Azure Function code:

#r "Newtonsoft.Json"

using System;
using Microsoft.Azure.Devices;
using System.Text;
using Newtonsoft.Json;

// Event Hub triggered function: receives the Stream Analytics output as a string
public static void Run(string myEventHubMessage, TraceWriter log)
{
  log.Info($"Messages arrived: {myEventHubMessage}");
  try
  {
    // Expects exactly one JSON object per Event Hub message
    var message = JsonConvert.DeserializeObject<StreamAnalyticsCommand>(myEventHubMessage);
    // do something...
  }
  catch (Exception ex)
  {
    log.Warning($"Error {ex.Message}");
  }
}

// Matches the columns produced by the GROUP BY query above
public class StreamAnalyticsCommand
{
  public string deviceid {get; set;}
  public int count {get; set;}
}

This results in an exception when deserializing:

2016-12-19T15:16:08.945 Function started (Id=0efe8735-3bb0-4a3c-a0fb-0f5809dc96e6)
2016-12-19T15:16:09.560 Messages arrived: {"count":4,"deviceid":"MachineCyclesDemo"}
{"count":3,"deviceid":"00000000804A4162"}
2016-12-19T15:16:09.575 Error Additional text encountered after finished reading JSON content: {. Path '', line 2, position 0.
2016-12-19T15:16:09.575 Function completed (Success, Id=0efe8735-3bb0-4a3c-a0fb-0f5809dc96e6)

We get an error: deserializing the message into the class runs into unexpected additional text.

Wut?

Is more data returned than the class needs?

I have logged the actual output just above the error. And what we see is TWO messages separated by a newline.

So there are two messages returned!?!

Why is that?

This is because the ‘group by’ produces one row per device per one-minute window, and when multiple devices report errors in the same interval, those rows are put together in a single Event Hub message. And when the output sink was created, we used the default settings:

[Screenshot: output sink settings with Format left at the default, ‘Line separated’]

This output sink generates JSON messages (which is great), but its default ‘Line separated’ format puts multiple messages into one payload, one JSON object per line. Yes, the messages are only separated by a line break.

We can work around this in a simple, but not recommended, way:

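// Split the payload on line breaks; every non-empty line is one JSON object
var jsonMessages = myEventHubMessage.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
log.Info($"{jsonMessages.Length} messages arrived.");
foreach (var jsonMessage in jsonMessages)
{
  // Each line is still a string, so deserialize it on its own
  var message = JsonConvert.DeserializeObject<StreamAnalyticsCommand>(jsonMessage);
  log.Info($"Eventhub produced {message.count}/{message.deviceid}");
}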

We just split on the newline character, loop over the parts with a foreach, and from there it is business as usual. Right…
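If you have to stay on the line-separated format, Json.NET can also read the concatenated objects directly, without splitting the string yourself. Below is a minimal sketch, assuming the Newtonsoft.Json package is referenced (in a .csx function via #r "Newtonsoft.Json"); the StreamAnalyticsCommand class is repeated so the block stands alone, and LineSeparatedParser/Parse are hypothetical names. Setting SupportMultipleContent = true on JsonTextReader lets the reader move on to the next JSON object instead of reporting the "Additional text encountered" error.

```csharp
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

// Same shape as the class used in the function above.
public class StreamAnalyticsCommand
{
    public string deviceid { get; set; }
    public int count { get; set; }
}

// Hypothetical helper (not part of any SDK): reads a 'Line separated' payload object by object.
public static class LineSeparatedParser
{
    public static List<StreamAnalyticsCommand> Parse(string payload)
    {
        var result = new List<StreamAnalyticsCommand>();
        var serializer = new JsonSerializer();
        using (var reader = new JsonTextReader(new StringReader(payload)) { SupportMultipleContent = true })
        {
            // Each successful Read() positions the reader on the start of the next JSON object;
            // the line breaks between objects are skipped as whitespace.
            while (reader.Read())
            {
                result.Add(serializer.Deserialize<StreamAnalyticsCommand>(reader));
            }
        }
        return result;
    }
}
```

Called with the two-line payload from the log above, LineSeparatedParser.Parse(myEventHubMessage) returns two items, one per device. Unlike splitting on '\n', this also copes with JSON objects that span several lines.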

But there is a better, and just as simple, solution: JSON supports arrays.

And the good news is that the output format of a Stream Analytics job supports arrays too:

[Screenshot: output sink settings with Format set to ‘Array’]

So now we can deserialize the payload correctly:

...
var jsonMessages = JsonConvert.DeserializeObject<StreamAnalyticsCommand[]>(myEventHubMessage);
log.Info($"{jsonMessages.Length} messages arrived.");
foreach (var jsonMessage in jsonMessages)
{
  log.Info($"Eventhub produced {jsonMessage.count}/{jsonMessage.deviceid}");
}

Do you see the array notation (the square brackets)? Deserialization now results in an array. This works just fine, even when only one message is passed:

2016-12-19T22:21:05.914 Function started (Id=8512a281-108a-43d0-bb31-b0fa8e9ab09b)
2016-12-19T22:21:05.914 Array Event Hub trigger function processed a message: [{"count":3,"deviceid":"00000000804A4162"}]
2016-12-19T22:21:05.914 1 messages arrived.
2016-12-19T22:21:05.914 Eventhub produced 3/00000000804A4162
2016-12-19T22:21:05.914 Function completed (Success, Id=8512a281-108a-43d0-bb31-b0fa8e9ab09b)

So next time you expect multiple JSON messages in one Event Hub message, select the non-default ‘Array’ format for the output sink.
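Changing the sink format does not rewrite events that are already waiting in the Event Hub, so right after the switch a function may still receive line-separated payloads next to array payloads. Here is a minimal sketch of a parser that accepts either shape, again assuming the Newtonsoft.Json package and the StreamAnalyticsCommand class from above (repeated to keep the block self-contained); TolerantParser is a hypothetical name, not an Azure or Json.NET API.

```csharp
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

public class StreamAnalyticsCommand
{
    public string deviceid { get; set; }
    public int count { get; set; }
}

// Hypothetical helper: accepts both the 'Array' and the 'Line separated' output formats.
public static class TolerantParser
{
    public static List<StreamAnalyticsCommand> Parse(string payload)
    {
        var result = new List<StreamAnalyticsCommand>();
        var serializer = new JsonSerializer();
        using (var reader = new JsonTextReader(new StringReader(payload)) { SupportMultipleContent = true })
        {
            while (reader.Read())
            {
                if (reader.TokenType == JsonToken.StartArray)
                {
                    // 'Array' format: one JSON array holding all messages.
                    result.AddRange(serializer.Deserialize<StreamAnalyticsCommand[]>(reader));
                }
                else if (reader.TokenType == JsonToken.StartObject)
                {
                    // 'Line separated' format: one JSON object per line.
                    result.Add(serializer.Deserialize<StreamAnalyticsCommand>(reader));
                }
            }
        }
        return result;
    }
}
```

Once the old line-separated events have drained from the Event Hub, the plain JsonConvert.DeserializeObject<StreamAnalyticsCommand[]> call shown earlier is all you need.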
