public void processElement()

in spanner-data-validator-java/src/main/java/com/google/migration/dofns/CountMatchesDoFn.java [68:122]


  public void processElement(ProcessContext c, MultiOutputReceiver out) {
    KV<String, CoGbkResult> e = c.element();
    Iterable<HashResult> jdbcRecords = e.getValue().getAll(jdbcTag);
    Iterable<HashResult> spannerRecords = e.getValue().getAll(spannerTag);

    HashResult jdbcRecord = null, spannerRecord = null;
    if (jdbcRecords.iterator().hasNext()) {
      jdbcRecord = jdbcRecords.iterator().next();
    }

    if (spannerRecords.iterator().hasNext()) {
      spannerRecord = spannerRecords.iterator().next();
    }

    boolean emitOutput = true;

    if (spannerRecord != null && jdbcRecord != null) {
      if(timestampThresholdInEffect) {
        emitOutput = spannerRecord.timestampThresholdValue >= timestampFilterStart &&
            spannerRecord.timestampThresholdValue <= timestampFilterEnd;

        // Helpers.printTimestampThresholdInfo(spannerRecord.timestampThresholdValue,
        //     timestampFilterStart,
        //     timestampFilterEnd);
      }

      if(emitOutput) {
        out.get(matchedRecordsTag).output(KV.of(spannerRecord.range, 1L));
        out.get(sourceRecordsTag).output(KV.of(spannerRecord.range, 1L));
        out.get(targetRecordsTag).output(KV.of(spannerRecord.range, 1L));
      }
    } else if(spannerRecord != null) {
      if(timestampThresholdInEffect) {
        emitOutput = spannerRecord.timestampThresholdValue >= timestampFilterStart &&
            spannerRecord.timestampThresholdValue <= timestampFilterEnd;
      }

      if(emitOutput) {
        out.get(unmatchedSpannerRecordsTag).output(KV.of(spannerRecord.range, 1L));
        out.get(targetRecordsTag).output(KV.of(spannerRecord.range, 1L));
        out.get(unmatchedSpannerRecordValuesTag).output(spannerRecord);
      }
    } else if(jdbcRecord != null) {
      if(timestampThresholdInEffect) {
        emitOutput = jdbcRecord.timestampThresholdValue >= timestampFilterStart &&
            jdbcRecord.timestampThresholdValue <= timestampFilterEnd;
      }

      if(emitOutput) {
        out.get(unmatchedJDBCRecordsTag).output(KV.of(jdbcRecord.range, 1L));
        out.get(sourceRecordsTag).output(KV.of(jdbcRecord.range, 1L));
        out.get(unmatchedJDBCRecordValuesTag).output(jdbcRecord);
      }
    } // if/else
  }