in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java [98:165]
/**
 * Derives the {@link StreamConfig} for the wrapped (inner) operator from the configuration of
 * the wrapper operator.
 *
 * <p>The result starts as a copy of {@code config}'s underlying configuration. Every iteration
 * wrapper found in it is then replaced by the component it wraps:
 *
 * <ul>
 *   <li>each non-null state partitioner must be a {@code ProxyKeySelector} and is replaced by
 *       its wrapped key selector;
 *   <li>each {@code NetworkInputConfig} must carry an {@code IterationRecordSerializer} and is
 *       rebuilt around the inner serializer;
 *   <li>the output serializer must be an {@code IterationRecordSerializer} and is replaced by
 *       its inner serializer;
 *   <li>for each chained or non-chained output edge with a non-null output tag, the side-output
 *       serializer is registered under a tag with the same id and the inner type info.
 * </ul>
 *
 * @param config the configuration of the wrapper operator; it is only read here
 * @param cl the class loader passed to {@code config} when reading its serialized entries
 * @return a new configuration describing the wrapped operator
 * @throws IllegalStateException if any partitioner, serializer or side-output type info is not
 *     the expected iteration wrapper type
 */
public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
    StreamConfig wrappedConfig = new StreamConfig(config.getConfiguration().clone());

    // State partitioners: unwrap the ProxyKeySelector of every keyed network input.
    // wrappedConfig is a clone of config, so both report the same number of network inputs.
    final int numNetworkInputs = config.getNumberOfNetworkInputs();
    for (int i = 0; i < numNetworkInputs; ++i) {
        KeySelector<?, ?> keySelector = config.getStatePartitioner(i, cl);
        if (keySelector != null) {
            checkState(
                    keySelector instanceof ProxyKeySelector,
                    "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is %s",
                    keySelector);
            wrappedConfig.setStatePartitioner(
                    i, ((ProxyKeySelector<?, ?>) keySelector).getWrappedKeySelector());
        }
    }

    // Input serializers: only network inputs are rewritten; other input kinds are kept as-is.
    StreamConfig.InputConfig[] inputs = config.getInputs(cl);
    for (int i = 0; i < inputs.length; ++i) {
        if (inputs[i] instanceof NetworkInputConfig) {
            TypeSerializer<?> typeSerializerIn =
                    ((NetworkInputConfig) inputs[i]).getTypeSerializer();
            checkState(
                    typeSerializerIn instanceof IterationRecordSerializer,
                    "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                    i,
                    typeSerializerIn);
            // NOTE(review): the rebuilt NetworkInputConfig receives the array position i as
            // its second constructor argument and copies nothing else from the original
            // input config - confirm this is intended when non-network inputs are present.
            inputs[i] =
                    new NetworkInputConfig(
                            ((IterationRecordSerializer<?>) typeSerializerIn)
                                    .getInnerSerializer(),
                            i);
        }
    }
    wrappedConfig.setInputs(inputs);

    // Main output serializer.
    TypeSerializer<?> typeSerializerOut = config.getTypeSerializerOut(cl);
    checkState(
            typeSerializerOut instanceof IterationRecordSerializer,
            "The serializer of output should be IterationRecordSerializer but it is %s.",
            typeSerializerOut);
    wrappedConfig.setTypeSerializerOut(
            ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());

    // Side-output serializers: visit both chained and non-chained edges; edges without an
    // output tag are skipped.
    Stream.concat(
                    config.getChainedOutputs(cl).stream(),
                    config.getNonChainedOutputs(cl).stream())
            .forEach(
                    edge -> {
                        OutputTag<?> outputTag = edge.getOutputTag();
                        if (outputTag == null) {
                            return;
                        }

                        TypeSerializer<?> typeSerializerSideOut =
                                config.getTypeSerializerSideOut(outputTag, cl);
                        checkState(
                                typeSerializerSideOut instanceof IterationRecordSerializer,
                                "The serializer of side output with tag[%s] should be IterationRecordSerializer but it is %s.",
                                outputTag,
                                typeSerializerSideOut);
                        // Validate the tag's type info before casting, so a mismatch is
                        // reported like the other checks instead of as a ClassCastException.
                        checkState(
                                outputTag.getTypeInfo() instanceof IterationRecordTypeInfo,
                                "The type info of side output with tag[%s] should be IterationRecordTypeInfo but it is %s.",
                                outputTag,
                                outputTag.getTypeInfo());

                        wrappedConfig.setTypeSerializerSideOut(
                                new OutputTag<>(
                                        outputTag.getId(),
                                        ((IterationRecordTypeInfo<?>) outputTag.getTypeInfo())
                                                .getInnerTypeInfo()),
                                ((IterationRecordSerializer<?>) typeSerializerSideOut)
                                        .getInnerSerializer());
                    });

    return wrappedConfig;
}