func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_create.go [28:122]


// TranslateCreateTableToBigtable parses a CQL CREATE TABLE statement and
// returns a CreateTableStatementMap holding the table name, keyspace,
// IF NOT EXISTS flag, column metadata and primary key configuration.
//
// The keyspace must be given explicitly in the query; an error is returned
// when it is absent. Columns are reported in declaration order, with Index set
// to their position in the column definition list.
//
// Primary keys come from one of two places:
//   - an inline PRIMARY KEY marker on a column, recorded as a partition key, or
//   - a PRIMARY KEY clause (single, compound or composite). When a clause is
//     present it is authoritative and replaces any inline markers.
//
// An error is returned when required parts of the statement are missing from
// the parse tree or when a column type is rejected by
// utilities.GetCassandraColumnType.
func (t *Translator) TranslateCreateTableToBigtable(query string) (*CreateTableStatementMap, error) {
	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)

	createTableObj := p.CreateTable()
	if createTableObj == nil {
		return nil, errors.New("error while parsing create object")
	}

	// Parse-tree getters return nil for absent children, so every node is
	// checked before use to turn malformed queries into errors, not panics.
	tableNode := createTableObj.Table()
	if tableNode == nil {
		return nil, errors.New("missing table name")
	}
	table := tableNode.GetText()

	keyspaceNode := createTableObj.Keyspace()
	if keyspaceNode == nil {
		return nil, errors.New("missing keyspace. keyspace is required")
	}
	keyspace := keyspaceNode.GetText()

	colDefs := createTableObj.ColumnDefinitionList()
	if colDefs == nil {
		return nil, errors.New("missing column definitions")
	}

	var pmks []CreateTablePrimaryKeyConfig
	var columns []message.ColumnMetadata
	for i, col := range colDefs.AllColumnDefinition() {
		if col.Column() == nil || col.DataType() == nil {
			return nil, errors.New("malformed column definition")
		}
		name := col.Column().GetText()

		dt, err := utilities.GetCassandraColumnType(col.DataType().GetText())
		if err != nil {
			return nil, err
		}

		columns = append(columns, message.ColumnMetadata{
			Table:    table,
			Keyspace: keyspace,
			Type:     dt,
			Name:     name,
			Index:    int32(i),
		})

		// An inline PRIMARY KEY column is the table's partition key, exactly
		// like a single-column PRIMARY KEY clause.
		if col.PrimaryKeyColumn() != nil {
			pmks = append(pmks, CreateTablePrimaryKeyConfig{
				Name:    name,
				KeyType: utilities.KEY_TYPE_PARTITION,
			})
		}
	}

	// nil if inline primary key definition used
	if pkElem := colDefs.PrimaryKeyElement(); pkElem != nil {
		pkDef := pkElem.PrimaryKeyDefinition()
		if pkDef == nil {
			return nil, errors.New("unknown key type for table")
		}

		// The explicit clause wins over inline markers in every form, so the
		// result never contains stray or duplicated key entries.
		pmks = nil

		singleKey := pkDef.SinglePrimaryKey()
		compoundKey := pkDef.CompoundKey()
		compositeKey := pkDef.CompositeKey()
		switch {
		case singleKey != nil:
			pmks = append(pmks, CreateTablePrimaryKeyConfig{
				Name:    singleKey.GetText(),
				KeyType: utilities.KEY_TYPE_PARTITION,
			})
		case compoundKey != nil:
			if compoundKey.PartitionKey() == nil || compoundKey.ClusteringKeyList() == nil {
				return nil, errors.New("malformed compound primary key")
			}
			pmks = append(pmks, CreateTablePrimaryKeyConfig{
				Name:    compoundKey.PartitionKey().GetText(),
				KeyType: utilities.KEY_TYPE_PARTITION,
			})
			for _, clusterKey := range compoundKey.ClusteringKeyList().AllClusteringKey() {
				if clusterKey.Column() == nil {
					return nil, errors.New("malformed compound primary key")
				}
				pmks = append(pmks, CreateTablePrimaryKeyConfig{
					Name:    clusterKey.Column().GetText(),
					KeyType: utilities.KEY_TYPE_CLUSTERING,
				})
			}
		case compositeKey != nil:
			if compositeKey.PartitionKeyList() == nil || compositeKey.ClusteringKeyList() == nil {
				return nil, errors.New("malformed composite primary key")
			}
			for _, partitionKey := range compositeKey.PartitionKeyList().AllPartitionKey() {
				if partitionKey.Column() == nil {
					return nil, errors.New("malformed composite primary key")
				}
				pmks = append(pmks, CreateTablePrimaryKeyConfig{
					Name:    partitionKey.Column().GetText(),
					KeyType: utilities.KEY_TYPE_PARTITION,
				})
			}
			for _, clusterKey := range compositeKey.ClusteringKeyList().AllClusteringKey() {
				if clusterKey.Column() == nil {
					return nil, errors.New("malformed composite primary key")
				}
				pmks = append(pmks, CreateTablePrimaryKeyConfig{
					Name:    clusterKey.Column().GetText(),
					KeyType: utilities.KEY_TYPE_CLUSTERING,
				})
			}
		default:
			// this should never happen
			return nil, errors.New("unknown key type for table")
		}
	}

	stmt := CreateTableStatementMap{
		Table:       table,
		IfNotExists: createTableObj.IfNotExist() != nil,
		Keyspace:    keyspace,
		QueryType:   "create",
		Columns:     columns,
		PrimaryKeys: pmks,
	}

	return &stmt, nil
}