public PDone expand(PBegin begin)

in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java [139:296]


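  // Orchestrates a Cloud Spanner database import: reads the export manifest,
  // creates the schema described by the exported Avro files, loads the data
  // one interleave level at a time, and finally applies the deferred DDL
  // (indexes, foreign keys, change streams).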
  public PDone expand(PBegin begin) {
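    // Determine the database dialect up front and expose it as a singleton
    // side input; several downstream steps behave differently per dialect.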
    PCollectionView<Dialect> dialectView =
        begin
            .apply("Read Dialect", new ReadDialect(spannerConfig))
            .apply("Dialect As PCollectionView", View.asSingleton());

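    // Read the top-level export manifest; each entry points at a per-table
    // manifest listing that table's data files.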
    PCollection<Export> manifest =
        begin.apply("Read manifest", new ReadExportManifestFile(importDirectory, dialectView));

    PCollectionView<Export> manifestView = manifest.apply("Manifest as view", View.asSingleton());

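    // Expand the manifest into (table, file) pairs, then group the data files
    // by table name.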
    PCollection<KV<String, String>> allFiles =
        manifest.apply("Read all manifest files", new ReadManifestFiles(importDirectory));

    PCollection<KV<String, List<String>>> tableFiles = allFiles.apply(Combine.perKey(AsList.fn()));

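    // Pick the first data file of each table, view, or change stream and
    // extract its Avro schema; all files of one object share a schema.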
    PCollection<KV<String, String>> schemas =
        tableFiles
            .apply(
                "File per table, view or change stream",
                ParDo.of(
                    new DoFn<KV<String, List<String>>, KV<String, String>>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        KV<String, List<String>> kv = c.element();
                        if (!kv.getValue().isEmpty()) {
                          c.output(KV.of(kv.getKey(), kv.getValue().get(0)));
                        }
                      }
                    }))
            .apply("Extract avro schemas", ParDo.of(new ReadAvroSchemas()));

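    // Collect all (object, Avro schema) pairs into a single list so the
    // Avro-derived DDL can be passed around as one side input.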
    final PCollection<List<KV<String, String>>> avroSchemas =
        schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));

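    // Snapshot the target database: open a read transaction and read its
    // current information schema, which CreateTables consults alongside the
    // Avro-derived schema.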
    PCollectionView<Transaction> tx =
        begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

    PCollection<Ddl> informationSchemaDdl =
        begin.apply(
            "Read Information Schema", new ReadInformationSchema(spannerConfig, tx, dialectView));

    final PCollectionView<List<KV<String, String>>> avroDdlView =
        avroSchemas.apply("Avro ddl view", View.asSingleton());
    final PCollectionView<Ddl> informationSchemaView =
        informationSchemaDdl.apply("Information schema view", View.asSingleton());
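    // Create the Cloud Spanner tables (and, when earlyIndexCreateFlag is set,
    // the indexes). DDL deferred until after the data load (pending indexes,
    // foreign keys, and change streams) comes back on separate output tags.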
    final PCollectionTuple createTableOutput =
        begin.apply(
            "Create Cloud Spanner Tables and indexes",
            new CreateTables(
                spannerConfig,
                avroDdlView,
                informationSchemaView,
                manifestView,
                earlyIndexCreateFlag,
                ddlCreationTimeoutInMinutes));

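    // Unpack the CreateTables outputs: the resulting database DDL plus the
    // statements to run once the import has finished.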
    final PCollection<Ddl> ddl = createTableOutput.get(CreateTables.getDdlObjectTag());
    final PCollectionView<List<String>> pendingIndexes =
        createTableOutput
            .get(CreateTables.getPendingIndexesTag())
            .apply("As Index view", View.asSingleton());
    final PCollectionView<List<String>> pendingForeignKeys =
        createTableOutput
            .get(CreateTables.getPendingForeignKeysTag())
            .apply("As Foreign keys view", View.asSingleton());
    final PCollectionView<List<String>> pendingChangeStreams =
        createTableOutput
            .get(CreateTables.getPendingChangeStreamsTag())
            .apply("As change streams view", View.asSingleton());

    PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());

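    // Group table names by their depth in the interleave hierarchy; rows of a
    // parent table must be committed before rows of its interleaved children.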
    PCollectionView<HashMultimap<Integer, String>> levelsView =
        ddl.apply(
                "Group tables by depth",
                ParDo.of(
                    new DoFn<Ddl, HashMultimap<Integer, String>>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Ddl ddl = c.element();
                        c.output(ddl.perLevelView());
                      }
                    }))
            .apply(View.asSingleton());

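    // Flatten the per-table file lists into a single multimap keyed by
    // lower-cased table name, so each depth iteration below can look up the
    // files it needs to load.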
    PCollection<HashMultimap<String, String>> acc =
        tableFiles
            .apply("Combine table files", Combine.globally(AsList.fn()))
            .apply(
                "As HashMultimap",
                ParDo.of(
                    new DoFn<List<KV<String, List<String>>>, HashMultimap<String, String>>() {

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        HashMultimap<String, String> result = HashMultimap.create();
                        for (KV<String, List<String>> kv : c.element()) {
                          result.putAll(kv.getKey().toLowerCase(), kv.getValue());
                        }
                        c.output(result);
                      }
                    }));

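    // Import the data one interleave level at a time, up to MAX_DEPTH levels.
    // Wait.on chains each level onto the previous one so that parent rows are
    // written before their children.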
    PCollection<?> previousComputation = ddl;
    for (int i = 0; i < MAX_DEPTH; i++) {
      final int depth = i;
      PCollection<KV<String, String>> levelFiles =
          acc.apply(
                  "Get Avro filenames depth " + depth,
                  ParDo.of(
                          new DoFn<HashMultimap<String, String>, KV<String, String>>() {

                            @ProcessElement
                            public void processElement(ProcessContext c) {
                              HashMultimap<String, String> allFiles = c.element();
                              HashMultimap<Integer, String> levels = c.sideInput(levelsView);

                              Set<String> tables = levels.get(depth);
                              for (String table : tables) {
                                for (String file : allFiles.get(table)) {
                                  c.output(KV.of(file, table));
                                }
                              }
                            }
                          })
                      .withSideInputs(levelsView))
              .apply("Wait for previous depth " + depth, Wait.on(previousComputation));
      PCollection<Mutation> mutations =
          levelFiles.apply(
              "Avro files as mutations " + depth, new AvroTableFileAsMutations(ddlView));

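      // Write this level's mutations; withSchemaReadySignal(ddl) holds the
      // writes back until the table-creation step has completed.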
      SpannerWriteResult result =
          mutations.apply(
              "Write mutations " + depth,
              SpannerIO.write()
                  .withSchemaReadySignal(ddl)
                  .withSpannerConfig(spannerConfig)
                  .withCommitDeadline(Duration.standardMinutes(1))
                  .withMaxCumulativeBackoff(Duration.standardHours(2))
                  .withMaxNumMutations(10000)
                  .withGroupingFactor(100)
                  .withDialectView(dialectView));
      previousComputation = result.getOutput();
    }
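    // With all data loaded, apply the deferred DDL in order: secondary
    // indexes, then foreign keys, then change streams.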
    ddl.apply(Wait.on(previousComputation))
        .apply(
            "Create Indexes", new ApplyDDLTransform(spannerConfig, pendingIndexes, waitForIndexes))
        .apply(
            "Add Foreign Keys",
            new ApplyDDLTransform(spannerConfig, pendingForeignKeys, waitForForeignKeys))
        .apply(
            "Create Change Streams",
            new ApplyDDLTransform(spannerConfig, pendingChangeStreams, waitForChangeStreams));
    return PDone.in(begin.getPipeline());
  }