v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/transforms/ProcessDml.java [88:112]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void processElement(
        ProcessContext context, @StateId(PK_STATE_ID) ValueState<String> myState) {
      String stateKey = context.element().getKey();
      DmlInfo dmlInfo = context.element().getValue();
      // Empty SQL suggests the table DNE and should be skipped
      if (dmlInfo.getDmlSql().equals("")) {
        return;
      }

      // TODO(dhercher): More complex compare w/o String.join
      String lastSortKey = myState.read();
      String currentSortKey = dmlInfo.getOrderByValueString();

      // If there is no PK then state can be skipped
      if (dmlInfo.getAllPkFields().size() == 0) {
        String numThreads = Integer.toString(stateKey.hashCode() % NUM_THREADS);
        context.output(KV.of(numThreads, dmlInfo));
      } else if (lastSortKey == null || currentSortKey.compareTo(lastSortKey) > 0) {
        String numThreads = Integer.toString(stateKey.hashCode() % NUM_THREADS);
        myState.write(currentSortKey);
        context.output(KV.of(numThreads, dmlInfo));

        distribution.update(0);
      }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



v2/datastream-to-postgres/src/main/java/com/google/cloud/teleport/v2/transforms/ProcessDml.java [90:114]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void processElement(
        ProcessContext context, @StateId(PK_STATE_ID) ValueState<String> myState) {
      String stateKey = context.element().getKey();
      DmlInfo dmlInfo = context.element().getValue();
      // Empty SQL suggests the table DNE and should be skipped
      if (dmlInfo.getDmlSql().equals("")) {
        return;
      }

      // TODO(dhercher): More complex compare w/o String.join
      String lastSortKey = myState.read();
      String currentSortKey = dmlInfo.getOrderByValueString();

      // If there is no PK then state can be skipped
      if (dmlInfo.getAllPkFields().size() == 0) {
        String numThreads = Integer.toString(stateKey.hashCode() % NUM_THREADS);
        context.output(KV.of(numThreads, dmlInfo));
      } else if (lastSortKey == null || currentSortKey.compareTo(lastSortKey) > 0) {
        String numThreads = Integer.toString(stateKey.hashCode() % NUM_THREADS);
        myState.write(currentSortKey);
        context.output(KV.of(numThreads, dmlInfo));

        distribution.update(0);
      }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



