in src/Serilog.Sinks.AzureDataExplorer/Sinks/AzureDataExplorerDurableSink.cs [95:159]
public AzureDataExplorerDurableSink(AzureDataExplorerSinkOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
var databaseName = options.DatabaseName ?? throw new ArgumentNullException(nameof(options.DatabaseName));
var tableName = options.TableName ?? throw new ArgumentNullException(nameof(options.TableName));
if (options.IngestionEndpointUri == null) throw new ArgumentNullException(nameof(options.IngestionEndpointUri));
if (string.IsNullOrWhiteSpace(options.BufferBaseFileName))
throw new ArgumentException("Cannot create the durable ADX sink without a buffer base file name!");
var flushImmediately = options.FlushImmediately;
var formatProvider = options.FormatProvider;
var mappingName = options.MappingName;
var ingestionMapping = new IngestionMapping();
if (!string.IsNullOrEmpty(mappingName))
{
ingestionMapping.IngestionMappingReference = mappingName;
}
else if (options.ColumnsMapping?.Any() == true)
{
ingestionMapping.IngestionMappings = options.ColumnsMapping.Select(m => new ColumnMapping
{
ColumnName = m.ColumnName,
ColumnType = m.ColumnType,
Properties = new Dictionary<string, string>(1)
{
{
MappingConsts.Path, m.ValuePath
}
}
}).ToList();
}
else
{
ingestionMapping.IngestionMappings = SDefaultIngestionColumnMapping;
}
var kcsb = options.GetKustoConnectionStringBuilder();
m_kustoQueuedIngestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsb);
m_sink = new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.File(new CompactJsonFormatter(),
options.BufferBaseFileName + FileNameSuffix,
restrictedToMinimumLevel: LevelAlias.Minimum,
fileSizeLimitBytes: options.BufferFileSizeLimitBytes,
levelSwitch: options.BufferFileLoggingLevelSwitch,
flushToDiskInterval: TimeSpan.FromSeconds(10),
rollingInterval: options.BufferFileRollingInterval,
rollOnFileSizeLimit: true,
retainedFileCountLimit: options.BufferFileCountLimit,
encoding: Encoding.UTF8,
shared: true
).CreateLogger();
var payloadReader = new AzureDataExplorerPayloadReader(
rollingInterval: options.BufferFileRollingInterval);
m_shipper = new LogShipper<List<LogEvent>>(
options: options,
period: options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5),
payloadReader: payloadReader,
ingestClient: m_kustoQueuedIngestClient,
ingestionMapping: ingestionMapping
);
}