public void createPartitionMetadataTable()

in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/changestreams/dao/PartitionMetadataAdminDao.java [130:262]


  /**
   * Creates the partition metadata table and its two secondary indexes if they do not already
   * exist.
   *
   * <p>Three DDL statements are submitted in a single {@code updateDatabaseDdl} call:
   *
   * <ol>
   *   <li>{@code CREATE TABLE IF NOT EXISTS} for the table named by {@code names.getTableName()},
   *       keyed on {@code COLUMN_PARTITION_TOKEN}, with a TTL / row deletion policy of {@code
   *       TTL_AFTER_PARTITION_FINISHED_DAYS} days on {@code COLUMN_FINISHED_AT};
   *   <li>{@code CREATE INDEX IF NOT EXISTS} on {@code COLUMN_WATERMARK}, additionally storing
   *       {@code COLUMN_STATE};
   *   <li>{@code CREATE INDEX IF NOT EXISTS} on ({@code COLUMN_CREATED_AT}, {@code
   *       COLUMN_START_TIMESTAMP}).
   * </ol>
   *
   * <p>The statements are emitted in PostgreSQL syntax when {@code isPostgres()} returns true and
   * in GoogleSQL syntax otherwise. The call blocks for at most {@code TIMEOUT_MINUTES} minutes
   * waiting for the schema change to finish.
   *
   * @throws SpannerException if the DDL operation fails, times out, or the waiting thread is
   *     interrupted
   */
  public void createPartitionMetadataTable() {
    List<String> ddl = new ArrayList<>();
    if (this.isPostgres()) {
      // Double quotes are added around identifiers to preserve their casing.
      ddl.add(
          "CREATE TABLE IF NOT EXISTS \""
              + names.getTableName()
              + "\"(\""
              + COLUMN_PARTITION_TOKEN
              + "\" text NOT NULL,\""
              + COLUMN_PARENT_TOKENS
              + "\" text[] NOT NULL,\""
              + COLUMN_START_TIMESTAMP
              + "\" timestamptz NOT NULL,\""
              + COLUMN_END_TIMESTAMP
              + "\" timestamptz NOT NULL,\""
              + COLUMN_HEARTBEAT_MILLIS
              + "\" BIGINT NOT NULL,\""
              + COLUMN_STATE
              + "\" text NOT NULL,\""
              + COLUMN_WATERMARK
              + "\" timestamptz NOT NULL,\""
              + COLUMN_CREATED_AT
              + "\" SPANNER.COMMIT_TIMESTAMP NOT NULL,\""
              + COLUMN_SCHEDULED_AT
              + "\" SPANNER.COMMIT_TIMESTAMP,\""
              + COLUMN_RUNNING_AT
              + "\" SPANNER.COMMIT_TIMESTAMP,\""
              + COLUMN_FINISHED_AT
              + "\" SPANNER.COMMIT_TIMESTAMP,"
              + " PRIMARY KEY (\""
              + COLUMN_PARTITION_TOKEN
              + "\")"
              + ")"
              + " TTL INTERVAL '"
              + TTL_AFTER_PARTITION_FINISHED_DAYS
              + " days' ON \""
              + COLUMN_FINISHED_AT
              + "\"");
      // Index on the watermark column that also carries the state column.
      ddl.add(
          "CREATE INDEX IF NOT EXISTS \""
              + names.getWatermarkIndexName()
              + "\" on \""
              + names.getTableName()
              + "\" (\""
              + COLUMN_WATERMARK
              + "\") INCLUDE (\""
              + COLUMN_STATE
              + "\")");
      // Composite index on (created-at, start-timestamp).
      ddl.add(
          "CREATE INDEX IF NOT EXISTS \""
              + names.getCreatedAtIndexName()
              + "\" ON \""
              + names.getTableName()
              + "\" (\""
              + COLUMN_CREATED_AT
              + "\",\""
              + COLUMN_START_TIMESTAMP
              + "\")");
    } else {
      // GoogleSQL dialect: identifiers are emitted unquoted.
      ddl.add(
          "CREATE TABLE IF NOT EXISTS "
              + names.getTableName()
              + " ("
              + COLUMN_PARTITION_TOKEN
              + " STRING(MAX) NOT NULL,"
              + COLUMN_PARENT_TOKENS
              + " ARRAY<STRING(MAX)> NOT NULL,"
              + COLUMN_START_TIMESTAMP
              + " TIMESTAMP NOT NULL,"
              + COLUMN_END_TIMESTAMP
              + " TIMESTAMP NOT NULL,"
              + COLUMN_HEARTBEAT_MILLIS
              + " INT64 NOT NULL,"
              + COLUMN_STATE
              + " STRING(MAX) NOT NULL,"
              + COLUMN_WATERMARK
              + " TIMESTAMP NOT NULL,"
              + COLUMN_CREATED_AT
              + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),"
              + COLUMN_SCHEDULED_AT
              + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
              + COLUMN_RUNNING_AT
              + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
              + COLUMN_FINISHED_AT
              + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
              + ") PRIMARY KEY ("
              + COLUMN_PARTITION_TOKEN
              + "),"
              + " ROW DELETION POLICY (OLDER_THAN("
              + COLUMN_FINISHED_AT
              + ", INTERVAL "
              + TTL_AFTER_PARTITION_FINISHED_DAYS
              + " DAY))");
      // Index on the watermark column that also stores the state column.
      ddl.add(
          "CREATE INDEX IF NOT EXISTS "
              + names.getWatermarkIndexName()
              + " on "
              + names.getTableName()
              + " ("
              + COLUMN_WATERMARK
              + ") STORING ("
              + COLUMN_STATE
              + ")");
      // Composite index on (created-at, start-timestamp).
      ddl.add(
          "CREATE INDEX IF NOT EXISTS "
              + names.getCreatedAtIndexName()
              + " ON "
              + names.getTableName()
              + " ("
              + COLUMN_CREATED_AT
              + ","
              + COLUMN_START_TIMESTAMP
              + ")");
    }
    // Submit all statements as one schema change; this returns an OperationFuture.
    // NOTE(review): the null last argument is presumably the operation id, left for the
    // service to assign -- confirm against the DatabaseAdminClient API.
    OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
        databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
    try {
      // Block until the DDL operation completes or the timeout elapses.
      op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
    } catch (ExecutionException | TimeoutException e) {
      // If the operation failed or timed out during execution, expose the cause when there is
      // one. The cause is converted rather than blindly cast: an ExecutionException may wrap any
      // Throwable, and a cast would replace the real failure with a ClassCastException.
      Throwable cause = e.getCause();
      throw SpannerExceptionFactory.asSpannerException(cause != null ? cause : e);
    } catch (InterruptedException e) {
      // Throw when a thread is waiting, sleeping, or otherwise occupied,
      // and the thread is interrupted, either before or during the activity.
      throw SpannerExceptionFactory.propagateInterrupt(e);
    }
  }