public StreamOperatorStateContext streamOperatorStateContext()

in flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java [151:361]


    /**
     * Builds the {@link StreamOperatorStateContext} for one operator subtask.
     *
     * <p>In order, this method creates the keyed state backend (synchronous or asynchronous,
     * depending on {@code isAsyncState}), the operator state backend, the raw keyed and raw
     * operator state inputs, and the internal timer service manager. It then collects size
     * statistics of the restored state and reports them to the initialization metrics.
     *
     * <p>If any step throws an {@link Exception}, the backends and raw state inputs created so
     * far are unregistered from {@code streamTaskCloseableRegistry}, closed quietly, the backends
     * are disposed, and the failure is rethrown wrapped in a new {@link Exception}.
     *
     * @param operatorID id of the operator; used to look up its prioritized restored state and to
     *     build the operator description text.
     * @param operatorClassName class name of the operator; only used for the description text.
     * @param processingTimeService handed to the timer service manager provider.
     * @param keyContext handed to the timer service manager provider.
     * @param keySerializer serializer handed to the keyed state backend creation; may be {@code
     *     null}.
     * @param streamTaskCloseableRegistry registry handed to the backend creation helpers and used
     *     to register the raw state inputs.
     * @param metricGroup handed to the keyed state backend creation.
     * @param managedMemoryFraction handed to the keyed state backend creation.
     * @param isUsingCustomRawKeyedState if {@code true}, the restored raw keyed state is NOT
     *     passed to the timer service manager.
     * @param isAsyncState if {@code true}, an async keyed state backend and async timer service
     *     manager are created instead of the synchronous ones.
     * @return the context bundling the created backends, timer service managers and raw inputs.
     * @throws Exception wrapping the original cause if any part of the initialization fails.
     */
    public StreamOperatorStateContext streamOperatorStateContext(
            @Nonnull OperatorID operatorID,
            @Nonnull String operatorClassName,
            @Nonnull ProcessingTimeService processingTimeService,
            @Nonnull KeyContext keyContext,
            @Nullable TypeSerializer<?> keySerializer,
            @Nonnull CloseableRegistry streamTaskCloseableRegistry,
            @Nonnull MetricGroup metricGroup,
            double managedMemoryFraction,
            boolean isUsingCustomRawKeyedState,
            boolean isAsyncState)
            throws Exception {

        TaskInfo taskInfo = environment.getTaskInfo();
        registerRestoredStateToFileMergingManager(environment.getJobID(), taskInfo, operatorID);

        OperatorSubtaskDescriptionText operatorSubtaskDescription =
                new OperatorSubtaskDescriptionText(
                        operatorID,
                        operatorClassName,
                        taskInfo.getIndexOfThisSubtask(),
                        taskInfo.getNumberOfParallelSubtasks());

        final String operatorIdentifierText = operatorSubtaskDescription.toString();
        final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
                taskStateManager.prioritizedOperatorState(operatorID);

        // Everything below starts as null so that the catch block can tell what was already
        // created when a failure occurs.
        CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;
        AsyncKeyedStateBackend asyncKeyedStateBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        InternalTimeServiceManager<?> timeServiceManager = null;
        InternalTimeServiceManager<?> asyncTimeServiceManager = null;

        final StateObject.StateObjectSizeStatsCollector statsCollector =
                StateObject.StateObjectSizeStatsCollector.create();

        try {

            // -------------- Keyed State Backend --------------
            if (isAsyncState) {
                if (stateBackend.supportsAsyncKeyedStateBackend()) {
                    asyncKeyedStateBackend =
                            keyedStatedBackend(
                                    keySerializer,
                                    operatorIdentifierText,
                                    prioritizedOperatorSubtaskStates,
                                    streamTaskCloseableRegistry,
                                    metricGroup,
                                    managedMemoryFraction,
                                    statsCollector,
                                    StateBackend::createAsyncKeyedStateBackend);
                } else {
                    // The state backend has no native async implementation: create the
                    // synchronous backend and expose it through the async adaptor.
                    //
                    // The backend is only wrapped when one was actually created. The null checks
                    // before the timer service creation below indicate that backend creation may
                    // yield null (presumably for non-keyed operators, i.e. null keySerializer --
                    // TODO confirm against the keyedStatedBackend helper). Wrapping
                    // unconditionally would hide that null behind a non-null adaptor and defeat
                    // those checks.
                    final CheckpointableKeyedStateBackend<?> syncDelegate =
                            keyedStatedBackend(
                                    keySerializer,
                                    operatorIdentifierText,
                                    prioritizedOperatorSubtaskStates,
                                    streamTaskCloseableRegistry,
                                    metricGroup,
                                    managedMemoryFraction,
                                    statsCollector,
                                    StateBackend::createKeyedStateBackend);
                    if (syncDelegate != null) {
                        asyncKeyedStateBackend = new AsyncKeyedStateBackendAdaptor<>(syncDelegate);
                    }
                }
            } else {
                keyedStatedBackend =
                        keyedStatedBackend(
                                keySerializer,
                                operatorIdentifierText,
                                prioritizedOperatorSubtaskStates,
                                streamTaskCloseableRegistry,
                                metricGroup,
                                managedMemoryFraction,
                                statsCollector,
                                StateBackend::createKeyedStateBackend);
            }

            // -------------- Operator State Backend --------------
            operatorStateBackend =
                    operatorStateBackend(
                            operatorIdentifierText,
                            prioritizedOperatorSubtaskStates,
                            streamTaskCloseableRegistry,
                            statsCollector);

            // -------------- Raw State Streams --------------
            rawKeyedStateInputs =
                    rawKeyedStateInputs(
                            prioritizedOperatorSubtaskStates
                                    .getPrioritizedRawKeyedState()
                                    .iterator(),
                            statsCollector);
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

            rawOperatorStateInputs =
                    rawOperatorStateInputs(
                            prioritizedOperatorSubtaskStates
                                    .getPrioritizedRawOperatorState()
                                    .iterator(),
                            statsCollector);
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            // -------------- Internal Timer Service Manager --------------
            // if the operator indicates that it is using custom raw keyed state,
            // then whatever was written in the raw keyed state snapshot was NOT written
            // by the internal timer services (because there is only ever one user of raw keyed
            // state);
            // in this case, timers should not attempt to restore timers from the raw keyed
            // state.
            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                    (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
                            ? rawKeyedStateInputs
                            : Collections.emptyList();
            // A timer service manager is only created when a keyed backend of the matching
            // flavour exists; otherwise the corresponding manager stays null.
            if (isAsyncState) {
                if (asyncKeyedStateBackend != null) {
                    asyncTimeServiceManager =
                            timeServiceManagerProvider.create(
                                    environment.getMetricGroup().getIOMetricGroup(),
                                    asyncKeyedStateBackend,
                                    asyncKeyedStateBackend.getKeyGroupRange(),
                                    environment.getUserCodeClassLoader().asClassLoader(),
                                    keyContext,
                                    processingTimeService,
                                    restoredRawKeyedStateTimers,
                                    cancellationContext);
                }
            } else {
                if (keyedStatedBackend != null) {
                    timeServiceManager =
                            timeServiceManagerProvider.create(
                                    environment.getMetricGroup().getIOMetricGroup(),
                                    keyedStatedBackend,
                                    keyedStatedBackend.getKeyGroupRange(),
                                    environment.getUserCodeClassLoader().asClassLoader(),
                                    keyContext,
                                    processingTimeService,
                                    restoredRawKeyedStateTimers,
                                    cancellationContext);
                }
            }
            // Add stats for input channel and result partition state
            Stream.concat(
                            prioritizedOperatorSubtaskStates
                                    .getPrioritizedInputChannelState()
                                    .stream(),
                            prioritizedOperatorSubtaskStates
                                    .getPrioritizedResultSubpartitionState()
                                    .stream())
                    .filter(Objects::nonNull)
                    .forEach(channelHandle -> channelHandle.collectSizeStats(statsCollector));

            // Report collected stats to metrics
            statsCollector
                    .getStats()
                    .forEach(
                            (location, metricValue) ->
                                    initializationMetrics.addDurationMetric(
                                            MetricNames.RESTORED_STATE_SIZE + "." + location,
                                            metricValue));

            // -------------- Preparing return value --------------

            return new StreamOperatorStateContextImpl(
                    prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),
                    operatorStateBackend,
                    keySerializer,
                    keyedStatedBackend,
                    asyncKeyedStateBackend,
                    timeServiceManager,
                    asyncTimeServiceManager,
                    rawOperatorStateInputs,
                    rawKeyedStateInputs);
        } catch (Exception ex) {

            // cleanup if something went wrong before results got published.
            // Each resource is closed here only if it could still be unregistered from the
            // registry; backends are disposed in either case.
            if (keyedStatedBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
                    IOUtils.closeQuietly(keyedStatedBackend);
                }
                // release resource (e.g native resource)
                keyedStatedBackend.dispose();
            }

            if (asyncKeyedStateBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(asyncKeyedStateBackend)) {
                    IOUtils.closeQuietly(asyncKeyedStateBackend);
                }
                // release resource (e.g native resource)
                asyncKeyedStateBackend.dispose();
            }

            if (operatorStateBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
                    IOUtils.closeQuietly(operatorStateBackend);
                }
                operatorStateBackend.dispose();
            }

            // NOTE(review): the raw inputs may still be null here if the failure happened
            // before they were created; this relies on unregisterCloseable accepting null
            // -- confirm against CloseableRegistry.
            if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
                IOUtils.closeQuietly(rawKeyedStateInputs);
            }

            if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
                IOUtils.closeQuietly(rawOperatorStateInputs);
            }

            throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
        }
    }