in flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java [151:361]
/**
 * Assembles the {@link StreamOperatorStateContext} for one operator subtask.
 *
 * <p>The steps, in order: create the keyed state backend (regular or async flavour), create the
 * operator state backend, open the raw keyed / raw operator state inputs and register them with
 * the given registry, create the timer service manager, collect and report restored-state size
 * statistics, and finally bundle everything into a {@code StreamOperatorStateContextImpl}.
 *
 * <p>If any step throws an {@link Exception}, everything created so far is unregistered from the
 * registry, closed quietly and (for the backends) disposed before the failure is rethrown.
 *
 * @param operatorID id of the operator; used to look up its prioritized restored state and as
 *     part of the operator description text
 * @param operatorClassName class name of the operator; part of the operator description text
 * @param processingTimeService handed to the timer service manager provider
 * @param keyContext handed to the timer service manager provider
 * @param keySerializer key serializer forwarded to the keyed backend creation and to the
 *     returned context; may be {@code null}
 * @param streamTaskCloseableRegistry registry the raw state inputs are registered with and that
 *     is forwarded to the backend creation helpers
 * @param metricGroup forwarded to the keyed backend creation
 * @param managedMemoryFraction forwarded to the keyed backend creation
 * @param isUsingCustomRawKeyedState when {@code true}, timers are not restored from the raw keyed
 *     state inputs
 * @param isAsyncState when {@code true}, the async keyed backend and async timer service manager
 *     are populated instead of the regular ones
 * @return the context holding the created backends, timer service managers and raw inputs
 * @throws Exception wrapping any exception raised while building the context
 */
