in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/Iterations.java [331:385]
/**
 * Wires the termination-criteria stream of an iteration into the actual environment.
 *
 * <p>The criteria stream built in the draft environment is resolved to its actual stream. A
 * placeholder source ({@code DraftExecutionEnvironment.EmptySource}) copying the criteria's
 * name, inner type and parallelism is then sent through the regular input and head wiring. The
 * resulting criteria head is merged with the actual criteria stream before the tail is added.
 * The criteria head and tail are put into one co-location group. Finally, both the variable
 * heads and the criteria head are told the criteria parallelism.
 *
 * @param draftCriteriaStream the criteria stream as declared in the draft environment.
 * @param iterationId the id of the iteration being built; also used to derive the
 *     co-location group key.
 * @param env the actual execution environment the placeholder source is added to.
 * @param draftEnv the draft environment used to look up the actual criteria stream.
 * @param initVariableStreams the initial variable streams; only their count is used here.
 * @param headStreams the head streams that are notified of the criteria parallelism.
 * @param totalInitVariableParallelism passed through unchanged to {@code addHeads} for the
 *     criteria head.
 * @return the tail streams created by {@code addTails} for the merged criteria stream.
 */
private static DataStreamList addCriteriaStream(
        DataStream<?> draftCriteriaStream,
        IterationID iterationId,
        StreamExecutionEnvironment env,
        DraftExecutionEnvironment draftEnv,
        DataStreamList initVariableStreams,
        DataStreamList headStreams,
        int totalInitVariableParallelism) {
    final int variableCount = initVariableStreams.size();

    // Look up the actual stream that the draft criteria stream stands for.
    DataStream<?> actualCriteria = draftEnv.getActualStream(draftCriteriaStream.getId());

    // Criteria records are expected to be wrapped, so the type info must be exactly
    // IterationRecordTypeInfo (subclasses are rejected as well).
    TypeInformation<?> criteriaType = actualCriteria.getType();
    checkState(
            IterationRecordTypeInfo.class.equals(criteriaType.getClass()),
            "The termination criteria should always return IterationRecord.");
    TypeInformation<?> recordType =
            ((IterationRecordTypeInfo<?>) criteriaType).getInnerTypeInfo();

    // A placeholder source that mirrors the criteria's name, inner type and parallelism.
    DataStream<?> placeholderSource =
            env.addSource(new DraftExecutionEnvironment.EmptySource())
                    .returns(recordType)
                    .name(actualCriteria.getTransformation().getName())
                    .setParallelism(actualCriteria.getParallelism());
    DataStreamList placeholderList = DataStreamList.of(placeholderSource);

    // The inputs are created before the heads; Java evaluates arguments left to right.
    DataStreamList criteriaHeads =
            addHeads(
                    placeholderList,
                    addInputs(placeholderList),
                    iterationId,
                    totalInitVariableParallelism,
                    true,
                    variableCount);

    // The criteria head must be merged into the actual criteria stream. Without an edge from
    // the criteria head to the criteria tail, the tail could receive MAX_EPOCH_WATERMARK
    // directly, bypassing the head's synchronization.
    DataStream<?> headMergedCriteria =
            mergeCriteriaHeadAndCriteriaStream(
                    env, criteriaHeads.get(0), actualCriteria, recordType);
    DataStreamList criteriaTails =
            addTails(DataStreamList.of(headMergedCriteria), iterationId, variableCount);

    // Place the criteria head and the criteria tail in the same co-location group.
    String criteriaCoLocationKey = "co-" + iterationId.toHexString() + "-cri";
    criteriaHeads.get(0).getTransformation().setCoLocationGroupKey(criteriaCoLocationKey);
    criteriaTails.get(0).getTransformation().setCoLocationGroupKey(criteriaCoLocationKey);

    // Tell every head operator (variable heads and the criteria head) the criteria
    // parallelism so that they can count the criteria streams.
    int criteriaParallelism = actualCriteria.getParallelism();
    setCriteriaParallelism(headStreams, criteriaParallelism);
    setCriteriaParallelism(criteriaHeads, criteriaParallelism);
    return criteriaTails;
}