public async Task EnsureDocumentsEndUpInDataStream()

in tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs [25:70]


	// Integration test: pushes a single document through an EcsDataStreamChannel and checks that
	// it can be searched via the target data stream, that the hit lives in a ".ds-<stream>-" index,
	// and that the data stream reports the bootstrapped template and the "7-days-default" ILM policy.
	public async Task EnsureDocumentsEndUpInDataStream()
	{
		var streamName = new DataStreamName("timeseriesdocs", "dotnet");
		var streamId = streamName.ToString();

		// Waited on below to detect the flush. An outbound buffer size of 1 presumably makes a
		// single write enough to trigger it - TODO confirm against the channel implementation.
		var flushSignal = new CountdownEvent(1);
		var buffering = new BufferOptions { WaitHandle = flushSignal, OutboundBufferMaxSize = 1 };
		var channelOptions = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
		{
			DataStream = streamName,
			BufferOptions = buffering,
		};
		var channel = new EcsDataStreamChannel<TimeSeriesDocument>(channelOptions);

		var isBootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default");
		isBootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel");

		// After bootstrapping, but before anything is written, no data stream is expected to be listed.
		var beforeWrite = await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(streamId));
		beforeWrite.DataStreams.Should().BeNullOrEmpty();

		var document = new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" };
		channel.TryWrite(document);

		// Give the channel up to ten seconds to signal the flush; surface any exception it observed.
		var flushed = flushSignal.WaitHandle.WaitOne(TimeSpan.FromSeconds(10));
		if (!flushed)
		{
			throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException);
		}

		// Refresh before searching, then search the data stream for the written document.
		var refreshed = await Client.Indices.RefreshAsync(streamId);
		refreshed.IsValidResponse.Should().BeTrue("{0}", refreshed.DebugInformation);

		var found = await Client.SearchAsync<TimeSeriesDocument>(search => search.Indices(streamId));
		found.Total.Should().Be(1);

		var firstDocument = found.Documents.First();
		firstDocument.Message.Should().Be("hello-world");

		// The hit's index name must carry the ".ds-<stream>-" prefix.
		var firstHit = found.Hits.First();
		firstHit.Index.Should().StartWith($".ds-{streamName}-");

		var afterWrite = await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(streamId));
		var callDetails = afterWrite.ApiCallDetails;
		var debugInformation = callDetails.DebugInformation;

		callDetails.HttpStatusCode.Should().Be(200, "{0}", debugInformation);

		// The template and ILM policy are checked by text-matching the call's debug information.
		//this ensures the data stream was setup using the expected bootstrapped template
		var expectedTemplate = @$"""template"" : ""{streamName.GetTemplateName()}""";
		debugInformation.Should().Contain(expectedTemplate);

		//this ensures the data stream is managed by the expected ilm_policy
		var expectedPolicy = @"""ilm_policy"" : ""7-days-default""";
		debugInformation.Should().Contain(expectedPolicy);
	}