in src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs [57:101]
public override void ExportToLog(Summary summary, ILogger logger)
{
var waitHandle = new CountdownEvent(1);
var benchmarksCount = summary.Reports.Length;
Exception observedException = null;
var options = new DataStreamChannelOptions<BenchmarkDocument>(Transport)
{
DataStream = new DataStreamName("benchmarks", "dotnet", Options.DataStreamNamespace),
BufferOptions = new BufferOptions
{
WaitHandle = waitHandle,
OutboundBufferMaxSize = benchmarksCount,
OutboundBufferMaxLifetime = TimeSpan.FromSeconds(5)
},
ExportExceptionCallback = e => observedException ??= e,
ExportResponseCallback = (response, _) =>
{
var errorItems = response.Items.Where(i => i.Status >= 300).ToList();
if (response.TryGetElasticsearchServerError(out var error))
logger.WriteError(error.ToString());
else if (errorItems.Count == 0)
logger.WriteLine("Successfully indexed benchmark results");
foreach (var errorItem in errorItems)
logger.WriteError($"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}");
}
};
Options.ChannelOptionsCallback?.Invoke(options);
var channel = new EcsDataStreamChannel<BenchmarkDocument>(options);
if (channel.DiagnosticsListener != null)
Options.ChannelDiagnosticsCallback?.Invoke(channel.DiagnosticsListener);
if (!channel.BootstrapElasticsearch(Options.BootstrapMethod)) return;
var benchmarks = CreateBenchmarkDocuments(summary);
var writeResult = benchmarks.Select(b => channel.TryWrite(b)).All(b => b);
var completedOnTime = waitHandle.Wait(TimeSpan.FromSeconds(20));
if (!completedOnTime)
{
logger.WriteError($"No flush in 20 seconds, published: {writeResult}, possible error: {observedException?.Message}");
if (observedException != null)
logger.WriteError(observedException.ToString());
}
}