in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/changestreams/dao/PartitionMetadataAdminDao.java [130:262]
/**
 * Creates the partition metadata table and its two secondary indexes if they do not exist yet.
 *
 * <p>The DDL is generated for the PostgreSQL dialect when {@code isPostgres()} returns true and
 * for the GoogleSQL dialect otherwise. Both variants declare the same columns, use the partition
 * token as primary key, and expire rows {@code TTL_AFTER_PARTITION_FINISHED_DAYS} days after the
 * "finished at" column value. All statements are submitted as a single schema update and this
 * method blocks for up to {@code TIMEOUT_MINUTES} minutes waiting for it to complete.
 *
 * @throws SpannerException if the schema update fails, times out, or the waiting thread is
 *     interrupted
 */
public void createPartitionMetadataTable() {
  List<String> ddl = new ArrayList<>();
  if (this.isPostgres()) {
    // Double quotes are added around identifiers to preserve their casing.
    ddl.add(
        "CREATE TABLE IF NOT EXISTS \""
            + names.getTableName()
            + "\"(\""
            + COLUMN_PARTITION_TOKEN
            + "\" text NOT NULL,\""
            + COLUMN_PARENT_TOKENS
            + "\" text[] NOT NULL,\""
            + COLUMN_START_TIMESTAMP
            + "\" timestamptz NOT NULL,\""
            + COLUMN_END_TIMESTAMP
            + "\" timestamptz NOT NULL,\""
            + COLUMN_HEARTBEAT_MILLIS
            + "\" BIGINT NOT NULL,\""
            + COLUMN_STATE
            + "\" text NOT NULL,\""
            + COLUMN_WATERMARK
            + "\" timestamptz NOT NULL,\""
            + COLUMN_CREATED_AT
            + "\" SPANNER.COMMIT_TIMESTAMP NOT NULL,\""
            + COLUMN_SCHEDULED_AT
            + "\" SPANNER.COMMIT_TIMESTAMP,\""
            + COLUMN_RUNNING_AT
            + "\" SPANNER.COMMIT_TIMESTAMP,\""
            + COLUMN_FINISHED_AT
            + "\" SPANNER.COMMIT_TIMESTAMP,"
            + " PRIMARY KEY (\""
            + COLUMN_PARTITION_TOKEN
            + "\")"
            + ")"
            + " TTL INTERVAL '"
            + TTL_AFTER_PARTITION_FINISHED_DAYS
            + " days' ON \""
            + COLUMN_FINISHED_AT
            + "\"");
    // Index on the watermark column that also carries the state column.
    ddl.add(
        "CREATE INDEX IF NOT EXISTS \""
            + names.getWatermarkIndexName()
            + "\" on \""
            + names.getTableName()
            + "\" (\""
            + COLUMN_WATERMARK
            + "\") INCLUDE (\""
            + COLUMN_STATE
            + "\")");
    // Composite index on (created at, start timestamp).
    ddl.add(
        "CREATE INDEX IF NOT EXISTS \""
            + names.getCreatedAtIndexName()
            + "\" ON \""
            + names.getTableName()
            + "\" (\""
            + COLUMN_CREATED_AT
            + "\",\""
            + COLUMN_START_TIMESTAMP
            + "\")");
  } else {
    // GoogleSQL dialect: identifiers are emitted unquoted.
    ddl.add(
        "CREATE TABLE IF NOT EXISTS "
            + names.getTableName()
            + " ("
            + COLUMN_PARTITION_TOKEN
            + " STRING(MAX) NOT NULL,"
            + COLUMN_PARENT_TOKENS
            + " ARRAY<STRING(MAX)> NOT NULL,"
            + COLUMN_START_TIMESTAMP
            + " TIMESTAMP NOT NULL,"
            + COLUMN_END_TIMESTAMP
            + " TIMESTAMP NOT NULL,"
            + COLUMN_HEARTBEAT_MILLIS
            + " INT64 NOT NULL,"
            + COLUMN_STATE
            + " STRING(MAX) NOT NULL,"
            + COLUMN_WATERMARK
            + " TIMESTAMP NOT NULL,"
            + COLUMN_CREATED_AT
            + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),"
            + COLUMN_SCHEDULED_AT
            + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
            + COLUMN_RUNNING_AT
            + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
            + COLUMN_FINISHED_AT
            + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
            + ") PRIMARY KEY ("
            + COLUMN_PARTITION_TOKEN
            + "),"
            + " ROW DELETION POLICY (OLDER_THAN("
            + COLUMN_FINISHED_AT
            + ", INTERVAL "
            + TTL_AFTER_PARTITION_FINISHED_DAYS
            + " DAY))");
    // Index on the watermark column that also stores the state column.
    ddl.add(
        "CREATE INDEX IF NOT EXISTS "
            + names.getWatermarkIndexName()
            + " on "
            + names.getTableName()
            + " ("
            + COLUMN_WATERMARK
            + ") STORING ("
            + COLUMN_STATE
            + ")");
    // Composite index on (created at, start timestamp).
    ddl.add(
        "CREATE INDEX IF NOT EXISTS "
            + names.getCreatedAtIndexName()
            + " ON "
            + names.getTableName()
            + " ("
            + COLUMN_CREATED_AT
            + ","
            + COLUMN_START_TIMESTAMP
            + ")");
  }
  // Initiate the request, which returns an OperationFuture for the schema change.
  // NOTE(review): the trailing null is presumably the operation id (auto-generated when null) —
  // confirm against the DatabaseAdminClient documentation.
  OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
      databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
  try {
    // Block until the schema change completes or the timeout elapses.
    op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
  } catch (ExecutionException | TimeoutException e) {
    // If the operation failed or timed out during execution, expose the cause. The cause is not
    // guaranteed to be a SpannerException, so convert it instead of casting: an existing
    // SpannerException is passed through as-is, anything else is wrapped with the cause kept.
    final Throwable cause = e.getCause();
    throw SpannerExceptionFactory.asSpannerException(cause != null ? cause : e);
  } catch (InterruptedException e) {
    // Thrown when the waiting thread is interrupted, either before or during the wait.
    throw SpannerExceptionFactory.propagateInterrupt(e);
  }
}