nimo-shake/common/dynamodb.go (115 lines of code) (raw):

package utils

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
)

var (
	// globalSession is the AWS session shared by every client created in
	// this file. It is assigned by InitSession.
	globalSession *session.Session
)

// InitSession builds the AWS session that all clients created in this file
// share.
//
// Sessions can be shared across all service clients that share the same base
// configuration, refer: https://docs.aws.amazon.com/sdk-for-go/api/aws/session/
//
// Static credentials are built from accessKeyID, secretAccessKey and
// sessionToken. endpoint overrides the service endpoint only when it is
// non-empty. timeout is expressed in milliseconds and is applied to the HTTP
// client used by the session. The error returned by session.NewSession is
// passed through unchanged.
func InitSession(accessKeyID, secretAccessKey, sessionToken, region, endpoint string, maxRetries, timeout uint) error {
	config := &aws.Config{
		Region:      aws.String(region),
		Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, sessionToken),
		MaxRetries:  aws.Int(int(maxRetries)),
		HTTPClient: &http.Client{
			Timeout: time.Duration(timeout) * time.Millisecond,
		},
	}

	if endpoint != "" {
		config.Endpoint = aws.String(endpoint)
	}

	var err error
	globalSession, err = session.NewSession(config)
	return err
}

// CreateDynamoSession returns a DynamoDB client backed by the shared session.
//
// When logLevel is "debug" the client logs requests including HTTP bodies.
// An error is returned if the shared session has not been set up by
// InitSession, instead of panicking inside the SDK.
func CreateDynamoSession(logLevel string) (*dynamodb.DynamoDB, error) {
	if globalSession == nil {
		return nil, fmt.Errorf("create dynamodb client: session is not initialized, call InitSession first")
	}

	if logLevel == "debug" {
		return dynamodb.New(globalSession, aws.NewConfig().WithLogLevel(aws.LogDebugWithHTTPBody)), nil
	}
	return dynamodb.New(globalSession), nil
}

// CreateDynamoStreamSession returns a DynamoDB Streams client backed by the
// shared session.
//
// When logLevel is "debug" the client logs requests including HTTP bodies.
// An error is returned if the shared session has not been set up by
// InitSession, instead of panicking inside the SDK.
func CreateDynamoStreamSession(logLevel string) (*dynamodbstreams.DynamoDBStreams, error) {
	if globalSession == nil {
		return nil, fmt.Errorf("create dynamodb streams client: session is not initialized, call InitSession first")
	}

	if logLevel == "debug" {
		return dynamodbstreams.New(globalSession, aws.NewConfig().WithLogLevel(aws.LogDebugWithHTTPBody)), nil
	}
	return dynamodbstreams.New(globalSession), nil
}

// ParseIndexType maps every attribute name in input to its attribute type.
//
// Entries that are nil, or that lack a name or a type, are skipped rather than
// dereferenced.
func ParseIndexType(input []*dynamodb.AttributeDefinition) map[string]string {
	mp := make(map[string]string, len(input))

	for _, ele := range input {
		if ele == nil || ele.AttributeName == nil || ele.AttributeType == nil {
			continue
		}
		mp[*ele.AttributeName] = *ele.AttributeType
	}

	return mp
}

// FetchTableList returns the names of all tables visible to dynamoSession.
//
// ListTables is called repeatedly, feeding LastEvaluatedTableName back as the
// exclusive start name, until the response carries no LastEvaluatedTableName.
// On any request error the partial list is discarded and the error returned.
func FetchTableList(dynamoSession *dynamodb.DynamoDB) ([]string, error) {
	if dynamoSession == nil {
		return nil, fmt.Errorf("fetch table list: dynamodb client is nil")
	}

	ans := make([]string, 0)
	var lastEvaluatedTableName *string

	for {
		out, err := dynamoSession.ListTables(&dynamodb.ListTablesInput{
			ExclusiveStartTableName: lastEvaluatedTableName,
		})
		if err != nil {
			return nil, err
		}

		ans = AppendStringList(ans, out.TableNames)
		if out.LastEvaluatedTableName == nil {
			// finish
			break
		}
		lastEvaluatedTableName = out.LastEvaluatedTableName
	}

	return ans, nil
}

// ParsePrimaryAndSortKey extracts the HASH (primary) and RANGE (sort) key
// attribute names from a key schema.
//
// An error is returned when a key type appears more than once, when a key type
// is neither "HASH" nor "RANGE", or when an element is nil or is missing its
// key type or attribute name. Either returned name is empty when the schema
// has no element of that type. parseMap is currently unused; it is kept so the
// signature stays compatible with existing callers.
func ParsePrimaryAndSortKey(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string) (string, string, error) {
	var primaryKey string
	var sortKey string

	for _, index := range primaryIndexes {
		// Guard the pointer fields so a malformed schema yields an error
		// instead of a nil-pointer panic.
		if index == nil || index.KeyType == nil || index.AttributeName == nil {
			return "", "", fmt.Errorf("key schema element is missing key type or attribute name")
		}

		switch *index.KeyType {
		case "HASH":
			if primaryKey != "" {
				return "", "", fmt.Errorf("duplicate primary key type[%v]", *index.AttributeName)
			}
			primaryKey = *index.AttributeName
		case "RANGE":
			if sortKey != "" {
				return "", "", fmt.Errorf("duplicate sort key type[%v]", *index.AttributeName)
			}
			sortKey = *index.AttributeName
		default:
			return "", "", fmt.Errorf("unknown key type[%v]", *index.KeyType)
		}
	}

	return primaryKey, sortKey, nil
}

// ParseAttributes converts a compact textual description into DynamoDB
// attribute values keyed by ":"-prefixed names.
//
// Pairs are separated by "~~~"; inside a pair, name, type and value are
// separated by "```". Only the types "N" and "S" are recognized.
//
//	input:  "begin```N```1646724207280~~~end```S```1646724207283"
//	output: map[string]*dynamodb.AttributeValue{
//	    ":begin": &dynamodb.AttributeValue{N: aws.String("1646724207280")},
//	    ":end":   &dynamodb.AttributeValue{S: aws.String("1646724207283")},
//	}
//
// Pairs that do not split into exactly three parts, and pairs with any other
// type tag, are silently skipped.
func ParseAttributes(input string) map[string]*dynamodb.AttributeValue {
	result := make(map[string]*dynamodb.AttributeValue)

	for _, pair := range strings.Split(input, "~~~") {
		parts := strings.Split(pair, "```")
		if len(parts) != 3 {
			continue
		}

		key := ":" + parts[0]
		dataType := parts[1]
		value := parts[2]

		switch dataType {
		case "N":
			result[key] = &dynamodb.AttributeValue{N: aws.String(value)}
		case "S":
			result[key] = &dynamodb.AttributeValue{S: aws.String(value)}
		}
	}

	return result
}