in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java [66:129]
/**
 * Creates a list state that caches records on disk (under the task manager's data-cache /
 * spilling directories) and, when managed memory is assigned to this operator, also in memory
 * via a {@link MemorySegmentPool}.
 *
 * <p>If the operator was restored from a checkpoint, any previously written cache segments are
 * recovered from the raw operator state stream and handed to the {@link DataCacheWriter} as
 * prior finished segments.
 *
 * @param serializer serializer used to read/write the cached records.
 * @param containingTask the task containing the operator; supplies configuration, the memory
 *     manager and the I/O manager's spilling directories.
 * @param runtimeContext runtime context used to resolve the operator's managed-memory fraction.
 * @param stateInitializationContext provides the raw operator state inputs used for recovery.
 * @param operatorID identifies the operator; used to generate cache file names.
 * @throws IOException if the cache path cannot be resolved or recovery from the raw operator
 *     state stream fails.
 */
public ListStateWithCache(
        TypeSerializer<T> serializer,
        StreamTask<?, ?> containingTask,
        StreamingRuntimeContext runtimeContext,
        StateInitializationContext stateInitializationContext,
        OperatorID operatorID)
        throws IOException {
    this.serializer = serializer;

    // Only build an in-memory segment pool when this operator actually has a share of the
    // slot's managed memory; with fraction == 0 the cache is purely disk-backed.
    MemorySegmentPool segmentPool = null;
    double fraction =
            containingTask
                    .getConfiguration()
                    .getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.OPERATOR,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader());
    if (fraction > 0) {
        MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
        segmentPool =
                new LazyMemorySegmentPool(
                        containingTask,
                        memoryManager,
                        memoryManager.computeNumberOfPages(fraction));
    }

    basePath =
            OperatorUtils.getDataCachePath(
                    containingTask.getEnvironment().getTaskManagerInfo().getConfiguration(),
                    containingTask
                            .getEnvironment()
                            .getIOManager()
                            .getSpillingDirectoriesPaths());

    // At most one raw operator state input is expected: the snapshot written by this state's
    // own snapshotState, or nothing on a fresh start.
    List<StatePartitionStreamProvider> inputs =
            IteratorUtils.toList(
                    stateInitializationContext.getRawOperatorStateInputs().iterator());
    Preconditions.checkState(
            inputs.size() < 2, "The input from raw operator state should be one or zero.");

    List<Segment> priorFinishedSegments = new ArrayList<>();
    if (!inputs.isEmpty()) {
        DataCacheSnapshot dataCacheSnapshot =
                DataCacheSnapshot.recover(
                        inputs.get(0).getStream(),
                        basePath.getFileSystem(),
                        OperatorUtils.createDataCacheFileGenerator(
                                basePath, "cache", operatorID));
        // Best effort: pull recovered segments back into managed memory when a pool exists;
        // otherwise they remain on disk.
        if (segmentPool != null) {
            dataCacheSnapshot.tryReadSegmentsToMemory(serializer, segmentPool);
        }
        priorFinishedSegments = dataCacheSnapshot.getSegments();
    }

    this.dataCacheWriter =
            new DataCacheWriter<>(
                    serializer,
                    basePath.getFileSystem(),
                    OperatorUtils.createDataCacheFileGenerator(basePath, "cache", operatorID),
                    segmentPool,
                    priorFinishedSegments);
}