Elastic.SemanticKernel.Connectors.Elasticsearch/MockableElasticsearchClient.cs (202 lines of code) (raw):
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Clients.Elasticsearch.Mapping;
using Elastic.Clients.Elasticsearch.QueryDsl;
using Elastic.Transport;
using Microsoft.SemanticKernel;
using ExistsRequest = Elastic.Clients.Elasticsearch.IndexManagement.ExistsRequest;
namespace Elastic.SemanticKernel.Connectors.Elasticsearch;
#pragma warning disable CA1852 // TODO: Remove after using MockableElasticsearchClient in unit tests
/// <summary>
/// Decorator class for <see cref="ElasticsearchClient" /> that exposes the required methods as virtual allowing
/// for mocking in unit tests.
/// </summary>
internal class MockableElasticsearchClient
{
    /// <summary>
    /// Request configuration attached to every request issued by this class. It sets a custom user agent
    /// that carries the <c>integration=MSSK</c> metadata entry.
    /// </summary>
    private static readonly RequestConfiguration CustomUserAgentRequestConfiguration = new()
    {
        UserAgent = UserAgent.Create("elasticsearch-net", typeof(IElasticsearchClientSettings), ["integration=MSSK"])
    };

    /// <summary>
    /// Creates a <see cref="MockableElasticsearchClient" /> that forwards all calls to the given client.
    /// </summary>
    /// <param name="elasticsearchClient">
    /// The Elasticsearch client to wrap. It is validated with <c>Verify.NotNull</c> before being stored.
    /// </param>
    public MockableElasticsearchClient(ElasticsearchClient elasticsearchClient)
    {
        Verify.NotNull(elasticsearchClient);

        ElasticsearchClient = elasticsearchClient;
    }

#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
    /// <summary>
    /// Parameterless constructor intended for mocking only. It leaves <see cref="ElasticsearchClient" /> unset.
    /// </summary>
    internal MockableElasticsearchClient()
    {
    }
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.

    /// <summary>
    /// Gets the wrapped <see cref="ElasticsearchClient" /> instance that all requests are sent through.
    /// </summary>
    public ElasticsearchClient ElasticsearchClient { get; }

    /// <summary>
    /// Retrieves the names of the indices reported by an index stats request.
    /// </summary>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <returns>
    /// The keys of the <c>Indices</c> dictionary of the stats response, or an empty list when the response
    /// carries no such dictionary.
    /// </returns>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task<IReadOnlyList<string>> ListIndicesAsync(CancellationToken cancellationToken = default)
    {
        var statsRequest = new IndicesStatsRequest
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var statsResponse = await ElasticsearchClient.Indices
            .StatsAsync(statsRequest, cancellationToken)
            .ConfigureAwait(false);

        if (statsResponse.IsSuccess())
        {
            return statsResponse.Indices?.Keys.ToArray() ?? [];
        }

        throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", statsResponse);
    }

