Creating an Azure Stream Analytics job using VS Code

Azure Stream Analytics is often the centerpiece of our IoT solutions.

It acts like a rule engine where data streams from multiple sources can be combined together, even enriched with static reference data.

Azure Stream Analytics does not come cheap if you only want to do some simple aggregations. For that, Azure Functions can probably help you out.

But when it comes to more elaborate rules using multiple inputs, multiple outputs, time windowing, custom functions, Machine Learning integration, and many more capabilities, Azure Stream Analytics should be your first choice:

Normally, I demonstrate Azure Stream Analytics using the Azure Portal.

There, it offers me a convenient browser experience where I can show how inputs, outputs, and user-defined functions are created. I can also copy/paste a (basic) query and demonstrate how it can be tested and run in a simple manner:

For people new to Azure Stream Analytics, this is a perfect starting point.

Still, this is for demonstration purposes only!

The Azure portal lacks (professional) abilities like source control/versioning, superior user-defined functions, and diagnostics.

If you plan to use Azure Stream Analytics in your projects, please consider starting with the VS Code project template.

In this blog, we will see how to start with the Visual Studio Code project for Stream Analytics.

Notice there is also a Visual Studio extension called Stream Analytics tools. This tooling is quite on par with the Visual Studio Code experience.

Note: you can even create a job using the Azure CLI, ARM templates, or PowerShell.

The setup of this demonstration is simple. We ingest some telemetry from a device using an IoT Hub, we do some simple querying inside the job, and we send the output to an Azure Function. This way, we can see what’s happening with our data.

Note: the query we use is super simple, the flow is what we are interested in.

Ingesting telemetry

We ingest telemetry from an Advantech Wise 4012E:

This education version of the real Wise industrial-grade devices is a nice match for our demonstration.

This device can read two digital inputs (switches) and two analog inputs (potentiometers), and it can switch two digital outputs (relays). These ‘sensors’ are already provided on a separate input board. The device is normally controlled using the Modbus protocol on the local (Wi-Fi) network, but it can also send logging to various cloud services:

There was already Azure service support using the ‘classic’ Azure IoT Device SDK.

With the most recent firmware update, support for MQTT was added. Even better, this MQTT traffic can also be relayed over port 443 using WebSockets:

The fact this device only needs port 443 to communicate with the outside world makes it much more friendly toward network administrators. Still, communication is secured using TLS.

Notice the UUID of the device must match the device registration in the IoT Hub:

Also, the Wise 4012E must actually send data logs to the cloud (push notification using JSON):

You can also manipulate what data is logged:

In the end, here is an example of a message sent by the Wise 4012E (seen in the Azure IoT Explorer):

As you can see, both the switches (digital in) on the Wise are enabled and the analog values (analog in) are 4663 and 3895. The relays (digital out) are not enabled.
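To make the shape of such a message concrete, here is a minimal Python sketch. Only `ai1` is a field name that actually appears in this post (in the job query); the other keys (`di1`, `di2`, `ai2`, `do1`, `do2`) and the assignment of the two analog values to `ai1` and `ai2` are assumptions for illustration, so the real payload of the Wise 4012E may look different.

```python
import json

# Hypothetical payload: only "ai1" is a name taken from this post;
# all other keys are made up for illustration.
raw = '{"di1": true, "di2": true, "ai1": 4663, "ai2": 3895, "do1": false, "do2": false}'


def summarize(message_json: str) -> dict:
    """Group the (assumed) fields of one device message by kind."""
    msg = json.loads(message_json)
    return {
        "switches_on": [k for k in ("di1", "di2") if msg.get(k)],
        "analog": {k: msg[k] for k in ("ai1", "ai2") if k in msg},
        "relays_on": [k for k in ("do1", "do2") if msg.get(k)],
    }


summary = summarize(raw)
print(summary)
```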

This device information will be used to ingest data into our Stream Analytics job.

IoT Hub default routing

Perhaps you already noticed that the IoT Explorer uses an ‘explorer’ consumer group for IoT Hub communication.

Previously, this was the ‘$default’ group, but I changed it to ‘explorer’. This prevents the IoT Explorer from reading messages through the same $default consumer group that another service may already be consuming. Otherwise, both the explorer and that other service would compete for the same messages (ending in multiple errors).

By giving the explorer its own consumer group, the problem is solved. Actually, each service consuming IoT Hub messages should get its own consumer group.

Note: the same goes for an Event Hub.

So, a Stream Analytics job should have its own consumer group:

Therefore, I added that ‘asa’ consumer group. Remember this name; you will need it later on.
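The rule of thumb "one consumer group per consuming service" is easy to check mechanically. The Python sketch below is only an illustration: the function name and the "Some other service" entry are hypothetical, and it does not talk to an IoT Hub at all; it just flags consumer groups that are shared by more than one reader.

```python
from collections import defaultdict


def find_shared_consumer_groups(readers: dict) -> dict:
    """Return each consumer group used by more than one reader, with those readers."""
    by_group = defaultdict(list)
    for reader, group in readers.items():
        by_group[group].append(reader)
    return {g: sorted(names) for g, names in by_group.items() if len(names) > 1}


# The setup used in this post: every reader has its own group.
good = {"IoT Explorer": "explorer", "Stream Analytics job": "asa"}

# A hypothetical setup where two readers share $default.
bad = {
    "IoT Explorer": "$default",
    "Some other service": "$default",
    "Stream Analytics job": "asa",
}

shared_good = find_shared_consumer_groups(good)
shared_bad = find_shared_consumer_groups(bad)
print(shared_good)
print(shared_bad)
```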

Setting up an Azure Function resource as output

I have chosen to send the output of the Stream Analytics job to an Azure Function.

Adding an Azure Function as output is a typical IoT scenario because, with an Azure Function, I can, e.g., trigger a direct method on an Azure IoT device or change the device twin.

For this blog, I create this Azure Function.

Note: it is created using the portal. Still, the same rules apply to an Azure Function as to a Stream Analytics job: it’s best to create it using a Visual Studio (Code) project template so the code is under version control.

This is the Azure Function App I created:

It’s just running on a Consumption plan. And it only contains one HTTP-triggered function I added:

I simplified the logic; it now just shows the received JSON message in the logging output.

Note: In a more elaborate use case, the incoming message could be an alert that has to be turned into an action performed by the same device using a direct method, device twin change, etc.
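As a rough sketch of what such a function does with its input: Stream Analytics posts events to the function in batches, as a JSON array in the HTTP request body, and an empty array is used to test the connection. The Python helper below is hypothetical and independent of the Azure Functions runtime (the function in this post was created in the portal, and its actual code is not shown here); it only illustrates the parse-and-log step. The sample field `ai1` is borrowed from the job query in this post.

```python
import json
import logging

logging.basicConfig(level=logging.INFO)


def handle_body(body: str) -> int:
    """Log every event in a JSON request body and return the number of events."""
    events = json.loads(body)
    if not isinstance(events, list):
        events = [events]  # tolerate a single JSON object as well
    for event in events:
        logging.info("Received: %s", json.dumps(event))
    return len(events)


# An empty array, as sent for the connection test.
count_test = handle_body("[]")

# A small batch with two events.
count_batch = handle_body('[{"ai1": 4663}, {"ai1": 3895}]')
```

In a real HTTP-triggered function, the request body would be passed to a helper like this and a success status code would be returned.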

Creating a Stream Analytics project in Visual Studio Code

Now, we have an input resource and an output resource defined.

So, we are able to start creating a Visual Studio Code project for our Stream Analytics Job. To do this, we need to install this extension in Visual Studio Code:

Once installed, we can create a new project.

Note: This extension works together with an Azure subscription. At some point in this workflow, you will be asked to log in to your Azure subscription. So, have the credentials ready if you have not connected your Azure subscription to Visual Studio Code yet!

If you press F1 (shortcut to the Command Palette) and filter on ‘asa’, you see all Stream Analytics related commands:

We select the creation command of a new project. So, create it and give it a name:

Select a folder:

A subfolder (as project) is created:

At this moment, only a default query is added, no inputs or outputs are defined yet. This will not work… yet.

So, let’s add both an input and an output, partially using the command palette dialog, partially using a very special configuration file.

Adding an input

There are multiple kinds of inputs available. We select an IoT Hub input:

We select the existing ASA script this input belongs to:

We want to select an existing IoT Hub using our subscription:

Note: If you want to add an IoT Hub living outside your current subscription, you probably want to provide the settings manually although the tooling supports adding multiple subscriptions…

Provide a unique logical name for your input (you are not entering the actual name of the IoT Hub yet):

This results in this special input file where we need to fill in the actual connection information:

This is an interactive document; the grey parts are clickable!

Here you add the actual IoT Hub connection, including the consumer group, serialization type, etc.

Note: This is on par with the Azure portal dialog when adding an IoT Hub.

First, click that grey “Select from your Subscriptions” and see how you are asked to select an existing IoT Hub from your current subscription.

We can even select the right consumer group from a drop-down list (remember I added that ‘asa’ for this purpose):

This is probably all you need to do: selecting that subscription, selecting that IoT Hub, and selecting that consumer group.

If you think you have submitted the right information, you can already preview incoming data on this input:

Wait for a minute or so and see the telemetry appear (it’s smart to check that the IoT Hub is actually receiving messages using the IoT Explorer alongside):

You can play with the preview dialog and see how the messages arrived over time:

Or you can see the raw messages:

You can save a copy too (I use it later on). Notice the output is just a concatenation of the incoming messages. It needs some care to turn it into a correct JSON message file (not needed for now).
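If you ever do need a well-formed JSON file, the concatenated messages can be split with a few lines of Python. This is a minimal sketch that assumes the saved file contains nothing but JSON objects placed back to back, optionally separated by whitespace; the sample content and the function name are made up for illustration.

```python
import json


def split_concatenated_json(text: str) -> list:
    """Parse JSON values that are written back to back without separators."""
    decoder = json.JSONDecoder()
    items = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do that here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        items.append(obj)
    return items


# Hypothetical recording: three messages, with and without a line break between them.
recorded = '{"ai1": 4663}\n{"ai1": 3895}{"ai1": 10}'

messages = split_concatenated_json(recorded)
as_array = json.dumps(messages)  # a single, valid JSON array
print(as_array)
```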

Yes, we have a successfully configured input.

Adding an output

While having in mind the previous flow of adding that input, we now add an output.

Adding an output sink works almost the same:

Here, I select adding an Azure function output.

I enter a logical name for the output and now this Azure function output configuration file is added:

I already selected the correct subscription, etc. I was offered a list of Function apps and I selected the right one. Because there is only one function available, it was automatically selected for me.

Save this file if needed.

There is no preview available.

Let’s fix the job query so we can test it using the new input and output.

Writing a job query

Go back to the query and enter the actual names of input and output instead of the dummy values.

As experienced in the Azure Portal, the Stream Analytics query editor has some knowledge about the query language and what to write at what location in the query. Here, it hints at the name of the output:

I just fill in both the input and output names:

From the command palette, I notice I can now both compile the project and run the project locally:

Compiling the project is not that impressive:

It gives me the confirmation that the script will work though (as seen in the output)!

On the other hand, that local Run command (seen in the command palette) is super interesting:

I can test my job with both local input data and local output data or even with LIVE inputs and outputs. Stellar!

So, I go for the live input and local output option first.

I got some one-time-only dialogs regarding the runtime license…

… and a firewall:

I just accepted them both.

I also had to provide the starting moment of the stream, just like in the portal. I selected ‘Now’:

After that, the job runs and the ingest and egress work as expected. The experience is great.

I get these live counters on inputs and outputs:

Actually, a lot of information is provided:

I can even see the outputted data:

So, this is a far better experience than offered by the Azure portal.

Did I mention already this whole project can be put under version control?

Working with live input and live output

I stopped the local experience and moved over to the live output experience.

I started a new local run:

The job started successfully:

Then I experienced this exception:

[Error] 29/11/2021 15:05:55 : **System Exception** There is a problem outputting to Azure Functions. The message size limit of 256 bytes has been exceeded. Size of the message is 447 bytes. By default, the streaming service retries writing the event indefinitely until the write succeeds. Consider choosing Drop Output Error Policy to skip such errors, so an error wouldn't block the job progress.
[Error] 29/11/2021 15:05:55 :    at Microsoft.Streaming.Diagnostics.DataErrorPolicies.StopPolicy.ErrorAction(ErrorMessage message, Exception e)
at

There is something wrong with the message size…

It suggests changing the output error policy. It normally is set to “Retry”:

I changed it to “Drop” as seen in this documentation.

Note: below we will see this error can be fixed in another, better way. Please consider putting the original type of policy back afterward.

Then, after a restart, the Stream Analytics job keeps on running. Still, no output is sent to the Azure Function:

Note: these empty array messages are just messages sent by the output definition to test the Azure Function response (so the job knows the connection is ok).

Let’s take a better look at that exception. The exception message is telling me the maximum size allowed for an output message is too small.

So, I checked the Azure Function output settings. It has a property concerning this message size. I then changed the maximum batch size to a larger number:

When you press the related Edit caption, the dialog tells you why:

Note: this is strange. I’m not sure if 256 is in bytes or kilobytes. It seems to be bytes because the exception is very explicit… I have not encountered this while adding an Azure Function output in the Azure Portal. Still, this is the actual problem and this is the solution.

Update: The product team has confirmed this is a typo. The default size should be 262,144 (real 256KB). This will be fixed in the next extension update. Then, everything is on par.
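The numbers are easy to verify. The small Python sketch below only redoes the arithmetic from the exception and the update above; the constant and function names are made up for illustration.

```python
TYPO_LIMIT_BYTES = 256            # the limit the extension used by mistake
INTENDED_LIMIT_BYTES = 256 * 1024  # 256 KB expressed in bytes
MESSAGE_SIZE_BYTES = 447           # size reported in the exception


def fits(size_bytes: int, limit_bytes: int) -> bool:
    """True when a message of this size stays within the limit."""
    return size_bytes <= limit_bytes


print(INTENDED_LIMIT_BYTES)                            # 262144
print(fits(MESSAGE_SIZE_BYTES, TYPO_LIMIT_BYTES))      # False
print(fits(MESSAGE_SIZE_BYTES, INTENDED_LIMIT_BYTES))  # True
```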

Update 2: This is fixed now. Get version 1.1.1 and you are good to go:

Now, the default setting and hint are fixed:

Now, the telemetry is arriving in the Azure Function as expected:

This Visual Studio Code experience gives me full control over what is happening inside my job.

Using a local input

I tried the local input too.

As you remember, I was able to record and save live data sent by the IoT Hub. I was hoping to use that file (I accept the possible modification if needed).

When I just start the local run using a local input and local output, it’s not directly clear how to reference that file. Because when I just start it, I get this exception:

Where do I define that file?

It starts with that input.json file which is already there:

It’s not an actual file we need to modify, it’s part of the tooling.

There, you can start a dialog to add a local input or a live input:

We chose local input.

Using that dialog, you can add a new local input or manage a local input for an already existing (live) input:

I added a local input file representing a live ‘IoTHub1’ input:

A new file named ‘Local_IoTHub1.json’ is added (it seems there is a naming convention):

Here, we can reference that actual data file with recorded IoT Hub message entries:

Notice we can preview the input data already. This is a great way to see if the recorded data is in the correct format:

Although the original file just contains separate JSON messages without comma separation (technically, the file is not valid JSON), it still works.

The file is handled well:

So, let’s start the local run again, now with local input:

The execution is done within seconds. All (ten) entries are read from the local file.

And the output to a local output file representing the Azure Function (instead of the actual Azure Function) shows the right information:

Note: yes, that first line is formatted differently from the other lines. That’s ok.

Again, we are in full control of input, output, and query.

Notice all information needed to analyze the behavior of the job run is recorded:

Because multiple developers could work on the job, you have to decide if this output should be part of version control… Please leave it out 🙂

User Defined Functions

We have seen how we can create a Stream Analytics project and test it both with local input and output files and with live connections to Azure resources.

The command palette also offers commands to add Machine learning functions, JavaScript User Defined Functions, and JavaScript User Defined aggregations.

This is offered in the Azure Portal already.

What is NOT offered in the Azure Portal is the support for functions written in C#.

I added one using the command palette:

As seen in this documentation (which references the Visual Studio extension), the function must be public and static, and written in a .NET Standard project.

So, in the root folder of this Stream Analytics project (not within the ASA project folder itself), I created this C# library in the MyLibrary folder:

dotnet new classlib --framework "netstandard2.0" -o MyLibrary

This extra project folder is then created next to the Stream Analytics folder:

A C# library project file with one (empty) class is created. I updated the class by adding this function:

using System;

namespace MyLibrary
{
    public class Class1
    {
        // The UDF method must be public and static so the job can call it.
        public static Int64 SquareFunction(Int64 a)
        {
            return a * a;
        }
    }
}

I then built the project on the command prompt:

Now, a C# function can offer us far more complexity (unlike the current calculation 🙂 ). It can also be covered by unit tests, so it’s much more reliable over time.

I now reopened the ASA project and went to the UDF dialog.

I selected the .csproj project file of the library using the available dialog (choose library project path):

From there, I am also able to select the class and method!

Yes, all other parameters are added too:

Now, let’s consume the function inside the ASA query:

SELECT
	*,
	ai1,
	udf.CSharpUDF1(ai1) AS square
INTO
	AzureFunctions1
FROM
	IoTHub1
WHERE
	ai1 IS NOT NULL

Note: The function name is the logical name, not the actual C# function name inside the class. The ‘udf.[function name]’ prefix is a convention.

Let’s run it locally again. This performs as expected:

Note: the first line of the original input file recording lacked the ai1 value; therefore, the ‘where’ clause skips that line in the output.
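To illustrate what the query does with the recorded input, here is a small Python emulation. It is only a sketch of the query's logic (skip events without `ai1`, then add a `square` column), not of how Stream Analytics executes it; the sample events and the `di1` field are made up for illustration.

```python
def square_function(a: int) -> int:
    """Python counterpart of the C# SquareFunction."""
    return a * a


def run_query(events: list) -> list:
    """Emulate: SELECT *, udf.CSharpUDF1(ai1) AS square ... WHERE ai1 IS NOT NULL."""
    output = []
    for event in events:
        if event.get("ai1") is None:
            continue  # the WHERE clause drops events without an ai1 value
        row = dict(event)  # SELECT * keeps all incoming fields
        row["square"] = square_function(event["ai1"])
        output.append(row)
    return output


# Hypothetical recording: the first event lacks ai1, like the first line in the recorded file.
events = [{"di1": True}, {"di1": True, "ai1": 3}, {"di1": False, "ai1": 12}]

result = run_query(events)
print(result)
```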

Note: do not forget to put this C# project under version control too!

Because we are using custom code, compiled into a library, we have to supply a cloud storage account so the ASA job can access the code while running in the cloud.

Check out the specific section in the job configuration file:

You have to supply an existing storage account but you can add a new container (a container name is written in lowercase; check the dialog for unsupported characters):
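If you want to check a container name up front, the sketch below encodes the general Azure Blob Storage naming rules as I understand them: 3 to 63 characters, only lowercase letters, digits, and hyphens, starting and ending with a letter or digit, and no consecutive hyphens. The function name and the sample names are hypothetical, and the dialog in the extension remains the authority on what is accepted.

```python
import re

# A letter or digit first, then letters or digits that may each be preceded by one hyphen.
_CONTAINER_NAME = re.compile(r"[a-z0-9](?:-?[a-z0-9])*")


def is_valid_container_name(name: str) -> bool:
    """Check a name against the (assumed) blob container naming rules."""
    return 3 <= len(name) <= 63 and _CONTAINER_NAME.fullmatch(name) is not None


for candidate in ("asa-udf-code", "MyContainer", "ab", "a--b", "abc-"):
    print(candidate, is_valid_container_name(candidate))
```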

Adding this Azure storage account location will be required in the next step.

Deployment to the cloud

The same C# function also works in conjunction with live input and outputs. Check the Azure Function logging in the portal:

I think the query we created is feature complete. 🙂

Let’s try to deploy this query to a Stream Analytics job running in the Azure cloud.

Select the ‘Submit to Azure’ command from the command palette while the .asaql query job file is opened:

Within the Azure subscription, all Azure Stream Analytics jobs are shown:

So, this command can be used either for creating a new job or updating an existing job. The second time the job is executed, this question is not asked again.

Create a new job in the Azure portal:

Select the right (already existing) resource group:

Select standard environment:

Select the right region:

A confirmation is needed:

If you get this message…

… you skipped the custom code storage configuration. Go back to the previous paragraph.

If deployment succeeds, you will see this message:

Check out the Azure portal for the creation of the new Stream Analytics job. There it is already:

The job has not started yet. This can be done in the cloud… or it can be done from Visual Studio Code. Let’s start it using Visual Studio Code:

An extra dialog appears when you start it from VS Code:

For a simple query like this, one ‘resource’ is more than enough: change it from 3 to 1. This is actually the number of Streaming units.

Hit ‘OK’. The job will be deployed and start running in the Azure cloud.

This time, starting the job takes more time compared to running the service locally.

Once the service is started, we see the incoming messages appear in the Azure Function:

In the Azure portal, the job is running as expected.

Notice the Azure Stream Analytics job is not made ‘read-only’. You could potentially alter it within the portal.

The only thing that really changed compared to a ‘regular’ job is the use of this C# function.

The Azure Portal is not able to handle that function so testing the function will result in a warning:

Apart from that, the job just runs as expected in the portal.

Back in Visual Studio Code, we get a great overview of job diagnostics from the cloud:

Remember, the job is running in the cloud while the live diagnostics are available on our desktop.

Recently, the same diagnostics were made available in the Azure portal too:

To see these within the Azure portal, in the menu, go to the Support + Troubleshooting | Job Diagram pane.

Note: this diagnostics view is still in preview.

What about existing projects?

If you have created a beautiful project already in the portal, you are not lost.

Using the Azure subscription dialog in Visual Studio Code, you can download the existing project:

This project just contains the Azure portal equivalent of your job.

Please put it under version control first, and test each change a couple of times before you deploy it back over the existing job.

Conclusion

We have seen how a Stream Analytics job can be created, tested, and deployed using Visual Studio Code.

This has a number of big advantages like version control, more control over input stream test sets, more diagnostics, and even more advanced custom functions written in the C# language.

The experience is superior to the Azure portal. Once you use this project, you will discover other shortcuts to fill in the appropriate values and templates.

Please use the Azure portal from now on just for demonstration purposes.

If you want to take the next step and make the Azure Stream Analytics job part of your automation pipeline, check out the related documentation.