func()

in nimo-shake/writer/dynamo_proxy.go [67:144]


// CreateTable creates a table through the proxy service from the given table
// description and then polls DescribeTable until the table reports ACTIVE.
//
// AttributeDefinitions, KeySchema and TableName are always copied from
// tableDescribe. When conf.Options.FullEnableIndexUser is set, the global and
// local secondary index descriptions are additionally converted to their
// create-time counterparts (index name, key schema and projection only;
// provisioned throughput is intentionally not carried over).
//
// It returns an error when tableDescribe or its TableName is nil, when the
// CreateTable call fails, or when the table is still not ACTIVE once the
// readiness retries are exhausted.
func (dpw *DynamoProxyWriter) CreateTable(tableDescribe *dynamodb.TableDescription) error {
	// Guard up front: every log/error path below dereferences the table name,
	// so a missing name would otherwise surface as a panic instead of an error.
	if tableDescribe == nil || tableDescribe.TableName == nil {
		return fmt.Errorf("create table fail: table description or table name is nil")
	}
	tableName := *tableDescribe.TableName

	createTableInput := &dynamodb.CreateTableInput{
		AttributeDefinitions: tableDescribe.AttributeDefinitions,
		KeySchema:            tableDescribe.KeySchema,
		TableName:            tableDescribe.TableName,
	}

	LOG.Info("try create table: %v", *tableDescribe)

	if conf.Options.FullEnableIndexUser {
		// convert []*GlobalSecondaryIndexDescription => []*GlobalSecondaryIndex
		gsiList := make([]*dynamodb.GlobalSecondaryIndex, 0, len(tableDescribe.GlobalSecondaryIndexes))
		for _, gsiDesc := range tableDescribe.GlobalSecondaryIndexes {
			if gsiDesc == nil {
				// nothing to convert; skip rather than dereference a nil entry
				continue
			}
			gsiIndex := &dynamodb.GlobalSecondaryIndex{
				IndexName:  gsiDesc.IndexName,
				KeySchema:  gsiDesc.KeySchema,
				Projection: gsiDesc.Projection,
				// ProvisionedThroughput: gsiDesc.ProvisionedThroughput,
			}

			// meaningless, support aliyun_dynamodb
			if gsiDesc.Projection == nil {
				gsiIndex.Projection = &dynamodb.Projection{}
			}
			gsiList = append(gsiList, gsiIndex)
		}
		createTableInput.SetGlobalSecondaryIndexes(gsiList)

		// convert []*LocalSecondaryIndexDescription => []*LocalSecondaryIndex
		lsiList := make([]*dynamodb.LocalSecondaryIndex, 0, len(tableDescribe.LocalSecondaryIndexes))
		for _, lsiDesc := range tableDescribe.LocalSecondaryIndexes {
			if lsiDesc == nil {
				// nothing to convert; skip rather than dereference a nil entry
				continue
			}
			lsiIndex := &dynamodb.LocalSecondaryIndex{
				IndexName:  lsiDesc.IndexName,
				KeySchema:  lsiDesc.KeySchema,
				Projection: lsiDesc.Projection,
			}

			// meaningless, support aliyun_dynamodb
			if lsiDesc.Projection == nil {
				lsiIndex.Projection = &dynamodb.Projection{}
			}
			lsiList = append(lsiList, lsiIndex)
		}
		createTableInput.SetLocalSecondaryIndexes(lsiList)
	}

	if _, err := dpw.svc.CreateTable(createTableInput); err != nil {
		LOG.Error("create table[%v] fail: %v", tableName, err)
		return err
	}

	// checkReady reports true while the table is NOT yet usable (describe
	// failed, status missing, or status other than ACTIVE) and false once it
	// is ACTIVE. NOTE(review): this relies on utils.CallbackRetry treating a
	// true result as "try again" — confirm against its implementation.
	checkReady := func() bool {
		out, err := dpw.svc.DescribeTable(&dynamodb.DescribeTableInput{
			TableName: tableDescribe.TableName,
		})
		if err != nil {
			LOG.Warn("create table[%v] ok but describe failed: %v", tableName, err)
			return true
		}

		// A response without a table or status cannot be ACTIVE; treat it as
		// "not ready" instead of dereferencing a nil pointer.
		if out == nil || out.Table == nil || out.Table.TableStatus == nil {
			LOG.Warn("create table[%v] ok but describe not ready: %v", tableName, "table status missing")
			return true
		}

		if status := *out.Table.TableStatus; status != "ACTIVE" {
			LOG.Warn("create table[%v] ok but describe not ready: %v", tableName, status)
			return true
		}

		return false
	}

	// check with retry 5 times and 1s gap
	if ok := utils.CallbackRetry(5, 1000, checkReady); !ok {
		return fmt.Errorf("create table[%v] fail: check ready fail", dpw.ns.Collection)
	}

	return nil
}