in v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java [101:382]
/**
 * Converts every entity of the given {@link Ddl} into an Avro record {@link Schema}.
 *
 * <p>Schemas are appended in a fixed order: named schemas, tables, models, property graphs,
 * views, change streams, sequences and finally placements. Only table and model records carry
 * Avro fields; every other entity is an empty record whose definition is stored entirely in
 * record-level properties.
 *
 * @param ddl the database definition to convert
 * @return one Avro schema per entity, in the order described above
 */
public Collection<Schema> convert(Ddl ddl) {
  Collection<Schema> schemas = new ArrayList<>();
  for (NamedSchema schema : ddl.schemas()) {
    LOG.info("DdlToAvo Schema {}", schema.name());
    schemas.add(namedSchemaToAvro(schema));
  }
  for (Table table : ddl.allTables()) {
    LOG.info("DdlToAvo Table {}", table.name());
    schemas.add(tableToAvro(table));
  }
  for (Model model : ddl.models()) {
    LOG.info("DdlToAvo model {}", model.name());
    schemas.add(modelToAvro(model));
  }
  for (PropertyGraph propertyGraph : ddl.propertyGraphs()) {
    LOG.info("DdlToAvro PropertyGraph {}", propertyGraph.name());
    schemas.add(propertyGraphToAvro(propertyGraph));
  }
  for (View view : ddl.views()) {
    LOG.info("DdlToAvo view {}", view.name());
    schemas.add(viewToAvro(view));
  }
  for (ChangeStream changeStream : ddl.changeStreams()) {
    LOG.info("DdlToAvo changestream {}", changeStream.name());
    schemas.add(changeStreamToAvro(changeStream));
  }
  for (Sequence sequence : ddl.sequences()) {
    LOG.info("DdlToAvo sequence {}", sequence.name());
    schemas.add(sequenceToAvro(sequence));
  }
  for (Placement placement : ddl.placements()) {
    LOG.info("DdlToAvro placement {}", placement.name());
    schemas.add(placementToAvro(placement));
  }
  return schemas;
}

/**
 * Starts an Avro record for the entity called {@code spannerName} and stamps the three
 * properties shared by every exported entity (original name, format version, storage kind).
 */
private SchemaBuilder.RecordBuilder<Schema> startEntityRecord(String spannerName) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder =
      SchemaBuilder.record(generateAvroSchemaName(spannerName)).namespace(this.namespace);
  recordBuilder.prop(SPANNER_NAME, spannerName);
  recordBuilder.prop(GOOGLE_FORMAT_VERSION, version);
  recordBuilder.prop(GOOGLE_STORAGE, "CloudSpanner");
  return recordBuilder;
}

/** Builds the field-less record that represents a named schema ("CREATE SCHEMA"). */
private Schema namedSchemaToAvro(NamedSchema schema) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(schema.name());
  // Indicate that this is a "CREATE SCHEMA", not a table or a view.
  recordBuilder.prop(SPANNER_ENTITY, SPANNER_NAMED_SCHEMA);
  return recordBuilder.fields().endRecord();
}

/**
 * Builds the record for a table: interleaving, primary key, index, foreign key and check
 * constraint metadata go into record properties, and each column becomes one Avro field.
 */
