in cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java [156:227]
/**
 * Parses the supplied UDT and CREATE TABLE statements, builds the resulting {@link TableMetadata}
 * and hands it to {@code setupTableAndUdt} for the given keyspace.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>the partitioner is resolved and {@code setupKeyspace} is invoked;</li>
 *   <li>every UDT statement is parsed, then all parsed types are collected into a {@link Types};</li>
 *   <li>the CREATE TABLE statement is parsed and turned into table metadata that uses those types
 *       and the resolved partitioner;</li>
 *   <li>the table's {@code cdc} parameter is forced to {@code enableCdc} when it differs;</li>
 *   <li>every column is passed to {@code columnValidator} before the table is set up.</li>
 * </ol>
 *
 * @param schema            schema instance that is queried for existing metadata and passed to the setup helpers
 * @param keyspace          name of the keyspace the table belongs to
 * @param udtStatements     CQL statements, each expected to parse to a {@code CreateTypeStatement.Raw}
 * @param createStatement   CQL CREATE TABLE statement for the table
 * @param partitioner       partitioner that is converted to the Cassandra {@code IPartitioner}
 * @param replicationFactor replication factor forwarded to {@code setupKeyspace}
 * @param tableId           table id to use; when {@code null}, the id of already-registered metadata
 *                          for the same keyspace/table is reused if such metadata exists
 * @param enableCdc         value the table's {@code cdc} parameter must end up with
 * @param columnValidator   callback invoked with every column of the built table metadata
 * @return whatever {@code validateKeyspaceTable} returns for the keyspace and the built table name
 * @throws IllegalStateException if a UDT statement fails to parse with a {@code RecognitionException}
 */
private static Pair<KeyspaceMetadata, TableMetadata> updateSchema(Schema schema,
                                                                  String keyspace,
                                                                  Set<String> udtStatements,
                                                                  String createStatement,
                                                                  Partitioner partitioner,
                                                                  ReplicationFactor replicationFactor,
                                                                  UUID tableId,
                                                                  boolean enableCdc,
                                                                  Consumer<ColumnMetadata> columnValidator)
{
    // Keyspace comes first: it is set up before any of the statements are parsed
    IPartitioner cassPartitioner = CassandraTypesImplementation.getPartitioner(partitioner);
    setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner);

    // Phase 1: parse every UDT statement up front, so a parse failure aborts before any type is registered
    List<CreateTypeStatement.Raw> parsedTypes = new ArrayList<>(udtStatements.size());
    for (String udtCql : udtStatements)
    {
        CreateTypeStatement.Raw parsedType;
        try
        {
            parsedType = (CreateTypeStatement.Raw) CQLFragmentParser.parseAnyUnhandled(CqlParser::query, udtCql);
        }
        catch (RecognitionException exception)
        {
            LOGGER.error("Failed to parse type expression '{}'", udtCql);
            throw new IllegalStateException(exception);
        }
        parsedTypes.add(parsedType);
    }

    // Phase 2: collect the parsed types so the table definition can reference them
    Types.RawBuilder rawTypes = Types.rawBuilder(keyspace);
    parsedTypes.forEach(parsedType -> parsedType.addToRawBuilder(rawTypes));
    Types types = rawTypes.build();

    CreateTableStatement.Raw parsedTable = CQLFragmentParser.parseAny(CqlParser::createTableStatement,
                                                                      createStatement,
                                                                      "CREATE TABLE");

    // An explicitly supplied tableId always wins; otherwise keep the id of the already-known table, if any
    TableMetadata knownTable = schema.getTableMetadata(keyspace, parsedTable.table());
    UUID effectiveTableId = tableId;
    if (effectiveTableId == null && knownTable != null)
    {
        effectiveTableId = knownTable.id.asUUID();
    }

    TableMetadata.Builder metadataBuilder = parsedTable.keyspace(keyspace)
                                                       .prepare(null)
                                                       .builder(types)
                                                       .partitioner(cassPartitioner);
    if (effectiveTableId != null)
    {
        metadataBuilder.id(TableId.fromUUID(effectiveTableId));
    }

    // Rebuild the metadata only when the parsed cdc setting disagrees with the requested one
    TableMetadata parsedMetadata = metadataBuilder.build();
    TableMetadata tableMetadata = parsedMetadata.params.cdc == enableCdc
                                  ? parsedMetadata
                                  : parsedMetadata.unbuild()
                                                  .params(parsedMetadata.params.unbuild()
                                                                               .cdc(enableCdc)
                                                                               .build())
                                                  .build();

    // Columns are validated before the table is handed to the schema
    tableMetadata.columns().forEach(columnValidator);
    setupTableAndUdt(schema, keyspace, tableMetadata, types);
    return validateKeyspaceTable(schema, keyspace, tableMetadata.name);
}