in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java [458:716]
// Creates the schema objects that are described by the exported Avro schemas but are not yet
// present in the target database, and hands the DDL that has to wait to later stages.
//
// A single seed element drives one invocation of the DoFn below, which:
//   1. reads the Avro schemas, the current database DDL and the export manifest side inputs;
//   2. sorts every Avro schema whose name is unknown to the current DDL into a bucket
//      (named schema, table, model, view, change stream, sequence, placement, property graph);
//   3. builds the DDL statements for those objects and applies them with updateDatabaseDdl;
//   4. emits the resulting Ddl on the main output (ddlObjectTag) and the deferred statements on
//      pendingIndexesTag, pendingForeignKeysTag and pendingChangeStreamsTag.
//
// Each of the three additional outputs receives exactly one list per processed element; the list
// is empty when nothing is pending.
public PCollectionTuple expand(PBegin begin) {
  return begin
      .apply(Create.of(1))
      .apply(
          ParDo.of(
                  new DoFn<Integer, Ddl>() {
                    @Setup
                    public void setup() {
                      spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
                    }

                    @Teardown
                    public void teardown() {
                      // The accessor is assigned in setup(); skip the close when that assignment
                      // never happened so teardown cannot fail with a NullPointerException.
                      if (spannerAccessor != null) {
                        spannerAccessor.close();
                      }
                    }

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      List<KV<String, String>> avroSchemas = c.sideInput(avroSchemasView);
                      Ddl informationSchemaDdl = c.sideInput(informationSchemaView);
                      Dialect dialect = informationSchemaDdl.dialect();
                      Export manifest = c.sideInput(manifestView);
                      ByteString protoDescriptors = manifest.getProtoDescriptors();

                      if (LOG.isDebugEnabled()) {
                        LOG.debug(informationSchemaDdl.prettyPrint());
                      }

                      Schema.Parser parser = new Schema.Parser();

                      // One bucket per kind of object that the database does not have yet.
                      List<KV<String, Schema>> missingNamedSchemas = new ArrayList<>();
                      List<KV<String, Schema>> missingTables = new ArrayList<>();
                      List<KV<String, Schema>> missingModels = new ArrayList<>();
                      List<KV<String, Schema>> missingViews = new ArrayList<>();
                      List<KV<String, Schema>> missingChangeStreams = new ArrayList<>();
                      List<KV<String, Schema>> missingSequences = new ArrayList<>();
                      List<KV<String, Schema>> missingPlacements = new ArrayList<>();
                      List<KV<String, Schema>> missingPropertyGraphs = new ArrayList<>();

                      for (KV<String, String> kv : avroSchemas) {
                        // Only names that the current DDL does not know under any object kind
                        // are considered missing.
                        if (informationSchemaDdl.schema(kv.getKey()) == null
                            && informationSchemaDdl.table(kv.getKey()) == null
                            && informationSchemaDdl.model(kv.getKey()) == null
                            && informationSchemaDdl.view(kv.getKey()) == null
                            && informationSchemaDdl.changeStream(kv.getKey()) == null
                            && informationSchemaDdl.sequence(kv.getKey()) == null
                            && informationSchemaDdl.placement(kv.getKey()) == null
                            && informationSchemaDdl.propertyGraph(kv.getKey()) == null) {
                          Schema schema = parser.parse(kv.getValue());
                          // The object kind is derived from the Avro schema properties; the
                          // first matching rule wins and anything unmatched is a table.
                          if (schema.getProp(AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE)
                              != null) {
                            missingChangeStreams.add(KV.of(kv.getKey(), schema));
                          } else if ("Model".equals(schema.getProp("spannerEntity"))) {
                            missingModels.add(KV.of(kv.getKey(), schema));
                          } else if (schema.getProp("spannerViewQuery") != null) {
                            missingViews.add(KV.of(kv.getKey(), schema));
                          } else if (schema.getProp("sequenceOption_0") != null
                              || schema.getProp(AvroUtil.SPANNER_SEQUENCE_KIND) != null) {
                            missingSequences.add(KV.of(kv.getKey(), schema));
                          } else if ("spannerNamedSchema"
                              .equals(schema.getProp("spannerEntity"))) {
                            missingNamedSchemas.add(KV.of(kv.getKey(), schema));
                          } else if ("Placement".equals(schema.getProp("spannerEntity"))) {
                            missingPlacements.add(KV.of(kv.getKey(), schema));
                          } else if (AvroUtil.SPANNER_ENTITY_PROPERTY_GRAPH.equals(
                              schema.getProp("spannerEntity"))) {
                            missingPropertyGraphs.add(KV.of(kv.getKey(), schema));
                          } else {
                            missingTables.add(KV.of(kv.getKey(), schema));
                          }
                        }
                      }

                      AvroSchemaToDdlConverter converter =
                          new AvroSchemaToDdlConverter(dialect);
                      List<String> createIndexStatements = new ArrayList<>();
                      List<String> createForeignKeyStatements = new ArrayList<>();
                      List<String> createChangeStreamStatements = new ArrayList<>();
                      List<String> createSequenceStatements = new ArrayList<>();
                      List<String> createPlacementStatements = new ArrayList<>();

                      // mergedDdl accumulates the existing DDL plus everything created here and
                      // becomes the main output; ddlStatements is what gets sent to Spanner.
                      Ddl.Builder mergedDdl = informationSchemaDdl.toBuilder();
                      List<String> ddlStatements = new ArrayList<>();

                      // Set once the pending index / foreign key lists have been emitted, so
                      // that each of those outputs receives exactly one element.
                      boolean pendingIndexAndForeignKeyEmitted = false;

                      if (!manifest.getDatabaseOptionsList().isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        builder.mergeDatabaseOptions(manifest.getDatabaseOptionsList());
                        mergedDdl.mergeDatabaseOptions(manifest.getDatabaseOptionsList());
                        Ddl newDdl = builder.build();
                        ddlStatements.addAll(
                            newDdl.setOptionsStatements(spannerConfig.getDatabaseId().get()));
                      }

                      // CREATE PROTO BUNDLE statement has to be placed before
                      // table and view statements, since tables and views
                      // may use PROTO BUNDLE.
                      if (!manifest.getProtoBundleList().isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        builder.mergeProtoBundle(
                            ImmutableSet.copyOf(manifest.getProtoBundleList()));
                        mergedDdl.mergeProtoBundle(
                            ImmutableSet.copyOf(manifest.getProtoBundleList()));
                        Ddl newDdl = builder.build();
                        ddlStatements.add(newDdl.createProtoBundleStatement());
                      }

                      // Named schemas are converted one at a time, each through its own
                      // temporary builder.
                      for (KV<String, Schema> kv : missingNamedSchemas) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        NamedSchema schema = converter.toSchema(kv.getKey(), kv.getValue());
                        builder.addSchema(schema);
                        mergedDdl.addSchema(schema);
                        Ddl newDdl = builder.build();
                        ddlStatements.addAll(newDdl.createNamedSchemaStatements());
                      }

                      // CREATE SEQUENCE statements have to be placed before
                      // table and view statements, since tables and views
                      // may use sequences.
                      if (!missingSequences.isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        for (KV<String, Schema> kv : missingSequences) {
                          Sequence sequence = converter.toSequence(kv.getKey(), kv.getValue());
                          builder.addSequence(sequence);
                          mergedDdl.addSequence(sequence);
                        }
                        Ddl newDdl = builder.build();
                        ddlStatements.addAll(newDdl.createSequenceStatements());
                      }

                      if (!missingPlacements.isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        for (KV<String, Schema> kv : missingPlacements) {
                          Placement placement =
                              converter.toPlacement(kv.getKey(), kv.getValue());
                          builder.addPlacement(placement);
                          mergedDdl.addPlacement(placement);
                        }
                        Ddl newDdl = builder.build();
                        ddlStatements.addAll(newDdl.createPlacementStatements());
                      }

                      if (!missingTables.isEmpty()
                          || !missingModels.isEmpty()
                          || !missingViews.isEmpty()
                          || !missingPropertyGraphs.isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        for (KV<String, Schema> kv : missingViews) {
                          com.google.cloud.teleport.spanner.ddl.View view =
                              converter.toView(kv.getKey(), kv.getValue());
                          builder.addView(view);
                          mergedDdl.addView(view);
                        }
                        for (KV<String, Schema> kv : missingModels) {
                          com.google.cloud.teleport.spanner.ddl.Model model =
                              converter.toModel(kv.getKey(), kv.getValue());
                          builder.addModel(model);
                          mergedDdl.addModel(model);
                        }
                        for (KV<String, Schema> kv : missingTables) {
                          Table table = converter.toTable(kv.getKey(), kv.getValue());
                          builder.addTable(table);
                          mergedDdl.addTable(table);
                          // Account for additional DDL changes for tables being created
                          createIndexStatements.addAll(table.indexes());
                          createForeignKeyStatements.addAll(table.foreignKeys());
                        }
                        for (KV<String, Schema> kv : missingPropertyGraphs) {
                          PropertyGraph graph =
                              converter.toPropertyGraph(kv.getKey(), kv.getValue());
                          builder.addPropertyGraph(graph);
                          mergedDdl.addPropertyGraph(graph);
                        }
                        Ddl newDdl = builder.build();
                        // Statement order: tables, then models, views and property graphs.
                        ddlStatements.addAll(newDdl.createTableStatements());
                        ddlStatements.addAll(newDdl.createModelStatements());
                        ddlStatements.addAll(newDdl.createViewStatements());
                        ddlStatements.addAll(newDdl.createPropertyGraphStatements());

                        // If the total DDL statements exceed the threshold, execute the create
                        // index statements when tables are created.
                        // Note that foreign keys can only be created after data load
                        // because if we tried to create them before data load, we would
                        // need to load rows in a specific order (insert the referenced
                        // row first before the referencing row). This is not always
                        // possible since foreign keys may introduce circular relationships.
                        if (earlyIndexCreateFlag.get()
                            && ((createForeignKeyStatements.size()
                                    + createIndexStatements.size())
                                >= EARLY_INDEX_CREATE_THRESHOLD)) {
                          LOG.info(
                              "Create index early: {}",
                              String.join(";", createIndexStatements));
                          ddlStatements.addAll(createIndexStatements);
                          // Indexes go out with the table DDL, so nothing is left pending.
                          c.output(pendingIndexesTag, new ArrayList<String>());
                        } else {
                          LOG.info(
                              "Pending index creation: {}",
                              String.join(";", createIndexStatements));
                          c.output(pendingIndexesTag, createIndexStatements);
                        }
                        c.output(pendingForeignKeysTag, createForeignKeyStatements);
                        pendingIndexAndForeignKeyEmitted = true;
                      }

                      // Change stream statements are only collected here and emitted on their
                      // own output; they are not added to ddlStatements or to mergedDdl.
                      if (!missingChangeStreams.isEmpty()) {
                        Ddl.Builder builder = Ddl.builder(dialect);
                        for (KV<String, Schema> kv : missingChangeStreams) {
                          ChangeStream changeStream =
                              converter.toChangeStream(kv.getKey(), kv.getValue());
                          builder.addChangeStream(changeStream);
                        }
                        Ddl newDdl = builder.build();
                        createChangeStreamStatements.addAll(
                            newDdl.createChangeStreamStatements());
                      }
                      c.output(pendingChangeStreamsTag, createChangeStreamStatements);

                      LOG.info(
                          "Applying DDL statements for schemas, tables, models, views and property graphs: {}",
                          ddlStatements);
                      if (!ddlStatements.isEmpty()) {
                        DatabaseAdminClient databaseAdminClient =
                            spannerAccessor.getDatabaseAdminClient();
                        Database.Builder databaseBuilder =
                            databaseAdminClient.newDatabaseBuilder(
                                DatabaseId.of(
                                    spannerConfig.getProjectId().get(),
                                    spannerConfig.getInstanceId().get(),
                                    spannerConfig.getDatabaseId().get()));
                        if (protoDescriptors != null) {
                          databaseBuilder.setProtoDescriptors(protoDescriptors.toByteArray());
                        }
                        OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
                            databaseAdminClient.updateDatabaseDdl(
                                databaseBuilder.build(), ddlStatements, null);
                        try {
                          // Block until the schema change finishes or the configured timeout
                          // elapses.
                          op.get(ddlCreationTimeoutInMinutes.get(), TimeUnit.MINUTES);
                        } catch (InterruptedException e) {
                          // Restore the interrupt status so code further up the stack can still
                          // observe the interruption after the exception is wrapped.
                          Thread.currentThread().interrupt();
                          throw new RuntimeException(e);
                        } catch (ExecutionException | TimeoutException e) {
                          throw new RuntimeException(e);
                        }
                        c.output(mergedDdl.build());
                      } else {
                        // Nothing was applied, so the database DDL is unchanged.
                        c.output(informationSchemaDdl);
                      }

                      // When the table/model/view/property-graph section above did not run,
                      // still emit the (empty) pending lists. The flag prevents a second
                      // emission when only views were missing.
                      if (!pendingIndexAndForeignKeyEmitted) {
                        c.output(pendingIndexesTag, createIndexStatements);
                        c.output(pendingForeignKeysTag, createForeignKeyStatements);
                      }
                    }
                  })
              .withSideInputs(avroSchemasView, informationSchemaView, manifestView)
              .withOutputTags(
                  ddlObjectTag,
                  TupleTagList.of(pendingIndexesTag)
                      .and(pendingForeignKeysTag)
                      .and(pendingChangeStreamsTag)));
}