sdk/data/azcosmos/cosmos_container.go (533 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. package azcosmos import ( "context" "errors" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" ) // ContainerClient lets you perform read, update, change throughput, and delete container operations. // It also lets you perform read, update, change throughput, and delete item operations. type ContainerClient struct { // The Id of the Cosmos container id string // The database that contains the container database *DatabaseClient // The resource link link string } func newContainer(id string, database *DatabaseClient) (*ContainerClient, error) { return &ContainerClient{ id: id, database: database, link: createLink(database.link, pathSegmentCollection, id)}, nil } // ID returns the identifier of the Cosmos container. func (c *ContainerClient) ID() string { return c.id } // Read obtains the information for a Cosmos container. // ctx - The context for the request. // o - Options for the operation. func (c *ContainerClient) Read( ctx context.Context, o *ReadContainerOptions) (ContainerResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeRead, resourceTypeCollection, c.id) if err != nil { return ContainerResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if o == nil { o = &ReadContainerOptions{} } operationContext := pipelineRequestOptions{ resourceType: resourceTypeCollection, resourceAddress: c.link, } path, err := generatePathForNameBased(resourceTypeCollection, c.link, false) if err != nil { return ContainerResponse{}, err } azResponse, err := c.database.client.sendGetRequest( path, ctx, operationContext, o, nil) if err != nil { return ContainerResponse{}, err } response, err := newContainerResponse(azResponse) return response, err } // Replace a Cosmos container. // ctx - The context for the request. // o - Options for the operation. func (c *ContainerClient) Replace( ctx context.Context, containerProperties ContainerProperties, o *ReplaceContainerOptions) (ContainerResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeReplace, resourceTypeCollection, c.id) if err != nil { return ContainerResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if o == nil { o = &ReplaceContainerOptions{} } operationContext := pipelineRequestOptions{ resourceType: resourceTypeCollection, resourceAddress: c.link, isWriteOperation: true, } path, err := generatePathForNameBased(resourceTypeCollection, c.link, false) if err != nil { return ContainerResponse{}, err } azResponse, err := c.database.client.sendPutRequest( path, ctx, containerProperties, operationContext, o, nil) if err != nil { return ContainerResponse{}, err } response, err := newContainerResponse(azResponse) return response, err } // Delete a Cosmos container. // ctx - The context for the request. // o - Options for the operation. func (c *ContainerClient) Delete( ctx context.Context, o *DeleteContainerOptions) (ContainerResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeDelete, resourceTypeCollection, c.id) if err != nil { return ContainerResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if o == nil { o = &DeleteContainerOptions{} } operationContext := pipelineRequestOptions{ resourceType: resourceTypeCollection, resourceAddress: c.link, isWriteOperation: true, } path, err := generatePathForNameBased(resourceTypeCollection, c.link, false) if err != nil { return ContainerResponse{}, err } azResponse, err := c.database.client.sendDeleteRequest( path, ctx, operationContext, o, nil) if err != nil { return ContainerResponse{}, err } response, err := newContainerResponse(azResponse) return response, err } // ReadThroughput obtains the provisioned throughput information for the container. // ctx - The context for the request. // o - Options for the operation. func (c *ContainerClient) ReadThroughput( ctx context.Context, o *ThroughputOptions) (ThroughputResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeRead, resourceTypeOffer, c.id) if err != nil { return ThroughputResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if o == nil { o = &ThroughputOptions{} } rid, err := c.getRID(ctx) if err != nil { return ThroughputResponse{}, err } offers := &cosmosOffers{client: c.database.client} response, err := offers.ReadThroughputIfExists(ctx, rid, o) return response, err } // ReplaceThroughput updates the provisioned throughput for the container. // ctx - The context for the request. // throughputProperties - The throughput configuration of the container. // o - Options for the operation. func (c *ContainerClient) ReplaceThroughput( ctx context.Context, throughputProperties ThroughputProperties, o *ThroughputOptions) (ThroughputResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeReplace, resourceTypeOffer, c.id) if err != nil { return ThroughputResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if o == nil { o = &ThroughputOptions{} } rid, err := c.getRID(ctx) if err != nil { return ThroughputResponse{}, err } offers := &cosmosOffers{client: c.database.client} response, err := offers.ReplaceThroughputIfExists(ctx, throughputProperties, rid, o) return response, err } // CreateItem creates an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key for the item. // item - The item to create. // o - Options for the operation. func (c *ContainerClient) CreateItem( ctx context.Context, partitionKey PartitionKey, item []byte, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeCreate) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } if o == nil { o = &ItemOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: c.link, isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendPostRequest( path, ctx, item, operationContext, o, nil) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // UpsertItem creates or replaces an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key for the item. // item - The item to upsert. // o - Options for the operation. func (c *ContainerClient) UpsertItem( ctx context.Context, partitionKey PartitionKey, item []byte, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeUpsert) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } addHeader := func(r *policy.Request) { r.Raw().Header.Add(cosmosHeaderIsUpsert, "true") } if o == nil { o = &ItemOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: c.link, isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendPostRequest( path, ctx, item, operationContext, o, addHeader) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // ReplaceItem replaces an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key of the item to replace. // itemId - The id of the item to replace. // item - The content to be used to replace. // o - Options for the operation. func (c *ContainerClient) ReplaceItem( ctx context.Context, partitionKey PartitionKey, itemId string, item []byte, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeReplace) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } if o == nil { o = &ItemOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: createLink(c.link, pathSegmentDocument, itemId), isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, false) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendPutRequest( path, ctx, item, operationContext, o, nil) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // ReadItem reads an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key for the item. // itemId - The id of the item to read. // o - Options for the operation. func (c *ContainerClient) ReadItem( ctx context.Context, partitionKey PartitionKey, itemId string, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeRead) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } if o == nil { o = &ItemOptions{} } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: createLink(c.link, pathSegmentDocument, itemId), headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, false) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendGetRequest( path, ctx, operationContext, o, nil) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // DeleteItem deletes an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key for the item. // itemId - The id of the item to delete. // o - Options for the operation. func (c *ContainerClient) DeleteItem( ctx context.Context, partitionKey PartitionKey, itemId string, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeDelete) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } if o == nil { o = &ItemOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: createLink(c.link, pathSegmentDocument, itemId), isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, false) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendDeleteRequest( path, ctx, operationContext, o, nil) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // NewQueryItemsPager executes a single partition query in a Cosmos container. // query - The SQL query to execute. // partitionKey - The partition key to scope the query on. See below for more information on cross partition queries. // o - Options for the operation. // // You can specify an empty list of partition keys by passing `NewPartitionKey()` to the `partitionKey` parameter, to indicate that the query WHERE clauses will specify which partitions to query. // // Limited cross partition queries ARE possible with the Go SDK. // If you specify partition keys in the `partitionKey` parameter, you must specify ALL partition keys that the container has (in the case of hierarchical partitioning). // // If the query itself contains WHERE clauses that filter down to a single partition, the query will be executed on that partition. // If the query does not filter down to a single partition (i.e. it does not filter on partition key at all, or filters on only some of the partition keys a container defines), the query will be executed as a cross partition query. // The Azure Cosmos DB Gateway API, used by the Go SDK, can only perform a LIMITED set of cross-partition queries. // Specifically, the gateway can only perform simple projections and filtering on cross partition queries. // See https://learn.microsoft.com/rest/api/cosmos-db/querying-cosmosdb-resources-using-the-rest-api#queries-that-cannot-be-served-by-gateway for more details. // // When performing a cross-partition query, the Gateway may return pages of inconsistent size, or even empty pages (while still having a non-nil continuation token). // Ensure you fully iterate the pager, even if you receive empty pages, to ensure you get all results. // // If you provide a query that the gateway cannot execute, it will return a BadRequest error. func (c *ContainerClient) NewQueryItemsPager(query string, partitionKey PartitionKey, o *QueryOptions) *runtime.Pager[QueryItemsResponse] { correlatedActivityId, _ := uuid.New() h := headerOptionsOverride{ partitionKey: &partitionKey, correlatedActivityId: &correlatedActivityId, } queryOptions := &QueryOptions{} if o != nil { originalOptions := *o queryOptions = &originalOptions } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: c.link, headerOptionsOverride: &h, } path, _ := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) return runtime.NewPager(runtime.PagingHandler[QueryItemsResponse]{ More: func(page QueryItemsResponse) bool { return page.ContinuationToken != nil }, Fetcher: func(ctx context.Context, page *QueryItemsResponse) (QueryItemsResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeQuery) if err != nil { return QueryItemsResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if page != nil { if page.ContinuationToken != nil { // Use the previous page continuation if available queryOptions.ContinuationToken = page.ContinuationToken } } azResponse, err := c.database.client.sendQueryRequest( path, ctx, query, queryOptions.QueryParameters, operationContext, queryOptions, nil) if err != nil { return QueryItemsResponse{}, err } return newQueryResponse(azResponse) }, }) } // PatchItem patches an item in a Cosmos container. // ctx - The context for the request. // partitionKey - The partition key for the item. // itemId - The id of the item to patch. // ops - Operations to perform on the patch // o - Options for the operation. func (c *ContainerClient) PatchItem( ctx context.Context, partitionKey PartitionKey, itemId string, ops PatchOperations, o *ItemOptions) (ItemResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypePatch) if err != nil { return ItemResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() h := headerOptionsOverride{ partitionKey: &partitionKey, } if o == nil { o = &ItemOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: createLink(c.link, pathSegmentDocument, itemId), isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, false) if err != nil { return ItemResponse{}, err } azResponse, err := c.database.client.sendPatchRequest( path, ctx, ops, operationContext, o, nil) if err != nil { return ItemResponse{}, err } response, err := newItemResponse(azResponse) return response, err } // NewTransactionalBatch creates a batch of operations to be committed as a single unit. // See https://docs.microsoft.com/azure/cosmos-db/sql/transactional-batch func (c *ContainerClient) NewTransactionalBatch(partitionKey PartitionKey) TransactionalBatch { return TransactionalBatch{partitionKey: partitionKey} } // ExecuteTransactionalBatch executes a transactional batch. // Once executed, verify the Success property of the response to determine if the batch was committed func (c *ContainerClient) ExecuteTransactionalBatch(ctx context.Context, b TransactionalBatch, o *TransactionalBatchOptions) (TransactionalBatchResponse, error) { var err error spanName, err := c.getSpanForContainer(operationTypeBatch, resourceTypeCollection, c.id) if err != nil { return TransactionalBatchResponse{}, err } ctx, endSpan := runtime.StartSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() if len(b.operations) == 0 { return TransactionalBatchResponse{}, errors.New("no operations in batch") } h := headerOptionsOverride{ partitionKey: &b.partitionKey, } if o == nil { o = &TransactionalBatchOptions{} } else { h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite } // If contentResponseOnWrite is not enabled at the client level the // service will not even send a batch response payload // Instead we should automatically enforce contentResponseOnWrite for all // batch requests whenever at least one of the item operations requires a content response (read operation) enableContentResponseOnWriteForReadOperations := true for _, op := range b.operations { if op.getOperationType() == operationTypeRead { h.enableContentResponseOnWrite = &enableContentResponseOnWriteForReadOperations break } } operationContext := pipelineRequestOptions{ resourceType: resourceTypeDocument, resourceAddress: c.link, isWriteOperation: true, headerOptionsOverride: &h} path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) if err != nil { return TransactionalBatchResponse{}, err } azResponse, err := c.database.client.sendBatchRequest( ctx, path, b.operations, operationContext, o, nil) if err != nil { return TransactionalBatchResponse{}, err } response, err := newTransactionalBatchResponse(azResponse) return response, err } func (c *ContainerClient) getRID(ctx context.Context) (string, error) { containerResponse, err := c.Read(ctx, nil) if err != nil { return "", err } return containerResponse.ContainerProperties.ResourceID, nil } func (c *ContainerClient) getSpanForContainer(operationType operationType, resourceType resourceType, id string) (span, error) { return getSpanNameForContainers(c.database.client.accountEndpointUrl(), operationType, resourceType, c.database.id, id) } func (c *ContainerClient) getSpanForItems(operationType operationType) (span, error) { return getSpanNameForItems(c.database.client.accountEndpointUrl(), operationType, c.database.id, c.id) }