in flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java [86:164]
/**
 * Initializes the per-input sorting machinery of this co-group operator.
 *
 * <p>Resolves both key selectors, the key serializer and the two input serializers from the
 * stream config. It then picks a byte-key comparator matching the serialized key layout and
 * builds one external sorter per input. The operator's managed-memory fraction is split
 * evenly between the two sorters. Both sorters share the same spilling, file-handle,
 * object-reuse and large-record settings taken from the job configuration.
 *
 * @param containingTask the task hosting this operator; supplies the user-code class loader,
 *     memory manager, I/O manager, execution config and configurations
 * @param config the stream config of this operator
 * @param output the output passed on to the superclass setup
 * @throws RuntimeException wrapping a {@code MemoryAllocationException} if managed memory for
 *     either sorter cannot be allocated
 */
public void setup(
        StreamTask<?, ?> containingTask,
        StreamConfig config,
        Output<StreamRecord<OUT>> output) {
    super.setup(containingTask, config, output);
    ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
    MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
    IOManager ioManager = containingTask.getEnvironment().getIOManager();

    keySelectorA = config.getStatePartitioner(0, userCodeClassLoader);
    keySelectorB = config.getStatePartitioner(1, userCodeClassLoader);
    keySerializer = config.getStateKeySerializer(userCodeClassLoader);
    // A positive length means every serialized key occupies exactly keyLength bytes.
    // TypeSerializer#getLength reports -1 for variable-length types, which take the else-branch.
    int keyLength = keySerializer.getLength();

    TypeSerializer<IN1> typeSerializerA = config.getTypeSerializerIn(0, userCodeClassLoader);
    TypeSerializer<IN2> typeSerializerB = config.getTypeSerializerIn(1, userCodeClassLoader);
    keyAndValueSerializerA = new KeyAndValueSerializer<>(typeSerializerA, keyLength);
    keyAndValueSerializerB = new KeyAndValueSerializer<>(typeSerializerB, keyLength);

    if (keyLength > 0) {
        dataOutputSerializer = new DataOutputSerializer(keyLength);
        comparatorA = new FixedLengthByteKeyComparator<>(keyLength);
        comparatorB = new FixedLengthByteKeyComparator<>(keyLength);
    } else {
        // 64 is only the initial buffer size used for variable-length keys.
        dataOutputSerializer = new DataOutputSerializer(64);
        comparatorA = new VariableLengthByteKeyComparator<>();
        comparatorB = new VariableLengthByteKeyComparator<>();
    }

    ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
    Configuration jobConfiguration = containingTask.getEnvironment().getJobConfiguration();

    // Two sorters are built from this operator's managed-memory share, so each one gets half.
    double managedMemoryFraction =
            config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.OPERATOR,
                            containingTask.getEnvironment().getTaskConfiguration(),
                            userCodeClassLoader)
                    / 2;

    // Read the shared sorter settings once so both inputs are guaranteed to be sorted with an
    // identical configuration.
    float spillingThreshold = jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
    int maxNumFileHandles = jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN);
    boolean useLargeRecordsHandler =
            jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER);
    boolean objectReuse = executionConfig.isObjectReuseEnabled();

    try {
        sorterA =
                ExternalSorter.newBuilder(
                                memoryManager,
                                containingTask,
                                keyAndValueSerializerA,
                                comparatorA,
                                executionConfig)
                        .memoryFraction(managedMemoryFraction)
                        .enableSpilling(ioManager, spillingThreshold)
                        .maxNumFileHandles(maxNumFileHandles)
                        .objectReuse(objectReuse)
                        .largeRecords(useLargeRecordsHandler)
                        .build();
        sorterB =
                ExternalSorter.newBuilder(
                                memoryManager,
                                containingTask,
                                keyAndValueSerializerB,
                                comparatorB,
                                executionConfig)
                        .memoryFraction(managedMemoryFraction)
                        .enableSpilling(ioManager, spillingThreshold)
                        .maxNumFileHandles(maxNumFileHandles)
                        .objectReuse(objectReuse)
                        .largeRecords(useLargeRecordsHandler)
                        .build();
    } catch (MemoryAllocationException e) {
        throw new RuntimeException(
                "Failed to allocate managed memory for the CoGroupOperator sorters.", e);
    }
}