private Schema tableToAvro(Table table) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(table.name());
  if (table.interleaveInParent() != null) {
    recordBuilder.prop(SPANNER_PARENT, table.interleaveInParent());
    recordBuilder.prop(
        SPANNER_INTERLEAVE_TYPE,
        table.interleaveType() == InterleaveType.IN ? "IN" : "IN PARENT");
    recordBuilder.prop(
        SPANNER_ON_DELETE_ACTION, table.onDeleteCascade() ? "cascade" : "no action");
  }
  // The per-index primary key properties are written inside the same null guard as the
  // combined property, so a table without primary keys no longer triggers a
  // NullPointerException.
  if (table.dialect() == Dialect.GOOGLE_STANDARD_SQL) {
    if (table.primaryKeys() != null) {
      String encodedPk =
          table.primaryKeys().stream()
              .map(IndexColumn::prettyPrint)
              .collect(Collectors.joining(","));
      recordBuilder.prop(SPANNER_PRIMARY_KEY, encodedPk);
      for (int i = 0; i < table.primaryKeys().size(); i++) {
        recordBuilder.prop(
            SPANNER_PRIMARY_KEY + "_" + i, table.primaryKeys().get(i).prettyPrint());
      }
    }
  } else if (table.dialect() == Dialect.POSTGRESQL) {
    if (table.primaryKeys() != null) {
      String encodedPk =
          table.primaryKeys().stream()
              .map(c -> "\"" + c.name() + "\"")
              .collect(Collectors.joining(", "));
      recordBuilder.prop(SPANNER_PRIMARY_KEY, encodedPk);
      for (int i = 0; i < table.primaryKeys().size(); i++) {
        IndexColumn pk = table.primaryKeys().get(i);
        recordBuilder.prop(SPANNER_PRIMARY_KEY + "_" + i, "\"" + pk.name() + "\" ASC");
      }
    }
  }
  for (int i = 0; i < table.indexes().size(); i++) {
    recordBuilder.prop(SPANNER_INDEX + i, table.indexes().get(i));
  }
  for (int i = 0; i < table.foreignKeys().size(); i++) {
    recordBuilder.prop(SPANNER_FOREIGN_KEY + i, table.foreignKeys().get(i));
  }
  for (int i = 0; i < table.checkConstraints().size(); i++) {
    recordBuilder.prop(SPANNER_CHECK_CONSTRAINT + i, table.checkConstraints().get(i));
  }

  SchemaBuilder.FieldAssembler<Schema> fieldsAssembler = recordBuilder.fields();
  // Only advanced for non-generated columns; it suffixes the name handed to avroType(...).
  int columnOrdinal = 0;
  for (Column cm : table.columns()) {
    SchemaBuilder.FieldBuilder<Schema> fieldBuilder = fieldsAssembler.name(cm.name());
    fieldBuilder.prop(SQL_TYPE, cm.typeString());
    fieldBuilder.prop(HIDDEN, Boolean.toString(cm.isHidden()));
    for (int i = 0; i < cm.columnOptions().size(); i++) {
      fieldBuilder.prop(SPANNER_OPTION + i, cm.columnOptions().get(i));
    }
    if (cm.isPlacementKey()) {
      fieldBuilder.prop(SPANNER_PLACEMENT_KEY, Boolean.toString(cm.isPlacementKey()));
    }
    if (cm.isGenerated()) {
      fieldBuilder.prop(NOT_NULL, Boolean.toString(cm.notNull()));
      fieldBuilder.prop(GENERATION_EXPRESSION, cm.generationExpression());
      fieldBuilder.prop(STORED, Boolean.toString(cm.isStored()));
      // Make the type null to allow us not export the generated column values,
      // which are semantically logical entities.
      fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null);
    } else {
      if (cm.isIdentityColumn()) {
        fieldBuilder.prop(IDENTITY_COLUMN, Boolean.toString(cm.isIdentityColumn()));
        if (cm.sequenceKind() != null) {
          fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind());
        }
        if (cm.counterStartValue() != null) {
          fieldBuilder.prop(
              SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
        }
        if (cm.skipRangeMin() != null) {
          fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
        }
        if (cm.skipRangeMax() != null) {
          fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
        }
      } else if (cm.defaultExpression() != null) {
        fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
      }
      Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
      if (!cm.notNull()) {
        avroType = wrapAsNullable(avroType);
      }
      fieldBuilder.type(avroType).noDefault();
    }
  }
  return fieldsAssembler.endRecord();
}

/**
 * Builds the record for a model: options and the remote flag go into record properties, and
 * the input and output columns become two nested records stored in the INPUT and OUTPUT fields.
 */
private Schema modelToAvro(Model model) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(model.name());
  recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_MODEL);
  recordBuilder.prop(SPANNER_REMOTE, Boolean.toString(model.remote()));
  if (model.options() != null) {
    for (int i = 0; i < model.options().size(); i++) {
      recordBuilder.prop(SPANNER_OPTION + i, model.options().get(i));
    }
  }
  SchemaBuilder.FieldAssembler<Schema> fieldsAssembler = recordBuilder.fields();
  appendModelColumnRecord(
      fieldsAssembler, INPUT, model.name(), model.name() + "_input_", model.inputColumns());
  appendModelColumnRecord(
      fieldsAssembler, OUTPUT, model.name(), model.name() + "_output_", model.outputColumns());
  return fieldsAssembler.endRecord();
}

/**
 * Appends to {@code parentFields} a field named {@code fieldName} whose type is a nested record
 * named {@code modelName + "_" + fieldName} with one field per model column.
 *
 * @param avroTypePrefix prefix that, followed by the column's position, forms the name passed to
 *     {@code avroType} for that column
 */
