override def persist()

in compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala [244:316]


  /**
   * Marks this RDD for persistence under the given storage level by attaching a
   * cache-marker vertex to the IR DAG.
   *
   * Unsupported level combinations are downgraded to the nearest supported option
   * (with a warning) rather than rejected. Re-invoking replaces any previously
   * attached marker vertex.
   *
   * @param newLevel the requested Spark storage level; must be valid.
   * @return this RDD, for chaining.
   * @throws RuntimeException if `newLevel` is not a valid storage level.
   */
  override def persist(newLevel: StorageLevel): RDD.this.type = {
    if (!newLevel.isValid) {
      throw new RuntimeException("Non-valid StorageLevel: " + newLevel.toString())
    }

    // Downgrade unsupported options to the closest supported one.
    // Note: only the disk flag influences the chosen data store below;
    // the memory flag of the requested level is implied by !useDisk.
    val actualUseDisk =
      if (newLevel.useDisk && newLevel.useMemory) {
        // TODO #187: Implement disk and memory persistence (Spill)
        LOG.warn("Cannot persist data in disk and memory at the same time. The data will be persisted in disk only.")
        true
      } else {
        newLevel.useDisk
      }

    if (newLevel.useOffHeap) {
      // TODO #188: Implement off-heap memory persistence
      LOG.warn("Cannot persist data using off-heap area. The data will be persisted in heap instead of off-heap.")
    }

    // Deserialized (object-form) persistence is only possible in memory.
    val actualDeserialized =
      if (newLevel.deserialized && actualUseDisk) {
        LOG.warn(
          "Cannot persist data as deserialized form in disk. The data will be persisted in serialized form instead.")
        false
      } else {
        newLevel.deserialized
      }

    if (newLevel.replication > 1) {
      // TODO #189: Implement replication for persisted data
      LOG.warn("Cannot persist data with replication. The data will not be replicated.")
    }

    // TODO #190: Disable changing persistence strategy after a RDD is calculated
    // Rebuild the DAG, removing any cache-marker vertex from a previous persist() call.
    val builder = new DAGBuilder[IRVertex, IREdge](dag)
    persistedMarkerVertex.foreach(builder.removeVertex)

    // A "ghost" vertex that only receives the data to be cached; it is ignored by scheduling.
    val cacheID = UUID.randomUUID()
    val ghostVertex = new OperatorVertex(new EmptyTransform[T, T]("CacheMarkerTransform-" + cacheID.toString))
    ghostVertex.setProperty(IgnoreSchedulingTempDataReceiverProperty.of())
    builder.addVertex(ghostVertex, loopVertexStack)

    val newEdge = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, lastVertex, ghostVertex)
    // Setup default properties
    newEdge.setProperty(encoderProperty)
    newEdge.setProperty(decoderProperty)
    newEdge.setProperty(keyExtractorProperty)
    // Setup cache-related properties: pick the data store realizing the (downgraded) level.
    val dataStore =
      if (actualUseDisk) DataStoreProperty.Value.LOCAL_FILE_STORE
      else if (actualDeserialized) DataStoreProperty.Value.MEMORY_STORE
      else DataStoreProperty.Value.SERIALIZED_MEMORY_STORE
    newEdge.setProperty(DataStoreProperty.of(dataStore))
    newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.KEEP))
    newEdge.setProperty(CacheIDProperty.of(cacheID))
    // Tag the edge as the representative of its cache group so later reads can reuse the data.
    val dupEdgeVal = new DuplicateEdgeGroupPropertyValue("CacheGroup-" + cacheID)
    dupEdgeVal.setRepresentativeEdgeId(newEdge.getId)
    newEdge.setProperty(DuplicateEdgeGroupProperty.of(dupEdgeVal))
    builder.connectVertices(newEdge)

    dag = builder.buildWithoutSourceSinkCheck()
    persistedMarkerVertex = Some(ghostVertex)
    this
  }