Programmatically ingest data into Azure Data Explorer

What is the value of a database if adding data is hard?

Over the last year, I played around with ingesting data into Azure Data Explorer in multiple ways.

Most of the time, I used the managed pipelines, aka ADX Database data connections.

But every time, I find new ways to ingest data into database tables.

This time, we look at ingesting data into Azure Data Explorer using the SDKs.

This post is the eighth part of this Azure Data Explorer blog series:

There are many SDKs for Azure Data Explorer:

Today, we look at the C# SDK, made available as a NuGet package: Microsoft.Azure.Kusto.Ingest.

Notice that when this NuGet package is referenced, not only ingesting data but also querying with the Kusto Query Language becomes available.

In this post, we are going to create a C# console application capable of ingesting data from code and querying it afterward, once it has landed in the ADX database table.

Because Azure Data Explorer uses AAD accounts to protect the database, we need an identity to connect.

AAD Application

Because we do not want to use the identity of a real person, we are going to use an AAD application.

Note: Using an AAD application makes it easy to follow this sample code. In more real-life solutions, consider managed identities.

We will name our AAD application ‘AdxClientApp’.

Adding a new application identity is as simple as opening the Azure Cloud Shell and executing this Azure CLI command:

az ad sp create-for-rbac -n AdxClientApp

This will return the secrets behind the identity, so store them somewhere safe:

{
  "appId": "[application-id]",
  "displayName": "AdxClientApp",
  "password": "[some-password]",
  "tenant": "[tenant-guid]"
}

Now that we have an identity, we need to give it sufficient access rights to Azure Data Explorer.

We want to both ingest and query ADX data so we need the correct ADX database permissions.

Here is a list of possible permissions:

We go for ‘Ingestor’ and ‘Viewer’.

Within the Query editor of Azure Data Explorer, execute these two separate lines (after filling in the correct AAD application identity secrets):

.add database sdktestdata viewers ('aadapp=[application-id];[tenant-guid]')

.add database sdktestdata ingestors ('aadapp=[application-id];[tenant-guid]')

Afterward, you will see the permissions are granted:

Update: Recently I noticed that Azure resources (like Stream Analytics Jobs or Azure Functions) can also be selected as principals in the Azure portal permission selection. These resources need to have a managed identity assigned, though!

Next, let’s add a table to ingest and query.

Azure Data Explorer table

We add a table named ‘MyLogs’ in an ADX database:

.create table MyLogs ( Level:string, Timestamp:datetime, UserId:string, TraceId:string, Message:string, ProcessId:int )

This table will be subject to ingesting streaming data, so we need to enable this table policy:

.alter table MyLogs policy streamingingestion enable

Note: Executing the SDK method will throw an exception if this policy is not set.

As we will see later on, we are going to ingest data, streamed as JSON.

So, we need to add a JSON mapping named ‘JsonMapping’:

.create table MyLogs ingestion json mapping "JsonMapping"
'['
'{ "column" : "Level", "datatype" : "string", "Properties":{"Path":"$.Level"}},'
'{ "column" : "Timestamp", "datatype" : "datetime", "Properties":{"Path":"$.Timestamp"}},'
'{ "column" : "UserId", "datatype" : "string", "Properties":{"Path":"$.UserId"}},'
'{ "column" : "TraceId", "datatype" : "string", "Properties":{"Path":"$.TraceId"}},'
'{ "column" : "Message", "datatype" : "string", "Properties":{"Path":"$.Message"}},'
'{ "column" : "ProcessId", "datatype" : "int", "Properties":{"Path":"$.ProcessId"}}'
']'

We are almost ready to ingest data using the SDK.

Azure Data Explorer cluster URI

There is one more step: our console app needs to know the URI of the cluster.

There are two choices: the regular Cluster URI and the Data Ingestion URI (the Cluster URI prefixed with 'ingest-').

These URIs can be found on the overview page.

The URI to use depends on the way of ingestion: direct ingestion or queued ingestion.

The difference is explained here:

The alternative is queued ingestion, which uses the other, ingestion-specific URI:

Notice that the documentation says:

The Direct mode isn’t recommended for production grade ingestion solutions.

The advantages of the Direct mode include:

  • Low latency and no aggregation. However, low latency can also be achieved with Queued ingestion
  • When synchronous methods are used, method completion indicates the end of the ingestion operation

The disadvantages of the Direct mode include:

  • The client code must implement any retry or error-handling logic
  • Ingestions are impossible when the Kusto Engine service is unavailable
  • The client code might overwhelm the Kusto Engine service with ingestion requests since it isn’t aware of the Engine service capacity

So, using the ‘direct ingestion’ solution, we must do everything ourselves (e.g. coping with throttling), but it gives us full control.

First, we start with a console app and reference the NuGet package:

<PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="11.2.2" />
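
Before diving in, here are the using directives the snippets below rely on. This is a sketch pieced together from the namespaces mentioned throughout this post; exact locations can shift slightly between package versions:

using System.Text;            // Encoding
using System.Text.Json;       // JsonSerializer
using Kusto.Data;             // KustoConnectionStringBuilder
using Kusto.Data.Common;      // ClientRequestProperties, DataSourceFormat
using Kusto.Data.Net.Client;  // KustoClientFactory
using Kusto.Ingest;           // KustoIngestFactory, KustoIngestionProperties, IngestionMapping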

From there, we need to create a client connection for accessing Azure Data Explorer.

The AAD App identity credentials are used:

var kcsb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");

Notice that the URI for direct ingestion is selected:

This is because we demonstrate the direct mode first; the Data Ingestion URI will be used later on.

We are now ready to ingest data.

Ingesting data by code using Direct mode

We will look at both solutions, direct and queued.

We are going to use ‘direct ingestion’ first, so let’s start writing some code.

Direct: KustoClientFactory.CreateCslStreamIngestClient (old solution)

We start with this solution based on the KustoClientFactory:

using (var ingestClient = KustoClientFactory.CreateCslStreamIngestClient(kcsb))
{
  // Construct two sample log entries.
  var myLogs = new List<MyLog>
  {
    new MyLog { Level = "1", Timestamp = DateTime.UtcNow, UserId="u1", TraceId = "t1", Message="one", ProcessId = 42 },
    new MyLog { Level = "2", Timestamp = DateTime.UtcNow, UserId="u2", TraceId = "t2", Message="two", ProcessId = 43 }
  };

  // Serialize the list to a JSON array.
  var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

  Console.WriteLine($"Ingest JSON: '{jsonText}'");

  // Wrap the JSON text in a stream for the ingest call.
  using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

  // Give each call its own request id for traceability.
  var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };

  // Stream-ingest into database 'sdktestdata', table 'MyLogs', using the 'JsonMapping' mapping.
  var x = ingestClient.ExecuteStreamIngestAsync("sdktestdata", "MyLogs", memoryStream, clientRequestProperties, DataSourceFormat.multijson, false, "JsonMapping", false).Result;
}

Within the context of that ingest client, a JSON message is constructed and sent.

Notice that the name of the database is applied both in the ingest call and in the connection string. The database can be left out of the connection string; here, I fill it in at both places for reference.
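
For example, a connection string without the database would work for the ingest call as well (a sketch, using the same placeholder credentials):

// The database name can be omitted here, because the ingest call specifies it explicitly.
var kcsbNoDb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");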

Notice also how a JSON string is constructed from the list of MyLog objects:

public class MyLog
{
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
    public string UserId { get; set; }
    public string TraceId { get; set; }
    public string Message { get; set; }
    public int ProcessId { get; set; }
}

The JSON text is then put in a memory stream:

Then, notice that the call is enriched with a number of parameters:

  • Each call to Azure Data Explorer should get its own clientRequestProperties Guid for traceability
  • The database name ‘sdktestdata’ and table name ‘MyLogs’ are added
  • The DataSourceFormat is set to ‘multijson’. I got an awkward exception when I tried ‘JSON’
  • I also tell which mapping is used for JSON messages: ‘JsonMapping’

Once the code is executed, the returned response does not make much sense:

Note: I could not find good documentation regarding the ‘ExecuteStreamIngestAsync’ command. This GitHub ‘azure-kusto-samples-dotnet’ page does not help that much either.

Anyway, the rows are added to our table:

So, we are successful at adding data by code.

While writing the blog post, it occurred to me that the documentation references other clients, including a ‘DirectIngestClient’.

So I looked up the namespaces and libraries and found out that the KustoClientFactory is part of the ‘Kusto.Data.Net.Client’ namespace, coming from the “Kusto.Data” library, not the “Ingest” library (which depends on the former):

So, I repeated the same attempt with the new (and improved?) logic.

Direct: KustoIngestFactory.CreateDirectIngestClient (new solution)

We ingest now using the DirectIngestClient, created by the KustoIngestFactory.

The stream is exactly the same apart from some text to identify this call:

using (var directClient = KustoIngestFactory.CreateDirectIngestClient(kcsb))
{
    var myLogs = new List<MyLog>
        {
            new MyLog { Level = "3", Timestamp = DateTime.UtcNow, UserId="u3", TraceId = "t3", Message="dc one", ProcessId = 44 },
            new MyLog { Level = "4", Timestamp = DateTime.UtcNow, UserId="u4", TraceId = "t4", Message="dc two", ProcessId = 45 }
        };

    var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

    Console.WriteLine($"Ingest JSON: '{jsonText}'");

    using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

    var ingestionMapping = new IngestionMapping { IngestionMappingKind = Kusto.Data.Ingestion.IngestionMappingKind.Json, IngestionMappingReference = "JsonMapping" };

    var ingestionProperties = new KustoIngestionProperties { DatabaseName = "sdktestdata", TableName = "MyLogs", IngestionMapping = ingestionMapping, Format = DataSourceFormat.multijson };

    try
    {
        var result = directClient.IngestFromStream(memoryStream, ingestionProperties);
        Console.WriteLine($"{result.GetIngestionStatusCollection().First().Status}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

The client now knows about the database, table, kind of data, and mapping.

Note: There are more settings, but it depends a bit on which kind of source you use, like a storage account or a data reader. Please check them out.
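
For instance, ingesting from a blob in a storage account looks roughly like this (a sketch; the blob URI is a hypothetical placeholder and needs a SAS token or other access):

// Sketch: ingest a JSON file straight from blob storage.
// The URI below is a hypothetical placeholder.
var blobUri = "https://[storage-account].blob.core.windows.net/[container]/mylogs.json?[sas-token]";

var blobResult = await directClient.IngestFromStorageAsync(blobUri, ingestionProperties);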

Back to our stream ingest: the call returns a ‘succeeded’ status.

The result is as expected, two rows more:

I recommend using this new solution because it is on par with the documentation.

Ingesting data by code using Queue mode

For queued ingestion, I went for the ‘QueuedIngestClient’.

Queued: KustoIngestFactory.CreateQueuedIngestClient

The construction offers the possibility to add queue options:

I just went for the default options.
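
For illustration, passing options could look something like this (a sketch; I am assuming the overload that accepts a QueueOptions instance):

// Sketch: create the queued client with explicit queue options instead of the defaults.
// Assumption: QueueOptions and its MaxRetries property are available in this SDK version.
var queuedClientWithOptions = KustoIngestFactory.CreateQueuedIngestClient(
  kcsb,
  new QueueOptions { MaxRetries = 3 });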

The code for queued ingest is somewhat similar to the code for direct ingest:

using (var queuedClient = KustoIngestFactory.CreateQueuedIngestClient(kcsb))
{
    var myLogs = new List<MyLog>
            {
                new MyLog { Level = "5", Timestamp = DateTime.UtcNow, UserId="u5", TraceId = "t5", Message="qc one", ProcessId = 46 },
                new MyLog { Level = "6", Timestamp = DateTime.UtcNow, UserId="u6", TraceId = "t6", Message="qc two", ProcessId = 47 }
            };

    var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

    Console.WriteLine($"Ingest JSON: '{jsonText}'");

    using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

    var ingestionMapping = new IngestionMapping { IngestionMappingKind = Kusto.Data.Ingestion.IngestionMappingKind.Json, IngestionMappingReference = "JsonMapping" };

    var ingestionProperties = new KustoIngestionProperties { DatabaseName = "sdktestdata", TableName = "MyLogs", IngestionMapping = ingestionMapping, Format = DataSourceFormat.multijson };

    try
    {
        var result = queuedClient.IngestFromStream(memoryStream, ingestionProperties);

        Console.WriteLine($"{result.GetIngestionStatusCollection().First().Status}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

Notice that I reused the same (regular cluster) URI endpoint.

So when I run this, I get an exception when the ingest takes place:

This is a queued ingest, so it tells us explicitly to use the ingest endpoint; there is even an endpoint suggestion!

So we need this one:
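
In code, that means the connection string builder now targets the Data Ingestion URI (same credentials as before):

// The queued ingest client needs the 'ingest-' endpoint of the cluster.
var kcsbIngest = new KustoConnectionStringBuilder(@"https://ingest-testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");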

If we use that endpoint, we get a message that our data is ingested into a queue:

Within the query editor of Azure Data Explorer, the data is seen as being ingested within a few seconds up to five minutes, depending on the batching policy:

That was simple…

So, it’s up to you whether to ingest data directly or queued!

It’s important to understand that this ‘queue’ is managed by Azure Data Explorer itself.

We do not need to add extra Azure resources like Azure Storage queue or Azure Event Hub.
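
Because the queued client returns before the data actually lands in the table, you may want to track whether an ingestion eventually succeeded. The SDK supports status reporting for this; here is a minimal sketch (assuming KustoQueuedIngestionProperties and its report settings, which I did not use in the sample above):

// Sketch: ask for ingestion status reporting on a queued ingest.
// Reuses the queuedClient, memoryStream, and ingestionMapping from the sample above.
var queuedProperties = new KustoQueuedIngestionProperties("sdktestdata", "MyLogs")
{
  Format = DataSourceFormat.multijson,
  IngestionMapping = ingestionMapping,
  ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
  ReportMethod = IngestionReportMethod.Table
};

var queuedResult = queuedClient.IngestFromStream(memoryStream, queuedProperties);

// The status starts as 'Pending' and is updated once the batch is processed.
Console.WriteLine($"{queuedResult.GetIngestionStatusCollection().First().Status}");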

Ingestion failures?

So, messages are queued and picked up by the ADX engine to store in tables.

What happens if this fails? What if we enter the wrong table name, or mapping name, or the rights are insufficient?

The failures are stored for up to fourteen days and are queryable:

.show ingestion failures

This will tell you in detail what went wrong.
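
If you prefer, you can execute this command from code too, using the admin provider from the same Kusto.Data library (a minimal sketch; the column names follow the command’s output schema):

// Sketch: query recent ingestion failures from code.
using (var adminProvider = KustoClientFactory.CreateCslAdminProvider(kcsb))
{
  using var failureReader = adminProvider.ExecuteControlCommand("sdktestdata", ".show ingestion failures");

  while (failureReader.Read())
  {
    Console.WriteLine($"{failureReader["FailedOn"]}: {failureReader["Details"]}");
  }
}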

Exceptions

During my experiments, I encountered a number of exceptions while testing the code.

Here are some of the common mistakes and the related exceptions.

The table does not support streaming data

It is expected that the table supports streaming data.

I explicitly tested what happens otherwise by disabling the policy:

.alter table MyLogs policy streamingingestion disable

Note: changing the policy can take a moment before it is applied to the table.

This results in the following exception when I try to ingest data:

One or more errors occurred. (Request is invalid and cannot be processed: {
    "error": {
        "code": "BadRequest",
        "message": "Request is invalid and cannot be executed.",
        "@type": "Kusto.DataNode.Exceptions.StreamingIngestionRequestException",
        "@message": "Bad streaming ingestion request to sdktestdata.MyLogs : Streaming ingestion policy is not enabled for the table",
        "@context": {
            "timestamp": "2023-03-23T12:12:28.3392842Z",
            "serviceAlias": "TESTWEUADX",
            "machineName": "KEngine000000",
            "processName": "Kusto.WinSvc.Svc",
            "processId": 7712,
            "threadId": 6076,
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2",
            "subActivityId": "XYZ2",
            "activityType": "GW.Http.CallContext",
            "parentActivityId": "XYZ2",
            "activityStack": "(Activity stack: CRID=XYZ3 ARID=XYZ2 > GW.Http.CallContext/XYZ2 > GW.Http.CallContext/XYZ2)"
        },
        "@permanent": true
    }
})

The exception actually tells us that the policy needs to be set.

Note: this is tested with direct mode.

Wrong DataSourceFormat

The format I need is ‘multijson’, but first I tried both ‘json’ and ‘singlejson’.

For both situations, the exception I got was:

One or more errors occurred. (Request is invalid and cannot be processed: {
    "error": {
        "code": "BadRequest",
        "message": "Request is invalid and cannot be executed.",
        "@type": "Kusto.DataNode.Exceptions.StreamingIngestionRequestException",
        "@message": "Bad streaming ingestion request to sdktestdata.MyLogs : The input stream is empty after processing, tip:check stream validity",
        "@context": {
            "timestamp": "2023-03-21T20:02:28.2986078Z",
            "serviceAlias": "TESTWEUADX",
            "machineName": "KEngine000000",
            "processName": "Kusto.WinSvc.Svc",
            "processId": 7712,
            "threadId": 3724,
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2",
            "subActivityId": "XYZ3",
            "activityType": "GW.Http.CallContext",
            "parentActivityId": "XYZ4",
            "activityStack": "(Activity stack: CRID=XYZ5 ARID=XYZ6 > GW.Http.CallContext/XYZ7 > GW.Http.CallContext/XYZ8)"
        },
        "@permanent": true
    }
})

It’s hard to relate this to the wrong data source format choice.

Note: this is tested with direct mode.

Wrong ‘Data Ingestion URI’

When I entered the Data Ingestion URI instead of the regular ADX URI, I got this exception:

One or more errors occurred. (NotFound (404-NotFound): {
    "error": {
        "code": "Not found",
        "message": "There is no resource at this URI path: '/v1/rest/ingest/sdktestdata/MyLogs'",
        "@context": {
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2"
        }
    }
}. This normally represents a permanent error, and retrying is unlikely to help.
Error details:
DataSource='https://ingest-testweuadx.westeurope.kusto.windows.net/v1/rest/ingest/sdktestdata/MyLogs?streamFormat=multijson&mappingName=JsonMapping',
DatabaseName='sdktestdata',
ClientRequestId='XYZ3',
ActivityId='XYZ4',
Timestamp='2023-03-21T20:19:21.8810286Z'.)

In this exception text, there is some reference shown to the URI.

Overall: When implementing ingest code, please put some effort into handling other exceptions too. Also, take throttling (KustoRequestThrottledException) into account with a correct retry mechanism (wait for four seconds); a retry sketch follows after the note below.

Note: this is tested with direct mode.
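
Here is a minimal retry sketch for the throttling case (an illustration only, not an official retry policy; I am assuming KustoRequestThrottledException lives in Kusto.Data.Exceptions):

// Sketch: retry a direct ingestion a few times when the cluster throttles us.
const int maxAttempts = 3;

for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
  try
  {
    // Recreate the stream per attempt; the client may dispose it after use.
    using var retryStream = new MemoryStream(Encoding.UTF8.GetBytes(jsonText));
    directClient.IngestFromStream(retryStream, ingestionProperties);
    break; // success, stop retrying
  }
  catch (Kusto.Data.Exceptions.KustoRequestThrottledException) when (attempt < maxAttempts)
  {
    // Wait four seconds before retrying, as recommended above.
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(4));
  }
}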

Querying data by code

Now that we know how to add data to the database, let’s concentrate on querying that same data.

First, we count the number of rows in the table.

We expect a number as a result of the call:

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kcsb))
{
    var query = "MyLogs | count";
    var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };
    var results = cslQueryProvider.ExecuteQuery<long>("sdktestdata", query, clientRequestProperties);        
    Console.WriteLine($"Count: {results.Single()}");
}

This returns the right number of rows:

Note: this functionality was tested when only two rows were in the table.

But can we query for the actual rows too?

ExecuteQuery

The short answer is that we are able to query table rows too.

Take a look at this query:

using (var queryProvider = KustoClientFactory.CreateCslQueryProvider(kcsb))
{
    var query = "MyLogs | take 2 | project Level, Timestamp, UserId, TraceId, Message, ProcessId";

    var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };

    using (var reader = queryProvider.ExecuteQuery("sdktestdata", query, clientRequestProperties))
    {
        var myLogs = DataReaderMapToList<MyLog>(reader);

        foreach (var log in myLogs)
        {
            Console.WriteLine($"Level = {log.Level}, Timestamp = {log.Timestamp}, UserId = {log.UserId}, TraceId = {log.TraceId}, Message = {log.Message}, ProcessId = {log.ProcessId}");
        }
    }
}

This returns the two rows:

But it’s not easy to turn the response into that list of MyLog objects:

The response is that KustoJsonDataStreamReader variable.

If you dig deeper, you will see it contains JSON, describing datasets…

I used this helper method (found here) to turn the reader data into objects:

public static List<T> DataReaderMapToList<T>(IDataReader dr)
{
    var list = new List<T>();

    while (dr.Read())
    {
        var obj = Activator.CreateInstance<T>();

        foreach (PropertyInfo prop in obj.GetType().GetProperties())
        {
            if (!object.Equals(dr[prop.Name], DBNull.Value))
            {
                prop.SetValue(obj, dr[prop.Name], null);
            }
        }

        list.Add(obj);
    }

    return list;
}

But there is more information available in the response; there are multiple data tables in that reader.

Here is a snippet of the returned JSON:

It seems the statistics are made available too.
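
If you want to inspect those extra tables from code, the standard IDataReader API lets you advance through the result sets (a minimal sketch, reusing the reader from the query above before it is mapped to objects):

// Sketch: walk all result tables in the reader; the first holds the rows,
// later ones hold extras such as query statistics.
var tableIndex = 0;
do
{
  Console.WriteLine($"Result table {tableIndex++} has {reader.FieldCount} columns");
}
while (reader.NextResult());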

ExecuteQueryV2Async

The library also offers this ‘ExecuteQueryV2Async’ method.

I found this GitHub example using this specific method together with a number of helper methods to access the response:

var query = "MyLogs | take 2 | project Level, Timestamp, UserId, TraceId, Message, ProcessId";
var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };
clientRequestProperties.SetOption(ClientRequestProperties.OptionResultsProgressiveEnabled, true);
var result = await queryProvider.ExecuteQueryV2Async("sdktestdata", query, clientRequestProperties);
WriteResultsToConsole(result);

It seems it does not show the rows of the table, but the statistics behind the same query are shown:

I’m not sure what the quality of this extra method is. Hopefully, extra documentation will follow.

Bonus, managed identities

In a more real-life situation, the identity used is not the demonstrated AAD application.

Another way to connect is by using Azure-managed identities:

  • You don’t need to manage credentials. Credentials aren’t even accessible to you.
  • You can use managed identities to authenticate to any resource that supports Azure AD authentication, including your own applications.
  • Managed identities can be used at no extra cost.

For example, suppose the code seen above is executed in an Azure Function.

The Azure Function is exposing a managed identity:

az functionapp identity assign -g <resource group name> -n <azure function app name> --query principalId -o tsv

If you run this, you get a GUID as the response.

Our Azure Data Explorer database needs to grant this Azure Function some access.

This can be achieved using the Azure portal.

Just provide the GUID and the function is found:

The Function is provided both Viewer and Ingestor rights:

Now, just use another method for creating an ADX connection:

var kcsb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadSystemManagedIdentity();

As you see, no credentials are needed from the developer’s point of view. No credentials are saved.
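
There is a similar builder method for user-assigned managed identities (a sketch; I am assuming WithAadUserManagedIdentity, so check your SDK version for the exact name):

// Sketch: connect with a user-assigned managed identity instead of a system-assigned one.
var kcsbUserAssigned = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadUserManagedIdentity("[user-assigned-identity-client-id]");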

Note: It is recommended not to use the deprecated version supporting a managed identity:

Once this code is executed, managed identity credentials are exchanged between the Azure function (the host of this code) and Azure Data Explorer, and the roles are verified.

This is a great solution for better and safer Azure identity management.

You cannot use this solution while debugging your code locally; for that, the Azure CLI helps you:

var kcsb = new KustoConnectionStringBuilder(@"https://[CLUSTER].kusto.windows.net", "[DATABASE]")
  .WithAadAzCliAuthentication(true);

Here, the code will try to use the locally installed AZ CLI to log in:

I passed ‘true’ for interactive communication.

Note: If you have not installed the Azure AZ CLI yet (you see this ‘Az-Cli is either not installed or not found’ in the exception output of the source code), install it from here. Do not forget to restart Visual Studio before the next attempt. It needs to read the updated Path environment variable so it knows where to find the AZ CLI on the file system.

Conclusion

Yes, we can write data to Azure Data Explorer from code first and then query the Azure Data Explorer tables in the database too.

However, the functionality available in the NuGet libraries (Data library and Ingest library) is not really well documented.

Here, the use of both ‘Direct Ingestion’ and ‘Queued Ingestion’ is shown. Microsoft recommends queued ingestion, and that is okay. Be aware, though, that you then only see that the data is queued inside Azure Data Explorer.

Direct ingestion is not recommended for production but it is a working solution and gives you a direct response.

It also took some effort to test out the different ways to authenticate. In this example, I demonstrated an AAD application, the AZ CLI, and managed identities.

Please read the documentation if you want to go a step further.