private void appendModelColumnRecord(
    SchemaBuilder.FieldAssembler<Schema> parentFields,
    String fieldName,
    String modelName,
    String avroTypePrefix,
    Iterable<? extends ModelColumn> columns) {
  SchemaBuilder.FieldAssembler<RecordDefault<Schema>> nestedFields =
      parentFields
          .name(fieldName)
          .type()
          .record(modelName + "_" + fieldName)
          .namespace(this.namespace)
          .fields();
  int columnOrdinal = 0;
  for (ModelColumn c : columns) {
    FieldBuilder<RecordDefault<Schema>> fieldBuilder = nestedFields.name(c.name());
    fieldBuilder.prop(SQL_TYPE, c.typeString());
    for (int i = 0; i < c.columnOptions().size(); i++) {
      fieldBuilder.prop(SPANNER_OPTION + i, c.columnOptions().get(i));
    }
    Schema avroType = avroType(c.type(), avroTypePrefix + columnOrdinal++);
    fieldBuilder.type(avroType).noDefault();
  }
  // Closes the nested record and completes the enclosing field without a default value.
  nestedFields.endRecord().noDefault();
}

/**
 * Builds the field-less record for a property graph; node tables, edge tables, property
 * declarations and labels are encoded as record properties by the {@code encode*} helpers.
 */
private Schema propertyGraphToAvro(PropertyGraph propertyGraph) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(propertyGraph.name());
  recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_PROPERTY_GRAPH);
  // Encode nodeTables
  for (int i = 0; i < propertyGraph.nodeTables().size(); i++) {
    encodeNodeTable(recordBuilder, propertyGraph.nodeTables().get(i), i);
  }
  // Encode edgeTables
  for (int i = 0; i < propertyGraph.edgeTables().size(); i++) {
    encodeEdgeTable(recordBuilder, propertyGraph.edgeTables().get(i), i);
  }
  // Encode propertyDeclarations
  encodePropertyDeclarations(recordBuilder, propertyGraph.propertyDeclarations());
  // Encode labels
  encodeLabels(recordBuilder, propertyGraph.labels());
  return recordBuilder.fields().endRecord();
}

/** Builds the field-less record for a view, carrying its query and optional security mode. */
private Schema viewToAvro(View view) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(view.name());
  recordBuilder.prop(SPANNER_VIEW_QUERY, view.query());
  if (view.security() != null) {
    recordBuilder.prop(SPANNER_VIEW_SECURITY, view.security().toString());
  }
  return recordBuilder.fields().endRecord();
}

/** Builds the field-less record for a change stream, carrying its FOR clause and options. */
private Schema changeStreamToAvro(ChangeStream changeStream) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(changeStream.name());
  // A missing FOR clause is stored as an empty string, so the property is always present.
  recordBuilder.prop(
      SPANNER_CHANGE_STREAM_FOR_CLAUSE,
      changeStream.forClause() == null ? "" : changeStream.forClause());
  if (changeStream.options() != null) {
    for (int i = 0; i < changeStream.options().size(); i++) {
      recordBuilder.prop(SPANNER_OPTION + i, changeStream.options().get(i));
    }
  }
  return recordBuilder.fields().endRecord();
}

/**
 * Builds the field-less record for a sequence; options, kind, counter start and skip range are
 * each written only when present.
 */
private Schema sequenceToAvro(Sequence sequence) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(sequence.name());
  if (sequence.options() != null) {
    for (int i = 0; i < sequence.options().size(); i++) {
      recordBuilder.prop(SPANNER_SEQUENCE_OPTION + i, sequence.options().get(i));
    }
  }
  if (sequence.sequenceKind() != null) {
    recordBuilder.prop(SPANNER_SEQUENCE_KIND, sequence.sequenceKind());
  }
  if (sequence.counterStartValue() != null) {
    recordBuilder.prop(
        SPANNER_SEQUENCE_COUNTER_START, String.valueOf(sequence.counterStartValue()));
  }
  if (sequence.skipRangeMin() != null) {
    recordBuilder.prop(
        SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(sequence.skipRangeMin()));
  }
  if (sequence.skipRangeMax() != null) {
    recordBuilder.prop(
        SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(sequence.skipRangeMax()));
  }
  return recordBuilder.fields().endRecord();
}

/** Builds the field-less record for a placement, carrying its options. */
private Schema placementToAvro(Placement placement) {
  SchemaBuilder.RecordBuilder<Schema> recordBuilder = startEntityRecord(placement.name());
  recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_PLACEMENT);
  if (placement.options() != null) {
    for (int i = 0; i < placement.options().size(); i++) {
      recordBuilder.prop(SPANNER_OPTION + i, placement.options().get(i));
    }
  }
  return recordBuilder.fields().endRecord();
}