in flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java [167:234]
/**
 * Handles the end of one of the two inputs and, once both inputs have ended, runs the
 * sort-merge co-group over the sorted data and emits the final watermark.
 *
 * <p>For {@code inputId} 1 the reading phase of {@code sorterA} is finished, for
 * {@code inputId} 2 that of {@code sorterB}. While {@code remainingInputNum} is still
 * positive after the decrement, the method returns without producing output.
 *
 * @param inputId the id of the input that has ended; only 1 and 2 are accepted
 * @throws RuntimeException if {@code inputId} is neither 1 nor 2
 * @throws Exception if sorting, merging, the user function or watermark handling fails
 */
public void endInput(int inputId) throws Exception {
    if (inputId == 1) {
        sorterA.finishReading();
    } else if (inputId == 2) {
        sorterB.finishReading();
    } else {
        throw new RuntimeException("Unknown inputId " + inputId);
    }

    // Only reached for a valid input whose sorter finished reading successfully.
    remainingInputNum--;
    if (remainingInputNum > 0) {
        // The other input has not ended yet; the co-group needs both sorted inputs.
        return;
    }

    MutableObjectIterator<Tuple2<byte[], StreamRecord<IN1>>> iteratorA = sorterA.getIterator();
    MutableObjectIterator<Tuple2<byte[], StreamRecord<IN2>>> iteratorB = sorterB.getIterator();

    TypePairComparator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
            pairComparator =
                    (new RuntimePairComparatorFactory<
                                    Tuple2<byte[], StreamRecord<IN1>>,
                                    Tuple2<byte[], StreamRecord<IN2>>>())
                            .createComparator12(comparatorA, comparatorB);

    // Choose the iterator implementation according to the job's object-reuse setting.
    CoGroupTaskIterator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
            coGroupIterator;
    if (getExecutionConfig().isObjectReuseEnabled()) {
        coGroupIterator =
                new ReusingSortMergeCoGroupIterator<
                        Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>(
                        iteratorA,
                        iteratorB,
                        keyAndValueSerializerA,
                        comparatorA,
                        keyAndValueSerializerB,
                        comparatorB,
                        pairComparator);
    } else {
        coGroupIterator =
                new NonReusingSortMergeCoGroupIterator<
                        Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>(
                        iteratorA,
                        iteratorB,
                        keyAndValueSerializerA,
                        comparatorA,
                        keyAndValueSerializerB,
                        comparatorB,
                        pairComparator);
    }

    coGroupIterator.open();
    try {
        // The unwrapping iterators are allocated once and re-pointed at each group's values.
        TupleUnwrappingIterator<IN1, byte[]> unWrappediteratorA = new TupleUnwrappingIterator<>();
        TupleUnwrappingIterator<IN2, byte[]> unWrappediteratorB = new TupleUnwrappingIterator<>();
        Output<OUT> timestampedCollector = new TimestampedCollector<>(output);

        while (coGroupIterator.next()) {
            unWrappediteratorA.set(coGroupIterator.getValues1().iterator());
            unWrappediteratorB.set(coGroupIterator.getValues2().iterator());
            userFunction.coGroup(unWrappediteratorA, unWrappediteratorB, timestampedCollector);
        }
    } finally {
        // Close the iterator even when the merge or the user function throws.
        coGroupIterator.close();
    }

    // After all groups are processed, advance timers (if any) and forward the watermark
    // built from lastWatermarkTimestamp.
    Watermark watermark = new Watermark(lastWatermarkTimestamp);
    if (getTimeServiceManager().isPresent()) {
        getTimeServiceManager().get().advanceWatermark(watermark);
    }
    output.emitWatermark(watermark);
}