in tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs [25:70]
public async Task EnsureDocumentsEndUpInDataStream()
{
var targetDataStream = new DataStreamName("timeseriesdocs", "dotnet");
var slim = new CountdownEvent(1);
var options = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
{
DataStream = targetDataStream,
BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 },
};
var channel = new EcsDataStreamChannel<TimeSeriesDocument>(options);
var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default");
bootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel");
var dataStream =
await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()));
dataStream.DataStreams.Should().BeNullOrEmpty();
channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" });
if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException);
var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString());
refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
var searchResult = await Client.SearchAsync<TimeSeriesDocument>(s => s.Indices(targetDataStream.ToString()));
searchResult.Total.Should().Be(1);
var storedDocument = searchResult.Documents.First();
storedDocument.Message.Should().Be("hello-world");
var hit = searchResult.Hits.First();
hit.Index.Should().StartWith($".ds-{targetDataStream}-");
var getDataStream = await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()));
getDataStream.ApiCallDetails.HttpStatusCode.Should()
.Be(200, "{0}", getDataStream.ApiCallDetails.DebugInformation);
//this ensures the data stream was setup using the expected bootstrapped template
getDataStream.ApiCallDetails.DebugInformation.Should()
.Contain(@$"""template"" : ""{targetDataStream.GetTemplateName()}""");
//this ensures the data stream is managed by the expected ilm_policy
getDataStream.ApiCallDetails.DebugInformation.Should()
.Contain(@"""ilm_policy"" : ""7-days-default""");
}