in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java [139:296]
public PDone expand(PBegin begin) {
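    // Import pipeline overview: read the export manifest, recreate the schema in Cloud Spanner,
    // load the Avro files one interleave level at a time (parents before children), then apply
    // the deferred indexes, foreign keys and change streams.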
PCollectionView<Dialect> dialectView =
begin
.apply("Read Dialect", new ReadDialect(spannerConfig))
.apply("Dialect As PCollectionView", View.asSingleton());
PCollection<Export> manifest =
begin.apply("Read manifest", new ReadExportManifestFile(importDirectory, dialectView));
PCollectionView<Export> manifestView = manifest.apply("Manifest as view", View.asSingleton());
PCollection<KV<String, String>> allFiles =
manifest.apply("Read all manifest files", new ReadManifestFiles(importDirectory));
PCollection<KV<String, List<String>>> tableFiles = allFiles.apply(Combine.perKey(AsList.fn()));
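    // Only the first file of each table, view or change stream is needed to extract its Avro schema.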
PCollection<KV<String, String>> schemas =
tableFiles
.apply(
"File per table, view or change stream",
ParDo.of(
new DoFn<KV<String, List<String>>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, List<String>> kv = c.element();
if (!kv.getValue().isEmpty()) {
c.output(KV.of(kv.getKey(), kv.getValue().get(0)));
}
}
}))
.apply("Extract avro schemas", ParDo.of(new ReadAvroSchemas()));
final PCollection<List<KV<String, String>>> avroSchemas =
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));
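    // Snapshot the target database's current schema inside a read transaction so the Avro-derived
    // DDL can be reconciled with what already exists.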
PCollectionView<Transaction> tx =
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
PCollection<Ddl> informationSchemaDdl =
begin.apply(
"Read Information Schema", new ReadInformationSchema(spannerConfig, tx, dialectView));
final PCollectionView<List<KV<String, String>>> avroDdlView =
avroSchemas.apply("Avro ddl view", View.asSingleton());
final PCollectionView<Ddl> informationSchemaView =
informationSchemaDdl.apply("Information schema view", View.asSingleton());
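    // Create the missing tables up front; index, foreign key and change stream statements are
    // returned as "pending" lists and (subject to earlyIndexCreateFlag) applied only after the
    // data load.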
final PCollectionTuple createTableOutput =
begin.apply(
"Create Cloud Spanner Tables and indexes",
new CreateTables(
spannerConfig,
avroDdlView,
informationSchemaView,
manifestView,
earlyIndexCreateFlag,
ddlCreationTimeoutInMinutes));
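    // CreateTables emits multiple outputs: the effective DDL plus the statements deferred until
    // after the bulk load.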
final PCollection<Ddl> ddl = createTableOutput.get(CreateTables.getDdlObjectTag());
final PCollectionView<List<String>> pendingIndexes =
createTableOutput
.get(CreateTables.getPendingIndexesTag())
.apply("As Index view", View.asSingleton());
final PCollectionView<List<String>> pendingForeignKeys =
createTableOutput
.get(CreateTables.getPendingForeignKeysTag())
.apply("As Foreign keys view", View.asSingleton());
final PCollectionView<List<String>> pendingChangeStreams =
createTableOutput
.get(CreateTables.getPendingChangeStreamsTag())
.apply("As change streams view", View.asSingleton());
PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());
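    // Group table names by their depth in the interleave hierarchy; each depth is loaded as a
    // separate wave so that parent rows are committed before their interleaved children.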
PCollectionView<HashMultimap<Integer, String>> levelsView =
ddl.apply(
"Group tables by depth",
ParDo.of(
new DoFn<Ddl, HashMultimap<Integer, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
c.output(ddl.perLevelView());
}
}))
.apply(View.asSingleton());
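    // Build a single multimap from (lower-cased) table name to that table's Avro data files.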
PCollection<HashMultimap<String, String>> acc =
tableFiles
.apply("Combine table files", Combine.globally(AsList.fn()))
.apply(
"As HashMultimap",
ParDo.of(
new DoFn<List<KV<String, List<String>>>, HashMultimap<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
HashMultimap<String, String> result = HashMultimap.create();
for (KV<String, List<String>> kv : c.element()) {
result.putAll(kv.getKey().toLowerCase(), kv.getValue());
}
c.output(result);
}
}));
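    // Load the data one interleave level at a time. Wait.on chains each level on the previous
    // level's write results, starting from the DDL creation itself.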
PCollection<?> previousComputation = ddl;
for (int i = 0; i < MAX_DEPTH; i++) {
final int depth = i;
PCollection<KV<String, String>> levelFiles =
acc.apply(
"Get Avro filenames depth " + depth,
ParDo.of(
new DoFn<HashMultimap<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
HashMultimap<String, String> allFiles = c.element();
HashMultimap<Integer, String> levels = c.sideInput(levelsView);
Set<String> tables = levels.get(depth);
for (String table : tables) {
for (String file : allFiles.get(table)) {
c.output(KV.of(file, table));
}
}
}
})
.withSideInputs(levelsView))
.apply("Wait for previous depth " + depth, Wait.on(previousComputation));
PCollection<Mutation> mutations =
levelFiles.apply(
"Avro files as mutations " + depth, new AvroTableFileAsMutations(ddlView));
SpannerWriteResult result =
mutations.apply(
"Write mutations " + depth,
SpannerIO.write()
.withSchemaReadySignal(ddl)
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardHours(2))
.withMaxNumMutations(10000)
.withGroupingFactor(100)
.withDialectView(dialectView));
previousComputation = result.getOutput();
}
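    // Once all data has been written, apply the deferred DDL: secondary indexes first, then
    // foreign keys, then change streams.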
ddl.apply(Wait.on(previousComputation))
.apply(
"Create Indexes", new ApplyDDLTransform(spannerConfig, pendingIndexes, waitForIndexes))
.apply(
"Add Foreign Keys",
new ApplyDDLTransform(spannerConfig, pendingForeignKeys, waitForForeignKeys))
.apply(
"Create Change Streams",
new ApplyDDLTransform(spannerConfig, pendingChangeStreams, waitForChangeStreams));
return PDone.in(begin.getPipeline());
}