in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/graph/GraphWriter.scala [84:165]
/**
 * Write the vertices and edges registered in this writer, following the
 * layout described by `graphInfo`.
 *
 * For every vertex type an index column and an index mapping are generated,
 * then the vertex properties are written. For every edge type the source and
 * destination indices are resolved through those mappings, and the edges are
 * written once per adjacency list type declared in the edge info.
 *
 * All DataFrames cached by this method are released before it returns,
 * including when a write fails.
 *
 * @param graphInfo
 *   the graph info providing the vertex infos, edge infos and path prefix.
 * @param spark
 *   the Spark session; it is not referenced by this method body.
 */
def write(graphInfo: GraphInfo, spark: SparkSession): Unit = {
  val vertexInfos = graphInfo.getVertexInfos()
  val edgeInfos = graphInfo.getEdgeInfos()
  val prefix = graphInfo.getPrefix()
  // vertex type -> cached index mapping DataFrame. The map is only mutated in
  // place, so the reference itself is immutable.
  val indexMappings: scala.collection.mutable.Map[String, DataFrame] =
    scala.collection.mutable.Map[String, DataFrame]()

  try {
    vertexInfos.foreach { case (vertexType, vertexInfo) =>
      val primaryKey = primaryKeys(vertexType)
      vertices(vertexType).persist(
        GeneralParams.defaultStorageLevel
      ) // cache the vertex DataFrame
      val (dfWithIndex, indexMapping) = IndexGenerator
        .generateVertexIndexColumnAndIndexMapping(
          vertices(vertexType),
          primaryKey
        )
      dfWithIndex.persist(
        GeneralParams.defaultStorageLevel
      ) // cache the vertex DataFrame with index
      indexMapping.persist(
        GeneralParams.defaultStorageLevel
      ) // cache the index mapping DataFrame
      // Register the mapping right away so the outer `finally` releases it.
      indexMappings += vertexType -> indexMapping
      vertices(vertexType).unpersist() // unpersist the vertex DataFrame

      try {
        val writer = new VertexWriter(prefix, vertexInfo, dfWithIndex)
        vertexNums += vertexType -> writer.getVertexNum()
        writer.writeVertexProperties()
      } finally {
        dfWithIndex.unpersist()
      }
    }

    edgeInfos.foreach { case (_, edgeInfo) =>
      val srcType = edgeInfo.getSrc_type
      val dstType = edgeInfo.getDst_type
      val edgeType = edgeInfo.getEdge_type
      val srcIndexMapping = indexMappings(srcType)
      // Reuse the source mapping when both endpoints share a vertex type.
      val dstIndexMapping =
        if (srcType == dstType) srcIndexMapping else indexMappings(dstType)
      val edgeDfWithIndex =
        IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(
          edges((srcType, edgeType, dstType)),
          srcIndexMapping,
          dstIndexMapping
        )
      edgeDfWithIndex.persist(
        GeneralParams.defaultStorageLevel
      ) // cache the edge DataFrame with index

      try {
        val adjListIt = edgeInfo.getAdj_lists.iterator
        while (adjListIt.hasNext()) {
          val adjListType = adjListIt.next().getAdjList_type_in_gar
          // The vertex count passed to the writer is the one of the vertex
          // type the adjacency list is keyed by: source for *_by_source,
          // destination otherwise.
          val bySource =
            adjListType == AdjListType.ordered_by_source ||
              adjListType == AdjListType.unordered_by_source
          val vertexNum =
            if (bySource) vertexNums(srcType) else vertexNums(dstType)
          val writer = new EdgeWriter(
            prefix,
            edgeInfo,
            adjListType,
            vertexNum,
            edgeDfWithIndex
          )
          writer.writeEdges()
        }
      } finally {
        edgeDfWithIndex.unpersist()
      }
    }
  } finally {
    // The index mappings were persisted above and are local to this call;
    // release them so they do not stay cached after the write finishes.
    indexMappings.values.foreach(_.unpersist())
  }
}