Microsoft.Azure.Cosmos.Samples/Usage/ReEncryption/ReEncryptionSupport/ReEncryptionBulkOperationBuilder.cs (159 lines of code) (raw):
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------
namespace Cosmos.Samples.ReEncryption
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
internal class ReEncryptionBulkOperationBuilder
{
private readonly Container container;
private readonly string partitionKey;
private readonly ReEncryptionJsonSerializer reEncryptionJsonSerializer;
public ReEncryptionBulkOperationBuilder(
Container container,
string partitionKeyPath)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.partitionKey = string.IsNullOrEmpty(partitionKeyPath) ? throw new ArgumentNullException(nameof(partitionKeyPath)) : partitionKeyPath[1..];
this.reEncryptionJsonSerializer = new ReEncryptionJsonSerializer(
new JsonSerializerSettings()
{
DateParseHandling = DateParseHandling.None,
});
}
/// <summary>
/// Builds a bulk operation request and excutes the operation.
/// </summary>
/// <param name="response"> Response containing documents read from changefeed. </param>
/// <param name="cancellationToken"> cancellation token. </param>
/// <returns> ReEncryptionBulkOperationResponse. </returns>
public async Task<ReEncryptionBulkOperationResponse<JObject>> ExecuteAsync(ResponseMessage response, CancellationToken cancellationToken)
{
Dictionary<string, List<JObject>> changeFeedChangesBatcher = this.PopulateChangeFeedChanges(response);
if (!changeFeedChangesBatcher.Any())
{
throw new InvalidOperationException("PopulateChangeFeedChanges returned empty list of changes. ");
}
ReEncryptionBulkOperationResponse<JObject> bulkOperationResponse = null;
List<JObject> bulkOperationList = this.GetChangesForBulkOperations(changeFeedChangesBatcher);
if (bulkOperationList.Count > 0)
{
ReEncryptionBulkOperations<JObject> bulkOperations = new ReEncryptionBulkOperations<JObject>(bulkOperationList.Count);
foreach (JObject document in bulkOperationList)
{
JObject metadata = document.GetValue(Constants.MetadataPropertyName).ToObject<JObject>();
string operationType = metadata.GetValue("operationType").ToString();
if (operationType.Equals("delete"))
{
JObject previousImage = metadata.GetValue(Constants.PreviousImagePropertyName).ToObject<JObject>();
if (previousImage == null)
{
throw new InvalidOperationException("Missing previous image for document with delete operation type. ");
}
string id = previousImage.GetValue("id").ToString();
string pkvalue = previousImage.GetValue(this.partitionKey).ToString();
bulkOperations.Tasks.Add(this.container.DeleteItemAsync<JObject>(
id,
new PartitionKey(pkvalue),
cancellationToken: cancellationToken).CaptureReEncryptionOperationResponseAsync(document));
}
else
{
document.Remove(Constants.MetadataPropertyName);
document.Remove(Constants.LsnPropertyName);
bulkOperations.Tasks.Add(this.container.UpsertItemAsync(
item: document,
new PartitionKey(document.GetValue(this.partitionKey).ToString()),
cancellationToken: cancellationToken).CaptureReEncryptionOperationResponseAsync(document));
}
}
bulkOperationResponse = await bulkOperations.ExecuteAsync();
}
return bulkOperationResponse;
}
/// <summary>
/// Iterates over all the changes for each document. If there are multiple changes for the same document,
/// only the last change is picked up.
/// <param name="changeFeedChangesBatcher"> List containing the changes. </param>
/// </summary>
/// <returns> List of documents. </returns>
private List<JObject> GetChangesForBulkOperations(Dictionary<string, List<JObject>> changeFeedChangesBatcher)
{
List<JObject> bulkOperationList = new List<JObject>();
foreach (KeyValuePair<string, List<JObject>> keyValuePairOps in changeFeedChangesBatcher)
{
// get the last operation if there are multiple changes corresponding to same doc id.
if (keyValuePairOps.Value.Count > 1)
{
JObject lastDocument = keyValuePairOps.Value.ElementAt(keyValuePairOps.Value.Count - 1);
bulkOperationList.Add(lastDocument);
}
else if (keyValuePairOps.Value.Count == 1)
{
bulkOperationList.Add(keyValuePairOps.Value.FirstOrDefault());
}
}
return bulkOperationList;
}
/// <summary>
/// Builds a dictionary of list of documents and hashes them by the document id.
/// If a document has multiple changes then a list of changes(chain) is made corresponding to that id
/// which serves as a key.
/// </summary>
/// <param name="response">Response containing documents read from changefeed .</param>
/// <returns> HashTable/Array of list hashed/key by document Id. </returns>
private Dictionary<string, List<JObject>> PopulateChangeFeedChanges(ResponseMessage response)
{
JObject contentJObj = this.reEncryptionJsonSerializer.FromStream<JObject>(response.Content);
if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
}
Dictionary<string, List<JObject>> changeFeedChangesBatcher = new Dictionary<string, List<JObject>>();
if (documents.Count == 0)
{
return changeFeedChangesBatcher;
}
foreach (JToken value in documents)
{
if (value is not JObject document)
{
continue;
}
JObject metadata = document.GetValue(Constants.MetadataPropertyName).ToObject<JObject>();
if (metadata == null)
{
throw new InvalidOperationException("Metadata property missing in the document. ");
}
string operationType = metadata.GetValue(Constants.OperationTypePropertyName).ToString();
if (operationType.Equals("delete"))
{
JObject previousImage = metadata.GetValue(Constants.PreviousImagePropertyName).ToObject<JObject>();
if (previousImage == null)
{
throw new InvalidOperationException();
}
string rid = previousImage.GetValue(Constants.DocumentRidPropertyName).ToString();
if (changeFeedChangesBatcher.ContainsKey(rid))
{
List<JObject> operationToAdd = changeFeedChangesBatcher[rid];
operationToAdd.Add(document);
changeFeedChangesBatcher[rid] = operationToAdd;
}
else
{
List<JObject> operationToAdd = new List<JObject>
{
document,
};
changeFeedChangesBatcher.Add(rid, operationToAdd);
}
}
else
{
string rid = document.GetValue(Constants.DocumentRidPropertyName).ToString();
if (changeFeedChangesBatcher.ContainsKey(rid))
{
List<JObject> operationToAdd = changeFeedChangesBatcher[rid];
operationToAdd.Add(document);
changeFeedChangesBatcher[rid] = operationToAdd;
}
else
{
List<JObject> operationToAdd = new List<JObject>
{
document,
};
changeFeedChangesBatcher.Add(rid, operationToAdd);
}
}
}
return changeFeedChangesBatcher;
}
}
}