in compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala [244:316]
/**
 * Marks this RDD to be persisted with the given storage level.
 *
 * Options of `newLevel` that cannot be honored are downgraded, each with a logged warning:
 *  - disk + memory is persisted on disk only (TODO #187),
 *  - off-heap is persisted in heap instead (TODO #188),
 *  - deserialized form on disk is persisted in serialized form instead,
 *  - a replication factor above 1 is ignored (TODO #189).
 *
 * The persistence is expressed in the DAG by a ghost "cache marker" vertex (an `EmptyTransform`)
 * connected to `lastVertex` by a ONE_TO_ONE edge. That edge carries this RDD's encoder, decoder and
 * key-extractor properties plus the data-store, KEEP persistence, cache ID and duplicate-edge-group
 * properties. A marker vertex added by a previous call is removed first.
 *
 * @param newLevel the requested storage level.
 * @return this RDD, whose `dag` now includes the cache marker.
 * @throws RuntimeException if `newLevel.isValid` is false.
 */
override def persist(newLevel: StorageLevel): RDD.this.type = {
  if (!newLevel.isValid) {
    throw new RuntimeException("Non-valid StorageLevel: " + newLevel.toString())
  }

  // Modify unavailable options to an available option.
  // Disk + memory degrades to disk only, so the effective disk flag is always the requested one.
  // NOTE(review): the store selection below assumes a valid level that does not use disk uses
  // memory (Spark's `isValid` requires useMemory || useDisk) — confirm if that contract changes.
  val actualUseDisk = newLevel.useDisk
  if (newLevel.useDisk && newLevel.useMemory) {
    // TODO #187: Implement disk and memory persistence (Spill)
    LOG.warn("Cannot persist data in disk and memory at the same time. The data will be persisted in disk only.")
  }
  if (newLevel.useOffHeap) {
    // TODO #188: Implement off-heap memory persistence
    LOG.warn("Cannot persist data using off-heap area. The data will be persisted in heap instead of off-heap.")
  }
  // Data written to disk is always kept in serialized form.
  val actualDeserialized = newLevel.deserialized && !actualUseDisk
  if (newLevel.deserialized && actualUseDisk) {
    LOG.warn(
      "Cannot persist data as deserialized form in disk. The data will be persisted in serialized form instead.")
  }
  if (newLevel.replication > 1) {
    // TODO #189: Implement replication for persisted data
    LOG.warn("Cannot persist data with replication. The data will not be replicated.")
  }

  val dataStore =
    if (actualUseDisk) DataStoreProperty.Value.LOCAL_FILE_STORE
    else if (actualDeserialized) DataStoreProperty.Value.MEMORY_STORE
    else DataStoreProperty.Value.SERIALIZED_MEMORY_STORE

  // TODO #190: Disable changing persistence strategy after a RDD is calculated
  val builder = new DAGBuilder[IRVertex, IREdge](dag)
  // Drop the marker left by a previous persist() call so only the latest storage level applies.
  persistedMarkerVertex.foreach(oldMarker => builder.removeVertex(oldMarker))

  val cacheID = UUID.randomUUID()
  val ghostVertex = new OperatorVertex(new EmptyTransform[T, T]("CacheMarkerTransform-" + cacheID.toString))
  ghostVertex.setProperty(IgnoreSchedulingTempDataReceiverProperty.of())
  builder.addVertex(ghostVertex, loopVertexStack)

  val newEdge = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, lastVertex, ghostVertex)
  // Setup default properties
  newEdge.setProperty(encoderProperty)
  newEdge.setProperty(decoderProperty)
  newEdge.setProperty(keyExtractorProperty)
  // Setup cache-related properties
  newEdge.setProperty(DataStoreProperty.of(dataStore))
  newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.KEEP))
  newEdge.setProperty(CacheIDProperty.of(cacheID))
  val dupEdgeVal = new DuplicateEdgeGroupPropertyValue("CacheGroup-" + cacheID)
  dupEdgeVal.setRepresentativeEdgeId(newEdge.getId)
  newEdge.setProperty(DuplicateEdgeGroupProperty.of(dupEdgeVal))
  builder.connectVertices(newEdge)

  dag = builder.buildWithoutSourceSinkCheck()
  persistedMarkerVertex = Some(ghostVertex)
  this
}