in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [648:745]
/**
 * Reads Spanner rows for {@code tableName} across the supplied partition ranges and
 * converts each row into a {@link HashResult}.
 *
 * <p>Each {@link PartitionRange} is turned into a parameterized {@link ReadOperation}
 * (binding the range endpoints as {@code p1}/{@code p2}), the operations are executed
 * via {@link SpannerIO#readAll()}, and the resulting {@link Struct}s are mapped to
 * {@link HashResult}s.
 *
 * @param tableName table being validated; used only to derive unique step names
 * @param pipelineTracker tracker used to sequence Spanner reads behind prior stages
 * @param query Spanner SQL with {@code @p1}/{@code @p2} range parameters
 * @param keyIndex index of the key column within the returned Struct
 * @param rangeFieldType one of the {@link TableSpec} field-type constants; controls
 *        how the range endpoints are parsed and bound
 * @param options pipeline options (project/instance/database ids, read priority, etc.)
 * @param pRanges partition ranges to read
 * @param timestampThresholdIndex index of the timestamp-threshold column, forwarded to
 *        {@link HashResult#fromSpannerStruct}
 * @return hashes of the Spanner rows covered by {@code pRanges}
 * @throws RuntimeException (from the range-mapping step, at pipeline execution time)
 *         if {@code rangeFieldType} is not a supported type
 */
protected static PCollection<HashResult> getSpannerRecords(String tableName,
    PipelineTracker pipelineTracker,
    String query,
    Integer keyIndex,
    String rangeFieldType,
    DVTOptionsCore options,
    PCollection<PartitionRange> pRanges,
    Integer timestampThresholdIndex) {
  Boolean adjustTimestampPrecision = options.getAdjustTimestampPrecision();
  String readOpsStepName = String.format("ConvertToSpannerIOReadOperationsForTable-%s",
      tableName);
  String reshuffleOpsStepName = String.format("ReshuffleSpannerForTable-%s",
      tableName);
  // NOTE(review): unchecked cast — applySpannerWait presumably returns the same
  // PCollection element type it was given; confirm against PipelineTracker.
  pRanges = (PCollection<PartitionRange>) pipelineTracker.applySpannerWait(pRanges);
  // Reshuffle to break fusion so the subsequent Spanner reads parallelize well.
  pRanges = pRanges.apply(reshuffleOpsStepName, Reshuffle.viaRandomKey());
  // https://cloud.google.com/spanner/docs/samples/spanner-dataflow-readall
  PCollection<ReadOperation> readOps = pRanges
      .apply(readOpsStepName,
          MapElements.into(TypeDescriptor.of(ReadOperation.class))
              .via(
                  (SerializableFunction<PartitionRange, ReadOperation>)
                      input -> ReadOperation.create()
                          .withQuery(buildRangeStatement(query, rangeFieldType, input))));
  // Fall back to the Dataflow project id when no dedicated Spanner project is set.
  String spannerProjectId = options.getSpannerProjectId().isEmpty() ?
      options.getProjectId() : options.getSpannerProjectId();
  String spannerReadStepName = String.format("SpannerReadAllForTable-%s", tableName);
  ReadAll spannerRead = SpannerIO.readAll();
  if (options.getReadFromSpannerWithHighPriorityCPU()) {
    spannerRead = spannerRead.withHighPriority();
  }
  PCollection<Struct> spannerRecords =
      readOps.apply(spannerReadStepName, spannerRead
          .withProjectId(spannerProjectId)
          .withInstanceId(options.getInstanceId())
          .withDatabaseId(options.getSpannerDatabaseId()));
  String convertToHashResultStepName =
      String.format("ConvertToHashResultForTable-%s", tableName);
  PCollection<HashResult> spannerHashes = spannerRecords.apply(convertToHashResultStepName,
      MapElements.into(TypeDescriptor.of(HashResult.class))
          .via(
              (SerializableFunction<? super Struct, HashResult>)
                  input -> HashResult.fromSpannerStruct(input,
                      keyIndex,
                      rangeFieldType,
                      adjustTimestampPrecision,
                      timestampThresholdIndex)
          ));
  return spannerHashes;
}

/**
 * Builds a parameterized Spanner {@link Statement} for one partition range, binding the
 * start/end endpoints as {@code p1}/{@code p2} with the type dictated by
 * {@code rangeFieldType}.
 *
 * @param query Spanner SQL with {@code @p1}/{@code @p2} parameters
 * @param rangeFieldType a {@link TableSpec} field-type constant
 * @param range the partition range whose endpoints are bound
 * @return the bound statement
 * @throws RuntimeException if {@code rangeFieldType} is not a supported type
 * @throws NumberFormatException if a numeric range endpoint cannot be parsed
 */
private static Statement buildRangeStatement(String query,
    String rangeFieldType,
    PartitionRange range) {
  switch (rangeFieldType) {
    // UUID, timestamp, and string ranges are all bound as their string form.
    case TableSpec.UUID_FIELD_TYPE:
    case TableSpec.TIMESTAMP_FIELD_TYPE:
    case TableSpec.STRING_FIELD_TYPE:
      return Statement.newBuilder(query)
          .bind("p1").to(range.getStartRange())
          .bind("p2").to(range.getEndRange())
          .build();
    case TableSpec.INT_FIELD_TYPE:
      return Statement.newBuilder(query)
          .bind("p1").to(Integer.parseInt(range.getStartRange()))
          .bind("p2").to(Integer.parseInt(range.getEndRange()))
          .build();
    case TableSpec.LONG_FIELD_TYPE:
      return Statement.newBuilder(query)
          .bind("p1").to(Long.parseLong(range.getStartRange()))
          .bind("p2").to(Long.parseLong(range.getEndRange()))
          .build();
    default:
      throw new RuntimeException(String.format("Unexpected range field type: %s",
          rangeFieldType));
  }
}