in src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosFhirDataStore.cs [105:256]
/// <summary>
/// Creates or updates <paramref name="resource"/> in the Cosmos DB container.
/// </summary>
/// <remarks>
/// When no <paramref name="weakETag"/> is supplied, creation is allowed and the resource is not flagged as deleted,
/// a create is attempted first. If that create conflicts (or was not attempted), the method loops:
/// it reads the existing item, validates the requested version, assigns the next version to the incoming item,
/// and replaces the stored item using the existing item's ETag as an optimistic-concurrency check.
/// A precondition failure on the write restarts the loop with a fresh read.
/// </remarks>
/// <param name="resource">The resource to store. Validated with <c>EnsureArg.IsNotNull</c>.</param>
/// <param name="weakETag">
/// Optional version constraint. When supplied, its <c>VersionId</c> must equal the version of the stored item.
/// </param>
/// <param name="allowCreate">Whether the resource may be created when no stored item exists.</param>
/// <param name="keepHistory">
/// When <c>true</c>, the previously stored item is flagged as history and written as a separate item in the same
/// transactional batch as the replace; when <c>false</c>, the stored item is simply replaced.
/// </param>
/// <param name="cancellationToken">Token checked at the top of each loop iteration and passed to every Cosmos call.</param>
/// <returns>
/// An <see cref="UpsertOutcome"/> with <c>SaveOutcomeType.Created</c> or <c>SaveOutcomeType.Updated</c>;
/// <c>null</c> when the incoming item is flagged as deleted and the stored item either does not exist or is already deleted.
/// </returns>
/// <exception cref="ResourceNotFoundException">No stored item exists and a <paramref name="weakETag"/> was supplied.</exception>
/// <exception cref="MethodNotAllowedException">No stored item exists and <paramref name="allowCreate"/> is <c>false</c>.</exception>
/// <exception cref="PreconditionFailedException">The supplied <paramref name="weakETag"/> does not match the stored version.</exception>
/// <exception cref="InvalidOperationException">The history-preserving batch failed with a status other than precondition failed.</exception>
/// <exception cref="CosmosException">
/// Rethrown as a request timeout when the initial create fails with a service-unavailable-due-to-timeout condition,
/// or propagated unchanged for other unhandled Cosmos failures.
/// </exception>
public async Task<UpsertOutcome> UpsertAsync(
    ResourceWrapper resource,
    WeakETag weakETag,
    bool allowCreate,
    bool keepHistory,
    CancellationToken cancellationToken)
{
    EnsureArg.IsNotNull(resource, nameof(resource));

    var cosmosWrapper = new FhirCosmosResourceWrapper(resource);
    UpdateSortIndex(cosmosWrapper);

    var partitionKey = new PartitionKey(cosmosWrapper.PartitionKey);
    AsyncPolicy retryPolicy = _retryExceptionPolicyFactory.RetryPolicy;

    _logger.LogDebug("Upserting {resourceType}/{resourceId}, ETag: \"{tag}\", AllowCreate: {allowCreate}, KeepHistory: {keepHistory}", resource.ResourceTypeName, resource.ResourceId, weakETag?.VersionId, allowCreate, keepHistory);

    if (weakETag == null && allowCreate && !cosmosWrapper.IsDeleted)
    {
        // Optimistically try to create this as a new resource
        try
        {
            await retryPolicy.ExecuteAsync(
                async ct => await _containerScope.Value.CreateItemAsync(
                    cosmosWrapper,
                    partitionKey,
                    cancellationToken: ct,
                    requestOptions: new ItemRequestOptions { EnableContentResponseOnWrite = false }),
                cancellationToken);

            return new UpsertOutcome(cosmosWrapper, SaveOutcomeType.Created);
        }
        catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict)
        {
            // this means there is already an existing version of this resource;
            // fall through to the read/replace loop below.
        }
        catch (CosmosException e) when (e.IsServiceUnavailableDueToTimeout())
        {
            // Re-surface the failure with a RequestTimeout status while keeping the original diagnostics values.
            throw new CosmosException(e.Message, HttpStatusCode.RequestTimeout, e.SubStatusCode, e.ActivityId, e.RequestCharge);
        }
    }

    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        FhirCosmosResourceWrapper existingItemResource;

        try
        {
            ItemResponse<FhirCosmosResourceWrapper> existingItem = await retryPolicy.ExecuteAsync(
                async ct => await _containerScope.Value.ReadItemAsync<FhirCosmosResourceWrapper>(cosmosWrapper.Id, partitionKey, cancellationToken: ct),
                cancellationToken);

            existingItemResource = existingItem.Resource;
        }
        catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound)
        {
            if (cosmosWrapper.IsDeleted)
            {
                // Nothing stored and the incoming item is a delete: nothing to do.
                return null;
            }

            if (weakETag != null)
            {
                throw new ResourceNotFoundException(string.Format(Core.Resources.ResourceNotFoundByIdAndVersion, resource.ResourceTypeName, resource.ResourceId, weakETag.VersionId));
            }

            if (!allowCreate)
            {
                throw new MethodNotAllowedException(Core.Resources.ResourceCreationNotAllowed);
            }

            // Reaching here means the optimistic create above reported a conflict, yet the read found nothing.
            // Surface the original NotFound to the caller.
            throw;
        }

        if (weakETag != null && weakETag.VersionId != existingItemResource.Version)
        {
            throw new PreconditionFailedException(string.Format(Core.Resources.ResourceVersionConflict, weakETag.VersionId));
        }

        if (existingItemResource.IsDeleted && cosmosWrapper.IsDeleted)
        {
            // Deleting an already-deleted item is a no-op.
            return null;
        }

        // Parse with the invariant culture so it mirrors how the incremented version is formatted below.
        // A non-integer stored version falls back to a fresh GUID.
        cosmosWrapper.Version = int.TryParse(existingItemResource.Version, NumberStyles.Integer, CultureInfo.InvariantCulture, out int existingVersion)
            ? (existingVersion + 1).ToString(CultureInfo.InvariantCulture)
            : Guid.NewGuid().ToString();

        // indicate that the version in the raw resource's meta property does not reflect the actual version.
        cosmosWrapper.RawResource.IsMetaSet = false;

        if (cosmosWrapper.RawResource.Format == FhirResourceFormat.Json)
        {
            // Update the raw resource based on the new version.
            // This is a lot faster than re-serializing the POCO.
            // Unfortunately, we need to allocate a string, but at least it is reused for the HTTP response.
            // If the format is not JSON, IsMetaSet will remain false and we will update the version when the resource is read.
            using MemoryStream memoryStream = _recyclableMemoryStreamManager.GetStream();
            await new RawResourceElement(cosmosWrapper).SerializeToStreamAsUtf8Json(memoryStream);
            memoryStream.Position = 0;

            using var reader = new StreamReader(memoryStream, Encoding.UTF8);
            cosmosWrapper.RawResource = new RawResource(reader.ReadToEnd(), FhirResourceFormat.Json, isMetaSet: true);
        }

        if (keepHistory)
        {
            // Turn the previously stored item into a history entry that ends when the new version begins.
            existingItemResource.IsHistory = true;
            existingItemResource.ActivePeriodEndDateTime = cosmosWrapper.LastModified;
            existingItemResource.SearchIndices = null;

            // TransactionalBatchResponse is IDisposable; the using declaration releases it on every exit path
            // from this block (success, retry via continue, or throw).
            using TransactionalBatchResponse transactionalBatchResponse = await retryPolicy.ExecuteAsync(
                async ct =>
                    await _containerScope.Value.CreateTransactionalBatch(partitionKey)
                        .ReplaceItem(
                            cosmosWrapper.Id,
                            cosmosWrapper,
                            new TransactionalBatchItemRequestOptions { EnableContentResponseOnWrite = false, IfMatchEtag = existingItemResource.ETag })
                        .CreateItem(
                            existingItemResource,
                            new TransactionalBatchItemRequestOptions { EnableContentResponseOnWrite = false })
                        .ExecuteAsync(cancellationToken: ct),
                cancellationToken);

            if (!transactionalBatchResponse.IsSuccessStatusCode)
            {
                if (transactionalBatchResponse.StatusCode == HttpStatusCode.PreconditionFailed)
                {
                    // someone else beat us to it, re-read and try again
                    continue;
                }

                throw new InvalidOperationException(transactionalBatchResponse.ErrorMessage);
            }
        }
        else
        {
            try
            {
                await retryPolicy.ExecuteAsync(
                    async ct => await _containerScope.Value.ReplaceItemAsync(
                        cosmosWrapper,
                        cosmosWrapper.Id,
                        partitionKey,
                        new ItemRequestOptions { EnableContentResponseOnWrite = false, IfMatchEtag = existingItemResource.ETag },
                        cancellationToken: ct),
                    cancellationToken);
            }
            catch (CosmosException e) when (e.StatusCode == HttpStatusCode.PreconditionFailed)
            {
                // someone else beat us to it, re-read and try again
                continue;
            }
        }

        return new UpsertOutcome(cosmosWrapper, SaveOutcomeType.Updated);
    }
}