public Collection convert()

in v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java [101:382]


  /**
   * Converts the exportable entities of a Spanner {@link Ddl} into Avro record schemas.
   *
   * <p>One record is produced per entity, in this order: named schemas, tables (as returned by
   * {@code ddl.allTables()}), models, property graphs, views, change streams, sequences and
   * placements. Every record carries the entity's Spanner name, the export format version and a
   * {@code "CloudSpanner"} storage marker as properties; entity-specific metadata is encoded as
   * further record or field properties.
   *
   * @param ddl the database schema to convert
   * @return a newly allocated, mutable collection of the generated schemas, in the order above
   */
  public Collection<Schema> convert(Ddl ddl) {
    Collection<Schema> schemas = new ArrayList<>();
    for (NamedSchema schema : ddl.schemas()) {
      schemas.add(namedSchemaToAvro(schema));
    }
    for (Table table : ddl.allTables()) {
      schemas.add(tableToAvro(table));
    }
    for (Model model : ddl.models()) {
      schemas.add(modelToAvro(model));
    }
    for (PropertyGraph propertyGraph : ddl.propertyGraphs()) {
      schemas.add(propertyGraphToAvro(propertyGraph));
    }
    for (View view : ddl.views()) {
      schemas.add(viewToAvro(view));
    }
    for (ChangeStream changeStream : ddl.changeStreams()) {
      schemas.add(changeStreamToAvro(changeStream));
    }
    for (Sequence sequence : ddl.sequences()) {
      schemas.add(sequenceToAvro(sequence));
    }
    for (Placement placement : ddl.placements()) {
      schemas.add(placementToAvro(placement));
    }
    return schemas;
  }

  /**
   * Starts an Avro record for a Spanner entity and sets the properties every exported record
   * shares: the Spanner name, the format version and the storage marker. They are set first so
   * they precede any entity-specific properties.
   */
  private SchemaBuilder.RecordBuilder<Schema> newEntityRecordBuilder(String spannerName) {
    SchemaBuilder.RecordBuilder<Schema> recordBuilder =
        SchemaBuilder.record(generateAvroSchemaName(spannerName)).namespace(this.namespace);
    recordBuilder.prop(SPANNER_NAME, spannerName);
    recordBuilder.prop(GOOGLE_FORMAT_VERSION, version);
    recordBuilder.prop(GOOGLE_STORAGE, "CloudSpanner");
    return recordBuilder;
  }

  /** Builds the field-less record that represents a {@code CREATE SCHEMA} statement. */
  private Schema namedSchemaToAvro(NamedSchema schema) {
    LOG.info("DdlToAvo Schema {}", schema.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(schema.name());
    // Indicate that this is a "CREATE SCHEMA", not a table or a view.
    recordBuilder.prop(SPANNER_ENTITY, SPANNER_NAMED_SCHEMA);
    return recordBuilder.fields().endRecord();
  }

  /** Builds a table record: table metadata as record properties, one field per column. */
  private Schema tableToAvro(Table table) {
    LOG.info("DdlToAvo Table {}", table.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(table.name());
    if (table.interleaveInParent() != null) {
      recordBuilder.prop(SPANNER_PARENT, table.interleaveInParent());
      recordBuilder.prop(
          SPANNER_INTERLEAVE_TYPE,
          table.interleaveType() == InterleaveType.IN ? "IN" : "IN PARENT");
      recordBuilder.prop(
          SPANNER_ON_DELETE_ACTION, table.onDeleteCascade() ? "cascade" : "no action");
    }
    encodeTablePrimaryKeys(recordBuilder, table);
    for (int i = 0; i < table.indexes().size(); i++) {
      recordBuilder.prop(SPANNER_INDEX + i, table.indexes().get(i));
    }
    for (int i = 0; i < table.foreignKeys().size(); i++) {
      recordBuilder.prop(SPANNER_FOREIGN_KEY + i, table.foreignKeys().get(i));
    }
    for (int i = 0; i < table.checkConstraints().size(); i++) {
      recordBuilder.prop(SPANNER_CHECK_CONSTRAINT + i, table.checkConstraints().get(i));
    }
    SchemaBuilder.FieldAssembler<Schema> fieldsAssembler = recordBuilder.fields();
    // Counter appended to the name passed to avroType(); only non-generated columns advance it.
    int columnOrdinal = 0;
    for (Column cm : table.columns()) {
      SchemaBuilder.FieldBuilder<Schema> fieldBuilder = fieldsAssembler.name(cm.name());
      fieldBuilder.prop(SQL_TYPE, cm.typeString());
      fieldBuilder.prop(HIDDEN, Boolean.toString(cm.isHidden()));
      for (int i = 0; i < cm.columnOptions().size(); i++) {
        fieldBuilder.prop(SPANNER_OPTION + i, cm.columnOptions().get(i));
      }
      if (cm.isPlacementKey()) {
        fieldBuilder.prop(SPANNER_PLACEMENT_KEY, Boolean.toString(cm.isPlacementKey()));
      }
      if (cm.isGenerated()) {
        fieldBuilder.prop(NOT_NULL, Boolean.toString(cm.notNull()));
        fieldBuilder.prop(GENERATION_EXPRESSION, cm.generationExpression());
        fieldBuilder.prop(STORED, Boolean.toString(cm.isStored()));
        // Make the type null to allow us not export the generated column values,
        // which are semantically logical entities. The column definition survives in the
        // properties set above.
        fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null);
      } else {
        if (cm.isIdentityColumn()) {
          fieldBuilder.prop(IDENTITY_COLUMN, Boolean.toString(cm.isIdentityColumn()));
          if (cm.sequenceKind() != null) {
            fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind());
          }
          if (cm.counterStartValue() != null) {
            fieldBuilder.prop(
                SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
          }
          if (cm.skipRangeMin() != null) {
            fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
          }
          if (cm.skipRangeMax() != null) {
            fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
          }
        } else if (cm.defaultExpression() != null) {
          fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
        }
        Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
        // Nullability of non-generated columns is carried by the Avro type itself.
        if (!cm.notNull()) {
          avroType = wrapAsNullable(avroType);
        }
        fieldBuilder.type(avroType).noDefault();
      }
    }
    return fieldsAssembler.endRecord();
  }

  /**
   * Encodes the table's primary key as one combined {@code SPANNER_PRIMARY_KEY} property plus one
   * {@code SPANNER_PRIMARY_KEY + "_" + i} property per key part, formatted for the table's
   * dialect. Tables of any other dialect, or with a null key list, get no primary key properties.
   */
  private void encodeTablePrimaryKeys(
      SchemaBuilder.RecordBuilder<Schema> recordBuilder, Table table) {
    // Guard the combined AND the per-part properties: both dereference the key list.
    if (table.primaryKeys() == null) {
      return;
    }
    if (table.dialect() == Dialect.GOOGLE_STANDARD_SQL) {
      recordBuilder.prop(
          SPANNER_PRIMARY_KEY,
          table.primaryKeys().stream()
              .map(IndexColumn::prettyPrint)
              .collect(Collectors.joining(",")));
      for (int i = 0; i < table.primaryKeys().size(); i++) {
        recordBuilder.prop(
            SPANNER_PRIMARY_KEY + "_" + i, table.primaryKeys().get(i).prettyPrint());
      }
    } else if (table.dialect() == Dialect.POSTGRESQL) {
      // Column names are double-quoted; the combined value lists names only, while every
      // per-part value is written with "ASC".
      recordBuilder.prop(
          SPANNER_PRIMARY_KEY,
          table.primaryKeys().stream()
              .map(c -> "\"" + c.name() + "\"")
              .collect(Collectors.joining(", ")));
      for (int i = 0; i < table.primaryKeys().size(); i++) {
        IndexColumn pk = table.primaryKeys().get(i);
        recordBuilder.prop(SPANNER_PRIMARY_KEY + "_" + i, "\"" + pk.name() + "\" ASC");
      }
    }
  }

  /** Builds a model record with nested {@code INPUT} and {@code OUTPUT} record fields. */
  private Schema modelToAvro(Model model) {
    LOG.info("DdlToAvo model {}", model.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(model.name());
    recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_MODEL);
    recordBuilder.prop(SPANNER_REMOTE, Boolean.toString(model.remote()));
    if (model.options() != null) {
      for (int i = 0; i < model.options().size(); i++) {
        recordBuilder.prop(SPANNER_OPTION + i, model.options().get(i));
      }
    }
    SchemaBuilder.FieldAssembler<Schema> fieldsAssembler = recordBuilder.fields();
    appendModelColumnsField(
        fieldsAssembler,
        INPUT,
        model.name() + "_" + INPUT,
        model.name() + "_input_",
        model.inputColumns());
    appendModelColumnsField(
        fieldsAssembler,
        OUTPUT,
        model.name() + "_" + OUTPUT,
        model.name() + "_output_",
        model.outputColumns());
    return fieldsAssembler.endRecord();
  }

  /**
   * Appends to {@code fieldsAssembler} a field named {@code fieldName} whose type is a nested
   * record {@code recordName} with one field per model column. Each column's Avro type is built
   * by avroType() with the name {@code columnTypeNamePrefix + ordinal}.
   */
  private void appendModelColumnsField(
      SchemaBuilder.FieldAssembler<Schema> fieldsAssembler,
      String fieldName,
      String recordName,
      String columnTypeNamePrefix,
      Iterable<? extends ModelColumn> columns) {
    SchemaBuilder.FieldAssembler<RecordDefault<Schema>> columnsBuilder =
        fieldsAssembler
            .name(fieldName)
            .type()
            .record(recordName)
            .namespace(this.namespace)
            .fields();
    int columnOrdinal = 0;
    for (ModelColumn c : columns) {
      FieldBuilder<RecordDefault<Schema>> fieldBuilder = columnsBuilder.name(c.name());
      fieldBuilder.prop(SQL_TYPE, c.typeString());
      for (int i = 0; i < c.columnOptions().size(); i++) {
        fieldBuilder.prop(SPANNER_OPTION + i, c.columnOptions().get(i));
      }
      Schema avroType = avroType(c.type(), columnTypeNamePrefix + columnOrdinal++);
      fieldBuilder.type(avroType).noDefault();
    }
    // Closing the nested record completes the field on the enclosing assembler.
    columnsBuilder.endRecord().noDefault();
  }

  /** Builds a field-less property graph record; the graph definition lives in properties. */
  private Schema propertyGraphToAvro(PropertyGraph propertyGraph) {
    LOG.info("DdlToAvro PropertyGraph {}", propertyGraph.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder =
        newEntityRecordBuilder(propertyGraph.name());
    recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_PROPERTY_GRAPH);
    for (int i = 0; i < propertyGraph.nodeTables().size(); i++) {
      encodeNodeTable(recordBuilder, propertyGraph.nodeTables().get(i), i);
    }
    for (int i = 0; i < propertyGraph.edgeTables().size(); i++) {
      encodeEdgeTable(recordBuilder, propertyGraph.edgeTables().get(i), i);
    }
    encodePropertyDeclarations(recordBuilder, propertyGraph.propertyDeclarations());
    encodeLabels(recordBuilder, propertyGraph.labels());
    return recordBuilder.fields().endRecord();
  }

  /** Builds a field-less view record carrying the view query and, if set, its security mode. */
  private Schema viewToAvro(View view) {
    LOG.info("DdlToAvo view {}", view.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(view.name());
    recordBuilder.prop(SPANNER_VIEW_QUERY, view.query());
    if (view.security() != null) {
      recordBuilder.prop(SPANNER_VIEW_SECURITY, view.security().toString());
    }
    return recordBuilder.fields().endRecord();
  }

  /** Builds a field-less change stream record; a missing FOR clause is stored as "". */
  private Schema changeStreamToAvro(ChangeStream changeStream) {
    LOG.info("DdlToAvo changestream {}", changeStream.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder =
        newEntityRecordBuilder(changeStream.name());
    recordBuilder.prop(
        SPANNER_CHANGE_STREAM_FOR_CLAUSE,
        changeStream.forClause() == null ? "" : changeStream.forClause());
    if (changeStream.options() != null) {
      for (int i = 0; i < changeStream.options().size(); i++) {
        recordBuilder.prop(SPANNER_OPTION + i, changeStream.options().get(i));
      }
    }
    return recordBuilder.fields().endRecord();
  }

  /** Builds a field-less sequence record; only the sequence attributes that are set are written. */
  private Schema sequenceToAvro(Sequence sequence) {
    LOG.info("DdlToAvo sequence {}", sequence.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(sequence.name());
    if (sequence.options() != null) {
      for (int i = 0; i < sequence.options().size(); i++) {
        recordBuilder.prop(SPANNER_SEQUENCE_OPTION + i, sequence.options().get(i));
      }
    }
    if (sequence.sequenceKind() != null) {
      recordBuilder.prop(SPANNER_SEQUENCE_KIND, sequence.sequenceKind());
    }
    if (sequence.counterStartValue() != null) {
      recordBuilder.prop(
          SPANNER_SEQUENCE_COUNTER_START, String.valueOf(sequence.counterStartValue()));
    }
    if (sequence.skipRangeMin() != null) {
      recordBuilder.prop(
          SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(sequence.skipRangeMin()));
    }
    if (sequence.skipRangeMax() != null) {
      recordBuilder.prop(
          SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(sequence.skipRangeMax()));
    }
    return recordBuilder.fields().endRecord();
  }

  /** Builds a field-less placement record carrying the placement options, if any. */
  private Schema placementToAvro(Placement placement) {
    LOG.info("DdlToAvro placement {}", placement.name());
    SchemaBuilder.RecordBuilder<Schema> recordBuilder = newEntityRecordBuilder(placement.name());
    recordBuilder.prop(SPANNER_ENTITY, SPANNER_ENTITY_PLACEMENT);
    if (placement.options() != null) {
      for (int i = 0; i < placement.options().size(); i++) {
        recordBuilder.prop(SPANNER_OPTION + i, placement.options().get(i));
      }
    }
    return recordBuilder.fields().endRecord();
  }