in src/Serilog.Sinks.AzureDataExplorer/Sinks/AzureDataExplorerSink.cs [168:244]
public async Task EmitBatchAsync(IReadOnlyCollection<LogEvent> batch)
{
using var dataStream = CreateStreamFromLogEvents(batch);
for (int retry = 1; retry <= m_options.IngestionRetries; ++retry)
{
try
{
var sourceId = Guid.NewGuid();
if (!m_streamingIngestion)
{
await m_ingestClient.IngestFromStreamAsync(
dataStream,
new KustoQueuedIngestionProperties(m_databaseName, m_tableName)
{
DatabaseName = m_databaseName,
TableName = m_tableName,
FlushImmediately = m_flushImmediately,
Format = DataSourceFormat.multijson,
IngestionMapping = m_ingestionMapping
},
new StreamSourceOptions
{
SourceId = sourceId,
LeaveOpen = false,
CompressionType = DataSourceCompressionType.GZip
}).ConfigureAwait(false);
}
else
{
await m_ingestClient.IngestFromStreamAsync(
dataStream,
new KustoIngestionProperties()
{
DatabaseName = m_databaseName,
TableName = m_tableName,
Format = DataSourceFormat.multijson,
IngestionMapping = m_ingestionMapping
},
new StreamSourceOptions
{
SourceId = sourceId,
LeaveOpen = false,
CompressionType = DataSourceCompressionType.GZip
}).ConfigureAwait(false);
}
}
catch (KustoClientApplicationAuthenticationException ex)
{
if (m_options.FailOnError)
{
SelfLog.WriteLine("Auth failure on Kusto sink due to authentication error (EmitBatchAsync). Please check your credentials", ex);
SelfLog.WriteLine("FailOnError is set to true, the exception will be thrown to the caller to handle the failure");
throw new LoggingFailedException($"Auth failure on Kusto sink due to authentication error (EmitBatchAsync). Please check your credentials.{ex.Message}");
}
SelfLog.WriteLine("Auth failure on Kusto sink due to authentication error (EmitBatchAsync). Please check your credentials.", ex);
}
catch (IngestClientException ex)
{
if (ex.IsPermanent)
{ // <- no retry
if (m_options.FailOnError)
{
SelfLog.WriteLine("Permanent ingestion failure ingesting to Kusto.FailOnError is set to true, the exception will be thrown to the caller to handle the failure", ex);
throw new LoggingFailedException($"Permanent ingestion failure ingesting to Kusto.{ex.Message}");
}
// Since the failure is permanent, we can't retry, so we'll just log the error and continue.
SelfLog.WriteLine("Permanent ingestion failure ingesting to Kusto.Since FailOnError is not set, the batch will be dropped", ex);
break; // no more retries on unwanted exception
}
else
{
SelfLog.WriteLine($"Temporary failures writing to Kusto (Retry attempt {retry} of {m_options.IngestionRetries})", ex);
}
}
}
}