in spanner-data-validator-java/src/main/java/com/google/migration/dofns/CountMatchesDoFn.java [68:122]
public void processElement(ProcessContext c, MultiOutputReceiver out) {
KV<String, CoGbkResult> e = c.element();
Iterable<HashResult> jdbcRecords = e.getValue().getAll(jdbcTag);
Iterable<HashResult> spannerRecords = e.getValue().getAll(spannerTag);
HashResult jdbcRecord = null, spannerRecord = null;
if (jdbcRecords.iterator().hasNext()) {
jdbcRecord = jdbcRecords.iterator().next();
}
if (spannerRecords.iterator().hasNext()) {
spannerRecord = spannerRecords.iterator().next();
}
boolean emitOutput = true;
if (spannerRecord != null && jdbcRecord != null) {
if(timestampThresholdInEffect) {
emitOutput = spannerRecord.timestampThresholdValue >= timestampFilterStart &&
spannerRecord.timestampThresholdValue <= timestampFilterEnd;
// Helpers.printTimestampThresholdInfo(spannerRecord.timestampThresholdValue,
// timestampFilterStart,
// timestampFilterEnd);
}
if(emitOutput) {
out.get(matchedRecordsTag).output(KV.of(spannerRecord.range, 1L));
out.get(sourceRecordsTag).output(KV.of(spannerRecord.range, 1L));
out.get(targetRecordsTag).output(KV.of(spannerRecord.range, 1L));
}
} else if(spannerRecord != null) {
if(timestampThresholdInEffect) {
emitOutput = spannerRecord.timestampThresholdValue >= timestampFilterStart &&
spannerRecord.timestampThresholdValue <= timestampFilterEnd;
}
if(emitOutput) {
out.get(unmatchedSpannerRecordsTag).output(KV.of(spannerRecord.range, 1L));
out.get(targetRecordsTag).output(KV.of(spannerRecord.range, 1L));
out.get(unmatchedSpannerRecordValuesTag).output(spannerRecord);
}
} else if(jdbcRecord != null) {
if(timestampThresholdInEffect) {
emitOutput = jdbcRecord.timestampThresholdValue >= timestampFilterStart &&
jdbcRecord.timestampThresholdValue <= timestampFilterEnd;
}
if(emitOutput) {
out.get(unmatchedJDBCRecordsTag).output(KV.of(jdbcRecord.range, 1L));
out.get(sourceRecordsTag).output(KV.of(jdbcRecord.range, 1L));
out.get(unmatchedJDBCRecordValuesTag).output(jdbcRecord);
}
} // if/else
}