public static StreamMultipleInputProcessor create()

in flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java [72:243]


    /**
     * Assembles the {@link StreamMultipleInputProcessor} for a multiple-input main operator.
     *
     * <p>The method makes three passes over the inputs:
     *
     * <ol>
     *   <li>For every entry of {@code configuredInputs} a {@code StreamTaskInput} is created: a
     *       network input through {@code StreamTaskNetworkInputFactory.create}, or the chained
     *       source input obtained from {@code operatorChain}.
     *   <li>If any input config read from {@code streamConfig} requires sorting, the inputs are
     *       handed to {@code MultiInputSortingDataInput.wrapInputs} and each position is replaced
     *       by the corresponding wrapped sorted or pass-through input. The input selectable
     *       returned by the wrapper then replaces the one derived from the main operator.
     *   <li>Every input is paired with a data output in a {@code StreamOneInputProcessor}.
     * </ol>
     *
     * @param ownerTask task passed to the sorting wrapper when sorting is required
     * @param checkpointedInputGates gates, looked up by the gate index of each network input
     * @param configuredInputs per-input configuration; its length must equal the number of inputs
     *     reported by {@code mainOperator}
     * @param ioManager forwarded to the network input factory and to the sorting wrapper
     * @param memoryManager forwarded to the sorting wrapper
     * @param ioMetricGroup metric group that receives a fresh {@code SimpleCounter} through
     *     {@code reuseRecordsInputCounter}
     * @param mainOperatorRecordsIn counter handed to every network output
     * @param mainOperator operator whose inputs are processed; used as the input selectable when
     *     it implements {@code InputSelectable} and no input requires sorting
     * @param inputWatermarkGauges one gauge per input, indexed by input position
     * @param streamConfig supplies the watermark declarations, the input configs used for the
     *     sorting decision, the state partitioners, the serializers and the managed memory
     *     fraction
     * @param taskManagerConfig used for the managed memory fraction and forwarded to the sorting
     *     wrapper
     * @param jobConfig used for the managed memory fraction
     * @param executionConfig supplies the object-reuse flag and is forwarded to the sorting
     *     wrapper
     * @param userClassloader class loader used for every read from {@code streamConfig}
     * @param operatorChain chain that provides the source task inputs, the chained sources and the
     *     finished-on-restore inputs; checked with {@code checkNotNull}
     * @param inflightDataRescalingDescriptor forwarded to the network input factory
     * @param gatePartitioners forwarded to the network input factory
     * @param taskInfo forwarded to the network input factory
     * @param canEmitBatchOfRecords forwarded to the network input factory
     * @return a processor covering all inputs of {@code mainOperator}
     * @throws IllegalStateException if sorting is required while {@code mainOperator} implements
     *     {@code InputSelectable}
     * @throws UnsupportedOperationException if a configured input is neither a network input nor a
     *     source input
     */
    public static StreamMultipleInputProcessor create(
            TaskInvokable ownerTask,
            CheckpointedInputGate[] checkpointedInputGates,
            StreamConfig.InputConfig[] configuredInputs,
            IOManager ioManager,
            MemoryManager memoryManager,
            TaskIOMetricGroup ioMetricGroup,
            Counter mainOperatorRecordsIn,
            MultipleInputStreamOperator<?> mainOperator,
            WatermarkGauge[] inputWatermarkGauges,
            StreamConfig streamConfig,
            Configuration taskManagerConfig,
            Configuration jobConfig,
            ExecutionConfig executionConfig,
            ClassLoader userClassloader,
            OperatorChain<?, ?> operatorChain,
            InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
            Function<Integer, StreamPartitioner<?>> gatePartitioners,
            TaskInfo taskInfo,
            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
        checkNotNull(operatorChain);

        List<Input> mainOperatorInputs = mainOperator.getInputs();
        int numInputs = mainOperatorInputs.size();

        // The counter is registered on the metric group before the input count is validated.
        Counter networkRecordsIn = new SimpleCounter();
        ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);

        checkState(
                configuredInputs.length == numInputs,
                "Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
                configuredInputs.length,
                numInputs);

        // Pass 1: one task input per configured input.
        StreamTaskInput[] taskInputs = new StreamTaskInput[numInputs];
        for (int pos = 0; pos < numInputs; pos++) {
            StreamConfig.InputConfig inputConfig = configuredInputs[pos];

            if (inputConfig instanceof StreamConfig.NetworkInputConfig) {
                StreamConfig.NetworkInputConfig networkConfig =
                        (StreamConfig.NetworkInputConfig) inputConfig;
                // The same gate feeds both the network input and its watermark valve.
                CheckpointedInputGate gate =
                        checkpointedInputGates[networkConfig.getInputGateIndex()];
                taskInputs[pos] =
                        StreamTaskNetworkInputFactory.create(
                                gate,
                                networkConfig.getTypeSerializer(),
                                ioManager,
                                new StatusWatermarkValve(gate),
                                pos,
                                inflightDataRescalingDescriptor,
                                gatePartitioners,
                                taskInfo,
                                canEmitBatchOfRecords,
                                streamConfig.getWatermarkDeclarations(userClassloader));
                continue;
            }

            if (inputConfig instanceof StreamConfig.SourceInputConfig) {
                taskInputs[pos] =
                        operatorChain.getSourceTaskInput(
                                (StreamConfig.SourceInputConfig) inputConfig);
                continue;
            }

            throw new UnsupportedOperationException("Unknown input type: " + inputConfig);
        }

        InputSelectable inputSelectable = null;
        if (mainOperator instanceof InputSelectable) {
            inputSelectable = (InputSelectable) mainOperator;
        }

        // Pass 2: the sorting decision is based on the configs read from streamConfig.
        StreamConfig.InputConfig[] streamInputConfigs = streamConfig.getInputs(userClassloader);
        if (Arrays.stream(streamInputConfigs).anyMatch(StreamConfig::requiresSorting)) {

            if (inputSelectable != null) {
                throw new IllegalStateException(
                        "The InputSelectable interface is not supported with sorting inputs");
            }

            // Inputs, key selectors and serializers of the positions that require sorting,
            // each array in ascending position order.
            StreamTaskInput[] inputsToSort =
                    IntStream.range(0, numInputs)
                            .filter(slot -> requiresSorting(streamInputConfigs[slot]))
                            .mapToObj(slot -> taskInputs[slot])
                            .toArray(StreamTaskInput[]::new);
            KeySelector[] keySelectorsToSort =
                    IntStream.range(0, numInputs)
                            .filter(slot -> requiresSorting(streamInputConfigs[slot]))
                            .mapToObj(
                                    slot -> streamConfig.getStatePartitioner(slot, userClassloader))
                            .toArray(KeySelector[]::new);
            TypeSerializer[] serializersToSort =
                    IntStream.range(0, numInputs)
                            .filter(slot -> requiresSorting(streamInputConfigs[slot]))
                            .mapToObj(
                                    slot -> streamConfig.getTypeSerializerIn(slot, userClassloader))
                            .toArray(TypeSerializer[]::new);

            // All remaining positions, also in ascending position order.
            StreamTaskInput[] inputsToPassThrough =
                    IntStream.range(0, numInputs)
                            .filter(slot -> !requiresSorting(streamInputConfigs[slot]))
                            .mapToObj(slot -> taskInputs[slot])
                            .toArray(StreamTaskInput[]::new);

            SelectableSortingInputs wrapped =
                    MultiInputSortingDataInput.wrapInputs(
                            ownerTask,
                            inputsToSort,
                            keySelectorsToSort,
                            serializersToSort,
                            streamConfig.getStateKeySerializer(userClassloader),
                            inputsToPassThrough,
                            memoryManager,
                            ioManager,
                            executionConfig.isObjectReuseEnabled(),
                            streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                    ManagedMemoryUseCase.OPERATOR,
                                    jobConfig,
                                    taskManagerConfig,
                                    userClassloader),
                            taskManagerConfig,
                            executionConfig);

            // Put the wrapped inputs back at their original positions. Both wrapped arrays are
            // consumed front to back, mirroring the order in which they were filtered above.
            StreamTaskInput<?>[] wrappedSorted = wrapped.getSortedInputs();
            StreamTaskInput<?>[] wrappedPassThrough = wrapped.getPassThroughInputs();
            int nextSorted = 0;
            int nextPassThrough = 0;
            for (int pos = 0; pos < taskInputs.length; pos++) {
                taskInputs[pos] =
                        requiresSorting(streamInputConfigs[pos])
                                ? wrappedSorted[nextSorted++]
                                : wrappedPassThrough[nextPassThrough++];
            }
            inputSelectable = wrapped.getInputSelectable();
        }

        // Pass 3: pair every (possibly wrapped) input with its output.
        StreamOneInputProcessor<?>[] inputProcessors = new StreamOneInputProcessor[numInputs];
        for (int pos = 0; pos < numInputs; pos++) {
            StreamConfig.InputConfig inputConfig = configuredInputs[pos];

            if (inputConfig instanceof StreamConfig.NetworkInputConfig) {
                StreamTaskNetworkOutput networkOutput =
                        new StreamTaskNetworkOutput<>(
                                operatorChain.getFinishedOnRestoreInputOrDefault(
                                        mainOperatorInputs.get(pos)),
                                inputWatermarkGauges[pos],
                                mainOperatorRecordsIn,
                                networkRecordsIn);
                inputProcessors[pos] =
                        new StreamOneInputProcessor(taskInputs[pos], networkOutput, operatorChain);
                continue;
            }

            if (inputConfig instanceof StreamConfig.SourceInputConfig) {
                OperatorChain.ChainedSource chainedSource =
                        operatorChain.getChainedSource(
                                (StreamConfig.SourceInputConfig) inputConfig);
                StreamTaskSourceOutput sourceOutput =
                        new StreamTaskSourceOutput(
                                chainedSource.getSourceOutput(),
                                inputWatermarkGauges[pos],
                                chainedSource
                                        .getSourceTaskInput()
                                        .getOperator()
                                        .getSourceMetricGroup());
                inputProcessors[pos] =
                        new StreamOneInputProcessor(taskInputs[pos], sourceOutput, operatorChain);
                continue;
            }

            throw new UnsupportedOperationException("Unknown input type: " + inputConfig);
        }

        MultipleInputSelectionHandler selectionHandler =
                new MultipleInputSelectionHandler(inputSelectable, numInputs);
        return new StreamMultipleInputProcessor(selectionHandler, inputProcessors);
    }