in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/Dialect.scala [66:115]
/**
 * DDL statements that set up the offset table named `table`.
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return two statements, in order: the `CREATE TABLE IF NOT EXISTS` statement, whose primary key is
 *         ("PROJECTION_NAME", "PROJECTION_KEY"), followed by a `CREATE INDEX IF NOT EXISTS` statement
 *         on "PROJECTION_NAME"
 */
def createTableStatement(table: String): immutable.Seq[String] = {
  val tableDdl =
    s"""CREATE TABLE IF NOT EXISTS $table (
       | "PROJECTION_NAME" VARCHAR(255) NOT NULL,
       | "PROJECTION_KEY" VARCHAR(255) NOT NULL,
       | "CURRENT_OFFSET" VARCHAR(255) NOT NULL,
       | "MANIFEST" VARCHAR(4) NOT NULL,
       | "MERGEABLE" BOOLEAN NOT NULL,
       | "LAST_UPDATED" BIGINT NOT NULL,
       | PRIMARY KEY("PROJECTION_NAME", "PROJECTION_KEY")
       |);""".stripMargin

  // Secondary index on the projection name alone.
  val indexDdl =
    s"""CREATE INDEX IF NOT EXISTS "PROJECTION_NAME_INDEX" on $table ("PROJECTION_NAME");"""

  immutable.Seq(tableDdl, indexDdl)
}
/**
 * Statement that drops the table named `table` if it exists.
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return a single `DROP TABLE IF EXISTS` statement terminated by a semicolon
 */
def dropTableStatement(table: String): String =
  s"DROP TABLE IF EXISTS $table;"
/**
 * Query that selects every column of the rows in `table` belonging to one projection name.
 *
 * The statement has a single `?` placeholder: the value compared against "PROJECTION_NAME".
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return the parameterised `SELECT` statement
 */
def readOffsetQuery(table: String): String =
  s"""SELECT * FROM $table WHERE "PROJECTION_NAME" = ?"""
/**
 * Statement that deletes the rows of `table` matching one projection name and projection key.
 *
 * The statement has two `?` placeholders, in this order: the value compared against
 * "PROJECTION_NAME", then the value compared against "PROJECTION_KEY".
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return the parameterised `DELETE` statement
 */
def clearOffsetStatement(table: String): String =
  s"""DELETE FROM $table WHERE "PROJECTION_NAME" = ? AND "PROJECTION_KEY" = ?"""
/**
 * Parameterised `INSERT` for one row of `table`.
 *
 * The six `?` placeholders follow the order of the listed columns: "PROJECTION_NAME",
 * "PROJECTION_KEY", "CURRENT_OFFSET", "MANIFEST", "MERGEABLE", "LAST_UPDATED".
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return the `INSERT` statement text
 */
def insertStatement(table: String): String = {
  val columnNames = List(
    "PROJECTION_NAME",
    "PROJECTION_KEY",
    "CURRENT_OFFSET",
    "MANIFEST",
    "MERGEABLE",
    "LAST_UPDATED")

  // One double-quoted identifier per line, each indented by a single space.
  val columnLines = columnNames.map(name => " \"" + name + "\"").mkString(",\n")
  // One positional placeholder per column, comma-separated without spaces.
  val placeholders = columnNames.map(_ => "?").mkString(",")

  s"""INSERT INTO $table (
     |$columnLines
     |) VALUES ($placeholders)""".stripMargin
}
/**
 * Parameterised `UPDATE` for the row of `table` identified by projection name and projection key.
 *
 * Placeholder order: the four assigned columns first ("CURRENT_OFFSET", "MANIFEST", "MERGEABLE",
 * "LAST_UPDATED"), then the two `WHERE` values ("PROJECTION_NAME", "PROJECTION_KEY").
 *
 * @param table name of the table, spliced verbatim into the SQL text
 * @return the `UPDATE` statement text
 */
def updateStatement(table: String): String = {
  // One `"COLUMN" = ?` assignment per line, each indented by a single space.
  val assignments = List("CURRENT_OFFSET", "MANIFEST", "MERGEABLE", "LAST_UPDATED")
    .map(column => " \"" + column + "\" = ?")
    .mkString(",\n")

  s"""UPDATE $table
     |SET
     |$assignments
     |WHERE "PROJECTION_NAME" = ? AND "PROJECTION_KEY" = ?""".stripMargin
}
/**
 * Positional indices 1 to 6, numbered in the same order as the columns listed by
 * `insertStatement` (projection name, projection key, offset, manifest, mergeable, last updated).
 */
object InsertIndices {
  val PROJECTION_NAME: Int = 1
  val PROJECTION_KEY: Int = 2
  val OFFSET: Int = 3
  val MANIFEST: Int = 4
  val MERGEABLE: Int = 5
  val LAST_UPDATED: Int = 6
}