benchmarks/Elastic.Ingest.Elasticsearch.Profiling/Program.cs (59 lines of code) (raw):
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Channels;
using Elastic.Ingest.Elasticsearch.Indices;
using Performance.Common;
using Elastic.Transport;
using JetBrains.Profiler.Api;
// Memory-profiling harness: writes a batch of sample StockData documents through an
// IndexChannel and takes JetBrains MemoryProfiler snapshots around each phase
// (transport creation, channel creation, write + flush, and optionally diagnostics).

#if DEBUG
// Sum of the Count values passed to ExportResponseCallback; only tracked in DEBUG builds.
long responses = 0;
#endif

// Passed to the channel options, and also gates the final diagnostics snapshot below.
var disableDiagnostics = true;

// Set from OutboundChannelExitedCallback so the main flow can block until that callback fires.
var waitHandle = new ManualResetEvent(false);

var stockData = StockData.CreateSampleData(100_000);

var cloudId = Environment.GetEnvironmentVariable("APM_CLOUD_ID");
var elasticPassword = Environment.GetEnvironmentVariable("APM_CLOUD_PASSWORD");

// Fail fast with a specific exception type rather than the base System.Exception.
if (string.IsNullOrEmpty(cloudId) || string.IsNullOrEmpty(elasticPassword))
{
    throw new InvalidOperationException("Missing environment variables for cloud connection");
}

// Force a collection and start allocation tracking before any transport/channel objects are created.
MemoryProfiler.ForceGc();
MemoryProfiler.CollectAllocations(true);

// NOTE(review): the callback below accepts every server certificate unconditionally.
// That is acceptable for a local profiling run only; do not copy this into production code.
var configuration = new TransportConfigurationDescriptor(cloudId, new BasicAuthentication("elastic", elasticPassword))
    .ServerCertificateValidationCallback((a, b, c, d) => true); // Trust the local certificate if we're passing through Fiddler with SSL decryption

var transport = new DistributedTransport(configuration);

MemoryProfiler.GetSnapshot("Before");

var channelOptions = new IndexChannelOptions<StockData>(transport)
{
    BufferOptions = new BufferOptions { OutboundBufferMaxSize = 5_000 },
    DisableDiagnostics = disableDiagnostics,
    IndexFormat = "stock-data-v8",
    // Unblocks the WaitOne() call further down.
    OutboundChannelExitedCallback = () =>
    {
        waitHandle.Set();
    },
#if DEBUG
    ExportResponseCallback = (response, a) =>
    {
        // Print the value returned by the atomic add instead of re-reading the shared
        // counter, so each line shows the total produced by this particular update.
        var total = Interlocked.Add(ref responses, a.Count);
        Console.WriteLine(total);
    },
    PublishToOutboundChannelCallback = () => Console.WriteLine("PUBLISHED")
#endif
};

var indexChannel = new IndexChannel<StockData>(channelOptions);

MemoryProfiler.GetSnapshot("Before write");

#if DEBUG
Console.WriteLine("Write data.");
#endif

// NOTE(review): the results of TryWriteMany/TryComplete are ignored here — presumably a
// failed write just means fewer documents are profiled; confirm against the channel API.
indexChannel.TryWriteMany(stockData);
indexChannel.TryComplete();

#if DEBUG
Console.WriteLine("Awaiting completion.");
#endif

// Block until OutboundChannelExitedCallback signals the handle.
waitHandle.WaitOne();

#if DEBUG
Console.WriteLine("Completed.");
#endif

MemoryProfiler.GetSnapshot("After write and flush");

if (!disableDiagnostics)
{
    // The string itself is not needed; ToString() is invoked only so that the following
    // snapshot captures whatever it allocates.
    _ = indexChannel.ToString();
    MemoryProfiler.GetSnapshot("After diagnostics");
}