dax/internal/client/request.go (1,313 lines of code) (raw):
/*
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://www.apache.org/licenses/LICENSE-2.0
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
*/
package client
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"strings"
"github.com/aws/aws-dax-go-v2/dax/internal/cbor"
"github.com/aws/aws-dax-go-v2/dax/internal/lru"
"github.com/aws/aws-dax-go-v2/dax/internal/parser"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
"github.com/gofrs/uuid"
)
const daxServiceId = 1
const (
// Dax Control
authorizeConnection_1489122155_1_Id = 1489122155
defineAttributeList_670678385_1_Id = 670678385
defineAttributeListId_N1230579644_1_Id = -1230579644
defineKeySchema_N742646399_1_Id = -742646399
endpoints_455855874_1_Id = 455855874
methods_785068263_1_Id = 785068263
services_N1016793520_1_Id = -1016793520
// DynamoDB Data
transactWriteItems_N1160037738_1_Id = -1160037738
transactGetItems_1866287579_1_Id = 1866287579
batchGetItem_N697851100_1_Id = -697851100
batchWriteItem_116217951_1_Id = 116217951
getItem_263244906_1_Id = 263244906
putItem_N2106490455_1_Id = -2106490455
deleteItem_1013539361_1_Id = 1013539361
updateItem_1425579023_1_Id = 1425579023
query_N931250863_1_Id = -931250863
scan_N1875390620_1_Id = -1875390620
// DynamoDB Control
createTable_N313431286_1_Id = -313431286
deleteTable_2120496185_1_Id = 2120496185
describeTable_N819330193_1_Id = -819330193
updateTable_383747477_1_Id = 383747477
listTables_1874119219_1_Id = 1874119219
describeLimits_N475661135_1_Id = -475661135
)
const (
requestParamProjectionExpression = iota
requestParamExpressionAttributeNames
requestParamConsistentRead
requestParamReturnConsumedCapacity
requestParamConditionExpression
requestParamExpressionAttributeValues
requestParamReturnItemCollectionMetrics
requestParamReturnValues
requestParamUpdateExpression
requestParamExclusiveStartKey
requestParamFilterExpression
requestParamIndexName
requestParamKeyConditionExpression
requestParamLimit
requestParamScanIndexForward
requestParamSelect
requestParamSegment
requestParamTotalSegments
requestParamRequestItems
requestParamRequestItemsClientRequestToken
)
const (
returnValueNone = 1 + iota
returnValueAllOld
returnValueUpdatedOld
returnValueAllNew
returnValueUpdatedNew
)
const (
returnConsumedCapacityNone = iota
returnConsumedCapacityTotal
returnConsumedCapacityIndexes
)
const (
returnItemCollectionMetricsNone = iota
returnItemCollectionMetricsSize
)
const (
selectAllAttributes = 1 + iota
selectAllProjectedAttributes
selectCount
selectSpecificAttributes
)
const (
getOperation = iota + 1
putOperation
exchangeOperation
insertOperation
replaceOperation
updateOperation
deleteOperation
removeOperation
partialUpdateOperation
batchGetOperation
batchOperation
checkOperation
transactWriteOperation
transactGetOperation
scanOperation
queryOperation
createTableOperation
deleteTableOperation
describeTableOperation
listTablesOperation
updateTableOperation
)
const (
returnValueOnConditionCheckFailureNone = iota + 1
returnValueOnConditionCheckFailureAllOld
)
const maxWriteBatchSize = 25
func encodeEndpointsInput(writer *cbor.Writer) error {
if err := encodeServiceAndMethod(endpoints_455855874_1_Id, writer); err != nil {
return err
}
return nil
}
func encodeAuthInput(accessKey, sessionToken, stringToSign, signature, userAgent string, writer *cbor.Writer) error {
if err := encodeServiceAndMethod(authorizeConnection_1489122155_1_Id, writer); err != nil {
return err
}
if err := writer.WriteString(accessKey); err != nil {
return err
}
if err := writer.WriteString(signature); err != nil {
return err
}
if err := writer.WriteBytes([]byte(stringToSign)); err != nil {
return err
}
if len(sessionToken) == 0 {
if err := writer.WriteNull(); err != nil {
return err
}
} else {
if err := writer.WriteString(sessionToken); err != nil {
return err
}
}
if len(userAgent) == 0 {
if err := writer.WriteNull(); err != nil {
return err
}
} else {
if err := writer.WriteString(userAgent); err != nil {
return err
}
}
return nil
}
func encodeDefineAttributeListIdInput(attrNames []string, writer *cbor.Writer) error {
if err := encodeServiceAndMethod(defineAttributeListId_N1230579644_1_Id, writer); err != nil {
return err
}
if err := writer.WriteArrayHeader(len(attrNames)); err != nil {
return err
}
for _, an := range attrNames {
if err := writer.WriteString(an); err != nil {
return err
}
}
return nil
}
func encodeDefineAttributeListInput(id int64, writer *cbor.Writer) error {
if err := encodeServiceAndMethod(defineAttributeList_670678385_1_Id, writer); err != nil {
return err
}
return writer.WriteInt64(id)
}
func encodeDefineKeySchemaInput(table string, writer *cbor.Writer) error {
if err := encodeServiceAndMethod(defineKeySchema_N742646399_1_Id, writer); err != nil {
return err
}
return writer.WriteBytes([]byte(table))
}
func encodePutItemInput(ctx context.Context, input *dynamodb.PutItemInput, keySchema *lru.Lru, attrNamesListToId *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpPutItemInput(input); err != nil {
return err
}
if input, err = translateLegacyPutItemInput(input); err != nil {
return err
}
table := *input.TableName
keys, err := getKeySchema(ctx, keySchema, table)
if err != nil {
return err
}
if err := encodeServiceAndMethod(putItem_N2106490455_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(table)); err != nil {
return err
}
if err := cbor.EncodeItemKey(input.Item, keys, writer); err != nil {
return err
}
if err := encodeNonKeyAttributes(ctx, input.Item, keys, attrNamesListToId, writer); err != nil {
return err
}
return encodeItemOperationOptionalParams(
input.ReturnValues,
input.ReturnConsumedCapacity,
input.ReturnItemCollectionMetrics,
nil,
nil, input.ConditionExpression, nil, input.ExpressionAttributeNames, input.ExpressionAttributeValues, writer)
}
func encodeDeleteItemInput(ctx context.Context, input *dynamodb.DeleteItemInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpDeleteItemInput(input); err != nil {
return err
}
if input, err = translateLegacyDeleteItemInput(input); err != nil {
return err
}
table := *input.TableName
keys, err := getKeySchema(ctx, keySchema, *input.TableName)
if err != nil {
return nil
}
if err := encodeServiceAndMethod(deleteItem_1013539361_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(table)); err != nil {
return err
}
if err := cbor.EncodeItemKey(input.Key, keys, writer); err != nil {
return err
}
return encodeItemOperationOptionalParams(
input.ReturnValues,
input.ReturnConsumedCapacity,
input.ReturnItemCollectionMetrics,
nil,
nil, input.ConditionExpression, nil, input.ExpressionAttributeNames, input.ExpressionAttributeValues, writer)
}
func encodeUpdateItemInput(ctx context.Context, input *dynamodb.UpdateItemInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpUpdateItemInput(input); err != nil {
return err
}
if input, err = translateLegacyUpdateItemInput(input); err != nil {
return err
}
table := *input.TableName
keys, err := getKeySchema(ctx, keySchema, *input.TableName)
if err != nil {
return nil
}
if err := encodeServiceAndMethod(updateItem_1425579023_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(table)); err != nil {
return err
}
if err := cbor.EncodeItemKey(input.Key, keys, writer); err != nil {
return err
}
return encodeItemOperationOptionalParams(
input.ReturnValues,
input.ReturnConsumedCapacity,
input.ReturnItemCollectionMetrics,
nil,
nil, input.ConditionExpression, input.UpdateExpression, input.ExpressionAttributeNames, input.ExpressionAttributeValues, writer)
}
func encodeGetItemInput(ctx context.Context, input *dynamodb.GetItemInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpGetItemInput(input); err != nil {
return err
}
if input, err = translateLegacyGetItemInput(input); err != nil {
return err
}
table := *input.TableName
keys, err := getKeySchema(ctx, keySchema, table)
if err != nil {
return err
}
if err := encodeServiceAndMethod(getItem_263244906_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(table)); err != nil {
return err
}
if err := cbor.EncodeItemKey(input.Key, keys, writer); err != nil {
return err
}
return encodeItemOperationOptionalParams(
types.ReturnValueNone,
input.ReturnConsumedCapacity,
types.ReturnItemCollectionMetricsNone,
input.ConsistentRead,
input.ProjectionExpression, nil, nil, input.ExpressionAttributeNames, nil, writer)
}
func encodeScanInput(ctx context.Context, input *dynamodb.ScanInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpScanInput(input); err != nil {
return err
}
if input, err = translateLegacyScanInput(input); err != nil {
return err
}
if err := encodeServiceAndMethod(scan_N1875390620_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(*input.TableName)); err != nil {
return err
}
expressions, err := encodeExpressions(input.ProjectionExpression, input.FilterExpression, nil, input.ExpressionAttributeNames, input.ExpressionAttributeValues)
if err != nil {
return err
}
return encodeScanQueryOptionalParams(
ctx,
input.IndexName,
input.Select,
input.ReturnConsumedCapacity,
input.ConsistentRead,
expressions, input.Segment, input.TotalSegments, input.Limit, nil, input.ExclusiveStartKey, keySchema, *input.TableName, writer)
}
func encodeQueryInput(ctx context.Context, input *dynamodb.QueryInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpQueryInput(input); err != nil {
return err
}
if input, err = translateLegacyQueryInput(input); err != nil {
return err
}
if input.KeyConditionExpression == nil {
return smithy.NewErrParamRequired("KeyConditionExpression cannot be nil")
}
if err := encodeServiceAndMethod(query_N931250863_1_Id, writer); err != nil {
return err
}
if err := writer.WriteBytes([]byte(*input.TableName)); err != nil {
return err
}
expressions, err := encodeExpressions(input.ProjectionExpression, input.FilterExpression, input.KeyConditionExpression, input.ExpressionAttributeNames, input.ExpressionAttributeValues)
if err != nil {
return err
}
if err = writer.WriteBytes(expressions[parser.KeyConditionExpr]); err != nil {
return err
}
return encodeScanQueryOptionalParams(
ctx,
input.IndexName,
input.Select,
input.ReturnConsumedCapacity,
input.ConsistentRead,
expressions, nil, nil, input.Limit, input.ScanIndexForward, input.ExclusiveStartKey, keySchema, *input.TableName, writer)
}
func encodeBatchWriteItemInput(ctx context.Context, input *dynamodb.BatchWriteItemInput, keySchema *lru.Lru, attrNamesListToId *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpBatchWriteItemInput(input); err != nil {
return err
}
if err = encodeServiceAndMethod(batchWriteItem_116217951_1_Id, writer); err != nil {
return err
}
if err = writer.WriteMapHeader(len(input.RequestItems)); err != nil {
return err
}
totalRequests := 0
for table, wrs := range input.RequestItems {
keys, err := getKeySchema(ctx, keySchema, table)
if err != nil {
return err
}
l := len(wrs)
if l == 0 {
return &smithy.GenericAPIError{
Code: ErrCodeInvalidParameter,
Message: fmt.Sprintf("1 validation error detected: Value '{%s=%d}' at 'requestItems' failed to satisfy constraint:"+
" Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1", table, l),
Fault: smithy.FaultClient,
}
}
totalRequests = totalRequests + l
if totalRequests > maxWriteBatchSize {
return &smithy.GenericAPIError{
Code: ErrCodeInvalidParameter,
Message: fmt.Sprintf("1 validation error detected: Value '{%s=%d}' at 'requestItems' failed to satisfy constraint:"+
" Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1", table, totalRequests),
Fault: smithy.FaultClient,
}
}
if err = writer.WriteString(table); err != nil {
return err
}
if err = writer.WriteArrayHeader(2 * l); err != nil {
return err
}
if hasDuplicatesWriteRequests(wrs, keys) {
return errors.New("Provided list of item keys contains duplicates")
}
for _, wr := range wrs {
if pr := wr.PutRequest; pr != nil {
attrs := pr.Item
if err = cbor.EncodeItemKey(attrs, keys, writer); err != nil {
return err
}
if err = encodeNonKeyAttributes(ctx, attrs, keys, attrNamesListToId, writer); err != nil {
return err
}
} else if dr := wr.DeleteRequest; dr != nil {
if err = cbor.EncodeItemKey(dr.Key, keys, writer); err != nil {
return err
}
if err = writer.WriteNull(); err != nil {
return err
}
} else {
return errors.New("Both PutRequest and DeleteRequest cannot be empty")
}
}
}
return encodeItemOperationOptionalParams(
types.ReturnValueNone,
input.ReturnConsumedCapacity,
input.ReturnItemCollectionMetrics,
nil, nil, nil, nil, nil, nil, writer)
}
func encodeBatchGetItemInput(ctx context.Context, input *dynamodb.BatchGetItemInput, keySchema *lru.Lru, writer *cbor.Writer) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpBatchGetItemInput(input); err != nil {
return err
}
if input, err = translateLegacyBatchGetItemInput(input); err != nil {
return err
}
if err = encodeServiceAndMethod(batchGetItem_N697851100_1_Id, writer); err != nil {
return err
}
if err = writer.WriteMapHeader(len(input.RequestItems)); err != nil {
return err
}
for table, kaas := range input.RequestItems {
if err = writer.WriteString(table); err != nil {
return err
}
if err = writer.WriteArrayHeader(3); err != nil {
return err
}
cr := false
if kaas.ConsistentRead != nil {
cr = *kaas.ConsistentRead
}
if err = writer.WriteBoolean(cr); err != nil {
return err
}
if kaas.ProjectionExpression != nil {
expressions := make(map[int]string)
expressions[parser.ProjectionExpr] = *kaas.ProjectionExpression
encoder := parser.NewExpressionEncoder(expressions, kaas.ExpressionAttributeNames, nil)
if _, err = encoder.Parse(); err != nil {
return err
}
var buf bytes.Buffer
if err = encoder.Write(parser.ProjectionExpr, &buf); err != nil {
return err
}
if err = writer.WriteBytes(buf.Bytes()); err != nil {
return err
}
} else {
if err = writer.WriteNull(); err != nil {
return err
}
}
tableKeys, err := getKeySchema(ctx, keySchema, table)
if err != nil {
return err
}
if err = writer.WriteArrayHeader(len(kaas.Keys)); err != nil {
return err
}
if hasDuplicateKeysAndAttributes(kaas, tableKeys) {
return &smithy.GenericAPIError{
Code: ErrCodeInvalidParameter,
Message: fmt.Sprintf("Provided list of item keys contains duplicates"),
Fault: smithy.FaultClient,
}
}
for _, keys := range kaas.Keys {
if err = cbor.EncodeItemKey(keys, tableKeys, writer); err != nil {
return err
}
}
}
return encodeItemOperationOptionalParams(
types.ReturnValueNone,
input.ReturnConsumedCapacity,
types.ReturnItemCollectionMetricsNone,
nil, nil, nil, nil, nil, nil, writer)
}
func encodeTransactWriteItemsInput(
ctx context.Context,
input *dynamodb.TransactWriteItemsInput,
keySchema *lru.Lru, attrNamesListToId *lru.Lru, writer *cbor.Writer,
extractedKeys []map[string]types.AttributeValue,
) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpTransactWriteItemsInput(input); err != nil {
return err
}
if err = encodeServiceAndMethod(transactWriteItems_N1160037738_1_Id, writer); err != nil {
return err
}
var operationsBuf, tableNamesBuf, keysBuf, valuesBuf, conditionExpressionsBuf,
updateExpressionsBuf, rvOnConditionCheckFailureBuf bytes.Buffer
operationWriter := cbor.NewWriter(&operationsBuf)
tableNamesWriter := cbor.NewWriter(&tableNamesBuf)
keysWriter := cbor.NewWriter(&keysBuf)
valuesWriter := cbor.NewWriter(&valuesBuf)
conditionExpressionsWriter := cbor.NewWriter(&conditionExpressionsBuf)
updateExpressionsWriter := cbor.NewWriter(&updateExpressionsBuf)
rvOnConditionCheckFailureWriter := cbor.NewWriter(&rvOnConditionCheckFailureBuf)
l := len(input.TransactItems)
if err = operationWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = tableNamesWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = keysWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = valuesWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = conditionExpressionsWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = updateExpressionsWriter.WriteArrayHeader(l); err != nil {
return err
}
if err = rvOnConditionCheckFailureWriter.WriteArrayHeader(l); err != nil {
return err
}
defer func() {
operationWriter.Close()
tableNamesWriter.Close()
keysWriter.Close()
valuesWriter.Close()
conditionExpressionsWriter.Close()
updateExpressionsWriter.Close()
rvOnConditionCheckFailureWriter.Close()
}()
tableKeySet := make(map[string]bool)
for i, twi := range input.TransactItems {
var operation int
var tableName *string
var key map[string]types.AttributeValue
var item map[string]types.AttributeValue
var isItem bool = false
var conditionExpression *string
var updateExpression *string
var expressionAttributeNames map[string]string
var expressionAttributeValues map[string]types.AttributeValue
var rvOnConditionCheckFailure types.ReturnValuesOnConditionCheckFailure
opCount := 0
if check := twi.ConditionCheck; check != nil {
opCount++
operation = checkOperation
conditionExpression = check.ConditionExpression
expressionAttributeNames = check.ExpressionAttributeNames
expressionAttributeValues = check.ExpressionAttributeValues
tableName = check.TableName
key = check.Key
rvOnConditionCheckFailure = check.ReturnValuesOnConditionCheckFailure
}
if delete := twi.Delete; delete != nil {
opCount++
operation = deleteOperation
conditionExpression = delete.ConditionExpression
expressionAttributeNames = delete.ExpressionAttributeNames
expressionAttributeValues = delete.ExpressionAttributeValues
tableName = delete.TableName
key = delete.Key
rvOnConditionCheckFailure = delete.ReturnValuesOnConditionCheckFailure
}
if put := twi.Put; put != nil {
opCount++
operation = putOperation
conditionExpression = put.ConditionExpression
expressionAttributeNames = put.ExpressionAttributeNames
expressionAttributeValues = put.ExpressionAttributeValues
tableName = put.TableName
item = put.Item
isItem = true
rvOnConditionCheckFailure = put.ReturnValuesOnConditionCheckFailure
}
if update := twi.Update; update != nil {
opCount++
operation = partialUpdateOperation
conditionExpression = update.ConditionExpression
expressionAttributeNames = update.ExpressionAttributeNames
expressionAttributeValues = update.ExpressionAttributeValues
tableName = update.TableName
key = update.Key
updateExpression = update.UpdateExpression
rvOnConditionCheckFailure = update.ReturnValuesOnConditionCheckFailure
}
if opCount == 0 {
return smithy.NewErrParamRequired("Invalid Request: TransactWriteItemsInput should contain Delete or Put or Update request")
}
if opCount > 1 {
return smithy.NewErrParamRequired("TransactItems can only contain one of ConditionalCheck, Put, Update or Delete")
}
if err := operationWriter.WriteInt(operation); err != nil {
return err
}
if err := tableNamesWriter.WriteBytes([]byte(*tableName)); err != nil {
return err
}
keydef, err := getKeySchema(ctx, keySchema, *tableName)
if err != nil {
return nil
}
// Check if duplicate [key, tableName] pair exists
var keyBytes []byte
if isItem {
keyBytes, err = cbor.GetEncodedItemKey(item, keydef)
} else {
keyBytes, err = cbor.GetEncodedItemKey(key, keydef)
}
if err != nil {
return err
}
keysWriter.WriteBytes(keyBytes)
keyBytes = append(keyBytes, []byte(*tableName)...)
tableKey := string(keyBytes)
_, ok := tableKeySet[tableKey]
if ok {
return smithy.NewErrParamRequired("Transaction request cannot include multiple operations on one item")
} else {
tableKeySet[tableKey] = true
}
switch operation {
case checkOperation, deleteOperation, partialUpdateOperation:
if err := valuesWriter.WriteNull(); err != nil {
return err
}
case putOperation:
if err := encodeNonKeyAttributes(ctx, item, keydef, attrNamesListToId, valuesWriter); err != nil {
return err
}
key = map[string]types.AttributeValue{}
for _, attrDef := range keydef {
key[*attrDef.AttributeName] = item[*attrDef.AttributeName]
}
}
extractedKeys[i] = key
encoded, err := parseExpressions(conditionExpression, updateExpression, nil, expressionAttributeNames, expressionAttributeValues)
if err != nil {
return err
}
if parsedConditionExpr := encoded[parser.ConditionExpr]; parsedConditionExpr != nil {
if err := conditionExpressionsWriter.WriteBytes(parsedConditionExpr); err != nil {
return err
}
} else {
if err := conditionExpressionsWriter.WriteNull(); err != nil {
return err
}
}
if parsedUpdateExpr := encoded[parser.UpdateExpr]; parsedUpdateExpr != nil {
if err := updateExpressionsWriter.WriteBytes(parsedUpdateExpr); err != nil {
return err
}
} else {
if err := updateExpressionsWriter.WriteNull(); err != nil {
return err
}
}
if rvOnConditionCheckFailure == types.ReturnValuesOnConditionCheckFailureAllOld {
if err := rvOnConditionCheckFailureWriter.WriteInt(returnValueOnConditionCheckFailureAllOld); err != nil {
return err
}
} else {
if err := rvOnConditionCheckFailureWriter.WriteInt(returnValueOnConditionCheckFailureNone); err != nil {
return err
}
}
}
if err := operationWriter.Flush(); err != nil {
return err
}
if err := writer.Write(operationsBuf.Bytes()); err != nil {
return err
}
if err := tableNamesWriter.Flush(); err != nil {
return err
}
if err := writer.Write(tableNamesBuf.Bytes()); err != nil {
return err
}
if err := keysWriter.Flush(); err != nil {
return err
}
if err := writer.Write(keysBuf.Bytes()); err != nil {
return err
}
if err := valuesWriter.Flush(); err != nil {
return err
}
if err := writer.Write(valuesBuf.Bytes()); err != nil {
return err
}
// Write null for returnValues
if err := writer.WriteNull(); err != nil {
return err
}
if err := rvOnConditionCheckFailureWriter.Flush(); err != nil {
return err
}
if err := writer.Write(rvOnConditionCheckFailureBuf.Bytes()); err != nil {
return err
}
if err := conditionExpressionsWriter.Flush(); err != nil {
return err
}
if err := writer.Write(conditionExpressionsBuf.Bytes()); err != nil {
return err
}
if err := updateExpressionsWriter.Flush(); err != nil {
return err
}
if err := writer.Write(updateExpressionsBuf.Bytes()); err != nil {
return err
}
if input.ClientRequestToken == nil {
id, err := uuid.NewV4()
if err != nil {
return err
}
input.ClientRequestToken = aws.String(id.String())
}
return encodeItemOperationOptionalParamsWithToken(
types.ReturnValueNone,
input.ReturnConsumedCapacity,
input.ReturnItemCollectionMetrics,
nil, nil, nil, nil, nil, nil, input.ClientRequestToken, writer)
}
func encodeTransactGetItemsInput(
ctx context.Context,
input *dynamodb.TransactGetItemsInput,
keySchema *lru.Lru, writer *cbor.Writer,
extractedKeys []map[string]types.AttributeValue,
) error {
if input == nil {
return smithy.NewErrParamRequired("input cannot be nil")
}
var err error
if err = ValidateOpTransactGetItemsInput(input); err != nil {
return err
}
if err = encodeServiceAndMethod(transactGetItems_1866287579_1_Id, writer); err != nil {
return err
}
var tableNamesBuf, keysBuf, projectionExpressionsBuf bytes.Buffer
tableNamesWriter := cbor.NewWriter(&tableNamesBuf)
keysWriter := cbor.NewWriter(&keysBuf)
projectionExpressionsWriter := cbor.NewWriter(&projectionExpressionsBuf)
len := len(input.TransactItems)
if err = tableNamesWriter.WriteArrayHeader(len); err != nil {
return err
}
if err = keysWriter.WriteArrayHeader(len); err != nil {
return err
}
if err = projectionExpressionsWriter.WriteArrayHeader(len); err != nil {
return err
}
defer func() {
tableNamesWriter.Close()
keysWriter.Close()
projectionExpressionsWriter.Close()
}()
for i, tgi := range input.TransactItems {
var tableName *string
var key map[string]types.AttributeValue
var projectionExpression *string
var expressionAttributeNames map[string]string
get := tgi.Get
tableName = get.TableName
key = get.Key
expressionAttributeNames = get.ExpressionAttributeNames
projectionExpression = get.ProjectionExpression
extractedKeys[i] = key
if err := tableNamesWriter.WriteBytes([]byte(*tableName)); err != nil {
return err
}
keydef, err := getKeySchema(ctx, keySchema, *tableName)
if err != nil {
return err
}
if err := cbor.EncodeItemKey(key, keydef, keysWriter); err != nil {
return err
}
encoded, err := parseExpressions(nil, nil, projectionExpression, expressionAttributeNames, nil)
if err != nil {
return err
}
if parsedProjectionExpr := encoded[parser.ProjectionExpr]; parsedProjectionExpr != nil {
if err := projectionExpressionsWriter.WriteBytes(parsedProjectionExpr); err != nil {
return err
}
} else {
if err := projectionExpressionsWriter.WriteNull(); err != nil {
return err
}
}
}
if err := tableNamesWriter.Flush(); err != nil {
return err
}
if err := writer.Write(tableNamesBuf.Bytes()); err != nil {
return err
}
if err := keysWriter.Flush(); err != nil {
return err
}
if err := writer.Write(keysBuf.Bytes()); err != nil {
return err
}
if err := projectionExpressionsWriter.Flush(); err != nil {
return err
}
if err := writer.Write(projectionExpressionsBuf.Bytes()); err != nil {
return err
}
return encodeItemOperationOptionalParams(
types.ReturnValueNone,
input.ReturnConsumedCapacity,
types.ReturnItemCollectionMetricsNone,
nil, nil, nil, nil, nil, nil, writer)
}
func encodeCompoundKey(key map[string]types.AttributeValue, writer *cbor.Writer) error {
var buf bytes.Buffer
w := cbor.NewWriter(&buf)
defer w.Close()
if err := w.WriteMapStreamHeader(); err != nil {
return err
}
if len(key) > 0 {
for k, v := range key {
if err := w.WriteString(k); err != nil {
return err
}
if err := cbor.EncodeAttributeValue(v, w); err != nil {
return err
}
}
}
if err := w.WriteStreamBreak(); err != nil {
return err
}
if err := w.Flush(); err != nil {
return err
}
return writer.WriteBytes(buf.Bytes())
}
func encodeNonKeyAttributes(ctx context.Context, item map[string]types.AttributeValue, keys []types.AttributeDefinition,
attrNamesListToId *lru.Lru, writer *cbor.Writer) error {
var buf bytes.Buffer
w := cbor.NewWriter(&buf)
defer w.Close()
if err := cbor.EncodeItemNonKeyAttributes(ctx, item, keys, attrNamesListToId, w); err != nil {
return err
}
if err := w.Flush(); err != nil {
return err
}
return writer.WriteBytes(buf.Bytes())
}
func encodeScanQueryOptionalParams(
ctx context.Context,
index *string,
selection types.Select,
returnConsumedCapacity types.ReturnConsumedCapacity,
consistentRead *bool,
encodedExpressions map[int][]byte, segment, totalSegment, limit *int32, forward *bool,
startKey map[string]types.AttributeValue, keySchema *lru.Lru, table string, writer *cbor.Writer) error {
var err error
if err = writer.WriteMapStreamHeader(); err != nil {
return err
}
if index != nil {
if err = writer.WriteInt(requestParamIndexName); err != nil {
return err
}
if err = writer.WriteBytes([]byte(*index)); err != nil {
return err
}
}
if selection != types.SelectAllAttributes {
if err = writer.WriteInt(requestParamSelect); err != nil {
return err
}
if err = writer.WriteInt(translateSelect(selection)); err != nil {
return err
}
}
if returnConsumedCapacity != types.ReturnConsumedCapacityNone {
if err = writer.WriteInt(requestParamReturnConsumedCapacity); err != nil {
return err
}
if err = writer.WriteInt(translateReturnConsumedCapacity(returnConsumedCapacity)); err != nil {
return err
}
}
if consistentRead != nil {
if err = writer.WriteInt(requestParamConsistentRead); err != nil {
return err
}
cr := 0
if *consistentRead {
cr = 1
}
if err = writer.WriteInt(cr); err != nil {
return err
}
}
if len(startKey) != 0 {
if err = writer.WriteInt(requestParamExclusiveStartKey); err != nil {
return err
}
if index == nil {
tableKeys, err := getKeySchema(ctx, keySchema, table)
if err != nil {
return nil
}
if err = cbor.EncodeItemKey(startKey, tableKeys, writer); err != nil {
return err
}
} else {
if err = encodeCompoundKey(startKey, writer); err != nil {
return err
}
}
}
if segment != nil {
if err = writer.WriteInt(requestParamSegment); err != nil {
return err
}
if err = writer.WriteInt64(int64(*segment)); err != nil {
return err
}
}
if totalSegment != nil {
if err = writer.WriteInt(requestParamTotalSegments); err != nil {
return err
}
if err = writer.WriteInt64(int64(*totalSegment)); err != nil {
return err
}
}
if limit != nil {
if err = writer.WriteInt(requestParamLimit); err != nil {
return err
}
if err = writer.WriteInt64(int64(*limit)); err != nil {
return err
}
}
if forward != nil {
if err = writer.WriteInt(requestParamScanIndexForward); err != nil {
return err
}
if err = writer.WriteInt(translateScanIndexForward(forward)); err != nil {
return err
}
}
if len(encodedExpressions) > 0 {
for k, v := range encodedExpressions {
var e int
switch k {
case parser.ProjectionExpr:
e = requestParamProjectionExpression
case parser.FilterExpr:
e = requestParamFilterExpression
default:
continue
}
if err = writer.WriteInt(e); err != nil {
return err
}
if err = writer.WriteBytes(v); err != nil {
return err
}
}
}
return writer.WriteStreamBreak()
}
func encodeItemOperationOptionalParamsWithToken(
returnValues types.ReturnValue,
returnConsumedCapacity types.ReturnConsumedCapacity,
returnItemCollectionMetrics types.ReturnItemCollectionMetrics,
consistentRead *bool,
projectionExp, conditionalExpr, updateExpr *string,
exprAttrNames map[string]string,
exprAttrValues map[string]types.AttributeValue, clientRequestToken *string, writer *cbor.Writer) error {
if err := writer.WriteMapStreamHeader(); err != nil {
return err
}
if consistentRead != nil {
if err := writer.WriteInt(requestParamConsistentRead); err != nil {
return err
}
if err := writer.WriteBoolean(*consistentRead); err != nil {
return err
}
}
if dv := translateReturnValues(returnValues); dv != returnValueNone {
if err := writer.WriteInt(requestParamReturnValues); err != nil {
return err
}
if err := writer.WriteInt(dv); err != nil {
return err
}
}
if dv := translateReturnConsumedCapacity(returnConsumedCapacity); dv != returnConsumedCapacityNone {
if err := writer.WriteInt(requestParamReturnConsumedCapacity); err != nil {
return err
}
if err := writer.WriteInt(dv); err != nil {
return err
}
}
if dv := translateReturnItemCollectionMetrics(returnItemCollectionMetrics); dv != returnItemCollectionMetricsNone {
if err := writer.WriteInt(requestParamReturnItemCollectionMetrics); err != nil {
return err
}
if err := writer.WriteInt(dv); err != nil {
return err
}
}
if conditionalExpr != nil || updateExpr != nil || projectionExp != nil {
encoded, err := parseExpressions(conditionalExpr, updateExpr, projectionExp, exprAttrNames, exprAttrValues)
if err != nil {
return err
}
for k := range encoded {
var e int
switch k {
case parser.ConditionExpr:
e = requestParamConditionExpression
case parser.UpdateExpr:
e = requestParamUpdateExpression
case parser.ProjectionExpr:
e = requestParamProjectionExpression
default:
continue
}
if err := writer.WriteInt(e); err != nil {
return err
}
if err := writer.WriteBytes(encoded[k]); err != nil {
return err
}
}
}
if clientRequestToken != nil {
if err := writer.WriteInt(requestParamRequestItemsClientRequestToken); err != nil {
return err
}
if err := writer.WriteString(*clientRequestToken); err != nil {
return err
}
}
return writer.WriteStreamBreak()
}
func encodeItemOperationOptionalParams(
returnValues types.ReturnValue,
returnConsumedCapacity types.ReturnConsumedCapacity,
returnItemCollectionMetrics types.ReturnItemCollectionMetrics,
consistentRead *bool,
projectionExp, conditionalExpr, updateExpr *string,
exprAttrNames map[string]string,
exprAttrValues map[string]types.AttributeValue, writer *cbor.Writer) error {
return encodeItemOperationOptionalParamsWithToken(
returnValues,
returnConsumedCapacity,
returnItemCollectionMetrics,
consistentRead,
projectionExp, conditionalExpr, updateExpr, exprAttrNames, exprAttrValues, nil, writer)
}
func parseExpressions(
conditionalExpr, updateExpr, projectionExp *string, exprAttrNames map[string]string, exprAttrValues map[string]types.AttributeValue,
) (map[int][]byte, error) {
expressions := make(map[int]string)
if conditionalExpr != nil {
expressions[parser.ConditionExpr] = *conditionalExpr
}
if updateExpr != nil {
expressions[parser.UpdateExpr] = *updateExpr
}
if projectionExp != nil {
expressions[parser.ProjectionExpr] = *projectionExp
}
encoder := parser.NewExpressionEncoder(expressions, exprAttrNames, exprAttrValues)
encoded, err := encoder.Parse()
if err != nil {
return nil, err
}
return encoded, nil
}
func encodeServiceAndMethod(method int, writer *cbor.Writer) error {
if err := writer.WriteInt(daxServiceId); err != nil {
return err
}
return writer.WriteInt(method)
}
func encodeExpressions(projection, filter, keyCondition *string, exprAttrNames map[string]string, exprAttrValues map[string]types.AttributeValue) (map[int][]byte, error) {
expressions := make(map[int]string)
if projection != nil {
expressions[parser.ProjectionExpr] = *projection
}
if filter != nil {
expressions[parser.FilterExpr] = *filter
}
if keyCondition != nil {
expressions[parser.KeyConditionExpr] = *keyCondition
}
encoder := parser.NewExpressionEncoder(expressions, exprAttrNames, exprAttrValues)
return encoder.Parse()
}
func translateReturnValues(returnValues types.ReturnValue) int {
switch returnValues {
case types.ReturnValueAllOld:
return returnValueAllOld
case types.ReturnValueUpdatedOld:
return returnValueUpdatedOld
case types.ReturnValueAllNew:
return returnValueAllNew
case types.ReturnValueUpdatedNew:
return returnValueUpdatedNew
default:
return returnValueNone
}
}
func translateReturnConsumedCapacity(returnConsumedCapacity types.ReturnConsumedCapacity) int {
switch returnConsumedCapacity {
case types.ReturnConsumedCapacityTotal:
return returnConsumedCapacityTotal
case types.ReturnConsumedCapacityIndexes:
return returnConsumedCapacityIndexes
default:
return returnConsumedCapacityNone
}
}
func translateReturnItemCollectionMetrics(returnItemCollectionMetrics types.ReturnItemCollectionMetrics) int {
if types.ReturnItemCollectionMetricsSize == returnItemCollectionMetrics {
return returnItemCollectionMetricsSize
}
return returnItemCollectionMetricsNone
}
func translateSelect(selection types.Select) int {
switch selection {
case types.SelectAllProjectedAttributes:
return selectAllProjectedAttributes
case types.SelectCount:
return selectCount
case types.SelectSpecificAttributes:
return selectSpecificAttributes
case types.SelectAllAttributes:
return selectAllAttributes
default:
return selectAllAttributes
}
}
func translateScanIndexForward(b *bool) int {
if b == nil {
return 1
}
if *b {
return 1
}
return 0
}
func hasDuplicatesWriteRequests(wrs []types.WriteRequest, d []types.AttributeDefinition) bool {
if len(wrs) <= 1 {
return false
}
face := make([]item, len(wrs))
for i, v := range wrs {
face[i] = (writeItem)(v)
}
var err error
sort.Sort(dupKeys{d, face, func(a, b item) int {
if err != nil {
return 0
}
for _, k := range d {
r := strings.Compare(a.key(k), b.key(k))
if r != 0 {
return r
}
}
err = fmt.Errorf("dup %v %v", a, b)
return 0
}})
return err != nil
}
func hasDuplicateKeysAndAttributes(kaas types.KeysAndAttributes, d []types.AttributeDefinition) bool {
if len(kaas.Keys) <= 1 {
return false
}
face := make([]item, len(kaas.Keys))
for i, v := range kaas.Keys {
if v == nil {
return false // continue with request processing, will fail later with proper error msg
}
face[i] = (attrItem)(v)
}
var err error
sort.Sort(dupKeys{d, face, func(a, b item) int {
if err != nil {
return 0
}
for _, k := range d {
r := strings.Compare(a.key(k), b.key(k))
if r != 0 {
return r
}
}
err = fmt.Errorf("dup %v %v", a, b)
return 0
}})
return err != nil
}
type item interface {
key(def types.AttributeDefinition) string
}
type itemKey types.AttributeDefinition
func (i itemKey) extract(v types.AttributeValue) string {
if v == nil {
return ""
}
switch i.AttributeType {
case types.ScalarAttributeTypeS:
vv, ok := v.(*types.AttributeValueMemberS)
if ok {
return vv.Value
}
case types.ScalarAttributeTypeN:
vv, ok := v.(*types.AttributeValueMemberN)
if ok {
return vv.Value
}
case types.ScalarAttributeTypeB:
vv, ok := v.(*types.AttributeValueMemberB)
if ok {
return string(vv.Value)
}
}
return ""
}
type writeItem types.WriteRequest
func (w writeItem) key(def types.AttributeDefinition) string {
var v types.AttributeValue
if w.PutRequest != nil && w.PutRequest.Item != nil {
v = w.PutRequest.Item[*def.AttributeName]
} else if w.DeleteRequest != nil && w.DeleteRequest.Key != nil {
v = w.DeleteRequest.Key[*def.AttributeName]
}
return itemKey(def).extract(v)
}
type attrItem map[string]types.AttributeValue
func (w attrItem) key(def types.AttributeDefinition) string {
v := w[*def.AttributeName]
return itemKey(def).extract(v)
}
type dupKeys struct {
defs []types.AttributeDefinition
items []item
eq func(a, b item) int
}
// Implements sort.Interface
func (d dupKeys) Len() int { return len(d.items) }
func (d dupKeys) Swap(i, j int) { d.items[i], d.items[j] = d.items[j], d.items[i] }
func (d dupKeys) Less(i, j int) bool { return d.eq(d.items[i], d.items[j]) <= 0 }