def write()

in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/graph/GraphWriter.scala [84:165]


  def write(graphInfo: GraphInfo, spark: SparkSession): Unit = {
    val vertexInfos = graphInfo.getVertexInfos()
    val edgeInfos = graphInfo.getEdgeInfos()
    val prefix = graphInfo.getPrefix()
    var indexMappings: scala.collection.mutable.Map[String, DataFrame] =
      scala.collection.mutable.Map[String, DataFrame]()
    vertexInfos.foreach {
      case (vertexType, vertexInfo) => {
        val primaryKey = primaryKeys(vertexType)
        vertices(vertexType).persist(
          GeneralParams.defaultStorageLevel
        ) // cache the vertex DataFrame
        val df_and_mapping = IndexGenerator
          .generateVertexIndexColumnAndIndexMapping(
            vertices(vertexType),
            primaryKey
          )
        df_and_mapping._1.persist(
          GeneralParams.defaultStorageLevel
        ) // cache the vertex DataFrame with index
        df_and_mapping._2.persist(
          GeneralParams.defaultStorageLevel
        ) // cache the index mapping DataFrame
        vertices(vertexType).unpersist() // unpersist the vertex DataFrame
        val df_with_index = df_and_mapping._1
        indexMappings += vertexType -> df_and_mapping._2
        val writer =
          new VertexWriter(prefix, vertexInfo, df_with_index)
        vertexNums += vertexType -> writer.getVertexNum()
        writer.writeVertexProperties()
        df_with_index.unpersist()
      }
    }

    edgeInfos.foreach {
      case (key, edgeInfo) => {
        val srcType = edgeInfo.getSrc_type
        val dstType = edgeInfo.getDst_type
        val edgeType = edgeInfo.getEdge_type
        val src_vertex_index_mapping = indexMappings(srcType)
        val dst_vertex_index_mapping = {
          if (srcType == dstType)
            src_vertex_index_mapping
          else
            indexMappings(dstType)
        }
        val edge_df_with_index =
          IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(
            edges((srcType, edgeType, dstType)),
            src_vertex_index_mapping,
            dst_vertex_index_mapping
          )
        edge_df_with_index.persist(
          GeneralParams.defaultStorageLevel
        ) // cache the edge DataFrame with index

        val adj_lists = edgeInfo.getAdj_lists
        val adj_list_it = adj_lists.iterator
        while (adj_list_it.hasNext()) {
          val adj_list_type = adj_list_it.next().getAdjList_type_in_gar
          val vertex_num = {
            if (
              adj_list_type == AdjListType.ordered_by_source || adj_list_type == AdjListType.unordered_by_source
            ) {
              vertexNums(srcType)
            } else {
              vertexNums(dstType)
            }
          }
          val writer = new EdgeWriter(
            prefix,
            edgeInfo,
            adj_list_type,
            vertex_num,
            edge_df_with_index
          )
          writer.writeEdges()
        }
        edge_df_with_index.unpersist()
      }
    }
  }