    /// <summary>
    /// Determines whether the given index exists.
    /// </summary>
    /// <param name="indexName">The name of the index to look up.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <returns>The <c>Exists</c> value of the response.</returns>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task<bool> IndexExistsAsync(
        IndexName indexName,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);

        var existsRequest = new ExistsRequest(indexName)
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var existsResponse = await ElasticsearchClient.Indices
            .ExistsAsync(existsRequest, cancellationToken)
            .ConfigureAwait(false);

        if (existsResponse.IsSuccess())
        {
            return existsResponse.Exists;
        }

        throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", existsResponse);
    }

    /// <summary>
    /// Creates an index whose type mapping uses the supplied property mappings.
    /// </summary>
    /// <param name="indexName">The name of the index to create.</param>
    /// <param name="properties">The property mappings assigned to the index's type mapping.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task CreateIndexAsync(
        IndexName indexName,
        Properties properties,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);

        var typeMapping = new TypeMapping
        {
            Properties = properties
        };
        var createRequest = new CreateIndexRequest(indexName)
        {
            Mappings = typeMapping,
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var createResponse = await ElasticsearchClient.Indices
            .CreateAsync(createRequest, cancellationToken)
            .ConfigureAwait(false);

        if (!createResponse.IsSuccess())
        {
            throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", createResponse);
        }
    }

    /// <summary>
    /// Deletes the given index.
    /// </summary>
    /// <param name="indexName">The name of the index to delete.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task DeleteIndexAsync(
        IndexName indexName,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);

        var deleteRequest = new DeleteIndexRequest(indexName)
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var deleteResponse = await ElasticsearchClient.Indices
            .DeleteAsync(deleteRequest, cancellationToken)
            .ConfigureAwait(false);

        if (!deleteResponse.IsSuccess())
        {
            throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", deleteResponse);
        }
    }

    /// <summary>
    /// Fetches a single document, deserialized as a <see cref="JsonObject" />, by its id.
    /// </summary>
    /// <param name="indexName">The name of the index that is queried.</param>
    /// <param name="id">The id of the document to fetch.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <returns>
    /// The id and source reported by the response, or <see langword="null" /> when the response indicates
    /// that the document was not found.
    /// </returns>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task<(string id, JsonObject document)?> GetDocumentAsync(
        IndexName indexName,
        Id id,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);
        Verify.NotNull(id);

        var getRequest = new GetRequest(indexName, id)
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var getResponse = await ElasticsearchClient
            .GetAsync<JsonObject>(getRequest, cancellationToken)
            .ConfigureAwait(false);

        if (!getResponse.IsSuccess())
        {
            throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", getResponse);
        }

        if (getResponse.Found)
        {
            return (getResponse.Id, getResponse.Source!);
        }

        return null;
    }

    /// <summary>
    /// Indexes a document under the given id.
    /// </summary>
    /// <typeparam name="TDocument">The type of the document to index.</typeparam>
    /// <param name="indexName">The name of the index that receives the document.</param>
    /// <param name="id">The id passed to the index request.</param>
    /// <param name="document">The document to index.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <returns>The document id reported by the index response.</returns>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task<string> IndexDocumentAsync<TDocument>(
        IndexName indexName,
        Id id,
        TDocument document,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);
        Verify.NotNull(document);

        var indexRequest = new IndexRequest<TDocument>(document, indexName, id)
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var indexResponse = await ElasticsearchClient
            .IndexAsync(indexRequest, cancellationToken)
            .ConfigureAwait(false);

        if (indexResponse.IsSuccess())
        {
            return indexResponse.Id;
        }

        throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", indexResponse);
    }

    /// <summary>
    /// Deletes a single document by its id.
    /// </summary>
    /// <param name="indexName">The name of the index that contains the document.</param>
    /// <param name="id">The id of the document to delete.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task DeleteDocumentAsync(
        IndexName indexName,
        Id id,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);
        Verify.NotNull(id);

        var deleteRequest = new DeleteRequest(indexName, id)
        {
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var deleteResponse = await ElasticsearchClient
            .DeleteAsync(deleteRequest, cancellationToken)
            .ConfigureAwait(false);

        if (!deleteResponse.IsSuccess())
        {
            throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", deleteResponse);
        }
    }

    /// <summary>
    /// Runs a search against the given index and returns the hits as <see cref="JsonObject" /> documents.
    /// </summary>
    /// <param name="indexName">The name of the index to search.</param>
    /// <param name="query">The query assigned to the search request.</param>
    /// <param name="from">The value assigned to the search request's <c>From</c> property.</param>
    /// <param name="size">The value assigned to the search request's <c>Size</c> property.</param>
    /// <param name="cancellationToken">
    /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
    /// </param>
    /// <returns>
    /// The total reported by the response together with an array holding the id, source document and score
    /// of every hit.
    /// </returns>
    /// <exception cref="TransportException">Thrown when the response does not report success.</exception>
    public virtual async Task<(long total, (string id, JsonObject document, double? score)[] hits)> SearchAsync(
        IndexName indexName,
        Query query,
        int? from,
        int? size,
        CancellationToken cancellationToken = default)
    {
        Verify.NotNull(indexName);
        Verify.NotNull(query);

        var searchRequest = new SearchRequest(indexName)
        {
            Query = query,
            From = from,
            Size = size,
            RequestConfiguration = CustomUserAgentRequestConfiguration
        };

        var searchResponse = await ElasticsearchClient
            .SearchAsync<JsonObject>(searchRequest, cancellationToken)
            .ConfigureAwait(false);

        if (!searchResponse.IsSuccess())
        {
            throw new TransportException(PipelineFailure.Unexpected, "Failed to execute request.", searchResponse);
        }

        // Project every hit into the (id, document, score) shape exposed to callers.
        (string id, JsonObject document, double? score)[] hits =
            [.. searchResponse.Hits.Select(hit => (hit.Id!, hit.Source!, hit.Score))];

        return (searchResponse.Total, hits);
    }
}
#pragma warning restore CA1852 // TODO: Remove after using MockableElasticsearchClient in unit tests