nimo-shake/writer/dynamo_proxy.go (221 lines of code) (raw):

package writer import ( "fmt" "bytes" "net/http" "time" "nimo-shake/common" "nimo-shake/configure" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" LOG "github.com/vinllen/log4go" "github.com/aws/aws-sdk-go/service/dynamodb" ) type DynamoProxyWriter struct { Name string svc *dynamodb.DynamoDB ns utils.NS } func NewDynamoProxyWriter(name, address string, ns utils.NS, logLevel string) *DynamoProxyWriter { config := &aws.Config{ Region: aws.String("us-east-2"), // meaningless Endpoint: aws.String(address), MaxRetries: aws.Int(3), DisableSSL: aws.Bool(true), HTTPClient: &http.Client{ Timeout: time.Duration(5000) * time.Millisecond, }, } var err error sess, err := session.NewSession(config) if err != nil { LOG.Crashf("create dynamo connection error[%v]", err) return nil } var svc *dynamodb.DynamoDB if logLevel == "debug" { svc = dynamodb.New(sess, aws.NewConfig().WithLogLevel(aws.LogDebugWithHTTPBody)) } else { svc = dynamodb.New(sess) } return &DynamoProxyWriter{ Name: name, svc: svc, ns: ns, } } func (dpw *DynamoProxyWriter) String() string { return dpw.Name } func (dpw *DynamoProxyWriter) GetSession() interface{} { return dpw.svc } func (dpw *DynamoProxyWriter) PassTableDesc(tableDescribe *dynamodb.TableDescription) { } func (dpw *DynamoProxyWriter) CreateTable(tableDescribe *dynamodb.TableDescription) error { createTableInput := &dynamodb.CreateTableInput{ AttributeDefinitions: tableDescribe.AttributeDefinitions, KeySchema: tableDescribe.KeySchema, TableName: tableDescribe.TableName, } LOG.Info("try create table: %v", *tableDescribe) if conf.Options.FullEnableIndexUser { // convert []*GlobalSecondaryIndexDescription => []*GlobalSecondaryIndex gsiList := make([]*dynamodb.GlobalSecondaryIndex, 0, len(tableDescribe.GlobalSecondaryIndexes)) for _, gsiDesc := range tableDescribe.GlobalSecondaryIndexes { gsiIndex := &dynamodb.GlobalSecondaryIndex{ IndexName: gsiDesc.IndexName, KeySchema: gsiDesc.KeySchema, Projection: gsiDesc.Projection, // ProvisionedThroughput: gsiDesc.ProvisionedThroughput, } // meaningless, support aliyun_dynamodb if gsiDesc.Projection == nil { gsiIndex.Projection = &dynamodb.Projection{} } gsiList = append(gsiList, gsiIndex) } createTableInput.SetGlobalSecondaryIndexes(gsiList) // convert []*LocalSecondaryIndexDescription => []*LocalSecondaryIndex lsiList := make([]*dynamodb.LocalSecondaryIndex, 0, len(tableDescribe.LocalSecondaryIndexes)) for _, lsiDesc := range tableDescribe.LocalSecondaryIndexes { lsiIndex := &dynamodb.LocalSecondaryIndex{ IndexName: lsiDesc.IndexName, KeySchema: lsiDesc.KeySchema, Projection: lsiDesc.Projection, } // meaningless, support aliyun_dynamodb if lsiDesc.Projection == nil { lsiIndex.Projection = &dynamodb.Projection{} } lsiList = append(lsiList, lsiIndex) } createTableInput.SetLocalSecondaryIndexes(lsiList) } _, err := dpw.svc.CreateTable(createTableInput) if err != nil { LOG.Error("create table[%v] fail: %v", *tableDescribe.TableName, err) return err } checkReady := func() bool { // check table is ready out, err := dpw.svc.DescribeTable(&dynamodb.DescribeTableInput{ TableName: tableDescribe.TableName, }) if err != nil { LOG.Warn("create table[%v] ok but describe failed: %v", *tableDescribe.TableName, err) return true } if *out.Table.TableStatus != "ACTIVE" { LOG.Warn("create table[%v] ok but describe not ready: %v", *tableDescribe.TableName, *out.Table.TableStatus) return true } return false } // check with retry 5 times and 1s gap ok := utils.CallbackRetry(5, 1000, checkReady) if !ok { return fmt.Errorf("create table[%v] fail: check ready fail", dpw.ns.Collection) } return nil } func (dpw *DynamoProxyWriter) DropTable() error { _, err := dpw.svc.DeleteTable(&dynamodb.DeleteTableInput{ TableName: aws.String(dpw.ns.Collection), }) return err } func (dpw *DynamoProxyWriter) WriteBulk(input []interface{}) error { if len(input) == 0 { return nil } // convert to WriteRequest request := make([]*dynamodb.WriteRequest, len(input)) for i, ele := range input { request[i] = &dynamodb.WriteRequest{ PutRequest: &dynamodb.PutRequest{ Item: ele.(map[string]*dynamodb.AttributeValue), }, } } _, err := dpw.svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{ RequestItems: map[string][]*dynamodb.WriteRequest{ dpw.ns.Collection: request, }, }) return err } func (dpw *DynamoProxyWriter) Close() { } // input type is map[string]*dynamodb.AttributeValue func (dpw *DynamoProxyWriter) Insert(input []interface{}, index []interface{}) error { if len(input) == 0 { return nil } request := make([]*dynamodb.WriteRequest, len(index)) for i, ele := range input { request[i] = &dynamodb.WriteRequest{ PutRequest: &dynamodb.PutRequest{ Item: ele.(map[string]*dynamodb.AttributeValue), }, } } _, err := dpw.svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{ RequestItems: map[string][]*dynamodb.WriteRequest{ dpw.ns.Collection: request, }, }) if err != nil && utils.DynamoIgnoreError(err, "i", true) { LOG.Warn("%s ignore error[%v] when insert", dpw, err) return nil } return err } func (dpw *DynamoProxyWriter) Delete(index []interface{}) error { if len(index) == 0 { return nil } request := make([]*dynamodb.WriteRequest, len(index)) for i, ele := range index { request[i] = &dynamodb.WriteRequest{ DeleteRequest: &dynamodb.DeleteRequest{ Key: ele.(map[string]*dynamodb.AttributeValue), }, } } _, err := dpw.svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{ RequestItems: map[string][]*dynamodb.WriteRequest{ dpw.ns.Collection: request, }, }) if utils.DynamoIgnoreError(err, "d", true) { LOG.Warn("%s ignore error[%v] when delete", dpw, err) return nil } return err } // upsert func (dpw *DynamoProxyWriter) Update(input []interface{}, index []interface{}) error { if len(input) == 0 { return nil } // fmt.Println(input, index) for i := range input { val := input[i].(map[string]*dynamodb.AttributeValue) key := index[i].(map[string]*dynamodb.AttributeValue) // why no update interface like BatchWriteItem !!!! // generate new map(expression-attribute-values) and expression(update-expression) newMap := make(map[string]*dynamodb.AttributeValue, len(val)) expressionBuffer := new(bytes.Buffer) expressionBuffer.WriteString("SET") cnt := 1 for k, v := range val { newKey := fmt.Sprintf(":v%d", cnt) newMap[newKey] = v if cnt == 1 { expressionBuffer.WriteString(fmt.Sprintf(" %s=%s", k, newKey)) } else { expressionBuffer.WriteString(fmt.Sprintf(",%s=%s", k, newKey)) } cnt++ } // fmt.Println(newMap) _, err := dpw.svc.UpdateItem(&dynamodb.UpdateItemInput{ TableName: aws.String(dpw.ns.Collection), Key: key, UpdateExpression: aws.String(expressionBuffer.String()), ExpressionAttributeValues: newMap, }) if err != nil && utils.DynamoIgnoreError(err, "u", true) { LOG.Warn("%s ignore error[%v] when insert", dpw, err) return nil } } return nil }