public StreamOperatorStateContext streamOperatorStateContext(
        @Nonnull OperatorID operatorID,
        @Nonnull String operatorClassName,
        @Nonnull ProcessingTimeService processingTimeService,
        @Nonnull KeyContext keyContext,
        @Nullable TypeSerializer<?> keySerializer,
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,
        @Nonnull MetricGroup metricGroup,
        double managedMemoryFraction,
        boolean isUsingCustomRawKeyedState,
        boolean isAsyncState)
        throws Exception {

    TaskInfo subtaskInfo = environment.getTaskInfo();
    registerRestoredStateToFileMergingManager(environment.getJobID(), subtaskInfo, operatorID);

    // Human-readable identifier of this operator subtask, passed to the backend factories.
    final String operatorIdentifier =
            new OperatorSubtaskDescriptionText(
                            operatorID,
                            operatorClassName,
                            subtaskInfo.getIndexOfThisSubtask(),
                            subtaskInfo.getNumberOfParallelSubtasks())
                    .toString();

    final PrioritizedOperatorSubtaskState prioritizedState =
            taskStateManager.prioritizedOperatorState(operatorID);

    // Everything below starts out as null so the failure path knows what has been created.
    CheckpointableKeyedStateBackend<?> syncKeyedBackend = null;
    AsyncKeyedStateBackend asyncKeyedBackend = null;
    OperatorStateBackend opStateBackend = null;
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedInputs = null;
    CloseableIterable<StatePartitionStreamProvider> rawOperatorInputs = null;
    InternalTimeServiceManager<?> syncTimerManager = null;
    InternalTimeServiceManager<?> asyncTimerManager = null;

    final StateObject.StateObjectSizeStatsCollector sizeStats =
            StateObject.StateObjectSizeStatsCollector.create();

    try {
        // ---- keyed state backend: exactly one of the two variables gets assigned ----
        if (!isAsyncState) {
            syncKeyedBackend =
                    keyedStatedBackend(
                            keySerializer,
                            operatorIdentifier,
                            prioritizedState,
                            streamTaskCloseableRegistry,
                            metricGroup,
                            managedMemoryFraction,
                            sizeStats,
                            StateBackend::createKeyedStateBackend);
        } else if (stateBackend.supportsAsyncKeyedStateBackend()) {
            asyncKeyedBackend =
                    keyedStatedBackend(
                            keySerializer,
                            operatorIdentifier,
                            prioritizedState,
                            streamTaskCloseableRegistry,
                            metricGroup,
                            managedMemoryFraction,
                            sizeStats,
                            StateBackend::createAsyncKeyedStateBackend);
        } else {
            // Async state requested, but the configured backend reports no async support:
            // create a regular keyed backend and wrap it in the adaptor.
            asyncKeyedBackend =
                    new AsyncKeyedStateBackendAdaptor<>(
                            keyedStatedBackend(
                                    keySerializer,
                                    operatorIdentifier,
                                    prioritizedState,
                                    streamTaskCloseableRegistry,
                                    metricGroup,
                                    managedMemoryFraction,
                                    sizeStats,
                                    StateBackend::createKeyedStateBackend));
        }

        // ---- operator state backend ----
        opStateBackend =
                operatorStateBackend(
                        operatorIdentifier,
                        prioritizedState,
                        streamTaskCloseableRegistry,
                        sizeStats);

        // ---- raw state streams (each one is registered right after it is opened) ----
        rawKeyedInputs =
                rawKeyedStateInputs(
                        prioritizedState.getPrioritizedRawKeyedState().iterator(), sizeStats);
        streamTaskCloseableRegistry.registerCloseable(rawKeyedInputs);

        rawOperatorInputs =
                rawOperatorStateInputs(
                        prioritizedState.getPrioritizedRawOperatorState().iterator(),
                        sizeStats);
        streamTaskCloseableRegistry.registerCloseable(rawOperatorInputs);

        // ---- internal timer service manager ----
        // An operator that declares custom raw keyed state is the only writer of that state,
        // so the raw keyed snapshot cannot contain timers written by the internal timer
        // services. In that case, and when nothing was restored at all, the timer services
        // get an empty input instead of the raw keyed state.
        final Iterable<KeyGroupStatePartitionStreamProvider> timerRestoreInputs;
        if (!prioritizedState.isRestored() || isUsingCustomRawKeyedState) {
            timerRestoreInputs = Collections.emptyList();
        } else {
            timerRestoreInputs = rawKeyedInputs;
        }

        if (isAsyncState && asyncKeyedBackend != null) {
            asyncTimerManager =
                    timeServiceManagerProvider.create(
                            environment.getMetricGroup().getIOMetricGroup(),
                            asyncKeyedBackend,
                            asyncKeyedBackend.getKeyGroupRange(),
                            environment.getUserCodeClassLoader().asClassLoader(),
                            keyContext,
                            processingTimeService,
                            timerRestoreInputs,
                            cancellationContext);
        } else if (!isAsyncState && syncKeyedBackend != null) {
            syncTimerManager =
                    timeServiceManagerProvider.create(
                            environment.getMetricGroup().getIOMetricGroup(),
                            syncKeyedBackend,
                            syncKeyedBackend.getKeyGroupRange(),
                            environment.getUserCodeClassLoader().asClassLoader(),
                            keyContext,
                            processingTimeService,
                            timerRestoreInputs,
                            cancellationContext);
        }

        // ---- size statistics for input channel and result subpartition state ----
        Stream.concat(
                        prioritizedState.getPrioritizedInputChannelState().stream(),
                        prioritizedState.getPrioritizedResultSubpartitionState().stream())
                .filter(Objects::nonNull)
                .forEach(handle -> handle.collectSizeStats(sizeStats));

        // ---- publish the collected sizes, one metric per location ----
        sizeStats
                .getStats()
                .forEach(
                        (where, size) ->
                                initializationMetrics.addDurationMetric(
                                        MetricNames.RESTORED_STATE_SIZE + "." + where, size));

        // ---- result ----
        return new StreamOperatorStateContextImpl(
                prioritizedState.getRestoredCheckpointId(),
                opStateBackend,
                keySerializer,
                syncKeyedBackend,
                asyncKeyedBackend,
                syncTimerManager,
                asyncTimerManager,
                rawOperatorInputs,
                rawKeyedInputs);
    } catch (Exception ex) {
        // Something failed before the result could be handed out: tear down whatever exists.
        // A resource is only closed here if it could still be unregistered from the registry.

        if (syncKeyedBackend != null) {
            if (streamTaskCloseableRegistry.unregisterCloseable(syncKeyedBackend)) {
                IOUtils.closeQuietly(syncKeyedBackend);
            }
            // dispose unconditionally to release resources (e.g. native ones)
            syncKeyedBackend.dispose();
        }

        if (asyncKeyedBackend != null) {
            if (streamTaskCloseableRegistry.unregisterCloseable(asyncKeyedBackend)) {
                IOUtils.closeQuietly(asyncKeyedBackend);
            }
            // dispose unconditionally to release resources (e.g. native ones)
            asyncKeyedBackend.dispose();
        }

        if (opStateBackend != null) {
            if (streamTaskCloseableRegistry.unregisterCloseable(opStateBackend)) {
                IOUtils.closeQuietly(opStateBackend);
            }
            opStateBackend.dispose();
        }

        // The raw inputs may still be null here; they are passed to the registry as they are.
        if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedInputs)) {
            IOUtils.closeQuietly(rawKeyedInputs);
        }

        if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorInputs)) {
            IOUtils.closeQuietly(rawOperatorInputs);
        }

        throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
    }
}