public static DataStream withBroadcastStream()

in flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java [65:117]


    /**
     * Builds the stream produced by {@code userDefinedFunction} and unions it with the stream that
     * caches the given broadcast inputs, co-locating the two.
     *
     * <p>Steps performed here:
     *
     * <ol>
     *   <li>Take the execution environment from the first element of {@code inputList}.
     *   <li>Derive one name per broadcast stream of the form {@code <id>-<name>}, where {@code
     *       <id>} is the hex string of a freshly created {@link AbstractID} shared by all names of
     *       this call.
     *   <li>Obtain the result stream through {@code getResultStream} and the caching stream through
     *       {@code cacheBroadcastVariables}, handing the latter the result stream's parallelism and
     *       output type.
     *   <li>Set {@link ChainingStrategy#HEAD} on both transformations and assign both the same
     *       randomly generated co-location group key.
     * </ol>
     *
     * <p>NOTE(review): how {@code userDefinedFunction} is applied and how the broadcast inputs are
     * cached is decided by the two helpers above, which are not documented here; confirm against
     * their implementation.
     *
     * @param inputList the non-broadcast input streams; must contain at least one element.
     * @param bcStreams the broadcast streams keyed by name.
     * @param userDefinedFunction the function forwarded to {@code getResultStream}.
     * @param <OUT> the element type of the returned stream.
     * @return the union of the caching stream and the result stream.
     * @throws UnsupportedOperationException if the transformation of either stream is not a {@link
     *     PhysicalTransformation}, since no chaining strategy can be set on it.
     */
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) {
        Preconditions.checkArgument(inputList.size() > 0);

        final StreamExecutionEnvironment env = inputList.get(0).getExecutionEnvironment();

        // Flatten the broadcast map into three parallel arrays indexed by the same slot.
        final int numBroadcastStreams = bcStreams.size();
        final String[] broadcastNames = new String[numBroadcastStreams];
        final DataStream<?>[] broadcastInputs = new DataStream[numBroadcastStreams];
        final TypeInformation<?>[] broadcastInTypes = new TypeInformation[numBroadcastStreams];

        // Every name of this invocation shares one freshly generated id as its prefix.
        final String namePrefix = new AbstractID().toHexString() + "-";
        int slot = 0;
        for (Map.Entry<String, DataStream<?>> entry : bcStreams.entrySet()) {
            final DataStream<?> broadcastStream = entry.getValue();
            broadcastNames[slot] = namePrefix + entry.getKey();
            broadcastInputs[slot] = broadcastStream;
            broadcastInTypes[slot] = broadcastStream.getType();
            slot++;
        }

        // The result stream has to exist first: its parallelism and type drive the caching stream.
        final DataStream<OUT> resultStream =
                getResultStream(env, inputList, broadcastNames, userDefinedFunction);
        final DataStream<OUT> cachedBroadcastInputs =
                cacheBroadcastVariables(
                        env,
                        broadcastNames,
                        broadcastInputs,
                        broadcastInTypes,
                        resultStream.getParallelism(),
                        resultStream.getType());

        // A chaining strategy can only be set on physical transformations; fail otherwise.
        if (!(cachedBroadcastInputs.getTransformation() instanceof PhysicalTransformation)
                || !(resultStream.getTransformation() instanceof PhysicalTransformation)) {
            final String message =
                    "cannot set chaining strategy on "
                            + cachedBroadcastInputs.getTransformation()
                            + " and "
                            + resultStream.getTransformation()
                            + ".";
            throw new UnsupportedOperationException(message);
        }

        final PhysicalTransformation<?> cachingTransformation =
                (PhysicalTransformation<?>) cachedBroadcastInputs.getTransformation();
        final PhysicalTransformation<?> resultTransformation =
                (PhysicalTransformation<?>) resultStream.getTransformation();
        cachingTransformation.setChainingStrategy(ChainingStrategy.HEAD);
        resultTransformation.setChainingStrategy(ChainingStrategy.HEAD);

        // Both transformations are put into the same, uniquely named co-location group.
        final String coLocationKey = "broadcast-co-location-" + UUID.randomUUID();
        cachingTransformation.setCoLocationGroupKey(coLocationKey);
        resultTransformation.setCoLocationGroupKey(coLocationKey);

        return cachedBroadcastInputs.union(resultStream);
    }