in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java [219:248]
/**
 * Attempts to copy every segment that has no in-memory cache yet from its file into memory
 * taken from {@code segmentPool}.
 *
 * <p>Segments whose cache is already non-empty are skipped. For each remaining segment the
 * records are read back from the file and appended to a {@link MemorySegmentWriter}:
 *
 * <ul>
 *   <li>If the writer cannot be created because of a {@link MemoryAllocationException}, the
 *       method stops and leaves all remaining segments untouched.
 *   <li>If the writer rejects a record, whatever it has buffered so far is handed back to
 *       {@code segmentPool}, the segment is left uncached, and processing moves on to the
 *       next segment.
 *   <li>Otherwise the memory produced by the writer is installed as the segment's cache.
 * </ul>
 *
 * @param serializer serializer used both to read records from file and to write them to memory
 * @param segmentPool pool the in-memory copies are allocated from and returned to on failure
 * @param <T> type of the cached records
 * @throws IOException if reading a segment file, writing to memory, or closing the reader
 *     fails
 */
public <T> void tryReadSegmentsToMemory(
        TypeSerializer<T> serializer, MemorySegmentPool segmentPool) throws IOException {
    for (Segment segment : segments) {
        if (!segment.getCache().isEmpty()) {
            continue;
        }

        SegmentReader<T> reader = new FileSegmentReader<>(serializer, segment, 0);
        try {
            SegmentWriter<T> writer;
            try {
                writer =
                        new MemorySegmentWriter<>(
                                serializer,
                                segment.getPath(),
                                segmentPool,
                                segment.getFsSize());
            } catch (MemoryAllocationException e) {
                // The pool could not provide the memory for this segment; give up on the
                // remaining segments as well. The finally block still closes the reader.
                break;
            }

            boolean cacheSuccess = true;
            while (cacheSuccess && reader.hasNext()) {
                if (!writer.addRecord(reader.next())) {
                    // The writer refused the record: hand back what it already holds so
                    // that a partially copied segment does not keep memory from the pool.
                    writer.finish().ifPresent(x -> segmentPool.returnAll(x.getCache()));
                    cacheSuccess = false;
                }
            }

            if (cacheSuccess) {
                // finish() yields an Optional; when it is empty there is nothing to install,
                // so the segment simply stays uncached instead of failing on Optional.get().
                writer.finish().ifPresent(x -> segment.setCache(x.getCache()));
            }
        } finally {
            // Always release the file-backed reader, whether the segment was cached,
            // rejected, the loop was left via break, or an exception is propagating.
            reader.close();
        }
    }
}