tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs (84 lines of code) (raw):
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Text.Json;
using Elastic.Channels;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Transport;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using HttpMethod = Elastic.Transport.HttpMethod;
namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests;
/// <summary>
/// Integration tests that write <see cref="TimeSeriesDocument"/> events through an
/// <see cref="EcsDataStreamChannel{TEvent}"/> and then verify the outcome by querying the cluster
/// through <c>Client</c>.
/// </summary>
public class DataStreamIngestionTests : IntegrationTestBase
{
    /// <summary>ILM policy name handed to the channel bootstrap and asserted on afterwards.</summary>
    private const string IlmPolicyName = "7-days-default";

    /// <summary>Maximum time a test blocks waiting for the channel to signal its wait handle.</summary>
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    public DataStreamIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output)
    {
    }

    /// <summary>
    /// Writes a single document and asserts that it can be searched, that the hit lives in a
    /// <c>.ds-</c> prefixed backing index, and that the data stream reports the bootstrapped
    /// template and ILM policy.
    /// </summary>
    [Fact]
    public async Task EnsureDocumentsEndUpInDataStream()
    {
        var targetDataStream = new DataStreamName("timeseriesdocs", "dotnet");
        // CountdownEvent is IDisposable; the using declaration releases it when the test ends.
        using var slim = new CountdownEvent(1);
        var options = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
        {
            DataStream = targetDataStream,
            // NOTE(review): an outbound buffer of 1 presumably makes the single write flush immediately - confirm.
            BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 },
        };
        var channel = new EcsDataStreamChannel<TimeSeriesDocument>(options);

        await BootstrapAsync(channel);
        await AssertDataStreamDoesNotExistAsync(targetDataStream);

        channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.UtcNow, Message = "hello-world" });
        WaitForFlush(slim, channel);
        await RefreshAsync(targetDataStream);

        var searchResult = await Client.SearchAsync<TimeSeriesDocument>(s => s.Indices(targetDataStream.ToString()));
        // Fail with the full request/response details rather than a bare count mismatch.
        searchResult.IsValidResponse.Should().BeTrue("{0}", searchResult.DebugInformation);
        searchResult.Total.Should().Be(1);

        var storedDocument = searchResult.Documents.First();
        storedDocument.Message.Should().Be("hello-world");

        var hit = searchResult.Hits.First();
        hit.Index.Should().StartWith($".ds-{targetDataStream}-");

        var getDataStream = await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()));
        getDataStream.ApiCallDetails.HttpStatusCode.Should()
            .Be(200, "{0}", getDataStream.ApiCallDetails.DebugInformation);

        // this ensures the data stream was set up using the expected bootstrapped template
        getDataStream.ApiCallDetails.DebugInformation.Should()
            .Contain(@$"""template"" : ""{targetDataStream.GetTemplateName()}""");

        // this ensures the data stream is managed by the expected ilm_policy
        getDataStream.ApiCallDetails.DebugInformation.Should()
            .Contain(@$"""ilm_policy"" : ""{IlmPolicyName}""");
    }

    /// <summary>
    /// Writes a single document through a channel configured with <c>CustomEventWriter</c> and asserts
    /// on the raw JSON returned by a search: timestamp, message and the enum stored under
    /// <c>metadata.MyEnum</c>.
    /// </summary>
    [Fact]
    public async Task UseCustomEventWriter()
    {
        var targetDataStream = new DataStreamName("customtimeseriesdocs", "dotnet");
        // CountdownEvent is IDisposable; the using declaration releases it when the test ends.
        using var slim = new CountdownEvent(1);
        var options = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
        {
            DataStream = targetDataStream,
            BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 },
            EventWriter = new CustomEventWriter<TimeSeriesDocument>()
        };
        var channel = new EcsDataStreamChannel<TimeSeriesDocument>(options);

        await BootstrapAsync(channel);
        await AssertDataStreamDoesNotExistAsync(targetDataStream);

        channel.TryWrite(new TimeSeriesDocument
        {
            // 2024-05-27T23:56:15.785Z, built explicitly so no culture-sensitive parsing is involved.
            Timestamp = new DateTimeOffset(2024, 5, 27, 23, 56, 15, 785, TimeSpan.Zero),
            Message = "Hello World!",
            Metadata = new MetadataDictionary { { "MyEnum", MyEnum.Two } }
        });
        WaitForFlush(slim, channel);
        await RefreshAsync(targetDataStream);

        var searchResult = await Client.SearchAsync<JsonDocument>(s => s.Indices(targetDataStream.ToString()));
        // Fail with the full request/response details rather than a bare count mismatch.
        searchResult.IsValidResponse.Should().BeTrue("{0}", searchResult.DebugInformation);
        searchResult.Total.Should().Be(1);

        var root = searchResult.Documents.First().RootElement;
        root.GetProperty("@timestamp").GetString().Should().Be("2024-05-27T23:56:15.785+00:00");
        root.GetProperty("message").GetString().Should().Be("Hello World!");
        root.GetProperty("metadata").GetProperty("MyEnum").GetString().Should().Be("Two");
    }

    /// <summary>
    /// Bootstraps <paramref name="channel"/> with <see cref="BootstrapMethod.Failure"/> and the shared
    /// ILM policy name, asserting that the bootstrap call reports success.
    /// </summary>
    private static async Task BootstrapAsync(EcsDataStreamChannel<TimeSeriesDocument> channel)
    {
        var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, IlmPolicyName);
        bootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel");
    }

    /// <summary>
    /// Asserts that the cluster returns no data streams for <paramref name="targetDataStream"/>,
    /// i.e. nothing has been written to it before the test writes its document.
    /// </summary>
    private async Task AssertDataStreamDoesNotExistAsync(DataStreamName targetDataStream)
    {
        var dataStream =
            await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()));
        dataStream.DataStreams.Should().BeNullOrEmpty();
    }

    /// <summary>
    /// Blocks until <paramref name="flushed"/> is signalled or <see cref="FlushTimeout"/> elapses.
    /// </summary>
    /// <exception cref="Exception">
    /// Thrown on timeout; carries the exception observed by the channel's diagnostics listener, if any,
    /// as its inner exception.
    /// </exception>
    private static void WaitForFlush(CountdownEvent flushed, EcsDataStreamChannel<TimeSeriesDocument> channel)
    {
        if (flushed.WaitHandle.WaitOne(FlushTimeout))
            return;

        throw new Exception(
            $"No flush occurred in {FlushTimeout.TotalSeconds:0} seconds: {channel}",
            channel.DiagnosticsListener?.ObservedException
        );
    }

    /// <summary>
    /// Issues a refresh for <paramref name="targetDataStream"/> and asserts the response is valid,
    /// so that the subsequent search sees the written document.
    /// </summary>
    private async Task RefreshAsync(DataStreamName targetDataStream)
    {
        var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString());
        refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
    }
}