in flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java [65:117]
public static <OUT> DataStream<OUT> withBroadcastStream(
        List<DataStream<?>> inputList,
        Map<String, DataStream<?>> bcStreams,
        Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) {
    // Builds the user-defined result stream over `inputList`, builds a second stream that
    // caches every broadcast input in `bcStreams`, forces HEAD chaining on both, puts both into
    // one co-location group, and returns the union of the two streams.
    //
    // Throws NullPointerException if `inputList`, `bcStreams`, or any broadcast stream value is
    // null; IllegalArgumentException if `inputList` is empty; UnsupportedOperationException if
    // either resulting transformation is not a PhysicalTransformation.
    Preconditions.checkNotNull(inputList, "The input list must not be null.");
    Preconditions.checkArgument(
            !inputList.isEmpty(), "The input list must contain at least one stream.");
    Preconditions.checkNotNull(bcStreams, "The broadcast stream map must not be null.");
    StreamExecutionEnvironment env = inputList.get(0).getExecutionEnvironment();

    // The three arrays are filled in the same pass, so index i describes the same broadcast
    // variable in all of them. Every name is prefixed with a fresh per-call id (AbstractID is
    // expected to be unique per instance), which keeps names from separate calls distinct.
    final int numBroadcasts = bcStreams.size();
    String[] broadcastNames = new String[numBroadcasts];
    DataStream<?>[] broadcastInputs = new DataStream<?>[numBroadcasts];
    TypeInformation<?>[] broadcastInTypes = new TypeInformation<?>[numBroadcasts];
    final String broadcastId = new AbstractID().toHexString();
    int idx = 0;
    for (Map.Entry<String, DataStream<?>> entry : bcStreams.entrySet()) {
        // Fail fast with the offending name instead of an anonymous NPE on getType().
        DataStream<?> bcStream =
                Preconditions.checkNotNull(
                        entry.getValue(),
                        "The broadcast stream '%s' must not be null.",
                        entry.getKey());
        broadcastNames[idx] = broadcastId + "-" + entry.getKey();
        broadcastInputs[idx] = bcStream;
        broadcastInTypes[idx] = bcStream.getType();
        idx++;
    }

    DataStream<OUT> resultStream =
            getResultStream(env, inputList, broadcastNames, userDefinedFunction);
    TypeInformation<OUT> outType = resultStream.getType();
    final String coLocationKey = "broadcast-co-location-" + UUID.randomUUID();
    // The cache stream is created with the result stream's parallelism; presumably this is
    // what lets each result subtask be paired with one cache subtask in the co-location group.
    DataStream<OUT> cachedBroadcastInputs =
            cacheBroadcastVariables(
                    env,
                    broadcastNames,
                    broadcastInputs,
                    broadcastInTypes,
                    resultStream.getParallelism(),
                    outType);

    // Only physical transformations expose a chaining strategy; without it the HEAD strategy
    // below cannot be enforced, so reject the combination outright.
    if (!(cachedBroadcastInputs.getTransformation() instanceof PhysicalTransformation)
            || !(resultStream.getTransformation() instanceof PhysicalTransformation)) {
        throw new UnsupportedOperationException(
                "cannot set chaining strategy on "
                        + cachedBroadcastInputs.getTransformation()
                        + " and "
                        + resultStream.getTransformation()
                        + ".");
    }
    // HEAD keeps each operator from being chained to its predecessor.
    ((PhysicalTransformation<?>) cachedBroadcastInputs.getTransformation())
            .setChainingStrategy(ChainingStrategy.HEAD);
    ((PhysicalTransformation<?>) resultStream.getTransformation())
            .setChainingStrategy(ChainingStrategy.HEAD);

    // Sharing one co-location key asks the scheduler to place corresponding subtasks of the
    // cache operator and the result operator together.
    cachedBroadcastInputs.getTransformation().setCoLocationGroupKey(coLocationKey);
    resultStream.getTransformation().setCoLocationGroupKey(coLocationKey);
    return cachedBroadcastInputs.union(resultStream);
}