protected static PCollection<HashResult> getSpannerRecords()

in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [648:745]


  protected static PCollection<HashResult> getSpannerRecords(String tableName,
      PipelineTracker pipelineTracker,
      String query,
      Integer keyIndex,
      String rangeFieldType,
      DVTOptionsCore options,
      PCollection<PartitionRange> pRanges,
      Integer timestampThresholdIndex) {

    Boolean adjustTimestampPrecision = options.getAdjustTimestampPrecision();

    String readOpsStepName = String.format("ConvertToSpannerIOReadOperationsForTable-%s",
        tableName);
    String reshuffleOpsStepName = String.format("ReshuffleSpannerForTable-%s",
        tableName);

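    // Gate the partition ranges on the pipeline tracker's Spanner wait, then reshuffle via
    // random keys so the ranges are redistributed across workers before the Spanner read.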
    pRanges = (PCollection<PartitionRange>) pipelineTracker.applySpannerWait(pRanges);
    pRanges = pRanges.apply(reshuffleOpsStepName, Reshuffle.viaRandomKey());

    // https://cloud.google.com/spanner/docs/samples/spanner-dataflow-readall
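    // Map each PartitionRange to a SpannerIO ReadOperation whose query is bound to that
    // range's start and end values.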
    PCollection<ReadOperation> readOps = pRanges
        .apply(readOpsStepName,
        MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via(
            (SerializableFunction<PartitionRange, ReadOperation>)
                input -> {
                  Statement statement;
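                  // Bind @p1/@p2 with a Java type that matches the partition column's
                  // declared range field type.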
                  switch(rangeFieldType) {
                    case TableSpec.UUID_FIELD_TYPE:
                    case TableSpec.TIMESTAMP_FIELD_TYPE:
                    case TableSpec.STRING_FIELD_TYPE:
                      statement =
                          Statement.newBuilder(query)
                              .bind("p1")
                              .to(input.getStartRange())
                              .bind("p2")
                              .to(input.getEndRange())
                              .build();
                      break;
                    case TableSpec.INT_FIELD_TYPE:
                      statement =
                          Statement.newBuilder(query)
                              .bind("p1")
                              .to(Integer.parseInt(input.getStartRange()))
                              .bind("p2")
                              .to(Integer.parseInt(input.getEndRange()))
                              .build();
                      break;
                    case TableSpec.LONG_FIELD_TYPE:
                      statement =
                          Statement.newBuilder(query)
                              .bind("p1")
                              .to(Long.parseLong(input.getStartRange()))
                              .bind("p2")
                              .to(Long.parseLong(input.getEndRange()))
                              .build();
                      break;
                    default:
                      throw new RuntimeException(String.format("Unexpected range field type: %s",
                          rangeFieldType));
                  }
                  ReadOperation readOperation =
                      ReadOperation.create().withQuery(statement);

                  return readOperation;
                }));

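    // Prefer an explicitly configured Spanner project ID; otherwise fall back to the
    // pipeline's project ID.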
    String spannerProjectId = options.getSpannerProjectId().isEmpty() ?
        options.getProjectId() : options.getSpannerProjectId();

    String spannerReadStepName = String.format("SpannerReadAllForTable-%s", tableName);

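    // Optionally issue the Spanner reads with high-priority CPU, per the pipeline options.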
    ReadAll spannerRead = SpannerIO.readAll();
    if(options.getReadFromSpannerWithHighPriorityCPU()) {
      spannerRead = spannerRead.withHighPriority();
    }

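    // Execute the read operations against the target Spanner instance and database; the
    // result is one Struct per row read.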
    PCollection<Struct> spannerRecords =
        readOps.apply(spannerReadStepName, spannerRead
        .withProjectId(spannerProjectId)
        .withInstanceId(options.getInstanceId())
        .withDatabaseId(options.getSpannerDatabaseId()));

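    // Convert each row Struct into a HashResult, using the key column index, the range
    // field type, and the timestamp precision/threshold settings passed in above.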
    String convertToHashResultStepName =
        String.format("ConvertToHashResultForTable-%s", tableName);
    PCollection<HashResult> spannerHashes = spannerRecords.apply(convertToHashResultStepName,
        MapElements.into(TypeDescriptor.of(HashResult.class))
            .via(
                (SerializableFunction<? super Struct, HashResult>)
                    input -> HashResult.fromSpannerStruct(input,
                        keyIndex,
                        rangeFieldType,
                        adjustTimestampPrecision,
                        timestampThresholdIndex)
            ));

    return spannerHashes;
  }
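
Note that the per-range binds above assume the incoming query string declares the named
parameters @p1 and @p2. The sketch below is illustrative only: the table name, key column,
and comparison operators are hypothetical and not taken from the source file. It shows a
query shape compatible with those binds and how it maps to a SpannerIO ReadOperation,
mirroring the STRING/UUID/TIMESTAMP branch of the switch.

import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;

public class RangeReadOperationSketch {

  // Builds a ReadOperation for one partition range. Table and column names are
  // hypothetical; the inclusive-start/exclusive-end comparison is an assumption.
  public static ReadOperation buildRangeReadOperation(String startRange, String endRange) {
    String query =
        "SELECT * FROM Customers WHERE CustomerId >= @p1 AND CustomerId < @p2";

    Statement statement =
        Statement.newBuilder(query)
            .bind("p1")
            .to(startRange)
            .bind("p2")
            .to(endRange)
            .build();

    return ReadOperation.create().withQuery(statement);
  }
}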