public void setup()

in flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java [86:164]


    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
        MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
        IOManager ioManager = containingTask.getEnvironment().getIOManager();

        keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
        keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
        keySerializer = config.getStateKeySerializer(userCodeClassLoader);
        int keyLength = keySerializer.getLength();

        TypeSerializer<IN1> typeSerializerA = config.getTypeSerializerIn(0, userCodeClassLoader);
        TypeSerializer<IN2> typeSerializerB = config.getTypeSerializerIn(1, userCodeClassLoader);
        keyAndValueSerializerA = new KeyAndValueSerializer<>(typeSerializerA, keyLength);
        keyAndValueSerializerB = new KeyAndValueSerializer<>(typeSerializerB, keyLength);

        if (keyLength > 0) {
            dataOutputSerializer = new DataOutputSerializer(keyLength);
            comparatorA = new FixedLengthByteKeyComparator<>(keyLength);
            comparatorB = new FixedLengthByteKeyComparator<>(keyLength);
        } else {
            dataOutputSerializer = new DataOutputSerializer(64);
            comparatorA = new VariableLengthByteKeyComparator<>();
            comparatorB = new VariableLengthByteKeyComparator<>();
        }

        ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
        double managedMemoryFraction =
                config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.OPERATOR,
                                containingTask.getEnvironment().getTaskConfiguration(),
                                userCodeClassLoader)
                        / 2;
        Configuration jobConfiguration = containingTask.getEnvironment().getJobConfiguration();

        try {
            sorterA =
                    ExternalSorter.newBuilder(
                                    memoryManager,
                                    containingTask,
                                    keyAndValueSerializerA,
                                    comparatorA,
                                    executionConfig)
                            .memoryFraction(managedMemoryFraction)
                            .enableSpilling(
                                    ioManager,
                                    jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
                            .maxNumFileHandles(
                                    jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
                            .objectReuse(executionConfig.isObjectReuseEnabled())
                            .largeRecords(
                                    jobConfiguration.get(
                                            AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
                            .build();
            sorterB =
                    ExternalSorter.newBuilder(
                                    memoryManager,
                                    containingTask,
                                    keyAndValueSerializerB,
                                    comparatorB,
                                    executionConfig)
                            .memoryFraction(managedMemoryFraction)
                            .enableSpilling(
                                    ioManager,
                                    jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
                            .maxNumFileHandles(
                                    jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
                            .objectReuse(executionConfig.isObjectReuseEnabled())
                            .largeRecords(
                                    jobConfiguration.get(
                                            AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
                            .build();
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }