sdk/data/aztables/client.go (429 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package aztables
import (
"context"
"encoding/json"
"errors"
"net/url"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
generated "github.com/Azure/azure-sdk-for-go/sdk/data/aztables/internal"
)
// Client represents a client to the tables service affinitized to a specific table.
type Client struct {
client *generated.TableClient
service *ServiceClient
cred *SharedKeyCredential
name string
}
// ClientOptions contains the optional parameters for client constructors.
type ClientOptions struct {
azcore.ClientOptions
}
// NewClient creates a Client struct in the context of the table specified in the serviceURL, authorizing requests with an Azure AD access token.
// The serviceURL param is expected to have the name of the table in a format similar to: "https://myAccountName.table.core.windows.net/<myTableName>".
// Pass in nil for options to construct the client with the default ClientOptions.
func NewClient(serviceURL string, cred azcore.TokenCredential, options *ClientOptions) (*Client, error) {
if options == nil {
options = &ClientOptions{}
}
rawServiceURL, tableName, err := parseURL(serviceURL)
if err != nil {
return nil, err
}
s, err := NewServiceClient(rawServiceURL, cred, options)
if err != nil {
return nil, err
}
return s.NewClient(tableName), nil
}
// NewClientWithNoCredential creates a Client struct in the context of the table specified in the serviceURL.
// The serviceURL param is expected to have the name of the table in a format similar to: "https://myAccountName.table.core.windows.net/<myTableName>?<SAS token>".
// Pass in nil for options to construct the client with the default ClientOptions.
func NewClientWithNoCredential(serviceURL string, options *ClientOptions) (*Client, error) {
if options == nil {
options = &ClientOptions{}
}
rawServiceURL, tableName, err := parseURL(serviceURL)
if err != nil {
return nil, err
}
s, err := NewServiceClientWithNoCredential(rawServiceURL, options)
if err != nil {
return nil, err
}
return s.NewClient(tableName), nil
}
// NewClientWithSharedKey creates a Client struct in the context of the table specified in the serviceURL, authorizing requests with a shared key.
// The serviceURL param is expected to have the name of the table in a format similar to: "https://myAccountName.table.core.windows.net/<myTableName>".
// Pass in nil for options to construct the client with the default ClientOptions.
func NewClientWithSharedKey(serviceURL string, cred *SharedKeyCredential, options *ClientOptions) (*Client, error) {
if options == nil {
options = &ClientOptions{}
}
rawServiceURL, tableName, err := parseURL(serviceURL)
if err != nil {
return nil, err
}
s, err := NewServiceClientWithSharedKey(rawServiceURL, cred, options)
if err != nil {
return nil, err
}
return s.NewClient(tableName), nil
}
func parseURL(serviceURL string) (string, string, error) {
parsedUrl, err := url.Parse(serviceURL)
if err != nil {
return "", "", err
}
tableName := parsedUrl.Path[1:]
rawServiceURL := parsedUrl.Scheme + "://" + parsedUrl.Host
if parsedUrl.Scheme == "" {
rawServiceURL = parsedUrl.Host
}
if strings.Contains(tableName, "/") {
splits := strings.Split(parsedUrl.Path, "/")
tableName = splits[len(splits)-1]
rawServiceURL += strings.Join(splits[:len(splits)-1], "/")
}
sas := parsedUrl.Query()
if len(sas) > 0 {
rawServiceURL += "/?" + sas.Encode()
}
return rawServiceURL, tableName, nil
}
// CreateTable creates the table with the tableName specified when NewClient was called. If the service returns a non-successful
// HTTP status code, the function returns an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
// NOTE: creating a table with the same name as a table that's in the process of being deleted will return an *azcore.ResponseError
// with error code TableBeingDeleted and status code http.StatusConflict.
func (t *Client) CreateTable(ctx context.Context, options *CreateTableOptions) (CreateTableResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.CreateTable", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &CreateTableOptions{}
}
resp, err := t.client.Create(ctx, generated.TableProperties{TableName: &t.name}, options.toGenerated(), &generated.QueryOptions{})
if err != nil {
return CreateTableResponse{}, err
}
return CreateTableResponse{
TableName: resp.TableName,
}, nil
}
// Delete deletes the table with the tableName specified when NewClient was called. If the service returns a non-successful HTTP status
// code, the function returns an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
// NOTE: deleting a table can take up to 40 seconds or more to complete. If a table with the same name is created while the delete is still
// in progress, an *azcore.ResponseError is returned with error code TableBeingDeleted and status code http.StatusConflict.
func (t *Client) Delete(ctx context.Context, options *DeleteTableOptions) (DeleteTableResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.Delete", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
resp, err := t.service.DeleteTable(ctx, t.name, options)
return resp, err
}
// NewListEntitiesPager queries the entities using the specified ListEntitiesOptions.
// ListEntitiesOptions can specify the following properties to affect the query results returned:
//
// Filter: An OData filter expression that limits results to those entities that satisfy the filter expression.
// For example, the following expression would return only entities with a PartitionKey of 'foo': "PartitionKey eq 'foo'"
//
// Select: A comma delimited list of entity property names that selects which set of entity properties to return in the result set.
// For example, the following value would return results containing only the PartitionKey and RowKey properties: "PartitionKey, RowKey"
//
// Top: The maximum number of entities that will be returned per page of results.
// Note: This value does not limit the total number of results if NextPage is called on the returned Pager until it returns false.
//
// NewListEntitiesPager returns a Pager, which allows iteration through each page of results. Use nil for listOptions if you want to use the default options.
// For more information about writing query strings, check out:
// - API Documentation: https://learn.microsoft.com/rest/api/storageservices/querying-tables-and-entities
// - README samples: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/data/aztables/README.md#writing-filters
func (t *Client) NewListEntitiesPager(listOptions *ListEntitiesOptions) *runtime.Pager[ListEntitiesResponse] {
if listOptions == nil {
listOptions = &ListEntitiesOptions{}
}
return runtime.NewPager(runtime.PagingHandler[ListEntitiesResponse]{
More: func(page ListEntitiesResponse) bool {
// if there are no continuation header values, there are no more pages
// https://learn.microsoft.com/rest/api/storageservices/Query-Timeout-and-Pagination
return !((page.NextPartitionKey == nil || len(*page.NextPartitionKey) == 0) && (page.NextRowKey == nil || len(*page.NextRowKey) == 0))
},
Fetcher: func(ctx context.Context, page *ListEntitiesResponse) (ListEntitiesResponse, error) {
var partKey *string
var rowKey *string
if page != nil {
partKey = page.NextPartitionKey
rowKey = page.NextRowKey
} else {
partKey = listOptions.NextPartitionKey
rowKey = listOptions.NextRowKey
}
resp, err := t.client.QueryEntities(ctx, t.name, &generated.TableClientQueryEntitiesOptions{
NextPartitionKey: partKey,
NextRowKey: rowKey,
}, listOptions.toQueryOptions())
if err != nil {
return ListEntitiesResponse{}, err
}
var marshalledValue [][]byte
if len(resp.TableEntityQueryResponse.Value) > 0 {
marshalledValue = make([][]byte, len(resp.TableEntityQueryResponse.Value))
for i := range resp.TableEntityQueryResponse.Value {
m, err := json.Marshal(resp.TableEntityQueryResponse.Value[i])
if err != nil {
return ListEntitiesResponse{}, err
}
marshalledValue[i] = m
}
}
return ListEntitiesResponse{
NextPartitionKey: resp.XMSContinuationNextPartitionKey,
NextRowKey: resp.XMSContinuationNextRowKey,
Entities: marshalledValue,
}, nil
},
Tracer: t.client.Tracer(),
})
}
// GetEntity retrieves a specific entity from the service using the specified partitionKey and rowKey values. If
// no entity is available it returns an error. If the service returns a non-successful HTTP status code, the function
// returns an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
func (t *Client) GetEntity(ctx context.Context, partitionKey string, rowKey string, options *GetEntityOptions) (GetEntityResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.GetEntity", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &GetEntityOptions{}
}
resp, err := t.client.QueryEntityWithPartitionAndRowKey(ctx, t.name, prepareKey(partitionKey), prepareKey(rowKey), nil, &generated.QueryOptions{
Format: options.Format,
})
if err != nil {
return GetEntityResponse{}, err
}
marshalledValue, err := json.Marshal(resp.Value)
if err != nil {
return GetEntityResponse{}, err
}
var ETag azcore.ETag
if resp.ETag != nil {
ETag = azcore.ETag(*resp.ETag)
}
return GetEntityResponse{
ETag: ETag,
Value: marshalledValue,
}, nil
}
// AddEntity adds an entity (described by a byte slice) to the table. This method returns an error if an entity with
// the same PartitionKey and RowKey already exists in the table. If the supplied entity does not contain both a PartitionKey
// and a RowKey an error will be returned. If the service returns a non-successful HTTP status code, the function returns
// an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
func (t *Client) AddEntity(ctx context.Context, entity []byte, options *AddEntityOptions) (AddEntityResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.AddEntity", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
var mapEntity map[string]any
err = json.Unmarshal(entity, &mapEntity)
if err != nil {
return AddEntityResponse{}, err
}
if options == nil {
options = &AddEntityOptions{}
}
resp, err := t.client.InsertEntity(ctx, t.name, &generated.TableClientInsertEntityOptions{TableEntityProperties: mapEntity}, &generated.QueryOptions{
Format: options.Format,
})
if err != nil {
err = checkEntityForPkRk(&mapEntity, err)
return AddEntityResponse{}, err
}
marshalledValue, err := json.Marshal(resp.Value)
if err != nil {
return AddEntityResponse{}, err
}
var ETag azcore.ETag
if resp.ETag != nil {
ETag = azcore.ETag(*resp.ETag)
}
return AddEntityResponse{
ETag: ETag,
Value: marshalledValue,
}, nil
}
// DeleteEntity deletes the entity with the specified partitionKey and rowKey from the table. If the service returns a non-successful HTTP
// status code, the function returns an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
func (t *Client) DeleteEntity(ctx context.Context, partitionKey string, rowKey string, options *DeleteEntityOptions) (DeleteEntityResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.DeleteEntity", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &DeleteEntityOptions{}
}
if options.IfMatch == nil {
nilEtag := azcore.ETag("*")
options.IfMatch = &nilEtag
}
_, err = t.client.DeleteEntity(ctx, t.name, prepareKey(partitionKey), prepareKey(rowKey), string(*options.IfMatch), options.toGenerated(), &generated.QueryOptions{})
return DeleteEntityResponse{}, err
}
// UpdateEntity updates the specified table entity if it exists.
// If updateMode is Replace, the entity will be replaced. This is the only way to remove properties from an existing entity.
// If updateMode is Merge, the property values present in the specified entity will be merged with the existing entity. Properties not specified in the merge will be unaffected.
// The specified etag value will be used for optimistic concurrency. If the etag does not match the value of the entity in the table, the operation will fail.
// The response type will be TableEntityMergeResponse if updateMode is Merge and TableEntityUpdateResponse if updateMode is Replace.
// If the service returns a non-successful HTTP status code, the function returns an *azcore.ResponseError type. Specify nil for options if you want to use the default options.
func (t *Client) UpdateEntity(ctx context.Context, entity []byte, options *UpdateEntityOptions) (UpdateEntityResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.UpdateEntity", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &UpdateEntityOptions{
UpdateMode: UpdateModeMerge,
}
}
if options.IfMatch == nil {
star := azcore.ETag("*")
options.IfMatch = &star
}
var mapEntity map[string]any
err = json.Unmarshal(entity, &mapEntity)
if err != nil {
return UpdateEntityResponse{}, err
}
pk := mapEntity[partitionKey]
partKey := pk.(string)
rk := mapEntity[rowKey]
rowkey := rk.(string)
switch options.UpdateMode {
case UpdateModeMerge:
var resp generated.TableClientMergeEntityResponse
resp, err = t.client.MergeEntity(
ctx,
t.name,
prepareKey(partKey),
prepareKey(rowkey),
options.toGeneratedMergeEntity(mapEntity),
&generated.QueryOptions{},
)
if err != nil {
return UpdateEntityResponse{}, err
}
var ETag azcore.ETag
if resp.ETag != nil {
ETag = azcore.ETag(*resp.ETag)
}
return UpdateEntityResponse{
ETag: ETag,
}, nil
case UpdateModeReplace:
var resp generated.TableClientUpdateEntityResponse
resp, err = t.client.UpdateEntity(
ctx,
t.name,
prepareKey(partKey),
prepareKey(rowkey),
options.toGeneratedUpdateEntity(mapEntity),
&generated.QueryOptions{},
)
if err != nil {
return UpdateEntityResponse{}, err
}
var ETag azcore.ETag
if resp.ETag != nil {
ETag = azcore.ETag(*resp.ETag)
}
return UpdateEntityResponse{
ETag: ETag,
}, nil
}
if pk == "" || rk == "" {
err = errPartitionKeyRowKeyError
} else {
err = errInvalidUpdateMode
}
return UpdateEntityResponse{}, err
}
func insertEntityFromGeneratedMerge(g *generated.TableClientMergeEntityResponse) UpsertEntityResponse {
if g == nil {
return UpsertEntityResponse{}
}
var ETag azcore.ETag
if g.ETag != nil {
ETag = azcore.ETag(*g.ETag)
}
return UpsertEntityResponse{
ETag: ETag,
}
}
func insertEntityFromGeneratedUpdate(g *generated.TableClientUpdateEntityResponse) UpsertEntityResponse {
if g == nil {
return UpsertEntityResponse{}
}
var ETag azcore.ETag
if g.ETag != nil {
ETag = azcore.ETag(*g.ETag)
}
return UpsertEntityResponse{
ETag: ETag,
}
}
// UpsertEntity inserts an entity if it does not already exist in the table. If the entity does exist, the entity is
// replaced or merged as specified the updateMode parameter. If the entity exists and updateMode is Merge, the property
// values present in the specified entity will be merged with the existing entity rather than replaced.
// The response type will be TableEntityMergeResponse if updateMode is Merge and TableEntityUpdateResponse if updateMode is Replace.
// If the service returns a non-successful HTTP status code, the function returns an *azcore.ResponseError type.
// Specify nil for options if you want to use the default options.
func (t *Client) UpsertEntity(ctx context.Context, entity []byte, options *UpsertEntityOptions) (UpsertEntityResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.UpsertEntity", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &UpsertEntityOptions{
UpdateMode: UpdateModeMerge,
}
}
var mapEntity map[string]any
err = json.Unmarshal(entity, &mapEntity)
if err != nil {
return UpsertEntityResponse{}, err
}
pk := mapEntity[partitionKey]
partKey := pk.(string)
rk := mapEntity[rowKey]
rowkey := rk.(string)
switch options.UpdateMode {
case UpdateModeMerge:
var resp generated.TableClientMergeEntityResponse
resp, err = t.client.MergeEntity(
ctx,
t.name,
prepareKey(partKey),
prepareKey(rowkey),
&generated.TableClientMergeEntityOptions{TableEntityProperties: mapEntity},
&generated.QueryOptions{},
)
if err != nil {
return UpsertEntityResponse{}, err
}
return insertEntityFromGeneratedMerge(&resp), err
case UpdateModeReplace:
var resp generated.TableClientUpdateEntityResponse
resp, err = t.client.UpdateEntity(
ctx,
t.name,
prepareKey(partKey),
prepareKey(rowkey),
&generated.TableClientUpdateEntityOptions{TableEntityProperties: mapEntity},
&generated.QueryOptions{},
)
if err != nil {
return UpsertEntityResponse{}, err
}
return insertEntityFromGeneratedUpdate(&resp), err
}
if pk == "" || rk == "" {
err = errPartitionKeyRowKeyError
} else {
err = errInvalidUpdateMode
}
return UpsertEntityResponse{}, err
}
// GetAccessPolicy retrieves details about any stored access policies specified on the table that may be used with the Shared Access Signature.
// If the service returns a non-successful HTTP status code, the function returns an *azcore.ResponseError type.
// Specify nil for options if you want to use the default options.
func (t *Client) GetAccessPolicy(ctx context.Context, options *GetAccessPolicyOptions) (GetAccessPolicyResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.GetAccessPolicy", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
resp, err := t.client.GetAccessPolicy(ctx, t.name, options.toGenerated())
if err != nil {
return GetAccessPolicyResponse{}, err
}
if len(resp.SignedIdentifiers) == 0 {
return GetAccessPolicyResponse{}, nil
}
sis := make([]*SignedIdentifier, len(resp.SignedIdentifiers))
for i := range resp.SignedIdentifiers {
sis[i] = fromGeneratedSignedIdentifier(resp.SignedIdentifiers[i])
}
return GetAccessPolicyResponse{
SignedIdentifiers: sis,
}, nil
}
// SetAccessPolicy sets stored access policies for the table that may be used with SharedAccessSignature.
// If the service returns a non-successful HTTP status code, the function returns an *azcore.ResponseError type.
// Specify nil for options if you want to use the default options.
func (t *Client) SetAccessPolicy(ctx context.Context, options *SetAccessPolicyOptions) (SetAccessPolicyResponse, error) {
var err error
ctx, endSpan := runtime.StartSpan(ctx, "Client.SetAccessPolicy", t.client.Tracer(), nil)
defer func() { endSpan(err) }()
if options == nil {
options = &SetAccessPolicyOptions{}
}
_, err = t.client.SetAccessPolicy(ctx, t.name, options.toGenerated())
if err != nil && len(options.TableACL) > 5 {
err = errTooManyAccessPoliciesError
}
return SetAccessPolicyResponse{}, err
}
// GetTableSASURL is a convenience method for generating a SAS token for a specific table.
// It can only be used by clients created by NewClientWithSharedKey().
func (t Client) GetTableSASURL(permissions SASPermissions, start time.Time, expiry time.Time) (string, error) {
if t.cred == nil {
return "", errors.New("SAS can only be signed with a SharedKeyCredential")
}
qps, err := SASSignatureValues{
TableName: t.name,
Permissions: permissions.String(),
StartTime: start,
ExpiryTime: expiry,
StartPartitionKey: permissions.StartPartitionKey,
StartRowKey: permissions.StartRowKey,
EndPartitionKey: permissions.EndPartitionKey,
EndRowKey: permissions.EndRowKey,
}.Sign(t.cred)
if err != nil {
return "", err
}
serviceURL := t.client.Endpoint()
if !strings.Contains(serviceURL, "/") {
serviceURL += "/"
}
serviceURL += t.name + "?" + qps
return serviceURL, nil
}