public async Task UpsertAsync()

in src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosFhirDataStore.cs [105:256]


        public async Task<UpsertOutcome> UpsertAsync(
            ResourceWrapper resource,
            WeakETag weakETag,
            bool allowCreate,
            bool keepHistory,
            CancellationToken cancellationToken)
        {
            EnsureArg.IsNotNull(resource, nameof(resource));

            var cosmosWrapper = new FhirCosmosResourceWrapper(resource);
            UpdateSortIndex(cosmosWrapper);

            var partitionKey = new PartitionKey(cosmosWrapper.PartitionKey);
            AsyncPolicy retryPolicy = _retryExceptionPolicyFactory.RetryPolicy;

            _logger.LogDebug("Upserting {resourceType}/{resourceId}, ETag: \"{tag}\", AllowCreate: {allowCreate}, KeepHistory: {keepHistory}", resource.ResourceTypeName, resource.ResourceId, weakETag?.VersionId, allowCreate, keepHistory);

            if (weakETag == null && allowCreate && !cosmosWrapper.IsDeleted)
            {
                // Optimistically try to create this as a new resource
                try
                {
                    await retryPolicy.ExecuteAsync(
                        async ct => await _containerScope.Value.CreateItemAsync(
                            cosmosWrapper,
                            partitionKey,
                            cancellationToken: ct,
                            requestOptions: new ItemRequestOptions { EnableContentResponseOnWrite = false }),
                        cancellationToken);

                    return new UpsertOutcome(cosmosWrapper, SaveOutcomeType.Created);
                }
                catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict)
                {
                    // this means there is already an existing version of this resource
                }
                catch (CosmosException e) when (e.IsServiceUnavailableDueToTimeout())
                {
                    throw new CosmosException(e.Message, HttpStatusCode.RequestTimeout, e.SubStatusCode, e.ActivityId, e.RequestCharge);
                }
            }

            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                FhirCosmosResourceWrapper existingItemResource;
                try
                {
                    ItemResponse<FhirCosmosResourceWrapper> existingItem = await retryPolicy.ExecuteAsync(
                        async ct => await _containerScope.Value.ReadItemAsync<FhirCosmosResourceWrapper>(cosmosWrapper.Id, partitionKey, cancellationToken: ct),
                        cancellationToken);
                    existingItemResource = existingItem.Resource;
                }
                catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound)
                {
                    if (cosmosWrapper.IsDeleted)
                    {
                        return null;
                    }

                    if (weakETag != null)
                    {
                        throw new ResourceNotFoundException(string.Format(Core.Resources.ResourceNotFoundByIdAndVersion, resource.ResourceTypeName, resource.ResourceId, weakETag.VersionId));
                    }

                    if (!allowCreate)
                    {
                        throw new MethodNotAllowedException(Core.Resources.ResourceCreationNotAllowed);
                    }

                    throw;
                }

                if (weakETag != null && weakETag.VersionId != existingItemResource.Version)
                {
                    throw new PreconditionFailedException(string.Format(Core.Resources.ResourceVersionConflict, weakETag.VersionId));
                }

                if (existingItemResource.IsDeleted && cosmosWrapper.IsDeleted)
                {
                    return null;
                }

                cosmosWrapper.Version = int.TryParse(existingItemResource.Version, out int existingVersion) ? (existingVersion + 1).ToString(CultureInfo.InvariantCulture) : Guid.NewGuid().ToString();

                // indicate that the version in the raw resource's meta property does not reflect the actual version.
                cosmosWrapper.RawResource.IsMetaSet = false;

                if (cosmosWrapper.RawResource.Format == FhirResourceFormat.Json)
                {
                    // Update the raw resource based on the new version.
                    // This is a lot faster than re-serializing the POCO.
                    // Unfortunately, we need to allocate a string, but at least it is reused for the HTTP response.

                    // If the format is not XML, IsMetaSet will remain false and we will update the version when the resource is read.

                    using MemoryStream memoryStream = _recyclableMemoryStreamManager.GetStream();
                    await new RawResourceElement(cosmosWrapper).SerializeToStreamAsUtf8Json(memoryStream);
                    memoryStream.Position = 0;
                    using var reader = new StreamReader(memoryStream, Encoding.UTF8);
                    cosmosWrapper.RawResource = new RawResource(reader.ReadToEnd(), FhirResourceFormat.Json, isMetaSet: true);
                }

                if (keepHistory)
                {
                    existingItemResource.IsHistory = true;
                    existingItemResource.ActivePeriodEndDateTime = cosmosWrapper.LastModified;
                    existingItemResource.SearchIndices = null;

                    TransactionalBatchResponse transactionalBatchResponse = await retryPolicy.ExecuteAsync(
                        async ct =>
                            await _containerScope.Value.CreateTransactionalBatch(partitionKey)
                                .ReplaceItem(cosmosWrapper.Id, cosmosWrapper, new TransactionalBatchItemRequestOptions { EnableContentResponseOnWrite = false, IfMatchEtag = existingItemResource.ETag })
                                .CreateItem(existingItemResource, new TransactionalBatchItemRequestOptions { EnableContentResponseOnWrite = false })
                                .ExecuteAsync(cancellationToken: ct),
                        cancellationToken);

                    if (!transactionalBatchResponse.IsSuccessStatusCode)
                    {
                        if (transactionalBatchResponse.StatusCode == HttpStatusCode.PreconditionFailed)
                        {
                            // someone else beat us to it, re-read and try again
                            continue;
                        }

                        throw new InvalidOperationException(transactionalBatchResponse.ErrorMessage);
                    }
                }
                else
                {
                    try
                    {
                        await retryPolicy.ExecuteAsync(
                            async ct => await _containerScope.Value.ReplaceItemAsync(
                                cosmosWrapper,
                                cosmosWrapper.Id,
                                partitionKey,
                                new ItemRequestOptions { EnableContentResponseOnWrite = false, IfMatchEtag = existingItemResource.ETag },
                                cancellationToken: ct),
                            cancellationToken);
                    }
                    catch (CosmosException e) when (e.StatusCode == HttpStatusCode.PreconditionFailed)
                    {
                        // someone else beat us to it, re-read and try again
                        continue;
                    }
                }

                return new UpsertOutcome(cosmosWrapper, SaveOutcomeType.Updated);
            }
        }