in flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java [72:243]
public static StreamMultipleInputProcessor create(
TaskInvokable ownerTask,
CheckpointedInputGate[] checkpointedInputGates,
StreamConfig.InputConfig[] configuredInputs,
IOManager ioManager,
MemoryManager memoryManager,
TaskIOMetricGroup ioMetricGroup,
Counter mainOperatorRecordsIn,
MultipleInputStreamOperator<?> mainOperator,
WatermarkGauge[] inputWatermarkGauges,
StreamConfig streamConfig,
Configuration taskManagerConfig,
Configuration jobConfig,
ExecutionConfig executionConfig,
ClassLoader userClassloader,
OperatorChain<?, ?> operatorChain,
InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
Function<Integer, StreamPartitioner<?>> gatePartitioners,
TaskInfo taskInfo,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
checkNotNull(operatorChain);
List<Input> operatorInputs = mainOperator.getInputs();
int inputsCount = operatorInputs.size();
StreamOneInputProcessor<?>[] inputProcessors = new StreamOneInputProcessor[inputsCount];
Counter networkRecordsIn = new SimpleCounter();
ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
checkState(
configuredInputs.length == inputsCount,
"Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
configuredInputs.length,
inputsCount);
StreamTaskInput[] inputs = new StreamTaskInput[inputsCount];
for (int i = 0; i < inputsCount; i++) {
StreamConfig.InputConfig configuredInput = configuredInputs[i];
if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
StreamConfig.NetworkInputConfig networkInput =
(StreamConfig.NetworkInputConfig) configuredInput;
inputs[i] =
StreamTaskNetworkInputFactory.create(
checkpointedInputGates[networkInput.getInputGateIndex()],
networkInput.getTypeSerializer(),
ioManager,
new StatusWatermarkValve(
checkpointedInputGates[networkInput.getInputGateIndex()]),
i,
inflightDataRescalingDescriptor,
gatePartitioners,
taskInfo,
canEmitBatchOfRecords,
streamConfig.getWatermarkDeclarations(userClassloader));
} else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
StreamConfig.SourceInputConfig sourceInput =
(StreamConfig.SourceInputConfig) configuredInput;
inputs[i] = operatorChain.getSourceTaskInput(sourceInput);
} else {
throw new UnsupportedOperationException("Unknown input type: " + configuredInput);
}
}
InputSelectable inputSelectable =
mainOperator instanceof InputSelectable ? (InputSelectable) mainOperator : null;
StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
boolean anyRequiresSorting =
Arrays.stream(inputConfigs).anyMatch(StreamConfig::requiresSorting);
if (anyRequiresSorting) {
if (inputSelectable != null) {
throw new IllegalStateException(
"The InputSelectable interface is not supported with sorting inputs");
}
StreamTaskInput[] sortingInputs =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> inputs[idx])
.toArray(StreamTaskInput[]::new);
KeySelector[] sortingInputKeySelectors =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> streamConfig.getStatePartitioner(idx, userClassloader))
.toArray(KeySelector[]::new);
TypeSerializer[] sortingInputKeySerializers =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> streamConfig.getTypeSerializerIn(idx, userClassloader))
.toArray(TypeSerializer[]::new);
StreamTaskInput[] passThroughInputs =
IntStream.range(0, inputsCount)
.filter(idx -> !requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> inputs[idx])
.toArray(StreamTaskInput[]::new);
SelectableSortingInputs selectableSortingInputs =
MultiInputSortingDataInput.wrapInputs(
ownerTask,
sortingInputs,
sortingInputKeySelectors,
sortingInputKeySerializers,
streamConfig.getStateKeySerializer(userClassloader),
passThroughInputs,
memoryManager,
ioManager,
executionConfig.isObjectReuseEnabled(),
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
jobConfig,
taskManagerConfig,
userClassloader),
taskManagerConfig,
executionConfig);
StreamTaskInput<?>[] sortedInputs = selectableSortingInputs.getSortedInputs();
StreamTaskInput<?>[] passedThroughInputs =
selectableSortingInputs.getPassThroughInputs();
int sortedIndex = 0;
int passThroughIndex = 0;
for (int i = 0; i < inputs.length; i++) {
if (requiresSorting(inputConfigs[i])) {
inputs[i] = sortedInputs[sortedIndex];
sortedIndex++;
} else {
inputs[i] = passedThroughInputs[passThroughIndex];
passThroughIndex++;
}
}
inputSelectable = selectableSortingInputs.getInputSelectable();
}
for (int i = 0; i < inputsCount; i++) {
StreamConfig.InputConfig configuredInput = configuredInputs[i];
if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
StreamTaskNetworkOutput dataOutput =
new StreamTaskNetworkOutput<>(
operatorChain.getFinishedOnRestoreInputOrDefault(
operatorInputs.get(i)),
inputWatermarkGauges[i],
mainOperatorRecordsIn,
networkRecordsIn);
inputProcessors[i] =
new StreamOneInputProcessor(inputs[i], dataOutput, operatorChain);
} else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
StreamConfig.SourceInputConfig sourceInput =
(StreamConfig.SourceInputConfig) configuredInput;
OperatorChain.ChainedSource chainedSource =
operatorChain.getChainedSource(sourceInput);
inputProcessors[i] =
new StreamOneInputProcessor(
inputs[i],
new StreamTaskSourceOutput(
chainedSource.getSourceOutput(),
inputWatermarkGauges[i],
chainedSource
.getSourceTaskInput()
.getOperator()
.getSourceMetricGroup()),
operatorChain);
} else {
throw new UnsupportedOperationException("Unknown input type: " + configuredInput);
}
}
return new StreamMultipleInputProcessor(
new MultipleInputSelectionHandler(inputSelectable, inputsCount), inputProcessors);
}