Programmatically ingest data into Azure Data Explorer

What is the value of a database if adding data is hard?

Over the last year, I played around with ingesting data into Azure Data Explorer in multiple ways.

Most of the time, I used the managed pipelines, aka ADX Database data connections.

But time after time, I find new ways to ingest data into database tables:

This time, we look at ingesting data into Azure Data Explorer using the SDKs.

This post is the eighth part of this Azure Data Explorer blog series:

There are many SDKs for Azure Data Explorer:

Today, we look at the C# SDK, made available as a NuGet package: Microsoft.Azure.Kusto.Ingest.

Notice that once this NuGet package is referenced, you can not only ingest data but also query it using the Kusto Query Language.

In this post, we are going to create a C# console application, capable of ingesting data from code and querying it afterward, once it has landed in the ADX database table.

Because Azure Data Explorer uses AAD accounts to protect the database, we need an identity to connect.

AAD Application

Because we do not want to use the identity of a real person, we are going to use an AAD application.

Note: Using an AAD application makes it easy to follow this sample code. In more real-life solutions, consider managed identities.

We will name our AAD application ‘AdxClientApp’.

Just open the Azure Cloud Shell and execute this Azure CLI command:

Adding a new application identity is as simple as executing:

az ad sp create-for-rbac -n AdxClientApp

This will return the secrets behind the identity, so store them:

{
  "appId": "[application-id]",
  "displayName": "AdxClientApp",
  "password": "[some-password]",
  "tenant": "[tenant-guid]"
}

Now that we have an identity, we give it sufficient access rights for Azure Data Explorer.

We want to both ingest and query ADX data, so we need the correct ADX database permissions.

Here is a list of possible permissions:

We go for ‘Ingestor’ and ‘Viewer’.

Within the Query editor of Azure Data Explorer, execute these two separate lines (after filling in the correct AAD application identity secrets):

.add database sdktestdata viewers ('aadapp=[application-id];[tenant-guid]')

.add database sdktestdata ingestors ('aadapp=[application-id];[tenant-guid]')

Afterward, you will see the permissions are granted:

Update: Recently I noticed that Azure resources (like Stream Analytics jobs or Azure Functions) can also be selected as a principal in the Azure portal permission selection. These resources need to have a managed identity assigned, though!

Next, let’s add a table to ingest and query.

Azure Data Explorer table

We add a table named ‘MyLogs’ in an ADX database:

.create table MyLogs ( Level:string, Timestamp:datetime, UserId:string, TraceId:string, Message:string, ProcessId:int32 )

This table will be subject to ingesting streaming data, so we need to enable this table policy:

.alter table MyLogs policy streamingingestion enable

Note: Executing the SDK method will throw an exception if this policy is not set.

As we will see later on, we are going to ingest data, streamed as JSON.

So, we need to add a JSON mapping named ‘JsonMapping’:

.create table MyLogs ingestion json mapping "JsonMapping"
'['
'{ "column" : "Level", "datatype" : "string", "Properties":{"Path":"$.Level"}},'
'{ "column" : "Timestamp", "datatype" : "datetime", "Properties":{"Path":"$.Timestamp"}},'
'{ "column" : "UserId", "datatype" : "string", "Properties":{"Path":"$.UserId"}},'
'{ "column" : "TraceId", "datatype" : "string", "Properties":{"Path":"$.TraceId"}},'
'{ "column" : "Message", "datatype" : "string", "Properties":{"Path":"$.Message"}},'
'{ "column" : "ProcessId", "datatype" : "int", "Properties":{"Path":"$.ProcessId"}}'
']'

We are almost ready to ingest data using the SDK.

Azure Data Explorer cluster URI

There is one more step: our console app needs to know the URI of the cluster.

There are two choices:

These URIs can be found on the overview page.

The URI to use depends on the way of ingestion: direct ingestion or queued ingestion.

The difference is explained here:

The alternative is queued ingestion, which uses a queue and the related Data Ingestion URI:
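
For reference, based on the demo cluster used later in this post, the two URIs only differ by the ‘ingest-’ prefix (your cluster name and region will differ):

https://testweuadx.westeurope.kusto.windows.net            (Cluster URI, used for direct ingestion and querying)
https://ingest-testweuadx.westeurope.kusto.windows.net     (Data Ingestion URI, used for queued ingestion)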

Notice that the documentation says:

The Direct mode isn’t recommended for production grade ingestion solutions.

The advantages of the Direct mode include:

  • Low latency and no aggregation. However, low latency can also be achieved with Queued ingestion
  • When synchronous methods are used, method completion indicates the end of the ingestion operation

The disadvantages of the Direct mode include:

  • The client code must implement any retry or error-handling logic
  • Ingestions are impossible when the Kusto Engine service is unavailable
  • The client code might overwhelm the Kusto Engine service with ingestion requests since it isn’t aware of the Engine service capacity

So, using the ‘direct ingestion’ solution, we must do everything ourselves (e.g. coping with throttling), but it gives us full control.

First, we start with a console app and reference the NuGet package:

<PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="11.2.2" />

From there, we need to create a client connection for accessing Azure Data Explorer.

The AAD App identity credentials are used:

var kcsb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");

Notice, the URI for direct ingestion is selected:

This is because we demonstrate the direct mode first; the Data Ingestion URI will be used later on.

We are now ready to ingest data.

Ingesting data by code using Direct mode

We will look at both solutions.

We are going to use ‘direct ingestion’ first, so let’s start writing some code.

Direct: KustoClientFactory.CreateCslStreamIngestClient (old solution)

We start with this solution based on the KustoClientFactory:

using (var ingestClient = KustoClientFactory.CreateCslStreamIngestClient(kcsb))
{
  var myLogs = new List<MyLog> 
  { 
    new MyLog { Level = "1", Timestamp = DateTime.UtcNow, UserId="u1", TraceId = "t1", Message="one", ProcessId = 42 },
    new MyLog { Level = "2", Timestamp = DateTime.UtcNow, UserId="u2", TraceId = "t2", Message="two", ProcessId = 43 }
  };

  var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

  Console.WriteLine($"Ingest JSON: '{jsonText}'");

  using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

  var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };

  var x = ingestClient.ExecuteStreamIngestAsync("sdktestdata", "MyLogs", memoryStream, clientRequestProperties, DataSourceFormat.multijson, false, "JsonMapping", false).Result;
}

Within the context of that ingest client, a JSON message is constructed and sent.

Notice that the name of the database is applied both in the ingest call and in the connection string. The database can be left out of the connection string; here, I fill it in at both places as a reference.
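
As a minimal sketch, a connection string without the database name would look like this (same demo cluster; the database is then only passed in the ingest or query calls):

var kcsb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");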

Notice also how a JSON string is constructed from the list of MyLog objects:

public class MyLog
{
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
    public string UserId { get; set; }
    public string TraceId { get; set; }
    public string Message { get; set; }
    public int ProcessId { get; set; }
}

The JSON text is then put in a memory stream:

Then, notice that the call is enriched with a number of parameters:

  • Each call to Azure Data Explorer should get its own clientRequestProperties Guid for traceability
  • The database name ‘sdktestdata’ and table name ‘MyLogs’ are added
  • The DataSourceFormat is set to ‘multijson’. I got an awkward exception when I tried ‘JSON’
  • I also tell which mapping is used for JSON messages: ‘JsonMapping’

Once the code is executed, the returned response does not make much sense:

Note: I could not find good documentation regarding the ‘ExecuteStreamIngestAsync’ command. This GitHub ‘azure-kusto-samples-dotnet’ page does not help that much either.

Anyway, the rows are added to our table:

So, we are successful at adding data by code.

While writing this blog post, it occurred to me that the documentation references other clients, including a ‘DirectIngestClient’.

So I looked up the namespaces and libraries and found out that the KustoClientFactory is part of the ‘Kusto.Data.Net.Client’ namespace and comes from the “Kusto.Data” library, not the “Ingest” library (which depends on the former):

So, I repeated the same attempt with the new (and improved?) logic.

Direct: KustoIngestFactory.CreateDirectIngestClient (new solution)

We ingest now using the DirectIngestClient, created by the KustoIngestFactory.

The stream is exactly the same apart from some text to identify this call:

using (var directClient = KustoIngestFactory.CreateDirectIngestClient(kcsb))
{
    var myLogs = new List<MyLog>
        {
            new MyLog { Level = "3", Timestamp = DateTime.UtcNow, UserId="u3", TraceId = "t3", Message="dc one", ProcessId = 44 },
            new MyLog { Level = "4", Timestamp = DateTime.UtcNow, UserId="u4", TraceId = "t4", Message="dc two", ProcessId = 45 }
        };

    var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

    Console.WriteLine($"Ingest JSON: '{jsonText}'");

    using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

    var ingestionMapping = new IngestionMapping { IngestionMappingKind = Kusto.Data.Ingestion.IngestionMappingKind.Json, IngestionMappingReference = "JsonMapping" };

    var ingestionProperties = new KustoIngestionProperties { DatabaseName = "sdktestdata", TableName = "MyLogs", IngestionMapping = ingestionMapping, Format = DataSourceFormat.multijson };

    try
    {
        var result = directClient.IngestFromStream(memoryStream, ingestionProperties);
        Console.WriteLine($"{result.GetIngestionStatusCollection().First().Status}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

The client now knows about the database, table, kind of data, and mapping.

Note: There are more settings, but it depends a bit on which kind of source you use, like a storage account or a data reader. Please check them out.
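
For example, ingesting a blob from a storage account instead of an in-memory stream could look roughly like this (a sketch; the blob URI is a hypothetical placeholder and I assume the IngestFromStorageAsync overload taking a URI and the same ingestion properties, so verify against the library documentation):

// Sketch: ingest a .multijson blob; the URI (including SAS token) is a hypothetical placeholder.
var blobUri = "https://[storage-account].blob.core.windows.net/[container]/mylogs.multijson?[sas-token]";

var blobResult = directClient.IngestFromStorageAsync(blobUri, ingestionProperties).Result;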

The call returns a ‘succeeded’ status.

The result is as expected, two rows more:

I recommend using this new solution because it is on par with the documentation.

Ingesting data by code using Queue mode

I went for the ‘QueuedIngestClient’.

Queued: KustoIngestFactory.CreateQueuedIngestClient

The construction offers the possibility to add queue options:

I just went for the default options.

The code for queued ingest is somewhat similar to the code for direct ingest:

using (var queuedClient = KustoIngestFactory.CreateQueuedIngestClient(kcsb))
{
    var myLogs = new List<MyLog>
            {
                new MyLog { Level = "5", Timestamp = DateTime.UtcNow, UserId="u5", TraceId = "t5", Message="qc one", ProcessId = 46 },
                new MyLog { Level = "6", Timestamp = DateTime.UtcNow, UserId="u6", TraceId = "t6", Message="qc two", ProcessId = 47 }
            };

    var jsonText = JsonSerializer.Serialize(myLogs, new JsonSerializerOptions { });

    Console.WriteLine($"Ingest JSON: '{jsonText}'");

    using var memoryStream = new MemoryStream(Encoding.Default.GetBytes(jsonText));

    var ingestionMapping = new IngestionMapping { IngestionMappingKind = Kusto.Data.Ingestion.IngestionMappingKind.Json, IngestionMappingReference = "JsonMapping" };

    var ingestionProperties = new KustoIngestionProperties { DatabaseName = "sdktestdata", TableName = "MyLogs", IngestionMapping = ingestionMapping, Format = DataSourceFormat.multijson };

    try
    {
        var result = queuedClient.IngestFromStream(memoryStream, ingestionProperties);

        Console.WriteLine($"{result.GetIngestionStatusCollection().First().Status}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

Notice, I reused the same URI endpoint.

So when I run this, I get an exception when the ingest takes place:

This is a queued ingest, so it tells us explicitly to use the ingest endpoint; there is even an endpoint suggestion!

So we need this one:
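
With the same credentials, only the cluster URI in the connection string changes (a sketch based on the demo cluster; use the ‘ingest-’ URI of your own cluster):

var kcsb = new KustoConnectionStringBuilder(@"https://ingest-testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadApplicationKeyAuthentication(
    applicationClientId: "[application-id]",
    applicationKey: "[some-password]",
    authority: "[tenant-guid]");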

If we use that endpoint, we get a message that our data is ingested into a queue:

Within the query editor of Azure Data Explorer, the data shows up as ingested within a few seconds or up to five minutes, depending on the batching queue:

That was simple…

So, it’s up to you regarding ingesting data directly or queued!

It’s important to understand that this ‘queue’ is something managed by Azure Data Explorer itself.

We do not need to add extra Azure resources like an Azure Storage queue or an Azure Event Hub.

Ingestion failures?

So, messages are queued and picked up by the ADX engine to store in tables.

If you have done everything right, the rows pop up a moment later and the ingestion is seen here:

.show commands | top 100 by StartedOn

You can even check each single table addition using the shards, aka extents:

.show table MyLogs extents 

Each time a batch is ingested, an extent is created. You can even see how many rows are put inside each extent.

Azure Data Explorer optimizes the internal data storage periodically. These extents can be checked this way only for a short period, maybe minutes.

I noticed the ‘ExtentsRebuild’ operation within minutes using:

.show operations | top 100 by StartedOn

After that, extents were merged.

What happens if this fails? What if we enter the wrong table name, or mapping name, or the rights are insufficient?

The failures are stored for up to fourteen days and are queryable:

.show ingestion failures

This will tell in detail what went wrong.
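
On a busy cluster, it helps to narrow the result down (a small sketch; I assume the Table and FailedOn columns, which were present on my cluster):

.show ingestion failures
| where Table == "MyLogs"
| top 10 by FailedOn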

Testing ingestion failures or successes using the SDK

If you want to check from the client, using the SDK, whether queued batches are actually ingested by the database, there is a workaround.

You can set the ingestion report level and method during the execution of queued ingestion, wait for a short period, and test the ingestion results.

Warning: This has a huge impact on ingestion performance, so use it with care! Only use this reporting feature for diagnostic purposes. Adding a delay in an Azure Function will also increase the costs of that function because execution time is a cost factor.

We alter the ingestion properties a little bit by adding two extra properties:

var ingestionProperties =
	 new KustoQueuedIngestionProperties("database", "table")
	 {
		 IngestionMapping = ingestionMapping,
		 Format = DataSourceFormat.multijson,
		 AdditionalTags = new List<string> { $"additionalTags" }, 
		 IngestByTags = new List<string> { fileName }, 
		 IngestIfNotExists = new List<string> { fileName }, 
		 DropByTags = new List<string> { fileName },  
		 AdditionalProperties = new Dictionary<string, string> { { "creationTime", DateTime.UtcNow.AddDays(-1).ToUtcString() } } 
	 };

if (true)
{ 
	ingestionProperties.ReportLevel = IngestionReportLevel.FailuresAndSuccesses;
	ingestionProperties.ReportMethod = IngestionReportMethod.Queue;
}

Notice we now use the KustoQueuedIngestionProperties class. See also this example.

When you ingest the data and wait for a short moment, you should be able to check for failures and successes:
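
Put together, a minimal diagnostic sketch could look like this (the ten-second wait is an arbitrary value I picked; ADX may need more or less time to pick up the batch):

// Sketch: queued ingest with success/failure reporting enabled (see the properties above).
queuedClient.IngestFromStream(memoryStream, ingestionProperties);

// Give ADX some time to pick up and process the queued batch (arbitrary wait, diagnostics only).
Thread.Sleep(TimeSpan.FromSeconds(10));

// Read (and remove) the reported outcomes from the status queues.
var ingestionSuccesses = queuedClient.GetAndDiscardTopIngestionSuccessesAsync().Result;
var ingestionFailures = queuedClient.GetAndDiscardTopIngestionFailuresAsync().Result;

Console.WriteLine($"Successes: {ingestionSuccesses.Count()}, Failures: {ingestionFailures.Count()}");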

Here, we see a successful ingestion:

ingestionSuccesses.First()
{Kusto.Ingest.IngestionSuccess}
    Info: {OperationId: 36a4cabd-f54a-4698-8cd7-82e932c876e1
, Database: sdktestdata
, Table: MyLogs
, SucceededOn: 27/09/2023 18:19:18
, IngestionSourceId: a5d41ceb-1c39-4f32-97a0-daa404cc3e8a
, IngestionSourcePath: https://u44kstrldtestweuadx01.blob.core.windows.net/20230927-ingestdata-e5c334ee145d4b4-0/0927_Upload_a5d41ceb-1c39-4f32-97a0-daa404cc3e8a_d07b14aaeea14898887255c9c2da7803.multijson.gz
, RootActivityId: 00000000-0000-0000-0000-000000000000}

An error looks like this:

ingestionFailures.First()
{Kusto.Ingest.IngestionFailure}
    Info: {OperationId: ea3186d0-cdb9-4e5a-a246-c9c1b3fbe68f
, Database: sdktestdata
, Table: MyLogs
, FailedOn: 27/09/2023 18:37:24
, IngestionSourceId: 15d07077-f50b-4f05-a183-464804b630c6
, IngestionSourcePath: https://sa3kstrldtestweuadx00.blob.core.windows.net/20230927-ingestdata-e5c334ee145d4b4-0/6894_Upload_15d07077-f50b-4f05-a183-464804b630c6_752aaa7e36a84e6198e43c5fb10a1b48.multijson.gz
, Details: Another stream with the same ingest-by tag was already ingested
, ErrorCode: Skipped_IngestByTagAlreadyExists
, FailureStatus: Permanent
, RootActivityId: 00000000-0000-0000-0000-000000000000
, OriginatesFromUpdatePolicy: False
, ShouldRetry: False}

So, play with the methods:

var ingestionFailures = queuedClient.PeekTopIngestionFailuresAsync().Result;

var ingestionFailures2 = queuedClient.GetAndDiscardTopIngestionFailuresAsync().Result;

var ingestionSuccesses = queuedClient.GetAndDiscardTopIngestionSuccessesAsync().Result;

Again, this is not a simple check. It has a lot of impact on the performance of ingestion and your cluster!

Because we have to wait a few seconds (check out the ‘thread sleep’ in the code example) between putting the batch on the queue and checking the result (we have to give ADX some time to pick up the batch and handle it), this idle time is also measured as execution time for your Azure Function. This has a measurable impact on Azure Function costs too.

Exceptions

During my experiments, I encountered a number of exceptions while testing the code.

Here are some of the common mistakes and the related exceptions.

The table does not support streaming data

It is expected that the table supports streaming data.

In this case, I explicitly tested this by disabling this policy:

.alter table MyLogs policy streamingingestion disable

Note: changing the policy can take a moment before it is applied to the table.
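
To verify whether the policy change is already effective, you can show the current policy first:

.show table MyLogs policy streamingingestion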

This results in the following exception when I try to ingest data:

One or more errors occurred. (Request is invalid and cannot be processed: {
    "error": {
        "code": "BadRequest",
        "message": "Request is invalid and cannot be executed.",
        "@type": "Kusto.DataNode.Exceptions.StreamingIngestionRequestException",
        "@message": "Bad streaming ingestion request to sdktestdata.MyLogs : Streaming ingestion policy is not enabled for the table",
        "@context": {
            "timestamp": "2023-03-23T12:12:28.3392842Z",
            "serviceAlias": "TESTWEUADX",
            "machineName": "KEngine000000",
            "processName": "Kusto.WinSvc.Svc",
            "processId": 7712,
            "threadId": 6076,
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2",
            "subActivityId": "XYZ2",
            "activityType": "GW.Http.CallContext",
            "parentActivityId": "XYZ2",
            "activityStack": "(Activity stack: CRID=XYZ3 ARID=XYZ2 > GW.Http.CallContext/XYZ2 > GW.Http.CallContext/XYZ2)"
        },
        "@permanent": true
    }
})

The exception actually tells us that the policy needs to be set.

Note: this is tested with direct mode.

Wrong DataSourceFormat

The format I need is ‘multijson’ but first I tried both ‘json’ and ‘singlejson’.

For both situations, the exception I got was:

One or more errors occurred. (Request is invalid and cannot be processed: {
    "error": {
        "code": "BadRequest",
        "message": "Request is invalid and cannot be executed.",
        "@type": "Kusto.DataNode.Exceptions.StreamingIngestionRequestException",
        "@message": "Bad streaming ingestion request to sdktestdata.MyLogs : The input stream is empty after processing, tip:check stream validity",
        "@context": {
            "timestamp": "2023-03-21T20:02:28.2986078Z",
            "serviceAlias": "TESTWEUADX",
            "machineName": "KEngine000000",
            "processName": "Kusto.WinSvc.Svc",
            "processId": 7712,
            "threadId": 3724,
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2",
            "subActivityId": "XYZ3",
            "activityType": "GW.Http.CallContext",
            "parentActivityId": "XYZ4",
            "activityStack": "(Activity stack: CRID=XYZ5 ARID=XYZ6 > GW.Http.CallContext/XYZ7 > GW.Http.CallContext/XYZ8)"
        },
        "@permanent": true
    }
})

It’s hard to relate this to the wrong data source format choice.

Note: this is tested with direct mode.

Wrong ‘Data Ingestion URI’

When I entered the Data Ingestion URI instead of the regular ADX URI, I got this exception:

One or more errors occurred. (NotFound (404-NotFound): {
    "error": {
        "code": "Not found",
        "message": "There is no resource at this URI path: '/v1/rest/ingest/sdktestdata/MyLogs'",
        "@context": {
            "clientRequestId": "XYZ1",
            "activityId": "XYZ2"
        }
    }
}. This normally represents a permanent error, and retrying is unlikely to help.
Error details:
DataSource='https://ingest-testweuadx.westeurope.kusto.windows.net/v1/rest/ingest/sdktestdata/MyLogs?streamFormat=multijson&mappingName=JsonMapping',
DatabaseName='sdktestdata',
ClientRequestId='XYZ3',
ActivityId='XYZ4,
Timestamp='2023-03-21T20:19:21.8810286Z'.)

In this exception text, there is a reference to the URI that was used.

Overall: when implementing ingest code, please put some effort into handling other exceptions too. Also, take throttling (KustoRequestThrottledException) into account with a correct retry mechanism (e.g. wait for four seconds before retrying).

Note: this is tested with direct mode.
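
A minimal retry sketch for the throttling advice above could look like this (the single retry and the four-second wait follow that guidance; tune both for your own workload, and note that a fresh stream is created per attempt because the client may consume the stream):

// Sketch: retry once after a four-second wait when ADX throttles the direct ingestion request.
void Ingest() =>
    directClient.IngestFromStream(new MemoryStream(Encoding.Default.GetBytes(jsonText)), ingestionProperties);

try
{
    Ingest();
}
catch (KustoRequestThrottledException)
{
    // ADX signalled it is overloaded; back off before the single retry.
    Thread.Sleep(TimeSpan.FromSeconds(4));
    Ingest();
}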

Querying data by code

Now that we know how to add data to the database, let’s concentrate on querying the same data.

First, we count the number of rows in the table.

We expect a number as a result of the call:

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kcsb))
{
    var query = $"MyLogs | count";
    var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };
    var results = cslQueryProvider.ExecuteQuery<long>("sdktestdata", query, clientRequestProperties);        
    Console.WriteLine($"Count: {results.Single()}");
}

This returns the right number of rows:

Note: this functionality was tested when only two rows were in the table.

But can we query for the actual rows too?

ExecuteQuery

The short answer is that we are able to query table rows too.

Take a look at this query:

using (var queryProvider = KustoClientFactory.CreateCslQueryProvider(kcsb))
{
    var query = "MyLogs | take 2 | project Level, Timestamp, UserId, TraceId, Message, ProcessId";

    var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };

    using (var reader = queryProvider.ExecuteQuery("sdktestdata", query, clientRequestProperties))
    {
        var myLogs = DataReaderMapToList<MyLog>(reader);

        foreach (var log in myLogs)
        {
            Console.WriteLine($"Level = {log.Level}, Timestamp = {log.Timestamp}, UserId = {log.UserId}, TraceId = {log.TraceId}, Message = {log.Message}, ProcessId = {log.ProcessId}");
        }
    }
}

This returns the two rows:

But it’s not easy to turn the response into that list of MyLog objects:

The response is that KustoJsonDataStreamReader variable.

If you dig deeper, you will see it contains JSON, describing datasets…

I used this helper method (found here) to turn the reader data into objects:

public static List<T> DataReaderMapToList<T>(IDataReader dr)
{
    var list = new List<T>();

    while (dr.Read())
    {
        var obj = Activator.CreateInstance<T>();

        foreach (PropertyInfo prop in obj.GetType().GetProperties())
        {
            if (!object.Equals(dr[prop.Name], DBNull.Value))
            {
                prop.SetValue(obj, dr[prop.Name], null);
            }
        }

        list.Add(obj);
    }

    return list;
}

But there is more information available in the response; there are multiple data tables in that reader.

Here is a snippet of the returned JSON:

It seems the statistics are made available too.
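
Since the reader implements IDataReader, a hedged way to reach those extra data tables is simply walking the result sets (a sketch; which result set holds which information may differ per query):

using (var reader = queryProvider.ExecuteQuery("sdktestdata", query, clientRequestProperties))
{
    var resultSetIndex = 0;

    do
    {
        // Each result set is one of the data tables in the response (rows, query properties, statistics, ...).
        Console.WriteLine($"Result set {resultSetIndex++} with {reader.FieldCount} columns");
    }
    while (reader.NextResult());
}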

ExecuteQueryV2Async

The library also offers this ‘ExecuteQueryV2Async’ method.

I found this GitHub example using this specific method together with a number of helper methods to access the response:

var query = "MyLogs | take 2 | project Level, Timestamp, UserId, TraceId, Message, ProcessId";
var clientRequestProperties = new ClientRequestProperties() { ClientRequestId = Guid.NewGuid().ToString() };
clientRequestProperties.SetOption(ClientRequestProperties.OptionResultsProgressiveEnabled, true);
var result = queryProvider.ExecuteQueryV2Async("sdktestdata", query, clientRequestProperties);
WriteResultsToConsole(result);

It seems it does not show the rows of the table, but the statistics behind the same query are shown:

I’m not sure what the quality of this extra method is. Hopefully, extra documentation will follow.

Bonus 1: Managed identities

In a more real-life situation, the identity used is not the demonstrated AAD application.

Another way to connect is by using Azure-managed identities:

  • You don’t need to manage credentials. Credentials aren’t even accessible to you.
  • You can use managed identities to authenticate to any resource that supports Azure AD authentication, including your own applications.
  • Managed identities can be used at no extra cost.

Suppose, for example, that the code seen above is executed in an Azure Function.

The Azure Function is exposing a managed identity:

az functionapp identity assign -g <resource group name> -n <azure function app name> --query principalId -o tsv

If you run this, you get a GUID as the response.

Our Azure Data Explorer database needs to grant this Azure Function some access.

This can be achieved using the Azure portal.

Just provide the GUID and the function is found:

The Function is granted both the Viewer and the Ingestor rights:

Now, just use another method for creating an ADX connection:

var kcsb = new KustoConnectionStringBuilder(@"https://testweuadx.westeurope.kusto.windows.net", "sdktestdata")
  .WithAadSystemManagedIdentity();

As you see, no credentials are needed from the developer’s point of view. No credentials are saved.

Note: It is recommended not to use the deprecated version supporting a managed identity:

Once this code is executed, managed identity credentials are exchanged between the Azure function (the host of this code) and Azure Data Explorer, and the roles are verified.

This is a great solution for better and safer Azure identity management.

You cannot use this solution while debugging your code locally.

For that, the Azure CLI helps you:

var kcsb = new KustoConnectionStringBuilder(@"https://[CLUSTER].kusto.windows.net", "[DATABASE]")
  .WithAadAzCliAuthentication(true);

Here, the code will try to use the locally installed AZ CLI to log in:

I passed ‘true’ for interactive communication.

Note: If you have not installed the Azure CLI yet (you see ‘Az-Cli is either not installed or not found’ in the exception output of the source code), install it from here. Do not forget to restart Visual Studio before the next attempt; it needs to read the updated Path environment variable so it knows where to find the AZ CLI on the file system.

Bonus 2: Tags

We can ingest rows in batches.

But can we prevent ingesting the same batch twice?

Can we check how many rows are ingested compared to the source file?

We can!

Just add some tags to the KustoIngestionProperties:

var ingestionProperties = 
    new KustoIngestionProperties 
    { 
        DatabaseName = "sdktestdata", 
        TableName = "MyLogs", 
        IngestionMapping = ingestionMapping, 
        Format = DataSourceFormat.multijson,
        AdditionalTags = new List<string> { $"additionalTags" }, // some trivial tags.
        IngestByTags = new List<string> { fileName }, // ingested records will be marked with 'ingest-by:'. Starts in a separate extent but can/will be merged with other extents.
        IngestIfNotExists = new List<string> { fileName }, // Ingestion will be skipped when this tag already exists.
        DropByTags = new List<string> { fileName },  // unique for each shard. Will never be merged. Use only together with the 'extent_tags_retention' policy.
    };

Here, we set three types of tags:

  1. Additional tags. Just some additional harmless tags.
  2. IngestBy tags. Once rows with this tag are ingested, no new rows with the same tag can be ingested anymore. This prevents ingestion of the same batch (the same file?) twice. Extents with different ‘ingest-by:’ tags will be merged in the end if needed. So an extent can have multiple tags beginning with ‘ingest-by:’. Counting the number of rows inside an extent to compare it with a source file is not perfect
  3. DropBy tags. Forces batches of rows to be ingested in separate shards. Great for checking the quality of ingestion but potentially harmful for database optimization in the long run

It is recommended to use the ‘drop-by’ tag only in conjunction with this policy:

.alter database sdktestdata policy extent_tags_retention ```[
	{
		"TagPrefix": "drop-by:",
		"RetentionPeriod": "08:00:00"
	},
	{
		"TagPrefix": "ingest-by:",
		"RetentionPeriod": "24:00:00"
	}
]```

Here, we remove these specific tags from the extents after the configured retention period (eight hours for ‘drop-by’ and twenty-four hours for ‘ingest-by’).

This way, we can check ingestion for a limited time while not disturbing database storage optimization too much.
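
While the tags are still retained, a hedged way to compare a batch with its source file is summarizing the row count per extent tag (extent_tags() returns the tags of the extent each row lives in):

MyLogs
| summarize IngestedRows = count() by Tags = tostring(extent_tags())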

Bonus 3: Altering the ingestion date

If you are ingesting data, normally the ingestion date used on the extent is related to the current date.

If you want to change that date, check out the ‘additional properties’:

AdditionalProperties = new Dictionary<string, string> { { "creationTime", DateTime.UtcNow.AddDays(-1).ToUtcString() } } // use an alternative ingestion date.

Here, I change the creation time, part of the additional properties:

The first line is ingested 17 minutes later than the second line, but I changed its date to be one day earlier.

This is a nice solution to cope with the retention time of the data. The older rows (marked by the changed creation time) will be removed first.

Update July 2023: Azure Function Bindings

At this moment, Azure Function bindings for Azure Data Explorer are in public preview.

Using these bindings, less code is needed: an input binding can query the database to retrieve data, and an output binding is useful for ingesting data into Azure Data Explorer coming from other sources.

If you are working with Azure Functions a lot, please check out the sample application. Feel free to submit issues.

Conclusion

Yes, we can write data to Azure Data Explorer from code and then query the Azure Data Explorer tables in the database too.

However, the functionality available in the NuGet libraries (Data library and Ingest library) is not really well documented.

Here, the use of both ‘Direct Ingestion’ and ‘Queued Ingestion’ is shown. Microsoft recommends queued ingestion, and that is okay. From the client, though, you then only see that the data is queued inside Azure Data Explorer.

Direct ingestion is not recommended for production, but it is a working solution and gives you a direct response.

It also took some effort to test out different ways to authenticate the access. In this post, I demonstrated an AAD application, the AZ CLI, and managed identities.

Please read the documentation if you want to go a step further.