in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java [183:772]
public WriteFilesResult<String> expand(PBegin begin) {
Pipeline p = begin.getPipeline();
/*
* Allow users to specify the read timestamp.
* The CreateTransaction and CreateTransactionFn classes in SpannerIO
* only accept a Timestamp object for exact staleness, which works when
* parameters are provided at template compile time. They do not work with
* a Timestamp ValueProvider, which can accept parameters at runtime. Hence the
* new ParDo class CreateTransactionFnWithTimestamp was created for this
* purpose.
*/
PCollectionView<Transaction> tx =
p.apply("CreateTransaction", Create.of(1))
.apply(
"Create transaction",
ParDo.of(new CreateTransactionFnWithTimestamp(spannerConfig, snapshotTime)))
.apply("Tx As PCollectionView", View.asSingleton());
PCollectionView<Dialect> dialectView =
p.apply("Read Dialect", new ReadDialect(spannerConfig))
.apply("Dialect As PCollectionView", View.asSingleton());
PCollection<Ddl> ddl =
p.apply(
"Read Information Schema", new ReadInformationSchema(spannerConfig, tx, dialectView));
PCollection<Ddl> exportState =
ddl.apply(
"Check export conditions",
ParDo.of(
new DoFn<Ddl, Ddl>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Ddl ddl = c.element();
List<String> tablesList = Collections.emptyList();
// If the user sets shouldExportRelatedTables to true without providing
// a list of export tables, throw an exception.
if (tableNames.get().trim().isEmpty() && exportRelatedTables.get()) {
throw new Exception(
"Invalid usage of --tableNames and --shouldExportRelatedTables. Set"
+ " --shouldExportRelatedTables=true only if --tableNames specifies the"
+ " tables selected for export.");
}
// If the user provides a comma-separated list of strings, parse it into a List
if (!tableNames.get().trim().isEmpty()) {
tablesList = Arrays.asList(tableNames.get().split(",\\s*"));
}
// If the user provided any invalid table names, throw an exception.
List<String> allSpannerTables =
ddl.allTables().stream().map(t -> t.name()).collect(Collectors.toList());
List<String> invalidTables =
tablesList.stream()
.distinct()
.filter(t -> !allSpannerTables.contains(t))
.collect(Collectors.toList());
if (invalidTables.size() != 0) {
throw new Exception(
"INVALID_ARGUMENT: Table(s) not found: "
+ String.join(", ", invalidTables)
+ ".");
}
List<String> filteredTables =
getFilteredTables(ddl, tablesList).stream()
.map(t -> t.name())
.collect(Collectors.toList());
// Collect the names of any required related tables missing from the user's list; copy
// the original table list to satisfy the 'final or effectively final' requirement of
// the lambda expression below.
List<String> usersTables = tablesList.stream().collect(Collectors.toList());
List<String> missingTables =
filteredTables.stream()
.distinct()
.filter(t -> !usersTables.contains(t))
.collect(Collectors.toList());
Collections.sort(missingTables);
// If the user has specified a list of tables without including the required
// related tables, and has not explicitly set shouldExportRelatedTables,
// throw an exception.
if (tablesList.size() != 0
&& !(tablesList.equals(filteredTables))
&& !exportRelatedTables.get()) {
throw new Exception(
"Attempted to export table(s) requiring parent and/or foreign keys tables"
+ " without setting the shouldExportRelatedTables parameter. Set"
+ " --shouldExportRelatedTables=true to export all necessary"
+ " tables, or add "
+ String.join(", ", missingTables)
+ " to --tableNames.");
}
c.output(ddl);
}
}));
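// Build one Spanner read operation per table selected for export (honoring --tableNames when provided).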
PCollection<ReadOperation> tableReadOperations =
ddl.apply("Build table read operations", new BuildReadFromTableOperations(tableNames));
PCollection<KV<String, Void>> allTableAndViewNames =
ddl.apply(
"List all table and view names",
ParDo.of(
new DoFn<Ddl, KV<String, Void>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (Table t : ddl.allTables()) {
c.output(KV.of(t.name(), null));
}
// We want the resulting collection to contain the names of all entities that
// need to be exported, both tables and views. Ddl holds these separately, so
// we need to add the names of all views separately here.
for (com.google.cloud.teleport.spanner.ddl.View v : ddl.views()) {
c.output(KV.of(v.name(), null));
}
}
}));
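// The transforms below list the names of the remaining schema objects (models, property graphs,
// change streams, sequences, named schemas and placements). These objects carry no row data and
// are exported later as schema-only Avro files.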
PCollection<String> allModelNames =
ddl.apply(
"List all model names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (Model model : ddl.models()) {
c.output(model.name());
}
}
}));
PCollection<String> allPropertyGraphNames =
ddl.apply(
"List all property graph names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (PropertyGraph graph : ddl.propertyGraphs()) {
c.output(graph.name());
}
}
}));
PCollection<String> allChangeStreamNames =
ddl.apply(
"List all change stream names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (ChangeStream changeStream : ddl.changeStreams()) {
c.output(changeStream.name());
}
}
}));
PCollection<String> allSequenceNames =
ddl.apply(
"List all sequence names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (Sequence sequence : ddl.sequences()) {
c.output(sequence.name());
}
}
}));
PCollection<String> allNamedSchemaNames =
ddl.apply(
"List all named schema names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (NamedSchema t : ddl.schemas()) {
c.output(t.name());
}
}
}));
PCollection<String> allPlacementNames =
ddl.apply(
"List all placement names",
ParDo.of(
new DoFn<Ddl, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Ddl ddl = c.element();
for (Placement placement : ddl.placements()) {
c.output(placement.name());
}
}
}));
// Generate a unique output directory name.
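// For example (hypothetical values), instance "test-instance", database "test-db" and Dataflow
// job id "2022-01-01_00_00_00-123" produce the folder name
// "test-instance-test-db-2022-01-01_00_00_00-123".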
final PCollectionView<String> outputDirectoryName =
p.apply(Create.of(1))
.apply(
"Create Avro output folder",
ParDo.of(
new DoFn<Integer, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String instanceId = spannerConfig.getInstanceId().get();
String dbId = spannerConfig.getDatabaseId().get();
// For the direct runner or for tests, we need a deterministic jobId.
String testJobId = ExportTransform.this.testJobId.get();
if (!Strings.isNullOrEmpty(testJobId)) {
c.output(testJobId);
return;
}
try {
DataflowWorkerHarnessOptions workerHarnessOptions =
c.getPipelineOptions().as(DataflowWorkerHarnessOptions.class);
String jobId = workerHarnessOptions.getJobId();
c.output(instanceId + "-" + dbId + "-" + jobId);
} catch (Exception e) {
throw new IllegalStateException(
"Please specify --testJobId to run with non-dataflow runner");
}
}
}))
.apply(View.asSingleton());
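// Convert the DDL into Avro schemas and expose them as a map keyed by Spanner object name.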
final PCollectionView<Map<String, SerializableSchemaSupplier>> avroSchemas =
ddl.apply(
"Build Avro schemas from DDL",
ParDo.of(
new DoFn<Ddl, KV<String, SerializableSchemaSupplier>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Collection<Schema> avroSchemas =
new DdlToAvroSchemaConverter(
"spannerexport",
"1.0.0",
shouldExportTimestampAsLogicalType.get())
.convert(c.element());
for (Schema schema : avroSchemas) {
// Here we need to use the real Spanner object name, which will be used as the
// write destination in SchemaBasedDynamicDestinations.
c.output(
KV.of(
AvroUtil.getSpannerObjectName(schema),
new SerializableSchemaSupplier(schema)));
}
}
}))
.apply("As view", View.asMap());
PCollection<Struct> rows =
tableReadOperations.apply(
"Read all rows from Spanner",
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
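// Resolve the output directory, and the Avro temp directory (falling back to the output
// directory when none is given), as ResourceIds.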
ValueProvider<ResourceId> resource =
ValueProvider.NestedValueProvider.of(
outputDir,
(SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));
ValueProvider<ResourceId> tempResource =
ValueProvider.NestedValueProvider.of(
eitherOrValueProvider(avroTempDirectory, outputDir),
(SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));
WriteFilesResult<String> fileWriteResults =
rows.apply(
"Store Avro files",
AvroIO.<Struct>writeCustomTypeToGenericRecords()
.to(
new SchemaBasedDynamicDestinations(
avroSchemas, outputDirectoryName, dialectView, resource))
.withTempDirectory(tempResource));
// Generate the manifest files: one per exported object, plus the overall database manifest.
PCollection<KV<String, Iterable<String>>> tableFiles =
fileWriteResults.getPerDestinationOutputFilenames().apply(GroupByKey.create());
final TupleTag<Void> allTables = new TupleTag<>();
final TupleTag<Iterable<String>> nonEmptyTables = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> groupedTables =
KeyedPCollectionTuple.of(allTables, allTableAndViewNames)
.and(nonEmptyTables, tableFiles)
.apply("Group with all tables", CoGroupByKey.create());
// The following exports empty tables and views from the database. Empty tables and views
// are handled together because we do not export any rows for views, only their metadata,
// including the queries defining them.
PCollection<KV<String, Iterable<String>>> emptyTablesAndViews =
groupedTables.apply(
"Export empty tables and views",
ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> kv = c.element();
String table = kv.getKey();
CoGbkResult coGbkResult = kv.getValue();
Iterable<String> only = coGbkResult.getOnly(nonEmptyTables, null);
if (only == null) {
LOG.info("Exporting empty table or view: " + table);
// This file will contain the schema definition: column definitions for empty
// tables or defining queries for views.
c.output(KV.of(table, Collections.singleton(table + ".avro-00000-of-00001")));
}
}
}));
PCollection<KV<String, Iterable<String>>> models =
allModelNames.apply(
"Export models",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String modelName = c.element();
LOG.info("Exporting model: " + modelName);
// This file will contain the schema definition for the model.
c.output(
KV.of(
modelName, Collections.singleton(modelName + ".avro-00000-of-00001")));
}
}));
PCollection<KV<String, Iterable<String>>> propertyGraphs =
allPropertyGraphNames.apply(
"Export property graphs",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String propertyGraphName = c.element();
LOG.info("Exporting property graph: " + propertyGraphName);
// This file will contain the schema definition for the propertyGraph.
c.output(
KV.of(
propertyGraphName,
Collections.singleton(propertyGraphName + ".avro-00000-of-00001")));
}
}));
PCollection<KV<String, Iterable<String>>> changeStreams =
allChangeStreamNames.apply(
"Export change streams",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String changeStreamName = c.element();
LOG.info("Exporting change stream: " + changeStreamName);
// This file will contain the schema definition for the change stream.
c.output(
KV.of(
changeStreamName,
Collections.singleton(changeStreamName + ".avro-00000-of-00001")));
}
}));
PCollection<KV<String, Iterable<String>>> sequences =
allSequenceNames.apply(
"Export sequences",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String sequenceName = c.element();
LOG.info("Exporting sequence: " + sequenceName);
// This file will contain the schema definition for the sequence.
c.output(
KV.of(
sequenceName,
Collections.singleton(sequenceName + ".avro-00000-of-00001")));
}
}));
PCollection<KV<String, Iterable<String>>> namedSchemas =
allNamedSchemaNames.apply(
"Export named schemas",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String namedSchema = c.element();
LOG.info("Exporting named schema: " + namedSchema);
// This file will contain the schema definition for the named schema.
c.output(
KV.of(
namedSchema,
Collections.singleton(namedSchema + ".avro-00000-of-00001")));
}
}));
PCollection<KV<String, Iterable<String>>> placements =
allPlacementNames.apply(
"Export placements",
ParDo.of(
new DoFn<String, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String placementName = c.element();
LOG.info("Exporting placement: " + placementName);
// This file will contain the schema definition for the placement.
c.output(
KV.of(
placementName,
Collections.singleton(placementName + ".avro-00000-of-00001")));
}
}));
// Empty tables, views, models, property graphs, change streams, sequences, named schemas and
// placements are handled together, because we export them as empty Avro files that only contain
// the Avro schemas.
PCollection<KV<String, Iterable<String>>> emptySchemaFiles =
PCollectionList.of(emptyTablesAndViews)
.and(models)
.and(changeStreams)
.and(sequences)
.and(namedSchemas)
.and(placements)
.and(propertyGraphs)
.apply("Combine all empty schema files", Flatten.pCollections());
emptySchemaFiles =
emptySchemaFiles.apply(
"Save empty schema files",
ParDo.of(
new DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, SerializableSchemaSupplier> schemaMap =
c.sideInput(avroSchemas);
KV<String, Iterable<String>> kv = c.element();
String objectName = kv.getKey();
String fileName = kv.getValue().iterator().next();
Schema schema = schemaMap.get(objectName).get();
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
Path fullPath =
createOutputPath(
outputDir.get(), c.sideInput(outputDirectoryName), fileName);
try (DataFileWriter<GenericRecord> dataFileWriter =
new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, createOutputStream(fullPath, c));
} catch (IOException e) {
throw new RuntimeException(e);
}
c.output(KV.of(objectName, Collections.singleton(fullPath.toString())));
}
/**
* Resolves the complete path name for Avro files for both GCS and local FS
* (for testing).
*
* @param outputDirectoryPath Initial directory path for the file.
* @param outputDirectoryName Terminal directory for the file.
* @param fileName Name of the Avro file
* @return The full {@link Path} of the output Avro file.
*/
private Path createOutputPath(
String outputDirectoryPath, String outputDirectoryName, String fileName) {
if (GcsPath.GCS_URI.matcher(outputDirectoryPath).matches()) {
// Avro file path in GCS.
return GcsPath.fromUri(outputDirectoryPath)
.resolve(outputDirectoryName)
.resolve(fileName);
} else {
// Avro file path in local filesystem
return Paths.get(outputDirectoryPath, outputDirectoryName, fileName);
}
}
/**
* Creates the {@link OutputStream} for the output file either on GCS or on
* local FS (for testing).
*
* @param outputPath The full path of the output file.
* @param c The {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext}
* @return An {@link OutputStream} for the opened output file.
* @throws IOException if the output file cannot be opened.
*/
private OutputStream createOutputStream(Path outputPath, ProcessContext c)
throws IOException {
if (GcsPath.GCS_URI.matcher(outputPath.toString()).matches()) {
// Writing the Avro file to GCS.
org.apache.beam.sdk.extensions.gcp.util.GcsUtil gcsUtil =
c.getPipelineOptions().as(GcsOptions.class).getGcsUtil();
String gcsType = "application/octet-stream";
WritableByteChannel gcsChannel =
gcsUtil.create((GcsPath) outputPath, gcsType);
return Channels.newOutputStream(gcsChannel);
} else {
// Avro file is created on local filesystem (for testing).
Files.createDirectories(outputPath.getParent());
return Files.newOutputStream(outputPath);
}
}
})
.withSideInputs(avroSchemas, outputDirectoryName));
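// Merge the Avro files produced by the row export with the schema-only files written above.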
PCollection<KV<String, Iterable<String>>> allFiles =
PCollectionList.of(tableFiles)
.and(emptySchemaFiles)
.apply("Combine all files", Flatten.pCollections());
PCollection<KV<String, String>> tableManifests =
allFiles.apply("Build table manifests", ParDo.of(new BuildTableManifests()));
Contextful.Fn<String, FileIO.Write.FileNaming> tableManifestNaming =
(element, c) ->
(window, pane, numShards, shardIndex, compression) ->
GcsUtil.joinPath(
outputDir.get(),
c.sideInput(outputDirectoryName),
tableManifestFileName(element));
tableManifests.apply(
"Store table manifests",
FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.withNaming(
Contextful.of(
tableManifestNaming, Requirements.requiresSideInputs(outputDirectoryName)))
.via(Contextful.fn(KV::getValue), TextIO.sink())
.withTempDirectory(eitherOrValueProvider(avroTempDirectory, outputDir)));
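// Combine the per-table manifest entries into the table metadata used by the database manifest.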
PCollection<List<Export.Table>> metadataTables =
tableManifests.apply(
"Combine table metadata", Combine.globally(new CombineTableMetadata()));
PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());
PCollection<String> metadataContent =
metadataTables.apply(
"Create database manifest",
ParDo.of(new CreateDatabaseManifest(ddlView, dialectView))
.withSideInputs(ddlView, dialectView));
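// The database manifest is written as spanner-export.json in the job's output folder.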
Contextful.Fn<String, FileIO.Write.FileNaming> manifestNaming =
(element, c) ->
(window, pane, numShards, shardIndex, compression) ->
GcsUtil.joinPath(
outputDir.get(), c.sideInput(outputDirectoryName), "spanner-export.json");
metadataContent.apply(
"Store the database manifest",
FileIO.<String, String>writeDynamic()
.by(SerializableFunctions.constant(""))
.withDestinationCoder(StringUtf8Coder.of())
.via(TextIO.sink())
.withNaming(
Contextful.of(manifestNaming, Requirements.requiresSideInputs(outputDirectoryName)))
.withTempDirectory(eitherOrValueProvider(avroTempDirectory, outputDir)));
return fileWriteResults;
}