public ListStateWithCache()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java [66:129]


    public ListStateWithCache(
            TypeSerializer<T> serializer,
            StreamTask<?, ?> containingTask,
            StreamingRuntimeContext runtimeContext,
            StateInitializationContext stateInitializationContext,
            OperatorID operatorID)
            throws IOException {
        this.serializer = serializer;

        MemorySegmentPool segmentPool = null;
        double fraction =
                containingTask
                        .getConfiguration()
                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.OPERATOR,
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader());
        if (fraction > 0) {
            MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
            segmentPool =
                    new LazyMemorySegmentPool(
                            containingTask,
                            memoryManager,
                            memoryManager.computeNumberOfPages(fraction));
        }

        basePath =
                OperatorUtils.getDataCachePath(
                        containingTask.getEnvironment().getTaskManagerInfo().getConfiguration(),
                        containingTask
                                .getEnvironment()
                                .getIOManager()
                                .getSpillingDirectoriesPaths());

        List<StatePartitionStreamProvider> inputs =
                IteratorUtils.toList(
                        stateInitializationContext.getRawOperatorStateInputs().iterator());
        Preconditions.checkState(
                inputs.size() < 2, "The input from raw operator state should be one or zero.");

        List<Segment> priorFinishedSegments = new ArrayList<>();
        if (inputs.size() > 0) {
            DataCacheSnapshot dataCacheSnapshot =
                    DataCacheSnapshot.recover(
                            inputs.get(0).getStream(),
                            basePath.getFileSystem(),
                            OperatorUtils.createDataCacheFileGenerator(
                                    basePath, "cache", operatorID));

            if (segmentPool != null) {
                dataCacheSnapshot.tryReadSegmentsToMemory(serializer, segmentPool);
            }

            priorFinishedSegments = dataCacheSnapshot.getSegments();
        }

        this.dataCacheWriter =
                new DataCacheWriter<>(
                        serializer,
                        basePath.getFileSystem(),
                        OperatorUtils.createDataCacheFileGenerator(basePath, "cache", operatorID),
                        segmentPool,
                        priorFinishedSegments);
    }