public PCollectionTuple expand()

in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java [458:716]


    /**
     * Creates the database objects that are described by the exported Avro schemas but are absent
     * from the current database DDL, and emits the resulting DDL together with the statements
     * that are deferred to later pipeline stages.
     *
     * <p>A single seed element drives one invocation of the inner {@code DoFn}, which reads the
     * Avro schemas, the information-schema {@link Ddl} and the export manifest as side inputs.
     *
     * <p>Outputs:
     *
     * <ul>
     *   <li>{@code ddlObjectTag} (main): the merged {@link Ddl} when at least one statement was
     *       applied, otherwise the information-schema {@link Ddl} unchanged.
     *   <li>{@code pendingIndexesTag}: index statements not executed here (an empty list when
     *       they were applied early, or when there is nothing to create). Exactly one element.
     *   <li>{@code pendingForeignKeysTag}: foreign key statements for the newly created tables.
     *       Exactly one element.
     *   <li>{@code pendingChangeStreamsTag}: change stream statements. Exactly one element.
     * </ul>
     *
     * @param begin the pipeline root the step is attached to
     * @return the tuple holding the main output and the three tagged side outputs
     */
    public PCollectionTuple expand(PBegin begin) {
      return begin
          .apply(Create.of(1))
          .apply(
              ParDo.of(
                      new DoFn<Integer, Ddl>() {

                        @Setup
                        public void setup() {
                          spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
                        }

                        @Teardown
                        public void teardown() {
                          spannerAccessor.close();
                        }

                        /**
                         * Classifies every Avro schema that has no counterpart in the current
                         * DDL, builds and applies the required DDL statements, and emits the
                         * outputs described on the enclosing method.
                         *
                         * @throws RuntimeException if waiting for the DDL update is interrupted,
                         *     fails, or exceeds {@code ddlCreationTimeoutInMinutes}
                         */
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          List<KV<String, String>> avroSchemas = c.sideInput(avroSchemasView);
                          Ddl informationSchemaDdl = c.sideInput(informationSchemaView);
                          Dialect dialect = informationSchemaDdl.dialect();
                          Export manifest = c.sideInput(manifestView);
                          ByteString protoDescriptors = manifest.getProtoDescriptors();

                          if (LOG.isDebugEnabled()) {
                            LOG.debug(informationSchemaDdl.prettyPrint());
                          }
                          Schema.Parser parser = new Schema.Parser();
                          List<KV<String, Schema>> missingNamedSchemas = new ArrayList<>();
                          List<KV<String, Schema>> missingTables = new ArrayList<>();
                          List<KV<String, Schema>> missingModels = new ArrayList<>();
                          List<KV<String, Schema>> missingViews = new ArrayList<>();
                          List<KV<String, Schema>> missingChangeStreams = new ArrayList<>();
                          List<KV<String, Schema>> missingSequences = new ArrayList<>();
                          List<KV<String, Schema>> missingPlacements = new ArrayList<>();
                          List<KV<String, Schema>> missingPropertyGraphs = new ArrayList<>();
                          for (KV<String, String> kv : avroSchemas) {
                            // Only names unknown to the current DDL under every entity kind
                            // are candidates for creation.
                            if (informationSchemaDdl.schema(kv.getKey()) == null
                                && informationSchemaDdl.table(kv.getKey()) == null
                                && informationSchemaDdl.model(kv.getKey()) == null
                                && informationSchemaDdl.view(kv.getKey()) == null
                                && informationSchemaDdl.changeStream(kv.getKey()) == null
                                && informationSchemaDdl.sequence(kv.getKey()) == null
                                && informationSchemaDdl.placement(kv.getKey()) == null
                                && informationSchemaDdl.propertyGraph(kv.getKey()) == null) {
                              Schema schema = parser.parse(kv.getValue());
                              // The entity kind is inferred from Avro schema properties; the
                              // first matching check wins and anything unrecognised is
                              // treated as a table.
                              if (schema.getProp(AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE)
                                  != null) {
                                missingChangeStreams.add(KV.of(kv.getKey(), schema));
                              } else if ("Model".equals(schema.getProp("spannerEntity"))) {
                                missingModels.add(KV.of(kv.getKey(), schema));
                              } else if (schema.getProp("spannerViewQuery") != null) {
                                missingViews.add(KV.of(kv.getKey(), schema));
                              } else if (schema.getProp("sequenceOption_0") != null
                                  || schema.getProp(AvroUtil.SPANNER_SEQUENCE_KIND) != null) {
                                missingSequences.add(KV.of(kv.getKey(), schema));
                              } else if ("spannerNamedSchema"
                                  .equals(schema.getProp("spannerEntity"))) {
                                missingNamedSchemas.add(KV.of(kv.getKey(), schema));
                              } else if ("Placement".equals(schema.getProp("spannerEntity"))) {
                                missingPlacements.add(KV.of(kv.getKey(), schema));
                              } else if (AvroUtil.SPANNER_ENTITY_PROPERTY_GRAPH.equals(
                                  schema.getProp("spannerEntity"))) {
                                missingPropertyGraphs.add(KV.of(kv.getKey(), schema));
                              } else {
                                missingTables.add(KV.of(kv.getKey(), schema));
                              }
                            }
                          }
                          AvroSchemaToDdlConverter converter =
                              new AvroSchemaToDdlConverter(dialect);
                          List<String> createIndexStatements = new ArrayList<>();
                          List<String> createForeignKeyStatements = new ArrayList<>();
                          List<String> createChangeStreamStatements = new ArrayList<>();
                          List<String> createSequenceStatements = new ArrayList<>();
                          List<String> createPlacementStatements = new ArrayList<>();

                          // mergedDdl accumulates existing + newly created objects for the main
                          // output; ddlStatements is what actually gets sent to the database.
                          Ddl.Builder mergedDdl = informationSchemaDdl.toBuilder();
                          List<String> ddlStatements = new ArrayList<>();

                          if (!manifest.getDatabaseOptionsList().isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            builder.mergeDatabaseOptions(manifest.getDatabaseOptionsList());
                            mergedDdl.mergeDatabaseOptions(manifest.getDatabaseOptionsList());
                            Ddl newDdl = builder.build();
                            ddlStatements.addAll(
                                newDdl.setOptionsStatements(spannerConfig.getDatabaseId().get()));
                          }

                          // CREATE PROTO BUNDLE statement has to be placed before
                          // table and view statements, since tables and views
                          // may use PROTO BUNDLE.
                          if (!manifest.getProtoBundleList().isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            builder.mergeProtoBundle(
                                ImmutableSet.copyOf(manifest.getProtoBundleList()));
                            mergedDdl.mergeProtoBundle(
                                ImmutableSet.copyOf(manifest.getProtoBundleList()));
                            Ddl newDdl = builder.build();
                            ddlStatements.add(newDdl.createProtoBundleStatement());
                          }

                          if (!missingNamedSchemas.isEmpty()) {
                            for (KV<String, Schema> kv : missingNamedSchemas) {
                              Ddl.Builder builder = Ddl.builder(dialect);
                              NamedSchema schema = converter.toSchema(kv.getKey(), kv.getValue());
                              builder.addSchema(schema);
                              mergedDdl.addSchema(schema);
                              Ddl newDdl = builder.build();
                              ddlStatements.addAll(newDdl.createNamedSchemaStatements());
                            }
                          }

                          // CREATE SEQUENCE statements have to be placed before
                          // table and view statements, since tables and views
                          // may use sequences.
                          if (!missingSequences.isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            for (KV<String, Schema> kv : missingSequences) {
                              Sequence sequence = converter.toSequence(kv.getKey(), kv.getValue());
                              builder.addSequence(sequence);
                              mergedDdl.addSequence(sequence);
                            }
                            Ddl newDdl = builder.build();
                            ddlStatements.addAll(newDdl.createSequenceStatements());
                          }

                          if (!missingPlacements.isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            for (KV<String, Schema> kv : missingPlacements) {
                              Placement placement =
                                  converter.toPlacement(kv.getKey(), kv.getValue());
                              builder.addPlacement(placement);
                              mergedDdl.addPlacement(placement);
                            }
                            Ddl newDdl = builder.build();
                            ddlStatements.addAll(newDdl.createPlacementStatements());
                          }

                          // Tracks whether the pending index / foreign key lists have been
                          // emitted, so each of those tags receives exactly one element even
                          // when only views are missing.
                          boolean pendingIndexesAndForeignKeysEmitted = false;

                          if (!missingTables.isEmpty()
                              || !missingModels.isEmpty()
                              || !missingViews.isEmpty()
                              || !missingPropertyGraphs.isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            for (KV<String, Schema> kv : missingViews) {
                              com.google.cloud.teleport.spanner.ddl.View view =
                                  converter.toView(kv.getKey(), kv.getValue());
                              builder.addView(view);
                              mergedDdl.addView(view);
                            }
                            for (KV<String, Schema> kv : missingModels) {
                              com.google.cloud.teleport.spanner.ddl.Model model =
                                  converter.toModel(kv.getKey(), kv.getValue());
                              builder.addModel(model);
                              mergedDdl.addModel(model);
                            }
                            for (KV<String, Schema> kv : missingTables) {
                              Table table = converter.toTable(kv.getKey(), kv.getValue());
                              builder.addTable(table);
                              mergedDdl.addTable(table);
                              // Account for additional DDL changes for tables being created
                              createIndexStatements.addAll(table.indexes());
                              createForeignKeyStatements.addAll(table.foreignKeys());
                            }
                            for (KV<String, Schema> kv : missingPropertyGraphs) {
                              PropertyGraph graph =
                                  converter.toPropertyGraph(kv.getKey(), kv.getValue());
                              builder.addPropertyGraph(graph);
                              mergedDdl.addPropertyGraph(graph);
                            }
                            Ddl newDdl = builder.build();
                            // Statement order: tables, then models, then views, then graphs.
                            ddlStatements.addAll(newDdl.createTableStatements());
                            ddlStatements.addAll(newDdl.createModelStatements());
                            ddlStatements.addAll(newDdl.createViewStatements());
                            ddlStatements.addAll(newDdl.createPropertyGraphStatements());
                            // If the total DDL statements exceed the threshold, execute the create
                            // index statements when tables are created.
                            // Note that foreign keys can only be created after data load
                            // because if we tried to create them before data load, we would
                            // need to load rows in a specific order (insert the referenced
                            // row first before the referencing row). This is not always
                            // possible since foreign keys may introduce circular relationships.
                            if (earlyIndexCreateFlag.get()
                                && ((createForeignKeyStatements.size()
                                        + createIndexStatements.size())
                                    >= EARLY_INDEX_CREATE_THRESHOLD)) {
                              LOG.info(
                                  "Create index early: {}",
                                  String.join(";", createIndexStatements));
                              ddlStatements.addAll(createIndexStatements);
                              // Indexes are applied now, so nothing is left pending.
                              c.output(pendingIndexesTag, new ArrayList<String>());
                            } else {
                              LOG.info(
                                  "Pending index creation: {}",
                                  String.join(";", createIndexStatements));
                              c.output(pendingIndexesTag, createIndexStatements);
                            }
                            c.output(pendingForeignKeysTag, createForeignKeyStatements);
                            pendingIndexesAndForeignKeysEmitted = true;
                          }

                          // Change streams are only converted to statements and handed to the
                          // pendingChangeStreamsTag output; they are not added to mergedDdl or
                          // to ddlStatements here.
                          if (!missingChangeStreams.isEmpty()) {
                            Ddl.Builder builder = Ddl.builder(dialect);
                            for (KV<String, Schema> kv : missingChangeStreams) {
                              ChangeStream changeStream =
                                  converter.toChangeStream(kv.getKey(), kv.getValue());
                              builder.addChangeStream(changeStream);
                            }
                            Ddl newDdl = builder.build();
                            createChangeStreamStatements.addAll(
                                newDdl.createChangeStreamStatements());
                          }
                          c.output(pendingChangeStreamsTag, createChangeStreamStatements);

                          LOG.info(
                              "Applying DDL statements for schemas, tables, models, views and property graphs: {}",
                              ddlStatements);
                          if (!ddlStatements.isEmpty()) {
                            DatabaseAdminClient databaseAdminClient =
                                spannerAccessor.getDatabaseAdminClient();
                            Database.Builder databaseBuilder =
                                databaseAdminClient.newDatabaseBuilder(
                                    DatabaseId.of(
                                        spannerConfig.getProjectId().get(),
                                        spannerConfig.getInstanceId().get(),
                                        spannerConfig.getDatabaseId().get()));
                            // NOTE(review): if Export is a generated protobuf message this
                            // getter presumably never returns null, making the check always
                            // true - confirm whether an emptiness check was intended.
                            if (protoDescriptors != null) {
                              databaseBuilder.setProtoDescriptors(protoDescriptors.toByteArray());
                            }
                            OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
                                databaseAdminClient.updateDatabaseDdl(
                                    databaseBuilder.build(), ddlStatements, null);
                            try {
                              // Block until the DDL update finishes or the timeout elapses.
                              op.get(ddlCreationTimeoutInMinutes.get(), TimeUnit.MINUTES);
                            } catch (InterruptedException e) {
                              // Restore the interrupt status so code further up the stack can
                              // still observe that the thread was interrupted.
                              Thread.currentThread().interrupt();
                              throw new RuntimeException(e);
                            } catch (ExecutionException | TimeoutException e) {
                              throw new RuntimeException(e);
                            }
                            c.output(mergedDdl.build());
                          } else {
                            // Nothing was applied, so the database DDL is unchanged.
                            c.output(informationSchemaDdl);
                          }
                          // Nothing was emitted above (no tables, models, views or property
                          // graphs to create): still emit the (empty) lists so both tagged
                          // outputs always carry one element.
                          if (!pendingIndexesAndForeignKeysEmitted) {
                            c.output(pendingIndexesTag, createIndexStatements);
                            c.output(pendingForeignKeysTag, createForeignKeyStatements);
                          }
                        }
                      })
                  .withSideInputs(avroSchemasView, informationSchemaView, manifestView)
                  .withOutputTags(
                      ddlObjectTag,
                      TupleTagList.of(pendingIndexesTag)
                          .and(pendingForeignKeysTag)
                          .and(pendingChangeStreamsTag)));
    }