public static Statement generateShadowTableReadSQL()

in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ShadowTableReadUtils.java [34:103]


  public static Statement generateShadowTableReadSQL(
      String shadowTable, List<String> readColumnList, Key primaryKey, Ddl shadowTableDdl) {
    String columnNames = String.join(", ", readColumnList);
    // TODO: Handle json type as PKs.
    String whereClause =
        String.join(
            " AND ",
            shadowTableDdl.table(shadowTable).primaryKeys().stream()
                .map(col -> col.name() + "=@" + col.name())
                .collect(Collectors.toList()));
    String sql =
        "@{LOCK_SCANNED_RANGES=exclusive} SELECT "
            + columnNames
            + " FROM "
            + shadowTable
            + " WHERE "
            + whereClause;

    Statement.Builder stmtBuilder = Statement.newBuilder(sql);
    int i = 0;
    for (Object value : primaryKey.getParts()) {
      Table table = shadowTableDdl.table(shadowTable);
      String colName = table.primaryKeys().get(i).name();
      Column key = table.column(colName);
      Type keyColType = key.type();

      switch (keyColType.getCode()) {
        case BOOL:
        case PG_BOOL:
          stmtBuilder.bind(colName).to((Boolean) value);
          break;
        case INT64:
        case PG_INT8:
          stmtBuilder.bind(colName).to((Long) value);
          break;
        case FLOAT64:
        case PG_FLOAT8:
          stmtBuilder.bind(colName).to((Double) value);
          break;
        case STRING:
        case PG_VARCHAR:
        case PG_TEXT:
        case JSON:
        case PG_JSONB:
          stmtBuilder.bind(colName).to((String) value);
          break;
        case NUMERIC:
        case PG_NUMERIC:
          stmtBuilder.bind(colName).to((BigDecimal) value);
          break;
        case BYTES:
        case PG_BYTEA:
          stmtBuilder.bind(colName).to((ByteArray) value);
          break;
        case TIMESTAMP:
        case PG_COMMIT_TIMESTAMP:
        case PG_TIMESTAMPTZ:
          stmtBuilder.bind(colName).to((Timestamp) value);
          break;
        case DATE:
        case PG_DATE:
          stmtBuilder.bind(colName).to((Date) value);
          break;
        default:
          throw new IllegalArgumentException("Unsupported type: " + keyColType);
      }
      i++;
    }
    return stmtBuilder.build();
  }