public WriteFilesResult expand()

in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java [183:772]


  /**
   * Assembles the Cloud Spanner export pipeline.
   *
   * <p>The steps built here, in order:
   *
   * <ol>
   *   <li>Create a read transaction and a dialect view, then read the database {@link Ddl}.
   *   <li>Validate the {@code tableNames} / {@code exportRelatedTables} parameters against the DDL.
   *   <li>Read rows for the selected tables and write them as Avro files.
   *   <li>Write schema-only Avro files for tables and views that produced no row files, and for
   *       models, property graphs, change streams, sequences, named schemas and placements.
   *   <li>Write one manifest per exported object and a single {@code spanner-export.json}
   *       database manifest.
   * </ol>
   *
   * @param begin the pipeline root the export transforms are attached to
   * @return the result of the "Store Avro files" write step
   */
  public WriteFilesResult<String> expand(PBegin begin) {
    Pipeline p = begin.getPipeline();

    /*
     * Allow users to specify read timestamp.
     * CreateTransaction and CreateTransactionFn classes in SpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        p.apply("CreateTransaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(new CreateTransactionFnWithTimestamp(spannerConfig, snapshotTime)))
            .apply("Tx As PCollectionView", View.asSingleton());

    PCollectionView<Dialect> dialectView =
        p.apply("Read Dialect", new ReadDialect(spannerConfig))
            .apply("Dialect As PCollectionView", View.asSingleton());

    PCollection<Ddl> ddl =
        p.apply(
            "Read Information Schema", new ReadInformationSchema(spannerConfig, tx, dialectView));

    // NOTE(review): exportState is not referenced again in this method; the step is kept for the
    // parameter validation it performs.
    PCollection<Ddl> exportState =
        ddl.apply(
            "Check export conditions",
            ParDo.of(
                new DoFn<Ddl, Ddl>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    Ddl ddl = c.element();
                    List<String> tablesList = Collections.emptyList();

                    // If the user sets shouldExportRelatedTables to true without providing
                    // a list of export tables, throw an exception.
                    if (tableNames.get().trim().isEmpty() && exportRelatedTables.get()) {
                      throw new Exception(
                          "Invalid usage of --tableNames and --shouldExportRelatedTables. Set"
                              + " --shouldExportRelatedTables=true only if --tableNames is given"
                              + " selected tables for export.");
                    }

                    // If the user provides a comma-separated list of strings, parse it into a List
                    if (!tableNames.get().trim().isEmpty()) {
                      tablesList = Arrays.asList(tableNames.get().split(",\\s*"));
                    }

                    // If the user provided any invalid table names, throw an exception.
                    List<String> allSpannerTables =
                        ddl.allTables().stream().map(t -> t.name()).collect(Collectors.toList());

                    List<String> invalidTables =
                        tablesList.stream()
                            .distinct()
                            .filter(t -> !allSpannerTables.contains(t))
                            .collect(Collectors.toList());

                    if (invalidTables.size() != 0) {
                      throw new Exception(
                          "INVALID_ARGUMENT: Table(s) not found: "
                              + String.join(", ", invalidTables)
                              + ".");
                    }

                    List<String> filteredTables =
                        getFilteredTables(ddl, tablesList).stream()
                            .map(t -> t.name())
                            .collect(Collectors.toList());

                    // Save any missing necessary export table names; save a copy of the original
                    // table list to bypass 'final or effectively final' condition of the lambda
                    // expression below.
                    List<String> usersTables = tablesList.stream().collect(Collectors.toList());
                    List<String> missingTables =
                        filteredTables.stream()
                            .distinct()
                            .filter(t -> !usersTables.contains(t))
                            .collect(Collectors.toList());
                    Collections.sort(missingTables);

                    // If user has specified a list of tables without including required
                    // related tables, and not explicitly set shouldExportRelatedTables,
                    // throw an exception.
                    // The check is made on missingTables rather than with List.equals, because
                    // List.equals is sensitive to element order and duplicates and would reject
                    // a complete table list that is merely ordered differently.
                    if (!tablesList.isEmpty()
                        && !missingTables.isEmpty()
                        && !exportRelatedTables.get()) {
                      throw new Exception(
                          "Attempted to export table(s) requiring parent and/or foreign keys tables"
                              + " without setting the shouldExportRelatedTables parameter. Set"
                              + " --shouldExportRelatedTables=true to export all necessary"
                              + " tables, or add "
                              + String.join(", ", missingTables)
                              + " to --tableNames.");
                    }
                    c.output(ddl);
                  }
                }));
    PCollection<ReadOperation> tableReadOperations =
        ddl.apply("Build table read operations", new BuildReadFromTableOperations(tableNames));

    PCollection<KV<String, Void>> allTableAndViewNames =
        ddl.apply(
            "List all table and view names",
            ParDo.of(
                new DoFn<Ddl, KV<String, Void>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (Table t : ddl.allTables()) {
                      c.output(KV.of(t.name(), null));
                    }
                    // We want the resulting collection to contain the names of all entities that
                    // need to be exported, both tables and views.  Ddl holds these separately, so
                    // we need to add the names of all views separately here.
                    for (com.google.cloud.teleport.spanner.ddl.View v : ddl.views()) {
                      c.output(KV.of(v.name(), null));
                    }
                  }
                }));

    PCollection<String> allModelNames =
        ddl.apply(
            "List all model names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (Model model : ddl.models()) {
                      c.output(model.name());
                    }
                  }
                }));

    PCollection<String> allPropertyGraphNames =
        ddl.apply(
            "List all property graph names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (PropertyGraph graph : ddl.propertyGraphs()) {
                      c.output(graph.name());
                    }
                  }
                }));

    PCollection<String> allChangeStreamNames =
        ddl.apply(
            "List all change stream names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (ChangeStream changeStream : ddl.changeStreams()) {
                      c.output(changeStream.name());
                    }
                  }
                }));

    PCollection<String> allSequenceNames =
        ddl.apply(
            "List all sequence names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (Sequence sequence : ddl.sequences()) {
                      c.output(sequence.name());
                    }
                  }
                }));

    PCollection<String> allNamedSchemaNames =
        ddl.apply(
            "List all named schema names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (NamedSchema t : ddl.schemas()) {
                      c.output(t.name());
                    }
                  }
                }));

    PCollection<String> allPlacementNames =
        ddl.apply(
            "List all placement names",
            ParDo.of(
                new DoFn<Ddl, String>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Ddl ddl = c.element();
                    for (Placement placement : ddl.placements()) {
                      c.output(placement.name());
                    }
                  }
                }));

    // Generate a unique output directory name.
    final PCollectionView<String> outputDirectoryName =
        p.apply(Create.of(1))
            .apply(
                "Create Avro output folder",
                ParDo.of(
                    new DoFn<Integer, String>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String instanceId = spannerConfig.getInstanceId().get();
                        String dbId = spannerConfig.getDatabaseId().get();
                        // For direct runner or tests we need a deterministic jobId.
                        String testJobId = ExportTransform.this.testJobId.get();
                        if (!Strings.isNullOrEmpty(testJobId)) {
                          c.output(testJobId);
                          return;
                        }
                        try {
                          DataflowWorkerHarnessOptions workerHarnessOptions =
                              c.getPipelineOptions().as(DataflowWorkerHarnessOptions.class);
                          String jobId = workerHarnessOptions.getJobId();
                          c.output(instanceId + "-" + dbId + "-" + jobId);
                        } catch (Exception e) {
                          // Chain the original failure so the root cause is not lost.
                          throw new IllegalStateException(
                              "Please specify --testJobId to run with non-dataflow runner", e);
                        }
                      }
                    }))
            .apply(View.asSingleton());

    final PCollectionView<Map<String, SerializableSchemaSupplier>> avroSchemas =
        ddl.apply(
                "Build Avro schemas from DDL",
                ParDo.of(
                    new DoFn<Ddl, KV<String, SerializableSchemaSupplier>>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Collection<Schema> avroSchemas =
                            new DdlToAvroSchemaConverter(
                                    "spannerexport",
                                    "1.0.0",
                                    shouldExportTimestampAsLogicalType.get())
                                .convert(c.element());
                        for (Schema schema : avroSchemas) {
                          // Key each schema by the real Spanner object name, because that name
                          // is used as the write destination in SchemaBasedDynamicDestinations.
                          c.output(
                              KV.of(
                                  AvroUtil.getSpannerObjectName(schema),
                                  new SerializableSchemaSupplier(schema)));
                        }
                      }
                    }))
            .apply("As view", View.asMap());

    PCollection<Struct> rows =
        tableReadOperations.apply(
            "Read all rows from Spanner",
            SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

    ValueProvider<ResourceId> resource =
        ValueProvider.NestedValueProvider.of(
            outputDir,
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    // Temporary files go to avroTempDirectory when it is provided, otherwise to outputDir.
    ValueProvider<ResourceId> tempResource =
        ValueProvider.NestedValueProvider.of(
            eitherOrValueProvider(avroTempDirectory, outputDir),
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    WriteFilesResult<String> fileWriteResults =
        rows.apply(
            "Store Avro files",
            AvroIO.<Struct>writeCustomTypeToGenericRecords()
                .to(
                    new SchemaBasedDynamicDestinations(
                        avroSchemas, outputDirectoryName, dialectView, resource))
                .withTempDirectory(tempResource));

    // Generate the manifest file.
    PCollection<KV<String, Iterable<String>>> tableFiles =
        fileWriteResults.getPerDestinationOutputFilenames().apply(GroupByKey.create());

    final TupleTag<Void> allTables = new TupleTag<>();
    final TupleTag<Iterable<String>> nonEmptyTables = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> groupedTables =
        KeyedPCollectionTuple.of(allTables, allTableAndViewNames)
            .and(nonEmptyTables, tableFiles)
            .apply("Group with all tables", CoGroupByKey.create());

    // The following is to export empty tables and views from the database.  Empty tables and views
    // are handled together because we do not export any rows for views, only their metadata,
    // including the queries defining them.
    PCollection<KV<String, Iterable<String>>> emptyTablesAndViews =
        groupedTables.apply(
            "Export empty tables and views",
            ParDo.of(
                new DoFn<KV<String, CoGbkResult>, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    KV<String, CoGbkResult> kv = c.element();
                    String table = kv.getKey();
                    CoGbkResult coGbkResult = kv.getValue();
                    // A null result means no row files were written for this table or view.
                    Iterable<String> only = coGbkResult.getOnly(nonEmptyTables, null);
                    if (only == null) {
                      LOG.info("Exporting empty table or view: " + table);
                      // This file will contain the schema definition: column definitions for empty
                      // tables or defining queries for views.
                      c.output(KV.of(table, Collections.singleton(table + ".avro-00000-of-00001")));
                    }
                  }
                }));

    PCollection<KV<String, Iterable<String>>> models =
        allModelNames.apply(
            "Export models",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String modelName = c.element();
                    LOG.info("Exporting model: " + modelName);
                    // This file will contain the schema definition for the model.
                    c.output(
                        KV.of(
                            modelName, Collections.singleton(modelName + ".avro-00000-of-00001")));
                  }
                }));

    PCollection<KV<String, Iterable<String>>> propertyGraphs =
        allPropertyGraphNames.apply(
            "Export property graphs",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String propertyGraphName = c.element();
                    LOG.info("Exporting property graph: " + propertyGraphName);
                    // This file will contain the schema definition for the propertyGraph.
                    c.output(
                        KV.of(
                            propertyGraphName,
                            Collections.singleton(propertyGraphName + ".avro-00000-of-00001")));
                  }
                }));

    PCollection<KV<String, Iterable<String>>> changeStreams =
        allChangeStreamNames.apply(
            "Export change streams",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String changeStreamName = c.element();
                    LOG.info("Exporting change stream: " + changeStreamName);
                    // This file will contain the schema definition for the change stream.
                    c.output(
                        KV.of(
                            changeStreamName,
                            Collections.singleton(changeStreamName + ".avro-00000-of-00001")));
                  }
                }));

    PCollection<KV<String, Iterable<String>>> sequences =
        allSequenceNames.apply(
            "Export sequences",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String sequenceName = c.element();
                    LOG.info("Exporting sequence: " + sequenceName);
                    // This file will contain the schema definition for the sequence.
                    c.output(
                        KV.of(
                            sequenceName,
                            Collections.singleton(sequenceName + ".avro-00000-of-00001")));
                  }
                }));

    PCollection<KV<String, Iterable<String>>> namedSchemas =
        allNamedSchemaNames.apply(
            "Export named schemas",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String namedSchema = c.element();
                    LOG.info("Exporting named schema: " + namedSchema);
                    // This file will contain the schema definition for the named schema.
                    c.output(
                        KV.of(
                            namedSchema,
                            Collections.singleton(namedSchema + ".avro-00000-of-00001")));
                  }
                }));

    PCollection<KV<String, Iterable<String>>> placements =
        allPlacementNames.apply(
            "Export placements",
            ParDo.of(
                new DoFn<String, KV<String, Iterable<String>>>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String placementName = c.element();
                    LOG.info("Exporting placement: " + placementName);
                    // This file will contain the schema definition for the placement.
                    c.output(
                        KV.of(
                            placementName,
                            Collections.singleton(placementName + ".avro-00000-of-00001")));
                  }
                }));

    // Empty tables, views, models, change streams, sequences, named schemas, placements and
    // property graphs are handled together, because we export them as empty Avro files that only
    // contain the Avro schemas.
    PCollection<KV<String, Iterable<String>>> emptySchemaFiles =
        PCollectionList.of(emptyTablesAndViews)
            .and(models)
            .and(changeStreams)
            .and(sequences)
            .and(namedSchemas)
            .and(placements)
            .and(propertyGraphs)
            .apply("Combine all empty schema files", Flatten.pCollections());

    emptySchemaFiles =
        emptySchemaFiles.apply(
            "Save empty schema files",
            ParDo.of(
                    new DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Map<String, SerializableSchemaSupplier> schemaMap =
                            c.sideInput(avroSchemas);
                        KV<String, Iterable<String>> kv = c.element();
                        String objectName = kv.getKey();
                        String fileName = kv.getValue().iterator().next();

                        // Fail with a descriptive error instead of a bare NullPointerException
                        // when no Avro schema was generated for this object.
                        SerializableSchemaSupplier schemaSupplier = schemaMap.get(objectName);
                        if (schemaSupplier == null) {
                          throw new IllegalStateException(
                              "No Avro schema found for exported object: " + objectName);
                        }
                        Schema schema = schemaSupplier.get();

                        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

                        Path fullPath =
                            createOutputPath(
                                outputDir.get(), c.sideInput(outputDirectoryName), fileName);

                        // Only the Avro header (schema) is written; no records are appended.
                        try (DataFileWriter<GenericRecord> dataFileWriter =
                            new DataFileWriter<>(datumWriter)) {
                          dataFileWriter.create(schema, createOutputStream(fullPath, c));
                        } catch (IOException e) {
                          throw new RuntimeException(e);
                        }
                        c.output(KV.of(objectName, Collections.singleton(fullPath.toString())));
                      }

                      /**
                       * Resolves the complete path name for Avro files for both GCS and local FS
                       * (for testing).
                       *
                       * @param outputDirectoryPath Initial directory path for the file.
                       * @param outputDirectoryName Terminal directory for the file.
                       * @param fileName Name of the Avro file
                       * @return The full {@link Path} of the output Avro file.
                       */
                      private Path createOutputPath(
                          String outputDirectoryPath, String outputDirectoryName, String fileName) {
                        if (GcsPath.GCS_URI.matcher(outputDirectoryPath).matches()) {
                          // Avro file path in GCS.
                          return GcsPath.fromUri(outputDirectoryPath)
                              .resolve(outputDirectoryName)
                              .resolve(fileName);
                        } else {
                          // Avro file path in local filesystem
                          return Paths.get(outputDirectoryPath, outputDirectoryName, fileName);
                        }
                      }

                      /**
                       * Creates the {@link OutputStream} for the output file either on GCS or on
                       * local FS (for testing).
                       *
                       * @param outputPath The full path of the output file.
                       * @param c The {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext}
                       * @return An {@link OutputStream} for the opened output file.
                       * @throws IOException if the output file cannot be opened.
                       */
                      private OutputStream createOutputStream(Path outputPath, ProcessContext c)
                          throws IOException {
                        if (GcsPath.GCS_URI.matcher(outputPath.toString()).matches()) {
                          // Writing the Avro file to GCS.
                          org.apache.beam.sdk.extensions.gcp.util.GcsUtil gcsUtil =
                              c.getPipelineOptions().as(GcsOptions.class).getGcsUtil();
                          String gcsType = "application/octet-stream";
                          WritableByteChannel gcsChannel =
                              gcsUtil.create((GcsPath) outputPath, gcsType);
                          return Channels.newOutputStream(gcsChannel);
                        } else {
                          // Avro file is created on local filesystem (for testing).
                          Files.createDirectories(outputPath.getParent());
                          return Files.newOutputStream(outputPath);
                        }
                      }
                    })
                .withSideInputs(avroSchemas, outputDirectoryName));

    // Row files and schema-only files both feed the per-object manifests.
    PCollection<KV<String, Iterable<String>>> allFiles =
        PCollectionList.of(tableFiles)
            .and(emptySchemaFiles)
            .apply("Combine all files", Flatten.pCollections());

    PCollection<KV<String, String>> tableManifests =
        allFiles.apply("Build table manifests", ParDo.of(new BuildTableManifests()));

    // Names each manifest file under <outputDir>/<outputDirectoryName>/.
    Contextful.Fn<String, FileIO.Write.FileNaming> tableManifestNaming =
        (element, c) ->
            (window, pane, numShards, shardIndex, compression) ->
                GcsUtil.joinPath(
                    outputDir.get(),
                    c.sideInput(outputDirectoryName),
                    tableManifestFileName(element));

    tableManifests.apply(
        "Store table manifests",
        FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(
                Contextful.of(
                    tableManifestNaming, Requirements.requiresSideInputs(outputDirectoryName)))
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .withTempDirectory(eitherOrValueProvider(avroTempDirectory, outputDir)));

    PCollection<List<Export.Table>> metadataTables =
        tableManifests.apply(
            "Combine table metadata", Combine.globally(new CombineTableMetadata()));

    PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());

    PCollection<String> metadataContent =
        metadataTables.apply(
            "Create database manifest",
            ParDo.of(new CreateDatabaseManifest(ddlView, dialectView))
                .withSideInputs(ddlView, dialectView));

    // The database manifest is always written as spanner-export.json in the export directory.
    Contextful.Fn<String, FileIO.Write.FileNaming> manifestNaming =
        (element, c) ->
            (window, pane, numShards, shardIndex, compression) ->
                GcsUtil.joinPath(
                    outputDir.get(), c.sideInput(outputDirectoryName), "spanner-export.json");

    metadataContent.apply(
        "Store the database manifest",
        FileIO.<String, String>writeDynamic()
            .by(SerializableFunctions.constant(""))
            .withDestinationCoder(StringUtf8Coder.of())
            .via(TextIO.sink())
            .withNaming(
                Contextful.of(manifestNaming, Requirements.requiresSideInputs(outputDirectoryName)))
            .withTempDirectory(eitherOrValueProvider(avroTempDirectory, outputDir)));
    return fileWriteResults;
  }