in flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java [483:529]
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    // Sets up state in two stages: first this wrapper's own operator state context,
    // state handler and timer service manager, then the wrapped operator's state,
    // which is served through a proxy around the context created here.
    final TypeSerializer<?> stateKeySerializer =
            streamConfig.getStateKeySerializer(containingTask.getUserCodeClassLoader());

    // Stage 1: obtain the state context for this wrapper operator itself.
    final StreamOperatorStateContext operatorStateContext =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    parameters.getProcessingTimeService(),
                    this,
                    stateKeySerializer,
                    containingTask.getCancelables(),
                    metrics,
                    // Managed-memory fraction looked up for the STATE_BACKEND use case.
                    streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            containingTask
                                    .getEnvironment()
                                    .getTaskManagerInfo()
                                    .getConfiguration(),
                            containingTask.getUserCodeClassLoader()),
                    // NOTE(review): presumably the "is using custom raw keyed state"
                    // flag, judging by the matching lambda parameter below - confirm.
                    false);

    stateHandler =
            new StreamOperatorStateHandler(
                    operatorStateContext,
                    containingTask.getExecutionConfig(),
                    containingTask.getCancelables());
    stateHandler.initializeOperatorState(this);

    timeServiceManager = operatorStateContext.internalTimerServiceManager();

    // Stage 2: the wrapped operator does not get a state context of its own. Whatever
    // arguments it passes to the initializer are ignored; it always receives a proxy
    // over the context created above, built with the "wrapped-" prefix, an empty
    // iterator and 0.
    final StreamTaskStateInitializer wrappedStateInitializer =
            (wrappedOperatorId,
                    wrappedOperatorClassName,
                    wrappedProcessingTimeService,
                    wrappedKeyContext,
                    wrappedKeySerializer,
                    wrappedCloseableRegistry,
                    wrappedMetricGroup,
                    wrappedManagedMemoryFraction,
                    wrappedUsesCustomRawKeyedState) ->
                    new ProxyStreamOperatorStateContext(
                            operatorStateContext, "wrapped-", CloseableIterator.empty(), 0);
    wrappedOperator.initializeState(wrappedStateInitializer);
}