public void tryReadSegmentsToMemory()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java [219:248]


    /**
     * Tries to copy every segment that has no in-memory cache from its file into memory taken
     * from the given pool.
     *
     * <p>Segments whose cache is already non-empty are skipped. Caching is done one segment at a
     * time, so on return only some of the segments may have been cached:
     *
     * <ul>
     *   <li>If constructing the in-memory writer throws {@code MemoryAllocationException}, the
     *       method stops and leaves the remaining segments untouched.
     *   <li>If the writer rejects a record part-way through a segment, whatever that writer
     *       produced is returned to the pool, the segment is left uncached, and the method moves
     *       on to the next segment.
     * </ul>
     *
     * <p>The file reader opened for a segment is always closed before the next segment is
     * processed or the method returns.
     *
     * @param serializer serializer passed to the segment reader and writer
     * @param segmentPool pool the in-memory writer draws from, and to which the memory of a
     *     partially written segment is returned
     * @param <T> type of the cached records
     * @throws IOException propagated from reading the segment file, writing to memory, or
     *     closing the reader
     */
    public <T> void tryReadSegmentsToMemory(
            TypeSerializer<T> serializer, MemorySegmentPool segmentPool) throws IOException {
        for (Segment segment : segments) {
            if (!segment.getCache().isEmpty()) {
                // Already cached in memory; nothing to do for this segment.
                continue;
            }

            SegmentReader<T> reader = new FileSegmentReader<>(serializer, segment, 0);
            try {
                SegmentWriter<T> writer;
                try {
                    writer =
                            new MemorySegmentWriter<>(
                                    serializer,
                                    segment.getPath(),
                                    segmentPool,
                                    segment.getFsSize());
                } catch (MemoryAllocationException e) {
                    // The writer could not be set up for this segment: give up on this and all
                    // remaining segments. The finally block below still closes the reader.
                    break;
                }

                boolean cacheSuccess = true;
                while (cacheSuccess && reader.hasNext()) {
                    if (!writer.addRecord(reader.next())) {
                        // The writer refused the record: hand back whatever it has produced so
                        // far and leave this segment uncached.
                        writer.finish().ifPresent(x -> segmentPool.returnAll(x.getCache()));
                        cacheSuccess = false;
                    }
                }
                if (cacheSuccess) {
                    // NOTE(review): get() throws NoSuchElementException if finish() returns an
                    // empty Optional; confirm that cannot happen for a fully copied segment.
                    segment.setCache(writer.finish().get().getCache());
                }
            } finally {
                // Release the reader on every path (success, rejected record, allocation
                // failure, or exception) so it is not leaked.
                reader.close();
            }
        }
    }